spring整合activeMQ
2018-02-13 16:12
113 查看
Active的本机网址是http://localhost:8161/
队列queue 的特点就是,许多人可以监听它,但是只有一个人能够收到消息。
主题topic 的特点就是,许多人监听它,都能收到消息。
注意:里面有个坑,因为定义的模板属于同一个类型,用autowired的去注入会出现问题,这里需要提供具体是那个名字id去注入
加上这个注解@Qualifier(“topicTemplate”)
1。安装active
2。所依赖的pom
spring配置文件
配置文件很多,因为我整合了工作流,事务等,你只需要关注activemq那部分就可以,注意约束文件,坑很多。搞得我大半个小时才好
大致流程:你需要一个连接
然后把连接放到spring的管理工厂中
你需要定义一个模板,jms是spring提供的一套模板,分为queue和topic两种。
然后你就可以引入你定义的模板,然后发送消息,模板提供的只是一个接口,你需要去实现具体看代码
你还需要注册监听者,可以多个,监听你所创建的队列或者主题,这个接听者需要你自己实现,需要实现接口,具体看代码
队列接收1
QueueRece1
QueueSend
主题topic
TopicRece1
TopicSend
队列queue 的特点就是,许多人可以监听它,但是只有一个人能够收到消息。
主题topic 的特点就是,许多人监听它,都能收到消息。
注意:里面有个坑,因为定义的模板属于同一个类型,用autowired的去注入会出现问题,这里需要提供具体是那个名字id去注入
加上这个注解@Qualifier(“topicTemplate”)
1。安装active
2。所依赖的pom
<!--activiMQ整合--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>4.1.6.RELEASE</version> <scope>compile</scope> </dependency> <!-- 这个看教程是没有加的,但是我的报错了,加上这个就好了,不知道原因, 4000 你们看情况而定 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.5</version> </dependency>
spring配置文件
配置文件很多,因为我整合了工作流,事务等,你只需要关注activemq那部分就可以,注意约束文件,坑很多。搞得我大半个小时才好
大致流程:你需要一个连接
然后把连接放到spring的管理工厂中
你需要定义一个模板,jms是spring提供的一套模板,分为queue和topic两种。
然后你就可以引入你定义的模板,然后发送消息,模板提供的只是一个接口,你需要去实现具体看代码
你还需要注册监听者,可以多个,监听你所创建的队列或者主题,这个接听者需要你自己实现,需要实现接口,具体看代码
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd" > <!--引用jdbc配置文件--> <context:property-placeholder location="classpath:jdbc.properties"/> <!--开启切面自动代理--> <aop:aspectj-autoproxy proxy-target-class="true" /> <!--开启包扫描--> <context:component-scan base-package="com.coder520" /> <!--扫描注解生成bean--> <context:annotation-config /> <!--阿里巴巴数据源--> <!--数据库设置--> <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close" init-method="init"> <property name="url" value="${jdbc_url}"/> <property name="username" value="${jdbc_username}"/> <property name="password" value="${jdbc_password}"/> <!-- 初始化连接大小 --> <property name="initialSize" value="0"/> <!-- 连接池最大使用连接数量 --> <property name="maxActive" value="20"/> <!-- 连接池最小空闲 --> <property name="minIdle" value="0"/> <!-- 获取连接最大等待时间 --> <property name="maxWait" value="60000"/> <!-- <property name="poolPreparedStatements" value="true" /> <property name="maxPoolPreparedStatementPerConnectionSize" value="33" /> --> <property name="validationQuery" value="${validationQuery}"/> <property name="testOnBorrow" value="false"/> <property name="testOnReturn" value="false"/> <property name="testWhileIdle" value="true"/> <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 --> <property name="timeBetweenEvictionRunsMillis" value="60000"/> <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 --> <property name="minEvictableIdleTimeMillis" value="25200000"/> <!-- 打开removeAbandoned功能 --> <property name="removeAbandoned" value="true"/> <!-- 1800秒,也就是30分钟 --> <property name="removeAbandonedTimeout" value="1800"/> <!-- 关闭abanded连接时输出错误日志 --> <property name="logAbandoned" value="true"/> <!-- 监控数据库 --> <!-- <property name="filters" value="stat" /> --> <property name="filters" value="mergeStat"/> </bean> <!--sessionFactory--> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <!--数据源--> <property name="dataSource" ref="dataSource" /> <!--需要扫描的mapper的映射包--> <property name="mapperLocations" value="classpath:com/coder520/**/**.xml" /> </bean> <!--mybatis的接口扫描--> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <!--需要扫描的映射接口--> <property name="basePackage" value="com.coder520.*.dao"/> <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" /> </bean> <!--设置事务 采用注解的方式--> <tx:annotation-driven transaction-manager="transactionManager"/> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <!--流程引擎配置项--> <bean id="processEngineConfiguration" class="org.activiti.spring.SpringProcessEngineConfiguration"> <!--数据源 和系统同一个--> <property name="dataSource" ref="dataSource"/> <property name="transactionManager" ref="transactionManager" /> <!--更新数据库表 如果没有 则创建--> <property name="databaseSchemaUpdate" value="true" /> <!-- 是否启动jobExecutor --> <property name="jobExecutorActivate" value="false" /> <property name="activityFontName" value="宋体"/> <property name="labelFontName" value="宋体"/> <!--自动部署流程--> <!--<property name="deploymentResources">--> <!--<list>--> <!--<value>classpath*:workflow/*.bpm 127a9 n</value>--> <!--</list>--> <!--</property>--> </bean> <!--流程引擎--> <bean id="processEngine" class="org.activiti.spring.ProcessEngineFactoryBean"> <property name="processEngineConfiguration" ref="processEngineConfiguration" /> </bean> <!-- 工作流数据存储服务 --> <bean id="repositoryService" factory-bean="processEngine" factory-method="getRepositoryService" /> <!-- 工作流运行时服务 --> <bean id="runtimeService" factory-bean="processEngine" factory-method="getRuntimeService" /> <!-- 工作流任务服务--> <bean id="taskService" factory-bean="processEngine" factory-method="getTaskService" /> <!-- 工作流历史数据服务--> <bean id="historyService" factory-bean="processEngine" factory-method="getHistoryService" /> <!-- 工作流管理服务--> <bean id="managementService" factory-bean="processEngine" factory-method="getManagementService" /> <!-- 工作流身份识别服务 --> <bean id="IdentityService" factory-bean="processEngine" factory-method="getIdentityService" /> <!--整合activiMQ--> <amq:connectionFactory brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" id="amqConnectionFactory" /> <!-- 配置JMS连接工场 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="false" /> </bean> <!-- 配置JMS模板(Topic),Spring提供的JMS工具类,它发送、接收消息。 --> <bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> <property name="receiveTimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubSubDomain" value="true" /> </bean> <!--监听队列 需要自定义两个类--> <jms:listener-container connection-factory="connectionFactory" destination-type="queue"> <jms:listener destination="coder520.queue" ref="queueRece1" /> <jms:listener destination="coder520.queue" ref="queueRece2" /> </jms:listener-container> <!--监听发布订阅 需要自定义两个类--> <jms:listener-container connection-factory="connectionFactory" destination-type="topic"> <jms:listener destination="coder520.topic" ref="topicRece1" /> <jms:listener destination="coder520.topic" ref="topicRece2" /> </jms:listener-container> </beans>
队列接收1
QueueRece1
package com.coder520.activeMQ.queue; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by Administrator on 2018/2/13. */ @Component("queueRece1") public class QueueRece1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("QueueRece1接收到消息--"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
QueueSend
package com.coder520.activeMQ.queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by Administrator on 2018/2/13. */ @Component public class QueueSend { @Autowired @Qualifier("queueTemplate") JmsTemplate queueTemplate; public void send(String queueName,String message){ queueTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
主题topic
TopicRece1
package com.coder520.activeMQ.topic; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by Administrator on 2018/2/13. */ @Component("topicRece1") public class TopicRece1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("topicRece1接收到消息--"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
TopicSend
package com.coder520.activeMQ.topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by Administrator on 2018/2/13. */ @Component public class TopicSend { @Autowired @Qualifier("topicTemplate") JmsTemplate topicTemplate; public void send(String queueName,String message){ topicTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
相关文章推荐
- SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- ActiveMQ 与spring整合
- Spring与ActiveMQ整合及用JmsTemplate发送消息
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Java消息队列-Spring整合ActiveMq
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- 消息队列activemq整合spring发送端和接收端配置
- 消息中间件之ActiveMQ整合Spring实现邮箱发送(四)
- Spring与ActiveMQ整合
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS——基于ActiveMQ实现
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring和ActiveMQ整合
- Spring整合JMS——基于ActiveMQ实现
- 5、ActiveMQ入门教程(五)--Spring和ActiveMQ整合
- ActiveMQ的入门案例以及整合Spring的简单使用
- Spring Boot与ActiveMQ整合
- 【报错】spring整合activeMQ,pom.xml文件缺架包,启动报错:Caused by: java.lang.ClassNotFoundException: org.apache.xbean.spring.context.v2.XBeanNamespaceHandler
- springboot 整合 activemq,JmsMessagingTemplate注入失败