activeMq实战
2015-11-18 13:18
260 查看
jms模式为queues
如果消费者的session应答模式为AUTO_ACKNOWLEDGE,则javax.jms.MessageListener.onMessage(Message)出现异常会进入ActiveMQ.DLQ(死信队列)详见截图:
如果消费者的session应答模式为CLIENT_ACKNOWLEDGE,则javax.jms.MessageListener.onMessage(Message)手动调用 message.acknowledge();activemq才会标记已处理,除非该消费者断开连接消息重新发给其他消费者处理。
相关代码和环境如下:
centos7
jkd7
activemq5
maven3.2.5
生产者代码:
消费者代码:
代码包下载:http://pan.baidu.com/s/1kTuA9eB
如果消费者的session应答模式为AUTO_ACKNOWLEDGE,则javax.jms.MessageListener.onMessage(Message)出现异常会进入ActiveMQ.DLQ(死信队列)详见截图:
如果消费者的session应答模式为CLIENT_ACKNOWLEDGE,则javax.jms.MessageListener.onMessage(Message)手动调用 message.acknowledge();activemq才会标记已处理,除非该消费者断开连接消息重新发给其他消费者处理。
相关代码和环境如下:
centos7
jkd7
activemq5
maven3.2.5
生产者代码:
package com.bj58.jms.produce; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ProducerApp { private static final Logger LOGGER = LoggerFactory .getLogger(ProducerApp.class); private static final String BROKER_URL = "tcp://192.168.146.128:61616"; private static final String SUBJECT = "demo"; public static void main(String[] args) throws JMSException { // 初始化连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( BROKER_URL); // 获得连接 Connection conn = connectionFactory.createConnection(); // 启动连接 conn.start(); // 创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination dest = session.createQueue(SUBJECT); // createTopic方法用来创建Topic // session.createTopic("TOPIC"); // 通过session可以创建消息的生产者 MessageProducer producer = session.createProducer(dest); for (int i = 0; i < 10; i++) { // 初始化一个mq消息 TextMessage message = session .createTextMessage("sad轻轻巧巧fasmq 中文" + i); // 发送消息 producer.send(message); LOGGER.info("send message {}", i); } // 关闭mq连接 conn.close(); } }
消费者代码:
package com.bj58.jms.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; public class ConsumerApp implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerApp.class); private static final String BROKER_URL = "tcp://192.168.146.128:61616"; private static final String SUBJECT = "demo"; public static void main(String[] args) throws JMSException { //初始化ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); //创建mq连接 Connection conn = connectionFactory.createConnection(); //启动连接 conn.start(); //创建会话 Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); //通过会话创建目标 Destination dest = session.createQueue(SUBJECT); //创建mq消息的消费者 MessageConsumer consumer = session.createConsumer(dest); //初始化MessageListener ConsumerApp me = new ConsumerApp(); //给消费者设定监听对象 consumer.setMessageListener(me); } public void onMessage(Message message) { TextMessage txtMessage = (TextMessage)message; int i=0; int [] arr={1,2}; // if(i==0){ // i=arr[2]; // } try { LOGGER.info ("get message " + txtMessage.getText()); //告诉activemq 成功处理消息 message.acknowledge(); } catch (JMSException e) { LOGGER.error("error {}", e); } } }
代码包下载:http://pan.baidu.com/s/1kTuA9eB
相关文章推荐
- Java线程创建详解
- 经得起雷劈:关于double和int/long相互转换失去精度计算错误的问题
- oracle增加表空间的四种方法 - Oracle
- koala不支持中文的解决办法(问题出现在使用中文字体时报错)
- 第1章 ビッグデータとデータの活用
- 第1章 ビッグデータとデータの活用
- 传递一个父id返回所有子id的用法,可用于删除父级以下的所有子级
- Windows下error LNK2019: 无法解析的外部符号 __imp__select@20
- nginx配置拒绝直接访问js文件
- Window下memcached安装与测试步骤
- 多核计算与并发编程 语言篇
- 将一个数组中奇数放在数组前边,偶数放在数组后边
- Linux文件目录详解
- linux下redis的安装
- hdu 1106 排序
- 中文pppoe中文拨号的解决方案
- Linux防火墙设置
- location.search
- hdu 1106 排序
- select count(*)和select count(1)的区别