您的位置:首页 > 运维架构

Camel ActiveMQ topic route with jms selector

2013-02-28 20:56 344 查看
http://www.andrejkoelewijn.com/blog/2011/02/21/camel-activemq-topic-route-with-jms-selector/

Here’s a quick example how to create a camel route that creates a route from an ActiveMQ topic. It uses a selector to receive a subset of all messages.

import org.apache.camel.builder.RouteBuilder;

public class SubscriberRouteBuilder extends RouteBuilder {
public String resultEndpoint = "mock:result";
@Override
public void configure() throws Exception {
from("activemq:topic:STOCKS?clientId=1&durableSubscriptionName=dsn1&selector=SE='NASDAQ'")
.to("log:stocks?showAll=true")
.to(resultEndpoint);
}

}

To test this you should start ActiveMQ in the unit test. The test itself uses a mock endpoint to specify how many messages you expect, and what the contents of the messages should be.

import java.util.Arrays;import
java.util.concurrent.TimeUnit;import
org.apache.activemq.broker.BrokerService;import
org.apache.camel.builder.RouteBuilder;import
org.apache.camel.test.junit4.CamelTestSupport;import
org.junit.BeforeClass;import org.junit.Test;public
class SubscriberRouteBuilderTest
extends CamelTestSupport
{ private
String rep =
"mock:result";
@Override protected
RouteBuilder createRouteBuilder()
throws Exception
{ SubscriberRouteBuilder srb
= new
SubscriberRouteBuilder(); srb.resultEndpoint
= rep;
return new
SubscriberRouteBuilder();
} private
static BrokerService brokerSvc
= null;
@BeforeClass public
static void setupClass()
throws Exception
{ brokerSvc =
new BrokerService(); brokerSvc.setBrokerName("TestBroker");
brokerSvc.addConnector("tcp://localhost:61616"); brokerSvc.start();
} @AfterClass
public static
void shutdownClass()
throws Exception
{ brokerSvc.stop();
} @Test
public void testConfigure()
throws Exception
{ String[] nasdaqStocks
= new
String[]{"AAPL=350.56","ORCL=33.68","CSCO=18.85","GOOG=630.08","AAPL=350.10"};
getMockEndpoint(rep).expectedMessageCount(5); getMockEndpoint(rep).expectedBodiesReceived(Arrays.asList(nasdaqStocks));
template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[0],
"SE",
"NASDAQ");
template.sendBodyAndHeader("activemq:topic:STOCKS",
"HD=38.48",
"SE",
"DOWJONES");
template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[1],
"SE",
"NASDAQ");
template.sendBodyAndHeader("activemq:topic:STOCKS",
"HD=38.00",
"SE",
"DOWJONES");
template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[2],
"SE",
"NASDAQ");
template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[3],
"SE",
"NASDAQ");
template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[4],
"SE",
"NASDAQ"); assertMockEndpointsSatisfied(10,
TimeUnit.SECONDS);
}}


================
http://fusesource.com/docs/router/2.4/eip/MsgEnd-Selective.html

Selective Consumer

Overview

The selective consumer pattern, shown in
Figure 9.5, describes a consumer that applies a filter to incoming messages, so that only messages meeting specific selection criteria are processed.

Figure 9.5. Selective Consumer Pattern



You can implement the selective consumer pattern in Apache Camel using one of the following approaches:

JMS selector

JMS selector in ActiveMQ

Message filter

JMS selector

A JMS selector is a predicate expression involving JMS headers and JMS properties. If the selector evaluates to
true
, the JMS message is allowed to reach the consumer, and if the selector evaluates to
false
, the JMS message is blocked. For example, to consume messages from the queue,
selective
, and select only those messages whose country code property is equal to
US
, you can use the following Java DSL route:

from("jms:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'","UTF-8")).
to("cxf:bean:replica01");

Where the selector string,
CountryCode='US'
, must be URL encoded (using UTF-8 characters) to avoid trouble with parsing the query options. This example presumes that the JMS property,
CountryCode
, is set by the sender. For more details about JMS selectors, see

JMS selectors.


Note
If a selector is applied to a JMS queue, messages that are not selected remain on the queue and are potentially available to other consumers attached to the same queue.

JMS selector in ActiveMQ

You can also define JMS selectors on ActiveMQ endpoints. For example:

from("acivemq:selective?selector=" + java.net.URLEncoder.encode("CountryCode='US'","UTF-8")).
to("cxf:bean:replica01");

For more details, see
ActiveMQ: JMS Selectors and
ActiveMQ Message Properties.

Message filter

If it is not possible to set a selector on the consumer endpoint, you can insert a filter processor into your route instead. For example, you can define a selective consumer that processes only messages with a US country code using Java DSL, as follows:

from("seda:a").filter(header("CountryCode").isEqualTo("US")).process(myProcessor);

The same route can be defined using XML configuration, as follows:

<camelContext id="buildCustomProcessorWithFilter" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="seda:a"/>
<filter>
<xpath>$CountryCode = 'US'</xpath>
<process ref="#myProcessor"/>
</filter>
</route>
</camelContext>

For more information about the Apache Camel filter processor, see
Message Filter.


Warning
Be careful about using a message filter to select messages from a JMS
queue. When using a filter processor, blocked messages are simply discarded. Hence, if the messages are consumed from a queue (which allows each message to be consumed only once—see

Competing Consumers), then blocked messages are not processed at all. This might not be the behavior you want.

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: