
Camel - Route

2017-09-26 09:39

1 Routing in Camel

Routing is the step-by-step movement of a message, which originates from an endpoint in the role of a consumer.

The consumer could be receiving the message from an external service, polling for the message on some system, or even creating the message itself.

This message then flows through a processing component, which could be an enterprise integration pattern (EIP), a processor, an interceptor, or some other custom creation.

The message is finally sent to a target endpoint that’s in the role of a producer.
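The flow described above (consumer, then processing steps, then producer) can be modelled in a few lines of plain Java. This is only an illustrative sketch, not Camel code: the class RouteFlowSketch and the method runOnce are hypothetical names, and a message is reduced to a String.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of one message travelling through a route.
final class RouteFlowSketch {

    /**
     * Takes one message from the consumer side, passes it through each
     * processing step in order, and hands the result to the producer side.
     */
    static void runOnce(Supplier<String> consumer,
                        List<UnaryOperator<String>> processors,
                        Consumer<String> producer) {
        String message = consumer.get();           // message originates at the consumer
        for (UnaryOperator<String> step : processors) {
            message = step.apply(message);         // each step may transform the message
        }
        producer.accept(message);                  // message ends at the target endpoint
    }
}
```

In Camel the same three roles are played by the from endpoint, the processors and EIPs in between, and the to endpoint.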

2 Create Route in Java

org.apache.camel.builder.RouteBuilder

the Java DSL

RouteBuilder

RouteBuilders are used to create routes in Camel.

Each RouteBuilder can create multiple routes.

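1) Define a custom MyRouteBuilder class that extends RouteBuilder

import org.apache.camel.builder.RouteBuilder;

class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // route definitions go here
        ...
    }
}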


2) Add it to the CamelContext

CamelContext context = new DefaultCamelContext();
context.addRoutes(new MyRouteBuilder());


An anonymous inner class also works:

CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        ...
    }
});


Using the Java DSLs

Domain-specific languages (DSLs) are computer languages that target a specific problem domain, rather than a general purpose domain like most programming languages.

The regular expression DSL is an external DSL — it has a custom syntax and so requires a separate compiler or interpreter to execute.

Internal DSLs, in contrast, use an existing general purpose language, such as Java, in such a way that the DSL feels like a language from a particular domain.

Camel’s domain is enterprise integration, so the Java DSL is essentially a set of fluent interfaces that contain methods named after terms from the EIP book.
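To make the idea of an internal DSL built from fluent interfaces concrete, here is a tiny stand-alone sketch. MiniRoute is a hypothetical class invented for illustration; it only records the steps and has nothing to do with Camel's real builder classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical miniature fluent interface; it records steps instead of routing messages.
final class MiniRoute {
    private final List<String> steps = new ArrayList<>();

    private MiniRoute(String sourceUri) {
        steps.add("from:" + sourceUri);
    }

    /** Entry point of the chain, named after the domain term. */
    static MiniRoute from(String uri) {
        return new MiniRoute(uri);
    }

    /** Returning this is what allows the calls to be chained. */
    MiniRoute to(String uri) {
        steps.add("to:" + uri);
        return this;
    }

    List<String> steps() {
        return Collections.unmodifiableList(steps);
    }
}
```

A call such as MiniRoute.from("ftp://rider.com/orders").to("jms:incomingOrders") reads like a sentence from the integration domain, yet it is ordinary Java that needs no separate compiler or interpreter.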

Polling for FTP messages and sending them to the JMS queue

from("ftp://rider.com/orders?username=rider&password=secret")
    .to("jms:incomingOrders");


ADDING A PROCESSOR

from("ftp://rider.com/orders?username=rider&password=secret")
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            System.out.println("We just downloaded: " + exchange.getIn().getHeader("CamelFileName"));
        }
    })
    .to("jms:incomingOrders");


3 Create Route with Spring

Prerequisite: Spring must be integrated first; see the Camel-Spring notes.

Camel provides custom XML extensions that we call the Spring DSL.

The Spring DSL allows you to do almost everything you can do in the Java DSL.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="ftpRouteBuilder" class="examples.FtpRouteBuilder" />

  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <route>
      <!-- "&" must be escaped as "&amp;" inside an XML attribute -->
      <from uri="ftp://127.0.0.1/datas?noop=true&amp;username=FTPUSER&amp;password=Woaih3c" />
      <to uri="file://target/datas?noop=true" />
    </route>
    <route>
      <from uri="file://target/datas?noop=true" />
      <to uri="bean:PromptBean?method=prompt" />
    </route>
  </camelContext>

</beans>


4 EIPs

Camel supports most of the Enterprise Integration Patterns from the excellent book by Gregor Hohpe and Bobby Woolf.

The EIPs in the book fall into the following categories:

Messaging Systems

Messaging Channels

Message Construction

Message Routing

Message Transformation

Messaging Endpoints

System Management

5 Content-based Router

A Content-Based Router (CBR) is a message router that routes a message to a destination based on its content.

The content could be a message header, the payload data type, part of the payload itself — pretty much anything in the message exchange.

To do the conditional routing required by the CBR, Camel introduces a few keywords in the DSL.

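choice: similar to switch

when: similar to case

otherwise: similar to default

end: closes the choice block; after it you can chain further to calls, so routing continues once the conditional branch has finished

predicate: the condition interface; its method returns true or false, and it can be built from expression syntax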

from("jms:incomingOrders")
    .choice()
        .when(predicate)
            .to("jms:xmlOrders")
        .when(predicate)
            .to("jms:csvOrders");


public interface Predicate {
    boolean matches(Exchange exchange);
}


Route messages to different destinations based on the file extension:

from("jms:incomingOrders")
    .choice()
        .when(header("CamelFileName").endsWith(".xml"))
            .to("jms:xmlOrders")
        .when(header("CamelFileName").endsWith(".csv"))
            .to("jms:csvOrders");
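The way the when clauses are evaluated can be modelled without Camel. The sketch below is a hypothetical plain-Java class (ContentBasedRouterSketch is not a Camel type) that assumes first-match-wins semantics: clauses are tested in declaration order and the otherwise destination is used only when none matches. The destination names are taken from the route above; "jms:badOrders" is used as the fallback for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of choice/when/otherwise; keyed on the file name only.
final class ContentBasedRouterSketch {
    // LinkedHashMap keeps the clauses in the order they were declared.
    private final Map<Predicate<String>, String> whenClauses = new LinkedHashMap<>();
    private String otherwiseDestination; // null: unmatched messages go nowhere

    ContentBasedRouterSketch when(Predicate<String> predicate, String destination) {
        whenClauses.put(predicate, destination);
        return this;
    }

    ContentBasedRouterSketch otherwise(String destination) {
        this.otherwiseDestination = destination;
        return this;
    }

    /** Returns the destination of the first matching clause, else the otherwise destination. */
    String route(String fileName) {
        for (Map.Entry<Predicate<String>, String> clause : whenClauses.entrySet()) {
            if (clause.getKey().test(fileName)) {
                return clause.getValue();
            }
        }
        return otherwiseDestination;
    }
}
```

Without an otherwise clause, a file name that matches nothing yields null here, which mirrors a message that no branch picks up.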


Use end to continue routing:

from("jms:incomingOrders")
    .choice()
        .when(header("CamelFileName").endsWith(".xml"))
            .to("jms:xmlOrders")
        .when(header("CamelFileName").endsWith(".csv"))
            .to("jms:csvOrders")
    .end()
    .to("jms:continuedProcessing");


Implementation in Spring:

<route>
  <from uri="jms:incomingOrders"/>
  <choice>
    <when>
      <simple>${header.CamelFileName} regex '^.*xml$'</simple>
      <to uri="jms:xmlOrders"/>
    </when>
    <when>
      <simple>${header.CamelFileName} regex '^.*(csv|csl)$'</simple>
      <to uri="jms:csvOrders"/>
    </when>
    <otherwise>
      <to uri="jms:badOrders"/>
      <stop/>
    </otherwise>
  </choice>
  <to uri="jms:continuedProcessing"/>
</route>


6 Message Filters

A Message Filter allows you to filter out uninteresting messages based on some condition.

When processing XML files, filter out the test files, i.e. orders that carry the attribute test="true":

<?xml version="1.0" encoding="UTF-8"?>
<order name="motor" amount="1" customer="foo" test="true"/>


DSL in Java:

from("jms:xmlOrders")
    .filter(xpath("/order[not(@test)]"))
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            System.out.println("Received XML order: "
                + exchange.getIn().getHeader("CamelFileName"));
        }
    });


DSL in Spring:

<route>
  <from uri="jms:xmlOrders"/>
  <filter>
    <xpath>/order[not(@test)]</xpath>
    <process ref="orderLogger"/>
  </filter>
</route>
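To see what the XPath predicate /order[not(@test)] actually accepts, it can be evaluated outside Camel with the JDK's own javax.xml.xpath API. The class OrderFilterSketch and the method passesFilter are hypothetical names; only the XPath expression comes from the routes above.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical helper that evaluates the filter expression with the JDK XPath engine.
final class OrderFilterSketch {

    /** Returns true when the order would pass the filter, i.e. it has no test attribute. */
    static boolean passesFilter(String orderXml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            // A node-set converted to BOOLEAN is true only if it is non-empty.
            return (Boolean) xpath.evaluate(
                "/order[not(@test)]",
                new InputSource(new StringReader(orderXml)),
                XPathConstants.BOOLEAN);
        } catch (XPathExpressionException e) {
            throw new IllegalStateException("Could not evaluate XPath on: " + orderXml, e);
        }
    }
}
```

Note that the expression tests only for the presence of the attribute, so an order with test="false" would be dropped as well.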


7 Multicasting

A multicast sends a message to a number of specified recipients.

DSL in Java:

from("jms:xmlOrders")
    .multicast().to("jms:accounting", "jms:production");


By default, the multicast sends message copies sequentially.

The extra DSL method parallelProcessing makes the multicast send the copies in parallel.

from("jms:xmlOrders")
    .multicast().parallelProcessing()
    .to("jms:accounting", "jms:production");


With parallelProcessing, a default thread pool size of 10 is used if you don’t specify anything else. To use a different pool, pass your own executor service:

ExecutorService executor = Executors.newFixedThreadPool(16);
from("jms:xmlOrders")
    .multicast().parallelProcessing().executorService(executor)
    .to("jms:accounting", "jms:production");
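What parallel delivery on a fixed thread pool amounts to can be sketched with java.util.concurrent alone. This is a hedged plain-Java model, not Camel's implementation: ParallelMulticastSketch and send are hypothetical names, and each recipient is reduced to a function that returns a reply string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical model: every recipient receives the same message on a pool thread.
final class ParallelMulticastSketch {

    /** Sends the message to all recipients concurrently; replies come back in recipient order. */
    static List<String> send(String message, List<Function<String, String>> recipients, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (Function<String, String> recipient : recipients) {
                tasks.add(() -> recipient.apply(message));
            }
            List<String> replies = new ArrayList<>();
            // invokeAll blocks until every task has finished and preserves task order.
            for (Future<String> future : executor.invokeAll(tasks)) {
                replies.add(future.get());
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

The pool size bounds how many recipients are served at once; with two recipients and a pool of 16, both deliveries can run at the same time.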


By default, the multicast will continue sending messages to destinations even if one fails.

To change this, you can take advantage of the stopOnException feature of the multicast.

When enabled, this feature will stop the multicast on the first exception caught.

from("jms:xmlOrders")
    .multicast().stopOnException()
    .parallelProcessing().executorService(executor)
    .to("jms:accounting", "jms:production");
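The difference between the default behaviour and stopOnException can be illustrated with a sequential plain-Java model. MulticastSketch is a hypothetical class, and it simplifies matters by swallowing the failure and just reporting which recipients received the message; how Camel itself reports the exception is outside this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sequential model of multicast failure handling.
final class MulticastSketch {

    /**
     * Delivers the message to each recipient in iteration order and returns the
     * names of the recipients that accepted it without throwing.
     */
    static List<String> multicast(String message,
                                  Map<String, Consumer<String>> recipients,
                                  boolean stopOnException) {
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<String, Consumer<String>> recipient : recipients.entrySet()) {
            try {
                recipient.getValue().accept(message);
                delivered.add(recipient.getKey());
            } catch (RuntimeException e) {
                if (stopOnException) {
                    break; // first failure ends the multicast
                }
                // default: ignore the failure and move on to the next recipient
            }
        }
        return delivered;
    }
}
```

With a failing first recipient, the default mode still reaches the second one, while the stopOnException mode reaches none.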


DSL with Spring:

<route>
  <from uri="jms:xmlOrders"/>
  <multicast stopOnException="true" parallelProcessing="true"
             executorServiceRef="executor">
    <to uri="jms:accounting"/>
    <to uri="jms:production"/>
  </multicast>
</route>


<bean id="executor" class="java.util.concurrent.Executors"
      factory-method="newFixedThreadPool">
  <constructor-arg index="0" value="16"/>
</bean>


8 Using recipient lists



A recipient list inspects the incoming message and determines a list of recipients based on the content of the message.

In this case, the message is only sent to the A, B, and D destinations.

Note that the recipient list is different from the multicast because the list of recipients is dynamic.

Camel provides a recipientList method for implementing the Recipient List EIP.

from("jms:xmlOrders")
    .recipientList(header("recipients"));


You can use an expression to build the recipients list. The expression result must be iterable; in the example below it is a String of comma-separated endpoint URIs.

from("jms:xmlOrders")
    .setHeader("customer", xpath("/order/@customer"))
    .process(new Processor() {
        public void process(Exchange exchange) throws Exception {
            String recipients = "jms:accounting";
            String customer = exchange.getIn().getHeader("customer", String.class);
            // constant-first equals avoids a NullPointerException when the header is missing
            if ("honda".equals(customer)) {
                recipients += ",jms:production";
            }
            exchange.getIn().setHeader("recipients", recipients);
        }
    })
    .recipientList(header("recipients"));
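The decision logic inside the processor can be pulled out and checked on its own. The sketch below is plain Java with hypothetical names (RecipientListSketch, recipientsHeaderFor, split); it assumes the header value is a comma-separated String, as in the route above, and shows how such a value breaks down into individual endpoint URIs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone version of the recipient calculation.
final class RecipientListSketch {

    /** Builds the comma-separated header value for a customer; null is treated as a regular customer. */
    static String recipientsHeaderFor(String customer) {
        String recipients = "jms:accounting";
        if ("honda".equals(customer)) {
            recipients += ",jms:production";
        }
        return recipients;
    }

    /** Splits the header value into the individual endpoint URIs. */
    static List<String> split(String recipientsHeader) {
        return Arrays.asList(recipientsHeader.split(","));
    }
}
```

Keeping the calculation in a small method like this makes the dynamic part of the recipient list easy to unit-test without starting a route.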


DSL with Spring:

<route>
  <from uri="jms:xmlOrders" />
  <setHeader headerName="customer">
    <xpath>/order/@customer</xpath>
  </setHeader>
  <process ref="calculateRecipients" />
  <recipientList>
    <header>recipients</header>
  </recipientList>
</route>


Using RecipientList Annotation:

You can add a @RecipientList annotation to a method in a plain Java class (a Java bean).

The annotated method will be used to generate the list of recipients from the exchange.

This behavior only gets invoked if the class is used with Camel’s bean integration.

public class RecipientListBean {
    @RecipientList
    public String[] route(@XPath("/order/@customer") String customer) {
        if (isGoldCustomer(customer)) {
            return new String[] {"jms:accounting", "jms:production"};
        } else {
            return new String[] {"jms:accounting"};
        }
    }

    private boolean isGoldCustomer(String customer) {
        return customer.equals("honda");
    }
}


from("jms:xmlOrders")
    .bean(RecipientListBean.class);


9 Using the wireTap method



By using the wireTap method in the Java DSL, you can send a copy of the exchange to a secondary destination without affecting the behavior of the rest of the route.

from("jms:incomingOrders")
    .wireTap("jms:orderAudit")
    .choice()
        .when(header("CamelFileName").endsWith(".xml"))
            .to("jms:xmlOrders")
        .when(header("CamelFileName").regex("^.*(csv|csl)$"))
            .to("jms:csvOrders")
        .otherwise()
            .to("jms:badOrders");
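The key property of a wire tap, that the secondary destination gets a copy and the main flow is unaffected, can be shown with a small plain-Java model. WireTapSketch is a hypothetical class, the message is reduced to a map of headers, and the tap is called synchronously to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model: the tap sees a copy, the main route keeps the original.
final class WireTapSketch {

    /** Hands a copy of the headers to the tap and returns the untouched original for further routing. */
    static Map<String, String> tap(Map<String, String> headers, Consumer<Map<String, String>> tapDestination) {
        Map<String, String> copy = new HashMap<>(headers); // the tap never sees the original map
        tapDestination.accept(copy);
        return headers;
    }
}
```

Whatever the audit side does to its copy, the message that continues into the choice block still carries the original header values.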