您的位置:首页 > 其它

rocketmq学习笔记 二 官方实例<消费者操作>

2016-12-08 22:16 471 查看
来学习最后一个例子,消费者的一些操作api

rocketmq,支持推拉模式, 所谓的推模式,rocketmq是通过客户端长轮询拉取来实现的

0.拉消息

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0 *
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/
package com.alibaba.rocketmq.example.simple;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;

public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//获取订阅topic的queue
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ:
while (true) {
try {//阻塞的拉去消息,中止时间默认20s
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(Thread.currentThread().getName()+new Date()+""+pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND://pullSataus
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

consumer.shutdown();
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null)
return offset;

return 0;
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}

}


1.推消息 注册Listener,broker回调Listener

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0 *
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/
package com.alibaba.rocketmq.example.simple;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class PushConsumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

consumer.subscribe("Jodie_topic_1023", "*");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**

*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}


2.定时拉消息

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0 *
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

package com.alibaba.rocketmq.example.simple;

import java.util.Date;

import com.alibaba.rocketmq.client.consumer.MQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullTaskCallback;
import com.alibaba.rocketmq.client.consumer.PullTaskContext;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class PullScheduleService {

public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {

@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {

long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;

PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.println(new Date()+"--"+offset + "\t" + mq + "\t" + pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

//设置隔多长时间进行下次拉去
context.setPullNextDelayTimeMillis(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
});

scheduleService.start();
}
}


粗看下源码

class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;

public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}

@Override
public void run() {
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
PullTaskCallback pullTaskCallback =
MQPullConsumerScheduleService.this.callbackTable.get(topic);
if (pullTaskCallback != null) {
final PullTaskContext context = new PullTaskContext();
context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
try {
pullTaskCallback.doPullTask(this.messageQueue, context);
} catch (Throwable e) {
context.setPullNextDelayTimeMillis(1000);
log.error("doPullTask Exception", e);
}

if (!this.isCancelled()) {
MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
} else {
log.warn("Pull Task Callback not exist , {}", topic);
}
} else {
log.warn("The Pull Task is cancelled, {}", messageQueue);
}
}

public boolean isCancelled() {
return cancelled;
}

public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}

public MessageQueue getMessageQueue() {
return messageQueue;
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: