您的位置:首页 > 其它

cloud foundry的源码解析一览(cloud controller,dea)

2013-07-30 10:17 211 查看
关于cloud foundry 的源码解析的文章很多,最近我也在研究这个东西,感觉内容比较多,也相对比较复杂,我在这里做一个整合,方便大家了解。

1.CloudController

了解整个Cloud Foundry需要熟悉的内容很多,但最核心的东西是nats和event-machine. 关于nats上一篇已经做了介绍,大家可参考基于Cloud Foundry的PaaS实践(二) Cloud Foundry集群部署 ,安装一下执行个小示例程序便可一目了然。关于event-machine,大家可参考EventMachine-scalable-non-blocking-i-o-in-ruby 和 eventmachine_introduction_10.pdf
。简而言之,就是一个用很牛的算法(reactor)写的一个非阻塞的socket通信框架,有多种语言的实现,nats也用到。同样的,照着上面资料中的示例执行一下,就明白了。

有了以上的基础,我们可以开始共同看一下Cloud Foundry的核心,cloud_controller.

Cloud_Controller负责整个Cloud Foundry的用户管理/应用管理/服务管理,基于rails框架对外提供RESTFUL接口服务。我们可以通过分析一般rails应用的方法加以分析。rails应用的掌握个人喜欢通过MVC框架和RESTFUL接口把握。

首先,看一下cloud_controller/config/routes.rb:



按列从左至右依次为:http方法/访问路径/代码文件和函数信息/略。以第一行为例,如果cloud_controller接收到http get请求info路径信息,将使用cloud_controller/app/default_controller.rb文件中的info函数响应。以上供刚刚接触rails的同学参考。

了解了routers.rb的路由方法后,还可以使用wireshark工具,安装vmc(安装ruby后执行gem install vmc),通过vmc help帮助执行vmc 命令的同时,用wireshark抓取http包,刚cloud_controller RESTFUL接口可完全把握。下面使用LINUX的curl工具,显示示例包如下:



当然,这个应答信息显示的不够全面,大家还是wireshark,我这台电脑没装,暂时就不能截图了。另外,大家也可以使用Advance RESTclient,示例:



到此,我们已经可以把握全部restful接口,以及每个接口执行的路径,再列一下cloud_controller/app目录下的文件,一目了然。

在看代码之前,最好再看一下数据模型。我曾经画了个模型图,可惜暂时找不到了,先做个文字介绍吧。

cloud_controller的主要数据模型包括:



(临时画的简表,抱歉未添加表关系和示例数据,过两天要是翻到我写的手册就补上)。

表的内容不难,此处列出的只是比较主要的,可能会有变化,但相信这里的大部分东西不会大变。特别说明,service在Cloud Foundry中指的是System Service,比如Cloud Foundry会根据用户的需要,由用户执行vmc create-server mysql后,为用户创建一个mysql,然后执行vmc bind-service mysql-name app-name后,就将此服务和应用绑定,即应用可以访问VCAP_SERVICES环境变量,得到这个mysql实例的连接信息,包括用户名/密码/IP/端口啥的。这块目前Cloud
Foundry支持API,用法更加简单,但原理如是。实现的信息会保存在services表中,连接信息会保存在binding_tokens表中,服务绑定信息会放在service_bindings表中,服务版本信息会保存service_configs中。大家可以执行一下命令,看看表中数据的变化即可顿悟。

2.dea

上一篇文章分析了router的运行原理,在这一篇开始分析dea组件的运行。。。,记得以前自己刚刚开始接触cloud foundry的时候就会觉得dea是最神奇的一个模块,不过看了它的代码之后它的神秘感也就渐渐的消失了。。

首先来看启动的代码:

[cpp]
view plaincopy

EM.run {
agent = DEA::Agent.new(config)
agent.run()
}

还是在eventmachine中启动,主要的代码都在agent类中。。。
agent类的构造函数主要是进行一些基本信息的配置,例如app目录的路径,总共可用的内存大小等。。。这里就不一看他的代码了。。主要是来看run函数中的代码,因为重要的部分都在这个里面。。。

[html]
view plaincopy

begin #用于创建文件夹
FileUtils.mkdir_p(@droplet_dir)
FileUtils.mkdir_p(@staged_dir)
FileUtils.mkdir_p(@apps_dir)
FileUtils.mkdir_p(@db_dir)
if @secure # Allow traversal by secure users
FileUtils.chmod(0711, @apps_dir)
FileUtils.chmod(0711, @droplet_dir)
end
rescue => e
@logger.fatal("Can't create support directories: #{e}")
exit 1
end

这部分代码用于创建dea存放app代码的文件夹,以及其他一些运行信息的代码。。。。接下来就是最为重要的部分了。。。

[html]
view plaincopy

#启动NATS循环
NATS.start(:uri => @nats_uri) do
# Register ourselves with the system
status_config = @config['status'] || {}
VCAP::Component.register(:type => 'DEA',
:host => @local_ip,
:index => @config['index'],
:config => @config,
:port => status_config['port'],
:user => status_config['user'],
:password => status_config['password'])

uuid = VCAP::Component.uuid
@logger.info("DEA uuid #{uuid}")

# Setup our identity
@hello_message = { :id => uuid, :ip => @local_ip, :port => @file_viewer_port, :version => VERSION }.freeze
@hello_message_json = @hello_message.to_json

# Setup our listeners..
NATS.subscribe('dea.status') { |msg, reply| process_dea_status(msg, reply) } #用于报告dea的状态
NATS.subscribe('droplet.status') { |msg, reply| process_droplet_status(msg, reply) } #用于报告app的状态
NATS.subscribe('dea.discover') { |msg, reply| process_dea_discover(msg, reply) } #controller发送的查找合适的运行环境的询问
NATS.subscribe('dea.find.droplet') { |msg, reply| process_dea_find_droplet(msg, reply) } #在dea中寻找对应的droplet
NATS.subscribe('dea.update') { |msg| process_dea_update(msg) }
NATS.subscribe('dea.stop') { |msg| process_dea_stop(msg) }
NATS.subscribe("dea.#{uuid}.start") { |msg| process_dea_start(msg) }
NATS.subscribe('router.start') { |msg| process_router_start(msg) } #相当于是router的心跳广播消息,dea收到消息后向router报告当前dea的一些基本信息
NATS.subscribe('healthmanager.start') { |msg| process_healthmanager_start(msg) } #healthmanger的广播消息,也是用于报告一些消息
NATS.subscribe('dea.locate') { |msg| process_dea_locate(msg) } #用于通告当前dea的一些环境信息

# Recover existing application state.
recover_existing_droplets
delete_untracked_instance_dirs

EM.add_periodic_timer(@heartbeat_interval) { send_heartbeat } #10
EM.add_periodic_timer(@advertise_interval) { send_advertise } #5
EM.add_timer(MONITOR_INTERVAL) { monitor_apps } #监视app的状态
EM.add_periodic_timer(CRASHES_REAPER_INTERVAL) { crashes_reaper }
EM.add_periodic_timer(VARZ_UPDATE_INTERVAL) { snapshot_varz }
EM.add_periodic_timer(DROPLET_FS_PERCENT_USED_UPDATE_INTERVAL) { update_droplet_fs_usage }

NATS.publish('dea.start', @hello_message_json) #发布消息,表示当前这个dea已经启动了,告诉别人
send_advertise

首先注册该dea组件,接着要在nats中订阅以及发布消息,最后还要向整个系统发送广播消息,表示当前dea模块已经启动了。。。
这样,整个模块就已经启动了,接下来dea就开始进入消息循环了,不断的等待nats的消息,并发送一些心跳消息等。。。

这里就分析几个重要的就可以了。。。首先是:

[html]
view plaincopy

NATS.subscribe('dea.discover') { |msg, reply| process_dea_discover(msg, reply) } #controller发送的查找合适的运行环境的询问

该消息是cc模块发送过来的,当dea接收到这个消息之后会根据发送过来的数据进行分析,判断当前的运行环境是否满足要求,如果满足的话,那么会向cc返回消息,如果不满足的话,那么就会忽略这个消息。。。另外有一个小的插曲就是,dea会根据当前所运行的app的负载状态来推迟回复消息的时间,而cc会选择第一个回复该消息的dea运行该app。。。

[html]
view plaincopy

#这个函数用来处理从nats收到的controller用来查找合适的运行环境的查询,如果不满足,那么dea将不会向controller返回消息
def process_dea_discover(message, reply)
return if @shutting_down
@logger.debug("DEA received discovery message: #{message}") #msg一般是需要运行的app的一些基本的信息
message_json = JSON.parse(message)
# Respond with where to find us if we can help.
if @shutting_down #dea是否已经停止了
@logger.warn('Ignoring request, shutting down.')
elsif @num_clients >= @max_clients || @reserved_mem > @max_memory #判断app的数目是否已经超过了限制
@logger.warn('Ignoring request, not enough resources.')
elsif droplet_fs_usage_threshold_exceeded?
@logger.warn("Droplet FS has exceeded usage threshold, ignoring request")
else
# Check that we properly support the runtime requested
unless runtime_supported? message_json['runtime_info'] #判断运行环境是否满足要求
@logger.debug("Ignoring request, #{message_json['runtime_info']} runtime not supported.")
return
end
# Ensure app's prod flag is set if DEA's prod flag is set
if @prod && !message_json['prod']
@logger.debug("Ignoring request, app_prod=#{message_json['prod']} isn't set, and dea_prod=#{@prod} is.")
return
end
# Pull resource limits and make sure we can accomodate
limits = message_json['limits']
mem_needed = limits['mem']
droplet_id = message_json['droplet'].to_s
if (@reserved_mem + mem_needed > @max_memory)
@logger.warn('Ignoring request, not enough resources.')
return
end
delay = calculate_help_taint(droplet_id)
delay = ([delay, TAINT_MAX_DELAY].min)/1000.0
EM.add_timer(delay) { NATS.publish(reply, @hello_message_json) } #之所以要延迟返回,时为了平衡各个dea之间的压力,因为压力较小的dea先返回
end
end

上述就是discovery消息的处理函数,其实也很简单,说白了就是判断当前的运行环境以及内存等情况是否满足app的需求。。。
接下来再来看一个十分重要的消息:

[html]
view plaincopy

NATS.subscribe("dea.#{uuid}.start") { |msg| process_dea_start(msg) }

这个是从cc发送过来的运行某个app的消息。。。该消息的处理函数首先会根据发送过来的消息生成相应的app的信息,接着会获取该app的源代码,然后再进行启动,由于代码比较长,我们就重点来看app是怎么启动的吧。。。

[html]
view plaincopy

@logger.debug('Completed download')
if not instance[:uris].empty?
port = grab_port #用于获取app的运行端口
if port
instance[:port] = port
else
@logger.warn("Unable to allocate port for instance#{instance[:log_id]}")
stop_droplet(instance)
return
end
else
@logger.info("No URIs found for application. Not assigning a port")
end
if debug
debug_port = grab_port
if debug_port
instance[:debug_ip] = VCAP.local_ip
instance[:debug_port] = debug_port
instance[:debug_mode] = debug
else
@logger.warn("Unable to allocate debug port for instance#{instance[:log_id]}")
stop_droplet(instance)
return
end
end

if console
console_port = grab_port
if console_port
instance[:console_ip] = VCAP.local_ip
instance[:console_port] = console_port
else
@logger.warn("Unable to allocate console port for instance#{instance[:log_id]}")
stop_droplet(instance)
return
end
end

首先是为app分配运行时端口等信息,

[html]
view plaincopy

#启动app的函数
exec_operation = proc do |process|
process.send_data("cd #{instance_dir}\n")
if @secure || @enforce_ulimit
process.send_data("renice 0 $$\n")
process.send_data("ulimit -m #{mem_kbytes} 2> /dev/null\n") # ulimit -m takes kb, soft enforce
process.send_data("ulimit -v 3000000 2> /dev/null\n") # virtual memory at 3G, this will be enforced
process.send_data("ulimit -n #{num_fds} 2> /dev/null\n")
process.send_data("ulimit -u 512 2> /dev/null\n") # processes/threads
process.send_data("ulimit -f #{disk_limit} 2> /dev/null\n") # File size to complete disk usage
process.send_data("umask 077\n")
end
#设置环境变量,书要是访问服务的一些环境变量
app_env.each { |env| process.send_data("export #{env}\n") }
if instance[:port]
process.send_data("./startup -p #{instance[:port]}\n")
else
process.send_data("./startup\n")
end
process.send_data("exit\n")
end

exit_operation = proc do |_, status|
@logger.info("#{name} completed running with status = #{status}.")
@logger.info("#{name} uptime was #{Time.now - instance[:start]}.")
stop_droplet(instance)
end

# Being a bit paranoid here and wipe all processes for the secure user
# before we start..
kill_all_procs_for_user(user) if @secure

#用于真正的启动
Bundler.with_clean_env { EM.system("#{@dea_ruby} -- #{prepare_script} true #{sh_command}", exec_operation, exit_operation) }

接着调用bundler来启动app,在着了我们可以看到

[html]
view plaincopy

process.send_data("renice 0 $$\n")
process.send_data("ulimit -m #{mem_kbytes} 2> /dev/null\n") # ulimit -m takes kb, soft enforce
process.send_data("ulimit -v 3000000 2> /dev/null\n") # virtual memory at 3G, this will be enforced
process.send_data("ulimit -n #{num_fds} 2> /dev/null\n")
process.send_data("ulimit -u 512 2> /dev/null\n") # processes/threads
process.send_data("ulimit -f #{disk_limit} 2> /dev/null\n") # File size to complete disk usage
process.send_data("umask 077\n")

这些用于设置app的运行时的一些资源限制。。。。

[html]
view plaincopy

app_env.each { |env| process.send_data("export #{env}\n") }

这句代码用于给app设置环境变量,其实这些环境变量大多是用于访问服务的一些信息,例如访问数据库的url等信息。。在app当中就可以通过这些环境变量的信息来访问服务了。。。。

[html]
view plaincopy

if instance[:port]
process.send_data("./startup -p #{instance[:port]}\n")
else
process.send_data("./startup\n")
end
process.send_data("exit\n")

最后就是调用启动脚本具体启动app了。。。

[html]
view plaincopy

detect_app_ready(instance, manifest) do |detected|
if detected and not instance[:stop_processed]
@logger.info("Instance #{instance[:log_id]} is ready for connections, notifying system of status")
instance[:state] = :RUNNING
instance[:state_timestamp] = Time.now.to_i
send_single_heartbeat(instance)
register_instance_with_router(instance) #向router注册app的信息
schedule_snapshot
else
@logger.warn('Giving up on connecting app.')
stop_droplet(instance)
end
end

最后分析app的运行情况,并向router发送注册刚刚启动的app的信息。。。

好了,dea最重要的部分就已经分析完成了。。。

其余的消息还有

[html]
view plaincopy

NATS.subscribe('dea.status') { |msg, reply| process_dea_status(msg, reply) } #用于报告dea的状态

用于报告当前dea的运行状态。。。

[html]
view plaincopy

NATS.subscribe('droplet.status') { |msg, reply| process_droplet_status(msg, reply) } #用于报告app的状态

用于报告droplet的状态
嗯,,还有其余的。。就不一一列举出来了。。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: