您的位置:首页 > 运维架构 > Apache

Apache Camel框架之JMS路由

2016-01-12 10:57 666 查看


Apache Camel框架之JMS路由

本文简单的介绍和示例一个用Camel实现这样的需求:监听某一个文件夹是否有文件,取到文件后发送到另外一个系统监听的queue.(图片来源于Camel in Action)


1,因为要用JMS,这里介绍一个open source的activeMQ,可以从http://activemq.apache.org/download.html 下载,下载后解压,bin目录有一个activemq.bat文件,在命令行里运行activemq 启动activeMQ,如果能从从浏览器里访问 http://localhost:8161/admin/则activeMQ成功启动了.
2,在Camel里实现上图所示的路由:JAVA项目里需要将activeMQ的jar包配置到classpath下,Java代码如下:

[java] view
plaincopy

private static String user = ActiveMQConnection.DEFAULT_USER;

private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String args[]) throws Exception {

CamelContext context = new DefaultCamelContext();

ConnectionFactory connectionFactory =

new ActiveMQConnectionFactory(user, password, url);

context.addComponent("jms",

JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

System.out.println(url + " " + user + password);

context.addRoutes(new RouteBuilder() {

public void configure() {

from("file:d:/temp/inbox").to(

"jms:queue:TOOL.DEFAULT");

}

});

context.start();

boolean loop = true;

while (loop) {

Thread.sleep(25000);

}

context.stop();

}

Camel会在路由的时候将文件的内容以binary message发到activeMQ的名为'TOOL.DEFAULT'的queue .

当然也可以用如下的方式发送textmessage:from("file:d:/temp/inbox").convertBodyTo(String.class).to("jms:queue:TOOL.DEFAULT");

用下面的代码可以从Camel发送的queue里取到消息.

[java] view
plaincopy

private static String user = ActiveMQConnection.DEFAULT_USER;

private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

private static boolean transacted;

private static int ackMode = Session.AUTO_ACKNOWLEDGE;

public static void main(String[] args) throws Exception {

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

Connection connection = connectionFactory.createConnection();

connection.start();

Session session = connection.createSession(transacted, ackMode);

Destination destination = session.createQueue("TOOL.DEFAULT");

MessageConsumer consumer = session.createConsumer(destination);

Message message = consumer.receive(1000);

if (message instanceof TextMessage) {

TextMessage txtMsg = (TextMessage) message;

System.out.println("Received Text message : " + txtMsg.getText());

} else if(message != null){

BytesMessage bytesMsg = (BytesMessage) message;

byte[] bytes = new byte[(int) bytesMsg.getBodyLength()];

bytesMsg.readBytes(bytes);

System.out.println("Received byte message: " + new String(bytes));

}

consumer.close();

session.close();

connection.close();

}

同样,上面的路由也可以通过Spring配置实现:

[html] view
plaincopy

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">

<property name="connectionFactory">

<bean class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="failover://tcp://localhost:61616"/>

</bean>

</property>

</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring">

<route>

<from uri="file:d:/temp/inbox"/>

<to uri="jms:queue:TOOL.DEFAULT"/>

</route>

</camelContext>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: