您的位置:首页 > 编程语言 > Java开发

使用Spring配置ActiveMQ的发布订阅模式

2016-07-14 13:28 1701 查看
通过Spring对ActiveMQ进行配置开发,发布订阅模式,支持消息的持久化。

需要Spring2.5版本以上,如果有多个订阅者,每个订阅者需要指定不同的 clientId 。

 

发布者的配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 配置JMS连接工厂 -->
<bean id="myConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="10" />
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- MQ地址 -->
<property name="brokerURL" value="tcp://localhost:61616" />
<!-- 是否异步发送 -->
<property name="useAsyncSend" value="true" />
</bean>
</property>
</bean>

<!-- 发送消息的目的地(一个主题) -->
<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息主题的名字 -->
<constructor-arg index="0" value="Online.Notice.Topic" />
</bean>

<!-- 配置JMS模版 -->
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="myConnectionFactory" />
<property name="defaultDestination" ref="myDestination" />
<!-- 订阅发布模式 -->
<property name="pubSubDomain" value="true" />
<property name="receiveTimeout" value="10000" />
</bean>
</beans>

发布者的代码

package com.xikang.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class SimpleJMSSender {

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-send.xml");

JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("myJmsTemplate");
for (int i = 0; i < 10; i++) {
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息属性
msg.setStringProperty("phrCode", "C001");
// 设置消息内容
msg.setText("Hello World!");
return msg;
}
});
}
}
}


订阅者的配置:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- 配置JMS连接工厂 -->
<bean id="myConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="10" />
<!-- 接收者ID -->
<property name="clientId" value="client_119" />
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- MQ地址 -->
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>

<!-- 发送消息的目的地(一个主题) -->
<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息主题的名字 -->
<constructor-arg index="0" value="Online.Notice.Topic" />
</bean>

<!-- 生产消息配置 (自己定义)-->
<bean id="myTopicConsumer" class="com.xikang.jms.SimpleJMSReceiver" />

<!-- 消息监听器 -->
<bean id="myTopicListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="myTopicConsumer" />
<!-- 接收消息的方法名称 -->
<property name="defaultListenerMethod" value="receive" />
<!-- 不进行消息转换 -->
<property name="messageConverter"><null/></property>
</bean>

<!-- 消息监听容器 -->
<bean id="myListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="myConnectionFactory" />
<!-- 发布订阅模式 -->
<property name="pubSubDomain" value="true"/>
<!-- 消息持久化 -->
<property name="subscriptionDurable" value="true"/>
<property name="receiveTimeout" value="10000"/>
<!-- 接收者ID -->
<property name="clientId" value="client_119" />
<property name="durableSubscriptionName" value="client_119"/>
<property name="destination" ref="myDestination" />
<property name="messageListener" ref="myTopicListener" />
</bean>

</beans>

订阅者的代码:

package com.xikang.jms;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.JmsException;

public class SimpleJMSReceiver {

public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext-receive.xml");
while(true) {
}
}

public void receive(TextMessage message) throws JmsException, JMSException {
System.out.println(message.getStringProperty("phrCode"));
System.out.println(message.getText());
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring activemq