您的位置:首页 > 其它

ActiveMQ 入门

2016-01-26 21:02 260 查看
 

1、介绍

ActiveMQ是一款流行的、强大的消息系统。

ActiveMQ是速度快,支持多种语客户端、支持多种协议。

 

2、运行环境

5.10及以下版本 需要JRE1.6,5.10以上版本需要JRE1.7

 

3、ActiveMQ安装

下载ActiveMQ,解压到相应目录下。

下载地址:http://activemq.apache.org/download.html

请根据自己的运行环境下载相应的版本。

 

4、运行ActiveMQ

windows 32位

cd [activemq_installdir]/bin/win32/activemq.bat

windows 64位

cd [activemq_installdir]/bin/win64/activemq.bat

 

linux 32位

cd [activemq_installdir]/bin/linux-x86-32/activemq start

或 cd [activemq_installdir]/bin/linux-x86-32/activemq console  可查看mq控制台

linux 64位

cd [activemq_installdir]/bin/linux-x86-64/activemq start

或 cd [activemq_installdir]/bin/linux-x86-64/activemq console  可查看mq控制台

 

ProducerTool.java  生产者

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTool {

private String user = ActiveMQConnection.DEFAULT_USER;

private String password = ActiveMQConnection.DEFAULT_PASSWORD;

private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private String subject = "TOOL.DEFAULT";

private Destination destination = null;

private Connection connection = null;

private Session session = null;

private MessageProducer producer = null;

// 初始化
private void initialize() throws JMSException, Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
System.out.println(url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}

// 发送消息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
producer.send(msg);
}

// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}


ConsumerTool.java 消费者

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool implements MessageListener {

private String user = ActiveMQConnection.DEFAULT_USER;

private String password = ActiveMQConnection.DEFAULT_PASSWORD;

private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private String subject = "TOOL.DEFAULT";

private Destination destination = null;

private Connection connection = null;

private Session session = null;

private MessageConsumer consumer = null;

// 初始化
private void initialize() throws JMSException, Exception {
// 连接工厂是用户创建连接的对象,这里使用的是ActiveMQ的ActiveMQConnectionFactory根据url,username和password创建连接工厂。
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
// 连接工厂创建一个jms connection
connection = connectionFactory.createConnection();
// 是生产和消费的一个单线程上下文。会话用于创建消息的生产者,消费者和消息。会话提供了一个事务性的上下文。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 不支持事务
// 目的地是客户用来指定他生产消息的目标还有他消费消息的来源的对象,两种消息传递方式:点对点和发布/订阅
destination = session.createQueue(subject);
// 会话创建消息的生产者将消息发送到目的地
consumer = session.createConsumer(destination);

}

// 消费消息
public void consumeMessage() throws JMSException, Exception {
initialize();
connection.start();

System.out.println("Consumer:->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
// Message message = consumer.receive();
}

// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}

// 消息处理函数
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer:->Received: " + msg);
} else {
System.out.println("Consumer:->Received: " + message);
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


 

测试类:

TestProducer.java

import javax.jms.JMSException;

public class TestProducer {

public static void main(String[] args) throws JMSException, Exception {
ProducerTool producer = new ProducerTool();
producer.produceMessage("Hello, world!");
producer.close();
}

}


TestConsumer.java

import javax.jms.JMSException;

public class TestConsumer {

public static void main(String[] args) throws JMSException, Exception {
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
//暂停1分钟,这样1分钟内的消息都可以接收
Thread.sleep(60000);
consumer.close();
}

}


 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  jms activemq