您的位置:首页 > 其它

JMS基本概念

2016-08-10 00:00 204 查看
摘要: JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,即:Java消息服务。

基础信息

JMS的模型

P2P(Point-to-Point)点对点

消息生产者(发送者)与消息消费者(接收者)在没有时间上的依赖。生产消息将发送到queue中(及算完成),消息消费者从queue中取出并且消费消息。

每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费

消费者在成功接收消息之后需向队列应答成功

Pub/Sub(Publish/Subscribe)发布订阅模型

PUB/SUB方式的工作流程,首先subscriber(订阅者)向JMS容器订阅(Listen to)自己感兴趣的topic(主题),多个订阅者可以同时对一个主题进行订阅,消息发布者发布一条消息,所有订阅了该主题的订阅者都能收到这个消息。默认情况下,pub/sub方式下的消息不是持久的,这意味着,消息一经发出,不管有没有人接收,都不会保存下来,而且订阅者只能接收到自已订阅之后发布者发出的消息。这种方式有点像订阅报刊杂志,一种报刊可以有多人同时订阅,但订阅者只能收到开始订阅之后的报社发行的期刊。

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息

发布到topic的消息会被所有订阅者消费。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,消费者必须保持激活(运行)状态,才能消费生产者的消息(激活时间内的消息)。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。即使订阅者没有被激活(运行),它也能接收订阅之前生产者生产的消息。

持久订阅和非持久订阅

非持久订阅:只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。

持久订阅时:客户端向JMS 服务器注册一个自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。

编程模式

3ff0

连接工厂(Connection Factories)

连接工厂是用来创建客户端到JMS容器之间JMS连接的工厂,连接工厂有两种:(QueueConnectionFactory和TopicConnectionFactory),分别用来创建QueueConnection 和 TopicConnection的。
Context ctx = new InitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");


目的地(Destinations)

目的地是消息生产者(producer)消息发住的目的地,也是消费者(consumer)接收消息的来源地,它有点像信箱,邮递员把信件投往信箱,收件人从信箱取信件。对P2P方式来说,目的地就是Queue,对pub/sub方式来说,目的地就是Topic。我们要得到这个目的地的引用,只能通过JNDI查找(lookup)的方式得到,因为目的地是注册在JMS服务器的(后面的章节会讲到如何注册一个目的地)
Topic myTopic = (Topic) ctx.lookup("MyTopic");
Queue myQueue = (Queue) ctx.lookup("MyQueue");


连接(Connection)

连接是指客户端与JMS提供者(容器)之间的连接。有两种:QueueConnection (P2P连接)和TopicConnection (Pub/Sub连接)。注意:连接用完之后必须记得关闭,否则连接资源不会被释放掉。关闭连接的同时会自动把会话、产生者、消费者都关闭掉。
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();


会话(Session)

会话是用来创建消息产生者和消息消费者的单线程环境,你可以它来创建消息生产者、消费者、消息,用它来维持消息监听。
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSession queueSession = queueConnection.createQueueSession(true, 0);


消息生产者(Message Producers)

消息生产者(消息的发送者),
QueueSender(
P2P方式
)或者TopicPublisher(
Pub/Sub方式
)
是由session创建的,用来把把消息发送到目的地的对象。
QueueSender queueSender = queueSession.createSender(myQueue);
TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);

/*发送消息*/
queueSender.send(message);
topicPublisher.publish(message);


消息消费者(Message Consumer)

消息消费者(消息的接收者),QueueReceiverP2P方式TopicSubscriber(Pub/Sub方式)是由session来创建的,用来接收来自目的地消息的对象。JMS容器来负责把消息从目的地投递到注册了该目的地的消息消费者。
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);
TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);
一旦创建好消息消费者,它就是活动的,你可以用它来接收消息,你也可以用close()方法来使它失效(Inactive)。当你调用Connection的start()方法之前,消费者是不会接收到任何消息的。两种接收者都有一个receive方法,这是一个同步的方法,也就是说程序执行到这个方法会被阻塞,直到收到消息为止。
queueConnection.start();
Message m = queueReceiver.receive();

/*订阅模式*/
topicConnection.start();
Message m = topicSubscriber.receive(1000); // time out after a second


消息监听器(Message Listener)

消息监听器是一个充当消息的异步事件处理器的对象,它实现了MessageListener接口,这个接口只有一个方法onMessage,在这个方法里,你可以定义当接收到消息之后的要做的操作。你可以用setMessageListener方法为消息消费者注册一个监听器。温馨提示:启动Connection 应setMessageListener 。
MessageListener listener = new MessageListener( {
/*当有消息的时候自动调用*/
public void onMessage(Message msg) {
//业务逻辑
}

});
topicSubscriber.setMessageListener(listener); // 注册监听
topicConnection.start();


消息选择器(Message Selectors)

假如你只需要一个对滤器来过滤收到的消息,那么你可以使用消息选择器,它允许消费者指定只对特定的消息感兴趣。消息选择器只能是工作在JMS容器的,而不是我们的应用程序上。消息选择器是一个包含一个表达式的字符串,这个表达式的语法类似SQL的条件表达式,在createReceiver, createSubscriber这些方法里有一个参数让你指定一个消息选择器,由这些方法创建的消费者就只能收到与消息选择器匹配的消息了。

消息(Messages)

JMS消息包括三个部分:消息头(Header),属性(Properties),消息体(Body)

消息头里你可以指定JMSMessageID, JMSCorrelationID, JMSReplyTo, JMSType等信息。

属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。

消息体是消息的内容:

StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。

MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。

TextMessage:普通字符串消息,包含一个String。

ObjectMessage:对象消息,包含一个可序列化的Java 对象

BytesMessage:二进制数组消息,包含一个byte[]。

XMLMessage: 一个XML类型的消息。

TextMessage message = queueSession.createTextMessage();
message.setText(msg_text);

在消费者端,接收到的总是一个通用的Message对象,你需要把它转型成特定的类型才能提取出里面的内容。

Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  JMS