详解RocketMQ中的consumer
2016-04-30 15:08
405 查看
上一篇博客着重讲解了一下RocketMQ中的Producer,那么接下来这篇博客来带大家来了解一下RocketMQ中的Consumer角色
![](https://img-blog.csdn.net/20160104145002253?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
上述就是MQ中有关Consumer的类图,下面来介绍一下每个类
1.MQAdmin:底层类,上篇博客已经提过,就不再此重提 2.MQConsumer:Consumer公共的接口,常用的方法如下
如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间 void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法
4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制
在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下
PushConsumer:通过注册监听的方式来消费信息 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerTest {
public static void main(String[] args) {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
consumer.setNamesrvAddr("100.66.154.81:9876");
try {
// 订阅PushTopic下Tag为push的消息,都订阅消息
consumer.subscribe("PushTopic", "push");
// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//可以修改每次消费消息的数量,默认设置是每次消费一条
// consumer.setConsumeMessageBatchMaxSize(10);
//注册消费的监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
//在此监听中消费信息,并返回消费的状态信息
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
for(Message msg:msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(5000);
//5秒后挂载消费端消费
consumer.suspend();
} catch (Exception e) {
e.printStackTrace();
}
}
}
</span>
PullConsumer:通过拉去的方式来消费消息[java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.common.message.MessageQueue;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerPullTest {
public static void main(String[] args) {
DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
consumer.setNamesrvAddr("100.66.154.81:9876");
consumer.setConsumerGroup("broker");
try {
consumer.start();
Set<MessageQueue> messageQueues= consumer.fetchSubscribeMessageQueues("PushTopic");
for(MessageQueue messageQueue:messageQueues){
System.out.println(messageQueue.getTopic());
}
//消息队列的监听
consumer.registerMessageQueueListener("", new MessageQueueListener() {
@Override
//消息队列有改变,就会触发
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
// TODO Auto-generated method stub
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
</span>
一般在应用中都会采用push的方法来自动的消费信息
上述就是MQ中有关Consumer的类图,下面来介绍一下每个类
1.MQAdmin:底层类,上篇博客已经提过,就不再此重提 2.MQConsumer:Consumer公共的接口,常用的方法如下
如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间 void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法
4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制
在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下
PushConsumer:通过注册监听的方式来消费信息 [java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerTest {
public static void main(String[] args) {
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
consumer.setNamesrvAddr("100.66.154.81:9876");
try {
// 订阅PushTopic下Tag为push的消息,都订阅消息
consumer.subscribe("PushTopic", "push");
// 程序第一次启动从消息队列头获取数据
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//可以修改每次消费消息的数量,默认设置是每次消费一条
// consumer.setConsumeMessageBatchMaxSize(10);
//注册消费的监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
//在此监听中消费信息,并返回消费的状态信息
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
for(Message msg:msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
Thread.sleep(5000);
//5秒后挂载消费端消费
consumer.suspend();
} catch (Exception e) {
e.printStackTrace();
}
}
}
</span>
PullConsumer:通过拉去的方式来消费消息[java] view plain copy print?<span style="font-family:Comic Sans MS;font-size:18px;">/**
* @FileName: Consumer.java
* @Package:com.test
* @Description: TODO
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
* @version V1.0
*/
package com.test;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.common.message.MessageQueue;
/**
* @ClassName: Consumer
* @Description: 模拟消费者
* @author: LUCKY
* @date:2015年12月28日 下午2:43:23
*/
public class ConsumerPullTest {
public static void main(String[] args) {
DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
consumer.setNamesrvAddr("100.66.154.81:9876");
consumer.setConsumerGroup("broker");
try {
consumer.start();
Set<MessageQueue> messageQueues= consumer.fetchSubscribeMessageQueues("PushTopic");
for(MessageQueue messageQueue:messageQueues){
System.out.println(messageQueue.getTopic());
}
//消息队列的监听
consumer.registerMessageQueueListener("", new MessageQueueListener() {
@Override
//消息队列有改变,就会触发
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
Set<MessageQueue> mqDivided) {
// TODO Auto-generated method stub
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
</span>
一般在应用中都会采用push的方法来自动的消费信息
相关文章推荐
- maven插件
- 详解RocketMQ中的Producer
- allegro Disable custom colors is enabled
- 背包系列第六篇----完全背包(求解最大价值的个数)
- RocketMq相关介绍
- ARP协议
- 复制指定目录下的指定文件,并修改后缀名
- Windbg、dump分析类资源链接
- 有一个文本文件中存储了几个名称,写一个程序实现随机获取一个人的名字(抽奖)
- test
- 负载均衡算法
- HNOI2016 大数(number)<莫队>
- opencv的CV_EXPORT
- hrbust/哈理工oj 1877 区间【水题】
- 文科状元转CS
- 数据结构相同情况下数据表之间数据的快速"copy"
- maven学习系列3----仓库
- Java注解(二) 系统注解
- 将文件中字符串赋值到ArrayList中
- 关于Wireshark无法启动 一直在加载 loading configrue 或者 initializing解决