您的位置:首页 > 编程语言 > Java开发

ActiveMQ的使用及整合spring的使用实例

2018-03-10 23:15 417 查看

ActiveMQ的主要使用步骤

主要讲述的是使用的一些步骤,关于两种消息不提供解释。

信息生产者

 /**
* 发送点对点的消息
* @throws Exception
*/
public void sendQueueTest() throws Exception{
//第一步:创建ActiveMQConnectionFactory对象,需要指定服务端ip及端口号。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.123:61616");
//第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
//第三步:开启连接,调用Connection对象的start方法。
connection.start();
//第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
Queue queue = session.createQueue("test-queue");
//第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
//第七步:创建一个Message对象,创建一个TextMessage对象。
TextMessage message = session.createTextMessage("测试消息");
//第八步:使用Producer对象发送消息。
producer.send(message);
//第九步:关闭资源
producer.close();
session.close();
connection.close();
}
 /**
* 发布消息
* @throws Exception
*/
public void sendTopicTest() throws Exception{
//第一步:创建ActiveMQConnectionFactory对象,需要指定服务端ip及端口号。
ConnectionFactory ConnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.191.4:61616");
//第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = ConnectionFactory.createConnection();
//第三步:开启连接,调用Connection对象的start方法。
connection.start();
//第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
Topic topic = session.createTopic("Dai-Topic");
//第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
//第七步:创建一个Message对象,创建一个TextMessage对象。
TextMessage message = session.createTextMessage("test1_dai-Topic");
//第八步:使用Producer对象发送消息。
producer.send(message);
//第九步:关闭资源
producer.close();
session.close();
connection.close();
}

信息的接受者

 /**
* 接收点对点的消息
* @throws Exception
*/
public void getQueueTest() throws Exception {
//消费者:接收消息。
//第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.123:61616");
//第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
//第三步:开启连接。调用Connection对象的start方法。
connection.start();
//第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
Queue queue = session.createQueue("test-queue");
//第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(queue);
//第七步:接收消息。
consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
//第八步:打印消息。
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//第九步:关闭资源
consumer.close();
session.close();
connection.close();
}

/**
* 订阅消息
* @throws Exception
*/
public void getTopicTest() throws Exception {
//消费者:接收消息。
//第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.191.4:61616");
//第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
//第三步:开启连接。调用Connection对象的start方法。
connection.start();
//第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session对象创建一个Destination对象。和发送端保持一致Topic。
Topic topic = session.createTopic("Dai-Topic");
//第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(topic);
//第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收到消息是:"+textMessage.getText().toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
//第九步:关闭资源
consumer.close();
session.close();
connection.close();
}
 

整合spring后的使用(配置文件略)

信息生产者

 //JMS提供的ActiveMq的工具类
@Autowired
private JmsTemplate jmsTemplate;
//这个是发布/订阅模式
@Resource
private Destination topicDestination;
//将添加商品的消息添加到队列
jmsTemplate.send(topicDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(itemID + "");
}
});


信息的接受者

/**
* 消息队列的监听器
* @author adrain
*
*/
public class SearchMessageListener implements MessageListener {

@Autowired
private SearchItemService searchItemService;

@Override
public void onMessage(Message message) {
try {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
String itemId = textMessage.getText();
Thread.sleep(1000);
searchItemService.addDocument(Long.parseLong(itemId));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: