快速入门分布式消息队列之 RabbitMQ(3)
2017-12-10 21:49
645 查看
目录
目录前文列表
前言
通道 Channel
一个基本的生产者消费者实现
消费者
生产者
运行结果
应用预取计数
应用 ACK 机制
最后
前文列表
快速入门分布式消息队列之 RabbitMQ(1)快速入门分布式消息队列之 RabbitMQ(2)
前言
在前文列表中,分别介绍了 RabbitMQ 的对象概念及其关键特性,本篇将通过编程的方式来回溯这些知识点,从实践应用的角度继续深入 RabbitMQ。安装 RabbitMQ 的 Python 客户端 pika:
pip install pika
通道 Channel
在代码实现之前我们还需要补充一个通讯概念——通道通道 channel:也被称为频道,指在 TCP 连接中建立的虚拟通信渠道。RabbitMQ 客户端与服务器之间的通讯并没有直接使用 TCP 连接,因为每一次通讯都需要创建和销毁 TCP 连接,这对操作系统来说是一笔昂贵的开销。并且 TCP 连接数量是有限制的,会成为通讯性能的瓶颈。解决该问题的方法就是在一个 TCP 连接中创建多个虚拟连接通道,而且通道的数量并没有限制,性能也很好。所以,通常我们只需要在应用程序中维护少量甚至一个 TCP 连接即可满足需求。
一个基本的生产者/消费者实现
消费者
import pika # 消费者回调任务,在这里定义消费者处理消息数据的逻辑。 def consumer_callback(channel, method, properties, body): print " [x] Consumed %r" % (body,) # 定义 AMQP URL,这里使用前文中已经创建好的超级管理员 mickey 和虚拟主机 web_app params = pika.URLParameters('amqp://mickey:passw0rd@localhost:5672/web_app') # 创建与 RabbitMQ 的连接,也称为消息代理连接 conn = pika.BlockingConnection(params) # 新建一个连接中的通道 channel = conn.channel() # 声明一个直连交换机,通过消息路由键和绑定路由键的匹配来完成路由转发策略 # 参数 durable=True, auto_delete=False 表示希望持久化交换机 # 其中 durable=True 表示 RabbitMQ 重启后会自动重建该交换机 channel.exchange_declare(exchange='web', exchange_type='direct', passive=False, durable=True, auto_delete=False) # 声明一个队列,如果生产者将消息发送给了一个不存在的队列,那么 RabbitMQ 会自动丢弃该消息 channel.queue_declare(queue='app') # 将队列绑定到交换机,并设置一个路由键 channel.queue_bind(queue='app', exchange='web', routing_key='web_app_route') # 指定消费者订阅的队列,并且告诉消息代理不需要等待 ACK channel.basic_consume(consumer_callback, queue='app', no_ack=True) # 开始监听订阅队列,直到 CTRL+C 退出。 try: print(" [*] Waiting for messages. To exit press CTRL+C") channel.start_consuming() except KeyboardInterrupt as err: channel.stop_consuming() conn.close()
生产者
import json import pika import sys # 同样需要建立连接和通道 params = pika.URLParameters('amqp://mickey:passw0rd@localhost:5672/web_app') conn = pika.BlockingConnection(params) channel = conn.channel() # 将在生产者中声明的 RabbitMQ 对象再重新声明一次,如果已经存在了则不会重复创建 # 这段逻辑实际上可有可无,只是为了说明声明一个 RabbitMQ 对象并不表示一定会创建 # 只有在第一次声明该对象的时候才会创建,之后无论在生成者或消费者中都可以再次声明 # channel.exchange_declare(exchange='web', exchange_type='direct', # passive=False, durable=True, auto_delete=False) # channel.queue_declare(queue='app') # channel.queue_bind(queue='app', # exchange='web', # routing_key='web_app_route') # 配置 AMQP 消息的 BasicProperties 基本属性 # 在 AMQP 协议中定义了 14 种 Properties,会随消息一同传递,这里表示使用 JSON 格式数据流 # 参数 delivery_mode=2 表示希望持久化消息,在 RabbitMQ 重启后自动重建消息 # 前文也提到过,持久化消息需要考虑到性能成本的问题 props = pika.BasicProperties(content_type='application/json', delivery_mode=2) message = ' '.join(sys.argv[1:]) or "Hello World!" body = {'msg': message} # 发布消息,指定消息传递的交换机和所携带的路由键 print(" [x] Publish %s", message) channel.basic_publish(exchange='web', routing_key='web_app_route', body=json.dumps(body), properties=props) conn.close()
NOTE:如果你希望使用缺省的虚拟主机和 guest 用户时,你的 AMQP URL 应该是这样的:
# '%2F' 是缺省虚拟主机 '/' 的转义 params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
运行结果
在两个终端分别运行生产者和消费者。生产者
$ python producer.py First message. (' [x] Publish %s', 'First message.') $ python producer.py First message (' [x] Publish %s', 'First message') $ python producer.py Second message (' [x] Publish %s', 'Second message') $ python producer.py Third message (' [x] Publish %s', 'Third message') $ python producer.py (' [x] Publish %s', 'Hello World!')
消费者
$ python consumer.py [*] Waiting for messages. To exit press CTRL+C [x] Consumed '{"msg": "First message"}' [x] Consumed '{"msg": "Second message"}' [x] Consumed '{"msg": "Third message"}' [x] Consumed '{"msg": "Hello World!"}'
可以通过 CLI 来查看队列的消息情况:
$ rabbitmqctl list_queues -p web_app Listing queues ... app 0 ...done.
NOTE:需要注意的是,上例中仅运行了一对生产者/消费者。实际上我们可以尝试同时运行多个消费者,并订阅到一个队列。这样的话,RabbitMQ 就会默认以分摊的方式将消息分别给多个消费者。
应用预取计数
我们知道,因为每个消费者执行的任务长度不尽相同,如果使用分摊的方式来分配消息的话,那么任务粒度小、执行时间短的消费者就会闲置下来。解决的方法就是在消费者中应用预期计数来实现公平调度(Fair dispatch)的效果。# 当预取计数为 1 时,RabbitMQ 不会同时为消费者分配多个任务,只有等消费者处理完消息之后,才会接收下一个消息 channel.basic_qos(prefetch_count=1) # 当预取计数为 10 时,RabbitMQ 会同时让消费者取出 10 个消息,直到 10 个消息都处理完之后,再继续接收下一次 10 个消息 # channel.basic_qos(prefetch_count=10)
可见,当消费者执行的任务长度较短时,应该给予更大的预取计数,来充当发挥消费者的性能。
应用 ACK 机制
应用 ACK 机制,来保证消息的有效传递。def consumer_callback(channel, method, header_props, body): print " [x] Consumed %r" % (body,) # 在处理完消息之后,返回 ACK 消息应答 channel.basic_ack(delivery_tag=method.delivery_tag) # 订阅队列的时候,参数 no_ack=False 表示告诉消息代理要等待 ACK 之后才将消息丢弃 channel.basic_consume(consumer_callback, queue='app', no_ack=False)
最后
如果你常接触 RabbitMQ,那么建议你结合上篇和中篇里提到的对象概念以及特性来浏览代码,相信会有更深的感触。除此之外,我们还可以在 RabbitMQ Tutorials 中获取更多的 Samples。相关文章推荐
- 快速入门分布式消息队列之 RabbitMQ(2)
- 快速入门分布式消息队列之 RabbitMQ(1)
- 快速入门分布式消息队列之 RabbitMQ(上)
- 快速入门分布式消息队列之 RabbitMQ(下)
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 分布式消息队列RabbitMQ之一:基本概念理解
- 学渣讲消息队列之RabbitMQ从敲门到入门(第二讲)—— "Hello World!"
- 【转】快速理解Kafka分布式消息队列框架
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 消息队列RabbitMQ入门介绍
- 跨数据库分布式实时事务 - 基于RabbitMQ实时消息队列服务实现
- 快速理解Kafka分布式消息队列框架
- 消息队列RABBITMQ入门介绍
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- RabbitMQ入门与消息队列模式详解
- 消息队列RabbitMQ入门介绍
- rabbitmq消息队列的简单入门
- RabbitMQ消息队列入门篇(环境配置+Java实例+基础概念)
- 消息队列RabbitMQ入门介绍
- Celery 分布式任务队列快速入门