RabbitMQ第五篇:Spring集成RabbitMQ
2017-07-20 17:47
666 查看
前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq。
首先引入配置文件org.springframework.amqp,如下
amqp-share.xml
其中
现在我们启动程序,效果如下
一定要注意实现
下面我们配置消费者
其中
现在只需要启动程序即可运行
程序执行结果
当然direct跟上面的情况差不多,只不过这个是根据路由匹配,先把数据发送到交换机,然后绑定路由和队列,通过交换机id和路由来找到队列,下面是一些主要的配置
ampq-share.xml
amqp-producer.xml
下面是消费者监听配置
amqp-consumer.xml
下面是程序
实际情况可能需要我们去分离消费者和生成者的程序。当然spring还有负载均衡的配置,这里就不多介绍了。
首先引入配置文件org.springframework.amqp,如下
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.7.3.RELEASE</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.13</version> </dependency>
一:配置消费者和生成者公共部分
rabbitmq.propertiesrabbit.hosts=10.50.200.234 rabbit.username=june rabbit.password=june rabbit.port=5672 rabbit.virtualHost=/ # 统一XML配置中易变部分的命名 rabbit.queue=rabbitmq_test
amqp-share.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <context:property-placeholder location="classpath:rabbitmq.properties"/> <util:properties id="appConfig" location="classpath:rabbitmq.properties"></util:properties> <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}" channel-cache-size="50"/> <rabbit:admin connection-factory="connectionFactory"/> <!--定义消息队列--> <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/> <!--绑定队列--> <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> </beans>
二:配置生成者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <import resource="amqp-share.xml"/> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean> <!--创建消息队列模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spittle.fanout" message-converter="jsonMessageConverter" /> </beans>
三:生产者程序
/** * june.mq:com.june.mq.rabbit.spring.Spittle.java * 日期:2017年7月13日 */ package com.june.mq.rabbit.spring; import java.io.Serializable; import java.util.Date; /** * Spittle <br> * * @author 王俊伟 wjw.happy.love@163.com * @blog https://www.github.com/junehappylove * @date 2017年7月13日 下午3:42:21 * @version 1.0.0 */ public class Spittle implements Serializable { private static final long serialVersionUID = 1L; private Long id; private Spitter spitter; private String message; private Date postedTime; public Spittle(Long id, Spitter spitter, String message, Date postedTime) { this.id = id; this.spitter = spitter; this.message = message; this.postedTime = postedTime; } public Long getId() { return this.id; } public String getMessage() { return this.message; } public Date getPostedTime() { return this.postedTime; } public Spitter getSpitter() { return this.spitter; } } /** * june.mq:com.june.mq.rabbit.spring.ProducerMain.java * 日期:2017年7月13日 */ package com.june.mq.rabbit.spring; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * ProducerMain <br> * * @author 王俊伟 wjw.happy.love@163.com * @blog https://www.github.com/junehappylove * @date 2017年7月13日 下午3:44:02 * @version 1.0.0 */ public class ProducerMain { private static ApplicationContext context; /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml"); AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate"); for (int i = 0; i < 10; i++) { System.out.println("Sending message #" + i); Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date()); template.convertAndSend(spittle); Thread.sleep(3000); } System.out.println("Done!"); } }
其中
convertAndSend方法默认第一个参数是交换机名称,第二个参数是路由名称,第三个才是我们发送的数据
现在我们启动程序,效果如下
七月 20, 2017 4:01:04 下午 org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh 信息: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@22d8cfe0: startup date [Thu Jul 20 16:01:04 CST 2017]; root of context hierarchy 七月 20, 2017 4:01:04 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions 信息: Loading XML bean definitions from class path resource [amqp/amqp-producer.xml] 七月 20, 2017 4:01:04 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions 信息: Loading XML bean definitions from class path resource [amqp/amqp-share.xml] 七月 20, 2017 4:01:04 下午 org.springframework.context.support.DefaultLifecycleProcessor start 信息: Starting beans in phase -2147482648 Sending message #0 七月 20, 2017 4:01:04 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection 信息: Created new connection: connectionFactory#2638011:0/SimpleConnection@c03cf28 [delegate=amqp://june@10.50.200.234:5672/, localPort= 57422] Sending message #1 Sending message #2 Sending message #3 Sending message #4 Sending message #5 Sending message #6 Sending message #7 Sending message #8 Sending message #9 Done!
四:消费者程序
首先编写一个用于监听生产者发送信息的代码package com.june.mq.rabbit.spring; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; /** * * SpittleAlertHandler <br> * * @author 王俊伟 wjw.happy.love@163.com * @blog https://www.github.com/junehappylove */ public class SpittleAlertHandler implements MessageListener { @Override public void onMessage(Message message) { try { String body = new String(message.getBody(), "UTF-8"); System.out.println(body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }
一定要注意实现
MessageListener,我们只需要获取message的body即可,通过json来转换我们需要的程序(比如我们可以发送一个map,map存放方法和实体,这样我们可以通过反射来调用不同的程序来运行)。
下面我们配置消费者
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <import resource="amqp-share.xml"/> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/> </rabbit:listener-container> <bean id="spittleListener" class="com.june.mq.rabbit.spring.SpittleAlertHandler"/> </beans>
其中
spittleListener是监听的程序,method是执行的方法,queues是我们监听的队列,多个队列可以逗号隔开(因为我们采用的是分发,所以三个队列获取的消息是相同的,这里为了简便我放在一个监听程序中了,其实我们可以写三个消费者,每个消费者监听一个队列)
现在只需要启动程序即可运行
/** * june.mq:com.june.mq.rabbit.spring.ConsumerMain.java * 日期:2017年7月13日 */ package com.june.mq.rabbit.spring; import org.springframework.context.ApplicationContext; /** * ConsumerMain <br> * * @author 王俊伟 wjw.happy.love@163.com * @blog https://www.github.com/junehappylove */ public class ConsumerMain { static ApplicationContext context = null; /** * @param args */ public static void main(String[] args) { context = new org.springframework.context.support.ClassPathXmlApplicationContext("amqp/amqp-consumer.xml"); } }
程序执行结果
七月 20, 2017 4:09:05 下午 org.springframework.context.support.ClassPathXmlApplicationContext prepareRefresh 信息: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@22d8cfe0: startup date [Thu Jul 20 16:09:05 CST 2017]; root of context hierarchy 七月 20, 2017 4:09:05 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions 信息: Loading XML bean definitions from class path resource [amqp/amqp-consumer.xml] 七月 20, 2017 4:09:05 下午 org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions 信息: Loading XML bean definitions from class path resource [amqp/amqp-share.xml] 七月 20, 2017 4:09:06 下午 org.springframework.context.support.DefaultLifecycleProcessor start 信息: Starting beans in phase -2147482648 七月 20, 2017 4:09:06 下午 org.springframework.context.support.DefaultLifecycleProcessor start 信息: Starting beans in phase 2147483647 七月 20, 2017 4:09:06 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection 信息: Created new connection: connectionFactory#40727802:0/SimpleConnection@5cf96df5 [delegate=amqp://june@10.50.200.234:5672/, localPort= 57477] {"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622} {"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622} {"id":0,"spitter":null,"message":"Hello world (0)","postedTime":1500537664622} {"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743} {"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743} {"id":1,"spitter":null,"message":"Hello world (1)","postedTime":1500537667743} {"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744} {"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744} {"id":2,"spitter":null,"message":"Hello world (2)","postedTime":1500537670744} {"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745} {"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745} {"id":3,"spitter":null,"message":"Hello world (3)","postedTime":1500537673745} {"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747} {"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747} {"id":4,"spitter":null,"message":"Hello world (4)","postedTime":1500537676747} {"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748} {"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748} {"id":5,"spitter":null,"message":"Hello world (5)","postedTime":1500537679748} {"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748} {"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748} {"id":6,"spitter":null,"message":"Hello world (6)","postedTime":1500537682748} {"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748} {"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748} {"id":7,"spitter":null,"message":"Hello world (7)","postedTime":1500537685748} {"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748} {"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748} {"id":8,"spitter":null,"message":"Hello world (8)","postedTime":1500537688748} {"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748} {"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748} {"id":9,"spitter":null,"message":"Hello world (9)","postedTime":1500537691748}
当然direct跟上面的情况差不多,只不过这个是根据路由匹配,先把数据发送到交换机,然后绑定路由和队列,通过交换机id和路由来找到队列,下面是一些主要的配置
ampq-share.xml
<rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue> <rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue> <!--交换机定义--> <!--rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 rabbit:binding:设置消息queue匹配的key--> <rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}"> <rabbit:bindings> <rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/> <rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/> </rabbit:bindings> </rabbit:direct-exchange>
amqp-producer.xml
<!--spring template声明--> <rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"></rabbit:template> <!--消息对象转成成json--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
下面是消费者监听配置
amqp-consumer.xml
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener> </rabbit:listener-container> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener> </rabbit:listener-container>
下面是程序
public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml"); MQProducer mqProducer = (MQProducer) context.getBean("mqProducer"); mqProducer.sendDataToQueue("spring.test.queueKey1", "Hello World spring.test.queueKey1"); mqProducer.sendDataToQueue("spring.test.queueKey2", "Hello World spring.test.queueKey2"); }
实际情况可能需要我们去分离消费者和生成者的程序。当然spring还有负载均衡的配置,这里就不多介绍了。
站在巨人肩膀上整理的,原作者不详了 ---- 2017-07-20 junehappylove
相关文章推荐
- rabbitMQ第五篇:Spring集成RabbitMQ
- rabbitMQ第五篇:Spring集成RabbitMQ
- rabbitMQ第五篇:Spring集成RabbitMQ
- springboot集成rabbitMQ之对象传输
- spring集成rabbitmq
- spring boot 集成rabbitmq(9步搞定)
- spring集成rabbitMq(基于topic和fanout模式)
- Spring集成RabbitMQ
- 消息中间件系列四:RabbitMQ与Spring集成
- springboot集成rabbitMQ之对象传输的方法
- RabbitMQ学习以及与Spring的集成(一)
- rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack
- Spring Boot 集成 RabbitMq 实战操作(二)
- Spring boot集成Quartz与RabbitMQ
- Spring集成RabbitMQ
- Spring Cloud 集成 RabbitMQ
- Spring Boot(十三)RabbitMQ安装与集成
- RabbitMQ学习以及与Spring的集成(二)
- rabbitMQ与Spring集成
- rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack