您的位置:首页 > 编程语言 > Java开发

消息中间件-activemq实战之整合Spring(四)

2017-05-05 08:20 761 查看
前面的理论准备已经很充分,这一节我们来实战:将activemq整合到Spring框架才行中,因为Spring已经集成了JMS,这也为我们配置activermq带来了方便。

1. Spring对jms的支持

因为Spring已经将JMS集成到框架里面了,对jms做了自己的封装,我们使用起来更加方便,在Spring中使用jms比较麻烦的就是配置,在Spring中配置JMS大体需要8个部分:

ConnectionFactory: 和jms服务器的连接, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker;

Destination: 有topic和queue两种方式;

JmsTemplate: spring提供的jms模板;

MessageConverter: 消息转换器;

MessageProducer: 消息生产者;

MessageConsumer: 消息消费者;

MessageListener: 消息监听器;

MessageListenerContainer: 消息监听容器。

下面我把完整的配置文件按照上面的步骤拆开分别讲解:

1.1首先我们配置ConnectionFactory:

<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin" />


brokerURL是指要连接的activeMQ server的地址,该配置即使用activemq独立的消息存储环境,即使服务器重启消息也不会丢失。

<!-- 配置JMS连接工厂 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>


我们从Spring给我们提供的connectionFactory中获取Connection,并且把该connectionFactory注册到上面定义的activemq server中。

1.2 Destination:

由前面我们知道Destination有两种形式:P2P和Pub/Sub。那么在配置中表示就是:

<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>first-queue</value>
</constructor-arg>
</bean>


或:

<!-- 定义消息队列(topic) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>first-queue</value>
</constructor-arg>
</bean>


1.3 JmsTemplate:

将connectionFactory和defaultDestination注入JmsTemplate中:

<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>


在Java相关处理文件中添加(这里用的是@Inject注解,当然也可以用@Autowired):

@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;

TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);


1.4 MessageConverter

MessageConverter实现的是org.springframework.jms.support.converter.MessageConverter接口, 提供消息的转换功能。

<bean id="defaultMessageConverter" class="cn.edu.hust.activemq.filter.DefaultMessageConverter" />


1.5 MessageProducer和MessageConsumer

此处灵活使用,可以以服务的形式提供也可以以工具类的形式提供,详情见下面的示例代码。

1.6 MessageListener

消息的消费者应有的有对应的Listener。

<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />


1.7 MessageListenerContainer

MessageListenerContainer即Listener的容器,用来对Listener坐一些配置,每一个listener都对应着一个Container:

<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>


Spring为我们听过了两种类型的MessageListenerContainer:SimpleMessageListenerContainer和DefaultMessageListenerContainer。

SimpleMessageListenerContainer会在一开始的时候就创建一个会话Session和消费者Consumer,并且会适用标准的JMS的MessageConsumer.setMessageListener()方法注册监听器让JMS提供调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容J2EE的JMS限制。大多数情况下,我们还是使用DefaultMessageListenerContainer。

DefaultMessageListenerContainer,与SimpleMessageListenerContainer相比,它会动态的适应运行时的需求,并且能够参与外部的事务管理。

上面就是mq的配置文件部分,如果从上到下的配置部分都清楚地话使用起来肯定没有问题,我们再做一个简要的总结:

可以有一个或者多个消息生产者向同一个destination发送消息;

queue类型的只能有一个消息消费者;

topic类型的可以有多个消息消费者;

每个消费者对应一个MessageListener和一个MessageListenerContainer。

下面我们看一下整合的全部代码:

首先上pom.xml看一下依赖的jar包:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>q</groupId>
<artifactId>q</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>q Maven Webapp</name>
<url>http://maven.apache.org</url>
<properties>
<springframework>4.3.0.RELEASE</springframework>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>

<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.12.1</version>
</dependency>
</dependencies>
<build>
<finalName>q</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>


然后是我们的Spring配置文件applicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd"> 
<!-- 指定Sping组件扫描的基本包路径 -->
<context:component-scan base-package="cn.edu.hust.activemq" >
<!-- 这里只扫描Controller,不可重复加载Service -->
<context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- 启用MVC注解 -->
<mvc:annotation-driven />

<!-- JSP视图解析器-->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/" />
<property name="suffix" value=".jsp" />
<!--  定义其解析视图的order顺序为1 -->
<property name="order" value="1" />
</bean>
</beans>


activemq的配置文件applicationContext-ActiveMQ.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">

<context:component-scan base-package="cn.edu.hust.activemq" />
<mvc:annotation-driven />

<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" />

<!-- 配置JMS连接工厂 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean>

<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>first-queue</value>
</constructor-arg>
</bean>

<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="demoQueueDestination" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean>

<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />

<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 --> <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="demoQueueDestination" /> <property name="messageListener" ref="queueMessageListener" /> </bean>

</beans>


配置的介绍在上面我已经讲过了,不明白的地方翻到上面去看看。

web.xml文件的配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0">
<display-name>Archetype Created Web Application</display-name>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext-ActiveMQ.xml</param-value>
</context-param>

<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>

<servlet>
<servlet-name>springMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>

<!-- 处理编码格式 -->
<filter>
<filter-name>characterEncodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>

</web-app>


我们的工程目录结构如下:



上service的代码:

ProducerService.java

import javax.jms.Destination;

/**
* Created by Administrator on 2017/5/3.
*/
public interface ProducerService {

void sendMessage(Destination destination,final String msg);

void sendMessage(final String msg);
}


ProducerServiceImpl.java

import cn.edu.hust.activemq.service.ProducerService;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
* Created by Administrator on 2017/5/3.
*/
@Service
public class ProducerServiceImpl implements ProducerService {

@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;

@Override
public void sendMessage(Destination destination, final String msg) {
System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息--------->"+msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}

@Override
public void sendMessage(final String msg) {
String destination = jmsTemplate.getDefaultDestinationName();
System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息-------->"+msg);
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
}


ConsumerService.java

import javax.jms.Destination;
import javax.jms.TextMessage;
/**
* Created by Administrator on 2017/5/3.
*/
public interface ConsumerService {

TextMessage receive(Destination destination);
}


ConsumerServiceImpl.java

import cn.edu.hust.activemq.service.ConsumerService;

import javax.jms.Destination;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.JMSException;

/**
* Created by Administrator on 2017/5/3.
*/
@Service
public class ConsumerServiceImpl implements ConsumerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;

@Override
public TextMessage receive(Destination destination){
TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
try{
System.out.println("从队列" + destination.toString() + "收到了消息:\t"
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
return textMessage;
}
}


QueueMessageListener.java

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* Created by Administrator on 2017/5/3.
*/
public class QueueMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("QueueMessageListener监听到了文本消息:\t"
+ tm.getText());
//do something ...
} catch (JMSException e) {
e.printStackTrace();
}
}
}


接下来是controller:

import cn.edu.hust.activemq.service.ConsumerService;
import cn.edu.hust.activemq.service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
* Created by Administrator on 2017/5/3.
*/
@Controller
public class MessageController {
private Logger logger = LoggerFactory.getLogger(MessageController.class);
@Resource(name = "demoQueueDestination")
private Destination destination;

//队列消息生产者
@Resource
private ProducerService producer;

//队列消息消费者
@Resource
private ConsumerService consumer;

@RequestMapping(value = "/SendMessage", method = RequestMethod.GET)
@ResponseBody
public void send(String msg) {
logger.info(Thread.currentThread().getName()+"------------开始发送消息");
producer.sendMessage(msg);
logger.info(Thread.currentThread().getName()+"------------发送完毕");
}

@RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
@ResponseBody
public Object receive(){
logger.info(Thread.currentThread().getName()+"------------开始接受消息");
TextMessage tm = consumer.receive(destination);
logger.info(Thread.currentThread().getName()+"------------接受完毕");
return tm;
}

}


代码就是上面这些,我们先启动acticemq server,然后下启动工程,在地址栏中输入:http://localhost:8080/SendMessage?msg=nihao

代码很简单我就没有写前台页面啦,msg部分你可以随便写。回车之后我们去看一下控制台消息就发送出去了。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息