您的位置:首页 > 其它

详解RocketMQ中的consumer

2016-04-30 15:08 405 查看
上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色 


 上述就是MQ中有关Consumer的类图,下面来介绍一下每个类
 1.MQAdmin:底层类,上篇博客已经提过,就不再此重提 2.MQConsumer:Consumer公共的接口,常用的方法如下
 如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间   void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
 3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法
4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制
 在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下
 PushConsumer:通过注册监听的方式来消费信息 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**      
 * @FileName: Consumer.java    
 * @Package:com.test    
 * @Description: TODO   
 * @author: LUCKY     
 * @date:2015年12月28日 下午2:43:23    
 * @version V1.0      
 */  
package com.test;  
  
import java.util.List;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageExt;  
  
/** 
 * @ClassName: Consumer 
 * @Description: 模拟消费者 
 * @author: LUCKY 
 * @date:2015年12月28日 下午2:43:23 
 */  
public class ConsumerTest {  
  
    public static void main(String[] args) {  
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");  
        consumer.setNamesrvAddr("100.66.154.81:9876");  
        try {  
              
            // 订阅PushTopic下Tag为push的消息,都订阅消息  
            consumer.subscribe("PushTopic", "push");  
              
            // 程序第一次启动从消息队列头获取数据  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            //可以修改每次消费消息的数量,默认设置是每次消费一条  
            // consumer.setConsumeMessageBatchMaxSize(10);  
  
            //注册消费的监听  
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
               //在此监听中消费信息,并返回消费的状态信息  
                public ConsumeConcurrentlyStatus consumeMessage(  
                        List<MessageExt> msgs,  
                        ConsumeConcurrentlyContext context) {  
                      
                    // msgs中只收集同一个topic,同一个tag,并且key相同的message  
                    // 会把不同的消息分别放置到不同的队列中  
                    for(Message msg:msgs){  
              
                        System.out.println(new String(msg.getBody()));  
                    }     
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
                }  
            });  
  
            consumer.start();  
            Thread.sleep(5000);  
            //5秒后挂载消费端消费  
            consumer.suspend();  
              
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  
</span>  

 PullConsumer:通过拉去的方式来消费消息[java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**      
 * @FileName: Consumer.java    
 * @Package:com.test    
 * @Description: TODO   
 * @author: LUCKY     
 * @date:2015年12月28日 下午2:43:23    
 * @version V1.0      
 */  
package com.test;  
  
import java.util.Set;  
  
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;  
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;  
import com.alibaba.rocketmq.common.message.MessageQueue;  
  
/** 
 * @ClassName: Consumer 
 * @Description: 模拟消费者 
 * @author: LUCKY 
 * @date:2015年12月28日 下午2:43:23 
 */  
public class ConsumerPullTest {  
  
    public static void main(String[] args) {  
        DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();  
        consumer.setNamesrvAddr("100.66.154.81:9876");  
       consumer.setConsumerGroup("broker");  
        try {  
            consumer.start();  
        Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");        
  
        for(MessageQueue messageQueue:messageQueues){  
          
            System.out.println(messageQueue.getTopic());  
        }  
          
          
        //消息队列的监听  
        consumer.registerMessageQueueListener("", new MessageQueueListener() {  
              
            @Override  
            //消息队列有改变,就会触发  
            public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,  
                    Set<MessageQueue> mqDivided) {  
                // TODO Auto-generated method stub  
                  
            }  
        });  
              
      
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  
</span>  

一般在应用中都会采用push的方法来自动的消费信息
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: