您的位置:首页 > 运维架构

Openstack oslo.message rpc简介

2015-03-17 11:19 190 查看
Openstack中几乎所有的组件都使用MQ作为消息传递的中间件,通过MQ完成了很多协程任务,实现了服务之间的分布式部署。所以学习Openstack中的MQ模块可以更好的掌握Openstack组件之间的通信,其中,oslo.message模块封装了Openstack中几乎所有的对MQ的操作,而rpc又是MQ操作中最典型的应用场景,对它的学习很有必要。

Oslo.message 中的重要概念

server(服务器端): 服务器端为客户端提供了可用的PRC接口。

client(客户端): 客户端负责调用服务器端提供的RPC接口。

exchange(交换):一个包含了各个项目主题(topic)的容器。

topic(主题):一个RPC接口的标识,服务器端在某个主题上监控方法调用,而客户端在某个主题上调用方法。

namespace(命名空间):服务器端可以在一个主题上暴露多组方法,而每组方法属于一个命名空间。

method(方法):一个方法由一个名字和一组命名的参数组成。

transport(传输工具):一个传送PRC请求到服务器端并将响应返回给客户端的底层消息系统。目前常用的transport有 rabbitmq 和qpid。

API version:每个命名空间都有一个版本号,当命令空间的接口变化时,这个版本号也会相应增加。向前兼容的修改只需要更改小版本号,向前不兼容的更改需要更改大版本号。

Oslo.messsage 的典型应用场景

1. Invoke method on one of multiple servers(选择多个服务器端的一个服务器进行远程方法调用)

这种情况下,会有多个服务器端在一个exchange和一个topic上监听方法调用,当客户在相应的exchange和topic上调用方法时,这个请求将会分发给多个服务器端中的一个服务器进行方法的调用。请求的方式会采用round robin模式。

下面的代码是这种应用场景在qpid上的实现,首先是服务器端的代码:

#! /usr/bin/env python

import sys
import time
import logging

from oslo_config import cfg
from oslo import messaging

class TestEndpoint(object):

def __init__(self, server, target=None):
self.server = server
self.target = target

def methodA(self, ctx, **args):
print("%s::TestEndpoint::methodA( ctxt=%s arg=%s ) is called"
% (self.server, str(ctx),str(args)))

def main():
url = 'qpid://localhost:5673'
exchange = 'my-exchange'
topic = 'my-topic'
namespace = 'my-namespace'
argv = sys.argv[1:]
if argv is None:
print 'You should specify the server name'
return -1
server = argv[0]
version = '1.1'
logging.basicConfig(level=logging.INFO)
transport = messaging.get_transport(cfg.CONF, url=url)
target = messaging.Target(exchange=exchange,
topic=topic,
namespace=namespace,
server=server,
version=version)
endpoints = [TestEndpoint(server, target)]
server = messaging.get_rpc_server(transport, target, endpoints, executor='blocking')

try:
server.start()
except KeyboardInterrupt:
server.stop()
server.wait()

if __name__ == "__main__":
sys.exit(main())
接下来是是客户端的代码:
#! /usr/bin/env python

import sys
import time
import logging

from oslo_config import cfg
from oslo import messaging

def main(argv=None):
url = 'qpid://localhost:5673'
exchange = 'my-exchange'
topic = 'my-topic'
namespace = 'my-namespace'
version = '1.1'
logging.basicConfig(level=logging.INFO)
transport = messaging.get_transport(cfg.CONF, url=url)
target = messaging.Target(exchange=exchange,
topic=topic,
namespace=namespace,
version=version)
client = messaging.RPCClient(transport, target, version_cap=version)
test_context = {"application": "my-client",
"time": time.ctime(),
"cast": False}
args = {}
client.call(test_context, 'methodA', **args)
transport.cleanup()

if __name__ == "__main__":
sys.exit(main())

确保使用以下的方式启动qpid和执行代码:

qpidd --auth no -p 5673 #启动qpid
python rpc_qpid_server.py my-server01 #启动server01
python rpc_qpid_server.py my-server02 #启动server02
python rpc_qpid_client.py # 发送消息

2. Invoke Method on a Specific Server (指定多个服务器中的一个进行方法的调用)
这种情况下,会有多个服务器端在一个exchange和一个topic上监听方法调用,客户在相应的exchange和topic上调用方法并指定具体一个server进行方法调用。

此时只需要修改客户端中建立target的代码:

target = messaging.Target(exchange=exchange,
topic=topic,
<strong>server='my-server01',</strong>
namespace=namespace,
version=version)

3. Invoke Method on all of Multiple Servers(在所有的服务器中进行方法调用)
此时需要采用fanout的方法,修改客户端的代码如下:

target = messaging.Target(exchange=exchange,
topic=topic,
<strong> fanout=True,</strong>
namespace=namespace,
version=version)

Oslo.message 提供的rpc的方式:
1. cast - 采用异步的方式,没有结果返回

2. call - 采用同步的方式,会等待结果返回

上面所说的第三个应用场景只能采用cast的方式。

参考文献: https://wiki.openstack.org/wiki/Oslo/Messaging
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息