activeMQ(二)--spring整合activeMQ
2018-03-17 12:29
316 查看
### 工程结构
config.properties
log4j.properties
生产者
消费者
测试
运行生产者,以后,可以看到发送了一条消息等待消费,对ActiveMQ自带Web管控台不熟悉的,可以参考上一篇 http://blog.csdn.net/u013041642/article/details/79547128
运行消费者
消费者中我们写了一个死循环,使线程持续运行,接受消息。如果不循环,只运行一次,那每次只能消费一条消息。
activemq.xml中增加监听容器的配置, 需要设置监听的地址和监听器的bean.
测试代码
运行生产者,加载配置文件的时候,我们配好的监听容器同时被加载,所以生产消息和监听消息同时进行。
todo: 队列消息的监听器,加载以后可以一次性接受之前生产的未消费的所有队列消息。但是主题消息的监听器,不能收到监听器加载之前的消息。有待研究。
消息监听器除了MessageListener还有SessionAwareMessageListener和MessageListenerAdapter,后面再总结。
pom
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring-version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>${spring-version}</version> </dependency> </dependencies>
配置文件
activemq.xml<context:component-scan base-package="com.susq" /> <context:property-placeholder location="config.properties" /> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.url}"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等,设置它为队列模式 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 设置它为Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean> <!--设置队列消息目的地--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <!-- 构造参数的值是队列的名字 --> <value>${activemq.queue}</value> </constructor-arg> </bean> <!--设置主题消息目的地--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="${activemq.topic}"/> </bean>
config.properties
activemq.url=tcp://localhost:61616 activemq.queue=su.queue activemq.topic=su.topic
log4j.properties
log4j.rootLogger=ERROR,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n #给自己项目设置日志级别 log4j.logger.com.susq=DEBUG
工程代码
JmsTemplate 是spring 提供的简化了同步JMS访问代码的帮助类。文档:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html生产者
package com.susq.service.impl; import com.susq.service.ProducerService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.Session; @Component("producerService") @Slf4j public class ProducerServiceImpl implements ProducerService { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void sendMessage(Destination destination, final String message) { log.debug("---------------生产者发送消息-----------------"); log.debug("---------------生产者发了一个消息:" + message); /* send()方法第二个参数是MessageCreator类型,它里面只有一个方法,所以他是一个函数式接口,支持lambda表达式 public interface MessageCreator { Message createMessage(Session var1) throws JMSException; } 为了防止“函数是接口” 变成"非函数接口”,我们可以在这个上面加上一个声明@FunctionalInterface, 这样别人就无法在里面添加新的接口函数了。 */ jmsTemplate.send(destination, (Session session) -> session.createTextMessage(message)); } }
消费者
package com.susq.service.impl; import com.susq.service.ConsumerService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; @Slf4j @Component("consumerService") public class ConsumerServiceImpl implements ConsumerService { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; public void receiveMessage(Destination destination) { log.debug("------------消费消息--------------"); while (true) { try { //使用JMSTemplate接收消息 TextMessage txtmsg = (TextMessage) jmsTemplate.receive(destination); if (null != txtmsg) { log.debug("--- 收到消息内容为: " + txtmsg.getText()); } else { break; } } catch (JMSException e) { log.error("错误 {}", e); } } } }
测试
package com.susq; import com.susq.service.ConsumerService; import com.susq.service.ProducerService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.jms.Destination; /** * Unit test for simple App. */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:activemq.xml") public class AppTest { @Autowired private ProducerService producerService; @Autowired private Destination queueDestination; @Autowired private ConsumerService consumerService; @Test public void testProducer() { producerService.sendMessage(queueDestination, "生产消息"); } @Test public void testConsumer() { consumerService.receiveMessage(queueDestination); } }
运行生产者,以后,可以看到发送了一条消息等待消费,对ActiveMQ自带Web管控台不熟悉的,可以参考上一篇 http://blog.csdn.net/u013041642/article/details/79547128
运行消费者
消费者中我们写了一个死循环,使线程持续运行,接受消息。如果不循环,只运行一次,那每次只能消费一条消息。
5. 消息监听器
上面我们是用 jmsTemplate.receive(destination) 来接受消息。而使用消息监听器是一种更优雅的消息接受方式,只需要实现消息监听器接口MessageListener即可。topic监听器写法也一样。import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * @author susq * @since 2018-03-15-21:18 */ @Slf4j @Component("consumerServiceNew") public class ConsumerServiceNew implements MessageListener { @Override public void onMessage(Message message) { try { log.debug("ConsumerServiceNew接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { log.debug("接收消息异常,{}", e); } } }
activemq.xml中增加监听容器的配置, 需要设置监听的地址和监听器的bean.
<!-- 定义Queue监听器 --> <jms:listener-container> <jms:listener destination="${activemq.queue}" ref="consumerServiceNew"/> </jms:listener-container> <!-- 定义topic监听器 --> <jms:listener-container destination-type="topic" > <jms:listener destination="${activemq.topic}" ref="topicConsumer"/> </jms:listener-container>
测试代码
package com.susq; import com.susq.service.ConsumerService; import com.susq.service.ProducerService; import com.susq.service.TopicConsumer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.jms.Destination; /** * Unit test for simple App. */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:activemq.xml") public class AppTest { @Autowired private ProducerService producerService; @Autowired private Destination queueDestination; @Autowired private Destination topicDestination; @Test public void testProducer() { producerService.sendMessage(queueDestination, "queue生产消息"); } @Test public void testTopicProducer() { producerService.sendMessage(topicDestination, "topic生产消息"); } }
运行生产者,加载配置文件的时候,我们配好的监听容器同时被加载,所以生产消息和监听消息同时进行。
todo: 队列消息的监听器,加载以后可以一次性接受之前生产的未消费的所有队列消息。但是主题消息的监听器,不能收到监听器加载之前的消息。有待研究。
消息监听器除了MessageListener还有SessionAwareMessageListener和MessageListenerAdapter,后面再总结。
相关文章推荐
- ActiveMQ整合Spring
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- Spring 整合 ActiveMQ
- Spring整合JMS——基于ActiveMQ实现
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring JMS 整合Tomcat和ActiveMQ
- Spring整合Jms学习(一)_基于ActiveMQ实现
- Spring整合ActiveMQ实现简单的消息队列
- ActiveMQ和spring整合
- spring activeMQ 整合(三): 确认机制ACK(收到消息后,应该有一个回应也就是确认答复)
- ActiveMQ整合Spring
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- 深入浅出JMS--Spring和ActiveMQ整合的完整实例
- JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS-基于activeMQ实现(一)
- spring 整合activeMQ
- spring 整合 ActiveMQ
- JMS【四】--Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- spring boot整合activeMQ,实现ptp和topic两者消息模式