Spring集成RabbitMQ
2018-01-11 17:40
239 查看
一、什么是 RabbitMQ
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。在 CentOS7上安装 RabbitMQ 请点击:http://www.cnblogs.com/libra0920/p/7920698.html二、如何与 Spring 集成
1. 我们都需要哪些 Jar 包?抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:<!-- RabbitMQ --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
2. 使用外部参数文件 application.properties:
mq.host=127.0.0.1 mq.username=queue mq.password=1234 mq.port=8001 # 统一XML配置中易变部分的命名 mq.queue=test_mq
易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。 修改 applicationContext.xml 文件,引入我们创建的 properties 文件
<context:property-placeholder location="classpath:application.properties"/> <util:properties id="appConfig" location="classpath:application.properties"></util:properties>3. 连接 RabbitMQ 服务器
<!-- 连接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" /> <rabbit:admin connection-factory="connectionFactory"/>4. 声明一个 RabbitMQ Template
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />5. 在 applicationContext.xml 中声明一个交换机。<rabbit:topic-exchange> 标签的 name 属性就是在 RabbitMQ 服务器配置交换机的 name 值。
<rabbit:binding> 标签的 queue 属性是下面 <queue> 标签的 id 属性。<rabbit:binding> 标签的 pattern 属性是在 RabbitMQ 服务器配置交换机与队列绑定时的 Routing key 值(路由)。
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> </rabbit:bindings> </rabbit:topic-exchange>交换机的四种模式:direct:转发消息到 routigKey 指定的队列。
topic:对 key 进行模式匹配,比如ab*可以传到到所有 ab* 的 queue。
headers:(这个还没有接触到)
fanout:转发消息到所有绑定队列,忽略 routigKey
交换器的属性:持久性:如果启用,交换器将会在server重启前都有效。
自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。 一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。 topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。 因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。 6. 在 applicationContext.xml 中声明一个队列。<rabbit:queue> 标签的 name 属性就是在 RabbitMQ 服务器中 Queue 的 name。
<rabbit:queue> 标签的 id 属性是上面 <rabbit:binding> 标签的 queue 属性。
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />durable:是否持久化
exclusive:仅创建者可以使用的私有队列,断开后自动删除
auto-delete:当所有消费端连接断开后,是否自动删除队列
7. 创建生产者端
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Description: 消息队列发送者 * @Author: * @CreateTime: */ @Service public class Producer { @Autowired private AmqpTemplate amqpTemplate; public void sendQueue(String exchange_key, String queue_key, Object object) { // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange amqpTemplate.convertAndSend(exchange_key, queue_key, object); } }
8. 在 applicationContext.xml 中配置监听及消费者端
<!-- 消费者 --> <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean> <!-- 配置监听 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> <rabbit:listener queues="test_queue" ref="rabbitmqService"/> </rabbit:listener-container>
消费者 Java 代码:
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class RabbitmqService implements MessageListener { public void onMessage(Message message) { System.out.println("消息消费者 = " + message.toString()); } }
至此,我们的所有配置文件就写完了,最终如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- RabbitMQ start -->
<!-- 连接配置 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" /> <rabbit:admin connection-factory="connectionFactory"/>
<!-- 消息队列客户端 -->
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
<!-- queue 队列声明 -->
<!--
durable 是否持久化
exclusive 仅创建者可以使用的私有队列,断开后自动删除
auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
<!-- 交换机定义 -->
<!--
交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。
如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。
direct模式:消息与一个特定的路由器完全匹配,才会转发
topic模式:按模式匹配
-->
<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 设置消息Queue匹配的pattern (direct模式为key) -->
<rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>
<!-- 配置监听 消费者 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<!--
queues 监听队列,多个用逗号分隔
ref 监听器 -->
<rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>
</beans>
9. 如何使用 RabbitMQ 发送一个消息
@Autowired private Producer producer; @Value("#{appConfig['mq.queue']}") private String queueId; /** * @Description: 消息队列 * @Author: * @CreateTime: */ @ResponseBody @RequestMapping("/sendQueue") public String testQueue() { try { Map<String, Object> map = new HashMap<String, Object>(); map.put("data", "hello rabbitmq"); // 注意:第二个属性是 Queue 与 交换机绑定的路由 producer.sendQueue(queueId + "_exchange", queueId + "_patt", map); } catch (Exception e) { e.printStackTrace(); } return "发送完毕"; }
嗯。这个是 SpringMVC 框架的 Controller 类。
三、优化 - 将消费者由类级别改为方法级别。
1. 重写消费者 Java 代码import java.util.Map; import org.springframework.stereotype.Service; @Service public class ConsumerService { public void getMessage(Map<String, Object> message) { System.out.println("消费者:" + message); } }
2. 更改 applicationContext 配置文件将:
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> <rabbit:listener queues="test_queue" ref="consumerService" /> </rabbit:listener-container>改为:
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> <rabbit:listener queues="test_queue" ref="consumerService" method="getMessage" /> </rabbit:listener-container>完毕。
问题:rabbitmq rabbitmq集成spring的配置参数疑问
描述:主要是acknowledge、requeue-rejected以及concurrency的参数作用。acknowledge是auto的话是不是会业务处理完再从队列移出?如果发生异常会重新放到队列?这个跟manual有什么差别?requeue-rejected设为false是不是就算异常也不会重新进入队列?concurrency是每个消费者先分配固定的线程?这个是否会自动网上加,比如线程池有100个,先concurrency设成10个,是不是不够的时候会自动增加,直到所有的线程池都满?concurrency在生产环境中是设置比较好还是不设比较好?线程池pool-size已经设置过了![](http://image.codes51.com/Article/image/20170511/20170511003125_5000.jpg)
![](http://image.codes51.com/Article/image/20170511/20170511003125_8281.jpg)
解决方案1:acknowledge设置为auto表示自动确认。不需要手动ack,如果设置为manual的话一般是处理完消息之后再ack,表示这条消息已经被成功处理了。requeue-rejected设为false表示一条消息即使没有被ack,也不会再重新发送。concurrency就是表示有多少个channel,这个是固定的,生产环境是需要配置的。
相关文章推荐
- springboot-26-springboot 集成rabbitmq
- springboot 集成rabbitmq 实例
- springboot中rabbitmq集成——单项目
- spring boot集成rabbitmq的实例教程
- 消息队列RabbitMQ与Spring集成
- 消息队列RabbitMQ与Spring集成
- Spring集成RabbitMQ
- spring boot 集成RabbitMQ
- Spring集成rabbitmq消息中间件
- springboot(集成篇):RabbitMQ集成详解
- rabbitMQ第五篇:Spring集成RabbitMQ
- Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)
- Spring Boot (十三)集成RabbitMQ
- Spring集成RabbitMQ-使用RabbitMQ更方便
- Spring集成消息队列RabbitMQ
- Java SpringBoot集成RabbitMq实战和总结
- spring集成rabbitmq
- spring-boot 集成 rabbitmq
- Spring集成消息队列RabbitMQ(消息失败处理)
- RabbitMQ学习以及与Spring的集成(二)