openstack nova 基础知识——rpc回调机制(callback) 和rpc模块中的kombu
2013-05-09 09:54
711 查看
http://www.verydemo.com/demo_c288_i57159.html
前几篇已经知道了nova中消息是怎么发送和接收的了,但是不太明白消息被接收之后,是怎么处理的,看代码中消息接收那块传递的参数不是proxy就是callback,或者是callbacks,这个回调方法到底调用的是哪的方法呢?
从一个服务的启动程序上看,有这样一段代码:
查看文本打印?
rpc_dispatcher = self.manager.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
一个manager创建了一个dispatcher,然后用这个dispatcher创建了一个consumer,这个rpc_dispatcher就是callback,就是Consumer接收到消息之后,调用方法所使用的类。
发送消息时,其实消息的内容就是一个方法名和方法所需要的参数,这两者组成一个字典:
查看文本打印?
@staticmethod
def make_msg(method, **kwargs):
return {'method': method, 'args': kwargs}
rpc_dispatcher相当于manager的一个代理类,其中有一个dispatch()方法,就是用来执行它所代理的manager中的方法的,这个方法就是由接收到的msg(消息)来指定的。manager中定义了对某一个服务相关的管理工作,所以可以通过消息来指定要调用哪个方法进行相关的管理,比如说创建一个实例,就调用manager中的run_instance()方法。
来看一下整体的类图:
还有一个ProxyCallback类,这个是Consumer直接使用的类,是对RpcDispatcher进行了一下封装,主要是将回调的方法放到绿色线程中执行,总的来说,Consumer执行的回调方法是来自相关的Manager,并且在绿色线程中执行。
************************************************************************************************************************************************************************************
Kombu是一个AMQP(Advanced Message Queuing Protocol)消息框架。所谓框架,就是一个软件的半成品,是为了提高开发效率而开发的。
AMQP是一个协议,而RabbitMQ是对这个协议的一个实现。
Kombu和RabbitMQ的关系是什么呢?
我觉得就好像javaAPI和Structs/Hibernate这些框架的关系一样,Kombu对RabbitMQ提供的API进行了封装,使得程序更加面向对象化,比如封装出了Exchange, Queue等这些类,使得对RabbitMQ的操作更加简单,同时,功能更加强悍。
在nova支持好几种这样的框架,可以通过配置文件来配置使用哪种框架,默认的就是使用Kombu。但是Kombu比较让人头疼的地方就是资料太少了,谷歌百度翻墙狂搜下来,也没找到几个有用的介绍,这让小白如何是好?官方文档也甚是简陋,整个文档下来,才2、3个例子,而且还是让人摸不着头脑,没办法,只能硬着头皮上了,把官方的那个example试探性的改了一下,改的简单了一点,最基本的原型。再加上前一篇对RabbitMQ的理解和示例,然后再结合Kombu的文档看,也就慢慢摸清楚状况了。
简单的示例:
查看文本打印?
#! -*- coding:cp936 -*-
# entity.py
from kombu import Exchange, Queue
#定义了一个exchange
task_exchange = Exchange('tasks', type='direct')
#在这里进行了exchange和queue的绑定,并且指定了这个queue的routing_key
task_queue = Queue('piap', task_exchange, routing_key='suo_piao')
查看文本打印?
# send.py
from kombu import Connection
from kombu.messaging import Producer
from entity import task_exchange
from kombu.transport.base import Message
connection = Connection('amqp://guest:guest@localhost:5672//')
channel = connection.channel()
message=Message(channel,body='Hello Kombu')
# produce
producer = Producer(channel,exchange=task_exchange)
producer.publish(message.body,routing_key='suo_piao')
查看文本打印?
#! -*- coding:cp936 -*-
# recv.py
from kombu import Connection
from kombu.messaging import Consumer
from entity import task_queue
connection = Connection('amqp://guest:guest@localhost:5672//')
channel = connection.channel()
def process_media(body, message):#body是某种格式的数据,message是一个Message对象,这两个参数必须提供
print body
message.ack()
# consume
consumer = Consumer(channel, task_queue)
consumer.register_callback(process_media)
consumer.consume()
while True:
connection.drain_events()
nova中的rpc模块并没有直接使用kombu,而是又对它进行了一次封装,封装的主要是发送者和接收者,在nova中,封装成了两个类:Publisher和ConsumerBase,并且根据这两个类派生出了DirectPublisher, DirectConsumer, TopicPublisher, TopicConsumer等子类,使操作更加的方便。
一直就觉得Exchange和发送者的关系很密切,或者说发送者只需要和Exchange打交道,而接收者和Queue的关系很密切,在这里证实了这一点:Publisher中封装了一个Exchange对象,而ConsumerBase中封装了Queue对象。看下面两个类图:
接收者派生系:
发送者派生系:
根据类图可以看到,根据Exchange的类型不同,将发送者和接收者分别分为了三类:Direct, Topic, Fanout
再来看一下Connection类:
amqp模块的Pool类是一个连接池,存放Connection对象,ConnectionContext是从连接池中取出Connection对象,然后进行一下封装,添加上__enter__()和__exit()__方法,可以使用with来安全的创建和释放连接。
基本上使用这些就可以实现消息的发送和接收了,用这些基本的类来看一个简单的例子:
查看文本打印?
#! -*- coding:cp936 -*-
'''''send.py'''
from impl_kombu import *
connection=Connection(None)#创建了一个BrokerConnection对象
channel=connection.get_channel()#得到这个连接的channel
# 生成了一个Exchange对象,name=='nova',type='topic'
# 生成了一个Producer对象,exchange==self.exchange,channel==channel,topic=='compute'
publisher=TopicPublisher(None,channel,'compute')
publisher.send('hello nova')
查看文本打印?
#! -*- coding:cp936 -*-
'''''recv.py'''
from impl_kombu import *
connection=Connection(None)#创建了一个BrokerConnection对象
channel=connection.get_channel()#得到这个连接的channel
def callback(message):
print message
consumer=TopicConsumer(None,channel,'compute',callback,'')
consumer.consume()
while True:
connection.connection.drain_events()
当然,nova中不是直接这样使用的,而是对这些基本的类用模块中的方法(上面类图中蓝色的字体)又进行了一次封装,在外部直接调用这些方法就可以了。
前几篇已经知道了nova中消息是怎么发送和接收的了,但是不太明白消息被接收之后,是怎么处理的,看代码中消息接收那块传递的参数不是proxy就是callback,或者是callbacks,这个回调方法到底调用的是哪的方法呢?
从一个服务的启动程序上看,有这样一段代码:
查看文本打印?
rpc_dispatcher = self.manager.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
一个manager创建了一个dispatcher,然后用这个dispatcher创建了一个consumer,这个rpc_dispatcher就是callback,就是Consumer接收到消息之后,调用方法所使用的类。
发送消息时,其实消息的内容就是一个方法名和方法所需要的参数,这两者组成一个字典:
查看文本打印?
@staticmethod
def make_msg(method, **kwargs):
return {'method': method, 'args': kwargs}
rpc_dispatcher相当于manager的一个代理类,其中有一个dispatch()方法,就是用来执行它所代理的manager中的方法的,这个方法就是由接收到的msg(消息)来指定的。manager中定义了对某一个服务相关的管理工作,所以可以通过消息来指定要调用哪个方法进行相关的管理,比如说创建一个实例,就调用manager中的run_instance()方法。
来看一下整体的类图:
还有一个ProxyCallback类,这个是Consumer直接使用的类,是对RpcDispatcher进行了一下封装,主要是将回调的方法放到绿色线程中执行,总的来说,Consumer执行的回调方法是来自相关的Manager,并且在绿色线程中执行。
************************************************************************************************************************************************************************************
Kombu是一个AMQP(Advanced Message Queuing Protocol)消息框架。所谓框架,就是一个软件的半成品,是为了提高开发效率而开发的。
AMQP是一个协议,而RabbitMQ是对这个协议的一个实现。
Kombu和RabbitMQ的关系是什么呢?
我觉得就好像javaAPI和Structs/Hibernate这些框架的关系一样,Kombu对RabbitMQ提供的API进行了封装,使得程序更加面向对象化,比如封装出了Exchange, Queue等这些类,使得对RabbitMQ的操作更加简单,同时,功能更加强悍。
在nova支持好几种这样的框架,可以通过配置文件来配置使用哪种框架,默认的就是使用Kombu。但是Kombu比较让人头疼的地方就是资料太少了,谷歌百度翻墙狂搜下来,也没找到几个有用的介绍,这让小白如何是好?官方文档也甚是简陋,整个文档下来,才2、3个例子,而且还是让人摸不着头脑,没办法,只能硬着头皮上了,把官方的那个example试探性的改了一下,改的简单了一点,最基本的原型。再加上前一篇对RabbitMQ的理解和示例,然后再结合Kombu的文档看,也就慢慢摸清楚状况了。
简单的示例:
查看文本打印?
#! -*- coding:cp936 -*-
# entity.py
from kombu import Exchange, Queue
#定义了一个exchange
task_exchange = Exchange('tasks', type='direct')
#在这里进行了exchange和queue的绑定,并且指定了这个queue的routing_key
task_queue = Queue('piap', task_exchange, routing_key='suo_piao')
查看文本打印?
# send.py
from kombu import Connection
from kombu.messaging import Producer
from entity import task_exchange
from kombu.transport.base import Message
connection = Connection('amqp://guest:guest@localhost:5672//')
channel = connection.channel()
message=Message(channel,body='Hello Kombu')
# produce
producer = Producer(channel,exchange=task_exchange)
producer.publish(message.body,routing_key='suo_piao')
查看文本打印?
#! -*- coding:cp936 -*-
# recv.py
from kombu import Connection
from kombu.messaging import Consumer
from entity import task_queue
connection = Connection('amqp://guest:guest@localhost:5672//')
channel = connection.channel()
def process_media(body, message):#body是某种格式的数据,message是一个Message对象,这两个参数必须提供
print body
message.ack()
# consume
consumer = Consumer(channel, task_queue)
consumer.register_callback(process_media)
consumer.consume()
while True:
connection.drain_events()
nova中的rpc模块并没有直接使用kombu,而是又对它进行了一次封装,封装的主要是发送者和接收者,在nova中,封装成了两个类:Publisher和ConsumerBase,并且根据这两个类派生出了DirectPublisher, DirectConsumer, TopicPublisher, TopicConsumer等子类,使操作更加的方便。
一直就觉得Exchange和发送者的关系很密切,或者说发送者只需要和Exchange打交道,而接收者和Queue的关系很密切,在这里证实了这一点:Publisher中封装了一个Exchange对象,而ConsumerBase中封装了Queue对象。看下面两个类图:
接收者派生系:
发送者派生系:
根据类图可以看到,根据Exchange的类型不同,将发送者和接收者分别分为了三类:Direct, Topic, Fanout
再来看一下Connection类:
amqp模块的Pool类是一个连接池,存放Connection对象,ConnectionContext是从连接池中取出Connection对象,然后进行一下封装,添加上__enter__()和__exit()__方法,可以使用with来安全的创建和释放连接。
基本上使用这些就可以实现消息的发送和接收了,用这些基本的类来看一个简单的例子:
查看文本打印?
#! -*- coding:cp936 -*-
'''''send.py'''
from impl_kombu import *
connection=Connection(None)#创建了一个BrokerConnection对象
channel=connection.get_channel()#得到这个连接的channel
# 生成了一个Exchange对象,name=='nova',type='topic'
# 生成了一个Producer对象,exchange==self.exchange,channel==channel,topic=='compute'
publisher=TopicPublisher(None,channel,'compute')
publisher.send('hello nova')
查看文本打印?
#! -*- coding:cp936 -*-
'''''recv.py'''
from impl_kombu import *
connection=Connection(None)#创建了一个BrokerConnection对象
channel=connection.get_channel()#得到这个连接的channel
def callback(message):
print message
consumer=TopicConsumer(None,channel,'compute',callback,'')
consumer.consume()
while True:
connection.connection.drain_events()
当然,nova中不是直接这样使用的,而是对这些基本的类用模块中的方法(上面类图中蓝色的字体)又进行了一次封装,在外部直接调用这些方法就可以了。
相关文章推荐
- openstack nova 基础知识——rpc回调机制(callback)
- openstack nova基础知识——rpc模块中的kombu
- openstack nova基础知识——rpc模块中的kombu
- openstack nova 基础知识——rpc回调机制(callback)
- Callback回调机制知识大全
- openstack nova 基础知识——Kombu
- openstack nova 基础知识——Kombu
- 模拟EventCenter,flash自带的事件机制的一个解耦框架,callback回调方式用于模块之间的通信
- openstack nova 基础知识——Kombu
- 【广告算法工程师入门 15】机制设计-基础知识
- python模块之bsddb: bdb高性能嵌入式数据库 1.基础知识
- 基于.NET 2.0的GIS开源项目SharpMap分析手记(十四):ASP.NET2.0实现无刷新客户端回调的Callback机制及例子代码下载
- Servlet基础知识(三)—— 会话机制Session,Session和Cookie的异同
- ASP.NET2.0实现无刷新客户端回调的Callback机制(示例源码)
- Java 回调机制(CallBack) 详解及实例代码
- Hibernate操作数据库的回调机制--Callback
- openstack nova基础知识——RabbitMQ
- openstack nova 基础知识——从源码看一个服务是如何启动的
- 回调(callback)机制
- JAVA异常处理机制(基础知识)