您的位置:首页 > 编程语言 > Java开发

基于spring整合activeMQ以及点对点队列的封装

2017-01-03 14:02 423 查看
      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

     配置文件内容

     

<!-- 队列服务配置项 -->
<!-- 配置JMS连接工厂-->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--将该值开启官方说法是可以取得更高的发送速度(5倍)-->
<property name="useAsyncSend" value="true" />
<!-- 对于一个connection如果只有一个session,该值有效,否则该值无效,默认这个参数的值为true。-->
<property name="alwaysSessionAsync" value="true" />
<property name="useDedicatedTaskRunner" value="true" />
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!--消息转换-->
<bean id="messageConverter" class="cn.com.do1.component.ability.polycost.common.ObjectMessageConverter"/>

<!--Spring提供的JMS工具类,它可以进行消息发送、接收等  -->
<bean id="jmsTemplate" name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- <property name="defaultDestination" ref="queueDestination" />-->
<!--pubSubDomain false表示是队列 -->
<property name="pubSubDomain" value="false" />
<!-- 类转换  -->
<property name="messageConverter" ref="messageConverter"></property>
</bean>
<!-- 队列服务配置项  end-->
由于activemq传对象消息需要进行转化,所以,要配置一个消息转化器,注意:传对象消息,对象必须进行序列化。

消息转化器代码:

package cn.com.do1.component.ability.polycost.common;
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class ObjectMessageConverter implements MessageConverter{

@Override
public Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException {
ActiveMQObjectMessage msg = (ActiveMQObjectMessage) session.createObjectMessage();
msg.setObject((Serializable) object);
return msg;
}

@Override
public Object fromMessage(Message message) throws JMSException,
MessageConversionException {
Object object = null;
if(message instanceof ActiveMQObjectMessage){
ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
object=(Object) aMsg.getObject();
}
return object;
}

}


以上就配置了消息链接工厂以及接受处理消息的jms模板,接下来封装生产者,代码如下:

package cn.com.do1.component.ability.polycost.common;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class BaseProducer {

private JmsTemplate jmsTemplate;
private String queueName;

public BaseProducer(String queueName,JmsTemplate jmsTemplate) {
super();
this.queueName = queueName;
this.jmsTemplate=jmsTemplate;

}

public String getQueueName() {
return queueName;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}
/**
* 发送文本消息
* @param txtMsg  发送普通文本消息
* @param isReply 设置是否发送的是应答消息,true发送repMsg,false发送txtMsg。
* @param repMsg 应答消息
* */
public void sendTxtMsg(final String txtMsg, final Boolean isReply,final String repMsg){
try {
Connection connection=	jmsTemplate.getConnectionFactory().createConnection();
Session session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);//消息队列设置自动应答,不需要客户端调用ACKNOWLEDGE来确认消费了消息。
final Destination destination=session.createQueue(queueName); // 如果该队列不存在就创建消息队列,否则就连接队列
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
// TODO Auto-generated method stub
if(isReply){
String correlationId = RandomStringUtils.randomNumeric(5);
TextMessage message =session.createTextMessage(repMsg);
message.setJMSReplyTo(destination);//设置回复消息的目的地
message.setJMSCorrelationID(correlationId);  //设置回复队列消息的id
return message;
}
return session.createTextMessage(txtMsg);
}
});
} catch (JmsException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
/**
* 发送对象消息*/
public void sendObjMsg(Object message){
try {
Connection connection=	jmsTemplate.getConnectionFactory().createConnection();
Session session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createQueue(queueName); // 如果该队列不存在就创建消息队列,否则就连接队列
jmsTemplate.convertAndSend(destination, message);
} catch (JmsException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**发送对象消息并等待应答
* message:发送的消息内容
* reciveQueueName:接受应答消息的队列名
* */
public String sendMsgAndRecive(Object message,String reciveQueueName){
String replyMsg = null;
try {
Connection connection=	jmsTemplate.getConnectionFactory().createConnection();
Session session=connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createQueue(queueName); //如果该队列不存在就创建消息队列,否则就连接队列
jmsTemplate.convertAndSend(destination, message);
// 发送之后,等待那边发送确认消息
Destination recall_destination=session.createQueue(reciveQueueName);
TextMessage textMsg=(TextMessage) jmsTemplate.receive(recall_destination);
replyMsg=textMsg.getText();
} catch (JmsException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return replyMsg;
}
}


上面代码封装了生产者的发送普通消息,发送对象消息,发送点对点应答消息,三种模式的生产者。调用代码如下,以应答模式为例:

BaseProducer producer =new BaseProducer("polycost.jhs.queue", jmsTemplate);
String msg=producer.sendMsgAndRecive(orderPO, "jhs.orderReply.queue");
msg就是接受消费者消费消息后的回复消息。

接下来就是配置监听器,配置文件如下:

<bean id="orderListener" class="cn.com.do1.component.ability.polycost.listener.OrderListener" />
<bean id="orderListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="orderListener" />
</bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connection
a226
Factory" ref="connectionFactory" />
<property name="destinationName" value="polycost.jhs.queue" /> <!-- 需要监听的队列-->
<property name="messageListener" ref="orderListenerAdapter" />
<property name="sessionTransacted" value="true" />
<property name="concurrentConsumers" value="1"/><!-- 表示开启一个线程去消费-->
</bean>


到此,整个基于spring的整合activeMQ就完成了,可以通过 new 封装的生产者,在不同的模块中调用队列服务,配置文件中只需要配置自己模块的监听器就可以了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring JAVA MQ