您的位置:首页 > 编程语言

QX项目实战-9.ActiveMQ编程实例

2012-11-22 11:03 423 查看
按照参考1的配置,ActiveMQ已经可以运行了。但是牵涉到它的架构、部署和运行机制比较复杂,动手编程比较困难。所幸基本的消息收发机制已经可以实现了,所以暂时写一个测试例子来测试下消息传递的功能。

按照参考2所述,本例子采用queue消息的形式,用producerClient来发送消息到ActiveMQ的消息队列,用consumerClient来接收ActiveMQ的消息来显示。producerClient和consumerClient分别调用producerTool.java和consumerTool.java类来实现建立连接、发送和接收消息的功能。

producerTool.java代码如下,注意这里的destination= session.createQueue(queue);代码,此时创建的是队列消息,换成destination= session.createTopic(topic);就是主题消息队列。这里使用的是队列消息机制。代码如下:

package homework;
 
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class ProducerTool {
         private String user =ActiveMQConnection.DEFAULT_USER;
         private Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;
         private String url =ActiveMQConnection.DEFAULT_BROKER_URL;
         private String queue ="TOOL.DEFAULT";
         private String topic ="TOOL.TOPIC";
        
         private Destinationdestination = null;
         private Connectionconnection = null;
         private Sessionsession = null;
         privateMessageProducer producer = null;
 
         // 初始化
         private voidinitialize() throws JMSException, Exception {
                  ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
                                     user,password, url);
                   connection =connectionFactory.createConnection();
                   session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                   destination= session.createQueue(queue);
//               destination =session.createTopic(topic);
                   producer =session.createProducer(destination);
                   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         }
 
         // 发送消息
         public voidproduceMessage(String message) throws JMSException, Exception {
                   initialize();
                   TextMessagemsg = session.createTextMessage(message);
                   connection.start();
                   System.out.println("Producer:->Sendingmessage: " + message);
                   producer.send(msg);
                   System.out.println("Producer:->Messagesent complete!");
         }
 
         // 关闭连接
         public void close()throws JMSException {
                   System.out.println("Producer:->Closingconnection");
                   if (producer!= null)
                            producer.close();
                   if (session!= null)
                            session.close();
                   if(connection != null)
                            connection.close();
         }
}


consumerTool.java如下:

package homework;
 
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class ConsumerTool implements MessageListener {
         private String user =ActiveMQConnection.DEFAULT_USER;
         private Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD;
         private String url =ActiveMQConnection.DEFAULT_BROKER_URL;
         private String queue ="TOOL.DEFAULT";
         private String topic ="TOOL.TOPIC";
         private Destinationdestination = null;
         private Connectionconnection = null;
         private Sessionsession = null;
         privateMessageConsumer consumer = null;
         // 初始化
         private voidinitialize() throws JMSException, Exception {
                   ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
                                     user,password, url);
                   connection =connectionFactory.createConnection();
                   session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//               destination =session.createTopic(topic);
                   destination= session.createQueue(queue);
                   consumer =session.createConsumer(destination);
         }
 
         // 消费消息
         public voidconsumeMessage() throws JMSException, Exception {
                   initialize();
                   connection.start();
 
                   System.out.println("Consumer:->Beginlistening ");
                   // 开始监听
                   consumer.setMessageListener(this);
                   //Messagemessage = consumer.receive();
//               System.out.println(((TextMessage)message).getText());
         }
 
         // 关闭连接
         public void close()throws JMSException {
                   System.out.println("Consumer:->Closingconnection");
                   if (consumer!= null)
                            consumer.close();
                   if (session!= null)
                            session.close();
                   if(connection != null)
                            connection.close();
         }
 
         // 消息处理函数
         public voidonMessage(Message message) {
                   try {
                            if(message instanceof TextMessage) {
                                     TextMessagetxtMsg = (TextMessage) message;
                                     Stringmsg = txtMsg.getText();
                                     System.out.println("Consumer:->Received:" + msg);
                            }else {
                                     System.out.println("Consumer:->Received:" + message);
                            }
                   } catch(JMSException e) {
                            //TODO Auto-generated catch block
                            e.printStackTrace();
                   }
         }
}


producerClient如下:

package homework;
import javax.jms.JMSException;
public class producerClient{
 public static voidmain(String[] args) throws JMSException, Exception{
  ProducerTool producer = newProducerTool();
 producer.produceMessage("Hello, world!");
  producer.close();
 }
}


consumerClient如下:

package homework;
 
import javax.jms.JMSException;
 
public class consumerClient {
         public static voidmain(String[] args) throws JMSException, Exception {
                   ConsumerToolconsumer = new ConsumerTool();
                   consumer.consumeMessage();
 
                   Thread.sleep(500);
                   consumer.close();
         }
}


首先调用ProducerClient,程序发送一条消息helloworld!到消息服务器的队列中。然后调用ConsumerClient来接受这条消息。可以看到ActiveMQ队列中消息数的前后变化。



运行结果ConsumerClient收到的消息为:



PS(2012-11-22):本例中客户端和服务器代码均运行于ActiveMQ服务器端机器上,生产和消费均在同一IP地址下,测试没有问题。为了保证消息传递,我们采用在服务器端发送消息,客户端接收消息的方法来实现代码。在消费者代码中,我们修改连接URL修改为private String url ="tcp://192.168.195.54:61616";这里192.168.195.54是ActiveMQ服务器地址。上述改写完成后,程序即可实现在服务器端发送消息,在客户端接收消息。

参考

1. QX项目实战-7.ActiveMQ的安装与测试

2. QX项目实战-8.ActiveMQ的Queue消息和Topic消息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: