您的位置:首页 > 编程语言 > Java开发

Spring 消息JMS(Java Message Service)

2016-02-25 13:27 441 查看
一、JMS简介。

JMS是用于应用程序间的通信。JMS实现的是应用程序之间的异步通信机制。当异步发送消息时,客户端不需要等待服务器处理消息,甚至不需要等待消息投递。客户端发送完消息就可以继续执行其他程序



二、在JMS中主要有两个概念: 消息代理(message broker)和目的地(destination)。当一个应用发送消息时,会将消息交给消息代理。由消息代理来确保消息投递到指定目的地,同时释放消息发布者,使其可以继续执行其他业务。

三、在JMS中主要有两种类型的目的地: 队列(Queue)和主题(Topic)。每种类型都与特定的消息模型相关联,分别是应用队列的点对点模型和应用于主题的发布/订阅模型。

点对点模型:

每条消息都有个发布者和一个接收者。一条消息只能被一个接收者收到


尽管消息队列中的每条消息只能被一个接收者收到,并不意味者只能使用一个接收者从队列中获取消息。为了提高应用程序消息的处理速度,可以在一个消息队列上定义多个接收者监听器(Listener)来处理队列中的消息。每个接收者处理自己接收到的消息。就类似于去银行办理业务,一个窗口就相当于一个接收者,所有要办理业务的人就是队列中的消息。

发布/订阅模型

在发布/订阅模型中,消息会发送给一个主题。与队列类似,多个接收者可以监听一个主题。但与队列不同的是,每条消息都会发送给所有监听该主题的接收者。


四、与同步通信相比采用JMS收发消息的优点:

无需等待。使用JMS发送消息,客户端无需等待消息被处理。客户端只需要将消息发送给消息代理就可确信消息被投递到指定的目的地。因为无需等待,节省了时间,极大提高了客户端的性能。

面向消息和解耦。使用JMS发送消息是以数据为中心的,这意味着客户端没有与特定的方法绑定。任何可以处理数据的队列和主题订阅者都可以处理由客户端发送的消息。客户端不必了解远程服务的任何规范。

位置独立。JMS客户端不必知道由谁来处理它发送的消息,也不需要关注服务器所在位置。

确保独立。使用JMS发送消息时,客户端完全可以放心消息被投递。即使消息发送时,服务无法使用了,消息也会被储存起来,直到服务重新可以使用。

五、搭建一个Spring JMS的消息代理

这里我们使用最流行的消息代理 —— ActiveMQ消息代理。

1. ActiveMQ准备。既然是使用的apache的activeMQ作为JMS的实现,那么首先我们应该到apache官网上下载activeMQ(http://activemq.apache.org/download.html),测试版本:apache-activemq-5.10.0。进行解压后运行其bin目录下面的activemq.bat文件。若启动失败 根据个人电脑选择bin目录下win32或win64目录下的activemq.bat文件运行。启动成功界面如下:



在浏览器访问地址:http://localhost:8161/admin/



若要停止服务器,只需要按着Ctrl+Shift+C,之后输入y即可。

2. 配置ConnectionFactory。ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。这里我们使用CachingConnectionFactory来作为示例。

<bean id="connectionFactory"                   class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>


这样就定义好产生JMS服务器链接的ConnectionFactory了吗?答案是非也。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。所以定义一个ConnectionFactory的完整代码应该如下所示:

<!--将ActiveMQ实现的JMS服务器连接ConnectionFactory注入到spring提供的ConnectionFactory中 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/><!-- 本地ActiveMQ -->
</bean>
<!--配置连接工厂ConnectionFactory-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean>


配置生产者。配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。

<!-- 配置生产者【负责将消息发送到JMS服务器,这里通常利用spring提供的JmsTemplate类来实现】 -->
<bean id="jsmTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
</bean>


在真正利用JmsTemplate进行消息发送的时候,我们需要知道消息发送的目的地,即destination。在Jms中有一个用来表示目的地的Destination接口,它里面没有任何方法定义,只是用来做一个标识而已。当我们在使用JmsTemplate进行消息发送时没有指定destination的时候将使用默认的Destination。默认Destination可以通过在定义jmsTemplate bean对象时通过属性def aultDestination或def aultDestinationName来进行注入,def aultDestinationName对应的就是一个普通字符串。在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。在定义这两种类型的Destination时我们都可以通过一个name属性来进行构造,如:

<!-- 当利用JmsTemplate发送消息时,配置消息发送目的地【点对点的ActiveMQQueue、支持订阅/发布模式的ActiveMQTopic】 -->
<!-- 1.点对点模式(队列) -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue"/>
</bean>
<!-- 2.订阅模式(主题) -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>


我们定义了一个ProducerService,里面有一个向Destination发送纯文本消息的方法sendMessage

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import com.sun.istack.internal.FinalArrayList;
/**
* 生产者,发送消息类
* @author ntc
*/
@Component
public class ProduceManage {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueDestination;
@Autowired
private Destination topicDestination;
public void sendQueue(final String message) {
sendMessage(queueDestination, message);
}
public void sendTopic(final String message) {
sendMessage(topicDestination, message);
}

private void sendMessage(Destination destination, final String message) {
this.jmsTemplate.send(destination, new MessageCreator() {

@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}


配置消费者。生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?这是通过Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。Spring一共为我们提供了两种类型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。SimpleMessageListenerContainer会在一开始的时候就创建一个会话session和消费者Consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制。大多数情况下我们还是使用的Def aultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对JMS提供者要求低、先进功能如事务参与和兼容Java EE环境。定义处理消息的MessageListener

要定义处理消息的MessageListener我们只需要实现JMS规范中的MessageListener接口就可以了。MessageListener接口中只有一个方法onMessage方法,当接收到消息的时候会自动调用该方法。

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 消费者
* @author ntc
*
*/
public class consumerManagerListener implements MessageListener {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
TextMessage textMessage = (TextMessage) message;
System.out.println("1号窗口接收到一个纯文本消息。");
try {
System.out.println("消息内容是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}


有了MessageListener之后我们就可以在Spring的配置文件中配置一个消息监听容器了。

<!-- 配置消费者【对指定目的地的消息进行消费,通过spring封装的消息监听容器MessageListener实现】
MessageListenerContainer必备的三个属性:从哪监听connectionFactory、监听什么destination、接收到消息后如何处理messageListener -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="consumerManagerListener"/>
</bean>
<!-- 定义处理消息的MessageListener -->
<bean id="consumerManagerListener" class="com.wiseweb.pom.web.comments.consumerManagerListener"/>


若要定义多个接收者,只需要定义多个listener,以及多个listenerConainer。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: