您的位置:首页 > 其它

jms 消息选择器selector实例

2017-03-24 11:40 357 查看
一、前言

     在掌握了消息的结构之后,我们接下来看一下JMS的一个重要功能:选择器。有些时候,作为消费者只希望处理自己感兴趣的消息。如果某个消息只有一个消费者,我们可以在让该客户端根据规则来处理自己感兴趣的消息,那些不满足某些规则的就直接替丢弃掉。

     但如果消息是广播的机制,那么让每个客户端都去做这样的处理,就加大了客户端的工作量,一种更好的方式,就是由消息提供者来完成消息的过滤和路由工作,这样就能大减轻客户端的工作量,消费者在真正处理的时候,完全不用关注对消息的过滤,可以只负责对消息的处理。也就是说,JMS的消息提供者可以保证其收到的消息都是其想要的消息。

     JMS中,这种机制是通过selector来实现的,本文则详细介绍一下selector.

二、实现方式

     在JMS里,selector并不是一个对象,而是一个字符串,更确切的说,是一个条件表达式,这个表达式并不仅限于处理逻辑表达式,而是一个跟SQL92语法类似的表达式。表达式中的变量必须为标准的消息头信息的key或者是某个属性的key,如果表达式计算结果为true,则表示该消息满足条件,接收端需要处理,如果为false,则表达接收端不需要处理。

  该表达式在创建一个消费者的时候进行指定,示例如下:

  MessageConsumer receiver = session.createConsumer(myDest, "articleType ='joke'");  

  则该receiver只会收到消息中有articleType属性,且其值为'joke'的消息。假设有两条消息,如下:

  message1.setStringProperty("articleType", "joke");
 

  message2.setStringProperty("articleType", "pic");
 

  很明显只有message1会被处理。

  

三、表达式定义

 

  可以看到其实设置一个selector还是比较简单的,如果对于SQL语法比较熟悉的话,那么对于表达式的理解和编写也会比较简单。一些重要的规则如下:

  1. 表达式的操作符不区分大小写,但通常约定为大写

  2. 表达式的变量名跟普通java变量名的命名规则一致,区分大小写,如articleType和articletype是两个不同的变量。

  3. 表达式的变量名必须是标准的头header名或者已存在的属性名,否则对应的值会为NULL

  4. 表达式的值不会进行类型转化,即假设有message2.setStringProperty("version", "1");
那么表达式"version=1" 对于message2来说,结果是false.

  5. 支持基本的算术操作,逻辑操作,括号,取反等运算符

  6. 支持BETWEEN .. AND ... , IN (NOT IN)等操作

  7. 支持LIKE操作进行模糊匹配,%表示匹配多个任意字符,_表示匹配一个任意字符

  8. 支持IS NULL及IS NOT NULL

  9. 其它未提到的SQL 92 标准,可以认为能在WHERE语句后使用的表达式,在这儿都可以使用。

 

四、小结

  通过message的头信息和属性,结合selector,就能很方便的进行消息过滤。

 实例:

生产者类:

package cn.base.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* @author gu.fei
* @version 2017-03-24 8:36
*/
public class TopicProducer {

public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;

public static void main(String[] args){
Connection connection = null;
Session session = null;

try{
// create the connection
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();

// create the session
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);

MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
String selectorStr = ((i+1) % 2 == 0) ? "myselector":"yourselector";
Message message = session.createTextMessage("hello zhang" + i);
message.setStringProperty("selector", selectorStr);
producer.send(message);
}
}catch (Exception e){
e.printStackTrace();
}finally{
try{
// close session and connection
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}

}消费者类:

package cn.base.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicMessage {
public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;
public static final boolean persistent = false;

public static void main(String[] args){
Connection connection = null;
Session session = null;

try{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();

session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);

MessageConsumer consumer1 = session.createConsumer(destination,"selector='myselector'");
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 1st Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});

//这里的设置的语法类似于sql where条件语法
MessageConsumer consumer2 = session.createConsumer(destination,"selector='yourselector' or selector='myselector'");
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 2nd Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});

Thread.sleep(5000000);

}catch (Exception e){
e.printStackTrace();
}finally{
try{
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}

运行结果:

Receive message by the 2nd Consumer: hello zhang0

Receive message by the 1st Consumer: hello zhang1

Receive message by the 2nd Consumer: hello zhang1

Receive message by the 2nd Consumer: hello zhang2

Receive message by the 1st Consumer: hello zhang3

Receive message by the 2nd Consumer: hello zhang3

Receive message by the 2nd Consumer: hello zhang4

Receive message by the 1st Consumer: hello zhang5

Receive message by the 2nd Consumer: hello zhang5

Receive message by the 2nd Consumer: hello zhang6

Receive message by the 1st Consumer: hello zhang7

Receive message by the 2nd Consumer: hello zhang7

Receive message by the 2nd Consumer: hello zhang8

Receive message by the 1st Consumer: hello zhang9

Receive message by the 2nd Consumer: hello zhang9

通过调整 下面条件可以看到运行结果的不同 以便详细理解工作原理
selector='yourselector' or selector='myselector'


MessageConsumer consumer2 = session.createConsumer(destination,"selector='yourselector' or selector='myselector'");
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: