openstack nova基础知识——RabbitMQ
2012-08-17 06:20
453 查看
nova中各个组件之间的交互是通过“消息队列”来实现的,其中一种实现方法就是使用RabbitMQ,对RabbitMQ的使用,官方文档上有一个非常好的Get Started,由浅及深,结合例子,很容易理解。现在对RabbitMQ的理解,就是利用它可以非常灵活的定制自己想要实现的消息收发机制。
其中,有这样几个角色:producer, consumer, exchange, queue
producer是消息发送者,consumer是消息接受者,中间要通过exchange和queue。producer将消息发送给exchange,exchange决定消息的路由,即决定要将消息发送给哪个queue,然后consumer从queue中取出消息,进行处理,大致流程如下图:
这几个角色当中,我觉得最关键的是这个exchange,它有3种类型:direct, topic, fanout。其中,功能最强大的就是topic,用它完全可以实现direct和fanout的功能。
direct是单条件的路由,即在exchange判断要将消息发送给哪个queue时,判断的依据只能是一个条件;
fanout是广播式的路由,即将消息发送给所有的queue;
topic是多条件的路由,转发消息时,依据的条件是多个,所以只使用topic就可以实现direct和fanout的功能。
上面所说的“条件”,反映到程序中,就是routing_key,这个routing_key出现在两个地方:
1. 每一个发送的消息都有一个routing_key,表示发送的是一个什么样的消息;
2. 每一个queue要和exchange绑定,绑定的时候要提供一个routing_key,表示这个queue想要接收什么样的消息。
这样,exchange就可以根据routing_key,来将消息发送到合适的queue中。
基本的思路就这些吧,下面来看一下官方文档上的那由浅及深的六个例子:
(我很喜欢这种风格的文档,整体由浅及深,适合初学者,其次文章没有大量的生僻词汇,而且例子+图片,比较容易懂,更好的是文章还带点小小的幽默,不由得让人汇心一笑,感觉老外做事就是认真细腻,希望自己也能养成这样的风格)
1. Hello World
最简单的情况,发一个消息,接收,打印出来这个消息。
send.py:
recv.py:
2. 多个consumer
这个例子跟第一个例子基本上一样,只是启动了多个consumer,并且模拟真实情况,即发送的消息使得consumer在短时间内不能完成工作。在这种情况下,多个consumer是如何协调工作的呢?其实,这些都是可以在程序中进行控制的。
send.py
recv.py:
3. fanout exchange的例子:
send.py:
recv.py:
4. direct exchange的例子:
需要注意,一个queue是可以和同一个exchange多次绑定的,每次绑定要用不同的routing_key
send.py:
recv.py:
5. topic exchange的例子
这里的routing_key可以使用一种类似正则表达式的形式,但是特殊字符只能是“*”和“#”,“*”代表一个单词,“#”代表0个或是多个单词。这样发送过来的消息如果符合某个queue的routing_key定义的规则,那么就会转发给这个queue。如下图示例:
send.py:
recv.py:
6. PRC(Remote Procedure Call,远程过程调用)
目前对这个的理解就是发送一个消息,然后还要得到一个结果,即消息要走一个来回。如下图所示:
client.py:
server.py:
几个RabbitMQ相关的命令:
其中,有这样几个角色:producer, consumer, exchange, queue
producer是消息发送者,consumer是消息接受者,中间要通过exchange和queue。producer将消息发送给exchange,exchange决定消息的路由,即决定要将消息发送给哪个queue,然后consumer从queue中取出消息,进行处理,大致流程如下图:
这几个角色当中,我觉得最关键的是这个exchange,它有3种类型:direct, topic, fanout。其中,功能最强大的就是topic,用它完全可以实现direct和fanout的功能。
direct是单条件的路由,即在exchange判断要将消息发送给哪个queue时,判断的依据只能是一个条件;
fanout是广播式的路由,即将消息发送给所有的queue;
topic是多条件的路由,转发消息时,依据的条件是多个,所以只使用topic就可以实现direct和fanout的功能。
上面所说的“条件”,反映到程序中,就是routing_key,这个routing_key出现在两个地方:
1. 每一个发送的消息都有一个routing_key,表示发送的是一个什么样的消息;
2. 每一个queue要和exchange绑定,绑定的时候要提供一个routing_key,表示这个queue想要接收什么样的消息。
这样,exchange就可以根据routing_key,来将消息发送到合适的queue中。
基本的思路就这些吧,下面来看一下官方文档上的那由浅及深的六个例子:
(我很喜欢这种风格的文档,整体由浅及深,适合初学者,其次文章没有大量的生僻词汇,而且例子+图片,比较容易懂,更好的是文章还带点小小的幽默,不由得让人汇心一笑,感觉老外做事就是认真细腻,希望自己也能养成这样的风格)
1. Hello World
最简单的情况,发一个消息,接收,打印出来这个消息。
send.py:
#!/usr/bin/env python import pika # 1. Establish a connection with RabbitMQ server. connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 2. Create a queue to which the message will be delivered, let's name it 'hello' channel.queue_declare(queue='hello') # 3. Use a default exchange identified by an empty string, which allows us to specify # exactly to which queue the message should go. The queue name needs to be specified # in the routing_key parameter: channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" # 4. Close the connection connection.close()
recv.py:
#!/usr/bin/env python import pika # 1. Establish a connection with RabbitMQ server connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created. channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' # 3. Define a callback function.Whenever we receive a message, # this callback function is called by the Pika library. def callback(ch, method, properties, body): print " [x] Received %r" % (body,) # 4. Subscribe the callback function to a queue. # Tell RabbitMQ that this particular callback function should receive messages from our hello queue. channel.basic_consume(callback, queue='hello', no_ack=True) # 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary. channel.start_consuming()
2. 多个consumer
这个例子跟第一个例子基本上一样,只是启动了多个consumer,并且模拟真实情况,即发送的消息使得consumer在短时间内不能完成工作。在这种情况下,多个consumer是如何协调工作的呢?其实,这些都是可以在程序中进行控制的。
send.py
#!/usr/bin/env python import pika import sys # 1. Establish a connection with RabbitMQ server. connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() # 2. Create a queue to which the message will be delivered, let's name it 'hello' # 'durable=True' makes the queue persistent channel.queue_declare(queue='task_queue',durable=True) message=' '.join(sys.argv[1:]) or "Hello World!" # 3. Use a default exchange identified by an empty string, which allows us to specify # exactly to which queue the message should go. The queue name needs to be specified # in the routing_key parameter: channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) # 4. Close the connection connection.close()
recv.py:
#!/usr/bin/env python import pika import time # 1. Establish a connection with RabbitMQ server connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created. # 'durable=True' makes the queue persistent channel.queue_declare(queue='task_queue',durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' # 3. Define a callback function.Whenever we receive a message, # this callback function is called by the Pika library. # # Send a ack to tell rabbitmq a task is done, then it can release the message. # If a worker dies, rabbitmq fail to receive the ack, it will redeliver the message to another worker. # Remember to write the last line code, or rabbitmq will eat more and more memory. def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(body.count('.')) print "[x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # Fair dispatch: Tell rabbitmq not give a worker more than one messages at a time channel.basic_qos(prefetch_count=1) # 4. Subscribe the callback function to a queue. # Tell RabbitMQ that this particular callback function should receive messages from our hello queue. channel.basic_consume(callback, queue='task_queue', no_ack=False)# turn on the (ack)onwledgment, default is False # 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary. channel.start_consuming()
3. fanout exchange的例子:
send.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # declare a exchange, type is fanout(means broadcast),named 'logs'. # exchange is used to receive messages form producer, and send messages to queue. # there are four exchange types: direct, topic, headers and fanout channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', #routing_key is '', because 'fanout' exchange will ignore its value. body=message) print " [x] Sent %r" % (message,) connection.close()
recv.py:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # if a exchange named 'logs' have not declared yet, then declare one, # or just use the existed exchange. channel.exchange_declare(exchange='logs', type='fanout') # declare a temporary queue with a random name # 'exclusive=True' flag will delete the queue when the consumer dies. result = channel.queue_declare(exclusive=True) queue_name = result.method.queue # bind the queue to the exchange, to tell the exchange to send messages to our queue. channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
4. direct exchange的例子:
需要注意,一个queue是可以和同一个exchange多次绑定的,每次绑定要用不同的routing_key
send.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # declare a exchange, type is direct, named 'logs'. channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' # a message is sent to the direct exchange with a routing_key. # a message is identified by the routing_key. channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()
recv.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # declare a direct exchange named 'direct_logs' channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[0],) sys.exit(1) # Bind the queue to the direct exchange, # 'routing_key' flag tells the direct exchange which kind of message it wants to receive. # A queue can bind multiple times to the same direct exchange with different routing_keys, # which means it wants to receive several kinds of messages. for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
5. topic exchange的例子
这里的routing_key可以使用一种类似正则表达式的形式,但是特殊字符只能是“*”和“#”,“*”代表一个单词,“#”代表0个或是多个单词。这样发送过来的消息如果符合某个queue的routing_key定义的规则,那么就会转发给这个queue。如下图示例:
send.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # declare a exchange, type is topic, named 'topic_logs'. # topic exchange allows to do routing based on multiple criteria. channel.exchange_declare(exchange='topic_logs', type='topic') severity = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' # a message is sent to the topic exchange with a routing_key. # a message is identified by the routing_key. # the topic routing_key can be like 'topic.host','topic.topic1.topic3', etc # also can use '*'(one word) and '#'(zero or more words) to substitute word(s). channel.basic_publish(exchange='topic_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()
recv.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # declare a topic exchange named 'topic_logs' channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],) sys.exit(1) # Bind the queue to the topic exchange, # 'routing_key' flag tells the topic exchange which kind of message it wants to receive. # A queue can bind multiple times to the same direct exchange with different routing_keys, # which means it wants to receive several kinds of messages. for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
6. PRC(Remote Procedure Call,远程过程调用)
目前对这个的理解就是发送一个消息,然后还要得到一个结果,即消息要走一个来回。如下图所示:
client.py:
#!/usr/bin/env python import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print " [x] Requesting fib(30)" response = fibonacci_rpc.call(30) print " [.] Got %r" % (response,)
server.py:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print " [.] fib(%s)" % (n,) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='rpc_queue') print " [x] Awaiting RPC requests" channel.start_consuming()
几个RabbitMQ相关的命令:
1. 查看RabbitMQ中有多少个queue,以及每个queue中有多少个消息: $ sudo rabbitmqctl list_queues 2. 查看RabbitMQ中exchange的情况: $ sudo rabbitmqctl list_exchanges 3. 查看RabbitMQ中exchange和queue绑定情况: $ sudo rabbitmqctl list_bindings 4. 启动/停止RabbitMQ: $ sudo invoke-rc.d rabbitmq-server stop/start/etc.
相关文章推荐
- openstack nova 基础知识——eventlet
- openstack nova 基础知识——scheduler的filter和weight
- openstack nova 基础知识——Quota(配额管理)
- openstack nova 基础知识——Quota(配额管理)
- openstack nova 基础知识——policy
- openstack nova 基础知识——eventlet
- openstack nova基础知识——libvirt和qemu(2)
- openstack nova 基础知识——cfg
- openstack nova 基础知识——Quota(配额管理)
- openstack nova 基础知识——rpc回调机制(callback)
- openstack nova基础知识——libvirt和qemu(2)
- openstack nova基础知识——libvirt和qemu(2)
- openstack nova 基础知识――wsgi
- openstack从零开始(1)——基础知识WSGI
- openstack nova基础知识——RabbitMQ
- [部署篇5]VMWare搭建Openstack——计算节点的基础部署和Nova的安装
- openstack基础知识汇总
- OpenStack之基础知识
- 轻松搞定RabbitMQ(一)——RabbitMQ基础知识+HelloWorld