jms 消息选择器selector实例
2017-03-24 11:40
357 查看
一、前言
在掌握了消息的结构之后,我们接下来看一下JMS的一个重要功能:选择器。有些时候,作为消费者只希望处理自己感兴趣的消息。如果某个消息只有一个消费者,我们可以在让该客户端根据规则来处理自己感兴趣的消息,那些不满足某些规则的就直接替丢弃掉。
但如果消息是广播的机制,那么让每个客户端都去做这样的处理,就加大了客户端的工作量,一种更好的方式,就是由消息提供者来完成消息的过滤和路由工作,这样就能大减轻客户端的工作量,消费者在真正处理的时候,完全不用关注对消息的过滤,可以只负责对消息的处理。也就是说,JMS的消息提供者可以保证其收到的消息都是其想要的消息。
JMS中,这种机制是通过selector来实现的,本文则详细介绍一下selector.
二、实现方式
在JMS里,selector并不是一个对象,而是一个字符串,更确切的说,是一个条件表达式,这个表达式并不仅限于处理逻辑表达式,而是一个跟SQL92语法类似的表达式。表达式中的变量必须为标准的消息头信息的key或者是某个属性的key,如果表达式计算结果为true,则表示该消息满足条件,接收端需要处理,如果为false,则表达接收端不需要处理。
该表达式在创建一个消费者的时候进行指定,示例如下:
MessageConsumer receiver = session.createConsumer(myDest, "articleType ='joke'");
则该receiver只会收到消息中有articleType属性,且其值为'joke'的消息。假设有两条消息,如下:
message1.setStringProperty("articleType", "joke");
message2.setStringProperty("articleType", "pic");
很明显只有message1会被处理。
三、表达式定义
可以看到其实设置一个selector还是比较简单的,如果对于SQL语法比较熟悉的话,那么对于表达式的理解和编写也会比较简单。一些重要的规则如下:
1. 表达式的操作符不区分大小写,但通常约定为大写
2. 表达式的变量名跟普通java变量名的命名规则一致,区分大小写,如articleType和articletype是两个不同的变量。
3. 表达式的变量名必须是标准的头header名或者已存在的属性名,否则对应的值会为NULL
4. 表达式的值不会进行类型转化,即假设有message2.setStringProperty("version", "1");
那么表达式"version=1" 对于message2来说,结果是false.
5. 支持基本的算术操作,逻辑操作,括号,取反等运算符
6. 支持BETWEEN .. AND ... , IN (NOT IN)等操作
7. 支持LIKE操作进行模糊匹配,%表示匹配多个任意字符,_表示匹配一个任意字符
8. 支持IS NULL及IS NOT NULL
9. 其它未提到的SQL 92 标准,可以认为能在WHERE语句后使用的表达式,在这儿都可以使用。
四、小结
通过message的头信息和属性,结合selector,就能很方便的进行消息过滤。
实例:
生产者类:
package cn.base.jms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author gu.fei
* @version 2017-03-24 8:36
*/
public class TopicProducer {
public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;
public static void main(String[] args){
Connection connection = null;
Session session = null;
try{
// create the connection
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
// create the session
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
String selectorStr = ((i+1) % 2 == 0) ? "myselector":"yourselector";
Message message = session.createTextMessage("hello zhang" + i);
message.setStringProperty("selector", selectorStr);
producer.send(message);
}
}catch (Exception e){
e.printStackTrace();
}finally{
try{
// close session and connection
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}消费者类:
package cn.base.jms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicMessage {
public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;
public static final boolean persistent = false;
public static void main(String[] args){
Connection connection = null;
Session session = null;
try{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageConsumer consumer1 = session.createConsumer(destination,"selector='myselector'");
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 1st Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
//这里的设置的语法类似于sql where条件语法
MessageConsumer consumer2 = session.createConsumer(destination,"selector='yourselector' or selector='myselector'");
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 2nd Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
Thread.sleep(5000000);
}catch (Exception e){
e.printStackTrace();
}finally{
try{
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
运行结果:
Receive message by the 2nd Consumer: hello zhang0
Receive message by the 1st Consumer: hello zhang1
Receive message by the 2nd Consumer: hello zhang1
Receive message by the 2nd Consumer: hello zhang2
Receive message by the 1st Consumer: hello zhang3
Receive message by the 2nd Consumer: hello zhang3
Receive message by the 2nd Consumer: hello zhang4
Receive message by the 1st Consumer: hello zhang5
Receive message by the 2nd Consumer: hello zhang5
Receive message by the 2nd Consumer: hello zhang6
Receive message by the 1st Consumer: hello zhang7
Receive message by the 2nd Consumer: hello zhang7
Receive message by the 2nd Consumer: hello zhang8
Receive message by the 1st Consumer: hello zhang9
Receive message by the 2nd Consumer: hello zhang9
通过调整 下面条件可以看到运行结果的不同 以便详细理解工作原理
在掌握了消息的结构之后,我们接下来看一下JMS的一个重要功能:选择器。有些时候,作为消费者只希望处理自己感兴趣的消息。如果某个消息只有一个消费者,我们可以在让该客户端根据规则来处理自己感兴趣的消息,那些不满足某些规则的就直接替丢弃掉。
但如果消息是广播的机制,那么让每个客户端都去做这样的处理,就加大了客户端的工作量,一种更好的方式,就是由消息提供者来完成消息的过滤和路由工作,这样就能大减轻客户端的工作量,消费者在真正处理的时候,完全不用关注对消息的过滤,可以只负责对消息的处理。也就是说,JMS的消息提供者可以保证其收到的消息都是其想要的消息。
JMS中,这种机制是通过selector来实现的,本文则详细介绍一下selector.
二、实现方式
在JMS里,selector并不是一个对象,而是一个字符串,更确切的说,是一个条件表达式,这个表达式并不仅限于处理逻辑表达式,而是一个跟SQL92语法类似的表达式。表达式中的变量必须为标准的消息头信息的key或者是某个属性的key,如果表达式计算结果为true,则表示该消息满足条件,接收端需要处理,如果为false,则表达接收端不需要处理。
该表达式在创建一个消费者的时候进行指定,示例如下:
MessageConsumer receiver = session.createConsumer(myDest, "articleType ='joke'");
则该receiver只会收到消息中有articleType属性,且其值为'joke'的消息。假设有两条消息,如下:
message1.setStringProperty("articleType", "joke");
message2.setStringProperty("articleType", "pic");
很明显只有message1会被处理。
三、表达式定义
可以看到其实设置一个selector还是比较简单的,如果对于SQL语法比较熟悉的话,那么对于表达式的理解和编写也会比较简单。一些重要的规则如下:
1. 表达式的操作符不区分大小写,但通常约定为大写
2. 表达式的变量名跟普通java变量名的命名规则一致,区分大小写,如articleType和articletype是两个不同的变量。
3. 表达式的变量名必须是标准的头header名或者已存在的属性名,否则对应的值会为NULL
4. 表达式的值不会进行类型转化,即假设有message2.setStringProperty("version", "1");
那么表达式"version=1" 对于message2来说,结果是false.
5. 支持基本的算术操作,逻辑操作,括号,取反等运算符
6. 支持BETWEEN .. AND ... , IN (NOT IN)等操作
7. 支持LIKE操作进行模糊匹配,%表示匹配多个任意字符,_表示匹配一个任意字符
8. 支持IS NULL及IS NOT NULL
9. 其它未提到的SQL 92 标准,可以认为能在WHERE语句后使用的表达式,在这儿都可以使用。
四、小结
通过message的头信息和属性,结合selector,就能很方便的进行消息过滤。
实例:
生产者类:
package cn.base.jms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author gu.fei
* @version 2017-03-24 8:36
*/
public class TopicProducer {
public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;
public static void main(String[] args){
Connection connection = null;
Session session = null;
try{
// create the connection
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
// create the session
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
String selectorStr = ((i+1) % 2 == 0) ? "myselector":"yourselector";
Message message = session.createTextMessage("hello zhang" + i);
message.setStringProperty("selector", selectorStr);
producer.send(message);
}
}catch (Exception e){
e.printStackTrace();
}finally{
try{
// close session and connection
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}消费者类:
package cn.base.jms;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicMessage {
public static final String url = "tcp://localhost:61616";
public static final String topicName = "test_topic";
public static final boolean transacted = false;
public static final boolean persistent = false;
public static void main(String[] args){
Connection connection = null;
Session session = null;
try{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicName);
MessageConsumer consumer1 = session.createConsumer(destination,"selector='myselector'");
consumer1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 1st Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
//这里的设置的语法类似于sql where条件语法
MessageConsumer consumer2 = session.createConsumer(destination,"selector='yourselector' or selector='myselector'");
consumer2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage recvMessage = (TextMessage)message;
try{
System.out.println("Receive message by the 2nd Consumer: " + recvMessage.getText());
}catch (JMSException e){
e.printStackTrace();
}
}
});
Thread.sleep(5000000);
}catch (Exception e){
e.printStackTrace();
}finally{
try{
if (session != null){
session.close();
}
if (connection != null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
运行结果:
Receive message by the 2nd Consumer: hello zhang0
Receive message by the 1st Consumer: hello zhang1
Receive message by the 2nd Consumer: hello zhang1
Receive message by the 2nd Consumer: hello zhang2
Receive message by the 1st Consumer: hello zhang3
Receive message by the 2nd Consumer: hello zhang3
Receive message by the 2nd Consumer: hello zhang4
Receive message by the 1st Consumer: hello zhang5
Receive message by the 2nd Consumer: hello zhang5
Receive message by the 2nd Consumer: hello zhang6
Receive message by the 1st Consumer: hello zhang7
Receive message by the 2nd Consumer: hello zhang7
Receive message by the 2nd Consumer: hello zhang8
Receive message by the 1st Consumer: hello zhang9
Receive message by the 2nd Consumer: hello zhang9
通过调整 下面条件可以看到运行结果的不同 以便详细理解工作原理
selector='yourselector' or selector='myselector'
MessageConsumer consumer2 = session.createConsumer(destination,"selector='yourselector' or selector='myselector'");
相关文章推荐
- JMS消息选择器selector
- JMS 监听消息选择器selector
- 深入浅出JMS(四)--ActiveMQ消息选择器Selector
- ActiveMQ编写JMS(Java消息服务)实例分享
- JMS消息监听和转发实例
- activemq 消息选择器Selector
- activemq 消息选择器Selector
- JMS消息选择器
- 利用jms消息选择器过滤消息
- Spring整合JMS(消息中间件)实例
- JMS消息发送和接收实例 - 点对点模式
- java通过ActiveMQ实现JMS的消息队列实例
- JMS消息发送和接收实例 - 点对点模式
- 【消息队列】二、Spring整合JMS(消息中间件)实例
- Android编程中selector背景选择器用法实例分析
- 通过会话Bean发送JMS消息给MDB的实例
- JMS消息选择器
- activemq 消息选择器Selector
- JMS - 消息选择器
- ActiveMQ实例2--Spring JMS发送消息