您的位置:首页 > 编程语言 > Python开发

python使用rabbitmq实例二,工作队列 (2)

2013-12-26 23:27 1011 查看
上一篇介绍了rabbitmq的安装和经典的hello world!实例。这里将对工作队列(Work Queues)做一个了解。因为是接上一篇说明的,所以如果没看过上一篇,看这篇可能会比较难理解。上一篇的地址是:ubuntu安装rabbitmq和python的使用实现

消息也可以理解为任务,消息发送者可以理解为任务分配者,消息接收者可以理解为工作者,当工作者接收到一个任务,还没完成的时候,任务分配者又发一个任务过来,那就忙不过来了,于是就需要多个工作者来共同处理这些任务,这些工作者,就称为工作队列。结构图如下:





rabbitmq的python实例工作队列

准备工作(Preparation)

在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。

修改send.py,从命令行参数里接收信息,并发送

1
import
sys
2
3
message
=
'
'
.join(sys.argv[
1
:])
or
"Hello
World!"
4
channel.basic_publish(exchange
=
'',
5
routing_key
=
'hello'
,
6
body
=
message)
7
print
"
[x] Sent %r"
%
(message,)
修改receive.py的回调函数。

1
import
time
2
3
def
callback(ch,
method,properties,body):
4
print
"
[x] Received %r"
%
(body,)
5
time.sleep(
body.count(
'.'
)
)
6
print
"
[x] Done"
这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py
$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....


观察worker.py接收到任务,其中一个工作者接收到3个任务 :
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'


另外一个工作者接收到2个任务 :
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'


从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

1
def
callback(ch,
method,properties,body):
2
print
"
[x] Received %r"
%
(body,)
3
time.sleep(
5
)
4
print
"
[x] Done"
5
ch.basic_ack(delivery_tag
=
method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

1
channel.basic_consume(callback,
queue
=
'hello'
,
no_ack
=
False
)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

1
channel.queue_declare(queue
=
'hello'
,
durable
=
True
)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

1
channel.queue_declare(queue
=
'task_queue'
,
durable
=
True
)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

1
channel.basic_publish(exchange
=
'',
2
routing_key
=
"task_queue"
,
3
body
=
message,
4
properties
=
pika.BasicProperties(
5
   
delivery_mode
=
2
,
#
make messagepersistent
6
))
公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

1
channel.basic_qos(prefetch_count
=
1
)
new_task.py完整代码

1
#!/usr/bin/env
python
2
import
pika
3
import
sys
4
5
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
6
host
=
'localhost'
))
7
channel
=
connection.channel()
8
9
channel.queue_declare(queue
=
'task_queue'
,
durable
=
True
)
10
11
message
=
'
'
.join(sys.argv[
1
:])
or
"Hello
World!"
12
channel.basic_publish(exchange
=
'',
13
routing_key
=
'task_queue'
,
14
body
=
message,
15
properties
=
pika.BasicProperties(
16
   
delivery_mode
=
2
,
#
make messagepersistent
17
))
18
print
"
[x] Sent %r"
%
(message,)
19
connection.close()
worker.py完整代码

1
#!/usr/bin/env
python
2
import
pika
3
import
time
4
5
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
6
host
=
'localhost'
))
7
channel
=
connection.channel()
8
9
channel.queue_declare(queue
=
'task_queue'
,
durable
=
True
)
10
print
'
[*] Waiting for messages. To exit press CTRL+C'
11
12
def
callback(ch,
method,properties,body):
13
print
"
[x] Received %r"
%
(body,)
14
time.sleep(
body.count(
'.'
)
)
15
print
"
[x] Done"
16
ch.basic_ack(delivery_tag
=
method.delivery_tag)
17
18
channel.basic_qos(prefetch_count
=
1
)
19
channel.basic_consume(callback,
20
queue
=
'task_queue'
)
21
22
channel.start_consuming()
原文链接:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: