您的位置:首页 > 其它

ActiveMQ开发示例

2016-01-26 14:22 387 查看
发送消息的基本步骤:

(1)、创建连接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3)、使用连接Connection 建立会话Session

(4)、使用会话Session和管理对象Destination创建消息生产者MessageProducer

(5)、使用消息生产者MessageProducer发送消息



消息接收者从JMS接收消息的步骤:

(1)、创建连接使用的工厂类JMS ConnectionFactory

(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

(3)、使用连接Connection 建立会话Session

(4)、使用会话Session和管理对象Destination创建消息接收者MessageConsumer

(5)、使用消息接收者MessageConsumer接收消息:需要用setMessageListener将MessageListener绑定到MessageConsumer;监听器类必须实现MessageListener接口,并定义onMessage方法。


示例:

package com.banksteel.mall.item.schedule.container;

import org.apache.log4j.Logger;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Startup entry point: loads the Spring application context described by
 * {@code META-INF/spring/root-config.xml} and starts it.
 *
 * <p>Project (per the original header): banksteel-pop-inventory-count-schedule.
 *
 * @author zhaiang
 * @version 3.0.0
 * @since 2015-08-18
 */
public class Main
{
    private static final Logger logger = Logger.getLogger(Main.class);

    /** Kept in a static field so the context stays reachable for the lifetime of the JVM. */
    private static ClassPathXmlApplicationContext context;

    /**
     * Creates and starts the Spring context.
     *
     * <p>If startup fails, the error is logged and the JVM exits with a
     * non-zero status so that supervisors and shell scripts can detect the
     * failure.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args)
    {
        try
        {
            context = new ClassPathXmlApplicationContext(new String[]
            { "classpath:META-INF/spring/root-config.xml" });

            // Close the context (and run bean destroy callbacks) when the JVM shuts down.
            context.registerShutdownHook();

            context.start();
        }
        catch (Exception e)
        {
            logger.error("==>againListenner start error:", e);

            // Non-zero status: the original exited with 0, which reports success.
            System.exit(1);
        }
    }
}

package com.banksteel.mall.item.schedule.listener;

import java.util.ArrayList;

import java.util.List;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import org.apache.activemq.command.ActiveMQTextMessage;

import org.apache.log4j.Logger;

import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSONArray;

import com.alibaba.fastjson.JSONObject;

import com.banksteel.mall.item.schedule.utils.CommonUtils;

import com.banksteel.mall.itemindex.service.ItemIndexService;

import cn.mysteel.mmq.listener.MMQMessageListener;

/**
 * Message listener that keeps the item-detail index in sync with
 * listing/delisting events received from the queue.
 *
 * <p>The message body is expected to be a JSON object with:
 * <ul>
 *   <li>{@code action} - {@code "put"} (create/update index entries) or
 *       {@code "out"} (remove index entries); any other value is ignored;</li>
 *   <li>{@code itemIds} - a JSON array of item ids.</li>
 * </ul>
 *
 * <p>Project (per the original header): banksteel-mall-item-schedule.
 *
 * @author zhaiang
 * @version 3.0.0
 * @since 2016-01-18
 */
public class ItemIndexMessageListener extends MMQMessageListener
{
    public static final Logger logger = Logger.getLogger(ItemIndexMessageListener.class);

    @Autowired
    private ItemIndexService itemIndexService;

    /**
     * Handles one queue message.
     *
     * <p>All failures are caught and logged here; nothing is rethrown to the
     * caller.
     * NOTE(review): swallowing every exception presumably means a failed
     * message is never redelivered - confirm this is intended.
     *
     * @param message the incoming message; must be an {@link ActiveMQTextMessage}
     * @param session the JMS session the message was received on (unused)
     * @throws JMSException declared by the overridden signature; not thrown by this implementation
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException
    {
        try
        {
            ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
            String ms = msg.getText();
            JSONObject jsonObject = CommonUtils.parseJsonObject(ms);

            if (jsonObject != null)
            {
                String action = (String) jsonObject.get("action");
                List<Long> itemIds = extractItemIds(jsonObject);

                if (!itemIds.isEmpty())
                {
                    dispatch(action, itemIds);
                }
            }

            logger.info("********************明细商品索引监听器:" + ms + "********************");
        }
        catch (Exception e)
        {
            // Pass the exception to the logger so the cause and stack trace are kept.
            logger.error("队列解析出错", e);
        }
    }

    /**
     * Reads the {@code itemIds} array from the payload.
     *
     * @param jsonObject the parsed message body
     * @return the ids in message order; an empty list when the array is missing or empty
     */
    private List<Long> extractItemIds(JSONObject jsonObject)
    {
        List<Long> itemIds = new ArrayList<Long>();
        JSONArray jsonArray = jsonObject.getJSONArray("itemIds");

        if (jsonArray != null)
        {
            for (int i = 0; i < jsonArray.size(); i++)
            {
                itemIds.add(jsonArray.getLong(i));
            }
        }

        return itemIds;
    }

    /**
     * Routes the ids to the index service according to {@code action}.
     * A failure in the index service is logged and does not propagate.
     *
     * @param action  {@code "put"} or {@code "out"}; other values are ignored
     * @param itemIds non-empty list of item ids
     */
    private void dispatch(String action, List<Long> itemIds)
    {
        if ("put".equals(action))
        {
            logger.info("资源上架创建索引开始:" + itemIds);

            try
            {
                itemIndexService.createOrModifyItemIndexIds(itemIds);
            }
            catch (Exception e)
            {
                logger.error("资源上架创建索引失败:" + itemIds, e);
            }
        }
        else if ("out".equals(action))
        {
            logger.info("资源下架创建索引开始:" + itemIds);

            try
            {
                itemIndexService.removeItemIndexByIdList(itemIds);
            }
            catch (Exception e)
            {
                logger.error("资源下架删除索引失败:" + itemIds, e);
            }
        }
    }
}

配置文件

root-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName"
       default-lazy-init="false">

    <!-- Enable annotation-based bean configuration. -->
    <context:annotation-config />

    <!-- Load the property files used to resolve ${...} placeholders. -->
    <context:property-placeholder
        location="classpath:/META-INF/mq.properties,classpath:/META-INF/dubbo.properties" />

    <aop:aspectj-autoproxy proxy-target-class="true" />

    <!-- Remaining bean definitions live in these two files. -->
    <import resource="dubbo-service-in.xml" />
    <import resource="dubbo-service.xml" />

</beans>

dubbo-service.xml



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.2.xsd"
       default-autowire="byName" default-lazy-init="false">

    <!-- The vendor-supplied ConnectionFactory that actually creates JMS connections. -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ broker address and credentials (resolved from mq.properties). -->
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
    </bean>

    <!-- ActiveMQ's PooledConnectionFactory wraps an ActiveMQConnectionFactory and pools
         Connections, Sessions and MessageProducers to reduce resource usage. Requires the
         activemq-pool artifact. destroy-method="stop" releases the pooled connections when
         the Spring context is closed. -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory" ref="targetConnectionFactory" />
        <property name="maxConnections" value="${mq.pool.maxConnections}" />
    </bean>

    <!-- The Spring-side ConnectionFactory that manages the real ConnectionFactory. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the factory that really produces JMS connections. -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
    </bean>

    <!-- Spring's JMS helper class for sending and receiving messages. -->
    <!-- Queue template -->
    <bean id="activeMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-provided ConnectionFactory bean defined above. -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <!-- Listener wiring for the item-detail index queue. -->
    <bean id="itemSessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="itemSessionAwareQueue" />
        <property name="messageListener" ref="itemIndexMessageListener" />
    </bean>

    <bean id="itemSessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${itemQueueName}</value>
        </constructor-arg>
    </bean>

    <bean id="itemIndexMessageListener" class="com.banksteel.mall.item.schedule.listener.ItemIndexMessageListener"></bean>

    <!-- Listener wiring for the summary-item index queue. -->
    <bean id="summaryItemSessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="summaryItemSessionAwareQueue" />
        <property name="messageListener" ref="summaryItemIndexMessageListener" />
    </bean>

    <bean id="summaryItemSessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${summaryQueueName}</value>
        </constructor-arg>
    </bean>

    <bean id="summaryItemIndexMessageListener" class="com.banksteel.mall.item.schedule.listener.SummaryItemIndexMessageListener"></bean>

</beans>

mq.properties

## ActiveMQ connection settings (referenced as ${mq.*} placeholders in the Spring XML)

# Broker URL; the colons are backslash-escaped as written in the original file
mq.brokerURL=tcp\://192.168.200.17\:61616

mq.userName=purchase

# NOTE(review): plaintext credential checked in alongside the config - consider externalizing
mq.password=mysteel

# Value injected into the pooled connection factory's maxConnections property
mq.pool.maxConnections=10

# Queue names (referenced as ${itemQueueName} / ${summaryQueueName})

itemQueueName=banksteel.mall.item.itemIndex

summaryQueueName=banksteel.mall.item.summaryIndex



http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: