您的位置:首页 > 编程语言 > Java开发

Activemq5.2.0的spring开发

2009-03-22 17:39 246 查看
  在本篇我要做一个spring和Activemq结合的例子。将activemq和spring无缝衔接,并且将activemq的信息持久化到mysql数据库中。这里使用queue

一、依赖的jar,这些是开发的必备jar

 A、activemq的jar:activemq-all-5.2.0.jar

 B、xbean.jar:用来解析xsd之类的东东

 C、xbean-spring-3.1.jar

 

二、开发步骤

 1、用来存储消息的pojo,一定要序列化

package com.hc360.components.jms;

import java.io.Serializable;

/**
* 用来存储我们要发送消息的pojo
* */
public class InvokeMessage implements Serializable{

private static final long serialVersionUID = 2L;

private String name;

private String operate;

private String msg;

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getOperate() {
return operate;
}

public void setOperate(String operate) {
this.operate = operate;
}

}


 2、消息产生器,用来向队列发送消息:Producter

package com.hc360.components.jms;

import javax.jms.Queue;
import org.springframework.jms.core.JmsTemplate;

/**
* 发送消息
* @author kongqz
*/
public class InvokeMessageProducer {

private JmsTemplate template;

private Queue destination;

public void setTemplate(JmsTemplate template) {
this.template = template;
}

public void setDestination(Queue destination) {
this.destination = destination;
}

public void send(InvokeMessage invokeMessage) {
template.convertAndSend(this.destination, invokeMessage);
}

}


 3、将pojo转化成可以发送到队列的形式:MessageConverter

package com.hc360.components.jms;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

/**
* @see MessageConverter
* 将要发送的pojo转化成activemq可以辨识的类型
*/
public class InvokeMessageConverter implements MessageConverter{
//接收消息时候使用
public Object fromMessage(Message msg) throws JMSException, MessageConversionException {
if (msg instanceof ObjectMessage) {
if (msg instanceof ObjectMessage) {
HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");
try {
// POJO must implements Seralizable
ByteArrayInputStream bis = new ByteArrayInputStream(map.get("InvokeMessage"));
ObjectInputStream ois = new ObjectInputStream(bis);
Object returnObject = ois.readObject();
return returnObject;
} catch (IOException e) {
System.out.println("fromMessage(Message)");
e.printStackTrace();

} catch (ClassNotFoundException e) {
System.out.println("fromMessage(Message)");
e.printStackTrace();
}
}

} else {
throw new JMSException("Msg:[" + msg + "] is not Map");
}
return null;
}
//发送消息时候使用
public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException {

if (obj instanceof InvokeMessage) {
ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
Map<String, byte[]> map = new HashMap<String, byte[]>();
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
map.put("InvokeMessage", bos.toByteArray());
} catch (IOException e) {
e.printStackTrace();
}
objMsg.setObjectProperty("Map", map);
return objMsg;
} else {
throw new JMSException("Object:[" + obj + "] is not InvokeMessage");
}
}
}


 4、消息处理器:Consumer

package com.hc360.components.jms;

/**
*消费消息
* @author kongqz
*/
public class InvokeMessageConsumer{

/**
* @author Administrator
* */
public void printMyOut(InvokeMessage invokeMessage) {
System.out.println("等待1秒再处理");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

e.printStackTrace();
}
System.out.println("执行业务操作["+invokeMessage.getName()+"],["+invokeMessage.getOperate()+"],["+invokeMessage.getMsg()+"]");

}

}


 5、调用jms发送消息,将下边的代码嵌入到相关的触发地。通过我们consumer的延迟处理,我们的队列被处理效果将很容易在控制台看到

 
try{
for(int i=0;i<200;i++){
//准备发送jms消息
InvokeMessage im = new  InvokeMessage();
im.setMsg(i+":有人查询用户列表了!["+new Date()+"]");
im.setName("当前系统通知");
im.setOperate("查询!");
this.getInvokeMessageProducer().send(im);
}

}catch(Exception e){
System.out.println("发送系统消息出错");
e.printStackTrace();
}


6、相关配置文件(我将内置的activemq5.2服务器直接写入到一个配置文件中,并将相关的jms配置放到一起)

<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd"> <!--
推荐版本,使用spring的listenerContainer,消息用数据库持久化保存,服务器重启不会丢失 http://activemq.apache.org/spring-support.html -->
<!--  embedded ActiveMQ Broker,内置的ActiveMq服务器 -->
<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:jdbcPersistenceAdapter id="jdbcAdapter" dataSource="#mysql-ds" createTablesOnStartup="true"
useDatabaseLock="true"/>

<!--
Mysql can setup useDatabaseLock="true",this is defualt
HSQLDB,MSSQL plz setup useDatabaseLock="false",
if u setup useDatabaseLock="true",u will catch error:
MSSQL Error Info:FOR UPDATE clause allowed only for DECLARE CURSOR
HSQLDB Error Info:FOR in statement [SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE]

see http://www.nabble.com/ActiveMQ-JDBC-Persistence-with-SQL-Server-tf2022248.html#a5560296 -->

</amq:persistenceAdapter>
<!-- 开发给外部的链接使用tcp方式 -->
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:0"/>
</amq:transportConnectors>
</amq:broker>

<!--  ActiveMQ connectionFactory  -->
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost"/>

<!--  ActiveMQ destinations,队列名称  -->
<amq:queue name="destination" physicalName="org.apache.activemq.spring.Test.spring.embedded"/>

<!-- The mysql Datasource that will be used by the Broker -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

<property name="driverClassName" value="${jdbc.driverClassName}"/>
<property name="url">
<value>${jdbc.activemq.url}</value>
<!-- mysql version
<value>jdbc:mysql://localhost/myproject?relaxAutoCommit=true</value>
-->
</property>
<property name="username" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

<!--  Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<!--  lets wrap in a pool to avoid creating a connection per send -->
<bean class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
</bean>
</property>
<!-- custom MessageConverter -->
<property name="messageConverter" ref="invokeMessageConverter"/>
</bean>

<!--  invokeMessage converter  -->
<bean id="invokeMessageConverter" class="com.hc360.components.jms.InvokeMessageConverter"/>

<!-- POJO which send Message uses  Spring JmsTemplate -->
<bean id="invokeMessageProducer" class="com.hc360.components.jms.InvokeMessageProducer">
<property name="template" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean>

<!--  Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="com.hc360.components.jms.InvokeMessageConsumer">
</bean>
</constructor-arg>
<!--  may be other method -->
<property name="defaultListenerMethod" value="printMyOut"/>
<!-- custom MessageConverter define -->
<property name="messageConverter" ref="invokeMessageConverter"/>
</bean>

<!--  listener container,MDP无需实现接口 -->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
</beans>


7、相关的property配置文件(我的持久化jdbc和ssh项目用的jdbc都用单独的配置文件管理)

jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://localhost:3306/hdssh?useUnicode=true&characterEncoding=UTF-8&jdbcCompliantTruncation=false
jdbc.username=root
jdbc.password=spring_framework

jdbc.activemq.url=jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true&useUnicode=true&characterEncoding=UTF-8&jdbcCompliantTruncation=false

hibernate.dialect=org.hibernate.dialect.MySQLDialect

hibernate.jdbc.batch_size=25
hibernate.jdbc.fetch_size=50
hibernate.show_sql=true
hibernate.hbm2ddl.auto=update


 

三、常见问题处理

 1、用mysql的时候无法直接生成相关监控表ACTIVEMQ_ACKS没有创建(mysql5)。这个可能是因为你将表的默认字符集设置为utf-8,如果你使用latin1就不会有这个问题。将activemq的持久化到mysql数据库的时候一定要设定合适的字符集
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息