您的位置:首页 > 运维架构

Activemq学习笔记---Topic及JNDI使用

2016-01-21 10:29 417 查看
上一节已介绍了jms的数据传输模型基本概念,这节示例Pub/Sub模型。Activemq可以与jndi提供者配合使用,但其本身也提供了jndi服务,使得客户端可以在缺少JNDI提供者的情况下享受JNDI编程带来的好处。但该jndi只支持Activemq,其它需要提供jndi服务的不使用(比如jdbc)。该示例将结合jndi进行演示:

一、代码示例

发布者(ActivemqTopicPublisher.java)

package com.css.sword.service;

import java.net.URISyntaxException;

import javax.jms.*;
import javax.naming.InitialContext;

public class ActivemqTopicPublisher {
private  Session session;
private  MessageProducer publisher ;
private Connection connection;
public void initialize() throws URISyntaxException, Exception
{
/*这部分注释是原始方式构建topic*/
/*ConnectionFactory connectFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = connectFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination =  session.createTopic("topic1");
publisher = session.createProducer(destination);
connection.start();*/
/*这部分是用Activemq本身自带jndi构建topic*/
// create a new intial context, which loads from jndi.properties file
InitialContext ctx = new InitialContext();
// lookup the connection factory
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
// create a new TopicConnection for pub/sub messaging
connection = factory.createConnection();
// lookup an existing topic
javax.jms.Topic mytopic = (javax.jms.Topic)ctx.lookup("MyTopic");
// create a new TopicSession for the client
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
// create a new subscriber to receive messages
publisher = session.createProducer(mytopic);

connection.start();
}
public void sendText(String Message)
{
try {
TextMessage text = session.createTextMessage(Message);
System.out.println("Sending message:"+text.getText());
publisher.send(text);

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

public void submit() throws JMSException
{
session.commit();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (publisher != null)
publisher.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}


订阅者(ActivemqTopicPublisher.java)

package com.css.sword.service;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ActivemqTopicSubscriber implements MessageListener {

private String name = "";

private String subject = "TOOL.DEFAULT";

private Destination destination = null;

private Connection connection = null;

private Session session = null;

private MessageConsumer consumer = null;
ActivemqTopicSubscriber(String name){
this.name=name;
}

public  void initialize() throws JMSException, NamingException
{
/*采用原始方式构建topic*/
/*ConnectionFactory connectFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination =  session.createTopic("topic1");
consumer = session.createConsumer(destination);
connection.start();*/

/*利用activemq本身的jndi构建*/
// create a new intial context, which loads from jndi.properties file
InitialContext ctx = new InitialContext();
// lookup the connection factory
ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
// create a new TopicConnection for pub/sub messaging
connection = factory.createConnection();
// lookup an existing topic
javax.jms.Topic mytopic = (javax.jms.Topic)ctx.lookup("MyTopic");
// create a new TopicSession for the client
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
// create a new subscriber to receive messages
consumer = session.createConsumer(mytopic);

connection.start();

}

public void recive() throws NamingException
{
try {
initialize();
System.out.println("Consumer("+name+"):->Begin listening...");
// 开始监听
consumer.setMessageListener(this);
/* Message message = consumer.receive(); //主动接收消息
System.out.println("consumer recive:"+((TextMessage)message).getText());
*/
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void onMessage(Message arg0) {
// TODO Auto-generated method stub
try{
if(arg0 instanceof TextMessage)
{
TextMessage txtMsg = (TextMessage) arg0;
System.out.println("consumer("+name+") recive:"+txtMsg.getText());
}
}catch(Exception e)
{
e.printStackTrace();
}
}
public void submit() throws JMSException
{
session.commit();
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer:->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}

}


测试类:

package com.css.sword.service;

import java.net.URISyntaxException;

public class TestActiveMqTopic {

public static void main(String[] args) throws URISyntaxException, Exception
{
ActivemqTopicPublisher producer = new ActivemqTopicPublisher();
ActivemqTopicSubscriber consumer = new ActivemqTopicSubscriber("1");
ActivemqTopicSubscriber consumer1 = new ActivemqTopicSubscriber("2");
producer.initialize();
System.out.println("consumer1开始监听");
consumer.recive();
System.out.println("consumer2开始监听");
consumer1.recive();

Thread.sleep(500);
for(int i=0;i<10;i++)
{
producer.sendText("Hello, world!"+i);
}
producer.submit();
producer.close();

}

}


jndi配置文件:

jndi放在src文件夹下即可

## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0 ##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# START SNIPPET: jndi

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory

# use the following property to configure the default connector
java.naming.provider.url = tcp://localhost:61616

# use the following property to specify the JNDI name the connection factory
# should appear as.
connectionFactoryNames = ConnectionFactory, queueConnectionFactory, topicConnectionFactry

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.MyQueue = example.MyQueue

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.MyTopic = example.MyTopic

# END SNIPPET: jndi


二、运行结果



可以看出每条消息都会被所有消费者消费
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq