您的位置:首页 > 运维架构

探索 OpenStack 之(14):OpenStack 中 RabbitMQ 的使用

2015-02-16 11:16 344 查看
本文是OpenStack中的RabbitMQ使用研究两部分中的第一部分,将介绍RabbitMQ的基本概念,即RabbitMQ是什么。第二部分将介绍其在OpenStack中的使用。

1RabbitMQ的基本概念

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准(anopenstandardforpassingbusinessmessagesbetweenapplicationsororganizations),它最新的版本是1.0。AMQP目标在于解决在两个应用之间传送消息存在的下列问题:

网络是不可靠的=>消息需要保存后再转发并有出错处理机制

与本地调用相比,网络速度慢=>得异步调用

应用之间是不同的(比如不同语言实现、不同操作系统等)=>得与应用无关

应用会经常变化=>同上

AMQP使用异步的、应用对应用的、二进制数据通信来解决这些问题。

RabbitMQ是AMQP的一种实现,它包括Server(服务器端)、Client(客户端)和Plugins(插件)。RabbitMQ服务器是用Erlang语言编写的,其最新版本是刚刚(2015/02/11)发布的3.4.4,而OpenStackJuno中使用的Server是2014年3月发布的3.2.4版本。现在RabbitMQ支持的AMQP版本依然是0.9.1。

1.1RabbitMQ的概念非常清晰、简洁

其基本概念参见下图:



RabbitMQ官网和其它网站上有很多文章来描述其基本概念。简单说明如下:

Message(消息):RabbitMQ转发的二进制对象,包括Headers(头)、Properties(属性)和Data(数据),其中数据部分不是必要的。具体见1.2部分的描述。

Producer(生产者):消息的生产者,负责产生消息并把消息发到交换机Exhange的应用。

Consumer(消费者):使用队列Queue从Exchange中获取消息的应用。

Exchange(交换机):负责接收生产者的消息并把它转到到合适的队列Queue。下面有1.3部分描述。

Queue(队列):一个存储Exchange发来的消息的缓冲,并将消息主动发送给Consumer,或者Consumer主动来获取消息。见1.4部分的描述。

Binding(绑定):队列和交换机之间的关系。Exchange根据消息的属性和Binding的属性来转发消息。绑定的一个重要属性是binding_key。

Connection(连接)和Channel(通道):生产者和消费者需要和RabbitMQ建立TCP连接。一些应用需要多个connection,为了节省TCP连接,可以使用Channel,它可以被认为是一种轻型的共享TCP连接的连接。连接需要用户认证,并且支持TLS(SSL)。连接需要显式关闭。



VirtualHost(虚拟主机):RabbitMQ用来进行资源隔离的机制。一个虚机主机会隔离用户、exchange、queue等。默认的虚拟主机为"/"。

1.2关于消息message

消息结构:



消息的几个重要属性:

routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。

delivery_mode:将其值设置为2将用于消息的持久化,持久化的消息会被保存到磁盘上来防止其丢失。下面章节3有描述。

reply_to:一般用来表示RPC实现中客户端的回调队列的名字。下面章节4有描述。

correlation_id:用于使用RabbitMQ来实现RPC的情形。下面章节4有描述。

content_type:表示消息data的编码格式名称。实际上RabbitMQ只负责原样传送消息因此不会使用该属性,该属性只被Publisher和Consumer使用。

消息的确认/删除机制:

Consumer处理消息可能会失败,那么RabbitMQ怎么知道什么时候来删除queue中的消息呢?它使用两种机制:

当RabbitMQ主动将消息发给Consumer以后,它会删除消息

当Consumer发回一个确认后,RabbitMQ会删除消息。

第二种情况下,如果RabbitMQ没收到确认,它会把消息重新放进队列(re-queued)并添加标识'redelivered'表明该消息之前已经发送过,如果没有Consumer的话,消息将保持到有下一个Consumer为止。

Consumer可以主动告诉RabbitMQ消息处理失败了(拒绝消息),并告知RabbitMQ是删除消息还是重新放进队列。

1.3exchange交换机

exchange有几个重要的属性:

Name名字:交换机名字。空字符串名字的exchange为默认的exchange。

Type类型:Direct,Fanout,Topic,Headers。类型决定exchange的消息转发能力。下面章节2有描述。

durable:值为True/False。值为true的exchange在rabbitmq重启后会被自动创建。OpenStack使用的exchange的该值都为false。

auto_delete:值为True/False。设置为true的话,当所有消费者的连接都关闭后,该exchange会被自动删除。OpenStack使用的exchange的该值都为false。

exclusive:值为True/False。设置为true的话,该exchange只允许被创建的connection使用,并且在该connection关闭后它会被自动删除。

RabbitMQ默认会为每一种类型生成一个或者两个的默认的exchange:

Fanout类型:名字为amq.fanout

Topic类型:名字为amq.topic

Headers类型:名字为amq.match和amq.headers

Direct类型:名字为空字符串的exchange以及amq.direct。其中名字为空的exchange比较特殊。在一个Queue被创建后,RabbitMQ会自动建立它和该exchange之间的binding,并且设置其binding_key为该queue的名字。这样,该语句channel.basic_publish(exchange='',routing_key='hello',body=message)会让该默认的exchange将该message转发到名字为'hello'的队列中。

1.4队列Queue

队列同样有类似于exchange的name、durable、auto_delete和exclusive等属性,并且含义相同。

Exchange会将消息分发(copy)到符合要求的所有队列中。

Consumer可以主动获取或者被动接受Queue里面的消息:

被动接收消息(订阅消息"pushAPI"):使用basic.consume(shortreserved-1,queue-namequeue,consumer-tagconsumer-tag,no-localno-local,no-ackno-ack,bitexclusive,no-waitno-wait,tablearguments)
方法。见5.1示例代码。

主动获取消息("pullAPI"):使用basic.get(shortreserved-1,queue-namequeue,no-ackno-ack)方法。

一个Queue允许有多个Consumer,比如利用RabbitMQ来实现一个简单的loadbalancer。这时候,消息会在这些Consumer之间根据channel的prefetchlevel做分发(请参见AQMP:QoSormessageprefetching),如果该值一样的话,消息会被平均分发给这些Consumer。

1.5rabbitmqctlCli

RabbitMQ提供Clirabbitmqctl[-n<node>][-q]<command>[<commandoptions>]来进行管理和配置。常用到的命令有:

stop/start_app

add/delete/list_vhosts

list_queues/exchanges/bindings/connections/channels

trace_on/off

2消息转发机制

Exchange根据它自身的类型type、消息的属性routing_key或者headers,以及Binding的属性binding_key来转发消息。

Exchange的类型Type使用的消息属性使用的Binding属性转发模式
Fanout-(忽略消息的转发属性)-(忽略binding的转发属性)Exchange将消息转发到所有与它有binding关系的队列中。

这种方法转发效率较高。OpenStack大量使用这种类型的exchange。

Directrouting_key(任意的字符串,比如"abc")binding_key(任意的字符串,比如"abc")Exchange只将消息转到binding的binding_key等于消息的routing_key的队列中。
Topicrouting_key(以"."分割的多单词字符串,比如abc.efg.hij)binding_key(包含"#"和"*"的以“.”分割的多单词字符串,比如*.efg.*)Exchange只将消息转到消息的routing_key和binding的binding_key匹配的队列中。匹配规则如下:

(1)两者以"."分割的单词数目相同

(2)"*"可代表一个单词

(3)"#“可代表零个或多个单词

Headersheaders(消息头)binding_keyExchange只将消息转到消息的headers和binding的binding_key匹配的队列中。匹配规则待研究。

OpenStack不使用该类型的exchange。

参考文档:

https://www.rabbitmq.com/getstarted.html这里有详细的阐述和示例源代码。

/content/4167781.html这里有官网文档的中文版。

3持久化

消息的持久化意味着在RabbitMQ被重启后,消息依然还在。要实现持久化,得实现几个相关组件的持久化:

(1).交换机的持久化,需要将其durable属性设为true。chan.exchange_declare(exchange="sorting_room",type="direct",durable=True,auto_delete=False,)

(2).队列的持久化,需要将其durable属性设置为true。chan.queue_declare(queue="po_box",durable=True,exclusive=False,auto_delete=False)

(3).消息的持久化,需要将其DeliveryMode属性设置成2。msg.properties["delivery_mode"]=2

4RPC

可以使用RabbitMQ来实现RPC机制,这里说说其实现原理:



过程:

(1).客户端Client设置消息的routingkey为Service的队列op_q;设置消息的reply-to属性为返回的response的目标队列reponse_q,设置其correlation_id为以随机UUID,然后将消息发到exchange。比如channel.basic_publish(exchange='',routing_key='op_q',properties=pika.BasicProperties(reply_to=reponse_q,correlation_id=self.corr_id),body=request)

(2).Exchange将消息转发到Service的op_q

(3).Service收到该消息后进行处理,然后将response发到exchange,并设置消息的routing_key为原消息的reply_to属性,以及设置其correlation_id为原消息的correlation_id。

ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))

(4).Exchange将消息转发到reponse_q

(5).Client逐一接受response_q中的消息,检查消息的correlation_id是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。

这里有详细的阐述。

5PythonAMQPSDK

常用的PythonAMQPSDK包括:

py-amqplib(支持AMQP0.8):http://barryp.org/software/py-amqplib/

pika(支持AMQP0.9.1,Python2.6和2.7):https://github.com/pika/pika

txamqp:https://launchpad.net/txamqp

5.1一个简单的使用py-amqplib的Consumer实现

#创建Connection和Channel连接到RabbitMQ服务器
conn=amqp.Connection(host="localhost:5672",userid="guest",password="1111",virtual_host="/",insist=False)
chan=conn.channel()

#创建queue
result=chan.queue_declare(queue="debug",durable=True,exclusive=False,auto_delete=False)

#创建exchange
result=chan.exchange_declare(exchange="sorting_room2",type="topic",durable=True,auto_delete=False,)

#创建binding
result=chan.queue_bind(queue="debug",exchange="sorting_room2",routing_key="*.debug")

#回调函数,当有message到达queue后,该函数会被调用
defrecv_callback(msg):
print'Received:'+msg.body+'fromchannel#'+str(msg.channel.channel_id)
#lChannel.basic_ack(msg.delivery_tag)#如果no_ack=False的话,可以需要发回一个确认

#启动一个consumer,consumer_tag是该consumer的一个唯一标识符
#no_ack=True表示该consumer不会发回确认
chan.basic_consume(queue='debug',no_ack=True,callback=recv_callback,consumer_tag="debugtag")

#等待有消息发到queue
whileTrue:
chan.wait()

#终止该consumer
chan.basic_cancel("testtag")

#关闭connection和channel
chan.close()
conn.close()


5.2一个简单的使用py-amqplib的Producer实现代码

fromamqplibimportclient_0_8asamqp
importsys

#创建connection和channel
conn=amqp.Connection(host="localhost:5672",userid="guest",password="1111",virtual_host="/",insist=False)
chan=conn.channel()

#创建message
msg=amqp.Message(sys.argv[1])
msg.properties["delivery_mode"]=2

#发送message
chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))

#关闭connection和channel
chan.close()
conn.close()


5.3使用pika

5.3.1安装pika

wgethttps://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz#md5=b99aad4b88961d3c7e4876b8327fc97c
tarzxvfpika-0.9.14.tar.gz
cdpika-0.9.14
pythonsetup.pyinstall


5.3.2使用pika编程(来源)

#!/usr/bin/envpython
'''
rabbitmqtracescripts.
require(rabbitmq_tracing):
$sudorabbitmq-pluginsenablerabbitmq_tracing
usage:
$sudorabbitmqctltrace_on
$./rabbitmqtrace.py
<<output>>
'''
importsys
importtime
fromoptparseimportOptionParser
importpika

__AUTHOR__='smallfish'
__VERSION__='0.0.1'

def_out(args):
printtime.strftime('%Y-%m-%d%H:%M:%S'),args

def_run(host,port,vhost,user,password):
conn=pika.BlockingConnection(pika.ConnectionParameters(host=host,port=port,virtual_host=vhost,
credentials=pika.PlainCredentials(user,password)))
chan=conn.channel()
def_on_message(ch,method,properties,body):
ret={}
ret['routing_key']=method.routing_key
ret['headers']=properties.headers
ret['body']=body
_out(ret)
_out('startsubscribeamq.rabbitmq.trace')
ret=chan.queue_declare(exclusive=False,auto_delete=True)
queue=ret.method.queue
chan.queue_bind(exchange='amq.rabbitmq.trace',queue=queue,routing_key='#')
chan.queue_bind(exchange='amq.rabbitmq.log',queue=queue,routing_key='#')
chan.basic_consume(_on_message,queue=queue,no_ack=True)
chan.start_consuming()

defmain():
parser=OptionParser('usage:%prog')
parser.add_option('','--host',metavar='host',default='localhost',help='rabbitmqhostaddress,default:%default')
parser.add_option('','--port',metavar='port',default=5672,type='int',help='rabbitmqport,default:%default')
parser.add_option('','--vhost',metavar='vhost',default='/',help='rabbitmqvhost,default:%default')
parser.add_option('','--user',metavar='user',default='guest',help='rabbitmquser,default:%default')
parser.add_option('','--password',metavar='password',default='guest',help='rabbitmqpassword,default:%default')
(options,args)=parser.parse_args()
_run(options.host,options.port,options.vhost,options.user,options.password)

if__name__=='__main__':
main()


6插件和消息追踪

RabbitMQ支持使用插件来支持Management,Federation,Shovel和STOMP。所有的插件都在这里。

6.1rabbitmq-management插件

它提供HTTP-basedAPI和browser-basedUI以及CLI来管理RabbitMQ。它的GUI的访问地址是http://<rabbitmqserverip>:15672/#/traces。它的GUI上,提供了一个overview,还可以通过它来管理connection、channel、exchange和queue,以及virtualhost,tracing和policy等。



6.2RabbitMQ的firehose机制

该机制提供了一个查看被转发的消息的途径。当打开firehose的时候,RabbitMQ会自动建立amq.rabbitmq.trace和amq.rabbitmq.log两个exchange。你可以编程创建queue从这两个exchange里面获取trace和log,从而观察每一个被处理的消息。这里有一个开源代码实现。

6.3rabbitmq_tracing插件

rabbitmq_tracing插件在management插件增加了消息追踪的方法,它是从firehose中获取数据。在激活了rabbitmq-management,firehose和rabbitmq_tracing,你可以在managementGUI中追踪消息:



自此,RabbitMQ基本上算熟悉了,接下来可以开始分析OpenStack中是如何使用RabbitMQ了。


                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: