activemq queue开发(持久化方式)
2010-04-17 00:59
330 查看
公司要求activemq通信,我经过三天的努力,今天算是有所前进。现将我对activemq的认识总结如下:
要使用activemq进行通信,就必须开启一个broker, 他可以理解为管理通信连接的东东。关于broker的启动,有两种方式:一是activemq文件内的bin/activemq.bat文件。这种启动方式比较适合多用户的开发。另一种就是在我们的java代码调用activemq相关的类来构造并启动brokerService。
第二个问题就是信息的持久化问题,这个要达到的最终效果是只要信息发送成功,但没有被消费掉,不论出现什么情况(最终导致activemq关闭),当activemq重新启动时没有被消费的信息仍然存在与消息队列里。
要实现这个效果需要两步:一是activemq在消息被消费时要存进文件或数据库,而不是在内存里。二是发送到消息要指明实现持久化。第一步的实现有多个方式(具体o就不说了),但大体上就是存进log文件和存进本地数据库两种。关于activemq文件配置的实现o就不提了。o要提的是通过java代码来实现数据的持久化。
代码如下:
BrokerService broker=new BrokerService();
broker.setBrokerName(brokerName); //设置信息持久化
//定义datasource
DataSource orclds=(DataSource) new BasicDataSource();
((BasicDataSource)orclds).setPoolPreparedStatements(true);
((BasicDataSource)orclds).setMaxActive(200);
((BasicDataSource)orclds).setDriverClassName(" ");//数据库驱动
((BasicDataSource)orclds).setUrl(" ");//数据库地址
((BasicDataSource)orclds).setUsername("×××");//数据库用户名
((BasicDataSource)orclds).setPassword("×××");//数据库用户密码
//定义JDBCPersistenceAdapter
PersistenceAdapter jdbcperAdapter=new JDBCPersistenceAdapter();
((JDBCPersistenceAdapter)jdbcperAdapter).setBrokerService(broker);
((DataSourceSupport)jdbcperAdapter).setDataSource(orclds);
broker.setPersistenceAdapter(jdbcperAdapter);
broker.setPersistent(true);
broker.addConnector(" ");//添加activemq连接
broker.start();
Object lock = new Object();
synchronized (lock) {
lock.wait();
}
如果要使用的是mysql数据库可能还有几点细节要注意。
在消息发送端,要设置消息生产者为Persistence的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息持久化
。
当然,还可以通过把activemq文件拿到我们本地工程下来配置:
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/activemq.xml"));
关于activemq获取连接发送消息的方式:一个是工厂模式
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.send(messagep);
另一个是jndi方式:
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
//设置持久方式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
关于消息的管理activemq提供了三个message类TextMessage,MapMessage,ObjectMessage
extmessage:
Message testMessage = jmsSession.createMessage();
//发布刷新文章消息
testMessage.setStringProperty("RefreshArticleId", "2046");
producer.send(testMessage);
//发布刷新帖子消息
testMessage.clearProperties();
testMessage.setStringProperty("RefreshThreadId", "331");
mapmessage:
MapMessage messagep = session.createMapMessage();<BR> //构造消息头 <BR> //messagep.setStringProperty("SENSE", "BTNM 3.0");<BR> //messagep.setStringProperty("CLASS"," ComputerSystem");<BR> //构造消息体<BR> //messagep.setString("CPULOAD:NO=0, ID=182910293","0.1");
关于object的o还没有研究。
要使用activemq进行通信,就必须开启一个broker, 他可以理解为管理通信连接的东东。关于broker的启动,有两种方式:一是activemq文件内的bin/activemq.bat文件。这种启动方式比较适合多用户的开发。另一种就是在我们的java代码调用activemq相关的类来构造并启动brokerService。
第二个问题就是信息的持久化问题,这个要达到的最终效果是只要信息发送成功,但没有被消费掉,不论出现什么情况(最终导致activemq关闭),当activemq重新启动时没有被消费的信息仍然存在与消息队列里。
要实现这个效果需要两步:一是activemq在消息被消费时要存进文件或数据库,而不是在内存里。二是发送到消息要指明实现持久化。第一步的实现有多个方式(具体o就不说了),但大体上就是存进log文件和存进本地数据库两种。关于activemq文件配置的实现o就不提了。o要提的是通过java代码来实现数据的持久化。
代码如下:
BrokerService broker=new BrokerService();
broker.setBrokerName(brokerName); //设置信息持久化
//定义datasource
DataSource orclds=(DataSource) new BasicDataSource();
((BasicDataSource)orclds).setPoolPreparedStatements(true);
((BasicDataSource)orclds).setMaxActive(200);
((BasicDataSource)orclds).setDriverClassName(" ");//数据库驱动
((BasicDataSource)orclds).setUrl(" ");//数据库地址
((BasicDataSource)orclds).setUsername("×××");//数据库用户名
((BasicDataSource)orclds).setPassword("×××");//数据库用户密码
//定义JDBCPersistenceAdapter
PersistenceAdapter jdbcperAdapter=new JDBCPersistenceAdapter();
((JDBCPersistenceAdapter)jdbcperAdapter).setBrokerService(broker);
((DataSourceSupport)jdbcperAdapter).setDataSource(orclds);
broker.setPersistenceAdapter(jdbcperAdapter);
broker.setPersistent(true);
broker.addConnector(" ");//添加activemq连接
broker.start();
Object lock = new Object();
synchronized (lock) {
lock.wait();
}
如果要使用的是mysql数据库可能还有几点细节要注意。
在消息发送端,要设置消息生产者为Persistence的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息持久化
。
当然,还可以通过把activemq文件拿到我们本地工程下来配置:
BrokerService broker = BrokerFactory.createBroker(new URI("xbean:com/activemq.xml"));
关于activemq获取连接发送消息的方式:一个是工厂模式
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.send(messagep);
另一个是jndi方式:
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
Connection connection = connectionFactory.createConnection();
Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
//设置持久方式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
关于消息的管理activemq提供了三个message类TextMessage,MapMessage,ObjectMessage
extmessage:
Message testMessage = jmsSession.createMessage();
//发布刷新文章消息
testMessage.setStringProperty("RefreshArticleId", "2046");
producer.send(testMessage);
//发布刷新帖子消息
testMessage.clearProperties();
testMessage.setStringProperty("RefreshThreadId", "331");
mapmessage:
MapMessage messagep = session.createMapMessage();<BR> //构造消息头 <BR> //messagep.setStringProperty("SENSE", "BTNM 3.0");<BR> //messagep.setStringProperty("CLASS"," ComputerSystem");<BR> //构造消息体<BR> //messagep.setString("CPULOAD:NO=0, ID=182910293","0.1");
关于object的o还没有研究。
相关文章推荐
- springMVC+activemq整合注解方式
- iOS开发之数据持久化(iOS中常用的四种数据存储方式)
- ActiveMQ BrokeUrl的配置和消息持久化配置
- activemq queue使用
- ActiveMQ queue 代码示例
- mq的概念(一种互联网项目开发方式)
- ActiveMQ CMS 开发环境编译
- iOS开发笔记--数据持久化的四种方式
- Spring+ActiveMQ消息持久化,Topic持久化订阅
- ActiveMQ—Queue与Topic区别
- JMS 服务器ActiveMQ Queue和Topic区别
- ActiveMQ LevelDB持久化机制
- activemq+spring 持久化发送消息
- Asterisk队列(Queue)振铃方式(ring strategy) - [CTI开发]
- spring+activemq 关闭持久化
- activemq+spring 持久化发送消息
- MQ-传输方式Topic和Queue的对比
- mq topic持久化订阅者(topic、queue的producer.setDeliveryMode(DeliveryMode. PERSISTENT)是指的mq服务),queue的消费者不在也会给
- spring+activemq 发送10W消息报端口被占用的异常分析以及topic持久化订阅
- ActiveMQ Queue和Topic