探索 OpenStack 之(14):OpenStack 中 RabbitMQ 的使用
2015-02-16 11:16
344 查看
本文是OpenStack中的RabbitMQ使用研究两部分中的第一部分,将介绍RabbitMQ的基本概念,即RabbitMQ是什么。第二部分将介绍其在OpenStack中的使用。
AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准(anopenstandardforpassingbusinessmessagesbetweenapplicationsororganizations),它最新的版本是1.0。AMQP目标在于解决在两个应用之间传送消息存在的下列问题:
网络是不可靠的=>消息需要保存后再转发并有出错处理机制
与本地调用相比,网络速度慢=>得异步调用
应用之间是不同的(比如不同语言实现、不同操作系统等)=>得与应用无关
应用会经常变化=>同上
AMQP使用异步的、应用对应用的、二进制数据通信来解决这些问题。
RabbitMQ是AMQP的一种实现,它包括Server(服务器端)、Client(客户端)和Plugins(插件)。RabbitMQ服务器是用Erlang语言编写的,其最新版本是刚刚(2015/02/11)发布的3.4.4,而OpenStackJuno中使用的Server是2014年3月发布的3.2.4版本。现在RabbitMQ支持的AMQP版本依然是0.9.1。
RabbitMQ官网和其它网站上有很多文章来描述其基本概念。简单说明如下:
Message(消息):RabbitMQ转发的二进制对象,包括Headers(头)、Properties(属性)和Data(数据),其中数据部分不是必要的。具体见1.2部分的描述。
Producer(生产者):消息的生产者,负责产生消息并把消息发到交换机Exhange的应用。
Consumer(消费者):使用队列Queue从Exchange中获取消息的应用。
Exchange(交换机):负责接收生产者的消息并把它转到到合适的队列Queue。下面有1.3部分描述。
Queue(队列):一个存储Exchange发来的消息的缓冲,并将消息主动发送给Consumer,或者Consumer主动来获取消息。见1.4部分的描述。
Binding(绑定):队列和交换机之间的关系。Exchange根据消息的属性和Binding的属性来转发消息。绑定的一个重要属性是binding_key。
Connection(连接)和Channel(通道):生产者和消费者需要和RabbitMQ建立TCP连接。一些应用需要多个connection,为了节省TCP连接,可以使用Channel,它可以被认为是一种轻型的共享TCP连接的连接。连接需要用户认证,并且支持TLS(SSL)。连接需要显式关闭。
VirtualHost(虚拟主机):RabbitMQ用来进行资源隔离的机制。一个虚机主机会隔离用户、exchange、queue等。默认的虚拟主机为"/"。
消息的几个重要属性:
routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。
delivery_mode:将其值设置为2将用于消息的持久化,持久化的消息会被保存到磁盘上来防止其丢失。下面章节3有描述。
reply_to:一般用来表示RPC实现中客户端的回调队列的名字。下面章节4有描述。
correlation_id:用于使用RabbitMQ来实现RPC的情形。下面章节4有描述。
content_type:表示消息data的编码格式名称。实际上RabbitMQ只负责原样传送消息因此不会使用该属性,该属性只被Publisher和Consumer使用。
消息的确认/删除机制:
Consumer处理消息可能会失败,那么RabbitMQ怎么知道什么时候来删除queue中的消息呢?它使用两种机制:
当RabbitMQ主动将消息发给Consumer以后,它会删除消息
当Consumer发回一个确认后,RabbitMQ会删除消息。
第二种情况下,如果RabbitMQ没收到确认,它会把消息重新放进队列(re-queued)并添加标识'redelivered'表明该消息之前已经发送过,如果没有Consumer的话,消息将保持到有下一个Consumer为止。
Consumer可以主动告诉RabbitMQ消息处理失败了(拒绝消息),并告知RabbitMQ是删除消息还是重新放进队列。
Name名字:交换机名字。空字符串名字的exchange为默认的exchange。
Type类型:Direct,Fanout,Topic,Headers。类型决定exchange的消息转发能力。下面章节2有描述。
durable:值为True/False。值为true的exchange在rabbitmq重启后会被自动创建。OpenStack使用的exchange的该值都为false。
auto_delete:值为True/False。设置为true的话,当所有消费者的连接都关闭后,该exchange会被自动删除。OpenStack使用的exchange的该值都为false。
exclusive:值为True/False。设置为true的话,该exchange只允许被创建的connection使用,并且在该connection关闭后它会被自动删除。
RabbitMQ默认会为每一种类型生成一个或者两个的默认的exchange:
Fanout类型:名字为amq.fanout
Topic类型:名字为amq.topic
Headers类型:名字为amq.match和amq.headers
Direct类型:名字为空字符串的exchange以及amq.direct。其中名字为空的exchange比较特殊。在一个Queue被创建后,RabbitMQ会自动建立它和该exchange之间的binding,并且设置其binding_key为该queue的名字。这样,该语句channel.basic_publish(exchange='',routing_key='hello',body=message)会让该默认的exchange将该message转发到名字为'hello'的队列中。
Exchange会将消息分发(copy)到符合要求的所有队列中。
Consumer可以主动获取或者被动接受Queue里面的消息:
被动接收消息(订阅消息"pushAPI"):使用basic.consume(shortreserved-1,queue-namequeue,consumer-tagconsumer-tag,no-localno-local,no-ackno-ack,bitexclusive,no-waitno-wait,tablearguments)
方法。见5.1示例代码。
主动获取消息("pullAPI"):使用basic.get(shortreserved-1,queue-namequeue,no-ackno-ack)方法。
一个Queue允许有多个Consumer,比如利用RabbitMQ来实现一个简单的loadbalancer。这时候,消息会在这些Consumer之间根据channel的prefetchlevel做分发(请参见AQMP:QoSormessageprefetching),如果该值一样的话,消息会被平均分发给这些Consumer。
stop/start_app
add/delete/list_vhosts
list_queues/exchanges/bindings/connections/channels
trace_on/off
参考文档:
https://www.rabbitmq.com/getstarted.html这里有详细的阐述和示例源代码。
/content/4167781.html这里有官网文档的中文版。
(1).交换机的持久化,需要将其durable属性设为true。chan.exchange_declare(exchange="sorting_room",type="direct",durable=True,auto_delete=False,)
(2).队列的持久化,需要将其durable属性设置为true。chan.queue_declare(queue="po_box",durable=True,exclusive=False,auto_delete=False)
(3).消息的持久化,需要将其DeliveryMode属性设置成2。msg.properties["delivery_mode"]=2
过程:
(1).客户端Client设置消息的routingkey为Service的队列op_q;设置消息的reply-to属性为返回的response的目标队列reponse_q,设置其correlation_id为以随机UUID,然后将消息发到exchange。比如channel.basic_publish(exchange='',routing_key='op_q',properties=pika.BasicProperties(reply_to=reponse_q,correlation_id=self.corr_id),body=request)
(2).Exchange将消息转发到Service的op_q
(3).Service收到该消息后进行处理,然后将response发到exchange,并设置消息的routing_key为原消息的reply_to属性,以及设置其correlation_id为原消息的correlation_id。
ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))
(4).Exchange将消息转发到reponse_q
(5).Client逐一接受response_q中的消息,检查消息的correlation_id是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
这里有详细的阐述。
py-amqplib(支持AMQP0.8):http://barryp.org/software/py-amqplib/
pika(支持AMQP0.9.1,Python2.6和2.7):https://github.com/pika/pika
txamqp:https://launchpad.net/txamqp
5.3.2使用pika编程(
这里。
6.1
它提供HTTP-basedAPI和browser-basedUI以及CLI来管理RabbitMQ。它的GUI的访问地址是http://<rabbitmqserverip>:15672/#/traces。它的GUI上,提供了一个overview,还可以通过它来管理connection、channel、exchange和queue,以及virtualhost,tracing和policy等。
开源代码实现。
rabbitmq-management,firehose和rabbitmq_tracing,你可以在managementGUI中追踪消息:
自此,RabbitMQ基本上算熟悉了,接下来可以开始分析OpenStack中是如何使用RabbitMQ了。
1RabbitMQ的基本概念
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。网络是不可靠的=>消息需要保存后再转发并有出错处理机制
与本地调用相比,网络速度慢=>得异步调用
应用之间是不同的(比如不同语言实现、不同操作系统等)=>得与应用无关
应用会经常变化=>同上
AMQP使用异步的、应用对应用的、二进制数据通信来解决这些问题。
1.1RabbitMQ的概念非常清晰、简洁
其基本概念参见下图:RabbitMQ
Message(消息):RabbitMQ转发的二进制对象,包括Headers(头)、Properties(属性)和Data(数据),其中数据部分不是必要的。具体见1.2部分的描述。
Producer(生产者):消息的生产者,负责产生消息并把消息发到交换机Exhange的应用。
Consumer(消费者):使用队列Queue从Exchange中获取消息的应用。
Exchange(交换机):负责接收生产者的消息并把它转到到合适的队列Queue。下面有1.3部分描述。
Queue(队列):一个存储Exchange发来的消息的缓冲,并将消息主动发送给Consumer,或者Consumer主动来获取消息。见1.4部分的描述。
Binding(绑定):队列和交换机之间的关系。Exchange根据消息的属性和Binding的属性来转发消息。绑定的一个重要属性是binding_key。
Connection(连接)和Channel(通道):生产者和消费者需要和RabbitMQ建立TCP连接。一些应用需要多个connection,为了节省TCP连接,可以使用Channel,它可以被认为是一种轻型的共享TCP连接的连接。连接需要用户认证,并且支持TLS(SSL)。连接需要显式关闭。
VirtualHost(虚拟主机):RabbitMQ用来进行资源隔离的机制。一个虚机主机会隔离用户、exchange、queue等。默认的虚拟主机为"/"。
1.2关于消息message
消息结构:消息的几个重要属性:
routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。
delivery_mode:将其值设置为2将用于消息的持久化,持久化的消息会被保存到磁盘上来防止其丢失。下面章节3有描述。
reply_to:一般用来表示RPC实现中客户端的回调队列的名字。下面章节4有描述。
correlation_id:用于使用RabbitMQ来实现RPC的情形。下面章节4有描述。
content_type:表示消息data的编码格式名称。实际上RabbitMQ只负责原样传送消息因此不会使用该属性,该属性只被Publisher和Consumer使用。
消息的确认/删除机制:
Consumer处理消息可能会失败,那么RabbitMQ怎么知道什么时候来删除queue中的消息呢?它使用两种机制:
当RabbitMQ主动将消息发给Consumer以后,它会删除消息
当Consumer发回一个确认后,RabbitMQ会删除消息。
第二种情况下,如果RabbitMQ没收到确认,它会把消息重新放进队列(re-queued)并添加标识'redelivered'表明该消息之前已经发送过,如果没有Consumer的话,消息将保持到有下一个Consumer为止。
Consumer可以主动告诉RabbitMQ消息处理失败了(拒绝消息),并告知RabbitMQ是删除消息还是重新放进队列。
1.3exchange交换机
exchange有几个重要的属性:Name名字:交换机名字。空字符串名字的exchange为默认的exchange。
Type类型:Direct,Fanout,Topic,Headers。类型决定exchange的消息转发能力。下面章节2有描述。
durable:值为True/False。值为true的exchange在rabbitmq重启后会被自动创建。OpenStack使用的exchange的该值都为false。
auto_delete:值为True/False。设置为true的话,当所有消费者的连接都关闭后,该exchange会被自动删除。OpenStack使用的exchange的该值都为false。
exclusive:值为True/False。设置为true的话,该exchange只允许被创建的connection使用,并且在该connection关闭后它会被自动删除。
RabbitMQ默认会为每一种类型生成一个或者两个的默认的exchange:
Fanout类型:名字为amq.fanout
Topic类型:名字为amq.topic
Headers类型:名字为amq.match和amq.headers
Direct类型:名字为空字符串的exchange以及amq.direct。其中名字为空的exchange比较特殊。在一个Queue被创建后,RabbitMQ会自动建立它和该exchange之间的binding,并且设置其binding_key为该queue的名字。这样,该语句channel.basic_publish(exchange='',routing_key='hello',body=message)会让该默认的exchange将该message转发到名字为'hello'的队列中。
1.4队列Queue
队列同样有类似于exchange的name、durable、auto_delete和exclusive等属性,并且含义相同。Exchange会将消息分发(copy)到符合要求的所有队列中。
Consumer可以主动获取或者被动接受Queue里面的消息:
被动接收消息(订阅消息"pushAPI"):使用basic.consume(
方法。见5.1示例代码。
主动获取消息("pullAPI"):使用basic.get(
一个Queue允许有多个Consumer,比如利用RabbitMQ来实现一个简单的loadbalancer。这时候,消息会在这些Consumer之间根据channel的prefetchlevel做分发(请参见AQMP:QoSormessageprefetching),如果该值一样的话,消息会被平均分发给这些Consumer。
1.5rabbitmqctlCli
RabbitMQ提供Clirabbitmqctl[-n<node>][-q]<command>[<commandoptions>]来进行管理和配置。常用到的命令有:stop/start_app
add/delete/list_vhosts
list_queues/exchanges/bindings/connections/channels
trace_on/off
2消息转发机制
Exchange根据它自身的类型type、消息的属性routing_key或者headers,以及Binding的属性binding_key来转发消息。Exchange的类型Type | 使用的消息属性 | 使用的Binding属性 | 转发模式 |
Fanout | -(忽略消息的转发属性) | -(忽略binding的转发属性) | Exchange将消息转发到所有与它有binding关系的队列中。 这种方法转发效率较高。OpenStack大量使用这种类型的exchange。 |
Direct | routing_key(任意的字符串,比如"abc") | binding_key(任意的字符串,比如"abc") | Exchange只将消息转到binding的binding_key等于消息的routing_key的队列中。 |
Topic | routing_key(以"."分割的多单词字符串,比如abc.efg.hij) | binding_key(包含"#"和"*"的以“.”分割的多单词字符串,比如*.efg.*) | Exchange只将消息转到消息的routing_key和binding的binding_key匹配的队列中。匹配规则如下: (1)两者以"."分割的单词数目相同 (2)"*"可代表一个单词 (3)"#“可代表零个或多个单词 |
Headers | headers(消息头) | binding_key | Exchange只将消息转到消息的headers和binding的binding_key匹配的队列中。匹配规则待研究。 OpenStack不使用该类型的exchange。 |
3持久化
消息的持久化意味着在RabbitMQ被重启后,消息依然还在。要实现持久化,得实现几个相关组件的持久化:(1).交换机的持久化,需要将其durable属性设为true。chan.exchange_declare(exchange="sorting_room",type="direct",durable=True,auto_delete=False,)
(2).队列的持久化,需要将其durable属性设置为true。chan.queue_declare(queue="po_box",durable=True,exclusive=False,auto_delete=False)
(3).消息的持久化,需要将其DeliveryMode属性设置成2。msg.properties["delivery_mode"]=2
4RPC
可以使用RabbitMQ来实现RPC机制,这里说说其实现原理:过程:
(1).客户端Client设置消息的routingkey为Service的队列op_q;设置消息的reply-to属性为返回的response的目标队列reponse_q,设置其correlation_id为以随机UUID,然后将消息发到exchange。比如channel.basic_publish(exchange='',routing_key='op_q',properties=pika.BasicProperties(reply_to=reponse_q,correlation_id=self.corr_id),body=request)
(2).Exchange将消息转发到Service的op_q
(3).Service收到该消息后进行处理,然后将response发到exchange,并设置消息的routing_key为原消息的reply_to属性,以及设置其correlation_id为原消息的correlation_id。
ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))
(4).Exchange将消息转发到reponse_q
(5).Client逐一接受response_q中的消息,检查消息的correlation_id是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
5PythonAMQPSDK
常用的PythonAMQPSDK包括:py-amqplib(支持AMQP0.8):
pika(支持AMQP0.9.1,Python2.6和2.7):
txamqp:
5.1一个简单的使用py-amqplib的Consumer实现
#创建Connection和Channel连接到RabbitMQ服务器 conn=amqp.Connection(host="localhost:5672",userid="guest",password="1111",virtual_host="/",insist=False) chan=conn.channel() #创建queue result=chan.queue_declare(queue="debug",durable=True,exclusive=False,auto_delete=False) #创建exchange result=chan.exchange_declare(exchange="sorting_room2",type="topic",durable=True,auto_delete=False,) #创建binding result=chan.queue_bind(queue="debug",exchange="sorting_room2",routing_key="*.debug") #回调函数,当有message到达queue后,该函数会被调用 defrecv_callback(msg): print'Received:'+msg.body+'fromchannel#'+str(msg.channel.channel_id) #lChannel.basic_ack(msg.delivery_tag)#如果no_ack=False的话,可以需要发回一个确认
#启动一个consumer,consumer_tag是该consumer的一个唯一标识符 #no_ack=True表示该consumer不会发回确认 chan.basic_consume(queue='debug',no_ack=True,callback=recv_callback,consumer_tag="debugtag")
#等待有消息发到queue whileTrue: chan.wait() #终止该consumer chan.basic_cancel("testtag") #关闭connection和channel chan.close() conn.close()
5.2一个简单的使用py-amqplib的Producer实现代码
fromamqplibimportclient_0_8asamqp importsys #创建connection和channel conn=amqp.Connection(host="localhost:5672",userid="guest",password="1111",virtual_host="/",insist=False) chan=conn.channel() #创建message msg=amqp.Message(sys.argv[1]) msg.properties["delivery_mode"]=2 #发送message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2])) #关闭connection和channel chan.close() conn.close()
5.3使用pika
5.3.1安装pika
wgethttps://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz#md5=b99aad4b88961d3c7e4876b8327fc97c tarzxvfpika-0.9.14.tar.gz cdpika-0.9.14 pythonsetup.pyinstall
5.3.2使用pika编程(来源)
#!/usr/bin/envpython ''' rabbitmqtracescripts. require(rabbitmq_tracing): $sudorabbitmq-pluginsenablerabbitmq_tracing usage: $sudorabbitmqctltrace_on $./rabbitmqtrace.py <<output>> ''' importsys importtime fromoptparseimportOptionParser importpika __AUTHOR__='smallfish' __VERSION__='0.0.1' def_out(args): printtime.strftime('%Y-%m-%d%H:%M:%S'),args def_run(host,port,vhost,user,password): conn=pika.BlockingConnection(pika.ConnectionParameters(host=host,port=port,virtual_host=vhost, credentials=pika.PlainCredentials(user,password))) chan=conn.channel() def_on_message(ch,method,properties,body): ret={} ret['routing_key']=method.routing_key ret['headers']=properties.headers ret['body']=body _out(ret) _out('startsubscribeamq.rabbitmq.trace') ret=chan.queue_declare(exclusive=False,auto_delete=True) queue=ret.method.queue chan.queue_bind(exchange='amq.rabbitmq.trace',queue=queue,routing_key='#') chan.queue_bind(exchange='amq.rabbitmq.log',queue=queue,routing_key='#') chan.basic_consume(_on_message,queue=queue,no_ack=True) chan.start_consuming() defmain(): parser=OptionParser('usage:%prog') parser.add_option('','--host',metavar='host',default='localhost',help='rabbitmqhostaddress,default:%default') parser.add_option('','--port',metavar='port',default=5672,type='int',help='rabbitmqport,default:%default') parser.add_option('','--vhost',metavar='vhost',default='/',help='rabbitmqvhost,default:%default') parser.add_option('','--user',metavar='user',default='guest',help='rabbitmquser,default:%default') parser.add_option('','--password',metavar='password',default='guest',help='rabbitmqpassword,default:%default') (options,args)=parser.parse_args() _run(options.host,options.port,options.vhost,options.user,options.password) if__name__=='__main__': main()
6插件和消息追踪
RabbitMQ支持使用插件来支持Management,Federation,Shovel和STOMP。所有的插件都在6.1rabbitmq-management插件
它提供HTTP-basedAPI和browser-basedUI以及CLI来管理RabbitMQ。它的GUI的访问地址是6.2RabbitMQ的firehose机制
该机制提供了一个查看被转发的消息的途径。当打开firehose的时候,RabbitMQ会自动建立amq.rabbitmq.trace和amq.rabbitmq.log两个exchange。你可以编程创建queue从这两个exchange里面获取trace和log,从而观察每一个被处理的消息。这里有一个6.3rabbitmq_tracing插件
rabbitmq_tracing插件在management插件增加了消息追踪的方法,它是从firehose中获取数据。在激活了自此,RabbitMQ基本上算熟悉了,接下来可以开始分析OpenStack中是如何使用RabbitMQ了。
相关文章推荐
- 探索 OpenStack 之(14):OpenStack 中 RabbitMQ 的使用(转)
- NHibernate之旅(14):探索NHibernate中使用视图
- NHibernate之旅(14):探索NHibernate中使用视图
- NHibernate之旅(14):探索NHibernate中使用视图
- Ubuntu 14下使用Devstack安装OpenStack Icehouse-->(1)
- NHibernate之旅(14):探索NHibernate中使用视图
- NHibernate之旅(14):探索NHibernate中使用视图
- NHibernate之旅(14):探索NHibernate中使用视图
- 使用Word中的CheckBox,探索属性/方法 (转)
- ASP.NET DEMO 14: 如何在 GridView/DataGrid 模板列中使用自动回发的 CheckBox/DropDownList
- Effective C# 原则14:使用构造函数链(译)
- Nebula2探秘14-nGuiServer的创建与使用
- 探索 SOA 体系结构和服务的基本原则,第 1 部分: 使用体系结构和抽象级别来创建更好的 SOA
- Atlas学习手记(14):使用ToggleButton Extender来装扮CheckBox
- 探索 SOA 体系结构和服务的基本原则,第 1 部分: 使用体系结构和抽象级别来创建更好的 SOA
- 在企业级 SOA 中使用 Web 服务,第 14/15 部分
- 使用VIM开发软件项目 - (14) 指随意动,移动如飞 (一)
- ASP.NET DEMO 14: 如何在 GridView/DataGrid 模板列中使用自动回发的 CheckBox/DropDownList
- 探索JSF框架中使用的设计模式
- 使用Word中的CheckBox,探索属性/方法