您的位置:首页 > 其它

Neutron 源码分析 --- Neutron-server创建

2018-01-15 18:09 351 查看
本文源码基于Pike版本.

通过setup.cfg里面:

[entry_points]

console_scripts =

....

neutron-server = neutron.cmd.eventlet.server:main

.....

可知,neutron-server的入口函数在neutron/cmd/eventlet/server/__init__.py:

def main():
    """Entry point referenced by the ``neutron-server`` console script.

    Passes the eventlet WSGI server function to ``server.boot_server``.
    """
    server.boot_server(wsgi_eventlet.eventlet_wsgi_server)

def main_rpc_eventlet():
    """Boot the server with the eventlet RPC server function."""
    server.boot_server(rpc_eventlet.eventlet_rpc_server)

neutron/server/__init__.py:

def _init_configuration():
# the configuration will be read into the cfg.CONF global data structure
config.init(sys.argv[1:])   #此处传入的参数一般为 --config-file /etc/neutron/neutron.conf --config-file /etc/neutron/plugins/ml2/ml2_conf.ini
config.setup_logging()   #设置neutron的logging,就是以neutron为进程名的打印
config.set_config_defaults()
...省略...

def boot_server(server_func):
_init_configuration()
try:
server_func()
...省略...

server_func为传入的wsgi_eventlet.eventlet_wsgi_server,neutron/server/wsgi_eventlet.py:

def eventlet_wsgi_server():
    """Start the Neutron API service, then run the API and RPC workers."""
    # Create the neutron-server service process.
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    start_api_and_rpc_workers(neutron_api)

def start_api_and_rpc_workers(neutron_api):
    """Run the API service and the other workers until either one exits.

    :param neutron_api: the API service object; only its ``wait()`` method
        is used here.
    """
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")

        # Fall back to waiting on the API service alone.
        neutron_api.wait()

def serve_wsgi(cls):

try:
service = cls.create()
service.start()
class NeutronApiService(WsgiService):
...省略....

class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen

    """

    def __init__(self, app_name):
        """Remember the paste application name; nothing is started yet."""
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        """Load the paste application and start serving it."""
        # Loads /etc/neutron/api-paste.ini and starts the WSGI server.
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        """Block on the object returned by ``_run_wsgi`` in ``start()``."""
        self.wsgi_app.wait()
def _run_wsgi(app_name):
    """Load the named paste application and hand it to ``run_wsgi_app``.

    :param app_name: name of the application to load
    :returns: whatever ``run_wsgi_app`` returns, or ``None`` when no
        application could be loaded
    """
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error('No known API applications configured.')
        return
    return run_wsgi_app(app)
def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    loader = wsgi.Loader(cfg.CONF)
    # Loads the /etc/neutron/api-paste.ini file.
    app = loader.load_app(app_name)
    return app
WSGI介绍可参考WSGI接口.

def run_wsgi_app(app):
    """Start a WSGI server named "Neutron" for ``app`` and return it.

    The port and host are read from ``cfg.CONF.bind_port`` and
    ``cfg.CONF.bind_host``.

    :param app: the WSGI application to serve
    :returns: the started ``wsgi.Server`` instance
    """
    server = wsgi.Server("Neutron")
    # Start Neutron's WSGI server.
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info("Neutron service started, listening on %(host)s:%(port)s",
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
def start(self, application, port, host='0.0.0.0', workers=0):
    """Run a WSGI server with the given application."""
    # bind_host from /etc/neutron/neutron.conf; the default is 0.0.0.0.
    self._host = host
    # bind_port from /etc/neutron/neutron.conf; per the article the default
    # is 9696, i.e. the port of the Neutron endpoint.
    self._port = port
    backlog = CONF.backlog  # 4096 in the article's environment

    self._socket = self._get_socket(self._host,
                                    self._port,
                                    backlog=backlog)

    # Start the service.
    self._launch(application, workers)
回到上面一点,加载/etc/neutron/api-paste.ini文件,这一步过程非常重要,因为它决定了核心功能app的入口,贴上这个文件的内容:

[composite:neutron]
use = egg:Paste#urlmap
/: neutronversions_composite
/v2.0: neutronapi_v2_0

[composite:neutronapi_v2_0]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi request_id catch_errors extensions neutronapiapp_v2_0
keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0

[composite:neutronversions_composite]
use = call:neutron.auth:pipeline_factory
noauth = cors http_proxy_to_wsgi neutronversions
keystone = cors http_proxy_to_wsgi neutronversions

[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory

[filter:catch_errors]
paste.filter_factory = oslo_middleware:CatchErrors.factory

[filter:cors]
paste.filter_factory = oslo_middleware.cors:filter_factory
oslo_config_project = neutron

[filter:http_proxy_to_wsgi]
paste.filter_factory = oslo_middleware.http_proxy_to_wsgi:HTTPProxyToWSGI.factory

[filter:keystonecontext]
paste.filter_factory = neutron.auth:NeutronKeystoneContext.factory

[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory

[filter:extensions]
paste.filter_factory = neutron.api.extensions:plugin_aware_extension_middleware_factory

[app:neutronversions]
paste.app_factory = neutron.api.versions:Versions.factory

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory

[filter:osprofiler]
paste.filter_factory = osprofiler.web:WsgiMiddleware.factory
Paste Deploy的介绍可参考Paste Deployment或者Openstack Restful API 开发框架 Paste + PasteDeploy + Routes + WebOb.

这里我们重点关注V2.0:

use = call:neutron.auth:pipeline_factory

打开neutron/auth.py:

其中pipeline_factory函数会根据neutron.conf中cfg.CONF.auth_strategy的值来加载pipeline,因为这个参数的值是keystone,因此会加载keystone = cors http_proxy_to_wsgi request_id catch_errors authtoken keystonecontext extensions neutronapiapp_v2_0,这一堆filters.

def pipeline_factory(loader, global_conf, **local_conf):
    """Create a paste pipeline based on the 'auth_strategy' config option.

    :param loader: paste loader used to resolve filter and app names
    :param global_conf: global paste configuration (unused here)
    :param local_conf: maps each auth strategy name to a whitespace-separated
        pipeline string; the last name is the app, the rest are filters
    :returns: the app wrapped by every filter in the pipeline
    """
    pipeline = local_conf[cfg.CONF.auth_strategy]  # auth_strategy='keystone'
    pipeline = pipeline.split()
    # The filters are the names listed after "keystone =" in the ini file,
    # except for the last one.
    filters = [loader.get_filter(n) for n in pipeline[:-1]]
    # The last name is the application that answers REST requests. Per the
    # article it is built on the pecan web framework and is a
    # pecan.middleware.recursive.RecursiveMiddleware object.
    app = loader.get_app(pipeline[-1])
    # Reverse the filters so wrapping starts from the last one.
    filters.reverse()
    for filter in filters:
        # Wrap the app with each filter in turn; the final wrapped app is
        # what gets returned.
        app = filter(app)
    return app
接下来一个一个的从最后一个开始逐步往前进行filter封装,这里以neutronapiapp_v2_0为例.

[app:neutronapiapp_v2_0]
paste.app_factory = neutron.api.v2.router:APIRouter.factory
这个函数会进行web framework的构造,neutron.conf中CONF.web_framework的值默认为pecan,pecan是一个简化的web框架,比paste deploy + webob要便捷许多,关于pecan的介绍请参考pecan documentation.

class APIRouter(base_wsgi.Router):

    @classmethod
    def factory(cls, global_config, **local_config):
        """Paste app factory for the v2.0 API application.

        Returns the pecan application when ``cfg.CONF.web_framework`` is
        'pecan'; otherwise returns an instance of this router class.
        """
        if cfg.CONF.web_framework == 'pecan':
            return pecan_app.v2_factory(global_config, **local_config)
        return cls(**local_config)
def v2_factory(global_config, **local_config):
...省略...
app = pecan.make_app(root.V2Controller(),
debug=False,
force_canonical=False,
hooks=app_hooks,
guess_content_type_from_ext=True)
startup.initialize_all()
return app
initialize_all这个函数非常重要,它用来构造URL和controller的映射,实际上来讲,也就是把对port,network,subnet等resource的请求路由到真实的application上面去.
# Core resources: singular resource name -> collection name.
RESOURCES = {
    'network': 'networks',
    'subnet': 'subnets',
    'subnetpool': 'subnetpools',
    'port': 'ports',
}

def initialize_all():
    """Register a controller and a plugin for every API resource.

    Covers the core resources listed in ``RESOURCES`` first, then the
    pecan resources reported by the extension manager.
    """
    manager.init()  # builds a NeutronManager instance
    # A neutron.api.extensions.PluginAwareExtensionManager object.
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
    # At this stage we have a fully populated resource attribute map;
    # build Pecan controllers and routes for all core resources
    plugin = directory.get_plugin()  # the core plugin, "ML2Plugin" here
    # Map CRUD operations on the core resources (subnet etc.) to their
    # handlers; the requests themselves are answered by ML2Plugin.
    for resource, collection in RESOURCES.items():
        resource_registry.register_resource_by_name(resource)
        new_controller = res_ctrl.CollectionsController(collection, resource,
                                                        plugin=plugin)
        manager.NeutronManager.set_controller_for_resource(
            collection, new_controller)
        manager.NeutronManager.set_plugin_for_resource(collection, plugin)

    pecanized_resources = ext_mgr.get_pecan_resources()
    for pec_res in pecanized_resources:
        manager.NeutronManager.set_controller_for_resource(
            pec_res.collection, pec_res.controller)
        manager.NeutronManager.set_plugin_for_resource(
            pec_res.collection, pec_res.plugin)
如果CONF.web_framework = legacy,则就是原来老的架构,即paste deploy+webob+routes的复杂架构实现,代码流程会进入如下实现,但最终效果是一样的.

neutron/api/v2/router.py:

class APIRouter(base_wsgi.Router):
...省略...
def __init__(self, **local_config):
mapper = routes_mapper.Mapper()
manager.init()
plugin = directory.get_plugin()
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)
...省略....
for resource in RESOURCES:
_map_resource(RESOURCES[resource], resource,
attributes.RESOURCE_ATTRIBUTE_MAP.get(
RESOURCES[resource], dict()))
resource_registry.register_resource_by_name(resource)

for resource in SUB_RESOURCES:
_map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
attributes.RESOURCE_ATTRIBUTE_MAP.get(
SUB_RESOURCES[resource]['collection_name'],
dict()),
SUB_RESOURCES[resource]['parent'])
....
到此,当触发对subnet,network,port的create,delete,update,list操作请求时,最终会路由到neutron/plugins/ml2/plugin.py中进行响应处理.

至于剩下的filter这里就不再继续深入了,原理都是一样的.

def start_api_and_rpc_workers(neutron_api):
    """Run the API service and the other workers until either one exits.

    :param neutron_api: the API service object; only its ``wait()`` method
        is used here.
    """
    try:
        worker_launcher = service.start_all_workers()

        pool = eventlet.GreenPool()
        api_thread = pool.spawn(neutron_api.wait)
        plugin_workers_thread = pool.spawn(worker_launcher.wait)

        # api and other workers should die together. When one dies,
        # kill the other.
        api_thread.link(lambda gt: plugin_workers_thread.kill())
        plugin_workers_thread.link(lambda gt: api_thread.kill())

        pool.waitall()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")

        # Fall back to waiting on the API service alone.
        neutron_api.wait()
由代码可见,RPC是由绿色线程(协程)eventlet来实现的,eventlet相较于python thread,区别主要有两点:

1. eventlet使用不需要加锁,同一时间只有一个协程运行,而线程一般是并发执行,需要协调工作的机制,比如加锁.

2. 线程由操作系统调度,协程由应用自己控制.

GreenPool就是协程池,可以并发执行任务.

关于eventlet的详细介绍请参考eventlet官方文档,或者openstack基础之eventlet.

def start_all_workers():
    """Start the RPC workers and plugin workers and return their launcher.

    Emits a ``resources.PROCESS`` / ``events.AFTER_SPAWN`` notification once
    the workers have been handed to ``_start_workers``.
    """
    workers = _get_rpc_workers() + _get_plugins_workers()
    launcher = _start_workers(workers)
    registry.notify(resources.PROCESS, events.AFTER_SPAWN, None)
    return launcher




可见,plugin主要就是core_plugin = ml2,service_plugins就比较多了,贴上debug出来的值:



这些plugin其实就是源代码目录neutron/services下面的各种plugin模块以及core plugin ML2Plugin.

def _get_plugins_workers():
    """Collect the workers of every plugin that defines ``get_workers``.

    :returns: a flat list with the workers of all such plugins
    """
    # NOTE(twilson) get_plugins also returns the core plugin
    plugins = directory.get_unique_plugins()

    # TODO(twilson) Instead of defaulting here, come up with a good way to
    # share a common get_workers default between NeutronPluginBaseV2 and
    # ServicePluginBase
    return [
        plugin_worker
        for plugin in plugins if hasattr(plugin, 'get_workers')
        for plugin_worker in plugin.get_workers()
    ]
就是获取含有get_workers函数的plugin,在上述具体的那些plugin中并没有直接对这个函数定义,因为它们是定义在父类中的,比如FlavorsPlugin,它的get_workers函数是在service_base.ServicePluginBase,即/usr/local/lib/python2.7/dist-packages/neutron_lib/services/base.py中定义的.获取到workers之后,交由oslo.service的ProcessLauncher进行启动,到此neutron启动完毕.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: