Camel ActiveMQ topic route with jms selector
2013-02-28 20:56
http://www.andrejkoelewijn.com/blog/2011/02/21/camel-activemq-topic-route-with-jms-selector/
Here’s a quick example of how to create a Camel route that consumes from an ActiveMQ topic. It uses a JMS selector to receive only a subset of the messages published to the topic.
    import org.apache.camel.builder.RouteBuilder;

    public class SubscriberRouteBuilder extends RouteBuilder {

        public String resultEndpoint = "mock:result";

        @Override
        public void configure() throws Exception {
            from("activemq:topic:STOCKS?clientId=1&durableSubscriptionName=dsn1&selector=SE='NASDAQ'")
                .to("log:stocks?showAll=true")
                .to(resultEndpoint);
        }
    }
To test this, start an embedded ActiveMQ broker in the unit test. The test itself uses a mock endpoint to specify how many messages you expect and what the contents of those messages should be.
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;

    import org.apache.activemq.broker.BrokerService;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.test.junit4.CamelTestSupport;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class SubscriberRouteBuilderTest extends CamelTestSupport {

        private String rep = "mock:result";

        private static BrokerService brokerSvc = null;

        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {
            SubscriberRouteBuilder srb = new SubscriberRouteBuilder();
            srb.resultEndpoint = rep;
            // Return the configured instance, not a fresh one
            return srb;
        }

        // Start an embedded broker once for the whole test class
        @BeforeClass
        public static void setupClass() throws Exception {
            brokerSvc = new BrokerService();
            brokerSvc.setBrokerName("TestBroker");
            brokerSvc.addConnector("tcp://localhost:61616");
            brokerSvc.start();
        }

        @AfterClass
        public static void shutdownClass() throws Exception {
            brokerSvc.stop();
        }

        @Test
        public void testConfigure() throws Exception {
            String[] nasdaqStocks = new String[]{"AAPL=350.56", "ORCL=33.68", "CSCO=18.85", "GOOG=630.08", "AAPL=350.10"};

            // Only the five NASDAQ messages should pass the selector
            getMockEndpoint(rep).expectedMessageCount(5);
            getMockEndpoint(rep).expectedBodiesReceived(Arrays.asList(nasdaqStocks));

            // The "SE" header is sent as a JMS property, which the selector matches on
            template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[0], "SE", "NASDAQ");
            template.sendBodyAndHeader("activemq:topic:STOCKS", "HD=38.48", "SE", "DOWJONES");
            template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[1], "SE", "NASDAQ");
            template.sendBodyAndHeader("activemq:topic:STOCKS", "HD=38.00", "SE", "DOWJONES");
            template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[2], "SE", "NASDAQ");
            template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[3], "SE", "NASDAQ");
            template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[4], "SE", "NASDAQ");

            assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
        }
    }
================
http://fusesource.com/docs/router/2.4/eip/MsgEnd-Selective.html
Selective Consumer
Overview
The selective consumer pattern, shown in Figure 9.5, describes a consumer that applies a filter to incoming messages, so that only messages meeting specific selection criteria are processed.
Figure 9.5. Selective Consumer Pattern
![](http://fusesource.com/docs/router/2.4/eip/images/selective_consumer.gif)
You can implement the selective consumer pattern in Apache Camel using one of the following approaches:
- JMS selector
- JMS selector in ActiveMQ
- Message filter
JMS selector
A JMS selector is a predicate expression involving JMS headers and JMS properties. If the selector evaluates to true, the JMS message is allowed to reach the consumer, and if the selector evaluates to false, the JMS message is blocked. For example, to consume messages from the queue, selective, and select only those messages whose country code property is equal to US, you can use the following Java DSL route:

    from("jms:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'", "UTF-8"))
        .to("cxf:bean:replica01");
Where the selector string, CountryCode='US', must be URL encoded (using UTF-8 characters) to avoid trouble with parsing the query options. This example presumes that the JMS property, CountryCode, is set by the sender. For more details about JMS selectors, see JMS selectors.
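To make the encoding step concrete, here is a minimal plain-Java sketch that builds the endpoint URI and prints the encoded result. The class name SelectorUri and the helper withSelector are hypothetical names used only for illustration; the only library call is the standard java.net.URLEncoder.encode.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of Camel or ActiveMQ.
class SelectorUri {

    // Appends a URL-encoded selector option to an endpoint URI.
    static String withSelector(String endpoint, String selector) {
        try {
            return endpoint + "?selector=" + URLEncoder.encode(selector, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // '=' becomes %3D and each single quote becomes %27
        System.out.println(withSelector("jms:selective", "CountryCode='US'"));
        // prints: jms:selective?selector=CountryCode%3D%27US%27
    }
}
```

The resulting string is what would be passed to from(...) in the route above.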
Note: If a selector is applied to a JMS queue, messages that are not selected remain on the queue and are potentially available to other consumers attached to the same queue.
JMS selector in ActiveMQ
You can also define JMS selectors on ActiveMQ endpoints. For example:

    from("activemq:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'", "UTF-8"))
        .to("cxf:bean:replica01");

For more details, see ActiveMQ: JMS Selectors and ActiveMQ Message Properties.
Message filter
If it is not possible to set a selector on the consumer endpoint, you can insert a filter processor into your route instead. For example, you can define a selective consumer that processes only messages with a US country code using Java DSL, as follows:

    from("seda:a")
        .filter(header("CountryCode").isEqualTo("US"))
        .process(myProcessor);
The same route can be defined using XML configuration, as follows:
    <camelContext id="buildCustomProcessorWithFilter" xmlns="http://camel.apache.org/schema/spring">
      <route>
        <from uri="seda:a"/>
        <filter>
          <xpath>$CountryCode = 'US'</xpath>
          <process ref="myProcessor"/>
        </filter>
      </route>
    </camelContext>

For more information about the Apache Camel filter processor, see Message Filter.
Warning: Be careful about using a message filter to select messages from a JMS queue. When using a filter processor, blocked messages are simply discarded. Hence, if the messages are consumed from a queue (which allows each message to be consumed only once; see Competing Consumers), then blocked messages are not processed at all. This might not be the behavior you want.
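The difference between a selector and a filter on a queue can be illustrated with a toy model in plain Java. This is only a sketch under simplifying assumptions: the queue is an in-memory Deque rather than a JMS broker, messages are strings with a made-up "US:" or "DE:" prefix standing in for the CountryCode property, and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Toy model only; it does not use JMS, ActiveMQ, or Camel.
class SelectorVsFilter {

    // Selector semantics: only matching messages are taken; the rest stay queued.
    static List<String> consumeWithSelector(Deque<String> queue, Predicate<String> selector) {
        List<String> processed = new ArrayList<>();
        for (Iterator<String> it = queue.iterator(); it.hasNext();) {
            String msg = it.next();
            if (selector.test(msg)) {
                processed.add(msg);
                it.remove();
            }
        }
        return processed;
    }

    // Filter semantics: every message is consumed; non-matching ones are discarded.
    static List<String> consumeWithFilter(Deque<String> queue, Predicate<String> filter) {
        List<String> processed = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.poll();
            if (filter.test(msg)) {
                processed.add(msg);
            }
        }
        return processed;
    }

    static Deque<String> sampleQueue() {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("US:order-1");
        queue.add("DE:order-2");
        queue.add("US:order-3");
        return queue;
    }

    public static void main(String[] args) {
        Predicate<String> isUs = msg -> msg.startsWith("US:");

        Deque<String> q1 = sampleQueue();
        System.out.println(consumeWithSelector(q1, isUs) + " left on queue: " + q1);
        // prints: [US:order-1, US:order-3] left on queue: [DE:order-2]

        Deque<String> q2 = sampleQueue();
        System.out.println(consumeWithFilter(q2, isUs) + " left on queue: " + q2);
        // prints: [US:order-1, US:order-3] left on queue: []
    }
}
```

In both cases the same two messages are processed, but with the filter the DE message is gone and no other consumer can pick it up.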