您的位置:首页 > 产品设计 > UI/UE

ActiveMQ queue 代码示例

2016-11-24 17:22 337 查看

生产者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 10;

public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话 接受或者发送消息的线程
Session session = null;
//消息的目的地
Destination destination;
//消息生产者
MessageProducer messageProducer;
//消息队列名称
String queueName = "helloWord";

//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建一个连接自定义队列名称的消息队列
destination = session.createQueue(queueName);
//创建消息生产者
messageProducer = session.createProducer(destination);
//发送消息
sendMessage(session, messageProducer);

session.commit();

} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}
/**
* 发送消息
* @param session
* @param messageProducer  消息生产者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < SENDNUM; i++) {
//创建一条文本消息
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
System.out.println("发送消息:Activemq 发送消息" + i);

//通过消息生产者发出消息
messageProducer.send(message);
}

}
}

消费者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话 接受或者发送消息的线程
Session session;
//消息的目的地
Destination destination;
//消息的消费者
MessageConsumer messageConsumer;
//消息队列名称
String queueName = "helloWord";
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接自定义队列名称的消息队列
destination = session.createQueue(queueName);
//创建消息消费者
messageConsumer = session.createConsumer(destination);

while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage != null){
System.out.println("收到的消息:" + textMessage.getText());
}else {
break;
}
}

} catch (JMSException e) {
e.printStackTrace();
}

}
}

多线程生产者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducerMultithreading implements Runnable{
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//发送的消息数量
private static final int SENDNUM = 3;

/**
* 发送消息
* @param session
* @param messageProducer  消息生产者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < SENDNUM; i++) {
//获取当前线程id
String threadId = Thread.currentThread().getId()+"";
//创建一条文本消息
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);
//控制台打印
System.out.println("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);
//通过消息生产者发出消息
messageProducer.send(message);
}

}
@Override
public void run() {
//连接工厂
ConnectionFactory connectionFactory;
//连接
Connection connection = null;
//会话 接受或者发送消息的线程
Session session = null;
//消息的目的地
Destination destination;
//消息生产者
MessageProducer messageProducer;
//消息队列名称
String queueName = "Multithreading";
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建一个名称为HelloWorld的消息队列
destination = session.createQueue(queueName);
//创建消息生产者
messageProducer = session.createProducer(destination);
//发送消息
sendMessage(session, messageProducer);

session.commit();

} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}
}

多线程消费者:

package com.111.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumerMultithreading implements Runnable{
//默认连接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
@Override
public void run() {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接

Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地

MessageConsumer messageConsumer;//消息的消费者

//消息队列名称
String queueName = "Multithreading";

//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
destination = session.createQueue(queueName);
//创建消息消费者
messageConsumer = session.createConsumer(destination);
String threadId = Thread.currentThread().getId()+"";
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage != null){
System.out.println("收到的消息:" + textMessage.getText()+" 消费者线程编号="+threadId);
}else {
break;
}
}

} catch (JMSException e) {
e.printStackTrace();
}

}
}

多线程生产者测试类:

package com.111.activemq;

public class JMSProducerMultithreadingTest {
public static void main(String[] args) {

JMSProducerMultithreading jpm = new JMSProducerMultithreading();
//启动10个生产者线程
for(int i = 0 ; i < 10 ; i++){
Thread t = new Thread(jpm);
t.start();
}

}
}

多线程消费者测试类:

package com.111.activemq;

public class JMSConsumerMultithreadingTest {
public static void main(String[] args) {

JMSConsumerMultithreading jcm = new JMSConsumerMultithreading();
//启动3个消费者者线程
for(int i = 0 ; i < 3 ; i++){
Thread t = new Thread(jcm);
t.start();
}

}
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: