您的位置:首页 > 编程语言 > Java开发

JavaEE(5) - JMS实现企业Pub-Sub消息处理

2015-01-25 22:53 405 查看
1. 在Weblogic服务器上配置Pub-Sub消息目的

向已有的JMS模块中添加消息主题:

Services-->Messaging-->JMS Modules--><Module Name>-->Configuration-->New-->Topic(Name: MessageTopic)

2. 可靠的JMS订阅(NetBeans创建java project: DurablePubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

public void sendMessage() throws NamingException, JMSException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageTopic");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//JMS连接创建JMS会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息生产者
MessageProducer sender = session.createProducer(dest);
//设置消息生产者生产出来的消息的传递模式、有效时间。
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000);
//通过JMS会话创建一个文本消息
TextMessage msg = session.createTextMessage();
//msg.setStringProperty("ConType","txt");
//设置消息内容
msg.setText("Hello");
//发送消息
sender.send(msg);
msg.setText("Welcome to JMS");
//再次发送消息
sender.send(msg);
//关闭资源
session.close();
conn.close();
}

//工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}


#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class SyncConsumer {

public void receiveMessage() throws JMSException, NamingException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Topic dest = (Topic) ctx.lookup("MessageTopic");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//将客户端ID设为crazyit.org
conn.setClientID("crazyit.org");
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建可靠的消息订阅者
MessageConsumer receiver = session.createDurableSubscriber(dest, "crazyit.org");
//同步接收消息,如果没有接收到消息,该方法会阻塞线程
TextMessage msg = (TextMessage) receiver.receiveNoWait();
System.out.println(msg);
if (msg != null) {
System.out.println("同步接收到的消息:" + msg.getText());
}
//关闭资源
session.close();
conn.close();
}

//工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
SyncConsumer sender = new SyncConsumer();
sender.receiveMessage();
}
}


#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

//JMS异步消费者就是一个监听器,故实现MessageListener接口
public class AsyncConsumer implements MessageListener {

public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Topic dest = (Topic) ctx.lookup("MessageTopic");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//将客户端ID设为crazyit.org
conn.setClientID("leegang.org");
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建可靠的消息订阅者
MessageConsumer receiver = session.createDurableSubscriber(dest, "leegang.org");
//为JMS消息消费者绑定消息监听器
receiver.setMessageListener(this);
//程序暂停20s,在此期间内以异步方式接收消息
Thread.sleep(20000);
//关闭资源
session.close();
conn.close();
}

//实现消息监听器必须实现的方法。

public void onMessage(Message m) {
TextMessage msg = (TextMessage) m;
System.out.println(msg);
try {
System.out.println("异步接收的消息:" + msg.getText());
}
catch (JMSException ex) {
ex.printStackTrace();
}
}

//工具方法,用来获取命名服务的Context对象

private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
AsyncConsumer consumer = new AsyncConsumer();
}
}


3. 不可靠的JMS订阅(NetBeans创建java project: JmsPubSub)

#1. 编写Pub-Sub消息的生产者(MessageSender.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class MessageSender {

public void sendMessage() throws NamingException, JMSException {
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
Context ctx = getInitialContext();

ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(CONNECTION_FACTORY_JNDI);
Destination dest = (Destination)ctx.lookup("MessageTopic");

Connection conn = connFactory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer sender = session.createProducer(dest);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setTimeToLive(20000);

TextMessage msg = session.createTextMessage();

msg.setText("Hello");
sender.send(msg);

msg.setText("Welcome to JMS");
sender.send(msg);

session.close();
conn.close();
}

private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
MessageSender sender = new MessageSender();
sender.sendMessage();
}
}


#2. 编写Pub-Sub消息的同步接收者(SyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

public class SyncConsumer {

public void receiveMessage() throws JMSException, NamingException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageTopic");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息消费者
MessageConsumer receiver = session.createConsumer(dest);
//同步接收消息,如果没有接收到消息,该方法会阻塞线程
TextMessage msg = (TextMessage) receiver.receive();
System.out.println(msg);
System.out.println("同步接收到的消息:" + msg.getText());
//关闭资源
session.close();
conn.close();
}

//工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
SyncConsumer consumer = new SyncConsumer();
consumer.receiveMessage();
}
}


#3. 编写Pub-Sub消息的异步接收者(AsyncConsumer.java)

package lee;

import javax.jms.*;
import javax.naming.*;
import java.util.Properties;

//JMS异步消费者就是一个监听器,故实现MessageListener接口
public class AsyncConsumer implements MessageListener {

public AsyncConsumer() throws NamingException, JMSException, InterruptedException {
//定义WebLogic默认连接工厂的JNDI
final String CONNECTION_FACTORY_JNDI = "weblogic.jms.ConnectionFactory";
//获取JNDI服务所需的Context
Context ctx = getInitialContext();
//通过JNDI查找获取连接工厂
ConnectionFactory connFactory = (ConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI);
//通过JNDI查找获取消息目的
Destination dest = (Destination) ctx.lookup("MessageTopic");
//连接工厂创建连接
Connection conn = connFactory.createConnection();
//启动JMS连接,让它开始传输JMS消息
conn.start();
//JMS连接创建JMS会话
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//JMS会话创建消息消费者
MessageConsumer receiver = session.createConsumer(dest);
//为JMS消息消费者绑定消息监听器
receiver.setMessageListener(this);
//程序暂停20s,在此期间内以异步方式接收消息
Thread.sleep(20000);
//关闭资源
session.close();
conn.close();
}

//实现消息监听器必须实现的方法。
public void onMessage(Message m) {
TextMessage msg = (TextMessage) m;
System.out.println(msg);
try {
System.out.println("异步接收的消息:" + msg.getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
}

//工具方法,用来获取命名服务的Context对象
private Context getInitialContext() {
// 参看(4)
}

public static void main(String[] args) throws Exception {
AsyncConsumer consumer = new AsyncConsumer();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: