您的位置:首页 > 运维架构

AMQ 虚拟topic

2016-04-07 20:43 387 查看
业务场景:

为了做到高可用性,topic的consumer服务通常是多台服务。如果用普通的Topic,则多个consumer的服务就会出现重复消费的情况。

解决方案:

AMQ引入了虚拟Topic,如果Topic的名字是以"VirtualTopic."开头,则AMQ自动将其识别为虚拟主题的Topic,如 VirtualTopic.NORMAL

其对应的consumer不再直接订阅该Topic,而是订阅一个名称符合 "Consumer.groupName.VirtualTopic.X" 格式的Queue,其中groupName用于标记consumer的分组,X为虚拟Topic名称中 "VirtualTopic." 之后的部分,如 Consumer.normal.VirtualTopic.NORMAL。

原理:

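AMQ是通过Queue来实现这个功能:发往 VirtualTopic.X 的每条消息,都会被broker转发到所有名称匹配 Consumer.*.VirtualTopic.X 的Queue中。同一分组(同一个Queue)下的多个consumer按Queue的语义竞争消费,每条消息只会被其中一个consumer处理;不同分组对应不同的Queue,因此各自都能收到全部消息。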

package com.hayden.amq;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;

/**
 * Integration tests that contrast a plain ActiveMQ topic with ActiveMQ
 * virtual topics.
 *
 * <p>All tests talk to a broker at {@code tcp://127.0.0.1:61616}; they fail
 * if the broker cannot be reached or if the expected number of deliveries is
 * not observed within {@link #RECEIVE_TIMEOUT_MILLIS}.
 *
 * <p>Each test uses one session for its asynchronous consumers and a second
 * session for the producer, because a JMS session that delivers to a
 * {@link MessageListener} must not be used concurrently from another thread.
 */
public class TestVirtualTopic {

    /** Upper bound on how long a test waits for its listeners to catch up. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 10000L;

    /** Interval between two checks of a delivery counter while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 20L;

    /**
     * Publishes 100 messages to a plain topic that has two subscribers.
     * Every subscriber of a topic gets its own copy, so the shared listener
     * is expected to be invoked 200 times.
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testNormalTopic() throws Exception {
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            conn.start();
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            ActiveMQTopic topic = new ActiveMQTopic(getNormalTopicName());
            MessageConsumer consumer1 = consumerSession.createConsumer(topic);
            MessageConsumer consumer2 = consumerSession.createConsumer(topic);

            AtomicInteger received = new AtomicInteger(0);
            // The 10 ms pause per delivery simulates a slow consumer.
            MessageListener listener = countingListener(received, getNormalTopicName(), 10L);
            consumer1.setMessageListener(listener);
            consumer2.setMessageListener(listener);

            publish(producerSession, getNormalTopicName(), 100, 5L);

            // Two topic subscribers -> every message is delivered twice.
            awaitAtLeast(received, 200, RECEIVE_TIMEOUT_MILLIS);
        } finally {
            conn.close();
        }
    }

    /**
     * Publishes 100 messages to a virtual topic whose single consumer group
     * queue has two consumers. The two consumers compete on the queue, so the
     * shared listener is expected to see each message once (100 in total; more
     * only if the queue still held messages from an earlier run).
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testNoralVirtualTopic() throws Exception {
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            conn.start();
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue groupQueue = new ActiveMQQueue(getVirtualTopicConsumerName());
            MessageConsumer consumer1 = consumerSession.createConsumer(groupQueue);
            MessageConsumer consumer2 = consumerSession.createConsumer(groupQueue);

            AtomicInteger received = new AtomicInteger(0);
            MessageListener listener = countingListener(received, getNormalVirtualTopicName(), 0L);
            consumer1.setMessageListener(listener);
            consumer2.setMessageListener(listener);

            publish(producerSession, getNormalVirtualTopicName(), 100, 0L);

            awaitAtLeast(received, 100, RECEIVE_TIMEOUT_MILLIS);
        } finally {
            conn.close();
        }
    }

    /**
     * Publishes 10 messages to a virtual topic that has two consumer groups:
     * group A with two consumers and group B with one. Each group is expected
     * to receive all 10 messages, independently of the other group.
     *
     * @throws Exception if the broker is unreachable or a JMS call fails
     */
    @Test
    public void testVirtualTopic() throws Exception {
        ActiveMQConnection conn = (ActiveMQConnection) getAMQConnectionFactory().createConnection();
        try {
            conn.start();
            Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue queueA = new ActiveMQQueue(getVirtualTopicConsumerNameA());
            Queue queueB = new ActiveMQQueue(getVirtualTopicConsumerNameB());
            MessageConsumer consumer1 = consumerSession.createConsumer(queueA);
            MessageConsumer consumer2 = consumerSession.createConsumer(queueA);
            MessageConsumer consumer3 = consumerSession.createConsumer(queueB);

            AtomicInteger receivedA = new AtomicInteger(0);
            MessageListener listenerA = countingListener(receivedA, getVirtualTopicConsumerNameA(), 0L);
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);

            AtomicInteger receivedB = new AtomicInteger(0);
            consumer3.setMessageListener(countingListener(receivedB, getVirtualTopicConsumerNameB(), 0L));

            publish(producerSession, getVirtualTopicName(), 10, 0L);

            awaitAtLeast(receivedA, 10, RECEIVE_TIMEOUT_MILLIS);
            awaitAtLeast(receivedB, 10, RECEIVE_TIMEOUT_MILLIS);
        } finally {
            conn.close();
        }
    }

    /**
     * Builds a listener that increments {@code counter} for every delivery and
     * prints the running count, the given source label and the message.
     *
     * @param counter     running delivery count, shared by all consumers using the listener
     * @param source      label printed after "receive from"
     * @param delayMillis pause after each delivery; values &lt;= 0 mean no pause
     * @return the listener
     */
    private static MessageListener countingListener(final AtomicInteger counter, final String source,
            final long delayMillis) {
        return new MessageListener() {
            public void onMessage(Message message) {
                int index = counter.incrementAndGet();
                System.out.println(index + " => receive from " + source + ": " + message);
                if (delayMillis > 0L) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the thread that runs the listener.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
    }

    /**
     * Sends {@code count} text messages "1 message." .. "{@code count} message."
     * to the topic named {@code topicName}.
     *
     * @param session     session used to create the producer and the messages
     * @param topicName   name of the destination topic
     * @param count       number of messages to send
     * @param pauseMillis pause after each send; values &lt;= 0 mean no pause
     * @throws Exception if sending fails or the thread is interrupted while pausing
     */
    private static void publish(Session session, String topicName, int count, long pauseMillis)
            throws Exception {
        MessageProducer producer = session.createProducer(new ActiveMQTopic(topicName));
        for (int index = 1; index <= count; index++) {
            TextMessage message = session.createTextMessage(index + " message.");
            producer.send(message);
            if (pauseMillis > 0L) {
                Thread.sleep(pauseMillis);
            }
        }
    }

    /**
     * Blocks until {@code counter} reaches {@code expected} or the timeout elapses.
     *
     * @param counter       delivery counter updated by a listener
     * @param expected      minimum number of deliveries required
     * @param timeoutMillis maximum time to wait
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws AssertionError       if fewer than {@code expected} deliveries were seen in time
     */
    private static void awaitAtLeast(AtomicInteger counter, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (counter.get() < expected && System.currentTimeMillis() < deadline) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        int seen = counter.get();
        if (seen < expected) {
            throw new AssertionError("expected at least " + expected + " deliveries within "
                    + timeoutMillis + " ms but saw " + seen);
        }
    }

    /** Creates a connection factory for the broker at {@code tcp://127.0.0.1:61616}. */
    private ActiveMQConnectionFactory getAMQConnectionFactory() {
        return new ActiveMQConnectionFactory(
                "tcp://127.0.0.1:61616");
    }

    /** Name of the plain (non-virtual) topic used by {@link #testNormalTopic()}. */
    protected static String getNormalTopicName() {
        return "nomal.TEST";
    }

    /** Name of the virtual topic used by {@link #testNoralVirtualTopic()}. */
    protected static String getNormalVirtualTopicName() {
        return "VirtualTopic.NORMAL";
    }

    /** Name of the virtual topic used by {@link #testVirtualTopic()}. */
    protected static String getVirtualTopicName() {
        return "VirtualTopic.TEST";
    }

    /** Queue name of consumer group "normal" for {@code VirtualTopic.NORMAL}. */
    protected static String getVirtualTopicConsumerName() {
        return "Consumer.normal.VirtualTopic.NORMAL";
    }

    /** Queue name of consumer group "A" for {@code VirtualTopic.TEST}. */
    protected static String getVirtualTopicConsumerNameA() {
        return "Consumer.A.VirtualTopic.TEST";
    }

    /** Queue name of consumer group "B" for {@code VirtualTopic.TEST}. */
    protected static String getVirtualTopicConsumerNameB() {
        return "Consumer.B.VirtualTopic.TEST";
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: