ActiveMQ基础实例演示demo
2015-11-10 22:42
323 查看
推荐ActiveMQ视频教学 一头扎进JMS之ActiveMQ
本博客只是根据视频做的简单总结。
一、新建项目,引入jar包
新建java项目JMS,引入ActiveMQ完整包activemq-all-5.12.1.jar
具体操作,在项目下新建lib文件夹,将activemq-all-5.12.1.jar复制到lib文件夹中,然后选中jar包,右键—— build path—— add build path
下载window版的ActiveMQ ,下载地址:http://download.csdn.net/download/wangshuxuncom/8241085 ;
二、Point-To-Point 点对点通信模式
消息消费者:
消息生产者:
消息监听者:
启动ActiveMQ服务,进入主页:
由于我的机器是64位的,所以选中win64文件夹下的activemq.bat
如果启动出错,首先检查java的环境变量设置,然后可能是端口被占用了。参考如下方法解决:
用netstat-an无法查出61616被哪个进程占用
经过排查和网上资料参考,被windows的Internetconnection share(ICS)服务占用,ICS是windows的一个Internet共享服务。
为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务。
在计算机-》管理,服务中暂停这个服务即可。
启动activemq之后,然后再启动ics即可。这样两个就都可以使用了
启动成功后,打开浏览器,输入http://127.0.0.1:8161/admin/地址,得到下图:用户名和密码都是admin
运行生产者代码:
控制台打印了产生的信息
ActiveMQ的队列FirstQueue1中接到了10条消息,由于消费者没有启动,消息都没有被消费
启动消息消费者:
消费者配置的监听器收到的消息:
队列中的消息已经被消费
这里注意我们的操作顺序:
在Point-To-Point 点对点通信模式中,消息只要没有被消费,一直是挂起的状态,直到被消费者消费。
而在发布/订阅(Pub/Sub)模型中,如果消费者没有订阅主题或没有启动服务,则获得不到消息。
三、发布/订阅(Pub/Sub)模型
消息生产者
主题订阅者一
主题订阅者二
监听器一
监听器二
运行:
这里要消费者先订阅主题,然后生产者再生产主题
本博客只是根据视频做的简单总结。
一、新建项目,引入jar包
新建java项目JMS,引入ActiveMQ完整包activemq-all-5.12.1.jar
具体操作,在项目下新建lib文件夹,将activemq-all-5.12.1.jar复制到lib文件夹中,然后选中jar包,右键—— build path—— add build path
下载window版的ActiveMQ ,下载地址:http://download.csdn.net/download/wangshuxuncom/8241085 ;
二、Point-To-Point 点对点通信模式
消息消费者:
package com.jms.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.swing.text.StyledEditorKit.BoldAction; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * @author admin * */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息消费者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { connection = connectionFactory.createConnection(); connection.start();//启动连接 //创建session,第一个参数是否有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //创建消息队列,(创建队列作为目的地),名称要对应 destination = session.createQueue("FirstQueue1"); //创建消息消费者 messageConsumer = session.createConsumer(destination); //使用监听器监听发送过来的消息 messageConsumer.setMessageListener(new Listener()); //一直在监听,循环等待 /*while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(10000); if(textMessage != null){ System.out.println("收到消息:"+textMessage.getText()); }else{ break; } }*/ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消息生产者:
package com.jms.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author admin * */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM = 10;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//消息的目的地 MessageProducer messageProducer;//消息生产者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); //通过连接工厂获取连接 try { connection = connectionFactory.createConnection(); connection.start();//启动连接 //创建session,第一个参数是否有事务 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建消息队列,(创建队列作为目的地) destination = session.createQueue("FirstQueue1"); //创建消息生产者 messageProducer = session.createProducer(destination); sendMessage(session,messageProducer); //提交事务 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < JMSProducer.SENDNUM; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("发送消息:"+ "ActiveMQ 发送的消息"+i); messageProducer.send(message); } } }
消息监听者:
package com.jms.demo; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听器 * @author admin * */ public class Listener implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("监听器收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
启动ActiveMQ服务,进入主页:
由于我的机器是64位的,所以选中win64文件夹下的activemq.bat
如果启动出错,首先检查java的环境变量设置,然后可能是端口被占用了。参考如下方法解决:
用netstat-an无法查出61616被哪个进程占用
经过排查和网上资料参考,被windows的Internetconnection share(ICS)服务占用,ICS是windows的一个Internet共享服务。
为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务。
在计算机-》管理,服务中暂停这个服务即可。
启动activemq之后,然后再启动ics即可。这样两个就都可以使用了
启动成功后,打开浏览器,输入http://127.0.0.1:8161/admin/地址,得到下图:用户名和密码都是admin
运行生产者代码:
控制台打印了产生的信息
ActiveMQ的队列FirstQueue1中接到了10条消息,由于消费者没有启动,消息都没有被消费
启动消息消费者:
消费者配置的监听器收到的消息:
队列中的消息已经被消费
这里注意我们的操作顺序:
在Point-To-Point 点对点通信模式中,消息只要没有被消费,一直是挂起的状态,直到被消费者消费。
而在发布/订阅(Pub/Sub)模型中,如果消费者没有订阅主题或没有启动服务,则获得不到消息。
三、发布/订阅(Pub/Sub)模型
消息生产者
package com.jms.pubToSub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * * 发布订阅模式 * @author admin * */ public class JMSProducer1 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; private static final int SENDNUM = 10;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//消息的目的地 MessageProducer messageProducer;//消息生产者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); //通过连接工厂获取连接 try { connection = connectionFactory.createConnection(); connection.start();//启动连接 //创建session,第一个参数是否有事务 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建Topic主题 destination = session.createTopic("FirstTopic1"); //创建消息生产者 messageProducer = session.createProducer(destination); sendMessage(session,messageProducer); //提交事务 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection != null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < JMSProducer1.SENDNUM; i++) { TextMessage message = session.createTextMessage("ActiveMQ 发布的主题信息"+i); System.out.println("发布的消息:"+ "ActiveMQ 发布的主题信息"+i); messageProducer.send(message); } } }
主题订阅者一
package com.jms.pubToSub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.swing.text.StyledEditorKit.BoldAction; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 主题 订阅者一 * * * 注意:现有先订阅了,才能收到消息 * @author admin * */ public class JMSConsumer1 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息消费者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { connection = connectionFactory.createConnection(); connection.start();//启动连接 //创建session,第一个参数是否有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //创建消息队列,(创建队列作为目的地),名称要对应 destination = session.createTopic("FirstTopic1"); //创建消息消费者 messageConsumer = session.createConsumer(destination); //使用监听器监听发送过来的消息 messageConsumer.setMessageListener(new Listener1()); //一直在监听,循环等待 /*while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(10000); if(textMessage != null){ System.out.println("收到消息:"+textMessage.getText()); }else{ break; } }*/ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
主题订阅者二
package com.jms.pubToSub; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.swing.text.StyledEditorKit.BoldAction; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 主题 订阅者二 * @author admin * */ public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息消费者 //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { connection = connectionFactory.createConnection(); connection.start();//启动连接 //创建session,第一个参数是否有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //创建消息队列,(创建队列作为目的地),名称要对应 destination = session.createTopic("FirstTopic1"); //创建消息消费者 messageConsumer = session.createConsumer(destination); //使用监听器监听发送过来的消息 messageConsumer.setMessageListener(new Listener2()); //一直在监听,循环等待 /*while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(10000); if(textMessage != null){ System.out.println("收到消息:"+textMessage.getText()); }else{ break; } }*/ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
监听器一
package com.jms.pubToSub; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听器 * @author admin * */ public class Listener1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
监听器二
package com.jms.pubToSub; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听器 * @author admin * */ public class Listener2 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者二收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
运行:
这里要消费者先订阅主题,然后生产者再生产主题
相关文章推荐
- 如何创建dll工程
- 转:OPTION SQL_SELECT_LIMIT=DEFAULT java连接mysql错误
- UVa1339 - Ancient Cipher
- LeetCode 22 Generate Parentheses(生成括号)
- JavaScript学习笔记三
- 诚风老师-十年了,当年那帮做直销的穷小子都成了富豪
- The hidden implementation(被隐藏的具体实现)
- scanf注意事项
- SDWebImage
- leetcode 49:Group Anagrams
- 进击的KFC:UI(一)UIView及其子类
- python学习笔记(1)
- longestSubstring
- HDU 4739 Zhuge Liang's Mines(DP)
- /etc/fstab 只读无法修改的解决办法
- Android Studio 简单设置
- 高斯混合模型EM算法
- 4个Linux服务器监控工具
- Shiro使用和源码分析---6
- Java Spring的IoC和AOP的知识点速记