Spring+ActiveMQ配置
2016-09-17 12:34
435 查看
依赖jar包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.2.RELEASE</version>
</dependency>
一、发送端
1、XML配置
<bean id="producerService" class="com.vispractice.vplog.activemq.ProducerServiceImpl" />
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
<property name="trustAllPackages" value="true"/>
</bean>
<!-- Spring Caching连接工厂
Spring用于管理真正的ConnectionFactory的ConnectionFactory
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
-->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>vp-log-queue</value>
</constructor-arg>
</bean>
2.类ProducerServiceImpl .java
@Component
public class ProducerServiceImpl implements ProducerService {
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination, final ActiveMQObjMsgBean jmsObject) {
System.out.println("---------------生产者发送消息-----------------");
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(jmsObject);
}
});
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
@Resource
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
二、接收端
1、XML配置
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
<property name="trustAllPackages" value="true"/>
</bean>
<!-- Spring Caching连接工厂 Spring用于管理真正的ConnectionFactory的ConnectionFactory 有缓存功能
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
-->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>vp-log-queue</value>
</constructor-arg>
</bean>
<!-- 消息监听器 -->
<bean id="receiverListener" class="com.vispractice.vplog.activemq.ReceiverListener"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="receiverListener" />
</bean>
2、消息监听器ReceiverListener.java
public class ReceiverListener implements MessageListener {
@Resource(name = "logServiceImpl")
private LogService logService;
/**
* 现在只使用到ObjectMessage,其他的消息类型备用
*/
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("------Received TextMessage------");
System.out.println(message.getText());
} else if (msg instanceof MapMessage) {
MapMessage message = (MapMessage) msg;
System.out.println("------Received MapMessage------");
System.out.println(message.getString("sysName"));
System.out.println(message.getString("sysFlag"));
System.out.println(message.getString("operation"));
System.out.println(message.getString("operator"));
System.out.println(message.getString("content"));
System.out.println(message.getString("parameter"));
System.out.println(message.getString("ip"));
System.out.println("------Received MapMessage for while------");
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
} else if (msg instanceof StreamMessage) {
StreamMessage message = (StreamMessage) msg;
System.out.println("------Received StreamMessage------");
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
} else if (msg instanceof ObjectMessage) {
System.out.println("------Received ObjectMessage------");
ObjectMessage message = (ObjectMessage) msg;
ActiveMQObjMsgBean jmsObject = (ActiveMQObjMsgBean) message.getObject();
String sysName = jmsObject.getSysName();
String sysFlag = jmsObject.getSysFlag();
String operation = jmsObject.getOperation();
String operator = jmsObject.getOperator();
String content = jmsObject.getContent();
String ip = jmsObject.getIp();
String[] ignoreParameters = jmsObject.getIgnoreParameters();
StringBuffer parameter = new StringBuffer();
Map<String, String[]> parameterMap = jmsObject.getParameterMap();
if (parameterMap != null) {
for (Entry<String, String[]> entry : parameterMap.entrySet()) {
String parameterName = entry.getKey();
if (!ArrayUtils.contains(ignoreParameters, parameterName)) {
String[] parameterValues = entry.getValue();
if (parameterValues != null) {
for (String parameterValue : parameterValues) {
parameter.append(parameterName + " = " + parameterValue + "\n");
}
}
}
}
}
Log log = new Log();
log.setSysName(sysName);
log.setSysFlag(sysFlag);
log.setOperation(operation);
log.setOperator(operator);
log.setContent(content);
log.setParameter(parameter.toString());
log.setIp(ip);
log.setCreateDate(new Date());
logService.save(log);
} else if (msg instanceof BytesMessage) {
System.out.println("------Received BytesMessage------");
BytesMessage message = (BytesMessage) msg;
byte[] byteContent = new byte[1024];
int length = -1;
StringBuffer content = new StringBuffer();
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.13.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.2.RELEASE</version>
</dependency>
一、发送端
1、XML配置
<bean id="producerService" class="com.vispractice.vplog.activemq.ProducerServiceImpl" />
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
<property name="trustAllPackages" value="true"/>
</bean>
<!-- Spring Caching连接工厂
Spring用于管理真正的ConnectionFactory的ConnectionFactory
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
-->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>vp-log-queue</value>
</constructor-arg>
</bean>
2.类ProducerServiceImpl .java
@Component
public class ProducerServiceImpl implements ProducerService {
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination, final ActiveMQObjMsgBean jmsObject) {
System.out.println("---------------生产者发送消息-----------------");
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(jmsObject);
}
});
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
@Resource
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
二、接收端
1、XML配置
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${mq.brokerURL}"/>
<property name="trustAllPackages" value="true"/>
</bean>
<!-- Spring Caching连接工厂 Spring用于管理真正的ConnectionFactory的ConnectionFactory 有缓存功能
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100" />
</bean>
-->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>vp-log-queue</value>
</constructor-arg>
</bean>
<!-- 消息监听器 -->
<bean id="receiverListener" class="com.vispractice.vplog.activemq.ReceiverListener"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="receiverListener" />
</bean>
2、消息监听器ReceiverListener.java
public class ReceiverListener implements MessageListener {
@Resource(name = "logServiceImpl")
private LogService logService;
/**
* 现在只使用到ObjectMessage,其他的消息类型备用
*/
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage message = (TextMessage) msg;
System.out.println("------Received TextMessage------");
System.out.println(message.getText());
} else if (msg instanceof MapMessage) {
MapMessage message = (MapMessage) msg;
System.out.println("------Received MapMessage------");
System.out.println(message.getString("sysName"));
System.out.println(message.getString("sysFlag"));
System.out.println(message.getString("operation"));
System.out.println(message.getString("operator"));
System.out.println(message.getString("content"));
System.out.println(message.getString("parameter"));
System.out.println(message.getString("ip"));
System.out.println("------Received MapMessage for while------");
Enumeration enumer = message.getMapNames();
while (enumer.hasMoreElements()) {
Object obj = enumer.nextElement();
System.out.println(message.getObject(obj.toString()));
}
} else if (msg instanceof StreamMessage) {
StreamMessage message = (StreamMessage) msg;
System.out.println("------Received StreamMessage------");
System.out.println(message.readString());
System.out.println(message.readBoolean());
System.out.println(message.readLong());
} else if (msg instanceof ObjectMessage) {
System.out.println("------Received ObjectMessage------");
ObjectMessage message = (ObjectMessage) msg;
ActiveMQObjMsgBean jmsObject = (ActiveMQObjMsgBean) message.getObject();
String sysName = jmsObject.getSysName();
String sysFlag = jmsObject.getSysFlag();
String operation = jmsObject.getOperation();
String operator = jmsObject.getOperator();
String content = jmsObject.getContent();
String ip = jmsObject.getIp();
String[] ignoreParameters = jmsObject.getIgnoreParameters();
StringBuffer parameter = new StringBuffer();
Map<String, String[]> parameterMap = jmsObject.getParameterMap();
if (parameterMap != null) {
for (Entry<String, String[]> entry : parameterMap.entrySet()) {
String parameterName = entry.getKey();
if (!ArrayUtils.contains(ignoreParameters, parameterName)) {
String[] parameterValues = entry.getValue();
if (parameterValues != null) {
for (String parameterValue : parameterValues) {
parameter.append(parameterName + " = " + parameterValue + "\n");
}
}
}
}
}
Log log = new Log();
log.setSysName(sysName);
log.setSysFlag(sysFlag);
log.setOperation(operation);
log.setOperator(operator);
log.setContent(content);
log.setParameter(parameter.toString());
log.setIp(ip);
log.setCreateDate(new Date());
logService.save(log);
} else if (msg instanceof BytesMessage) {
System.out.println("------Received BytesMessage------");
BytesMessage message = (BytesMessage) msg;
byte[] byteContent = new byte[1024];
int length = -1;
StringBuffer content = new StringBuffer();
while ((length = message.readBytes(byteContent)) != -1) {
content.append(new String(byteContent, 0, length));
}
System.out.println(content.toString());
} else {
System.out.println(msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
相关文章推荐
- Windows 下 Spring + ActiveMq + maven 集成配置 之一 服务端配置
- 基于Maven的ActiveMQ+spring 多消息队列配置实例
- Windows 下 Spring + ActiveMq + maven 集成配置 之二 客户端配置
- spring+activemq的配置
- ActiveMQ Spring 集成配置
- spring+activemq配置多个生产者,多个消费者并发处理消息
- spring+activemq实战之配置监听多队列实现不同队列消息消费
- activemq + spring 配置内置vm
- activeMQ + Spring 配置
- Spring+ActiveMQ+Mysql 配置JMS
- JMS学习十二(Spring+ActiveMQ集群配置)
- spring+activemq的配置
- activeMQ+spring+tomcat简单配置
- JMS学习十二(Spring+ActiveMQ集群配置)
- activemq spring配置
- jms+spring+activemq配置
- zookeeper+activemq配置消息中间件集群 spring客户端连接
- Spring+ActiveMQ+Mysql 配置JMS
- spring boot activeMQ 配置
- jms+spring+activemq配置(发送和接收消息)