rabbitmq-consumer-x-priority
2015-11-30 13:53
246 查看
参考:http://www.aichengxu.com/view/37900
如果是一个队列只希望一个消费者进行处理,那么定义队列的时候可以指定时独占模式:exclusive
如果是一个队列由多个消费者,但是只希望消息由其中的一个消费者优先进行处理,当这个消费者挂掉的时候,再由其他消费者进行处理的话,可以给消费者设置不同的优先级
从RabbitMQ的3.2版本开始,这个消息代理支持消费者优先级。这个可以通过设置消费者的x-priority进行配置。
spring配置:为了方便,命名空间在listener元素上提供了priority属性:
<rabbit:listener-containerconnection-factory="rabbitConnectionFactory">
<rabbit:listenerqueues="some.queue"ref="somePojo"method="handle"priority="10"/>
</rabbit:listener-container>
String queue = channel.queueDeclare().getQueue(); //声明三个不同级别的消费者
QueueingConsumer highConsumer = new QueueingConsumer(channel);
QueueingConsumer medConsumer = new QueueingConsumer(channel);
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
String high = channel.basicConsume(queue, true, args(1), highConsumer); // 优先级为1 最高
String med = channel.basicConsume(queue, true, medConsumer); //优先级为0 第二
channel.basicConsume(queue, true, args(-1), lowConsumer);// 优先级为-1 最低 publish(queue, COUNT, "high");//COUNT=10 发送10条消息
publish(queue, COUNT, "med");
publish(queue, COUNT, "low"); assertContents(highConsumer, COUNT, "high");//消费消息
assertContents(medConsumer, COUNT, "med");
assertContents(lowConsumer, COUNT, "low");
} private Map<String, Object> args(Object o) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-priority", o);
return map;
} private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
for (int i = 0; i < count; i++) {
QueueingConsumer.Delivery d = qc.nextDelivery();
assertEquals(msg, new String(d.getBody()));
}
assertEquals(null, qc.nextDelivery(0));
} 解析: 1、这里一共声明了三个优先级不一样的消费者:高、中、低 2、发送消息时,发送的顺序是高、中、低。那么就意味着broker发送消息时,也是高、中、低的顺序。 4、这里还发现一点:就是首先发送的消息为A,然后首先接收消息的是也是A,但是A的优先级不是最高的,那么程序就会一直阻塞。 原因:这里如果给consumer设置了优先级,这里假设两个consumer的优先级是一样的。那么在初始环境下,broker会轮流发送消息。这个意思就是假设broker里面有4个消息,那么broker会把1和3的消息先发给先接受的consumer。比如这里C1先接收消息,那么C1的队列里面就有2个消息了。但是当我们第3次接收消息时,就会阻塞。因为consumer接收消息队列的是LinkedBlockingQueue。
相关文章推荐
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)(转)
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- Rabbitmq RabConsumer2类
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- RabbitMQ Queue分发多个Consumer
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)[转]
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- 关于 RabbitMQ 中 consumer 侧的 ack 属性分析
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- Rabbitmq-Java-Client重连和consumer恢复机制
- rabbitmq 集群 ha负载 Consumer raised exception, processing can restart if the connection factory
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)
- RabbitMQ Queue分发多个Consumer
- 消费RabbitMQ时的注意事项,如何禁止大量的消息涌到Consumer
- rabbitmq性能优化之Consumer utilisation
- RabbitMQ之Consumer消费模式(Push & Pull)
- RabbitMQ消息队列(四):分发到多Consumer(Publish/Subscribe)