您的位置:首页 > 其它

ActiveMq持久订阅小例子

2017-04-16 11:45 218 查看
public class ProducerTopic {
public static void main(String[] args) throws JMSException {
String user = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String brokerUrl = "tcp://hadoop2:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl);
/**
* 创建连接并开启
*/
Connection connection = connectionFactory.createConnection();

/**
* 创建session
* 参数1:不启动事务
* 参数2:签收模式为自动签收
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 创建topic或者queue
*/
Topic topic_2 = session.createTopic("topic_2");
/**
* 创建生产者
*/
MessageProducer producer = session.createProducer(topic_2);
//此生产者生产的数据持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
/**
* 注意,一定要在设置了持久化之后再启动connection
*/
connection.start();
/**
* 生产数据
*/
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("hello java " + i);
producer.send(textMessage);
}

/**
* 释放连接
*/
connection.close();

}
}


public class ConsumerTopic {
public static void main(String[] args) {
String user = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String brokerUrl = "tcp://hadoop2:61616";

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl);
Connection connection = null;
try {
/**
* 创建连接并开启,注册一个客户号,让broker知道是谁订阅了。
* 需要在producer发消息之前,先运行一下此程序,目的是注册订阅信息,然后就可以关闭程序
* 这样就可以做到再次启动时,收到离线的消息。
*/
connection = connectionFactory.createConnection();
connection.setClientID("c1");
/**
* 创建session
* 参数1:不启动事务
* 参数2:签收模式为自动签收

aa8b
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 创建topic或者queue
*/
Topic topic_2 = session.createTopic("topic_2");
//对于topic_2,建立持久订阅,名字随便起
TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic_2, "somebody");
connection.start();
/**
* 消费数据
*/
while (true) {
TextMessage textMessage = (TextMessage) durableSubscriber.receive();
System.out.println("消费消息:" + textMessage.getText());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: