您的位置:首页 > 其它

rocketmq

2016-01-11 10:41 302 查看


1.nameserver是无状态的,控制着broker,producer,consumer集群的同步

2.broker是处理消息中转,负责储存和过滤消息

3.rocketmq有两种消息消费模型,广播模式,一个消费者组中的每一个消费者都消费;集群模式,一个消费者组中的每一个消费者平均消费消息

4.顺序消费,生产者向一个topic的一个队列,顺序发送消息,切生产者只能单线程发送,消费者在消费时使用顺序消费模式


rocketmq是什么

1.一种队列模型的消息中间件,高性能,高可靠,高实时,分布式

2.producer,consumer,broker都可以分布式

3.默认情况下,prducer会向一个topic下的一些队列轮流发送消息


api

生产者

//顺序发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
producer.setNamesrvAddr(ROCKET_SERVER);
Message message = new Message("test", "tags", "1", info.getBytes());
producer.start();
for (int i = 0; i < 10; i++) {
SendResult result = producer.send(message,new    SelectMessageQueueByHash(),i);
}
producer.shutdown();

 

//随机发送
DefaultMQProducer producer = new DefaultMQProducer(PRODUCT_GROUP);
producer.setNamesrvAddr(ROCKET_SERVER);
Message message = new Message("test", "tags", "1", info.getBytes());
producer.start();
for (int i = 0; i < 10; i++) {
SendResult result = producer.send(message);
}
producer.shutdown();



消费者

//普通消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(ROCKET_SERVER);
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isNotEmpty(list)) {
Iterator it = list.iterator();
while (it.hasNext()) {
MessageExt msg = (MessageExt) it.next();
System.out.println(msg.toString());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

 

//顺序消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(ROCKET_SERVER);
consumer.subscribe("test", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isNotEmpty(list)) {
Iterator it = list.iterator();
while (it.hasNext()) {
MessageExt msg = (MessageExt) it.next();
System.out.println(msg.toString());
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();


启动广播模式

consumer.setMessageModel(MessageModel.BROADCASTING);

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: