RocketMQ Study Notes 2: Official Examples <Consumer Operations>
2016-12-08 22:16
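This post covers the last example: the consumer-side API operations.
RocketMQ supports both push and pull modes. The so-called push mode is in fact implemented by the client pulling from the broker with long polling.
0. Pulling messages
1. Pushing messages: register a Listener, which the client invokes once its long-polling pull returns messages
2. Pulling messages on a schedule
A quick look at the source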
0. Pulling messages
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.example.simple;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;

public class PullConsumer {
    // Per-queue consume progress, kept in memory by the application itself
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();

        // Fetch the queues of the subscribed topic
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ:
            while (true) {
                try {
                    // Blocking pull; the default suspend timeout is 20 s
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(Thread.currentThread().getName() + new Date() + "" + pullResult);
                    // Remember where the next pull on this queue should start
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) { // PullStatus
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            // Nothing left in this queue: move on to the next one
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
}
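The offset bookkeeping in the pull example can be tried without a broker. The following is only a minimal sketch using the JDK alone: `OffsetPullSketch`, `Batch`, `pull` and `drainAll` are hypothetical names, an in-memory `List<String>` stands in for a message queue, and an empty batch plays the role of `NO_NEW_MSG`. It is not the RocketMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetPullSketch {
    /** Hypothetical stand-in for PullResult: the batch plus the offset to pull from next. */
    public static final class Batch {
        final List<String> messages;
        final long nextBeginOffset;

        Batch(List<String> messages, long nextBeginOffset) {
            this.messages = messages;
            this.nextBeginOffset = nextBeginOffset;
        }
    }

    /** Returns at most maxNums messages starting at offset from the in-memory queue. */
    public static Batch pull(List<String> queue, long offset, int maxNums) {
        int from = (int) Math.min(Math.max(offset, 0), queue.size());
        int to = Math.min(from + maxNums, queue.size());
        return new Batch(new ArrayList<>(queue.subList(from, to)), to);
    }

    /** Drains every queue in turn, recording per-queue progress in offsetTable. */
    public static List<String> drainAll(Map<String, List<String>> queues,
                                        Map<String, Long> offsetTable,
                                        int maxNums) {
        List<String> consumed = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : queues.entrySet()) {
            while (true) {
                long offset = offsetTable.getOrDefault(entry.getKey(), 0L);
                Batch batch = pull(entry.getValue(), offset, maxNums);
                // Always store the next begin offset, as the example above does
                offsetTable.put(entry.getKey(), batch.nextBeginOffset);
                if (batch.messages.isEmpty()) {
                    break; // plays the role of NO_NEW_MSG: go to the next queue
                }
                consumed.addAll(batch.messages);
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        queues.put("q0", Arrays.asList("a", "b", "c"));
        queues.put("q1", Arrays.asList("d"));
        Map<String, Long> offsets = new LinkedHashMap<>();
        System.out.println(drainAll(queues, offsets, 2));
        System.out.println(offsets);
    }
}
```

Because the offset table lives only in memory, progress is lost when the process exits, which is also true of the pull example above.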
1. Pushing messages: register a Listener, which the client invokes once its long-polling pull returns messages
// Apache License 2.0 header omitted; identical to the one in PullConsumer above.
package com.alibaba.rocketmq.example.simple;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        // No setNamesrvAddr(...) call here: the address must come from the
        // rocketmq.namesrv.addr system property or the NAMESRV_ADDR environment variable.
        consumer.subscribe("Jodie_topic_1023", "*");
        // Only applies when the group has no stored offset yet
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // Called on a client-side consume thread with each batch of pulled messages
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
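To make "push implemented by long polling" concrete, here is a rough sketch using only the JDK. Everything in it is an assumption for illustration: `LongPollingPushSketch` is a hypothetical class, a `BlockingQueue` stands in for the broker, and `poll` with a timeout stands in for a pull request that the broker holds until a message arrives or the wait expires. It does not reproduce how the RocketMQ client is actually built.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class LongPollingPushSketch {
    private final BlockingQueue<String> broker; // hypothetical stand-in for a broker queue
    private final Consumer<String> listener;    // plays the role of the registered Listener
    private final Thread pullThread;
    private volatile boolean running = true;

    public LongPollingPushSketch(BlockingQueue<String> broker, Consumer<String> listener) {
        this.broker = broker;
        this.listener = listener;
        this.pullThread = new Thread(this::pullLoop, "pull-loop");
    }

    public void start() {
        pullThread.start();
    }

    public void shutdown() throws InterruptedException {
        running = false;
        pullThread.interrupt();
        pullThread.join();
    }

    private void pullLoop() {
        while (running) {
            try {
                // The "long poll": wait until a message arrives or the timeout expires
                String msg = broker.poll(500, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    listener.accept(msg); // the client, not the broker, calls the listener
                }
                // Either way, loop around and issue the next pull right away
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> broker = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        LongPollingPushSketch consumer = new LongPollingPushSketch(broker, received::add);
        consumer.start();
        broker.put("hello");
        broker.put("world");
        Thread.sleep(200); // give the pull loop time to deliver
        consumer.shutdown();
        System.out.println(received);
    }
}
```

From the application's point of view messages simply show up in the listener, which is why the mode is called push even though every message is fetched by a pull.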
2. Pulling messages on a schedule
// Apache License 2.0 header omitted; identical to the one in PullConsumer above.
package com.alibaba.rocketmq.example.simple;

import java.util.Date;

import com.alibaba.rocketmq.client.consumer.MQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullTaskCallback;
import com.alibaba.rocketmq.client.consumer.PullTaskContext;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService =
            new MQPullConsumerScheduleService("GroupName1");
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {
                    // Read the stored consume offset for this queue
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    System.out.println(new Date() + "--" + offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    // Record the progress so the next run starts after this batch
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

                    // Set the delay before the next pull, in milliseconds
                    context.setPullNextDelayTimeMillis(10000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}
A quick look at the source
// Excerpt: inner class of MQPullConsumerScheduleService.
// Each task runs the registered callback once, then reschedules itself
// with the delay the callback stored in the context.
class PullTaskImpl implements Runnable {
    private final MessageQueue messageQueue;
    private volatile boolean cancelled = false;

    public PullTaskImpl(final MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

    @Override
    public void run() {
        String topic = this.messageQueue.getTopic();
        if (!this.isCancelled()) {
            PullTaskCallback pullTaskCallback =
                MQPullConsumerScheduleService.this.callbackTable.get(topic);
            if (pullTaskCallback != null) {
                final PullTaskContext context = new PullTaskContext();
                context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
                try {
                    pullTaskCallback.doPullTask(this.messageQueue, context);
                } catch (Throwable e) {
                    // On failure, retry after 1 second
                    context.setPullNextDelayTimeMillis(1000);
                    log.error("doPullTask Exception", e);
                }

                if (!this.isCancelled()) {
                    // Re-submit this same task after the requested delay
                    MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
                        context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
                } else {
                    log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                }
            } else {
                log.warn("Pull Task Callback not exist , {}", topic);
            }
        } else {
            log.warn("The Pull Task is cancelled, {}", messageQueue);
        }
    }

    public boolean isCancelled() {
        return cancelled;
    }

    public void setCancelled(boolean cancelled) {
        this.cancelled = cancelled;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }
}
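The self-rescheduling pattern in PullTaskImpl can be isolated from RocketMQ. The sketch below is a simplified illustration using only the JDK: `SelfReschedulingTaskSketch` and its `work` callback are hypothetical names, and the callback returns the next delay directly instead of writing it into a context object. The 1000 ms fallback on failure mirrors the source above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class SelfReschedulingTaskSketch implements Runnable {
    private final ScheduledThreadPoolExecutor executor;
    private final LongSupplier work; // hypothetical callback: does one pull, returns next delay in ms
    private volatile boolean cancelled = false;

    public SelfReschedulingTaskSketch(ScheduledThreadPoolExecutor executor, LongSupplier work) {
        this.executor = executor;
        this.work = work;
    }

    @Override
    public void run() {
        if (cancelled) {
            return;
        }
        long nextDelayMillis;
        try {
            nextDelayMillis = work.getAsLong();
        } catch (Throwable t) {
            nextDelayMillis = 1000; // same fallback delay as PullTaskImpl
        }
        if (!cancelled) {
            // One-shot schedule of this same task, so each run chooses its own next delay
            executor.schedule(this, nextDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public void cancel() {
        cancelled = true;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        SelfReschedulingTaskSketch task = new SelfReschedulingTaskSketch(executor, () -> {
            runs.incrementAndGet();
            threeRuns.countDown();
            return 10; // ask to run again in 10 ms
        });
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        threeRuns.await(5, TimeUnit.SECONDS);
        task.cancel();
        executor.shutdownNow();
        System.out.println("ran at least 3 times: " + (runs.get() >= 3));
    }
}
```

Compared with a fixed-rate schedule, a one-shot `schedule` call per run lets the callback vary the interval, for example pulling again sooner when the last batch was full.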