基于spring整合activeMQ以及点对点队列的封装
2017-01-03 14:02
423 查看
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
配置文件内容
消息转化器代码:
以上就配置了消息链接工厂以及接受处理消息的jms模板,接下来封装生产者,代码如下:
上面代码封装了生产者的发送普通消息,发送对象消息,发送点对点应答消息,三种模式的生产者。调用代码如下,以应答模式为例:
接下来就是配置监听器,配置文件如下:
到此,整个基于spring的整合activeMQ就完成了,可以通过 new 封装的生产者,在不同的模块中调用队列服务,配置文件中只需要配置自己模块的监听器就可以了。
配置文件内容
<!-- 队列服务配置项 --> <!-- 配置JMS连接工厂--> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!--将该值开启官方说法是可以取得更高的发送速度(5倍)--> <property name="useAsyncSend" value="true" /> <!-- 对于一个connection如果只有一个session,该值有效,否则该值无效,默认这个参数的值为true。--> <property name="alwaysSessionAsync" value="true" /> <property name="useDedicatedTaskRunner" value="true" /> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!--消息转换--> <bean id="messageConverter" class="cn.com.do1.component.ability.polycost.common.ObjectMessageConverter"/> <!--Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="receiveTimeout" value="10000" /> <!-- <property name="defaultDestination" ref="queueDestination" />--> <!--pubSubDomain false表示是队列 --> <property name="pubSubDomain" value="false" /> <!-- 类转换 --> <property name="messageConverter" ref="messageConverter"></property> </bean> <!-- 队列服务配置项 end-->由于activemq传对象消息需要进行转化,所以,要配置一个消息转化器,注意:传对象消息,对象必须进行序列化。
消息转化器代码:
package cn.com.do1.component.ability.polycost.common; import java.io.Serializable; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.activemq.command.ActiveMQObjectMessage; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter; public class ObjectMessageConverter implements MessageConverter{ @Override public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage(); msg.setObject((Serializable) object); return msg; } @Override public Object fromMessage(Message message) throws JMSException, MessageConversionException { Object object = null; if(message instanceof ActiveMQObjectMessage){ ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message; object=(Object) aMsg.getObject(); } return object; } }
以上就配置了消息链接工厂以及接受处理消息的jms模板,接下来封装生产者,代码如下:
package cn.com.do1.component.ability.polycost.common; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.commons.lang3.RandomStringUtils; import org.springframework.jms.JmsException; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class BaseProducer { private JmsTemplate jmsTemplate; private String queueName; public BaseProducer(String queueName,JmsTemplate jmsTemplate) { super(); this.queueName = queueName; this.jmsTemplate=jmsTemplate; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } /** * 发送文本消息 * @param txtMsg 发送普通文本消息 * @param isReply 设置是否发送的是应答消息,true发送repMsg,false发送txtMsg。 * @param repMsg 应答消息 * */ public void sendTxtMsg(final String txtMsg, final Boolean isReply,final String repMsg){ try { Connection connection= jmsTemplate.getConnectionFactory().createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//消息队列设置自动应答,不需要客户端调用ACKNOWLEDGE来确认消费了消息。 final Destination destination=session.createQueue(queueName); // 如果该队列不存在就创建消息队列,否则就连接队列 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { // TODO Auto-generated method stub if(isReply){ String correlationId = RandomStringUtils.randomNumeric(5); TextMessage message =session.createTextMessage(repMsg); message.setJMSReplyTo(destination);//设置回复消息的目的地 message.setJMSCorrelationID(correlationId); //设置回复队列消息的id return message; } return session.createTextMessage(txtMsg); } }); } catch (JmsException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 发送对象消息*/ public void sendObjMsg(Object message){ try { Connection connection= jmsTemplate.getConnectionFactory().createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createQueue(queueName); // 如果该队列不存在就创建消息队列,否则就连接队列 jmsTemplate.convertAndSend(destination, message); } catch (JmsException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /**发送对象消息并等待应答 * message:发送的消息内容 * reciveQueueName:接受应答消息的队列名 * */ public String sendMsgAndRecive(Object message,String reciveQueueName){ String replyMsg = null; try { Connection connection= jmsTemplate.getConnectionFactory().createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createQueue(queueName); //如果该队列不存在就创建消息队列,否则就连接队列 jmsTemplate.convertAndSend(destination, message); // 发送之后,等待那边发送确认消息 Destination recall_destination=session.createQueue(reciveQueueName); TextMessage textMsg=(TextMessage) jmsTemplate.receive(recall_destination); replyMsg=textMsg.getText(); } catch (JmsException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } return replyMsg; } }
上面代码封装了生产者的发送普通消息,发送对象消息,发送点对点应答消息,三种模式的生产者。调用代码如下,以应答模式为例:
BaseProducer producer =new BaseProducer("polycost.jhs.queue", jmsTemplate); String msg=producer.sendMsgAndRecive(orderPO, "jhs.orderReply.queue");msg就是接受消费者消费消息后的回复消息。
接下来就是配置监听器,配置文件如下:
<bean id="orderListener" class="cn.com.do1.component.ability.polycost.listener.OrderListener" /> <bean id="orderListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="orderListener" /> </bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connection a226 Factory" ref="connectionFactory" /> <property name="destinationName" value="polycost.jhs.queue" /> <!-- 需要监听的队列--> <property name="messageListener" ref="orderListenerAdapter" /> <property name="sessionTransacted" value="true" /> <property name="concurrentConsumers" value="1"/><!-- 表示开启一个线程去消费--> </bean>
到此,整个基于spring的整合activeMQ就完成了,可以通过 new 封装的生产者,在不同的模块中调用队列服务,配置文件中只需要配置自己模块的监听器就可以了。
相关文章推荐
- spring整合activemq消息队列之点对点模式
- activeMQ 点对点以及发布与订阅 - 以及spring的整合&集群方式
- Spring整合JMS——基于ActiveMQ实现
- ActiveMQ消息队列和spring进行整合实例
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring整合JMS——基于ActiveMQ实现
- Spring整合JMS-基于activeMQ实现(二)
- 关于JMS与SPRING的整合实例(基于Apache ActiveMQ)
- Spring整合JMS——基于ActiveMQ实现
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring整合JMS——基于ActiveMQ实现
- activemq与spring整合,自动消费队列的配置与代码
- Spring2.5,Activemq5.2整合遇到的问题以及解决方法
- spring整合JMS - 基于ActiveMQ实现
- Spring整合JMS(一)——基于ActiveMQ实现
- JMS与Spring的整合实例(基于Apache ActiveMQ)JMS的介绍
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring整合Jms学习(一)_基于ActiveMQ实现
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring整合JMS-基于activeMQ实现(一)