您的位置:首页 > 其它

JMS消息发送和接收实例 - 点对点模式

2012-10-18 09:22 369 查看
发送消息

不管是将消息发送到队列还是发布到主题,编程的步骤是相同的,差别在于使用不同的JMS对象。具体定义见表:



发送消息的过程大体分为以下几步;

1、获得一个Weblogic Server上下文的引用;

2、创建连接工厂;

3、使用连接工厂创建一个连接;

4、使用连接创建一个会话;

5、获取一个目的;

6、使用会话和目的创建消息的生产者;

7、创建消息对象;

8、使用连接创建一个需要发送的消息类型的实例;

9、使用连接的一个队列发送器或主题公布器,然后使用发送器或公布器发送消息。

在敲代码之前要先导入需要的JAR包,并且配置JMS服务器。



注意:

wlfullclient.jar生成方式是,进入weblogic的安装目录例如C:\Oracle\Middleware\wlserver_10.3\server\lib,运行 java -jar wljarbuilder.jar就能生成wlfullclient.jar文件 

JMS服务器配置请参考《学习在Weblogic服务器中配置消息服务器图解  

发送消息代码:
package com.xu.testDemo;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MsgQueueSender {

/**
* @功能:JMS中实现点对点消息服务--发送消息
* @作者:
* @日期:2012-10-17
*/

private QueueSender sender;
private TextMessage msg;

public MsgQueueSender(String[] argv) throws NamingException, JMSException {
/* 初始化上下文对象 */
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,
"weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/* 创建一个连接工厂 */
QueueConnectionFactory qConFactory = (QueueConnectionFactory) ctx
.lookup("weblogic.jms.ConnectionFactory");

/* 创建一个队列 */
Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");

/* 创建连接 */
QueueConnection qCon = qConFactory.createQueueConnection();

/* 创建一个会话 */
QueueSession session = qCon.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);

/* 创建一个发送者 */
sender = session.createSender(messageQueue);

/* 创建一个消息 */
msg = session.createTextMessage();

}

public void runClient(String str) throws JMSException {
/* 设置消息,并发送 */
msg.setText("Hello");
sender.send(msg);
msg.setText("Welcome to JMS");
sender.send(msg);
msg.setText(str);
sender.send(msg);
}

public static void main(String[] args) throws Exception {
try {
MsgQueueSender mqs = new MsgQueueSender(args);
mqs.runClient("aaa");

} catch (NamingException e) {
System.err.println("");
System.err.println("**请确保已经正确地设置JMS服务器。在运行之前必须配置JMS服务器和正确的JMS目的。");
System.err.println("");
throw e;

}
}

}


接收消息分为同步与异步接收。

同步接收代码:

package com.xu.testDemo;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SyncMesConsumer {

/**
* @功能:同步接收消息实例
* @作者:
* @日期:2012-10-17
*/

private QueueReceiver receiver;
private TextMessage msg;

public SyncMesConsumer() throws NamingException, JMSException {
/* 初始化上下文对象 */
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,
"weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/* 创建一个连接工厂 */
QueueConnectionFactory qConFactory = (QueueConnectionFactory) ctx
.lookup("weblogic.jms.ConnectionFactory");
/* 创建一个队列 */
Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");
/* 创建一个连接 */
QueueConnection qCon = qConFactory.createQueueConnection();
/* 创建一个会话 */
QueueSession session = qCon.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);

/* 创建消息接收者 */
receiver = session.createReceiver(messageQueue);
/* 在调用此方法之前,消息传递被禁止 */
qCon.start();

}

public void runClient() throws JMSException {
msg = (TextMessage) receiver.receive();
System.err.println("Reciverd:" + msg.getText());
msg = (TextMessage) receiver.receive();
System.err.println("Reciverd:" + msg.getText());
msg = (TextMessage) receiver.receive();
System.err.println("Reciverd:" + msg.getText());
}

public static void main(String[] args) throws Exception {
SyncMesConsumer consumer = new SyncMesConsumer();
consumer.runClient();

}

}


代码运行结果:



异步接收代码:

package com.xu.testDemo;

import java.util.Properties;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class AsynMesConsumer implements MessageListener {

/**
* @功能:异步接收消息
* @作者:
* @日期:2012-10-18
*/

private int EXPECTED_MESSAGE_COUNT = 2;
private int messageCount = 0;
private QueueReceiver receiver;
private TextMessage msg;

public AsynMesConsumer() throws NamingException, JMSException
{
/*初始化上下文对象*/
String url = "t3://localhost:7001";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/*创建连接工厂*/
QueueConnectionFactory qConFactory = (QueueConnectionFactory)
ctx.lookup("weblogic.jms.ConnectionFactory");
/*创建队列*/
Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");
/*创建连接*/
QueueConnection qCon = qConFactory.createQueueConnection();
/*创建一个会话*/
QueueSession session = qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
/*创建一个接收者*/
receiver = session.createReceiver(messageQueue);
/*设置一个消息监听*/
receiver.setMessageListener(this);
qCon.start();

}
@Override
public void onMessage(Message m) {

try {
msg = (TextMessage) m;
System.out.println("Receiver:"+msg.getText());

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
messageCount ++;
}
public boolean expectMoreMessage()
{
return messageCount < EXPECTED_MESSAGE_COUNT;
}
public static void main(String[] args) throws Exception {
int MAX_TRIES = 10;
int tryCount = 0;
AsynMesConsumer consumer = new AsynMesConsumer();
while(consumer.expectMoreMessage() && (tryCount < MAX_TRIES))
{
try{
Thread.sleep(1000);
}catch(InterruptedException e)
{
e.printStackTrace();
}
tryCount ++;
}

}
}

运行效果:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: