您的位置:首页 > 编程语言 > Java开发

activeMQ(二)--spring整合activeMQ

2018-03-17 12:29 316 查看
### 工程结构



pom

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring-version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.2</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>${spring-version}</version>
</dependency>
</dependencies>


配置文件

activemq.xml

<context:component-scan base-package="com.susq" />

<context:property-placeholder location="config.properties" />

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${activemq.url}"/>
</bean>

<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等,设置它为队列模式 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- 设置它为Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>

<!--设置队列消息目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<!-- 构造参数的值是队列的名字 -->
<value>${activemq.queue}</value>
</constructor-arg>
</bean>

<!--设置主题消息目的地-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="${activemq.topic}"/>
</bean>


config.properties

activemq.url=tcp://localhost:61616
activemq.queue=su.queue
activemq.topic=su.topic


log4j.properties

log4j.rootLogger=ERROR,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%c]-[%p] %m%n

#给自己项目设置日志级别

log4j.logger.com.susq=DEBUG




工程代码

JmsTemplate 是spring 提供的简化了同步JMS访问代码的帮助类。文档:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/core/JmsTemplate.html

生产者

package com.susq.service.impl;

import com.susq.service.ProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.Session;

@Component("producerService")
@Slf4j
public class ProducerServiceImpl implements ProducerService {

@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;

public void sendMessage(Destination destination, final String message) {
log.debug("---------------生产者发送消息-----------------");
log.debug("---------------生产者发了一个消息:" + message);
/* send()方法第二个参数是MessageCreator类型,它里面只有一个方法,所以他是一个函数式接口,支持lambda表达式
public interface MessageCreator {
Message createMessage(Session var1) throws JMSException;
}
为了防止“函数是接口” 变成"非函数接口”,我们可以在这个上面加上一个声明@FunctionalInterface,
这样别人就无法在里面添加新的接口函数了。
*/
jmsTemplate.send(destination, (Session session) -> session.createTextMessage(message));
}
}


消费者

package com.susq.service.impl;

import com.susq.service.ConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

@Slf4j
@Component("consumerService")
public class ConsumerServiceImpl implements ConsumerService {

@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;

public void receiveMessage(Destination destination) {
log.debug("------------消费消息--------------");
while (true) {
try {
//使用JMSTemplate接收消息
TextMessage txtmsg = (TextMessage) jmsTemplate.receive(destination);
if (null != txtmsg) {
log.debug("--- 收到消息内容为: " + txtmsg.getText());
} else {
break;
}
} catch (JMSException e) {
log.error("错误 {}", e);
}
}
}
}


测试

package com.susq;

import com.susq.service.ConsumerService;
import com.susq.service.ProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.Destination;

/**
* Unit test for simple App.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:activemq.xml")
public class AppTest {

@Autowired
private ProducerService producerService;

@Autowired
private Destination queueDestination;

@Autowired
private ConsumerService consumerService;

@Test
public void testProducer() {
producerService.sendMessage(queueDestination, "生产消息");
}

@Test
public void testConsumer() {
consumerService.receiveMessage(queueDestination);
}

}


运行生产者,以后,可以看到发送了一条消息等待消费,对ActiveMQ自带Web管控台不熟悉的,可以参考上一篇 http://blog.csdn.net/u013041642/article/details/79547128



运行消费者



​ 消费者中我们写了一个死循环,使线程持续运行,接受消息。如果不循环,只运行一次,那每次只能消费一条消息。

5. 消息监听器

​ 上面我们是用 jmsTemplate.receive(destination) 来接受消息。而使用消息监听器是一种更优雅的消息接受方式,只需要实现消息监听器接口MessageListener即可。topic监听器写法也一样。

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* @author susq
* @since 2018-03-15-21:18
*/
@Slf4j
@Component("consumerServiceNew")
public class ConsumerServiceNew implements MessageListener {

@Override
public void onMessage(Message message) {
try {
log.debug("ConsumerServiceNew接收到消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
log.debug("接收消息异常,{}", e);
}
}
}


activemq.xml中增加监听容器的配置, 需要设置监听的地址和监听器的bean.

<!-- 定义Queue监听器 -->
<jms:listener-container>
<jms:listener destination="${activemq.queue}" ref="consumerServiceNew"/>
</jms:listener-container>

<!-- 定义topic监听器 -->
<jms:listener-container destination-type="topic" >
<jms:listener destination="${activemq.topic}" ref="topicConsumer"/>
</jms:listener-container>


测试代码

package com.susq;

import com.susq.service.ConsumerService;
import com.susq.service.ProducerService;
import com.susq.service.TopicConsumer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.Destination;

/**
* Unit test for simple App.
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:activemq.xml")
public class AppTest {

@Autowired
private ProducerService producerService;

@Autowired
private Destination queueDestination;

@Autowired
private Destination topicDestination;

@Test
public void testProducer() {
producerService.sendMessage(queueDestination, "queue生产消息");
}

@Test
public void testTopicProducer() {
producerService.sendMessage(topicDestination, "topic生产消息");
}
}


运行生产者,加载配置文件的时候,我们配好的监听容器同时被加载,所以生产消息和监听消息同时进行。

todo: 队列消息的监听器,加载以后可以一次性接受之前生产的未消费的所有队列消息。但是主题消息的监听器,不能收到监听器加载之前的消息。有待研究。

消息监听器除了MessageListener还有SessionAwareMessageListener和MessageListenerAdapter,后面再总结。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq