您的位置:首页 > 其它

【activeMQ】ActiveMQ 发送接收消息(含安装运行测试)

2016-01-28 14:47 567 查看

准备工作

下载

下载地址:http://activemq.apache.org/ 我这里使用的版本为当前最新5.8.0。

下载版本有Windows和Linux两个版本,且都分为32位和64位。根据自己需要选择下载。

安装

我这里下载的为windows的32位版本(apache-activemq-5.8.0-bin.zip),下载后直接解压到需要安装的目录或在直接解压到当前目录也可,解压完安装也完成。

解压后目录如上图,里面包含了示例和文档,及所有的jar包。



运行

进入到bin目录(apache-activemq-5.8.0\bin),双击activemq.bat,就会运行,运行截图如下:



此时表示ActiveMQ已经在运行了,当然正常生产环境下可以设置作为服务在后台运行,并且随系统启动而启动。

测试

ActiveMQ自带了一套管理系统,访问http://localhost:8161/admin/,会出现需要输入用户名和密码的页面如下:



默认用户名和密码都是admin,进入后则为主界面:



在这个界面上,我们可以管理队列及其他的一些功能,为了下面的继续,我们在这里创建一个Queue和一个Topic。



点击目录上的Queues进入创建Queue页面,输入Queue名称,点击Create后下面就创建了G2Queue的queue队列。



这里也可以不用这样手工创建,在发送端指定了一个Queue或Topic名字后,会自动创建一个队列,如上面的choice.queue和FirstQueue都是我测试程序时,程序里面指定的Queue名称,自动创建的。

同样的方式创建一个Topic,如下:



需要引用的jar包

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>


发送消息

import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendMessage {
private static final String url = "tcp://localhost:61616";
private static final String QUEUE_NAME = "G2Queue";

public void sendMessage() throwsJMSException {
// JMS 客户端到JMSProvider 的连接
Connection connection = null;
try {
// 连接工厂,JMS 用它创建连接
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(url);
connection = (Connection)connectionFactory.createConnection();
// 启动连接
connection.start();
//Session:发送或接收消息的线程
// 获取session
Session session = (Session) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// 消息的目的地,消息发送到那个队列
Destination destination = session.createQueue(QUEUE_NAME);
//MessageProducer:消息发送者(生产者)
// 创建消息发送者
MessageProducer producer =session.createProducer(destination);
// 设置是否持久化
//DeliveryMode.NON_PERSISTENT:不持久化
//DeliveryMode.PERSISTENT:持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

String msg = "";
int i = 0;
do {
msg = "第"+i + "次发送的消息:"+new Random();
TextMessagemessage = session.createTextMessage(msg);
Thread.sleep(1000);
// 发送消息到目的地方
producer.send(message);
System.out.println("发送消息:" +msg);
i++;
} while (i<1000);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
SendMessage sndMsg = newSendMessage();
try {
sndMsg.sendMessage();
} catch (Exception ex) {
System.out.println(ex.toString());
}
}
}


运行结果如下:



接收消息

package cn.g2room.mq.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息接收类
*
* @createTime:Apr 7, 2013 5:11:11 PM
* @author:<a href="mailto:252909344@qq.com">迷蝶</a>
* @version:0.1
*@lastVersion: 0.1
* @updateTime:
*@updateAuthor: <a href="mailto:252909344@qq.com">迷蝶</a>
* @changesSum:
*
*/
public class ReceiveMessage {
privatestatic final String url = "tcp://localhost:61616";
privatestatic final String QUEUE_NAME = "G2Queue";

publicvoid receiveMessage() {
Connectionconnection = null;
try{
try{
ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
url);
connection= connectionFactory.createConnection();
}catch (Exception e) {
System.out.println(e.toString());
}
connection.start();
Sessionsession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destinationdestination = session.createQueue(QUEUE_NAME);
//消息接收者,也就是消费者
MessageConsumerconsumer = session.createConsumer(destination);

consumeMessagesAndClose(connection,session, consumer);
}catch (Exception e) {
System.out.println(e.toString());
}
}
/**
* 接收和关闭消息,如遇到消息内容为close则,关闭连接
*
* @param connection   JMS 客户端到JMSProvider 的连接
* @param session                   发送或接收消息的线程
* @param consumer              消息接收对象
* @throws JMSException
* @auther <ahref="mailto:252909344@qq.com">迷蝶</a>
* Apr 8, 2013 10:31:55 AM
*/
protectedvoid consumeMessagesAndClose(Connection connection,
Sessionsession, MessageConsumer consumer) throws JMSException {
do{
Messagemessage = consumer.receive(1000);
if("close".equals(message)){
consumer.close();
session.close();
connection.close();
}
if(message != null) {
onMessage(message);
}
}while (true);

}

publicvoid onMessage(Message message) {
try{
if(message instanceof TextMessage) {
TextMessagetxtMsg = (TextMessage) message;
Stringmsg = txtMsg.getText();
System.out.println("Received:" + msg);
}
}catch (Exception e) {
e.printStackTrace();
}

}

publicstatic void main(String args[]) {
ReceiveMessagerm = new ReceiveMessage();
rm.receiveMessage();
}
}


运行结果如下:



至此结束
本文转自:http://blog.csdn.net/leadergg/article/details/8771218
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: