
Integrating Spring with ActiveMQ

2014-11-18 09:30
1. Download ActiveMQ. This article uses apache-activemq-5.10.0.

2. Besides the Spring jars and their other required dependencies, the classpath needs spring-jms-3.0.5.RELEASE.jar and activemq-all-5.10.0.jar from the ActiveMQ download.

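3. Create a message listener class for the JMS queue model. It implements the MessageListener interface, and the overridden onMessage method is invoked automatically whenever a message arrives at the monitored destination.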

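package com.spring.demo.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class JMSConsumer implements MessageListener {

    // Called by the listener container whenever a message arrives at the destination.
    @Override
    public void onMessage(Message msg) {
        // Only text messages are expected; skip anything else instead of failing on the cast.
        if (!(msg instanceof TextMessage)) {
            System.out.println("Ignoring non-text message: " + msg);
            return;
        }
        try {
            System.out.println("Received message, content: " + ((TextMessage) msg).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}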


4. Create a spring-jms.xml file with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/p http://www.springframework.org/schema/p/spring-p-3.0.xsd ">

<!-- Queue configuration: start -->
<!-- Connection factory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>

<!-- Spring JMS Template -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="queue_001" />
<!-- Selects the messaging model: false = point-to-point (queue), true = publish/subscribe (topic) -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- Destination that messages are sent to (a queue) -->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Name of the message queue -->
<constructor-arg index="0" value="queue_001" />
</bean>

<!-- Message listener container -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="10" />
<property name="connectionFactory" ref="jmsFactory" />
<property name="destinationName" value="queue_001" />
<property name="messageListener" ref="messageReceiver" />
<property name="pubSubNoLocal" value="false"></property>
</bean>

<bean id="messageReceiver" class="com.spring.demo.jms.JMSConsumer">
</bean>
<!-- Queue configuration: end -->

<!-- Topic configuration: start -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsFactory" />
</property>
<property name="defaultDestinationName" value="topic_001" />
<property name="pubSubDomain" value="true" />
</bean>

<bean id="topic_destination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- Name of the topic -->
<constructor-arg index="0" value="topic_001" />
</bean>
<!-- Topic configuration: end -->
</beans>
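
The pubSubDomain flag on the two JmsTemplate beans selects between the two JMS messaging models. As a rough illustration of the difference, the sketch below models both in plain Java with no broker involved. DeliveryModelDemo and its methods are hypothetical names, and the round-robin hand-out is a simplifying assumption: how ActiveMQ actually spreads queue messages across consumers depends on broker and consumer settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the two JMS messaging domains; plain Java, not ActiveMQ code.
class DeliveryModelDemo {

    // Point-to-point (pubSubDomain=false): each message goes to exactly one consumer.
    // Round-robin is an assumption made only to keep the example deterministic.
    static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    // Publish/subscribe (pubSubDomain=true): every subscriber gets its own copy.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        // prints: queue: [[m1, m3], [m2, m4]]
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        // prints: topic: [[m1, m2, m3, m4], [m1, m2, m3, m4]]
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

In the configuration above, this is why the ten concurrentConsumers on queue_001 share the work between them, whereas every receiver subscribed to topic_001 sees each message.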


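5. Start the ActiveMQ broker so that it is listening on tcp://127.0.0.1:61616, the brokerURL configured in spring-jms.xml.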

6. Test queue messaging. After the message is sent, the onMessage method of JMSConsumer is triggered.

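import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

// The class name is illustrative; the original shows only the main method.
public class JMSQueueSendTest {

    public static void main(String[] args) {
        // Loading the context also starts listenerContainer, so JMSConsumer runs in this JVM.
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                new String[] { "classpath:com/spring/demo/config/spring-jms.xml" });

        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTemplate");

        // Sends to the template's default destination, queue_001.
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // Let JMSException propagate; catching it and returning null would only hide the real error.
                return session.createTextMessage("hello, I am a message!");
            }
        });
        // Report success only after send() has returned without throwing.
        System.out.println("Queue message sent successfully...");
    }
}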


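7. Test topic (publish/subscribe) messaging. Create a JMSTopicReceive class to receive messages. The configuration above registers no listener for topic_001, so topic messages do not trigger any method automatically and you have to receive them yourself. The receive() call below blocks until one message arrives and then returns, so it has to be called in a loop if the receiver should keep receiving.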

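import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class JMSTopicReceive {

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                new String[] { "classpath:com/spring/demo/config/spring-jms.xml" });

        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTopicTemplate");

        // Blocks until one message arrives on the default destination, topic_001.
        TextMessage message = (TextMessage) jmsTemplate.receive();

        try {
            System.out.println("Subscriber received message: " + message.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}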
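
To make the blocking behaviour of receive() concrete, here is a minimal sketch in plain Java in which BlockingQueue.take() stands in for jmsTemplate.receive(). BlockingReceiveDemo and receiveMany are hypothetical names and no JMS classes are involved; the point is only that one blocking call yields one message, and continuous receipt needs a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for a blocking receive: BlockingQueue.take() plays the role of receive().
class BlockingReceiveDemo {

    // Calls the blocking receive `count` times; each call yields exactly one message.
    static List<String> receiveMany(BlockingQueue<String> source, int count) {
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(source.take()); // blocks until a message is available
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and return what was received so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        final BlockingQueue<String> source = new LinkedBlockingQueue<>();

        // A separate thread plays the sender and publishes three messages.
        Thread sender = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 3; i++) {
                    source.add("message " + i);
                }
            }
        });
        sender.start();

        // One call returns one message; the remaining two need further calls.
        System.out.println("single call: " + receiveMany(source, 1));
        System.out.println("loop: " + receiveMany(source, 2));
    }
}
```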


Run JMSTopicReceive; several instances can run at the same time. Then create a test method that sends a topic message:

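import java.util.Random;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

// The class name is illustrative; the original shows only the main method.
public class JMSTopicSendTest {

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                new String[] { "classpath:com/spring/demo/config/spring-jms.xml" });

        JmsTemplate jmsTemplate = (JmsTemplate) applicationContext.getBean("jmsTopicTemplate");

        // A random number makes each test message distinguishable on the receiving side.
        final int ran = new Random().nextInt(1000000000);

        // Sends to the template's default destination, topic_001.
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // Let JMSException propagate; catching it and returning null would only hide the real error.
                return session.createTextMessage("topic, this is a topic message! Random number: " + ran);
            }
        });
        // Report success only after send() has returned without throwing.
        System.out.println("Topic message sent successfully, random number: " + ran);
    }
}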
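
The order of the two programs in step 7 matters: a non-durable topic subscriber only receives messages published while it is subscribed, so the receivers have to be blocked in receive() before the sender runs. The sketch below models that rule in plain Java. TopicTimingDemo and ToyTopic are hypothetical names, and the class is a toy model, not ActiveMQ's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a non-durable topic; plain Java, not ActiveMQ code.
class TopicTimingDemo {

    static class ToyTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        // Registers a new subscriber and returns its inbox.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        // Copies the message to every subscriber registered right now; nothing is retained.
        void publish(String message) {
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();

        topic.publish("sent before anyone subscribed"); // nobody is listening, so it is lost
        List<String> early = topic.subscribe();
        topic.publish("sent after the first subscriber");
        List<String> late = topic.subscribe();
        topic.publish("sent after both subscribers");

        // prints: early: [sent after the first subscriber, sent after both subscribers]
        System.out.println("early: " + early);
        // prints: late: [sent after both subscribers]
        System.out.println("late: " + late);
    }
}
```

If messages must survive a subscriber being offline, JMS offers durable subscriptions, which this article does not configure.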