RocketMQ / rocketmq-example / src / main / java / com / alibaba / rocketmq / example / simple /
2014-09-05 11:01
405 查看
https://github.com/alibaba/RocketMQ/blob/develop/rocketmq-example/src/main/java/com/alibaba/rocketmq/example/simple/PushConsumer.java
/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.example.simple;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class PushConsumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_001");
// consumer.setNamesrvAddr("10.235.169.72:9876");
// consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.subscribe("TopicTest3", "*");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
}
else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
// 执行TagC的消费
}
else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
// 执行TagD的消费
}
}
else if (msg.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.example.simple;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class PushConsumer {
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_001");
// consumer.setNamesrvAddr("10.235.169.72:9876");
// consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.subscribe("TopicTest3", "*");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
}
else if (msg.getTags() != null && msg.getTags().equals("TagC")) {
// 执行TagC的消费
}
else if (msg.getTags() != null && msg.getTags().equals("TagD")) {
// 执行TagD的消费
}
}
else if (msg.getTopic().equals("TopicTest2")) {
// 执行TopicTest2的消费逻辑
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
相关文章推荐
- RocketMQ / rocketmq-example / src / main / java / com / alibaba / rocketmq / example / simple / Prod
- rocketmq 异常 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <rocketmq-n
- 【Quartz】【程序目录结构】/DetectNonWorkingDay/src/main/java/com/apple/sqm/dnwd/DNWDServer.java
- 【Quartz】【程序目录结构】/DetectNonWorkingDay/src/main/java/com/apple/sqm/dnwd/detect/ADetect.java
- 异常解决方案:java.lang.ClassNotFoundException: com.example.robo.MainActivity in loader da
- Android项目编译的时候出现:Caused by: java.lang.ClassNotFoundException: com.example.aaa.MainActivity
- java.lang.ClassNotFoundException: Didn't find class "com.example.b.MainActivity
- 【Quartz】【程序目录结构】/DetectNonWorkingDay/src/main/java/com/apple/sqm/dnwd/detect/delta/Detect.java
- Simple example of using the Java Native Interface
- Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/LineInputStream解决MyEclipse8.0
- 发送邮件时遇到mail包冲突异常Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/LineInputStream 解决方法
- Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/LineInputStream
- Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/Lin
- packages/providers/contactsprovider/src/com/androidproviders/contacts/ContactsDatabaseHelpher.java
- Jbpm发邮件抛出Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/LineInputStream
- jstring Java_com_example_hellojni_HelloJni_stringFromJNI
- java reflect example(http://tutorials.jenkov.com/java-reflection/classes.html)
- 【myeclipse 10 】Exception in thread "main" java.lang.NoClassDefFoundError: com/sun/mail/util/LineInpu
- com.bruceeckel.simpletest in "Thinking in Java 3rd Edition"
- Unable to find a javac compiler; com.sun.tools.javac.Main is not on the classpath.Perhaps JAVA_HOME does not point to the JDK