您的位置:首页 > 其它

Camel自定义组件示例

2015-06-15 23:09 483 查看
要想在Camel中自定义组件,就要对Camel中关键概念进行理解,了解Camel中路由的构建过程与启动过程,在前面的文章中已经有相关的叙述。

这里就给出一个自定义组件的例子。该例子还是以文件轮询为主题,下面是具体代码。

组件类:

[java] view
plaincopy

package com.xtayfjpk.esb.components.file;  

  

import java.util.Map;  

  

import org.apache.camel.Endpoint;  

import org.apache.camel.impl.DefaultComponent;  

  

public class MyFileComponent extends DefaultComponent {  

  

    @Override  

    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {  

        return new MyFileEndpoint(this, uri);  

    }  

  

}  

Endpoint类:

[java] view
plaincopy

package com.xtayfjpk.esb.components.file;  

  

import java.io.File;  

  

import org.apache.camel.Consumer;  

import org.apache.camel.Exchange;  

import org.apache.camel.Processor;  

import org.apache.camel.Producer;  

import org.apache.camel.component.file.FileComponent;  

import org.apache.camel.impl.DefaultEndpoint;  

import org.apache.camel.impl.DefaultExchange;  

  

public class MyFileEndpoint extends DefaultEndpoint {  

  

    public MyFileEndpoint(MyFileComponent component, String uri) {  

        super(uri, component);  

    }  

  

    @Override  

    public Producer createProducer() throws Exception {  

        return new MyFileProducer(this);  

    }  

  

    @Override  

    public Consumer createConsumer(Processor processor) throws Exception {  

        return new MyFileConsumer(this, processor);  

    }  

  

    @Override  

    public boolean isSingleton() {  

        return false;  

    }  

      

    public Exchange createExchange(File file) {  

        Exchange exchange = new DefaultExchange(getCamelContext());  

        exchange.setProperty(FileComponent.FILE_EXCHANGE_FILE, file);  

        exchange.getIn().setBody(file, File.class);  

        return exchange;  

    }  

}  

Consumer类:

[java] view
plaincopy

package com.xtayfjpk.esb.components.file;  

  

import java.io.File;  

import java.util.concurrent.Executors;  

import java.util.concurrent.ScheduledExecutorService;  

import java.util.concurrent.TimeUnit;  

  

import org.apache.camel.Endpoint;  

import org.apache.camel.Exchange;  

import org.apache.camel.Processor;  

import org.apache.camel.impl.DefaultConsumer;  

  

public class MyFileConsumer extends DefaultConsumer implements Runnable {  

    private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);  

    private File pollDir;  

      

    public MyFileConsumer(Endpoint endpoint, Processor processor) {  

        super(endpoint, processor);  

          

        String pollDir = endpoint.getEndpointUri().substring(endpoint.getEndpointUri().indexOf(":")+1);  

        this.pollDir = new File(pollDir);  

    }  

      

      

    @Override  

    protected void doStart() throws Exception {  

        super.doStart();  

          

        executorService.scheduleAtFixedRate(this, 1000L, 1000L, TimeUnit.MILLISECONDS);  

          

    }  

  

  

    @Override  

    public void run() {  

        File[] files = pollDir.listFiles();  

        for(File file : files) {  

            MyFileEndpoint endpoint = (MyFileEndpoint) getEndpoint();  

            Exchange exchange = endpoint.createExchange(file);  

            try {  

                processExchange(exchange);  

            } catch (Exception e) {  

                e.printStackTrace();  

            }  

        }  

          

    }  

  

  

    private void processExchange(Exchange exchange) throws Exception {  

        this.getProcessor().process(exchange);  

    }  

  

}  

Producer类:

[java] view
plaincopy

package com.xtayfjpk.esb.components.file;  

  

import java.io.File;  

  

import org.apache.camel.Endpoint;  

import org.apache.camel.Exchange;  

import org.apache.camel.impl.DefaultProducer;  

import org.apache.commons.io.FileUtils;  

  

public class MyFileProducer extends DefaultProducer {  

    private File outputDir;  

  

    public MyFileProducer(Endpoint endpoint) {  

        super(endpoint);  

          

        this.outputDir = new File(endpoint.getEndpointUri().substring(endpoint.getEndpointUri().indexOf(":")+1));  

    }  

  

    @Override  

    public void process(Exchange exchange) throws Exception {  

        File file = exchange.getIn().getBody(File.class);  

        if(file!=null) {  

            FileUtils.moveFileToDirectory(file, outputDir, true);  

        }  

    }  

  

}  

测试类:

[java] view
plaincopy

package com.xtayfjpk.esb.components.file;  

  

import java.io.BufferedReader;  

import java.io.File;  

import java.io.FileInputStream;  

import java.io.InputStreamReader;  

import java.io.PrintStream;  

  

import org.apache.camel.CamelContext;  

import org.apache.camel.Exchange;  

import org.apache.camel.Processor;  

import org.apache.camel.builder.RouteBuilder;  

import org.apache.camel.impl.DefaultCamelContext;  

  

public class MyComponentTest {  

  

    /** 

     * @param args 

     * @throws Exception  

     */  

    public static void main(String[] args) throws Exception {  

        CamelContext camelContext = new DefaultCamelContext();  

        //为了简单起见,用该方法替代了在类路径META-INF/services/org/apache/camel/component/下放置properties文件方式  

        camelContext.addComponent("myfile", new MyFileComponent());  

  

        camelContext.addRoutes(new RouteBuilder() {  

              

            @Override  

            public void configure() throws Exception {  

                this.from("myfile:H:/temp/in").process(new Processor() {  

                    @Override  

                    public void process(Exchange exchange) throws Exception {  

                        File file = exchange.getIn().getBody(File.class);  

                        PrintStream ps = new PrintStream(System.out);  

                        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(file)));  

                        String line = null;  

                        while((line=br.readLine())!=null) {  

                            ps.println(line);  

                        }  

                          

                        ps.close();  

                        br.close();  

                    }  

                }).to("myfile:H:/temp/out");  

                  

            }  

        });  

          

        camelContext.start();  

          

        Object object = new Object();  

        synchronized (object) {  

            object.wait();  

        }  

    }  

  

}  

如果一个组件是一个起始组件,或称为源组件,那么对应的Endpoint就必须要能创建出一个Consumer对象,如果这个组件还想是目的组件,那么对应的Endpoint还必须能创建出Producer对象。Consumer负责Exchange对象的产生,而Producer负责将Exchange对象中的消费。当然在真实应该中组件不可能写得如此简单,但大致流程是一样的,在这个例子中已经能够简单实现轮询出组件并搬运至指定目录了。

ref:http://blog.csdn.net/xtayfjpk/article/details/39122349
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: