您的位置:首页 > 运维架构

JMS学习六(ActiveMQ Topic之持久订阅)

2017-03-09 13:54 281 查看
上篇文章中提到,Topic(主题)传送模型有两种订阅模式,即持久订阅和非持久订阅。废话少说,直接看代码:

一、非持久订阅

1、消息生产者

package mqtest2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publishes five persistent text messages to the topic "topic-test3" on the
 * ActiveMQ broker at tcp://127.0.0.1:61616 and then releases the JMS resources.
 */
public class Producer3 {
    /**
     * Connects to the broker, sends the messages and closes the connection.
     *
     * @param args unused
     * @throws Exception if any JMS operation fails
     */
    public static void main(String[] args) throws Exception {
        // Create the JMS connection factory (user, password, broker URL)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "admin", "admin", "tcp://127.0.0.1:61616");
        // Create the JMS connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Start the JMS connection
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the JMS destination (a topic)
            Destination destination = session.createTopic("topic-test3");
            // Create the JMS producer and mark its messages as persistent
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < 5; i++) {
                TextMessage message = session.createTextMessage("message: " + i);
                // Send the message
                producer.send(message);
            }
            session.close();
        } finally {
            // Always release the connection, even when a JMS call above failed;
            // per the JMS API, closing a connection also closes its sessions and producers.
            connection.close();
        }
    }
}
2、消息消费者

package mqtest2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive3 {
public static void main(String[] args) throws Exception {
// 创建一个JMS connection factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin", "admin", "tcp://127.0.0.1:61616");
// 通过connection factory来创建JMS connection
Connection connection = connectionFactory.createConnection();
// 启动JMS connection
connection.start();
// 通过connection创建JMS session
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 创建JMS destination
Topic destination = session.createTopic("topic-test3");
// 创建JMS consumer
MessageConsumer consumer = session.createConsumer(destination);
// 设置监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
message = (TextMessage) message;
try {
String value = ((TextMessage) message).getText();
System.out.println("value2: " + value);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
}


这个非持久订阅是不是和第二篇中的例子一样?是的,完全一样。

二、持久订阅

持久订阅的前提是要发送持久化消息,将消息持久化到磁盘,以便之后再读取。

1、消息生产者

package mqtest2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publishes five persistent text messages ("message0" .. "message4") to the
 * topic "PersistenceTopic" and then releases the JMS resources.
 */
public class Producter {
    /**
     * Connects to the broker, sends the messages and closes the connection.
     *
     * @param args unused
     * @throws Exception if any JMS operation fails
     */
    public static void main(String[] args) throws Exception {
        // Create the JMS connection factory (user, password, broker URL)
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
        // Create the JMS connection from the factory
        Connection connection = connectionFactory.createConnection();
        try {
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the JMS destination (a topic)
            Destination destination = session.createTopic("PersistenceTopic");
            // Create the JMS producer and mark its messages as persistent
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // Start the JMS connection
            connection.start();
            for (int i = 0; i < 5; i++) {
                TextMessage message = session.createTextMessage("message" + i);
                // Send the message
                producer.send(message);
            }
            session.close();
        } finally {
            // Always release the connection, even when a JMS call above failed;
            // per the JMS API, closing a connection also closes its sessions and producers.
            connection.close();
        }
    }
}

2、消息消费者

package mqtest2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive1 {
public static void main(String[] args) throws Exception {
// 创建一个JMS connection factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin", "admin", "tcp://127.0.0.1:61616");
// 通过connection factory来创建JMS connection
Connection connection = connectionFactory.createConnection();
// 设置一个标记id
connection.setClientID("TT");
// 启动JMS connection
connection.start();
// 通过connection创建JMS session
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 创建JMS destination
Topic destination = session.createTopic("PersistenceTopic");
// 创建JMS consumer 按照目的地和唯一clientId标示。
// TopicSubscriber是MessageConsumer的子接口
TopicSubscriber ts = session.createDurableSubscriber(destination, "TT");
// 设置监听
ts.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
message = (TextMessage) message;
try {
String value = ((TextMessage) message).getText();
System.out.println("value2: " + value);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});

}
}


/************************ 持久订阅****************************/

1、消息生产者

package mqtest2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publishes a single persistent text message ("one test!") to the topic
 * "topic_test2" and closes the connection afterwards.
 */
public class Producter2 {
    /**
     * Sends the message; JMS failures are printed rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String name = "topic_test2";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin",
                "tcp://127.0.0.1:61616");
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(name);
            MessageProducer producter = session.createProducer(topic);
            // Persistent delivery
            producter.setDeliveryMode(DeliveryMode.PERSISTENT);
            // Build the message object
            TextMessage textMessage = session.createTextMessage("one test!");
            // Send the message
            producter.send(textMessage);
            System.out.println("发送结束!");
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Release the connection on both the success and the failure path;
            // connection stays null only if createConnection() itself failed.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2、消息消费者

package mqtest2;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receive2 {
public static void main(String[] args) {
String clientid = "producter2";
String name = "topic_test2";
ActiveMQConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Topic topic = null;
MessageConsumer consumer = null;
try {
connectionFactory = new ActiveMQConnectionFactory("admin", "admin",
"tcp://127.0.0.1:61616");
connection = connectionFactory.createConnection();
connection.setClientID(clientid);
connection.start();
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(name);
consumer = session.createDurableSubscriber(topic, clientid);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
message = (TextMessage) message;
try {
String value = ((TextMessage) message).getText();
System.out.println("value2: " + value);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


上面是发布订阅传送模型的非持久化订阅和持久化订阅的demo。

非持久化订阅没有什么硬性的要求,但持久化订阅是有硬性要求的即:

(1)、发送的消息是持久化的。

(2)、ActiveMQ 通过 ClientID 和订阅名称(即 createDurableSubscriber 的第二个参数)来区分持久订阅的消费者。

(3)、按照订阅的主题、订阅名称和给连接设置的id(ClientID)来创建一个消息消费者对象:consumer = session.createDurableSubscriber(topic, clientid);其中第二个参数是订阅名称,示例中直接复用了 clientid 的值。

(4)、订阅主题名称和设置的ClientID组合起来必须是唯一的。

(5)、使用相同的“clientID”和主题名称,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。

参考文档:http://blog.csdn.net/zhu_tianwei/article/details/46303347
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: