Spring整合ActiveMQ
2017-03-18 13:47
465 查看
一、引入activemq的包和jms的包
二、消息发送端的Spring配置
三、消息发送端代码
四、消息接收端spring配置
五、消息接收端监听器代码
六、测试
<!-- activemq -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>${org.apache.activemq.version}</version>
</dependency>
<!-- /activemq -->

<!-- jms -->
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>${javax.jms.version}</version>
</dependency>
<!-- /jms -->
二、消息发送端的Spring配置
<!-- Load the external properties file -->
<context:property-placeholder
        location="classpath:config/sysConfig.properties"
        ignore-unresolvable="true" />

<!-- JMS connection factory (direct ActiveMQ connections) -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${brokerURL}" />
</bean>

<!-- Pooled connection factory wrapping the ActiveMQ connection factory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="maxConnections" value="${maxConnections}" />
</bean>

<!-- Message destination (queue) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the message queue -->
    <constructor-arg index="0" value="${MQname}" />
</bean>

<!-- Message destination (topic), currently disabled
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="Topic.subject" />
</bean>
-->

<!-- JMS template (Queue): Spring's JMS helper class for sending and receiving messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory" />
    <property name="defaultDestination" ref="destinationQueue" />
    <property name="receiveTimeout" value="10000" />
    <!-- Messaging mode: false = point-to-point Queue, true = publish/subscribe Topic -->
    <property name="pubSubDomain" value="false" />
</bean>

<!-- JMS template (Queue) used for sending files. -->
<!-- It is wired to the raw connectionFactory rather than the pooled jmsFactory. -->
<!-- NOTE(review): presumably because the sender casts the Session to ActiveMQSession, -->
<!-- which a pooled session wrapper would not satisfy; confirm before changing. -->
<bean id="jmsBolbTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="destinationQueue" />
    <property name="receiveTimeout" value="10000" />
    <!-- Messaging mode: false = point-to-point Queue, true = publish/subscribe Topic -->
    <property name="pubSubDomain" value="false" />
</bean>
三、消息发送端代码
import java.io.InputStream; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import net.sf.json.JSONObject; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import com.chhuang.mongo.core.holder.SpringContextHolder; import com.chhuang.printer.consts.SysConstants; import com.chhuang.printer.dao.FileDao; import com.chhuang.printer.model.FileInfo; import com.chhuang.utils.config.SysConfig; import com.mongodb.gridfs.GridFSDBFile; /** * 发送给mq服务器 * * @author PrinceChen * */ public class MQSender implements Runnable { private static final Log log = LogFactory.getLog(MQSender.class); private JmsTemplate jmsBolbTemplate; private FileDao fileDao; private FileInfo fileInfo; public MQSender(FileInfo fileInfo){ this.fileInfo = fileInfo; //从spring中获取jmsBolbTemplate jmsBolbTemplate = SpringContextHolder.getBean("jmsBolbTemplate"); fileDao = SpringContextHolder.getBean("fileDao"); } @Override public void run() { //从数据库读取文件 GridFSDBFile gridFile = fileDao.findFileById(fileInfo.getFileId()); try { //发送到mq服务器并接收返回消息 JSONObject json = sendBlobMessage(null, gridFile.getInputStream(), fileInfo); //判断文件接收是否成功 if(json!=null && json.getBoolean(SysConstants.Status.SUCCESS)){ fileDao.updatePrintStatus(fileInfo.getId(), true); log.debug(fileInfo.getFilename()+"--客户端接收成功"); } } catch (JMSException e) { e.printStackTrace(); } } private JSONObject sendBlobMessage(Destination destination, final InputStream inputStream, final FileInfo fileInfo) throws JMSException { JSONObject json = null; if(null == destination){ destination = jmsBolbTemplate.getDefaultDestination(); } //发送到mq服务器并接收返回消息 Message message = jmsBolbTemplate.sendAndReceive(destination, new MessageCreator(){ @Override public 
Message createMessage(Session arg0) throws JMSException { ActiveMQSession session = (ActiveMQSession)arg0; BlobMessage blobMessage = session.createBlobMessage(inputStream); blobMessage.setObjectProperty(SysConstants.FILE_ID, fileInfo.getFileId()); blobMessage.setObjectProperty(SysConstants.FILE_NAME, fileInfo.getFilename()); blobMessage.setObjectProperty(SysConstants.FILE_SIZE, fileInfo.getFileSize()); blobMessage.setObjectProperty(SysConstants.CONTENT_TYPE, fileInfo.getContentType()); blobMessage.setObjectProperty(SysConstants.MD5, fileInfo.getMd5()); return blobMessage; } }); //读取返回消息 if (message instanceof TextMessage) { json = JSONObject.fromObject(((TextMessage)message).getText()); } return json; } }
四、消息接收端spring配置
<!-- Load the external properties file -->
<bean id="configBean" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>file:conf/printer.properties</value>
        </list>
    </property>
</bean>

<!-- Message consumer listener -->
<bean id="messageReceiver" class="com.chhuang.printer.mq.MSListener" />

<!-- JMS connection factory (direct ActiveMQ connections) -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${brokerURL}" />
</bean>

<!-- Pooled connection factory wrapping the ActiveMQ connection factory -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="maxConnections" value="${maxConnections}" />
</bean>

<!-- Message destination (queue) -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the message queue -->
    <constructor-arg index="0" value="${MQname}" />
</bean>

<!-- Message destination (topic), currently disabled
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="Topic.subject" />
</bean>
-->

<!-- JMS template (Queue): Spring's JMS helper class for sending and receiving messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsFactory" />
    <property name="defaultDestination" ref="destinationQueue" />
    <property name="receiveTimeout" value="10000" />
    <!-- Messaging mode: false = point-to-point Queue, true = publish/subscribe Topic -->
    <property name="pubSubDomain" value="false" />
</bean>

<!-- Message listener container: wires the connection factory to the listener defined above -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="destinationQueue" />
    <!-- Topic and queue messaging differ mainly in whether "pubSubDomain" is set to true: -->
    <!-- true selects a Topic; false (or the default) selects a queue. -->
    <property name="pubSubDomain" value="false" />
    <property name="messageListener" ref="messageReceiver" />
</bean>
五、消息接收端监听器代码
import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Date; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import net.sf.json.JSONObject; import org.apache.activemq.BlobMessage; import org.apache.activemq.command.ActiveMQBlobMessage; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.listener.SessionAwareMessageListener; import com.chhuang.printer.consts.PrinterConsts; import com.chhuang.printer.main.PrinterMain; import com.chhuang.printer.thread.PrinterRunnable; import com.chhuang.printer.util.DateUtils; import com.chhuang.printer.util.PrinterConfig; public class MSListener implements SessionAwareMessageListener<Message>{ private static final Log log = LogFactory.getLog(MSListener.class); @Resource private JmsTemplate jmsTemplate; @Override public void onMessage(Message message, Session session) throws JMSException { if(message instanceof BlobMessage){ log.debug("BlobMessage"); receive((BlobMessage)message); }else if (message instanceof ObjectMessage) { log.debug("ObjectMessage"); receive((ObjectMessage)message); } else if (message instanceof TextMessage) { log.debug("TextMessage"); receive((TextMessage)message); } else if (message instanceof StreamMessage) { log.debug("StreamMessage"); receive((StreamMessage)message); } else if (message instanceof MapMessage) { log.debug("MapMessage"); receive((MapMessage)message); } //回复消息 jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { JSONObject json = new 
JSONObject(); json.put("success", true); return session.createTextMessage(json.toString()); } }); } private void receive(BlobMessage message) { try { String fileName = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.FILE_NAME)); Long fileSize = message.getLongProperty(PrinterConfig.getValue(PrinterConsts.FILE_SIZE)); String contentType = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.CONTENT_TYPE)); String md5 = message.getStringProperty(PrinterConfig.getValue(PrinterConsts.MD5)); InputStream inputStream = message.getInputStream(); String outPath = PrinterConsts.PRINTER_FILE_PATH+fileName; OutputStream outputStream = new FileOutputStream(outPath); IOUtils.copyLarge(inputStream, outputStream); inputStream.close(); outputStream.close(); ((ActiveMQBlobMessage)message).deleteFile();//注意处理完后需要手工删除服务器端文件 log.debug(fileName+"文件接收成功"); //启动线程打印文件 new Thread(new PrinterRunnable(outPath)).start(); //写入日志 String date = DateUtils.date2String(new Date(),DateUtils.YYYY_MM_DD_HH_MM_SS); JSONObject content = new JSONObject(); content.put(PrinterConfig.getValue(PrinterConsts.FILE_NAME), fileName); content.put(PrinterConfig.getValue(PrinterConsts.FILE_SIZE), fileSize); content.put(PrinterConsts.CONTENT_TYPE, contentType); content.put(PrinterConsts.MD5, md5); content.put(PrinterConfig.getValue(PrinterConsts.PRINT_DATE), date); PrinterMain.saveLog(content.toString()); } catch (IOException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } private void receive(ObjectMessage message) { // TODO Auto-generated method stub } private void receive(StreamMessage message) { // TODO Auto-generated method stub } private void receive(TextMessage message) { // TODO Auto-generated method stub try { log.debug(message.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void receive(MapMessage message) { // TODO Auto-generated method stub } }
六、测试
import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:conf/applicationContext_mq.xml"}) public class MQSender { @Resource private JmsTemplate jmsTemplate; @Resource private JmsTemplate jmsBolbTemplate; @Test public void test(){ // sendTextMessage(null,"spring activemq topic type message[with listener] !"); try { sendBlobMessage(null, new FileInputStream("C:/Users/HuangChen/Desktop/test/测试.doc"), "测试.doc", 100); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /**los * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination * @param destination * @param message */ public void sendTextMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send textmessage..."); } public void sendBlobMessage(Destination destination, final InputStream inputStream, final Object... 
properties) throws JMSException { if(null == destination){ destination = jmsBolbTemplate.getDefaultDestination(); } Message message = jmsBolbTemplate.sendAndReceive(destination, new MessageCreator(){ @Override public Message createMessage(Session arg0) throws JMSException { ActiveMQSession session = (ActiveMQSession)arg0; BlobMessage blobMessage = session.createBlobMessage(inputStream); blobMessage.setObjectProperty("filename", properties[0]); blobMessage.setObjectProperty("fileSize", properties[1]); return blobMessage; } }); System.out.println("spring send blobmessage..."); if (message instanceof TextMessage) { System.out.println(((TextMessage)message).getText()); } } }
相关文章推荐
- spring整合ActiveMQ
- Spring整合JMS-基于activeMQ实现(二)
- Spring与ActiveMQ整合
- Java消息队列-Spring整合ActiveMq
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- 基于ActiveMQ实现Spring整合JMS
- spring整合activemq发送MQ消息[queue模式]实例
- activeMQ 点对点以及发布与订阅 - 以及spring的整合&集群方式
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- 浅谈Spring Boot 整合ActiveMQ的过程
- ActiveMQ整合Spring
- spring boot 整合activemq 进行服务端消息推送(web页面)
- Java中间件JMS(四)之ActiveMQ整合spring之类转换(三)
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- spring boot整合JMS(ActiveMQ实现)
- Spring与ActiveMQ的整合
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例