您的位置:首页 > 编程语言 > Java开发

ActiveMQ5.0.1+Spring实现JMS异步消息发送

2009-08-13 21:04 651 查看
先到 http://www.activemq.org (官网)下载 ActiveMQ 5.0.1,然后把一些必要的 jar 包加入到项目的 lib 目录下面。

至于需要添加哪些 jar 包,官方网站上都有介绍。

官方网站上的资料会给你的开发带来很大的帮助。这是个人的感触:虽然是全英文的,但那些英语并不难。

下面是Spring的配置文件.

Xml代码



<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
           http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!--
        Spring application context for the ActiveMQ demo.
        schemaLocation is a whitespace-separated list of (namespace, location) pairs,
        so every URL must be a single unbroken token.
    -->

    <!-- JDBC data source (Commons DBCP pool, Oracle thin driver) -->
    <bean id="dataSource"
          class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method="close">
        <property name="driverClassName"
                  value="oracle.jdbc.driver.OracleDriver" />
        <property name="url"
                  value="jdbc:oracle:thin:@192.168.0.100:1521:ora10" />
        <property name="username" value="ceipportal" />
        <property name="password" value="ceipportal" />
    </bean>

    <!-- Spring JdbcTemplate backed by the data source above -->
    <bean id="jdbcTemplate"
          class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource" />
        </property>
    </bean>

    <!-- Pooled JMS connection factory wrapping an ActiveMQ factory for the local broker -->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Spring JmsTemplate: default destination "subject", custom message converter -->
    <bean id="myJmsTemplate"
          class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref local="jmsFactory" />
        </property>
        <property name="defaultDestinationName" value="subject" />
        <property name="messageConverter" ref="conversion" />
        <!-- Messaging domain: false = point-to-point (queue), true = publish/subscribe (topic) -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- Message receiver (also registered as the listener below) -->
    <bean id="consumer" class="org.spring.activeDemo01.Consumer">
        <property name="jdbcTemplate" ref="jdbcTemplate" />
        <property name="jmsTemplate" ref="myJmsTemplate" />
    </bean>

    <!-- Message sender; the property name "jmsTemplet" matches Producer.setJmsTemplet -->
    <bean id="producer" class="org.spring.activeDemo01.Producer">
        <property name="jmsTemplet" ref="myJmsTemplate" />
        <!-- <property name="destination" ref="destination"/> -->
        <!-- <property name="destinationName" value="subject"/> -->
    </bean>

    <!-- Listener container: 5 concurrent consumers on destination "subject", delivering to the consumer bean -->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="5" />
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destinationName" value="subject" />
        <property name="messageListener" ref="consumer" />
    </bean>

    <!-- Message converter used by myJmsTemplate -->
    <bean id="conversion"
          class="org.spring.activeDemo01.PaySettlementCoverter" />

    <!-- Plain POJO payload -->
    <bean id="hello" class="org.spring.activeDemo01.Hello" />

</beans>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
           http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!--
        Spring application context for the ActiveMQ demo.
        schemaLocation is a whitespace-separated list of (namespace, location) pairs,
        so every URL must be a single unbroken token.
    -->

    <!-- JDBC data source (Commons DBCP pool, Oracle thin driver) -->
    <bean id="dataSource"
          class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method="close">
        <property name="driverClassName"
                  value="oracle.jdbc.driver.OracleDriver" />
        <property name="url"
                  value="jdbc:oracle:thin:@192.168.0.100:1521:ora10" />
        <property name="username" value="ceipportal" />
        <property name="password" value="ceipportal" />
    </bean>

    <!-- Spring JdbcTemplate backed by the data source above -->
    <bean id="jdbcTemplate"
          class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource">
            <ref bean="dataSource" />
        </property>
    </bean>

    <!-- Pooled JMS connection factory wrapping an ActiveMQ factory for the local broker -->
    <bean id="jmsFactory"
          class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Spring JmsTemplate: default destination "subject", custom message converter -->
    <bean id="myJmsTemplate"
          class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref local="jmsFactory" />
        </property>
        <property name="defaultDestinationName" value="subject" />
        <property name="messageConverter" ref="conversion" />
        <!-- Messaging domain: false = point-to-point (queue), true = publish/subscribe (topic) -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- Message receiver (also registered as the listener below) -->
    <bean id="consumer" class="org.spring.activeDemo01.Consumer">
        <property name="jdbcTemplate" ref="jdbcTemplate" />
        <property name="jmsTemplate" ref="myJmsTemplate" />
    </bean>

    <!-- Message sender; the property name "jmsTemplet" matches Producer.setJmsTemplet -->
    <bean id="producer" class="org.spring.activeDemo01.Producer">
        <property name="jmsTemplet" ref="myJmsTemplate" />
        <!-- <property name="destination" ref="destination"/> -->
        <!-- <property name="destinationName" value="subject"/> -->
    </bean>

    <!-- Listener container: 5 concurrent consumers on destination "subject", delivering to the consumer bean -->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="5" />
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destinationName" value="subject" />
        <property name="messageListener" ref="consumer" />
    </bean>

    <!-- Message converter used by myJmsTemplate -->
    <bean id="conversion"
          class="org.spring.activeDemo01.PaySettlementCoverter" />

    <!-- Plain POJO payload -->
    <bean id="hello" class="org.spring.activeDemo01.Hello" />

</beans>

发送信息类

Java代码



package org.spring.activeDemo01;

import org.springframework.jms.core.JmsTemplate;

/**
 * Sends {@link Hello} objects through an injected Spring {@link JmsTemplate}.
 *
 * <p>Payload conversion and the target destination are whatever the injected template
 * is configured with; this class only triggers the send.
 */
public class Producer {

    private JmsTemplate jmsTemplet = null;

    /**
     * Injects the template used for sending and requests persistent delivery on it.
     *
     * <p>{@code JmsTemplate} only applies its own delivery mode when explicit QoS is
     * enabled, so both flags are set here, once, instead of re-configuring the shared
     * template on every send.
     *
     * @param jmsTemplet the template to send with; may be {@code null} to clear it
     */
    public void setJmsTemplet(JmsTemplate jmsTemplet) {
        this.jmsTemplet = jmsTemplet;
        if (jmsTemplet != null) {
            jmsTemplet.setExplicitQosEnabled(true);
            jmsTemplet.setDeliveryPersistent(true);
        }
    }

    /**
     * Converts and sends the given object to the template's default destination.
     *
     * @param h the payload handed to the template's message converter
     * @throws IllegalStateException if no template has been injected
     */
    public void simpleSend(Hello h) {
        if (this.jmsTemplet == null) {
            throw new IllegalStateException(
                    "jmsTemplet has not been set; inject a JmsTemplate before sending");
        }
        this.jmsTemplet.convertAndSend(h);
    }
}

package org.spring.activeDemo01;
import org.springframework.jms.core.JmsTemplate;

/**
 * Sends {@link Hello} objects through an injected Spring {@link JmsTemplate}.
 *
 * <p>Payload conversion and the target destination are whatever the injected template
 * is configured with; this class only triggers the send.
 */
public class Producer {

    private JmsTemplate jmsTemplet = null;

    /**
     * Injects the template used for sending and requests persistent delivery on it.
     *
     * <p>{@code JmsTemplate} only applies its own delivery mode when explicit QoS is
     * enabled, so both flags are set here, once, instead of re-configuring the shared
     * template on every send.
     *
     * @param jmsTemplet the template to send with; may be {@code null} to clear it
     */
    public void setJmsTemplet(JmsTemplate jmsTemplet) {
        this.jmsTemplet = jmsTemplet;
        if (jmsTemplet != null) {
            jmsTemplet.setExplicitQosEnabled(true);
            jmsTemplet.setDeliveryPersistent(true);
        }
    }

    /**
     * Converts and sends the given object to the template's default destination.
     *
     * @param h the payload handed to the template's message converter
     * @throws IllegalStateException if no template has been injected
     */
    public void simpleSend(Hello h) {
        if (this.jmsTemplet == null) {
            throw new IllegalStateException(
                    "jmsTemplet has not been set; inject a JmsTemplate before sending");
        }
        this.jmsTemplet.convertAndSend(h);
    }
}

接收信息类

Java代码



/**
 * JMS listener that prints the {@code "id"} and {@code "hello"} string properties of
 * every message it receives.
 *
 * <p>A {@link JdbcTemplate} and a {@link JmsTemplate} can be injected, but
 * {@link #onMessage(Message)} does not currently use either of them.
 */
public class Consumer implements MessageListener {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Consumer.class.getName());

    /**
     * Parameterized insert for persisting a received message. Currently unused.
     * NOTE(review): if persistence is enabled, bind the message id and text as
     * parameters of this statement rather than concatenating them into the SQL string.
     */
    private static final String insert_sql = "insert into jms_queue_send (id, MessageID, MessageDetails) values (seq_jms_queue_id.nextval, ?,?)";

    private JdbcTemplate jdbcTemplate;

    private JmsTemplate jmsTemplate;

    /** @return the injected JDBC template, or {@code null} if none was set */
    public JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }

    /** @param jdbcTemplate the JDBC template to hold */
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Prints the {@code "id"} and {@code "hello"} properties of the message to standard out.
     *
     * <p>Only {@link Message#getStringProperty(String)} is needed, so the message is used
     * through the {@code javax.jms.Message} interface; no provider-specific cast is made
     * and any message type is accepted.
     *
     * @param arg0 the received message
     */
    public void onMessage(Message arg0) {
        try {
            System.out.print("From -->" + arg0.getStringProperty("id"));
            System.out.println("-->" + arg0.getStringProperty("hello"));
        } catch (JMSException e) {
            // Best effort: report the failure and keep the listener alive.
            LOG.log(java.util.logging.Level.SEVERE, "Failed to read properties of received JMS message", e);
        }
    }

    /** @return the injected JMS template, or {@code null} if none was set */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /** @param jmsTemplate the JMS template to hold */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

}

/**
 * JMS listener that prints the {@code "id"} and {@code "hello"} string properties of
 * every message it receives.
 *
 * <p>A {@link JdbcTemplate} and a {@link JmsTemplate} can be injected, but
 * {@link #onMessage(Message)} does not currently use either of them.
 */
public class Consumer implements MessageListener {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(Consumer.class.getName());

    /**
     * Parameterized insert for persisting a received message. Currently unused.
     * NOTE(review): if persistence is enabled, bind the message id and text as
     * parameters of this statement rather than concatenating them into the SQL string.
     */
    private static final String insert_sql = "insert into jms_queue_send (id, MessageID, MessageDetails) values (seq_jms_queue_id.nextval, ?,?)";

    private JdbcTemplate jdbcTemplate;

    private JmsTemplate jmsTemplate;

    /** @return the injected JDBC template, or {@code null} if none was set */
    public JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }

    /** @param jdbcTemplate the JDBC template to hold */
    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Prints the {@code "id"} and {@code "hello"} properties of the message to standard out.
     *
     * <p>Only {@link Message#getStringProperty(String)} is needed, so the message is used
     * through the {@code javax.jms.Message} interface; no provider-specific cast is made
     * and any message type is accepted.
     *
     * @param arg0 the received message
     */
    public void onMessage(Message arg0) {
        try {
            System.out.print("From -->" + arg0.getStringProperty("id"));
            System.out.println("-->" + arg0.getStringProperty("hello"));
        } catch (JMSException e) {
            // Best effort: report the failure and keep the listener alive.
            LOG.log(java.util.logging.Level.SEVERE, "Failed to read properties of received JMS message", e);
        }
    }

    /** @return the injected JMS template, or {@code null} if none was set */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /** @param jmsTemplate the JMS template to hold */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

}


消息转换类

Java代码



package org.spring.activeDemo01;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.ObjectMessage;

import javax.jms.Session;

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

/**
 * Converts between {@link Hello} objects and JMS messages.
 *
 * <p>The {@code Hello} fields travel as the string properties {@code "id"} and
 * {@code "hello"}; the object-message body itself is left empty.
 */
public class PaySettlementCoverter implements MessageConverter {

    /**
     * Builds a {@link Hello} from the {@code "id"} and {@code "hello"} string properties.
     *
     * <p>Only properties are read, so any {@link Message} type is accepted.
     *
     * @param message the incoming message
     * @return a new {@code Hello} populated from the message properties
     * @throws JMSException if a property cannot be read
     * @throws MessageConversionException if {@code message} is {@code null}
     */
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        if (message == null) {
            throw new MessageConversionException("Cannot convert a null message to Hello");
        }
        Hello hello = new Hello();
        hello.setId(message.getStringProperty("id"));
        hello.setSaying(message.getStringProperty("hello"));
        return hello;
    }

    /**
     * Creates an {@link ObjectMessage} carrying the given {@link Hello} as string properties.
     *
     * @param obj the payload; must be a {@code Hello}
     * @param session the session used to create the message
     * @return the populated message
     * @throws JMSException if the message cannot be created or populated
     * @throws MessageConversionException if {@code obj} is not a {@code Hello}
     */
    public Message toMessage(Object obj, Session session) throws JMSException,
            MessageConversionException {
        if (!(obj instanceof Hello)) {
            throw new MessageConversionException(
                    "PaySettlementCoverter can only convert Hello instances, got: "
                            + (obj == null ? "null" : obj.getClass().getName()));
        }
        Hello hello = (Hello) obj;
        ObjectMessage objMsg = session.createObjectMessage();
        objMsg.setJMSCorrelationID("123654");
        // No reply-to is set: a freshly created message has no JMSDestination yet (the
        // provider assigns it on send), so copying it into JMSReplyTo had no effect.
        objMsg.setStringProperty("id", hello.getId());
        objMsg.setStringProperty("hello", hello.getSaying());
        return objMsg;
    }

}

package org.spring.activeDemo01;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/**
 * Converts between {@link Hello} objects and JMS messages.
 *
 * <p>The {@code Hello} fields travel as the string properties {@code "id"} and
 * {@code "hello"}; the object-message body itself is left empty.
 */
public class PaySettlementCoverter implements MessageConverter {

    /**
     * Builds a {@link Hello} from the {@code "id"} and {@code "hello"} string properties.
     *
     * <p>Only properties are read, so any {@link Message} type is accepted.
     *
     * @param message the incoming message
     * @return a new {@code Hello} populated from the message properties
     * @throws JMSException if a property cannot be read
     * @throws MessageConversionException if {@code message} is {@code null}
     */
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        if (message == null) {
            throw new MessageConversionException("Cannot convert a null message to Hello");
        }
        Hello hello = new Hello();
        hello.setId(message.getStringProperty("id"));
        hello.setSaying(message.getStringProperty("hello"));
        return hello;
    }

    /**
     * Creates an {@link ObjectMessage} carrying the given {@link Hello} as string properties.
     *
     * @param obj the payload; must be a {@code Hello}
     * @param session the session used to create the message
     * @return the populated message
     * @throws JMSException if the message cannot be created or populated
     * @throws MessageConversionException if {@code obj} is not a {@code Hello}
     */
    public Message toMessage(Object obj, Session session) throws JMSException,
            MessageConversionException {
        if (!(obj instanceof Hello)) {
            throw new MessageConversionException(
                    "PaySettlementCoverter can only convert Hello instances, got: "
                            + (obj == null ? "null" : obj.getClass().getName()));
        }
        Hello hello = (Hello) obj;
        ObjectMessage objMsg = session.createObjectMessage();
        objMsg.setJMSCorrelationID("123654");
        // No reply-to is set: a freshly created message has no JMSDestination yet (the
        // provider assigns it on send), so copying it into JMSReplyTo had no effect.
        objMsg.setStringProperty("id", hello.getId());
        objMsg.setStringProperty("hello", hello.getSaying());
        return objMsg;
    }

}


POJO类

Java代码



/**
 * Simple mutable value object with an identifier and a text ("saying").
 *
 * <p>Implements {@link java.io.Serializable} so that the declared
 * {@code serialVersionUID} actually takes effect.
 */
public class Hello implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String id;

    private String saying;

    /** @return the identifier, or {@code null} if none has been set */
    public String getId() {
        return id;
    }

    /** @param id the identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the text, or {@code null} if none has been set */
    public String getSaying() {
        return saying;
    }

    /** @param saying the text to store */
    public void setSaying(String saying) {
        this.saying = saying;
    }

}

/**
 * Simple mutable value object with an identifier and a text ("saying").
 *
 * <p>Implements {@link java.io.Serializable} so that the declared
 * {@code serialVersionUID} actually takes effect.
 */
public class Hello implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String id;

    private String saying;

    /** @return the identifier, or {@code null} if none has been set */
    public String getId() {
        return id;
    }

    /** @param id the identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the text, or {@code null} if none has been set */
    public String getSaying() {
        return saying;
    }

    /** @param saying the text to store */
    public void setSaying(String saying) {
        this.saying = saying;
    }

}


测试类

Java代码



import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line demo: loads the Spring context from the classpath, fills in the
 * {@code hello} bean and sends it through the {@code producer} bean.
 */
public class TestSender {

    /**
     * Entry point.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        final ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("appliactionContext.xml");

        // Look up the sender first, then the payload bean, both by bean id.
        final Producer sender = (Producer) context.getBean("producer");
        final Hello greeting = (Hello) context.getBean("hello");

        greeting.setId("123456789");
        greeting.setSaying("Hello World.....!!! ");

        sender.simpleSend(greeting);
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: