您的位置:首页 > 其它

ActiveMQ 的的使用

2016-06-02 15:24 197 查看
消息队列

消息生产者:JMSProducer

package com.me.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息生产者
* @author Administrator
*
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址
private static final int SENDNUM = 10;// 发送的消息数量

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageProducer messageProducer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
JMSProducer.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建session
destination = session.createQueue("FirstQueue1");// 创建消息队列
messageProducer = session.createProducer(destination);// 创建消息的生产者
sendMessage(session, messageProducer);//发送消息
session.commit();

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message =session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}<span style="font-family:Arial, Helvetica, sans-serif;"><span style="white-space: normal;">
</span></span>


消息消费者第一种方式(不推荐)

package com.me.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者
*
* @author Administrator
*
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
JMSConsumer.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session
destination = session.createQueue("FirstQueue1");// 创建连接的消息队列
messageConsumer = session.createConsumer(destination);// 创建消费者
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} //
}
}


消息消费者第二种方式(使用监听)
消息监听器:

package com.me.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* 消息监听
*
* @author Administrator
*
*/
public class Listener implements MessageListener {

@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}


JMSConsumer2

package com.me.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者
*
* @author Administrator
*
*/
public class JMSConsumer2 {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
JMSConsumer2.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session
destination = session.createQueue("FirstQueue1");// 创建连接的消息队列
messageConsumer = session.createConsumer(destination);// 创建消费者
messageConsumer.setMessageListener(new Listener());//注册消息监听
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} //
}
}


订阅/发布

消息生产者-消息发布者:

package com.me.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息生产者-消息发布者
* @author Administrator
*
*/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址
private static final int SENDNUM = 10;// 发送的消息数量

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageProducer messageProducer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,
JMSProducer.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// 创建session
//destination = session.createQueue("FirstQueue1");// 创建消息队列
destination=session.createTopic("FirstTopic1");
messageProducer = session.createProducer(destination);// 创建消息的生产者
sendMessage(session, messageProducer);//发送消息
session.commit();

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{
for (int i = 0; i < JMSProducer.SENDNUM; i++) {
TextMessage message =session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("ActiveMQ 发送的消息"+i);
messageProducer.send(message);
}
}
}


消息订阅者一

监听器

package com.me.activemq2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* 消息监听
*
* @author Administrator
*
*/
public class Listener implements MessageListener {

@Override
public void onMessage(Message message) {
try {
System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
消息消费者--消息订阅者1
package com.me.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息消费者--消息订阅者1
*
* @author Administrator
*
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
JMSConsumer.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session
//	destination = session.createQueue("FirstQueue1");// 创建连接的消息队列
destination = session.createTopic("FirstTopic1");
messageConsumer = session.createConsumer(destination);// 创建消费者
messageConsumer.setMessageListener(new Listener());// 注册消息监听

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} //
}
}


消息订阅者2

监听器

package com.me.activemq2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* 消息监听
*
* @author Administrator
*
*/
public class Listener2 implements MessageListener {

@Override
public void onMessage(Message message) {
try {
System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}


消息消费者--消息订阅者2

package com.me.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
*  消息消费者--消息订阅者2
*
* @author Administrator
*
*/
public class JMSConsumer2 {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接的密码
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址

public static void main(String[] args) {

ConnectionFactory connectionFactory;// 连接工厂
Connection connection = null;// 连接
Session session;// 回话 接受或者发送消息的线程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消费者

// 实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD,
JMSConsumer2.BROKER_URL);
try {
connection = connectionFactory.createConnection();// 通过工厂获取连接
connection.start();// 启动连接
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 创建session
destination = session.createTopic("FirstTopic1");
messageConsumer = session.createConsumer(destination);// 创建消费者
messageConsumer.setMessageListener(new Listener2());// 注册消息监听
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} //
}
}

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
</properties>

<dependencies>

<!--activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.3</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.3</version>
</dependency>

<!-- Log -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: