消息中间件-activemq实战之整合Spring(四)
2017-05-05 08:20
761 查看
前面的理论准备已经很充分,这一节我们来实战:将activemq整合到Spring框架才行中,因为Spring已经集成了JMS,这也为我们配置activermq带来了方便。
ConnectionFactory: 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker;
Destination: 有topic和queue两种方式;
JmsTemplate: spring提供的jms模板;
MessageConverter: 消息转换器;
MessageProducer: 消息生产者;
MessageConsumer: 消息消费者;
MessageListener: 消息监听器;
MessageListenerContainer: 消息监听容器。
下面我把完整的配置文件按照上面的步骤拆开分别讲解:
1.1首先我们配置ConnectionFactory:
brokerURL是指要连接的activeMQ server的地址,该配置即使用activemq独立的消息存储环境,即使服务器重启消息也不会丢失。
我们从Spring给我们提供的connectionFactory中获取Connection,并且把该connectionFactory注册到上面定义的activemq server中。
1.2 Destination:
由前面我们知道Destination有两种形式:P2P和Pub/Sub。那么在配置中表示就是:
或:
1.3 JmsTemplate:
将connectionFactory和defaultDestination注入JmsTemplate中:
在Java相关处理文件中添加(这里用的是@Inject注解,当然也可以用@Autowired):
1.4 MessageConverter
MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能。
1.5 MessageProducer和MessageConsumer
此处灵活使用,可以以服务的形式提供也可以以工具类的形式提供,详情见下面的示例代码。
1.6 MessageListener
消息的消费者应有的有对应的Listener。
1.7 MessageListenerContainer
MessageListenerContainer即Listener的容器,用来对Listener坐一些配置,每一个listener都对应着一个Container:
Spring为我们听过了两种类型的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer会在一开始的时候就创建一个会话Session和消费者Consumer,并且会适用标准的JMS的MessageConsumer.setMessageListener()方法注册监听器让JMS提供调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容J2EE的JMS限制。大多数情况下,我们还是使用DefaultMessageListenerContainer。
DefaultMessageListenerContainer,与SimpleMessageListenerContainer相比,它会动态的适应运行时的需求,并且能够参与外部的事务管理。
上面就是mq的配置文件部分,如果从上到下的配置部分都清楚地话使用起来肯定没有问题,我们再做一个简要的总结:
可以有一个或者多个消息生产者向同一个destination发送消息;
queue类型的只能有一个消息消费者;
topic类型的可以有多个消息消费者;
每个消费者对应一个MessageListener和一个MessageListenerContainer。
下面我们看一下整合的全部代码:
首先上pom.xml看一下依赖的jar包:
然后是我们的Spring配置文件applicationContext.xml:
activemq的配置文件applicationContext-ActiveMQ.xml:
配置的介绍在上面我已经讲过了,不明白的地方翻到上面去看看。
web.xml文件的配置如下:
我们的工程目录结构如下:
上service的代码:
ProducerService.java
ProducerServiceImpl.java
ConsumerService.java
ConsumerServiceImpl.java
QueueMessageListener.java
接下来是controller:
代码就是上面这些,我们先启动acticemq server,然后下启动工程,在地址栏中输入:http://localhost:8080/SendMessage?msg=nihao,
代码很简单我就没有写前台页面啦,msg部分你可以随便写。回车之后我们去看一下控制台消息就发送出去了。
1. Spring对jms的支持
因为Spring已经将JMS集成到框架里面了,对jms做了自己的封装,我们使用起来更加方便,在Spring中使用jms比较麻烦的就是配置,在Spring中配置JMS大体需要8个部分:ConnectionFactory: 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker;
Destination: 有topic和queue两种方式;
JmsTemplate: spring提供的jms模板;
MessageConverter: 消息转换器;
MessageProducer: 消息生产者;
MessageConsumer: 消息消费者;
MessageListener: 消息监听器;
MessageListenerContainer: 消息监听容器。
下面我把完整的配置文件按照上面的步骤拆开分别讲解:
1.1首先我们配置ConnectionFactory:
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
brokerURL是指要连接的activeMQ server的地址,该配置即使用activemq独立的消息存储环境,即使服务器重启消息也不会丢失。
<!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean>
我们从Spring给我们提供的connectionFactory中获取Connection,并且把该connectionFactory注册到上面定义的activemq server中。
1.2 Destination:
由前面我们知道Destination有两种形式:P2P和Pub/Sub。那么在配置中表示就是:
<!-- 定义消息队列(Queue) --> <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>first-queue</value> </constructor-arg> </bean>
或:
<!-- 定义消息队列(topic) --> <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>first-queue</value> </constructor-arg> </bean>
1.3 JmsTemplate:
将connectionFactory和defaultDestination注入JmsTemplate中:
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="demoQueueDestination" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean>
在Java相关处理文件中添加(这里用的是@Inject注解,当然也可以用@Autowired):
@Resource(name="jmsTemplate") private JmsTemplate jmsTemplate; TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
1.4 MessageConverter
MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能。
<bean id="defaultMessageConverter" class="cn.edu.hust.activemq.filter.DefaultMessageConverter" />
1.5 MessageProducer和MessageConsumer
此处灵活使用,可以以服务的形式提供也可以以工具类的形式提供,详情见下面的示例代码。
1.6 MessageListener
消息的消费者应有的有对应的Listener。
<!-- 配置消息队列监听者(Queue) --> <bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />
1.7 MessageListenerContainer
MessageListenerContainer即Listener的容器,用来对Listener坐一些配置,每一个listener都对应着一个Container:
<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 --> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="demoQueueDestination" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
Spring为我们听过了两种类型的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer会在一开始的时候就创建一个会话Session和消费者Consumer,并且会适用标准的JMS的MessageConsumer.setMessageListener()方法注册监听器让JMS提供调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容J2EE的JMS限制。大多数情况下,我们还是使用DefaultMessageListenerContainer。
DefaultMessageListenerContainer,与SimpleMessageListenerContainer相比,它会动态的适应运行时的需求,并且能够参与外部的事务管理。
上面就是mq的配置文件部分,如果从上到下的配置部分都清楚地话使用起来肯定没有问题,我们再做一个简要的总结:
可以有一个或者多个消息生产者向同一个destination发送消息;
queue类型的只能有一个消息消费者;
topic类型的可以有多个消息消费者;
每个消费者对应一个MessageListener和一个MessageListenerContainer。
下面我们看一下整合的全部代码:
首先上pom.xml看一下依赖的jar包:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>q</groupId> <artifactId>q</artifactId> <packaging>war</packaging> <version>1.0-SNAPSHOT</version> <name>q Maven Webapp</name> <url>http://maven.apache.org</url> <properties> <springframework>4.3.0.RELEASE</springframework> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> <!-- spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${springframework}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${springframework}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${springframework}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${springframework}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${springframework}</version> </dependency> <!-- xbean 如<amq:connectionFactory /> --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <!-- activemq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.12.1</version> </dependency> </dependencies> <build> <finalName>q</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
然后是我们的Spring配置文件applicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> <!-- 指定Sping组件扫描的基本包路径 --> <context:component-scan base-package="cn.edu.hust.activemq" > <!-- 这里只扫描Controller,不可重复加载Service --> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- 启用MVC注解 --> <mvc:annotation-driven /> <!-- JSP视图解析器--> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="prefix" value="/" /> <property name="suffix" value=".jsp" /> <!-- 定义其解析视图的order顺序为1 --> <property name="order" value="1" /> </bean> </beans>
activemq的配置文件applicationContext-ActiveMQ.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">
<context:component-scan base-package="cn.edu.hust.activemq" />
<mvc:annotation-driven />
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />
<!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>first-queue</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="demoQueueDestination" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean>
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />
<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 --> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="demoQueueDestination" /> <property name="messageListener" ref="queueMessageListener" /> </bean>
</beans>
配置的介绍在上面我已经讲过了,不明白的地方翻到上面去看看。
web.xml文件的配置如下:
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0"> <display-name>Archetype Created Web Application</display-name> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:applicationContext-ActiveMQ.xml</param-value> </context-param> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <servlet> <servlet-name>springMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:applicationContext.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>springMVC</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> <!-- 处理编码格式 --> <filter> <filter-name>characterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> </web-app>
我们的工程目录结构如下:
上service的代码:
ProducerService.java
import javax.jms.Destination; /** * Created by Administrator on 2017/5/3. */ public interface ProducerService { void sendMessage(Destination destination,final String msg); void sendMessage(final String msg); }
ProducerServiceImpl.java
import cn.edu.hust.activemq.service.ProducerService; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by Administrator on 2017/5/3. */ @Service public class ProducerServiceImpl implements ProducerService { @Resource(name="jmsTemplate") private JmsTemplate jmsTemplate; @Override public void sendMessage(Destination destination, final String msg) { System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息--------->"+msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } @Override public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestinationName(); System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息-------->"+msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } }
ConsumerService.java
import javax.jms.Destination; import javax.jms.TextMessage; /** * Created by Administrator on 2017/5/3. */ public interface ConsumerService { TextMessage receive(Destination destination); }
ConsumerServiceImpl.java
import cn.edu.hust.activemq.service.ConsumerService; import javax.jms.Destination; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.JMSException; /** * Created by Administrator on 2017/5/3. */ @Service public class ConsumerServiceImpl implements ConsumerService { @Resource(name="jmsTemplate") private JmsTemplate jmsTemplate; @Override public TextMessage receive(Destination destination){ TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination); try{ System.out.println("从队列" + destination.toString() + "收到了消息:\t" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } return textMessage; } }
QueueMessageListener.java
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by Administrator on 2017/5/3. */ public class QueueMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("QueueMessageListener监听到了文本消息:\t" + tm.getText()); //do something ... } catch (JMSException e) { e.printStackTrace(); } } }
接下来是controller:
import cn.edu.hust.activemq.service.ConsumerService; import cn.edu.hust.activemq.service.ProducerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.TextMessage; /** * Created by Administrator on 2017/5/3. */ @Controller public class MessageController { private Logger logger = LoggerFactory.getLogger(MessageController.class); @Resource(name = "demoQueueDestination") private Destination destination; //队列消息生产者 @Resource private ProducerService producer; //队列消息消费者 @Resource private ConsumerService consumer; @RequestMapping(value = "/SendMessage", method = RequestMethod.GET) @ResponseBody public void send(String msg) { logger.info(Thread.currentThread().getName()+"------------开始发送消息"); producer.sendMessage(msg); logger.info(Thread.currentThread().getName()+"------------发送完毕"); } @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET) @ResponseBody public Object receive(){ logger.info(Thread.currentThread().getName()+"------------开始接受消息"); TextMessage tm = consumer.receive(destination); logger.info(Thread.currentThread().getName()+"------------接受完毕"); return tm; } }
代码就是上面这些,我们先启动acticemq server,然后下启动工程,在地址栏中输入:http://localhost:8080/SendMessage?msg=nihao,
代码很简单我就没有写前台页面啦,msg部分你可以随便写。回车之后我们去看一下控制台消息就发送出去了。
相关文章推荐
- 消息中间件-activemq实战整合Spring之Topic模式(五)
- 消息中间件-activemq实战之整合Spring(四)
- 消息中间件之ActiveMQ整合Spring实现邮箱发送(四)
- spring整合消息中间件ActiveMQ简单使用
- 利用activeMQ消息中间件整合spring mail发邮件
- linux下 消息中间件ActiveMQ整合spring笔记二 接收消息
- Spring整合ActiveMQ消息中间件
- activeMQ消息中间件的与spring的整合
- linux下 消息中间件ActiveMQ整合spring笔记二 发消息
- 消息中间件activemq-5.13.0整合spring
- spring整合activemq发送MQ消息[queue模式]实例
- Spring整合ActiveMQ一(消息发送端配置)
- spring整合activemq消息队列之点对点模式
- spring整合activemq发送消息[queue类型]实例
- Spring整合JMS(消息中间件)
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- Java消息队列-Spring整合ActiveMq
- spring整合apache activemq实现消息发送的三种方式代码配置实例
- Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听
- ActiveMQ与Spring整合:(3)消息监听器