AMQ 虚拟topic
2016-04-07 20:43
387 查看
业务场景:
为了做到高可用性,同一个Topic的consumer服务通常会部署多个实例。如果使用普通的Topic,每个实例都会收到全部消息,于是同一条消息会被多个实例重复消费。
解决方案:
AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL。
其对应的consumer则需要以 "Consumer.groupName.VirtualTopic.X"的格式命名,其中groupName是为了标记consumer的分组,如 Consumer.normal.VirtualTopic.NORMAL。
原理:
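AMQ是通过Queue来实现这个功能:producer仍然向Topic(如 VirtualTopic.NORMAL)发送消息,broker会把每条消息转发到所有匹配 "Consumer.*.VirtualTopic.NORMAL" 的Queue中。每个分组(groupName)对应一个独立的Queue,因此每个分组都能收到全部消息;而同一分组内的多个consumer共同消费同一个Queue,每条消息只会被其中一个consumer处理,从而避免重复消费。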
为了做到高可用性,topic的consumer服务通常是多台服务。如果用普通的Topic,则多个consumer的服务就会出现重复消费的情况。
解决方案:
AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL。
其对应的consumer则需要以 "Consumer.groupName.VirtualTopic.X"的格式命名,其中groupName是为了标记consumer的分组,如 Consumer.normal.VirtualTopic.NORMAL。
原理:
AMQ是通过Queue来实现这个功能。
package com.hayden.amq;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Test;

/**
 * Integration tests that contrast a plain topic with ActiveMQ virtual topics.
 *
 * <p>All tests need a broker listening on {@code tcp://127.0.0.1:61616}.
 * Each test registers its listeners first, then publishes, then blocks until
 * the expected number of deliveries has been observed (or a timeout expires),
 * and finally closes the connection. Failures are propagated to JUnit instead
 * of being printed and ignored.
 */
public class TestVirtualTopic {

    /** Upper bound on how long a test waits for its listeners to catch up. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 30L;

    /**
     * Plain topic with two subscribers: every subscriber receives every
     * message, so {@code 2 * messageCount} deliveries are expected.
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testNormalTopic() throws Exception {
        final int messageCount = 100;
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            ActiveMQTopic topic = new ActiveMQTopic(getNormalTopicName());
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer1 = consumerSession.createConsumer(topic);
            MessageConsumer consumer2 = consumerSession.createConsumer(topic);

            CountDownLatch pending = new CountDownLatch(2 * messageCount);
            MessageListener listenerA = new CountingListener(getNormalTopicName(), pending, 10L);
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            // Start delivery only after the listeners are in place.
            conn.start();

            // A session with listeners must not be used from another thread,
            // so publishing gets a session of its own.
            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            publish(producerSession, topic, messageCount, 5L);

            Assert.assertTrue("both topic subscribers should receive every message",
                    pending.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } finally {
            conn.close();
        }
    }

    /**
     * Virtual topic with two consumers in the same group: both consumers read
     * from the single queue {@code Consumer.normal.VirtualTopic.NORMAL}, so
     * each message is delivered once and {@code messageCount} deliveries are
     * expected in total.
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testNoralVirtualTopic() throws Exception {
        final int messageCount = 100;
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            Queue queue = new ActiveMQQueue(getVirtualTopicConsumerName());
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // The consumers are created before publishing so the group queue
            // is registered with the broker when the first message is sent.
            MessageConsumer consumer1 = consumerSession.createConsumer(queue);
            MessageConsumer consumer2 = consumerSession.createConsumer(queue);

            CountDownLatch pending = new CountDownLatch(messageCount);
            MessageListener listenerA = new CountingListener(getNormalVirtualTopicName(), pending, 0L);
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            conn.start();

            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            publish(producerSession, new ActiveMQTopic(getNormalVirtualTopicName()), messageCount, 0L);

            Assert.assertTrue("the consumer group should receive every message",
                    pending.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } finally {
            conn.close();
        }
    }

    /**
     * Virtual topic with two consumer groups: group A has two consumers
     * sharing one queue, group B has a single consumer on its own queue.
     * Each group is expected to see all {@code messageCount} messages.
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testVirtualTopic() throws Exception {
        final int messageCount = 10;
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            Queue queueA = new ActiveMQQueue(getVirtualTopicConsumerNameA());
            Queue queueB = new ActiveMQQueue(getVirtualTopicConsumerNameB());
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer1 = consumerSession.createConsumer(queueA);
            MessageConsumer consumer2 = consumerSession.createConsumer(queueA);
            MessageConsumer consumer3 = consumerSession.createConsumer(queueB);

            CountDownLatch pendingA = new CountDownLatch(messageCount);
            MessageListener listenerA = new CountingListener(getVirtualTopicConsumerNameA(), pendingA, 0L);
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);

            CountDownLatch pendingB = new CountDownLatch(messageCount);
            MessageListener listenerB = new CountingListener(getVirtualTopicConsumerNameB(), pendingB, 0L);
            consumer3.setMessageListener(listenerB);
            conn.start();

            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            publish(producerSession, new ActiveMQTopic(getVirtualTopicName()), messageCount, 0L);

            Assert.assertTrue("group A should receive every message",
                    pendingA.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            Assert.assertTrue("group B should receive every message",
                    pendingB.await(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } finally {
            conn.close();
        }
    }

    /**
     * Sends {@code count} text messages ("1 message." .. "{count} message.")
     * to {@code destination}, optionally pausing between sends.
     *
     * @param session     session used to create the producer and the messages
     * @param destination topic to publish to
     * @param count       number of messages to send
     * @param pauseMillis pause after each send in milliseconds; 0 for none
     * @throws JMSException         if creating the producer or sending fails
     * @throws InterruptedException if the pause between sends is interrupted
     */
    private static void publish(Session session, Destination destination, int count, long pauseMillis)
            throws JMSException, InterruptedException {
        MessageProducer producer = session.createProducer(destination);
        try {
            for (int index = 1; index <= count; index++) {
                TextMessage message = session.createTextMessage(index + " message.");
                producer.send(message);
                if (pauseMillis > 0L) {
                    Thread.sleep(pauseMillis);
                }
            }
        } finally {
            producer.close();
        }
    }

    /**
     * Listener that prints each delivery with a running number, counts the
     * latch down once per message, and can simulate a slow consumer.
     */
    private static final class CountingListener implements MessageListener {

        private final String label;
        private final CountDownLatch pending;
        private final long delayMillis;
        private final AtomicInteger received = new AtomicInteger(0);

        /**
         * @param label       destination name shown in the console output
         * @param pending     latch decremented once per received message
         * @param delayMillis artificial processing time per message; 0 for none
         */
        CountingListener(String label, CountDownLatch pending, long delayMillis) {
            this.label = label;
            this.pending = pending;
            this.delayMillis = delayMillis;
        }

        @Override
        public void onMessage(Message message) {
            int index = received.incrementAndGet();
            System.out.println(index + " => receive from " + label + ": " + message);
            pending.countDown();
            if (delayMillis > 0L) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the dispatching thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Creates a connection factory for the local test broker. */
    private ActiveMQConnectionFactory getAMQConnectionFactory() {
        return new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    }

    /** Name of the plain (non-virtual) topic. */
    protected static String getNormalTopicName() {
        return "nomal.TEST";
    }

    /** Name of the virtual topic consumed by the single group "normal". */
    protected static String getNormalVirtualTopicName() {
        return "VirtualTopic.NORMAL";
    }

    /** Name of the virtual topic consumed by groups "A" and "B". */
    protected static String getVirtualTopicName() {
        return "VirtualTopic.TEST";
    }

    /** Queue read by group "normal" of {@code VirtualTopic.NORMAL}. */
    protected static String getVirtualTopicConsumerName() {
        return "Consumer.normal.VirtualTopic.NORMAL";
    }

    /** Queue read by group "A" of {@code VirtualTopic.TEST}. */
    protected static String getVirtualTopicConsumerNameA() {
        return "Consumer.A.VirtualTopic.TEST";
    }

    /** Queue read by group "B" of {@code VirtualTopic.TEST}. */
    protected static String getVirtualTopicConsumerNameB() {
        return "Consumer.B.VirtualTopic.TEST";
    }
}
相关文章推荐
- 架构高性能网站秘笈(二)——动态内容缓存
- 架构高性能网站秘笈(二)——动态内容缓存
- 什么是Docker
- Centos下zookeeper安装配置
- 理解 Keystone 核心概念 - 每天5分钟玩转 OpenStack(18)
- linux下搭建svn服务器 (多个项目的权限分组管理)
- Centos常用的进程管理和资源查看工具
- Citrix-Desktop7.8版本应用交付
- SetOperations
- 一、认识linux目录结构
- Linux写时拷贝技术(copy-on-write)
- hadoop1.2.1安装配置
- CentOS修改主机名(hostname)
- nginx log 时间转换时间戳在php,perl,python中的使用
- PropertiesFactoryBean的使用
- 做了个工具类的小网站---tool.admaster.club
- Linux 设备驱动开发思想 —— 驱动分层与驱动分离
- shell 脚本 小列举
- CentOS:开放80、22、3306端口操作
- 关于linux的读写锁