QX项目实战-9.ActiveMQ编程实例
2012-11-22 11:03
423 查看
按照参考1的配置,ActiveMQ已经可以运行了。但是牵涉到它的架构、部署和运行机制比较复杂,动手编程比较困难。所幸基本的消息收发机制已经可以实现了,所以暂时写一个测试例子来测试下消息传递的功能。
按照参考2所述,本例子采用queue消息的形式,用producerClient来发送消息到ActiveMQ的消息队列,用consumerClient来接收ActiveMQ的消息来显示。producerClient和consumerClient分别调用producerTool.java和consumerTool.java类来实现建立连接、发送和接收消息的功能。
producerTool.java代码如下,注意这里的destination= session.createQueue(queue);代码,此时创建的是队列消息,换成destination= session.createTopic(topic);就是主题消息队列。这里使用的是队列消息机制。代码如下:
consumerTool.java如下:
producerClient如下:
consumerClient如下:
首先调用ProducerClient,程序发送一条消息helloworld!到消息服务器的队列中。然后调用ConsumerClient来接受这条消息。可以看到ActiveMQ队列中消息数的前后变化。
运行结果ConsumerClient收到的消息为:
PS(2012-11-22):本例中客户端和服务器代码均运行于ActiveMQ服务器端机器上,生产和消费均在同一IP地址下,测试没有问题。为了保证消息传递,我们采用在服务器端发送消息,客户端接收消息的方法来实现代码。在消费者代码中,我们修改连接URL修改为private String url ="tcp://192.168.195.54:61616";这里192.168.195.54是ActiveMQ服务器地址。上述改写完成后,程序即可实现在服务器端发送消息,在客户端接收消息。
参考
1. QX项目实战-7.ActiveMQ的安装与测试
2. QX项目实战-8.ActiveMQ的Queue消息和Topic消息
按照参考2所述,本例子采用queue消息的形式,用producerClient来发送消息到ActiveMQ的消息队列,用consumerClient来接收ActiveMQ的消息来显示。producerClient和consumerClient分别调用producerTool.java和consumerTool.java类来实现建立连接、发送和接收消息的功能。
producerTool.java代码如下,注意这里的destination= session.createQueue(queue);代码,此时创建的是队列消息,换成destination= session.createTopic(topic);就是主题消息队列。这里使用的是队列消息机制。代码如下:
package homework; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTool { private String user =ActiveMQConnection.DEFAULT_USER; private Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD; private String url =ActiveMQConnection.DEFAULT_BROKER_URL; private String queue ="TOOL.DEFAULT"; private String topic ="TOOL.TOPIC"; private Destinationdestination = null; private Connectionconnection = null; private Sessionsession = null; privateMessageProducer producer = null; // 初始化 private voidinitialize() throws JMSException, Exception { ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( user,password, url); connection =connectionFactory.createConnection(); session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination= session.createQueue(queue); // destination =session.createTopic(topic); producer =session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } // 发送消息 public voidproduceMessage(String message) throws JMSException, Exception { initialize(); TextMessagemsg = session.createTextMessage(message); connection.start(); System.out.println("Producer:->Sendingmessage: " + message); producer.send(msg); System.out.println("Producer:->Messagesent complete!"); } // 关闭连接 public void close()throws JMSException { System.out.println("Producer:->Closingconnection"); if (producer!= null) producer.close(); if (session!= null) session.close(); if(connection != null) connection.close(); } }
consumerTool.java如下:
package homework; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerTool implements MessageListener { private String user =ActiveMQConnection.DEFAULT_USER; private Stringpassword = ActiveMQConnection.DEFAULT_PASSWORD; private String url =ActiveMQConnection.DEFAULT_BROKER_URL; private String queue ="TOOL.DEFAULT"; private String topic ="TOOL.TOPIC"; private Destinationdestination = null; private Connectionconnection = null; private Sessionsession = null; privateMessageConsumer consumer = null; // 初始化 private voidinitialize() throws JMSException, Exception { ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory( user,password, url); connection =connectionFactory.createConnection(); session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // destination =session.createTopic(topic); destination= session.createQueue(queue); consumer =session.createConsumer(destination); } // 消费消息 public voidconsumeMessage() throws JMSException, Exception { initialize(); connection.start(); System.out.println("Consumer:->Beginlistening "); // 开始监听 consumer.setMessageListener(this); //Messagemessage = consumer.receive(); // System.out.println(((TextMessage)message).getText()); } // 关闭连接 public void close()throws JMSException { System.out.println("Consumer:->Closingconnection"); if (consumer!= null) consumer.close(); if (session!= null) session.close(); if(connection != null) connection.close(); } // 消息处理函数 public voidonMessage(Message message) { try { if(message instanceof TextMessage) { TextMessagetxtMsg = (TextMessage) message; Stringmsg = txtMsg.getText(); System.out.println("Consumer:->Received:" + msg); }else { System.out.println("Consumer:->Received:" + message); } } catch(JMSException e) { //TODO Auto-generated catch block e.printStackTrace(); } } }
producerClient如下:
package homework; import javax.jms.JMSException; public class producerClient{ public static voidmain(String[] args) throws JMSException, Exception{ ProducerTool producer = newProducerTool(); producer.produceMessage("Hello, world!"); producer.close(); } }
consumerClient如下:
package homework; import javax.jms.JMSException; public class consumerClient { public static voidmain(String[] args) throws JMSException, Exception { ConsumerToolconsumer = new ConsumerTool(); consumer.consumeMessage(); Thread.sleep(500); consumer.close(); } }
首先调用ProducerClient,程序发送一条消息helloworld!到消息服务器的队列中。然后调用ConsumerClient来接受这条消息。可以看到ActiveMQ队列中消息数的前后变化。
运行结果ConsumerClient收到的消息为:
PS(2012-11-22):本例中客户端和服务器代码均运行于ActiveMQ服务器端机器上,生产和消费均在同一IP地址下,测试没有问题。为了保证消息传递,我们采用在服务器端发送消息,客户端接收消息的方法来实现代码。在消费者代码中,我们修改连接URL修改为private String url ="tcp://192.168.195.54:61616";这里192.168.195.54是ActiveMQ服务器地址。上述改写完成后,程序即可实现在服务器端发送消息,在客户端接收消息。
参考
1. QX项目实战-7.ActiveMQ的安装与测试
2. QX项目实战-8.ActiveMQ的Queue消息和Topic消息
相关文章推荐
- activeMQ实例在项目中的运用【项目实战系列】
- activeMQ实例在项目中的运用【项目实战系列】
- activeMQ实例在项目中的运用【项目实战系列】
- activeMQ实例在项目中的运用二【项目实战系列】
- activeMQ实例在项目中的运用【项目实战系列】
- activeMQ实例在项目中的运用二【项目实战系列】
- activeMQ实例在项目中的运用二【项目实战系列】
- activeMQ实例在项目中的运用【项目实战系列】
- activeMQ实例在项目中的运用【项目实战系列】
- C#多线程编程实例实战
- 实战Linux Bluetooth编程 (六) L2CAP编程实例
- Android项目实战(二十八):使用Zxing实现二维码及优化实例
- c#多线程编程实例实战
- Asp.Net Core 2.0 项目实战(6)Redis配置、封装帮助类RedisHelper及使用实例
- C#多线程编程实例实战
- QX项目实战-20.阶段性总结反思
- ActiveMQ 项目实战
- QX项目实战-2.模块分类以及配置读取
- QX项目实战-13.基础架构试验四:JavaWeb消息平台