ActiveMQ 的使用
2016-06-02 15:24
197 查看
消息队列
消息生产者:JMSProducer
消息消费者第一种方式(不推荐)
消息消费者第二种方式(使用监听)
消息监听器:
JMSConsumer2
订阅/发布
消息生产者-消息发布者:
消息订阅者一
监听器
消息订阅者2
监听器
消息消费者--消息订阅者2
消息生产者:JMSProducer
package com.me.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author Administrator * */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 private static final int SENDNUM = 10;// 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageProducer messageProducer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建session destination = session.createQueue("FirstQueue1");// 创建消息队列 messageProducer = session.createProducer(destination);// 创建消息的生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { TextMessage message =session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("ActiveMQ 发送的消息"+i); messageProducer.send(message); } } }<span 
style="font-family:Arial, Helvetica, sans-serif;"><span style="white-space: normal;"> </span></span>
消息消费者第一种方式(不推荐)
package com.me.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * * @author Administrator * */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageConsumer messageConsumer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session destination = session.createQueue("FirstQueue1");// 创建连接的消息队列 messageConsumer = session.createConsumer(destination);// 创建消费者 while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if (textMessage != null) { System.out.println("收到的消息:" + textMessage.getText()); } else { break; } } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // } }
消息消费者第二种方式(使用监听)
消息监听器:
package com.me.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听 * * @author Administrator * */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
JMSConsumer2
package com.me.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * * @author Administrator * */ public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageConsumer messageConsumer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session destination = session.createQueue("FirstQueue1");// 创建连接的消息队列 messageConsumer = session.createConsumer(destination);// 创建消费者 messageConsumer.setMessageListener(new Listener());//注册消息监听 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // } }
订阅/发布
消息生产者-消息发布者:
package com.me.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者-消息发布者 * @author Administrator * */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 private static final int SENDNUM = 10;// 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageProducer messageProducer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建session //destination = session.createQueue("FirstQueue1");// 创建消息队列 destination=session.createTopic("FirstTopic1"); messageProducer = session.createProducer(destination);// 创建消息的生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { TextMessage message =session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("ActiveMQ 发送的消息"+i); 
messageProducer.send(message); } } }
消息订阅者一
监听器
package com.me.activemq2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听 * * @author Administrator * */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }消息消费者--消息订阅者1
package com.me.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者--消息订阅者1 * * @author Administrator * */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageConsumer messageConsumer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session // destination = session.createQueue("FirstQueue1");// 创建连接的消息队列 destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination);// 创建消费者 messageConsumer.setMessageListener(new Listener());// 注册消息监听 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // } }
消息订阅者2
监听器
package com.me.activemq2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听 * * @author Administrator * */ public class Listener2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消息消费者--消息订阅者2
package com.me.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者--消息订阅者2 * * @author Administrator * */ public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码 private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory;// 连接工厂 Connection connection = null;// 连接 Session session;// 回话 接受或者发送消息的线程 Destination destination;// 消息的目的地 MessageConsumer messageConsumer;// 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKER_URL); try { connection = connectionFactory.createConnection();// 通过工厂获取连接 connection.start();// 启动连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination);// 创建消费者 messageConsumer.setMessageListener(new Listener2());// 注册消息监听 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } // } }
<!-- Maven pom.xml fragment: build properties and dependencies for the examples above. -->
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Version properties referenced by the logging dependencies below. -->
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
</properties>
<dependencies>
  <!-- ActiveMQ client libraries -->
  <!-- NOTE(review): activemq-all is presumably an aggregate jar; confirm it does not
       clash with the separate slf4j/log4j dependencies declared below. -->
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.3</version>
  </dependency>
  <!-- Logging: SLF4J API, its log4j 1.2 binding, and log4j itself -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
  </dependency>
</dependencies>
相关文章推荐
- node.js 动态执行脚本
- iOS用户信息单例的创建
- Powershell学习之道-文件夹共享及磁盘映射
- Murano Weekly Meeting 2016.05.24
- CentOS 网卡配置bond4(LACP)
- Introduction to Neural Machine Translation - part 2
- Activity启动模式之singleTask单栈模式
- 经典重拾-第一部分 语言篇 第一章 程序设计入门
- 用代码画流程图和时序图快餐教程(1) - graphviz的dot图
- yii 数据库model查询笔记
- android前端怎样php后台交互(基础篇)
- Android使用Glide加载Gif.解决Glide加载Gif非常慢问题
- Path类详解
- 算法导论之排序和顺序统计学
- c 关键字extern
- 百度BAE2.0 JAVA环境项目部署和调试
- Codeforces Round #355 (Div. 2) C. Vanya and Label 水题
- 课堂练习——买书问题
- js 日期大小比较
- 计算机图形学基础(零) 介绍