您的位置:首页 > 其它

RocketMQ消息系统 - pull模式

2016-03-13 20:24 459 查看

RocketMQ消息系统 - pull模式

一、消费模式

消息系统常见的消费模式分为push模式和pull模式,push模式是服务端主动给客户端推送数据,也是比较常见的模式,pull模式则是客户端主动去服务端拉取数据。

 

1.1 push模式

① 服务端保存push状态,失败重发;

② 客户端无状态;

③ push是实时的;

④ 负载均衡,服务端统一处理和控制,需要根据消费者的消费能力做流控;

 

1.2 pull模式

① 服务端无状态;

② 客户端保存当前pull状态,以便在故障或者重启时恢复;

③ pull分长轮询(实时)和短轮询(与pull的时间间隔有关);

④ 负载均衡,消费者自行控制;

 

 

二、pull模式原理


  

PullMessageTask:

① 每3s拉取一次;

② 更新拉取的队列的offset到broker;

③ 每个队列启动一个PullMessageTask,负责这个队列的消息拉取; 

④ 消费失败,或者listener执行的时候出错的消息,保存到H2数据库中;

 

ReconsumerMessgaeTask:

对于消费失败的消息,从数据库中获取并重新消费;

 

三、pull模式使用

3.1 消费者

消息topic:mengka-cc2

groupId:  consumerG2

消息模式:  PULL

拉取间隔时间:3000ms

 

<bean id="consumerNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="init">
<property name="groupId" value="consumerG2" />
<property name="name" value="taaNotifyManager" />
<property name="topic" value="mengka-cc2"/>
<property name="ctype" value="PULL"/>
<property name="namesrvAddr" value="192.168.1.42:9876"/>
</bean>

<bean id="pullMessageManager" class="com.mengka.mq.listener.PullMessageManager">
<property name="consumerNotifyManager" ref="consumerNotifyManager"/>
<property name="messageListener" ref="taaPullMessageListener" />
<property name="spaceTime" value="3000"/>
</bean>

 

/**
* User: mengka
* Date: 15-8-8
*/
@Component
public class TaaPullMessageListener implements MessageListenerPull {

private static final Logger log = LoggerFactory.getLogger(TaaPullMessageListener.class);

@Override
public ConsumeConcurrentlyStatus consumeMessage(MessageExt msg) {
log.info("---------------, receive message id = "+msg.getMsgId()+" , content = "+new String(msg.getBody())+" , tags = "+msg.getTags());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

 

 

3.2 生产者

消息topic:mengka-cc2

groupId:  consumerG2

 

<bean id="producterNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="initProducter">
<property name="groupId" value="consumerG2" />
<property name="name" value="taaNotifyManager" />
<property name="topic" value="mengka-cc2"/>
<property name="namesrvAddr" value="192.168.1.42:9876"/>
</bean>

 

String serviceConfigXMLs[] = new String[]{"rocketmq_pull_06/rocketmq-pull-producer.xml"};
ApplicationContext context = new ClassPathXmlApplicationContext(serviceConfigXMLs);
NotifyManager producterNotifyManager = (NotifyManager) context.getBean("producterNotifyManager");

String content = "Just for test[" + TimeUtil.toDate(new Date(), TimeUtil.format_1);
Message message = new BytesMessage(content.getBytes());
SendResult result = producterNotifyManager.sendMessage(message);

 

 

3.3 拉取消息消费

生产者发送消息之后,在下一次的轮询中,消费者就拉取到了未消费的消息数据,消息消费成功后并更新offset到服务端;

 
 

 

 

 

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: