您的位置:首页 > 其它

JMS消息服务器(二)——点对点消息传送模型

2015-05-17 22:17 1011 查看

一、点对点模型概览

当你只需要将消息发布送给唯一的一个消息消费者是,就应该使用点对点模型。虽然可能或有多个消费者在队列中侦听统一消息,但是,只有一个且仅有一个消费者线程会接受到该消息

在p2p模型中,生产者称为发送者,而消费者则称为接受者。点对点模型最重要的特性如下:

消息通过称为队列的一个虚拟通道来进行交换。队列是生产者发送消息的目的地和接受者消费消息的消息源。

每条消息通仅会传送给一个接受者。可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费。

消息存在先后顺序。一个队列会按照消息服务器将消息放入队列中的顺序,把它们传送给消费者当消息已被消费时,就会从队列头部将它们删除。

生产者和消费者之间没有耦合。接受者和发送者可以在运行时动态添加,这使得系统的复杂性可以随着时间而增长或降低(这是消息传送系统的普遍特性)。

点对点消息传送模型有两种类型:异步即发即弃(fire-and-forget)处理异步请求/应答处理。使用即发即弃处理时,消息生产者向某个队列发送一条消息,而且它并不会期望接受到一个响应(至少不是立刻接收到响应)。这类处理可用于触发一个事件,或者用于向接受者发出请求来执行一个并不需要响应的特定活动。异步即发即弃处理如图4-1所示:



>

使用异步请求/应答处理时,消息生产者向队里发送一条消息,然后阻塞等待(blocking wait)应答队列,该应答队列正在等待来自接受者的响应。请求/应答处理实现了生产者和消费者之间的高度去耦,允许消息生产者和消费者组件采用不同的语言或平台。异步请求/应答处理如下图所示:



用于连接、创建、发送和接受的特定p2p接口见表:

公共API点对点模型API
ConnectionFactoryQueueConnectionFactory
DestinationQueue
ConnectionQueueConnection
SessionQueueSession
MessageConsumerQueueSender
MessageProducerQueueReceiver

1.1 何时使用点对点消息传送模型

JMS的初衷是要提供一种公共API的方法,用于访问现有的消息传送系统。在提出JMS规范概念的时候,一些消息传送系统厂商使用的是P2P模型,而另一些厂商使用的则是发布/订阅模型。

当你想让接受者对某个指定的消息进行一次而且仅仅一次处理时,就必须使用点对点模型。这可能是这两种模型之间的最重要的区别:点对点模型只会保证只有一个消费者来处理一条指定的消息。在消息要移除分别接受处理时,要在多个JMS客户端之间均衡消息处理的负载,这是极为重要的。点对点模型的另一优点就是,它所提供的QueueBrowser允许JMS客户端对队列进行快照(Snapshot),以查看正在等待被消费的消息。发布/订阅模型则没有这种浏览特性。

点对点消息传送模型的另一个用例是:您需要在组件之间进行同步通信,而那些组件却是用不同的编程语言编写的,或者是在不同的技术平台(如J2EE或.NET)上实现的。

使用点对点消息传送模型的另一个充分理由是:使用基于消息的负载均衡,可以让服务端的组件实现更大的吞吐量,特别是对于同构组件来说更是如此。

1.2 QBorrower和Qlender应用程序

为了说明点对点消息传送模型是如何工作的,我们将使用一个简单去耦的请求/应答用例。其中,QBorrower类使用点对点消息传送,向QLender类发出了一个简单的抵押贷款申请。QBorrower类使用LoanRequest队列,向QLender类发送贷款申请,而且根据特定的业务规则,QLender类使用LoanResponseQ队列向QBorrower类发回一个响应,表明该LoanRequest是被批准还是拒绝。由于QBorrower感兴趣的是要马上弄清楚贷款批准与否,一旦LoanRequest被发送出去,QBorrower类就会阻塞,并一直等待来自QLender类的响应,无响应就不再继续进行工作。

1.2.1 配置并运行应用程序

安装下载ActiveMQ并运行,如下图所示



配置发送接收队列:进入activeMQ的conf目录中,打开activemq.xml文件,在其中增加如下代码:

<destinations>
<queue name="LoanRequestQ" physicalName="jms.LoanRequestQ"/>
<queue name="LoanResponseQ" physicalName="jms.LoanResponseQ"/>
</destinations>


重新启动MQ,启动成功后打开浏览器输入网址:http://127.0.0.1:8161/,选择Queues可看到我们刚才加进来的队列,如下图



3.在程序中生成一个配置文件jndi.properties,内容如下:



4.我们再来看一张我们的程序运行示意图:



1.2.2 QBorrower类

QBorrowerl 类负责向包含工资额和贷款额的一个队列发送LoanRequest消息。这个类非常简单:构造函数建立一个到JMS提供者的连接,创建一个QueueSession,并使用JNDI查找获得请求和响应队列。

package cn.com.paner.jms.p2p;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.Buffer;
import java.util.StringTokenizer;
import java.util.UUID;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class QBorrower {

private QueueConnection qConnect = null;
private QueueSession qSession = null;
private Queue responseQ  =  null;
private Queue requestQ  = null;

public QBorrower(String ququecf,String requestQueue,
String responseQueue)
{

try {
//连接提供者并获取JMS连接
Context ctxContext = new InitialContext();
QueueConnectionFactory qFactory = (QueueConnectionFactory)
ctxContext.lookup(ququecf);
qConnect = qFactory.createQueueConnection();

//创建JMS会话
qSession = qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

//查找请求队列和响应队列

requestQ = (Queue)ctxContext.lookup(requestQueue);
responseQ = (Queue)ctxContext.lookup(responseQueue);

//现在完成创建,启动连接
qConnect.start();

} catch (NamingException | JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

private void sendLoanRequest(double salary,double loanAmt){

try {
//创建JMS消息‘
MapMessage msg  = qSession.createMapMessage();
msg.setDouble("Salary", salary);
msg.setDouble("LoanAmount", loanAmt);
msg.setJMSReplyTo(responseQ);
UUID uuid = UUID.randomUUID();
msg.setStringProperty("UUID", uuid.toString());

//创建发送者并发送消息
QueueSender qSender = qSession.createSender(requestQ);
qSender.send(msg);

//等待查看贷款申请被接受或拒绝
//String filter  ="JMSCorrelationID = '"+ msg.getJMSMessageID()+"'";
String filter  ="JMSCorrelationID='"+ uuid.toString()+"'";
System.out.println(filter);
//String slecector = "CustomerType = 'GOLD' OR JMSPriority BETWEEN 5 AND 9";
QueueReceiver qReceiver = qSession.createReceiver(responseQ,filter);
TextMessage tmsg = (TextMessage) qReceiver.receive(3000);
System.out.println(tmsg);
if (tmsg == null) {
System.out.println("QLender not responding.");
}else {
System.out.println("Loan request was "+tmsg.getText());
}

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

private void exit()
{
try {
qConnect.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.exit(0);
}

public static void main(String[] args) {
// TODO Auto-generated method stub

String queuecf = null;
String requestq = null;
String responseq = null;

if (args.length == 3) {
queuecf = args[0];
requestq = args[1];
responseq = args[2];
}else {
System.out.println("Usage:queueFactoy requestQueue responseQueue.");
System.exit(0);
}

QBorrower borrower = new QBorrower(queuecf, requestq, responseq);
try {

BufferedReader stdin = new BufferedReader
(new InputStreamReader(System.in));
System.out.println("QBorrower Application Started");
System.out.println("Press enter to quit application");
System.out.println("Enter : Salary,Loan_Amount");
System.out.println("\neg,g, 5000 , 12000");

while (true) {

System.out.println(">");
String loanRequest = stdin.readLine();
if (loanRequest == null ||
loanRequest.trim().length() <=0) {
borrower.exit();
}

//解析交易说明】
StringTokenizer st = new StringTokenizer(loanRequest, ",");
double salary = Double.valueOf(st.nextToken().trim()).doubleValue();
double loanAmt = Double.valueOf(st.nextToken().trim()).doubleValue();

borrower.sendLoanRequest(salary, loanAmt);

}
} catch (Exception e) {
// TODO: handle exception
}

}

}


QBorrower类的面方法从命令行接收3个参数:队列连接工厂的JNDI名称、贷款申请队列的JNDI名称,最后是贷款响应队列的JNDI名称,这个响应队列将接收来自QLender类的响应。

java -jar QBorrowe.jar QueueCF LoanRequestQ LoanResponseQ


java -jar  QLender.jar QueueCF  LoanResponseQ


[b]JMS初始化[/b]

在QBorrower类中,所有的JMS初始化逻辑都在构造函数中处理。构造函数要做到第一件事就是:通过创建一个InitalContext,建立一个到JMS提供者的连接:

//连接提供者并获取JMS连接
Context ctxContext = new InitialContext();
QueueConnectionFactory qFactory = (QueueConnectionFactory)
ctxContext.lookup(ququecf);
qConnect = qFactory.createQueueConnection();


当创建QueueConnection时,该连接最初是处于停止模式的。这就意味着虽然你可以将消息发送给队列,但是没有消息消费者能够从这个连接接受到消息,直到它被启动为止。

QueueConnection对象用于创建一个JMS Session对象,该对象时JMS中的一个工作线程和事务性工作单元。

通常来说,应用程序会在应用程序启动时创建一个单独的JMS connection,并维护一个Session对象池,供生产或消费者使用。

QueuSession对象通过QueueConnection对象上的工厂对象来创建。关闭Connection很重要,关闭Connection对象也将关闭所有打开的、和该连接有关的Session对象。创建QueueSession的语句如下:

//创建JMS会话

qSession = qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);


请注意:createQueueSession方法使用两个参数,第一个参数表示QueueSession是否为事务性的。true表示是事务性的,这意味着,在QueueSession预期试用期内发送到队列的消息,将不会传送给接受者,直到QueueSession上调用commit方法为止。同样,在QueueSession上调用rollback方法,会删除事务性回话期间发送的所有消息。第二个参数表示确认模式。

最后一行代码启动连接,自此允许在该连接上接受消息。通常来说,在启动该连接之前执行所有的初始化逻辑,是一个明智之举。

[b]发送消息和接受消息[/b]

JMS消息时通过和消息类型相匹配的一个工厂方法,是从Session对象中创建的。使用new来时实例化一个新的JMS消息将不会奏效;他必须从Session对象中创建。在创建并加载消息对象之后,我们还为响应队列设置了JMSReplyTo消息头属性,这会进一步解决生产者和消费者之间的耦合。使用请求/应答模型时,在消息生产者中设置JMSReplyTo消息头属性,而不是在消息消费者中指定应答队列,这是一种同行的标准做法。

//创建JMS消息‘
MapMessage msg  = qSession.createMapMessage();
msg.setDouble("Salary", salary);
msg.setDouble("LoanAmount", loanAmt);
msg.setJMSReplyTo(responseQ);
UUID uuid = UUID.randomUUID();
msg.setStringProperty("UUID", uuid.toString());


在创建消息之后,接下来我们将创建QueuSender对象,指定希望发送消息的队列,然后在使用send方法消息;

//创建发送者并发送消息
QueueSender qSender = qSession .createSender(requestQ);
qSender.send(msg );


在QueueSender对象中,有若干种可用的重写send方法。

一旦消息已被发送出去,QBorrower类就会被阻塞,并等待QLender关于贷款被批准或拒绝的响应。这个过程的第一步就是去创建一个消息选择器,以便我们能够将响应消息和发送的消息关联起来。这是很有必要的,因为申请贷款时,可能同时还有许多其他的贷款申请正被发送到贷款申请队列,或者从中出发。为了确保能够得到准确的响应消息,我们使用一种消息关联的技术。

在创建QueueReceiver时,我们会指定过滤器,表明只有在JMSCorrelationID和原始的JMSMessageID相等时才会接受消息。由于有QueueReceiver,我们能够调用receive方法进行阻塞等待,直到响应消息被接受为止。

//等待查看贷款申请被接受或拒绝
//String filter  ="JMSCorrelationID = '"+ msg.getJMSMessageID()+"'";
String filter  ="JMSCorrelationID='"+ uuid.toString()+"'";
System.out.println(filter);
//String slecector = "CustomerType = 'GOLD' OR JMSPriority BETWEEN 5 AND 9";
QueueReceiver qReceiver = qSession.createReceiver(responseQ,filter);
TextMessage tmsg = (TextMessage) qReceiver.receive(3000);
System.out.println(tmsg);
if (tmsg == null) {
System.out.println("QLender not responding.");
}else {
System.out.println("Loan request was "+tmsg.getText());
}


始终未receive方法指定一个合理的延时值,这肯定是一个明智之举;否则,它将在那里一直等待。

1.2.3 QLender

QLender类的作用是去侦听贷款申请队列上的贷款申请,判断工资是否满足必要的商业要求,并最终将结果发回给借方。

package cn.com.paner.jms.p2p;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class QLender implements MessageListener {

private QueueConnection qConnect = null;
private QueueSession qSession = null;
private Queue  requestQ = null;

public QLender(String Queuecf,String requetQueue){

try {
//连接到提供者并获得JMS连接
Context ctxContext = new InitialContext();
QueueConnectionFactory qFactory = (QueueConnectionFactory) ctxContext.lookup(Queuecf);
qConnect = qFactory.createQueueConnection();

//创建JMS会话
qSession = qConnect.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

//查找申请队列
requestQ = (Queue) ctxContext.lookup(requetQueue);

//启动连接
qConnect.start();

//创建消息侦听器
QueueReceiver qReceiver = qSession.createReceiver(requestQ);
qReceiver.setMessageListener(this);

System.out.println("Waitting for loan request ...");

} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.exit(0);
}
}

@Override
public void onMessage(Message arg0) {
// TODO Auto-generated method stub

try {
boolean accepted = false;

if (arg0 instanceof MapMessage) {
System.out.println("message type is legal.");
}

//从消息中获取数据
MapMessage msg = (MapMessage)arg0;
double salary = msg.getDouble("Salary");
double loanAmt = msg.getDouble("LoanAmount");

//决定是否接受或拒绝贷款申请
if (loanAmt < 200000) {
accepted = (salary / loanAmt) > 0.25;
}else {
accepted = (salary / loanAmt) > 0.33;
}

System.out.println("% = "+(salary / loanAmt)+"loan is ?"
+(accepted? "Accepted" : "Declined"));

//将结果返回
TextMessage tmsg = qSession.createTextMessage();
tmsg.setText(accepted? "Accepted" : "Declined");
//tmsg.setJMSCorrelationID(arg0.getJMSMessageID());
//  System.out.println("JMSCorrelationID = "+arg0.getStringProperty("UUID"));
tmsg.setJMSCorrelationID(arg0.getStringProperty("UUID"));

//创建发送者并发送消息
QueueSender qSender =
qSession.createSender((Queue)arg0.getJMSReplyTo());
qSender.setTimeToLive(30*60*1000);
qSender.send(tmsg);

System.out.println("\nWaiting for loan requests...");

} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.exit(0);
}

}

private void exit()
{
try {
qConnect.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.exit(0);
}

public static void main(String argv[])
{
String queuecf  = null;
String requestq = null;

if (argv.length == 2) {
queuecf = argv[0];
requestq = argv[1];
}else {
System.out.println("Invalid arguments,Should be:");
System.out.println("java OLender factory request_queue");
System.exit(0);;
}

QLender lender = new QLender(queuecf, requestq);
try {
//
BufferedReader stdin = new BufferedReader(
new InputStreamReader(System.in));

System.out.println("OLender application stared");
System.out.println("Press enter to quit application.");
stdin.readLine();
lender.exit();

} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}

}


QLender类之所以称为一个异步消息侦听器,意味着它和前面的QBorrower类不同,他在等待消息时,不会阻塞。从QLender类实现MessageListener接口和重写OnMessage方法的事实来看,这一点是显而易见的。

一旦启动连接,QLender类就可以开始接受消息。不过在它能够接受消息之前,必须由QueueReceiver注册一个消息侦听器:

//创建消息侦听器
QueueReceiver qReceiver = qSession.createReceiver(requestQ);
qReceiver.setMessageListener(this);


至此,已经启动了一个单独的侦听线程。该线程将一直等待,直到接受到一个消息为止,而且一旦它接受到一条消息,就会调用侦听器类的onMessage方法。我们可以很容易地将消息传送工作委托给实现了MessageListener接口的另一个类:

qReceiver.setMessageListener(otherclass);


在createReceiver方法指定的队列中接受一条消息时,侦听器线程将异步调用侦听器类的OnMessage方法。OnMessage方法首先将消息造型成一个MapMessage。

为了使它更安全,最好是在另一种类型的消息正被发送到该队列的情况下,再使用关键字instanceof检查一下JMS的消息类型:

if (arg0 instanceof MapMessage) {
System. out.println("message type is legal." );
}


一旦贷款申请已被分析并作出决定,QLenderl类就须向借方发回响应。为了完成这个工作,它首先创建一条要发送的JMS消息。响应消息无须和QLender接收的贷款申请消息时相同类型的JMS消息。

1.3 动态队列对受管队列

动态队列是通过使用厂商特定API的应用程序源代码创建的队列。受管队列则是在JMS提供者配置文件或管理工具中定义的队列。

动态队列的生成和配置往往取决于特定的厂商。一个队列可以由一个消费者专用,也可以被多个消费者共享。根据内存共享和溢出到磁盘选项的不同,它可能还会有容量大小的限制。

JMS不会试图为一个队列的所有可能选项定义一组API,而是用厂商特定 的管理方式来管理地设置这些选项,这样应该是可能的。为了运行时管理队列,大多数厂商都会提供命令行管理工具、图形界面管理工具或API。

JMS提供了一个QueueSession.createQueue(string queueName)方法,打这个方法并不是要在消息传送系统中定义一个新的队列。它的设计目的使用用于返回代表某个现有队列的Queue对象。另外还有一个JMS定义的方法,即QueueSession.createTemporaryQueue()方法,JMS客户端可以使用这个方法创建一个临时队里,而这个临时队里也只能由该JMS客户单消费。

如果你有很多队列,它们的数量还可能会随时间而增多,那么创建动态队列就大有用处。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: