您的位置:首页 > 其它

ActiveMQ的简单使用

2017-02-24 00:00 363 查看

项目中使用的介绍

一、运行ActiveMQ

在文件路径下...\apache-activemq-5.13.3\bin\win64

运行activemq.bat

以上是在Windows系统中的启动方式,启动后还可以通过浏览器访问ActiveMQ的管理页面(默认地址为 http://127.0.0.1:8161/admin)。

二、项目中的实践

1.依赖的jar包

<!-- Maven dependencies for the JMS / ActiveMQ integration described in this article. -->
<!-- NOTE(review): no <version> elements are declared here; presumably versions are managed by a parent POM or a dependencyManagement section. Confirm. -->
<!-- Spring JMS: the XML and Java below use classes from org.springframework.jms (JmsTemplate, CachingConnectionFactory, listener containers). -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- ActiveMQ artifacts: the code below uses org.apache.activemq classes such as ActiveMQConnectionFactory and ActiveMQQueue. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
</dependency>
<!-- NOTE(review): nothing in the configuration or code shown in this article references Artemis classes; confirm the two artemis-* artifacts are actually required. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-server</artifactId>
</dependency>

2.在common.properties文件中配置ActiveMQ的连接信息,需要和实际运行的ActiveMQ保持一致。

# Broker address and credentials; the producer-side XML substitutes these via
# the ${activemq.broker.url}, ${activemq.user} and ${activemq.password} placeholders.
activemq.broker.url=tcp://127.0.0.1:61616
activemq.user=admin
activemq.password=admin
# NOTE(review): presumably queue names; neither key is referenced by the XML or
# Java shown in this article. Confirm where they are read.
activemq.name=datacenter.test
activemq.name.access=datacenter.access

3.applicationContext-mq.xml 中配置(推消息端)

<!-- 1. ActiveMQ connection factory; broker URL and credentials come from the activemq.* properties. -->
<amq:connectionFactory id="creditMQConnectionFactory"
    brokerURL="${activemq.broker.url}"
    userName="${activemq.user}"
    password="${activemq.password}"  />

<!-- 2. Caching ConnectionFactory wrapping the raw factory above.
     NOTE: this bean id differs from the raw factory's id only in the case of one letter
     (creditMQconnectionFactory vs. creditMQConnectionFactory); take care when referencing either. -->
<bean id="creditMQconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="creditMQConnectionFactory"/>
    <property name="sessionCacheSize" value="100" />
</bean>

<!-- JmsTemplate for queues (pubSubDomain=false).
     It is built on the caching factory so that the connection and sessions are reused
     across sends; wiring it to the raw factory would bypass the cache configured above. -->
<bean id="creditJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="creditMQconnectionFactory" />
    <property name="pubSubDomain" value="false" />
</bean>

<!-- Credit (authorization) queue consumer, currently disabled
<bean id="creditConsumer" class="com.itom.mq.consumer.CreditConsumer"/>-->

<!-- Queue listener container; no listeners are registered on it in this file. -->
<jms:listener-container destination-type="queue" container-type="default"
    connection-factory="creditMQconnectionFactory"
    acknowledge="auto">
</jms:listener-container>
<!-- End of operations-related configuration -->

4.编写生产者和消费者

1) 生产者

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.Queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sends messages to ActiveMQ queues through the injected {@code creditJmsTemplate}.
 *
 * <p>Queue destinations are created on first use and cached by queue name.
 */
@Component
public class CreditProducer {

    /** Destination cache keyed by queue name. */
    private final Map<String, Queue> cacheQueue = new ConcurrentHashMap<String, Queue>();

    @Resource
    private JmsTemplate creditJmsTemplate;

    /**
     * Converts {@code message} and sends it to the queue named {@code queueName}.
     *
     * @param queueName name of the destination queue; must not be {@code null}
     *                  (the backing {@link ConcurrentHashMap} rejects null keys)
     * @param message   payload handed to {@code JmsTemplate.convertAndSend}
     */
    public void pushCreditData(String queueName, Object message) {
        Queue queue = cacheQueue.get(queueName);
        if (queue == null) {
            Queue created = new ActiveMQQueue(queueName);
            // putIfAbsent makes the insert atomic: if another thread registered the
            // same name first, its instance is kept and reused here.
            Queue existing = cacheQueue.putIfAbsent(queueName, created);
            queue = (existing != null) ? existing : created;
        }
        this.creditJmsTemplate.convertAndSend(queue, message);
    }
}

2) 消费者:

消费者xml配置信息:

<!-- Raw ActiveMQ connection factory; constructor arguments are user name, password, broker URL. -->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg index="0" value="${jms.broker.user}"/>
    <constructor-arg index="1" value="${jms.broker.password}"/>
    <constructor-arg index="2" value="${jms.broker.url}"/>
</bean>

<!-- ConnectionFactory Definition: caching wrapper shared by the template and both listener containers -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory"/>
</bean>

<!-- Default Destination Queue Definition -->
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="${jms.queue.name}"/>
</bean>

<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="defaultDestination" ref="defaultDestination"/>
</bean>

<!-- Receiver for ${jms.queue.name} -->
<bean id="messageReceiver" class="com.itom.rating.listener.MessageReceiver"/>
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="${jms.queue.name}"/>
    <property name="messageListener" ref="messageReceiver"/>
</bean>

<!-- Same ActiveMQ broker, different receiver: listens on ${jms.queue.access.name}.
     The container property is "messageListener" (as in the container above); the
     listener container has no "accessMessageListener" property. -->
<bean id="accessMessageReceiver" class="com.itom.rating.listener.AccessMessageReceiver"/>
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" value="${jms.queue.access.name}"/>
    <property name="messageListener" ref="accessMessageReceiver"/>
</bean>

线程池的配置:

@Service("messageComputePool")
public class MessageComputePool {

public static final Logger logger = LoggerFactory.getLogger(MessageComputePool.class);

private ExecutorService tPool = null;

public MessageComputePool() {
this.tPool = Executors.newFixedThreadPool(50);
}

public void submit(Callable task) {
this.debug("messageComputePool 开始工作");
this.tPool.submit(task);
}

public void debug(String message) {
if (logger.isDebugEnabled()) {
logger.debug(message);
}
}

public void info(String message) {
if (logger.isInfoEnabled()) {
logger.info(message);
}
}
}

代码中使用:

/**
 * JMS listener that reads text messages, parses the body as JSON and hands a
 * {@code ComputeTask} to the injected {@code MessageComputePool}.
 *
 * <p>Messages that are not {@code TextMessage} instances are ignored. The article
 * elides part of the method body (the {@code ...} line), so this listing is
 * illustrative rather than compilable as-is.
 */
public class MessageReceiver implements MessageListener {

// Project-defined class that uses a thread pool to process messages
@Resource(name = "messageComputePool")
MessageComputePool pool;

/**
 * Handles one incoming message.
 *
 * @param message the delivered JMS message; only {@code TextMessage} is processed
 */
public void onMessage(Message message) {
if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
System.out.println(String.format("Received: %s",text));
JSONObject jsonObj = (JSONObject)JSONObject.parse(text);
...
// Once the consumer has received the message, it can be processed
ComputeTask task = new ComputeTask();//...implements Callable<Result>...
pool.submit(task);//...
} catch (JMSException e) {
// Only JMSException is caught here; the stack trace is printed and the message is otherwise dropped.
e.printStackTrace();
}
}
}
}

3)使用生产者推消息

@Service("collectService ")
public class CollectServiceImpl implements CollectService {
public static final Logger LOG  = LoggerFactory.getLogger(CollectServiceImpl.class);

@Resource
private CreditProducer creditProducer;

@Override
public void collect(CollectParameter parameter) {
Map dataMap = new HashMap();
dataMap.put("sourceCode", parameter.getValue());
dataMap.put("userId",parameter.getUserid());
creditProducer.pushCreditData("itom.activemq.access", JSON.toJSONString(dataMap));
}
}
}


推荐个ActiveMQ入门教程:
http://blog.csdn.net/jolingogo/article/category/1658165
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ActiveMQ