您的位置:首页 > 编程语言 > Java开发

Spring整合ActiveMQ

2017-03-18 13:47 465 查看
一、引入activemq的包和jms的包

<!-- ActiveMQ: the activemq-all artifact; its version is taken from the org.apache.activemq.version property -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${org.apache.activemq.version}</version>
</dependency>
<!-- /ActiveMQ -->
<!-- JMS API: the javax.jms artifact; its version is taken from the javax.jms.version property -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>${javax.jms.version}</version>
</dependency>
<!-- /JMS API -->


二、消息发送端的Spring配置

<!-- Load the property file that supplies the placeholders used below (brokerURL, maxConnections, MQname) -->
<context:property-placeholder location="classpath:config/sysConfig.properties" ignore-unresolvable="true"/>

<!-- JMS connection factory: a plain ActiveMQ connection factory pointing at the broker URL -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerURL}" />
</bean>

<!-- jmsFactory: pooled connection factory wrapping connectionFactory; stop() is called when the bean is destroyed -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="maxConnections" value="${maxConnections}"></property>
</bean>

<!-- Message Destination (queue) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Name of the message queue -->
<constructor-arg index="0" value="${MQname}" />
</bean>

<!-- Message Destination, topic variant (disabled)
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="Topic.subject" />
</bean>
-->

<!-- JMS template (queue): Spring's JMS helper class used to send and receive messages; uses the pooled jmsFactory -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
<property name="defaultDestination" ref="destinationQueue" />
<property name="receiveTimeout" value="10000" />
<!-- Messaging mode: false = queue (point-to-point), true = topic (publish/subscribe) -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- JMS template (queue) used for sending files.
     It is wired to the raw connectionFactory bean, not to the pooled jmsFactory.
     NOTE(review): presumably because the sending code casts the JMS Session to ActiveMQSession
     in order to create blob messages; confirm before switching this template to the pool. -->
<bean id="jmsBolbTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="destinationQueue" />
<property name="receiveTimeout" value="10000" />
<!-- Messaging mode: false = queue (point-to-point), true = topic (publish/subscribe) -->
<property name="pubSubDomain" value="false" />
</bean>


三、消息发送端代码

import java.io.InputStream;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import net.sf.json.JSONObject;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.chhuang.mongo.core.holder.SpringContextHolder;
import com.chhuang.printer.consts.SysConstants;
import com.chhuang.printer.dao.FileDao;
import com.chhuang.printer.model.FileInfo;
import com.chhuang.utils.config.SysConfig;
import com.mongodb.gridfs.GridFSDBFile;

/**
 * Sends one stored file to the MQ server as a blob message and, when the
 * receiving client confirms success, marks the file as printed.
 *
 * <p>The Spring beans it needs ({@code jmsBolbTemplate} and {@code fileDao})
 * are looked up through {@code SpringContextHolder} in the constructor, so
 * instances can be created with {@code new} outside the Spring container.
 *
 * @author PrinceChen
 */
public class MQSender implements Runnable {
    private static final Log log = LogFactory.getLog(MQSender.class);

    private final JmsTemplate jmsBolbTemplate;
    private final FileDao fileDao;

    private final FileInfo fileInfo;

    /**
     * @param fileInfo metadata of the file to send; its file id is used to
     *                 load the content and its id to update the print status
     */
    public MQSender(FileInfo fileInfo) {
        this.fileInfo = fileInfo;
        // Fetch the collaborators from the Spring context.
        jmsBolbTemplate = SpringContextHolder.getBean("jmsBolbTemplate");
        fileDao = SpringContextHolder.getBean("fileDao");
    }

    /**
     * Loads the file from the database, sends it to the MQ server, waits for
     * the reply and updates the print status when the reply reports success.
     *
     * <p>NOTE(review): only the checked {@link JMSException} is handled here;
     * if the template reports failures with unchecked exceptions they will
     * propagate out of this method. Confirm against the template in use.
     */
    @Override
    public void run() {
        // Load the file content from the database.
        GridFSDBFile gridFile = fileDao.findFileById(fileInfo.getFileId());
        if (gridFile == null) {
            // Nothing to send; the original code would have failed with a NullPointerException here.
            log.error("File not found in storage, nothing sent to MQ: fileId=" + fileInfo.getFileId());
            return;
        }

        InputStream inputStream = gridFile.getInputStream();
        try {
            // Send to the MQ server and wait for the reply.
            JSONObject json = sendBlobMessage(null, inputStream, fileInfo);
            // Only mark the file as printed when the client confirmed reception.
            if (json != null && json.getBoolean(SysConstants.Status.SUCCESS)) {
                fileDao.updatePrintStatus(fileInfo.getId(), true);
                log.debug(fileInfo.getFilename()+"--客户端接收成功");
            } else {
                log.warn("No successful reply received for " + fileInfo.getFilename()
                        + ", print status left unchanged");
            }
        } catch (JMSException e) {
            log.error("Failed to send " + fileInfo.getFilename() + " to MQ", e);
        } finally {
            // The send-and-receive call has returned (or failed), so the stream is no longer needed.
            try {
                inputStream.close();
            } catch (java.io.IOException e) {
                log.warn("Failed to close input stream of " + fileInfo.getFilename(), e);
            }
        }
    }

    /**
     * Sends the stream as a blob message carrying the file metadata as message
     * properties, then waits for the reply.
     *
     * @param destination target destination; when {@code null} the template's
     *                    default destination is used
     * @param inputStream content of the file to send
     * @param fileInfo    metadata copied into the message properties
     * @return the reply parsed as JSON, or {@code null} when no reply was
     *         returned or the reply is not a {@link TextMessage}
     * @throws JMSException if the reply text cannot be read
     */
    private JSONObject sendBlobMessage(Destination destination, final InputStream inputStream, final FileInfo fileInfo) throws JMSException {
        JSONObject json = null;
        if (null == destination) {
            destination = jmsBolbTemplate.getDefaultDestination();
        }
        // Send to the MQ server and wait for the reply.
        Message message = jmsBolbTemplate.sendAndReceive(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session arg0) throws JMSException {
                // Blob messages are created through the ActiveMQ-specific session type.
                ActiveMQSession session = (ActiveMQSession) arg0;
                BlobMessage blobMessage = session.createBlobMessage(inputStream);
                blobMessage.setObjectProperty(SysConstants.FILE_ID, fileInfo.getFileId());
                blobMessage.setObjectProperty(SysConstants.FILE_NAME, fileInfo.getFilename());
                blobMessage.setObjectProperty(SysConstants.FILE_SIZE, fileInfo.getFileSize());
                blobMessage.setObjectProperty(SysConstants.CONTENT_TYPE, fileInfo.getContentType());
                blobMessage.setObjectProperty(SysConstants.MD5, fileInfo.getMd5());
                return blobMessage;
            }
        });
        // Parse the reply; instanceof is false for null, so a missing reply yields null.
        if (message instanceof TextMessage) {
            json = JSONObject.fromObject(((TextMessage) message).getText());
        }
        return json;
    }

}


四、消息接收端spring配置

<!-- Load the property file that supplies the placeholders used below (brokerURL, maxConnections, MQname) -->
<bean id="configBean" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>file:conf/printer.properties</value>
</list>
</property>
</bean>

<!-- Message consumer listener -->
<bean id="messageReceiver" class="com.chhuang.printer.mq.MSListener"></bean>

<!-- JMS connection factory: a plain ActiveMQ connection factory pointing at the broker URL -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${brokerURL}" />
</bean>

<!-- jmsFactory: pooled connection factory wrapping connectionFactory; stop() is called when the bean is destroyed -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="maxConnections" value="${maxConnections}"></property>
</bean>

<!-- Message Destination (queue) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- Name of the message queue -->
<constructor-arg index="0" value="${MQname}" />
</bean>

<!-- Message Destination, topic variant (disabled)
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="Topic.subject" />
</bean>
-->

<!-- JMS template (queue): Spring's JMS helper class used to send and receive messages; uses the pooled jmsFactory -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory" />
<property name="defaultDestination" ref="destinationQueue" />
<property name="receiveTimeout" value="10000" />
<!-- Messaging mode: false = queue (point-to-point), true = topic (publish/subscribe) -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- Message listener container: wires the connection factory and the destination;
     the listener is the messageReceiver bean defined above -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="destinationQueue" />
<!-- Topic vs. queue is selected by pubSubDomain: true means topic; false (or left unset) means queue -->
<property name="pubSubDomain" value="false" />
<property name="messageListener" ref="messageReceiver" />
</bean>


五、消息接收端监听器代码

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Date;

import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import net.sf.json.JSONObject;

import org.apache.activemq.BlobMessage;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.SessionAwareMessageListener;

import com.chhuang.printer.consts.PrinterConsts;
import com.chhuang.printer.main.PrinterMain;
import com.chhuang.printer.thread.PrinterRunnable;
import com.chhuang.printer.util.DateUtils;
import com.chhuang.printer.util.PrinterConfig;

/**
 * JMS listener of the printer client.
 *
 * <p>Dispatches each incoming message by type. Blob messages are saved to
 * disk, handed to a printing thread and written to the print log. After
 * handling, a JSON text message {@code {"success": <outcome>}} is sent to the
 * message's reply-to destination, if it has one.
 */
public class MSListener implements SessionAwareMessageListener<Message> {
    private static final Log log = LogFactory.getLog(MSListener.class);

    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * Handles one message and replies with the outcome.
     *
     * @param message the received message
     * @param session the JMS session the message was received on (unused;
     *                the reply is sent through the injected template)
     * @throws JMSException if the reply-to destination cannot be read
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        // final so the anonymous MessageCreator below can capture it.
        final boolean success = dispatch(message);

        // Messages sent without a reply-to (plain fire-and-forget sends) cannot be answered.
        if (message.getJMSReplyTo() == null) {
            log.debug("Message has no JMSReplyTo, no reply sent");
            return;
        }

        // Reply with the real outcome instead of an unconditional success.
        jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                JSONObject json = new JSONObject();
                json.put("success", success);
                return session.createTextMessage(json.toString());
            }
        });
    }

    /**
     * Routes the message to the handler for its type.
     *
     * @return {@code false} when a handler reported a failure; {@code true}
     *         otherwise, including for message types that have no handler
     */
    private boolean dispatch(Message message) {
        if (message instanceof BlobMessage) {
            log.debug("BlobMessage");
            return receive((BlobMessage) message);
        }
        if (message instanceof ObjectMessage) {
            log.debug("ObjectMessage");
            receive((ObjectMessage) message);
        } else if (message instanceof TextMessage) {
            log.debug("TextMessage");
            return receive((TextMessage) message);
        } else if (message instanceof StreamMessage) {
            log.debug("StreamMessage");
            receive((StreamMessage) message);
        } else if (message instanceof MapMessage) {
            log.debug("MapMessage");
            receive((MapMessage) message);
        }
        return true;
    }

    /**
     * Saves the blob to the printer directory, deletes the server-side copy,
     * starts a printing thread and writes a log entry.
     *
     * @return {@code true} when the file was stored and queued for printing,
     *         {@code false} when anything went wrong
     */
    private boolean receive(BlobMessage message) {
        try {
            String fileName = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.FILE_NAME));
            Long fileSize = message.getLongProperty(PrinterConfig.getValue(PrinterConsts.FILE_SIZE));
            String contentType = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.CONTENT_TYPE));
            String md5 = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.MD5));

            // The name comes from the message, so never let it point outside the printer directory.
            String safeName = stripDirectories(fileName);
            if (safeName == null) {
                log.error("Blob message rejected, unusable file name: " + fileName);
                return false;
            }

            InputStream inputStream = message.getInputStream();
            if (inputStream == null) {
                log.error("Blob message carries no content: " + fileName);
                return false;
            }

            String outPath = PrinterConsts.PRINTER_FILE_PATH + safeName;
            // Nested try/finally so both streams are closed even when the copy fails.
            try {
                OutputStream outputStream = new FileOutputStream(outPath);
                try {
                    IOUtils.copyLarge(inputStream, outputStream);
                } finally {
                    outputStream.close();
                }
            } finally {
                inputStream.close();
            }
            // The server-side file has to be deleted manually once it has been processed.
            ((ActiveMQBlobMessage) message).deleteFile();
            log.debug(fileName+"文件接收成功");

            // Print the file on its own thread.
            new Thread(new PrinterRunnable(outPath)).start();

            // Write the print log entry.
            String date = DateUtils.date2String(new Date(), DateUtils.YYYY_MM_DD_HH_MM_SS);
            JSONObject content = new JSONObject();
            content.put(PrinterConfig.getValue(PrinterConsts.FILE_NAME), fileName);
            content.put(PrinterConfig.getValue(PrinterConsts.FILE_SIZE), fileSize);
            content.put(PrinterConsts.CONTENT_TYPE, contentType);
            content.put(PrinterConsts.MD5, md5);
            content.put(PrinterConfig.getValue(PrinterConsts.PRINT_DATE), date);
            PrinterMain.saveLog(content.toString());
            return true;
        } catch (IOException e) {
            log.error("Failed to store received blob message", e);
            return false;
        } catch (JMSException e) {
            log.error("Failed to read received blob message", e);
            return false;
        }
    }

    /**
     * Reduces a file name to its last path segment.
     *
     * @return the bare file name, or {@code null} when the input is
     *         {@code null}, empty, or only a directory reference
     */
    private static String stripDirectories(String fileName) {
        if (fileName == null) {
            return null;
        }
        int lastSeparator = Math.max(fileName.lastIndexOf('/'), fileName.lastIndexOf('\\'));
        String name = fileName.substring(lastSeparator + 1);
        if (name.length() == 0 || ".".equals(name) || "..".equals(name)) {
            return null;
        }
        return name;
    }

    /** Not implemented yet: object messages are ignored. */
    private void receive(ObjectMessage message) {
        // TODO handle object messages
    }

    /** Not implemented yet: stream messages are ignored. */
    private void receive(StreamMessage message) {
        // TODO handle stream messages
    }

    /**
     * Logs the text of the message at debug level.
     *
     * @return {@code false} when the text could not be read
     */
    private boolean receive(TextMessage message) {
        try {
            log.debug(message.getText());
            return true;
        } catch (JMSException e) {
            log.error("Failed to read text message", e);
            return false;
        }
    }

    /** Not implemented yet: map messages are ignored. */
    private void receive(MapMessage message) {
        // TODO handle map messages
    }

}


六、测试

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that sends messages through the JMS templates defined in
 * {@code conf/applicationContext_mq.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"file:conf/applicationContext_mq.xml"})
public class MQSender {
    @Resource
    private JmsTemplate jmsTemplate;

    @Resource
    private JmsTemplate jmsBolbTemplate;

    /**
     * Sends a local file as a blob message and prints the reply.
     *
     * @throws Exception on any failure, so that the test is reported as
     *                   failed instead of passing after printing a stack trace
     */
    @Test
    public void test() throws Exception {
        // To exercise the text path instead:
        //        sendTextMessage(null,"spring activemq topic type message[with listener] !");
        InputStream inputStream = new FileInputStream("C:/Users/HuangChen/Desktop/test/测试.doc");
        try {
            sendBlobMessage(null, inputStream, "测试.doc", 100);
        } finally {
            inputStream.close();
        }
    }

    /**
     * Sends a text message.
     *
     * <p>When no destination is given explicitly, the default destination
     * configured on the template in the Spring XML is used.
     *
     * @param destination target destination, or {@code null} for the default
     * @param message     text to send
     */
    public void sendTextMessage(Destination destination, final String message) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
        System.out.println("spring send textmessage...");
    }

    /**
     * Sends a stream as a blob message, waits for the reply and prints it
     * when it is a text message.
     *
     * @param destination target destination, or {@code null} for the default
     *                    destination of the blob template
     * @param inputStream content to send
     * @param properties  at least two values: the file name followed by the
     *                    file size
     * @throws JMSException             if the reply text cannot be read
     * @throws IllegalArgumentException if fewer than two properties are given
     */
    public void sendBlobMessage(Destination destination, final InputStream inputStream, final Object... properties) throws JMSException {
        if (properties == null || properties.length < 2) {
            throw new IllegalArgumentException("properties must contain the file name and the file size");
        }
        if (null == destination) {
            destination = jmsBolbTemplate.getDefaultDestination();
        }
        Message message = jmsBolbTemplate.sendAndReceive(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session arg0) throws JMSException {
                // Blob messages are created through the ActiveMQ-specific session type.
                ActiveMQSession session = (ActiveMQSession) arg0;
                BlobMessage blobMessage = session.createBlobMessage(inputStream);
                blobMessage.setObjectProperty("filename", properties[0]);
                blobMessage.setObjectProperty("fileSize", properties[1]);
                return blobMessage;
            }
        });
        System.out.println("spring send blobmessage...");
        if (message instanceof TextMessage) {
            System.out.println(((TextMessage) message).getText());
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring activemq jms