rocketmq
2016-01-11 10:41
302 查看
1.nameserver是无状态的,控制着broker,producer,consumer集群的同步
2.broker是处理消息中转,负责储存和过滤消息
3.rocketmq有两种消息消费模型,广播模式,一个消费者组中的每一个消费者都消费;集群模式,一个消费者组中的每一个消费者平均消费消息
4.顺序消费,生产者向一个topic的一个队列,顺序发送消息,切生产者只能单线程发送,消费者在消费时使用顺序消费模式
rocketmq是什么
1.一种队列模型的消息中间件,高性能,高可靠,高实时,分布式
2.producer,consumer,broker都可以分布式
3.默认情况下,prducer会向一个topic下的一些队列轮流发送消息
api
生产者
//顺序发送 DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); producer.setNamesrvAddr(ROCKET_SERVER); Message message = new Message("test", "tags", "1", info.getBytes()); producer.start(); for (int i = 0; i < 10; i++) { SendResult result = producer.send(message,new SelectMessageQueueByHash(),i); } producer.shutdown();
//随机发送 DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP); producer.setNamesrvAddr(ROCKET_SERVER); Message message = new Message("test", "tags", "1", info.getBytes()); producer.start(); for (int i = 0; i < 10; i++) { SendResult result = producer.send(message); } producer.shutdown();
消费者
//普通消费 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setNamesrvAddr(ROCKET_SERVER); consumer.subscribe("test", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if (CollectionUtils.isNotEmpty(list)) { Iterator it = list.iterator(); while (it.hasNext()) { MessageExt msg = (MessageExt) it.next(); System.out.println(msg.toString()); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
//顺序消费 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setNamesrvAddr(ROCKET_SERVER); consumer.subscribe("test", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) { if (CollectionUtils.isNotEmpty(list)) { Iterator it = list.iterator(); while (it.hasNext()) { MessageExt msg = (MessageExt) it.next(); System.out.println(msg.toString()); } } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();
启动广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
相关文章推荐
- 消息中间件RocketMQ的安装与部署
- RocketMQ原理解析-broker 3.load&recover
- RocketMQ在windows上安装和开发使用
- RocketMQ 笔记-转
- RocketMq 入门 (1)基础原理介绍
- 《RocketMQ 安装和使用》
- Rocketmq
- RocketMQ的安装与使用
- RocketMQ 的疑惑--131072 数字解答
- rocketmq开发指南-v3.2.4
- RocketMQ的重复消费问题
- Rocketmq拉取pull消息分页数目测试
- RocketMQ的NameServer安装报错解决
- RocketMQ 加载配置文件
- 使用阿里的rocketMQ查询相关的坑
- RocketMQ初探一:NameServer的作用
- 写一个 shell 脚本 守候rocketmq 进程不当掉
- RocketMQ生产者示例程序
- RocketMQ 自己的整理和理解
- rocketmq介绍及安装过程