您的位置:首页 > 其它

activemq的使用方法

2015-09-22 21:23 351 查看
activemq是Apache的一款开源消息总线,主要用来做消息的分发。

首先需要下载MQ,进行启动。

然后在控制台创建队列,初始用户名密码admin/admin。

然后可以写生产者、消费者进行测试了。由于activemq支持spring,因此有两种不同的写法:

方法一:创建factory, connection, session, destination, producer,consumer

方法二:通过配置文件进行创建(未尝试)。

最初在其作用的理解上有一些偏差,实际上是,在发送端引入MQ的jar包,向指定的MQ服务器发送信息,MQ会自动将其添加到消息队列中,用控制台可以比较清晰的看到队列情况:http://localhost:8161/admin/

在接收端循环扫描要接收的队列,当读取到信息时进行接收处理。

需要注意的是,mq支持持久化,可将消息持久化到本地文件、数据库。

另一个需要注意的地方是,创建会话session时,第一个参数为true时,需要向服务器确认消息的接收。否则服务器认为没有成功接收,引用一下其他同学的话:

createSession(paramA,paramB);

paramA 取值有 : true or false 表示是否支持事务

paramB 取值有:Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,SESSION_TRANSACTED

createSession(paramA,paramB);

paramA是设置事务的,paramB设置acknowledgment mode

paramA设置为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。

paramA设置为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。

Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。

Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。

DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。

附代码
接收端:

package com.receiver;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

public class MessageReceiver implements IMessageReceiver {

public ActiveMQConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;
public MessageConsumer getConsumer() {
return consumer;
}

public void setConsumer(MessageConsumer consumer) {
this.consumer = consumer;
}

public MessageConsumer consumer = null;

//初始化,创建factory, connection, session, destination, producer
public MessageReceiver(){
try {
InputStream inProperties=MessageReceiver.class.getResourceAsStream("../config/connection.properties");
Properties properties = new Properties();
properties.load(inProperties);
//创建factory
connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
properties.getProperty("password"),
properties.getProperty("brokerURL"));
//创建connection
connection = connectionFactory.createConnection();
connection.start();
//获取操作连接
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//获取消息目的地,需在控制台配置
destination = session.createQueue(properties.getProperty("queueName"));
//得到消息接收者
consumer = session.createConsumer(destination);

} catch (Exception e) {
e.printStackTrace();
}
}

public void ReceiveMessage(MessageConsumer consumer) {
int i = 0;
while(true){
try {
TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIME);
if(message != null){
System.out.println("queue1 "+message.getText()+"   "+i);
FileOutputStream out;
out = new FileOutputStream("D:/test.txt");
PrintStream p=new PrintStream(out);
p.println("queue1 "+message.getText()+"   "+i);
out.close();

}
Thread.sleep(1000);
i++;
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public void CloseConnection(Connection connection) {
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}

public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public Connection getConnection() {
return connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}

public Session getSession() {
return session;
}

public void setSession(Session session) {
this.session = session;
}

public Destination getDestination() {
return destination;
}

public void setDestination(Destination destination) {
this.destination = destination;
}

}


package com.receiver;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class ReceiveMain {

/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
MessageReceiver messageReceiver = new MessageReceiver();
messageReceiver.ReceiveMessage(messageReceiver.getConsumer());
messageReceiver.CloseConnection(messageReceiver.getConnection());
}

}


发送端:

package com.sender;

import java.io.InputStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

public class MessageSender implements IMessageSender {

public ActiveMQConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;
public MessageProducer producer = null;

//初始化,创建factory, connection, session, destination, producer
public MessageSender(){
try {
InputStream inProperties=MessageSender.class.getResourceAsStream("../config/connection.properties");
Properties properties = new Properties();
properties.load(inProperties);
//创建factory
connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
properties.getProperty("password"),
properties.getProperty("brokerURL"));
//创建connection
connection = connectionFactory.createConnection();
connection.start();
//获取操作连接
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//获取消息目的地,需在控制台配置
destination = session.createQueue(properties.getProperty("queueName"));
//得到消息发送者
producer = session.createProducer(destination);
//设置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public TextMessage CreateMessage(Session session, int i) {
String strMessage = "hello world!   "+i;
TextMessage message = null;
try {
message = session.createTextMessage(strMessage);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return message;
}

public void SendMessage(TextMessage message, MessageProducer producer) {
try {
producer.send(message);

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

public void CloseConnection(Connection connection) {
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}

public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

public Connection getConnection() {
return connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}

public Session getSession() {
return session;
}

public void setSession(Session session) {
this.session = session;
}

public Destination getDestination() {
return destination;
}

public void setDestination(Destination destination) {
this.destination = destination;
}

public MessageProducer getProducer() {
return producer;
}

public void setProducer(MessageProducer producer) {
this.producer = producer;
}
}


package com.sender;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class SendMain {

/**
* @param args
* @throws JMSException
*/
public static void main(String[] args) throws JMSException {
MessageSender messageSender = new MessageSender();
for(int i = 0;i < 10;i++){
TextMessage textMessage = messageSender.CreateMessage(messageSender.getSession(),i);
messageSender.SendMessage(textMessage, messageSender.getProducer());
System.out.println("send message sucess!  :  " + i);
FileOutputStream out;
try {
out = new FileOutputStream("D:/test.txt");
PrintStream p=new PrintStream(out);
p.println("send message sucess!  :  " + i);
out.close();
} catch (FileNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

messageSender.getSession().commit();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

messageSender.CloseConnection(messageSender.getConnection());
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: