您的位置:首页 > 其它

ActiveMQ订阅模式持久化实现

2017-11-13 19:07 357 查看
实现步骤:
1、配置发送xml,applicationContext-send.xml

[html] view plain copy

<?xml version="1.0" encoding="UTF-8"?>

<!-- Sender-side Spring context: declares a caching JMS connection factory, the target topic, and a JmsTemplate that uses both. -->

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- Resolves the ${...} placeholders below from properties/jms.properties on the classpath -->
<context:property-placeholder location="classpath:/properties/jms.properties" />

<!-- JMS connection factory configuration -->

<bean id="myConnectionFactory"

class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- Number of sessions to cache -->

<property name="sessionCacheSize" value="10" />

<property name="targetConnectionFactory">

<bean class="org.apache.activemq.ActiveMQConnectionFactory">

<!-- Broker address -->

<property name="brokerURL" value="${brokerUrl}" />

<!-- Whether to send asynchronously -->

<property name="useAsyncSend" value="true" />

</bean>

</property>

</bean>

<!-- Destination the messages are sent to (a topic) -->

<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">

<!-- Name of the topic -->

<constructor-arg index="0" value="${send.name}" />

</bean>

<!-- JMS template configuration; this is the bean the sender class looks up by the id "jmsTemplate" -->

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="myConnectionFactory" />

<property name="defaultDestination" ref="myDestination" />

<!-- Publish/subscribe mode -->

<property name="pubSubDomain" value="true" />

<property name="receiveTimeout" value="10000" />

</bean>

</beans>

2、编写发送java,ActiveMQsender.java

[java] view plain copy

package com.by.activeMQ;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

public class ActiveMQsender {

public static void main(String[] args) {

@SuppressWarnings("resource")

ApplicationContext ctx = new ClassPathXmlApplicationContext(

"ApplicationContext/applicationContext-send.xml");

JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");

jmsTemplate.send(new MessageCreator() {

public Message createMessage(Session session) throws JMSException {

TextMessage msg = session.createTextMessage();

// 设置消息属性

msg.setStringProperty("mood", "happy");

// 设置消息内容

msg.setText("Hello World!");

return msg;

}

});

System.out.println("send end");

}

}

3、配置接收xml,applicationContext-receive.xml

[html] view plain copy

<?xml version="1.0" encoding="UTF-8"?>

<!-- Receiver-side Spring context: declares two independent receivers, each with its own connection factory, topic, consumer bean, listener adapter and listener container. -->

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd">
<!-- Resolves the ${...} placeholders below from properties/jms.properties on the classpath -->
<context:property-placeholder location="classpath:/properties/jms.properties" />

<!-- First receiver -->

<!-- JMS connection factory configuration -->

<bean id="myConnectionFactory"

class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- Number of sessions to cache -->

<property name="sessionCacheSize" value="10" />

<!-- Client ID of this receiver -->

<property name="clientId" value="${topic.clientId}" />

<property name="targetConnectionFactory">

<bean class="org.apache.activemq.ActiveMQConnectionFactory">

<!-- Broker address -->

<property name="brokerURL" value="${brokerUrl}" />

</bean>

</property>

</bean>

<!-- Destination the listener container below receives from (a topic) -->

<bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">

<!-- Name of the topic -->

<constructor-arg index="0" value="${topic.name}" />

</bean>

<!-- Message consumer bean (user-defined class) -->

<bean id="myTopicConsumer" class="com.by.activeMQ.ActiveMQreceiver" />

<!-- Message listener: adapter that delegates to the consumer bean -->

<bean id="myTopicListener"

class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

<constructor-arg ref="myTopicConsumer" />

<!-- Name of the method that receives messages -->

<property name="defaultListenerMethod" value="receive" />

<!-- No message conversion: the converter is explicitly set to null -->

<property name="messageConverter"><null/></property>

</bean>

<!-- Message listener container -->

<bean id="myListenerContainer"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="myConnectionFactory" />

<!-- Publish/subscribe mode -->

<property name="pubSubDomain" value="true"/>

<!-- Durable subscription -->

<property name="subscriptionDurable" value="true"/>

<property name="receiveTimeout" value="10"/>

<!-- Client ID of this receiver; same placeholder as on the connection factory above -->

<property name="clientId" value="${topic.clientId}" />

<!-- The durable subscription name reuses the client ID value -->

<property name="durableSubscriptionName" value="${topic.clientId}"/>

<property name="destination" ref="myDestination" />

<property name="messageListener" ref="myTopicListener" />

</bean>

<!-- Second receiver: same structure as the first, with the suffix 2 on every bean id and placeholder -->

<!-- JMS connection factory configuration -->

<bean id="myConnectionFactory2"

class="org.springframework.jms.connection.CachingConnectionFactory">

<!-- Number of sessions to cache -->

<property name="sessionCacheSize" value="10" />

<!-- Client ID of this receiver -->

<property name="clientId" value="${topic2.clientId}" />

<property name="targetConnectionFactory">

<bean class="org.apache.activemq.ActiveMQConnectionFactory">

<!-- Broker address -->

<property name="brokerURL" value="${brokerUrl}" />

</bean>

</property>

</bean>

<!-- Destination the listener container below receives from (a topic) -->

<bean id="myDestination2" class="org.apache.activemq.command.ActiveMQTopic">

<!-- Name of the topic -->

<constructor-arg index="0" value="${topic2.name}" />

</bean>

<!-- Message consumer bean (user-defined class) -->

<bean id="myTopicConsumer2" class="com.by.activeMQ.ActiveMQreceiver2" />

<!-- Message listener: adapter that delegates to the consumer bean -->

<bean id="myTopicListener2"

class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

<constructor-arg ref="myTopicConsumer2" />

<!-- Name of the method that receives messages -->

<property name="defaultListenerMethod" value="receive" />

<!-- No message conversion: the converter is explicitly set to null -->

<property name="messageConverter"><null/></property>

</bean>

<!-- Message listener container -->

<bean id="myListenerContainer2"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="myConnectionFactory2" />

<!-- Publish/subscribe mode -->

<property name="pubSubDomain" value="true"/>

<!-- Durable subscription -->

<property name="subscriptionDurable" value="true"/>

<property name="receiveTimeout" value="10"/>

<!-- Client ID of this receiver; same placeholder as on the connection factory above -->

<property name="clientId" value="${topic2.clientId}" />

<!-- The durable subscription name reuses the client ID value -->

<property name="durableSubscriptionName" value="${topic2.clientId}"/>

<property name="destination" ref="myDestination2" />

<property name="messageListener" ref="myTopicListener2" />

</bean>

</beans>

4、编写接收java,ActiveMQreceiver.java

[java] view plain copy

package com.by.activeMQ;

import javax.jms.JMSException;

import javax.jms.TextMessage;

import org.springframework.jms.JmsException;

/**
 * First topic consumer; its {@code receive} method is the one named as
 * {@code defaultListenerMethod} in the receive-side Spring configuration.
 */
public class ActiveMQreceiver {

    /**
     * Prints one line containing the {@code mood} string property and the text body
     * of the given message to standard output.
     *
     * @param message the text message to report
     * @throws JmsException declared by the original signature
     * @throws JMSException if reading the property or the body fails
     */
    public void receive(TextMessage message) throws JmsException, JMSException {
        // Read the property first and the body second, then assemble the line.
        final String mood = message.getStringProperty("mood");
        final String body = message.getText();

        StringBuilder line = new StringBuilder("this is receiver, ");
        line.append(" mood is ").append(mood).append(",");
        line.append("say ").append(body);

        System.out.println(line.toString());
    }
}

5、编写另一个接收java,ActiveMQreceiver2.java

[java] view plain copy

package com.by.activeMQ;

import javax.jms.JMSException;

import javax.jms.TextMessage;

import org.springframework.jms.JmsException;

/**
 * Second topic consumer; its {@code receive} method is the one named as
 * {@code defaultListenerMethod} for the second listener in the receive-side Spring configuration.
 */
public class ActiveMQreceiver2 {

    /**
     * Prints one line containing the {@code mood} string property and the text body
     * of the given message to standard output.
     *
     * @param message the text message to report
     * @throws JmsException declared by the original signature
     * @throws JMSException if reading the property or the body fails
     */
    public void receive(TextMessage message) throws JmsException, JMSException {
        // Read the property first and the body second, then assemble the line.
        final String mood = message.getStringProperty("mood");
        final String body = message.getText();

        StringBuilder line = new StringBuilder("this is receiver2,");
        line.append(" mood is ").append(mood).append(",");
        line.append("say ").append(body);

        System.out.println(line.toString());
    }
}

6、编写一个main,开启接收监听,openReceive.java

[java] view plain copy

package com.by.activeMQ;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point that starts the receive-side Spring context and keeps
 * the process running so the configured listeners can receive messages.
 */
public class openReceive {

    /**
     * Loads {@code ApplicationContext/applicationContext-receive.xml} from the classpath
     * and then blocks the main thread until it is interrupted.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Held as the concrete type so registerShutdownHook() and close() are available.
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("ApplicationContext/applicationContext-receive.xml");

        // Close the context (and with it the JMS resources) when the JVM shuts down, e.g. on Ctrl+C.
        ctx.registerShutdownHook();

        try {
            // Park the main thread without consuming CPU: a thread joining itself
            // blocks until it is interrupted. This replaces an empty busy-wait loop.
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to an orderly shutdown.
            Thread.currentThread().interrupt();
            ctx.close();
        }
    }
}

7、编写一个配置文件,jms.properties

[plain] view plain copy

# Values for the ${...} placeholders used by applicationContext-send.xml and applicationContext-receive.xml.

# send: topic name used by the sender configuration (${send.name})

send.name=Topic_Mood

# receive: topic name and client ID for each of the two receivers;
# both topic names have the same value as send.name

topic.name=Topic_Mood

topic.clientId=client_LiLei

topic2.name=Topic_Mood

topic2.clientId=client_HanMei

# url: broker URL, referenced as ${brokerUrl} by both the send and the receive configuration

brokerUrl=failover:(tcp://10.0.0.232:61616)?initialReconnectDelay=1000

8、pom里面添加activeMQ的依赖

[html] view plain copy

<!-- Dependencies for the ActiveMQ + Spring JMS example -->

<!-- NOTE(review): the Spring configurations in this article use CachingConnectionFactory
     and reference no pooled connection factory; confirm whether activemq-pool is needed. -->
<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-pool</artifactId>

<version>5.11.1</version>

</dependency>

<!-- NOTE(review): presumably only required together with activemq-pool; verify before removing. -->
<dependency>

<groupId>org.apache.commons</groupId>

<artifactId>commons-pool2</artifactId>

<version>2.3</version>

</dependency>

<!-- Provides the org.springframework.jms classes used in the XML configuration and the sender -->
<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>4.0.0.RELEASE</version>

</dependency>

<!-- NOTE(review): activemq-all is presumably an aggregate jar that may overlap with
     activemq-pool above; confirm there are no duplicate classes on the classpath. -->
<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

<version>5.11.1</version>

</dependency>

耶,运行就ok了。
发送消息的输出是这样的:

[plain] view plain copy

2016-08-05 11:27:19 [ main:0 ] - [ INFO ] Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@16011db4: startup date [Fri Aug 05 11:27:19 CST 2016]; root of context hierarchy

2016-08-05 11:27:19 [ main:31 ] - [ INFO ] Loading XML bean definitions from class path resource [ApplicationContext/applicationContext-send.xml]

2016-08-05 11:27:19 [ main:187 ] - [ INFO ] Loading properties file from class path resource [properties/jms.properties]

2016-08-05 11:27:19 [ main:392 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60542-1470367639797-1:1,clientId=null,started=false}

2016-08-05 11:27:19 [ ActiveMQ Task-1:467 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616

send end

接收消息的输出是这样的:

[plain] view plain copy

2016-08-05 11:28:04 [ ActiveMQ Task-1:490 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616

2016-08-05 11:28:04 [ main:498 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-1:1,clientId=client_LiLei,started=false}

2016-08-05 11:28:04 [ ActiveMQ Task-1:504 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616

2016-08-05 11:28:04 [ main:509 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-3:1,clientId=client_HanMei,started=false}

this is receiver2, mood is happy,say Hello World!

this is receiver, mood is happy,say Hello World!

配置另一个接收者的方法很简单:把第一个接收者的配置复制一份,给其中每个bean的id和占位符加上后缀2;再把接收类复制一份,类名同样加上2,就搞定了。这种方式同样适用于mongodb之类的配置,例如在一个工程里面同时操作两个mongodb数据库。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: