您的位置:首页 > 编程语言 > Java开发

spring整合activeMQ

2018-02-13 16:12 113 查看
Active的本机网址是http://localhost:8161/

队列queue 的特点就是,许多人可以监听它,但是只有一个人能够收到消息。

主题topic 的特点就是,许多人监听它,都能收到消息。

注意:里面有个坑,因为定义的模板属于同一个类型,用autowired的去注入会出现问题,这里需要提供具体是那个名字id去注入

加上这个注解@Qualifier(“topicTemplate”)

1。安装active

2。所依赖的pom

<!--activiMQ整合-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>

<!-- 这个看教程是没有加的,但是我的报错了,加上这个就好了,不知道原因,
4000
你们看情况而定 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.5</version>
</dependency>


spring配置文件

配置文件很多,因为我整合了工作流,事务等,你只需要关注activemq那部分就可以,注意约束文件,坑很多。搞得我大半个小时才好

大致流程:你需要一个连接

然后把连接放到spring的管理工厂中

你需要定义一个模板,jms是spring提供的一套模板,分为queue和topic两种。

然后你就可以引入你定义的模板,然后发送消息,模板提供的只是一个接口,你需要去实现具体看代码

你还需要注册监听者,可以多个,监听你所创建的队列或者主题,这个接听者需要你自己实现,需要实现接口,具体看代码

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd" >

<!--引用jdbc配置文件-->
<context:property-placeholder location="classpath:jdbc.properties"/>
<!--开启切面自动代理-->
<aop:aspectj-autoproxy proxy-target-class="true" />
<!--开启包扫描-->
<context:component-scan base-package="com.coder520" />
<!--扫描注解生成bean-->
<context:annotation-config />

<!--阿里巴巴数据源-->
<!--数据库设置-->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource"
destroy-method="close" init-method="init">
<property name="url" value="${jdbc_url}"/>
<property name="username" value="${jdbc_username}"/>
<property name="password" value="${jdbc_password}"/>
<!-- 初始化连接大小 -->
<property name="initialSize" value="0"/>
<!-- 连接池最大使用连接数量 -->
<property name="maxActive" value="20"/>
<!-- 连接池最小空闲 -->
<property name="minIdle" value="0"/>
<!-- 获取连接最大等待时间 -->
<property name="maxWait" value="60000"/>
<!--
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="33" />
-->
<property name="validationQuery" value="${validationQuery}"/>
<property name="testOnBorrow" value="false"/>
<property name="testOnReturn" value="false"/>
<property name="testWhileIdle" value="true"/>
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000"/>
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="25200000"/>
<!-- 打开removeAbandoned功能 -->
<property name="removeAbandoned" value="true"/>
<!-- 1800秒,也就是30分钟 -->
<property name="removeAbandonedTimeout" value="1800"/>
<!-- 关闭abanded连接时输出错误日志 -->
<property name="logAbandoned" value="true"/>
<!-- 监控数据库 -->
<!-- <property name="filters" value="stat" /> -->
<property name="filters" value="mergeStat"/>
</bean>
<!--sessionFactory-->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<!--数据源-->
<property name="dataSource" ref="dataSource" />
<!--需要扫描的mapper的映射包-->
<property name="mapperLocations" value="classpath:com/coder520/**/**.xml" />
</bean>

<!--mybatis的接口扫描-->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<!--需要扫描的映射接口-->
<property name="basePackage" value="com.coder520.*.dao"/>
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
</bean>

<!--设置事务   采用注解的方式-->
<tx:annotation-driven transaction-manager="transactionManager"/>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>

<!--流程引擎配置项-->
<bean  id="processEngineConfiguration" class="org.activiti.spring.SpringProcessEngineConfiguration">
<!--数据源 和系统同一个-->
<property name="dataSource" ref="dataSource"/>
<property name="transactionManager" ref="transactionManager" />
<!--更新数据库表 如果没有 则创建-->
<property name="databaseSchemaUpdate" value="true" />
<!-- 是否启动jobExecutor -->
<property name="jobExecutorActivate" value="false" />
<property name="activityFontName" value="宋体"/>
<property name="labelFontName" value="宋体"/>
<!--自动部署流程-->
<!--<property name="deploymentResources">-->
<!--<list>-->
<!--<value>classpath*:workflow/*.bpm
127a9
n</value>-->
<!--</list>-->
<!--</property>-->
</bean>

<!--流程引擎-->
<bean id="processEngine" class="org.activiti.spring.ProcessEngineFactoryBean">
<property name="processEngineConfiguration" ref="processEngineConfiguration" />
</bean>

<!-- 工作流数据存储服务 -->
<bean id="repositoryService" factory-bean="processEngine" factory-method="getRepositoryService" />
<!-- 工作流运行时服务 -->
<bean id="runtimeService" factory-bean="processEngine" factory-method="getRuntimeService" />
<!--  工作流任务服务-->
<bean id="taskService" factory-bean="processEngine" factory-method="getTaskService" />
<!--  工作流历史数据服务-->
<bean id="historyService" factory-bean="processEngine" factory-method="getHistoryService" />
<!--  工作流管理服务-->
<bean id="managementService" factory-bean="processEngine" factory-method="getManagementService" />
<!-- 工作流身份识别服务 -->
<bean id="IdentityService" factory-bean="processEngine" factory-method="getIdentityService" />

<!--整合activiMQ-->
<amq:connectionFactory brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"
id="amqConnectionFactory" />

<!-- 配置JMS连接工场 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>

<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>

<!-- 配置JMS模板(Topic),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="topicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="true" />
</bean>

<!--监听队列 需要自定义两个类-->
<jms:listener-container connection-factory="connectionFactory" destination-type="queue">
<jms:listener destination="coder520.queue" ref="queueRece1" />
<jms:listener destination="coder520.queue" ref="queueRece2" />
</jms:listener-container>

<!--监听发布订阅 需要自定义两个类-->
<jms:listener-container connection-factory="connectionFactory" destination-type="topic">
<jms:listener destination="coder520.topic" ref="topicRece1" />
<jms:listener destination="coder520.topic" ref="topicRece2" />
</jms:listener-container>
</beans>


队列接收1

QueueRece1

package com.coder520.activeMQ.queue;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* Created by Administrator on 2018/2/13.
*/
@Component("queueRece1")
public class QueueRece1 implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("QueueRece1接收到消息--"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}


QueueSend

package com.coder520.activeMQ.queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
* Created by Administrator on 2018/2/13.
*/
@Component
public class QueueSend {
@Autowired
@Qualifier("queueTemplate")
JmsTemplate queueTemplate;

public void send(String queueName,String message){
queueTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}


主题topic

TopicRece1

package com.coder520.activeMQ.topic;

import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* Created by Administrator on 2018/2/13.
*/
@Component("topicRece1")
public class TopicRece1 implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("topicRece1接收到消息--"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}


TopicSend

package com.coder520.activeMQ.topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
* Created by Administrator on 2018/2/13.
*/
@Component
public class TopicSend {
@Autowired
@Qualifier("topicTemplate")
JmsTemplate topicTemplate;

public void send(String queueName,String message){
topicTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: