Python 之RabbitMQ使用
2018-03-29 08:55
302 查看
1. IO 多路复用
# select 模拟socket server # server 端 import select import socket import sys import queue server = socket.socket() server.setblocking(False) server_addr = ('localhost', 10000) print('starting up on %s port %s' % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] outputs = [] message_queues = {} while True: print('waiting for next event...') readable, writeable, exeptional = select.select(inputs, outputs, inputs) for s in readable: # 每个 s 就是一个socket if s in server: conn, client_addr = s.accept() print('new connection from', client_addr) conn.setblocking(False) inputs.append(conn) message_queues[conn] = queue.Queue() else: data = s.recv(1024) if data: print('收到来自[%s]的数据:' % s.getpeername()[0], data) message_queues[s].put[data] if s not in outputs: outputs.append(s) else: print('客户端断开了', s) if s in outputs: outputs.remove(s) inputs.remove(s) del message_queues[s] for s in writeable: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print('client [%s]' % s.getpeername()[0], 'queue is empty...') outputs.remove(s) else: print('sending msg to [%s]' % s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exceptional: print('handling exception for', s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s] # client 端 import socket import sys messages = [b'This is the message ', b'It will be sent', b'in parts'] server_address = ('localhost', 10000) socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM),] print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: for s in socks: print('%s: sending "%s"' % (s.getsockname(), message)) s.send(message) for s in socks: data = s.recv(1024) print('%s: received "%s"' % (s.getsockname(), data)) if not data: print(sys.stderr, 'closing socket', s.getsockname())
2. RabbitMQ
2.1 RabbitMQ 准备工作(以mac为例)
- 安装
RabbitMQ
:brew install rabbitmq
- 安装
pika
:pip3 install pika
- 启动
RabbitMQ
:/usr/local/Cellar/rabbitmq/3.7.4/sbin/rabbitmq-server
# 示例: # 发送端 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() # 声明一个管道 # 声明queue channel.queue_declare(queue='hello') # 需要 exchange 作为中转站 channel.basic_publish(exchange='', routing_key='hello', # queue 名字 body='Hello World!') print("' [x] Sent 'Hello World!'") connection.close() # 接收端 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, # 如果收到消息,就调用 CALLBACK 函数来处理消息 queue='hello', no_ack=True) # no acknowledgement 不确认 print(' [*] Waiting fo messages. To exit press CTRL+C') channel.start_consuming()
2.2 RabbitMQ 消息分发轮询
- 先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,这几条消息会依次分配到各个消费者身上;
2.3 RabbitMQ 消息持久化(发送端)
# 示例: `channel.queue_declare(queue='hello', durable=True)`: 队列持久化; # 示例二: 消息持久化 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties(delivery_mode=2,) # 将消息持久化 )
2.4 RabbitMQ fanout广播模式
- 如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个配置不高的机器的消费者那里
堆积了很多消息处理不完,同时配置高的消费者却一直很轻松; - 为解决上述问题,可以在各个消费者端,配置
perfetch=1
,意思是告诉RabbitMQ,消费者当前消息还没处理完的时候,就不
要再向该消费者发送新消息了。
# 示例: # 发送端: import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) message='Hello World!' channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties(delivery_mode=2,)) print(' [x] Sent %r' % message) connection.close() # 消费者端 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(' [x] Received %r' % body) time.sleep(body.count(b'.')) print(' [x] Done') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello') channel.start_consuming()
2.5 RabbitMQ (发布/订阅模式)
- 发布/订阅模式,需要使用
Exchange
; Exchange
在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:fanout
: 所有bind到此exchange的queue都可以接收消息;direct
: 通过routingKey和exchange决定的那个唯一的queue可以接收消息;topic
: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息;headers
: 通过headers来决定把消息发给哪些queue;
# 表达式符号说明: # 代表一个或多个字符, * 代表任何字符 # 例: #.a 会匹配 a.a, aa.a, aaa.a 等 # *.a 会匹配 a.a, b.a, c.a 等 # 示例: fanout模式,一方发送,多方同时接收 # 发送端 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = 'Hello World!' channel.basic_publish(exchange='logs', routing_key='', body=message) print(' [x] Sent %r' % message) connection.close() # 接收端 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字, exclusive=True # 会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*]Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(' [x] %r' % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() # 示例二: direct 模式(有选择的接收消息) # 发送端(server.py) import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(' [x] Sent %r:%r' % (severity, message)) connection.close() # 接收端(client.py) import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*]Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(' [x] %r:%r' % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() # 说明: # 以上操作在terminal启动: python3 client.py info (info级别的接收方) # python3 server.py info (info级别的发送方) # python3 client.py error (error级别的接收方) # python3 server.py error (error级别的发送方) # 示例三: 更细致的消息过滤(topic 模式) # 划分为不同应用程序(例如mysql,python等),不同级别(error, info, debug) # 发送端 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(' [x] Sent %r:%r' % (routing_key, message)) connection.close() # 接收端 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(' [x] %r:%r' % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() # 说明: # 以上操作在terminal启动: python3 client.py mysql.* (接收任何以mysql开头的消息) # python3 server.py mysql.error (发送mysql的报错日志)
2.6 RabbitMQ rpc实现
# 示例: # RPC server import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(' [.] fib(%s)' % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print(' [x] Awaiting RPC requests') channel.start_consuming() # RPC client import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 生成随机Queue self.channel.basic_consume(self.on_response, # 只要一收到消息,就调用 on_response no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consuming print('no message') return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(' [x] Requesting fib(30)') response = fibonacci_rpc.call(30) print(' [.] Got %r' % response)
参考资料:
相关文章推荐
- ubuntu安装rabbitmq和python的使用实现 (1)
- python使用rabbitmq实例三,交换机(3)
- python使用rabbitmq实现网络爬虫示例
- python使用rabbitmq实现网络爬虫示例
- RabbitMQ 基于python的使用方法(二)
- python中使用rabbitmq消息中间件
- rabbitmq使用__python客户端(消息发送者)
- ubuntu安装rabbitmq和python的使用实现
- python使用rabbitmq实例五,路由键模糊匹配
- python使用rabbitmq实例二,工作队列
- 使用python操作Memcache、Redis、RabbitMQ、
- 使用python操作RabbitMQ,Redis,Memcache,SQLAlchemy 其一
- 使用python操作RabbitMQ,Redis,Memcache,SQLAlchemy 其二
- ubuntu安装rabbitmq和python的使用实现
- python使用rabbitmq实例五,路由键模糊匹配(5)
- python使用rabbitmq实例六,远程结果返回
- python使用rabbitmq实例二,工作队列 (2)
- python使用rabbitmq实例三,交换机
- python采用pika库使用rabbitmq总结,多篇笔记和示例
- python使用rabbitmq实例七,相互关联编号correlation id(7)