Activemq学习笔记---Topic及JNDI使用
2016-01-21 10:29
417 查看
上一节已介绍了jms的数据传输模型基本概念,这节示例Pub/Sub模型。Activemq可以与jndi提供者配合使用,但其本身也提供了jndi服务,使得客户端可以在缺少JNDI提供者的情况下享受JNDI编程带来的好处。但该jndi只支持Activemq,其它需要提供jndi服务的不使用(比如jdbc)。该示例将结合jndi进行演示:
一、代码示例
发布者(ActivemqTopicPublisher.java)
订阅者(ActivemqTopicPublisher.java)
测试类:
jndi配置文件:
jndi放在src文件夹下即可
二、运行结果
可以看出每条消息都会被所有消费者消费
一、代码示例
发布者(ActivemqTopicPublisher.java)
package com.css.sword.service; import java.net.URISyntaxException; import javax.jms.*; import javax.naming.InitialContext; public class ActivemqTopicPublisher { private Session session; private MessageProducer publisher ; private Connection connection; public void initialize() throws URISyntaxException, Exception { /*这部分注释是原始方式构建topic*/ /*ConnectionFactory connectFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = connectFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("topic1"); publisher = session.createProducer(destination); connection.start();*/ /*这部分是用Activemq本身自带jndi构建topic*/ // create a new intial context, which loads from jndi.properties file InitialContext ctx = new InitialContext(); // lookup the connection factory ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory"); // create a new TopicConnection for pub/sub messaging connection = factory.createConnection(); // lookup an existing topic javax.jms.Topic mytopic = (javax.jms.Topic)ctx.lookup("MyTopic"); // create a new TopicSession for the client session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); // create a new subscriber to receive messages publisher = session.createProducer(mytopic); connection.start(); } public void sendText(String Message) { try { TextMessage text = session.createTextMessage(Message); System.out.println("Sending message:"+text.getText()); publisher.send(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (publisher != null) publisher.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
订阅者(ActivemqTopicPublisher.java)
package com.css.sword.service; import javax.jms.*; import javax.naming.InitialContext; import javax.naming.NamingException; public class ActivemqTopicSubscriber implements MessageListener { private String name = ""; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; ActivemqTopicSubscriber(String name){ this.name=name; } public void initialize() throws JMSException, NamingException { /*采用原始方式构建topic*/ /*ConnectionFactory connectFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("topic1"); consumer = session.createConsumer(destination); connection.start();*/ /*利用activemq本身的jndi构建*/ // create a new intial context, which loads from jndi.properties file InitialContext ctx = new InitialContext(); // lookup the connection factory ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory"); // create a new TopicConnection for pub/sub messaging connection = factory.createConnection(); // lookup an existing topic javax.jms.Topic mytopic = (javax.jms.Topic)ctx.lookup("MyTopic"); // create a new TopicSession for the client session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); // create a new subscriber to receive messages consumer = session.createConsumer(mytopic); connection.start(); } public void recive() throws NamingException { try { initialize(); System.out.println("Consumer("+name+"):->Begin listening..."); // 开始监听 consumer.setMessageListener(this); /* Message message = consumer.receive(); //主动接收消息 System.out.println("consumer recive:"+((TextMessage)message).getText()); */ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void onMessage(Message arg0) { // TODO Auto-generated method stub try{ if(arg0 instanceof TextMessage) { TextMessage txtMsg = (TextMessage) arg0; System.out.println("consumer("+name+") recive:"+txtMsg.getText()); } }catch(Exception e) { e.printStackTrace(); } } public void submit() throws JMSException { session.commit(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
测试类:
package com.css.sword.service; import java.net.URISyntaxException; public class TestActiveMqTopic { public static void main(String[] args) throws URISyntaxException, Exception { ActivemqTopicPublisher producer = new ActivemqTopicPublisher(); ActivemqTopicSubscriber consumer = new ActivemqTopicSubscriber("1"); ActivemqTopicSubscriber consumer1 = new ActivemqTopicSubscriber("2"); producer.initialize(); System.out.println("consumer1开始监听"); consumer.recive(); System.out.println("consumer2开始监听"); consumer1.recive(); Thread.sleep(500); for(int i=0;i<10;i++) { producer.sendText("Hello, world!"+i); } producer.submit(); producer.close(); } }
jndi配置文件:
jndi放在src文件夹下即可
## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # START SNIPPET: jndi java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory # use the following property to configure the default connector java.naming.provider.url = tcp://localhost:61616 # use the following property to specify the JNDI name the connection factory # should appear as. connectionFactoryNames = ConnectionFactory, queueConnectionFactory, topicConnectionFactry # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] queue.MyQueue = example.MyQueue # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] topic.MyTopic = example.MyTopic # END SNIPPET: jndi
二、运行结果
可以看出每条消息都会被所有消费者消费
相关文章推荐
- 解析ActiveMQ的使用说明总结
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- activemq报EOFExceptionjvm错误
- ActiveMQ 消息服务(一)
- ActiveMQ 消息服务(二)
- ActiveMQ 消息服务(三)
- Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析
- 基于zookeeper+leveldb搭建activemq集群
- ActiveMQ 实例
- 一台机器上运行多个ActiveMq
- activemq安全设置 设置admin的用户名和密码
- Ubuntu 14.04.1 安装 activemq 5.11.1
- 在Spring中使用ActiveMQ发送邮件
- 多个地市连接MQ,如果较长时间没有消息发送,ActiveMQ的消费端会自动断开连接(topic端)
- 通过Java操作ActiveMQ的代码记录
- 通过spring开发ActiveMQ简单应用
- 【ActiveMQ教程】简介
- 【ActiveMQ教程】点对点(Point-to-Point)消息教程
- 【ActiveMQ教程】发布/订阅(Publish/Subscribe)消息教程
- JMS和消息驱动Bean(MDB)