您的位置:首页 > 其它

【J2EE】企业级项目开发总结--JMS发布-订阅模型篇

2015-09-27 15:57 525 查看
通过上篇博客的示例,我们可以总结出消息通信的基本过程。首先得到连接工厂,通过工厂生产连接,从连接中得到一个会话,根据会话创建消息生产者,目的地,消息和消息消费者。详细过程如下图。



和前一篇P2P中的代码类似,我们今天来看看发布订阅模型。只是在这里把Queue换成了Topic,消息生产者叫做消息发布者,发布一个主题,所有订阅主题的类叫做消息订阅者,每个订阅者都对应一个监听类,当消息发送到主题队列后,会调用监听类的onMessage方法,从而使得各订阅者收到消息,所有的订阅者都能收到同样的消息,消息被消费了不只一次。

代码如下:

TopicPublisher


package com.tgb.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息发布者
 * @author ghy
 *
 */
public class TopicPublisher {
	
	private static final String USERNAME=ActiveMQConnectionFactory.DEFAULT_USER;//默认的连接用户名
	private static final String PASSWORD=ActiveMQConnectionFactory.DEFAULT_PASSWORD;//默认的连接密码
	private static final String BROKEURL=ActiveMQConnectionFactory.DEFAULT_BROKER_URL;//默认的连接地址
	private static final int SENDNUM=10;//发送的消息数量
	
	public static void main(String[] args){
		ConnectionFactory connectionFactory;//连接工厂
		Connection connection=null;	//连接
		Session session;  //会话,接受或者发送消息的线程
		MessageProducer messageProducer; //消息发送者
		
		//实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(TopicPublisher.USERNAME,TopicPublisher.PASSWORD,TopicPublisher.BROKEURL);
		
		try {
			connection=connectionFactory.createConnection();//创建连接
			connection.start();//启动连接
			session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建连接
			Topic topic =session.createTopic("FirstTopic");//创建主题
			messageProducer=session.createProducer(topic);//创建消息生产者
			sendMessage(session,messageProducer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}				
	}
	//发送消息
	public static void sendMessage(Session session,MessageProducer messageProducer){
		for (int i=0;i<TopicPublisher.SENDNUM;i++){
			TextMessage message;
			try {
				message = session.createTextMessage("ActiveMQ 发送的消息" + i);			
				System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i);			
				messageProducer.send(message);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
}


TopicSubscriber


package com.tgb.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息订阅者1
 * @author ghy
 *
 */
public class TopicSubscriber {
	
	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER;//默认连接的用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址
	
	public static void main(String[] args){
		ConnectionFactory connectionFactory;//连接工厂
		Connection connection=null;//连接
		Session session;//会话
		MessageConsumer messageConsumer;//消息消费者
		
		connectionFactory=new ActiveMQConnectionFactory(TopicSubscriber.USERNAME,TopicSubscriber.PASSWORD,TopicSubscriber.BROKEURL);
		try {
			connection=connectionFactory.createConnection();//创建连接
			connection.start();//启动连接
			session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session
			Topic topic=session.createTopic("FirstTopic");//创建主题
			messageConsumer=session.createConsumer(topic);//创建消息消费者
			
			//注册消息监听
			TopicListener listener=new TopicListener();
			messageConsumer.setMessageListener(listener);
			
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
	}
}


TopicListener

package com.tgb.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 监听器
 * @author ghy
 *
 */
public class TopicListener implements MessageListener {

	
	public void onMessage(Message message) {
		
		try {
			System.out.println("订阅者1收到的消息:" + ((TextMessage)message).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}		
	}
	
}


TopicSubscriber2和TopicListener2和上面的一样,改一下名称和打印信息即可.

运行代码,得到如下结果:








关于P2P模型和Pub/Sub模型的特点,我们好好体会体会吧.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: