ActiveMQ 生产者和消费者demo
2015-02-12 16:54
387 查看
http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activeMQ.html,这个写的不错。
生产者代码
这里是基于queue的,支持事务,并且持久化的。
JMS消息确认机制
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。
客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消
费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
ActiveMQ消息确认机制
ActiveMQSession,实现了JMS的session,QueueSession, TopicSession
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE 每条消息都必须显式调用acknowledge方法确认消息。
消息持久性
JMS 支持以下两种消息提交模式:
DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。
生产者代码
package org.mule.util.ansyLog; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.pool.PooledConnectionFactory; /** * JMS消息生产者 * @author * */ public class JMSProducer implements ExceptionListener{ //设置连接的最大连接数 public final static int DEFAULT_MAX_CONNECTIONS=5; private int maxConnections = DEFAULT_MAX_CONNECTIONS; //设置每个连接中使用的最大活动会话数 private int maximumActiveSessionPerConnection = DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION; public final static int DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION=300; //线程池数量 private int threadPoolSize = DEFAULT_THREAD_POOL_SIZE; public final static int DEFAULT_THREAD_POOL_SIZE=50; //强制使用同步返回数据的格式 private boolean useAsyncSendForJMS = DEFAULT_USE_ASYNC_SEND_FOR_JMS; public final static boolean DEFAULT_USE_ASYNC_SEND_FOR_JMS=true; //是否持久化消息 private boolean isPersistent = DEFAULT_IS_PERSISTENT; public final static boolean DEFAULT_IS_PERSISTENT=true; //连接地址 private String brokerUrl; private String userName; private String password; private ExecutorService threadPool; private PooledConnectionFactory connectionFactory; public JMSProducer(String brokerUrl, String userName, String password) { this(brokerUrl, userName, password, DEFAULT_MAX_CONNECTIONS, DEFAULT_MAXIMUM_ACTIVE_SESSION_PER_CONNECTION, DEFAULT_THREAD_POOL_SIZE, DEFAULT_USE_ASYNC_SEND_FOR_JMS, DEFAULT_IS_PERSISTENT); } public JMSProducer(String brokerUrl, String userName, String password, int maxConnections, int maximumActiveSessionPerConnection, int threadPoolSize,boolean useAsyncSendForJMS, boolean isPersistent) { this.useAsyncSendForJMS = useAsyncSendForJMS; this.isPersistent = isPersistent; this.brokerUrl = brokerUrl; this.userName = userName; this.password = password; this.maxConnections = maxConnections; this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection; this.threadPoolSize = threadPoolSize; init(); } private void init() { //设置J***A线程池 this.threadPool = Executors.newFixedThreadPool(this.threadPoolSize); //ActiveMQ的连接工厂 ActiveMQConnectionFactory actualConnectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerUrl); actualConnectionFactory.setUseAsyncSend(this.useAsyncSendForJMS); //Active中的连接池工厂 this.connectionFactory = new PooledConnectionFactory(actualConnectionFactory); this.connectionFactory.setCreateConnectionOnStartup(true); this.connectionFactory.setMaxConnections(this.maxConnections); this.connectionFactory.setMaximumActiveSessionPerConnection(this.maximumActiveSessionPerConnection); } /** * 执行发送消息的具体方法 * @param queue * @param map */ public void send(final String queue, final Map<String, Object> map) { //直接使用线程池来执行具体的调用 this.threadPool.execute(new Runnable(){ @Override public void run() { try { sendMsg(queue,map); } catch (Exception e) { e.printStackTrace(); } } }); } /** * 执行发送消息的具体方法 * @param queue * @param map */ public void send(final String queue, final String message) { //直接使用线程池来执行具体的调用 this.threadPool.execute(new Runnable(){ @Override public void run() { try { sendMsg(queue,message); } catch (Exception e) { e.printStackTrace(); } } }); } /** * 真正的执行消息发送 * @param queue * @param map * @throws Exception */ private void sendMsg(String queue, String msg) throws Exception { Connection connection = null; Session session = null; try { //从连接池工厂中获取一个连接 connection = this.connectionFactory.createConnection(); /*createSession(boolean transacted,int acknowledgeMode) transacted - indicates whether the session is transacted acknowledgeMode - indicates whether the consumer or the client will acknowledge any messages it receives; ignored if the session is transacted. Legal values are Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, and Session.DUPS_OK_ACKNOWLEDGE. */ //false 参数表示 为非事务型消息,后面的参数表示消息的确认类型 // session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(Boolean.TRUE, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); //true 开启事务,需要客户端确认 // session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); //Destination is superinterface of Queue //PTP消息方式 Destination destination = session.createQueue(queue); //Creates a MessageProducer to send messages to the specified destination MessageProducer producer = session.createProducer(destination); //set delevery mode producer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); //map convert to javax message TextMessage message = session.createTextMessage(msg); producer.send(message); session.commit(); } finally { closeSession(session); closeConnection(connection); } } /** * 真正的执行消息发送 * @param queue * @param map * @throws Exception */ private void sendMsg(String queue, Map<String, Object> map) throws Exception { Connection connection = null; Session session = null; try { //从连接池工厂中获取一个连接 connection = this.connectionFactory.createConnection(); //false 参数表示 为非事务型消息,后面的参数表示消息的确认类型 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //PTP消息方式 Destination destination = session.createQueue(queue); //Creates a MessageProducer to send messages to the specified destination MessageProducer producer = session.createProducer(destination); //set delevery mode producer.setDeliveryMode(this.isPersistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); //map convert to javax message Message message = getMessage(session, map); producer.send(message); } finally { closeSession(session); closeConnection(connection); } } private Message getMessage(Session session, Map<String, Object> map) throws JMSException { MapMessage message = session.createMapMessage(); if (map != null && !map.isEmpty()) { Set<String> keys = map.keySet(); for (String key : keys) { message.setObject(key, map.get(key)); } } return message; } private void closeSession(Session session) { try { if (session != null) { session.close(); } } catch (Exception e) { e.printStackTrace(); } } private void closeConnection(Connection connection) { try { if (connection != null) { connection.close(); } } catch (Exception e) { e.printStackTrace(); } } @Override public void onException(JMSException e) { e.printStackTrace(); } }消费者代码,这里只贴出main方法里的内容
// ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 // Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer = null; connectionFactory = new ActiveMQConnectionFactory( USERNAME,PASSWORD,ACTIVE_URL); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue(QUEUE_NAME_LOG); consumer = session.createConsumer(destination); while (true) { TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("--------------------"+message.getText()); JSONObject json = (JSONObject) JSON.parse(message.getText()); //保存到数据库 System.out.println(json.getString("inPro")); message.acknowledge(); } else { break; } } } catch (Exception e) { e.printStackTrace(); }
这里是基于queue的,支持事务,并且持久化的。
JMS消息确认机制
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。
客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消
费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
ActiveMQ消息确认机制
ActiveMQSession,实现了JMS的session,QueueSession, TopicSession
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE 每条消息都必须显式调用acknowledge方法确认消息。
消息持久性
JMS 支持以下两种消息提交模式:
DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失。 消息持久化在硬盘中,ActiveMQ持久化有三种方式:AMQ、KahaDB、JDBC。
DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在内存中,读写速度快,在JMS服务停止后消息会消失,没有持久化到硬盘。
相关文章推荐
- activeMQ的创建生产者和消费者的demo(队列模式)
- kafka java 生产者消费者demo
- Java 写一个生产者和消费者的多线程Demo
- Disruptor多个消费者独立处理生产者消息的简单demo
- Java多线程之生产者消费者demo
- kafka-3python生产者和消费者实用demo
- Disruptor多个消费者不重复处理生产者发送的消息的demo
- 在Windows环境中安装并使用kafka以及生产者消费者Demo
- Kafka 生产者和消费者 demo (java&scala)
- RocketMQ生产者消费者DEMO
- activemq消息生产者与消息消费者简单例子
- 简单的activemq,生产者和消费者代码
- 分布式服务框架学习笔记9 ActiveMQ入门2 管理、生产者/消费者模式
- spring JMS、activemq中消费者收不到生产者发送的消息的原因解析
- 【消息队列】ActiveMQ的简单实例 - 生产者消费者模式
- 生产者与消费者--demo1---bai
- 【Active入门-2】ActiveMQ学习-生产者与消费者
- 生产者与消费者---demo2---boke
- activemq生产者和消费者的双向通信
- Rocketmq生产者和push消费者demo