您的位置:首页 > 产品设计 > UI/UE

activemq queue开发(持久化方式)

2010-04-17 00:59 330 查看
公司要求activemq通信,我经过三天的努力,今天算是有所前进。现将我对activemq的认识总结如下:
要使用activemq进行通信,就必须开启一个broker, 他可以理解为管理通信连接的东东。关于broker的启动,有两种方式:一是activemq文件内的bin/activemq.bat文件。这种启动方式比较适合多用户的开发。另一种就是在我们的java代码调用activemq相关的类来构造并启动brokerService。
第二个问题就是信息的持久化问题,这个要达到的最终效果是只要信息发送成功,但没有被消费掉,不论出现什么情况(最终导致activemq关闭),当activemq重新启动时没有被消费的信息仍然存在与消息队列里。
要实现这个效果需要两步:一是activemq在消息被消费时要存进文件或数据库,而不是在内存里。二是发送到消息要指明实现持久化。第一步的实现有多个方式(具体o就不说了),但大体上就是存进log文件和存进本地数据库两种。关于activemq文件配置的实现o就不提了。o要提的是通过java代码来实现数据的持久化。
代码如下:
BrokerService broker=new BrokerService();

broker.setBrokerName(brokerName); //设置信息持久化

//定义datasource

DataSource orclds=(DataSource) new BasicDataSource();

((BasicDataSource)orclds).setPoolPreparedStatements(true);

((BasicDataSource)orclds).setMaxActive(200);

((BasicDataSource)orclds).setDriverClassName(" ");//数据库驱动

((BasicDataSource)orclds).setUrl(" ");//数据库地址

((BasicDataSource)orclds).setUsername("×××");//数据库用户名
((BasicDataSource)orclds).setPassword("×××");//数据库用户密码

//定义JDBCPersistenceAdapter

PersistenceAdapter jdbcperAdapter=new JDBCPersistenceAdapter();

((JDBCPersistenceAdapter)jdbcperAdapter).setBrokerService(broker);

((DataSourceSupport)jdbcperAdapter).setDataSource(orclds);

broker.setPersistenceAdapter(jdbcperAdapter);

broker.setPersistent(true);

broker.addConnector(" ");//添加activemq连接

broker.start();

Object lock = new Object();

synchronized (lock) {

lock.wait();

}
如果要使用的是mysql数据库可能还有几点细节要注意。
在消息发送端,要设置消息生产者为Persistence的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息持久化

当然,还可以通过把activemq文件拿到我们本地工程下来配置:
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/activemq.xml"));

关于activemq获取连接发送消息的方式:一个是工厂模式
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection();

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue(subject);

producer = session.createProducer(destination);
producer.send(messagep);
另一个是jndi方式:
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
//设置持久方式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

关于消息的管理activemq提供了三个message类TextMessage,MapMessage,ObjectMessage
extmessage:
Message testMessage = jmsSession.createMessage();
//发布刷新文章消息
testMessage.setStringProperty("RefreshArticleId", "2046");
producer.send(testMessage);
//发布刷新帖子消息
testMessage.clearProperties();
testMessage.setStringProperty("RefreshThreadId", "331");
mapmessage:
MapMessage messagep = session.createMapMessage();<BR> //构造消息头 <BR> //messagep.setStringProperty("SENSE", "BTNM 3.0");<BR> //messagep.setStringProperty("CLASS"," ComputerSystem");<BR> //构造消息体<BR> //messagep.setString("CPULOAD:NO=0, ID=182910293","0.1");
关于object的o还没有研究。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: