ActiveMQ 入门
2016-01-26 21:02
260 查看
1、介绍
ActiveMQ是一款流行的、强大的消息系统。
ActiveMQ是速度快,支持多种语客户端、支持多种协议。
2、运行环境
5.10及以下版本 需要JRE1.6,5.10以上版本需要JRE1.7
3、ActiveMQ安装
下载ActiveMQ,解压到相应目录下。
下载地址:http://activemq.apache.org/download.html
请根据自己的运行环境下载相应的版本。
4、运行ActiveMQ
windows 32位
cd [activemq_installdir]/bin/win32/activemq.bat
windows 64位
cd [activemq_installdir]/bin/win64/activemq.bat
linux 32位
cd [activemq_installdir]/bin/linux-x86-32/activemq start
或 cd [activemq_installdir]/bin/linux-x86-32/activemq console 可查看mq控制台
linux 64位
cd [activemq_installdir]/bin/linux-x86-64/activemq start
或 cd [activemq_installdir]/bin/linux-x86-64/activemq console 可查看mq控制台
ProducerTool.java 生产者
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); System.out.println(url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public void produceMessage(String message) throws JMSException, Exception { initialize(); TextMessage msg = session.createTextMessage(message); connection.start(); producer.send(msg); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
ConsumerTool.java 消费者
import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; // 初始化 private void initialize() throws JMSException, Exception { // 连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); // 连接工厂创建一个jms connection connection = connectionFactory.createConnection(); // 是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事务 // 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅 destination = session.createQueue(subject); // 会话创建消息的生产者将消息发送到目的地 consumer = session.createConsumer(destination); } // 消费消息 public void consumeMessage() throws JMSException, Exception { initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); // 开始监听 consumer.setMessageListener(this); // Message message = consumer.receive(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } // 消息处理函数 public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received: " + msg); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
测试类:
TestProducer.java
import javax.jms.JMSException; public class TestProducer { public static void main(String[] args) throws JMSException, Exception { ProducerTool producer = new ProducerTool(); producer.produceMessage("Hello, world!"); producer.close(); } }
TestConsumer.java
import javax.jms.JMSException; public class TestConsumer { public static void main(String[] args) throws JMSException, Exception { ConsumerTool consumer = new ConsumerTool(); consumer.consumeMessage(); //暂停1分钟,这样1分钟内的消息都可以接收 Thread.sleep(60000); consumer.close(); } }
相关文章推荐
- 解析ActiveMQ的使用说明总结
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- activemq报EOFExceptionjvm错误
- ActiveMQ 消息服务(一)
- ActiveMQ 消息服务(二)
- ActiveMQ 消息服务(三)
- Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析
- JMS-使用消息队列优化网站性能
- 架构优化 - 应用,MQ Broker,业务处理分层
- 基于zookeeper+leveldb搭建activemq集群
- jms异步通信全攻略
- JMS
- ActiveMQ 实例
- 一台机器上运行多个ActiveMq
- activemq安全设置 设置admin的用户名和密码
- Ubuntu 14.04.1 安装 activemq 5.11.1
- 消息中间件之ActiveMQ
- 关于使用MQ系统解耦的一点思考
- 解决Activemq5.8启动报存储空间不足
- 解决Spring集成Activemq使用ObjectMessage报错