Camel自定义组件示例
2015-06-15 23:09
483 查看
要想在Camel中自定义组件,就要对Camel中关键概念进行理解,了解Camel中路由的构建过程与启动过程,在前面的文章中已经有相关的叙述。
这里就给出一个自定义组件的例子。该例子还是以文件轮询为主题,下面是具体代码。
组件类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
/**
 * Custom Camel component for the file-polling example. Every endpoint URI
 * resolved through this component is turned into a {@link MyFileEndpoint}.
 */
public class MyFileComponent extends DefaultComponent {

    /**
     * Creates the endpoint for the given URI.
     *
     * @param uri        the endpoint URI, passed on unchanged to the endpoint
     * @param remaining  supplied by Camel; not used by this component
     * @param parameters supplied by Camel; not used by this component
     * @return a new {@link MyFileEndpoint} owned by this component
     */
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        final MyFileEndpoint fileEndpoint = new MyFileEndpoint(this, uri);
        return fileEndpoint;
    }
}
Endpoint类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
/**
 * Endpoint of the example file component. It can act as a route source (by
 * creating a {@link MyFileConsumer}) and as a route target (by creating a
 * {@link MyFileProducer}).
 */
public class MyFileEndpoint extends DefaultEndpoint {

    /**
     * @param component the component that created this endpoint
     * @param uri       the endpoint URI
     */
    public MyFileEndpoint(MyFileComponent component, String uri) {
        super(uri, component);
    }

    /** Creates the producer that handles exchanges sent to this endpoint. */
    @Override
    public Producer createProducer() throws Exception {
        final Producer fileProducer = new MyFileProducer(this);
        return fileProducer;
    }

    /**
     * Creates the consumer that feeds exchanges into the route.
     *
     * @param processor the processor that receives each created exchange
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        final Consumer fileConsumer = new MyFileConsumer(this, processor);
        return fileConsumer;
    }

    /** This endpoint is not a singleton. */
    @Override
    public boolean isSingleton() {
        return false;
    }

    /**
     * Wraps a file in a new exchange: the file becomes the body of the "in"
     * message and is also stored under the
     * {@code FileComponent.FILE_EXCHANGE_FILE} exchange property.
     *
     * @param file the file to wrap
     * @return the newly created exchange
     */
    public Exchange createExchange(File file) {
        final Exchange fileExchange = new DefaultExchange(getCamelContext());
        fileExchange.setProperty(FileComponent.FILE_EXCHANGE_FILE, file);
        fileExchange.getIn().setBody(file, File.class);
        return fileExchange;
    }
}
Consumer类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
/**
 * Polling consumer of the example file component. Once started, it lists the
 * polled directory once per second and pushes one exchange per directory entry
 * to the route's processor.
 */
public class MyFileConsumer extends DefaultConsumer implements Runnable {
    /** Created in {@link #doStart()} and released in {@link #doStop()} so the consumer can be restarted. */
    private ScheduledExecutorService executorService;
    /** Directory to poll: the part of the endpoint URI after the first ':'. */
    private File pollDir;

    /**
     * @param endpoint  the endpoint that owns this consumer; its URI names the directory to poll
     * @param processor the processor that receives every created exchange
     */
    public MyFileConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
        String uri = endpoint.getEndpointUri();
        this.pollDir = new File(uri.substring(uri.indexOf(":") + 1));
    }

    /** Starts the consumer and schedules the poll task (1 s initial delay, 1 s period). */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        // A fresh executor per start: a shut-down executor rejects new tasks,
        // so a field-initialized one could not survive stop + start.
        executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(this, 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /** Stops polling and releases the scheduler thread so it does not outlive the consumer. */
    @Override
    protected void doStop() throws Exception {
        if (executorService != null) {
            // shutdown() lets a poll that is already running finish its current pass.
            executorService.shutdown();
            executorService = null;
        }
        super.doStop();
    }

    /** One poll pass: creates and processes an exchange for every entry in the polled directory. */
    @Override
    public void run() {
        File[] files = pollDir.listFiles();
        if (files == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            // Letting the resulting NullPointerException escape would make the scheduler
            // cancel all future polls, so skip this pass and try again on the next tick.
            return;
        }
        MyFileEndpoint endpoint = (MyFileEndpoint) getEndpoint();
        for (File file : files) {
            Exchange exchange = endpoint.createExchange(file);
            try {
                processExchange(exchange);
            } catch (Exception e) {
                // Report through the consumer's exception handler instead of stderr,
                // and keep going with the remaining files.
                getExceptionHandler().handleException(e);
            }
        }
    }

    /** Hands the exchange to the processor this consumer was created with. */
    private void processExchange(Exchange exchange) throws Exception {
        this.getProcessor().process(exchange);
    }
}
Producer类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.io.FileUtils;
/**
 * Producer of the example file component: moves the file carried by an
 * exchange into the directory named by the endpoint URI.
 */
public class MyFileProducer extends DefaultProducer {
    /** Target directory: the part of the endpoint URI after the first ':'. */
    private File outputDir;

    /**
     * @param endpoint the endpoint that owns this producer; its URI names the output directory
     */
    public MyFileProducer(Endpoint endpoint) {
        super(endpoint);
        int schemeEnd = endpoint.getEndpointUri().indexOf(":");
        String directoryPath = endpoint.getEndpointUri().substring(schemeEnd + 1);
        this.outputDir = new File(directoryPath);
    }

    /**
     * Moves the file held in the exchange's "in" body to the output directory,
     * creating the directory if needed. Exchanges whose body cannot be
     * obtained as a {@link File} are ignored.
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        File incoming = exchange.getIn().getBody(File.class);
        if (incoming == null) {
            return;
        }
        FileUtils.moveFileToDirectory(incoming, outputDir, true);
    }
}
测试类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
 * Manual demo of the custom "myfile" component: polls {@code H:/temp/in},
 * prints every polled file to standard output and then moves it to
 * {@code H:/temp/out}.
 */
public class MyComponentTest {
    /**
     * Builds and starts the demo route, then blocks forever so the polling
     * thread keeps running.
     *
     * @param args not used
     * @throws Exception if the Camel context fails to start or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        // For simplicity the component is registered programmatically instead of placing a
        // properties file under META-INF/services/org/apache/camel/component/ on the classpath.
        camelContext.addComponent("myfile", new MyFileComponent());
        camelContext.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                this.from("myfile:H:/temp/in").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        File file = exchange.getIn().getBody(File.class);
                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
                        try {
                            String line;
                            while ((line = br.readLine()) != null) {
                                // Write to System.out directly and never close it: closing a
                                // wrapper PrintStream closes System.out itself, which would
                                // silence the output of every file after the first one.
                                System.out.println(line);
                            }
                            System.out.flush();
                        } finally {
                            // Always release the file handle, even when reading fails;
                            // an open handle can block the move done by the next step.
                            br.close();
                        }
                    }
                }).to("myfile:H:/temp/out");
            }
        });
        camelContext.start();
        Object lock = new Object();
        synchronized (lock) {
            // Nothing ever notifies this lock; the loop guards against spurious wakeups.
            while (true) {
                lock.wait();
            }
        }
    }
}
如果一个组件是起始组件(或称为源组件),那么对应的Endpoint就必须能创建出一个Consumer对象;如果这个组件还想作为目的组件,那么对应的Endpoint还必须能创建出Producer对象。Consumer负责Exchange对象的产生,而Producer负责Exchange对象的消费。当然,在真实应用中组件不可能写得如此简单,但大致流程是一样的,这个例子已经能够简单地实现轮询目录中的文件并将其搬运至指定目录了。
ref:http://blog.csdn.net/xtayfjpk/article/details/39122349
这里就给出一个自定义组件的例子。该例子还是以文件轮询为主题,下面是具体代码。
组件类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
/**
 * Custom Camel component for the file-polling example. Every endpoint URI
 * resolved through this component is turned into a {@link MyFileEndpoint}.
 */
public class MyFileComponent extends DefaultComponent {

    /**
     * Creates the endpoint for the given URI.
     *
     * @param uri        the endpoint URI, passed on unchanged to the endpoint
     * @param remaining  supplied by Camel; not used by this component
     * @param parameters supplied by Camel; not used by this component
     * @return a new {@link MyFileEndpoint} owned by this component
     */
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        final MyFileEndpoint fileEndpoint = new MyFileEndpoint(this, uri);
        return fileEndpoint;
    }
}
Endpoint类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
/**
 * Endpoint of the example file component. It can act as a route source (by
 * creating a {@link MyFileConsumer}) and as a route target (by creating a
 * {@link MyFileProducer}).
 */
public class MyFileEndpoint extends DefaultEndpoint {

    /**
     * @param component the component that created this endpoint
     * @param uri       the endpoint URI
     */
    public MyFileEndpoint(MyFileComponent component, String uri) {
        super(uri, component);
    }

    /** Creates the producer that handles exchanges sent to this endpoint. */
    @Override
    public Producer createProducer() throws Exception {
        final Producer fileProducer = new MyFileProducer(this);
        return fileProducer;
    }

    /**
     * Creates the consumer that feeds exchanges into the route.
     *
     * @param processor the processor that receives each created exchange
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        final Consumer fileConsumer = new MyFileConsumer(this, processor);
        return fileConsumer;
    }

    /** This endpoint is not a singleton. */
    @Override
    public boolean isSingleton() {
        return false;
    }

    /**
     * Wraps a file in a new exchange: the file becomes the body of the "in"
     * message and is also stored under the
     * {@code FileComponent.FILE_EXCHANGE_FILE} exchange property.
     *
     * @param file the file to wrap
     * @return the newly created exchange
     */
    public Exchange createExchange(File file) {
        final Exchange fileExchange = new DefaultExchange(getCamelContext());
        fileExchange.setProperty(FileComponent.FILE_EXCHANGE_FILE, file);
        fileExchange.getIn().setBody(file, File.class);
        return fileExchange;
    }
}
Consumer类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
/**
 * Polling consumer of the example file component. Once started, it lists the
 * polled directory once per second and pushes one exchange per directory entry
 * to the route's processor.
 */
public class MyFileConsumer extends DefaultConsumer implements Runnable {
    /** Created in {@link #doStart()} and released in {@link #doStop()} so the consumer can be restarted. */
    private ScheduledExecutorService executorService;
    /** Directory to poll: the part of the endpoint URI after the first ':'. */
    private File pollDir;

    /**
     * @param endpoint  the endpoint that owns this consumer; its URI names the directory to poll
     * @param processor the processor that receives every created exchange
     */
    public MyFileConsumer(Endpoint endpoint, Processor processor) {
        super(endpoint, processor);
        String uri = endpoint.getEndpointUri();
        this.pollDir = new File(uri.substring(uri.indexOf(":") + 1));
    }

    /** Starts the consumer and schedules the poll task (1 s initial delay, 1 s period). */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        // A fresh executor per start: a shut-down executor rejects new tasks,
        // so a field-initialized one could not survive stop + start.
        executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(this, 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /** Stops polling and releases the scheduler thread so it does not outlive the consumer. */
    @Override
    protected void doStop() throws Exception {
        if (executorService != null) {
            // shutdown() lets a poll that is already running finish its current pass.
            executorService.shutdown();
            executorService = null;
        }
        super.doStop();
    }

    /** One poll pass: creates and processes an exchange for every entry in the polled directory. */
    @Override
    public void run() {
        File[] files = pollDir.listFiles();
        if (files == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            // Letting the resulting NullPointerException escape would make the scheduler
            // cancel all future polls, so skip this pass and try again on the next tick.
            return;
        }
        MyFileEndpoint endpoint = (MyFileEndpoint) getEndpoint();
        for (File file : files) {
            Exchange exchange = endpoint.createExchange(file);
            try {
                processExchange(exchange);
            } catch (Exception e) {
                // Report through the consumer's exception handler instead of stderr,
                // and keep going with the remaining files.
                getExceptionHandler().handleException(e);
            }
        }
    }

    /** Hands the exchange to the processor this consumer was created with. */
    private void processExchange(Exchange exchange) throws Exception {
        this.getProcessor().process(exchange);
    }
}
Producer类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.File;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.commons.io.FileUtils;
/**
 * Producer of the example file component: moves the file carried by an
 * exchange into the directory named by the endpoint URI.
 */
public class MyFileProducer extends DefaultProducer {
    /** Target directory: the part of the endpoint URI after the first ':'. */
    private File outputDir;

    /**
     * @param endpoint the endpoint that owns this producer; its URI names the output directory
     */
    public MyFileProducer(Endpoint endpoint) {
        super(endpoint);
        int schemeEnd = endpoint.getEndpointUri().indexOf(":");
        String directoryPath = endpoint.getEndpointUri().substring(schemeEnd + 1);
        this.outputDir = new File(directoryPath);
    }

    /**
     * Moves the file held in the exchange's "in" body to the output directory,
     * creating the directory if needed. Exchanges whose body cannot be
     * obtained as a {@link File} are ignored.
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        File incoming = exchange.getIn().getBody(File.class);
        if (incoming == null) {
            return;
        }
        FileUtils.moveFileToDirectory(incoming, outputDir, true);
    }
}
测试类:
[java] view
plaincopy
package com.xtayfjpk.esb.components.file;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
 * Manual demo of the custom "myfile" component: polls {@code H:/temp/in},
 * prints every polled file to standard output and then moves it to
 * {@code H:/temp/out}.
 */
public class MyComponentTest {
    /**
     * Builds and starts the demo route, then blocks forever so the polling
     * thread keeps running.
     *
     * @param args not used
     * @throws Exception if the Camel context fails to start or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        // For simplicity the component is registered programmatically instead of placing a
        // properties file under META-INF/services/org/apache/camel/component/ on the classpath.
        camelContext.addComponent("myfile", new MyFileComponent());
        camelContext.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                this.from("myfile:H:/temp/in").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        File file = exchange.getIn().getBody(File.class);
                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
                        try {
                            String line;
                            while ((line = br.readLine()) != null) {
                                // Write to System.out directly and never close it: closing a
                                // wrapper PrintStream closes System.out itself, which would
                                // silence the output of every file after the first one.
                                System.out.println(line);
                            }
                            System.out.flush();
                        } finally {
                            // Always release the file handle, even when reading fails;
                            // an open handle can block the move done by the next step.
                            br.close();
                        }
                    }
                }).to("myfile:H:/temp/out");
            }
        });
        camelContext.start();
        Object lock = new Object();
        synchronized (lock) {
            // Nothing ever notifies this lock; the loop guards against spurious wakeups.
            while (true) {
                lock.wait();
            }
        }
    }
}
如果一个组件是起始组件(或称为源组件),那么对应的Endpoint就必须能创建出一个Consumer对象;如果这个组件还想作为目的组件,那么对应的Endpoint还必须能创建出Producer对象。Consumer负责Exchange对象的产生,而Producer负责Exchange对象的消费。当然,在真实应用中组件不可能写得如此简单,但大致流程是一样的,这个例子已经能够简单地实现轮询目录中的文件并将其搬运至指定目录了。
ref:http://blog.csdn.net/xtayfjpk/article/details/39122349
相关文章推荐
- Java系列笔记(3) - Java 内存区域和GC机制
- maven2打包不同jdk版本的包
- SBCL 从REPL 中提取lisp代码
- python学习――yield用法
- Java面向对象 匿名内部类
- Number of Islands
- HDU3234&&UVA12232&&LA4487:Exclusive-OR(经典带权并查集)
- 【Mybatis框架】输出映射-resultType与resultMap
- maven中snapshot快照库和release发布库的区别和作用
- Jquery ajax传递复杂参数给WebService
- 7.6
- Windows Phone 六、JSON序列化
- MySQL优化GROUP BY-松散索引扫描与紧凑索引扫描
- Uva - 548 - Tree
- Uva - 548 - Tree
- maven2-snapshot快照库和release发布库的应用
- Android开发由eclipse转Android Studio中一些常见出错问题解决方法
- epoll使用详解
- 44Exchange 2010升级到Exchange 2013-公网切换
- Maven2-profile多环境配置