您的位置:首页 > 大数据 > 人工智能

【containerd 1.0 源码分析】containerd 启动流程分析

2017-08-02 13:17 781 查看


前言

   

containerd 只是一个守护进程,容器的实际运行时由 runC 控制。containerd 主要职责是镜像管理(镜像、元信息等)、容器执行(调用最终运行时组件执行)




一. containerd 源码编译

   需要安装依赖包:btrfs-tools
    直接 make 即可生成 ctr、containerd、containerd-shim 三个可执行文件

二. containerd main 函数

   2.1 入口目录为
cmd/containerd/main.go 中的 main 函数。NewApp 使用了一个第三方命令行库(cli),用来设置应用名称、命令行启动参数以及子命令等

// Build the CLI application and fill in its name, version and usage text.
app := cli.NewApp()
app.Name = "containerd"
app.Version = version.Version
app.Usage = usage
// Global string flags. Only "config" is given a default value here; the
// others are left empty unless supplied by the user.
app.Flags = []cli.Flag{
cli.StringFlag{
Name:  "config,c",
Usage: "path to the configuration file",
Value: defaultConfigPath,
},
cli.StringFlag{
Name:  "log-level,l",
Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
},
cli.StringFlag{
Name:  "address,a",
Usage: "address for containerd's GRPC server",
},
cli.StringFlag{
Name:  "root",
Usage: "containerd root directory",
},
}
// configCommand is the only subcommand registered in this excerpt.
app.Commands = []cli.Command{
configCommand,
}


    2.1 Action
这个函数设置的内容比较多,默认的 config 为:root 为 /var/lib/containerd,默认 socket 地址为 /run/containerd/containerd.sock,server.New 在 2.1.1 讲解

// Action is the callback installed on the CLI app.
// NOTE(review): this is an abridged excerpt. Error-handling bodies and some
// closing braces are omitted, and ctx, config and signals are declared
// outside the lines shown here.
app.Action = func(context *cli.Context) error {
// Load the file named by the global "config" flag into config. An error
// that satisfies os.IsNotExist is excluded from the failure condition.
if err := server.LoadConfig(context.GlobalString("config"), config); err != nil && !os.IsNotExist(err) {

// apply flags to the config
if err := applyFlags(context, config); err != nil {

address := config.GRPC.Address

// The local variable server shadows the server package from this point on.
server, err := server.New(ctx, config)

// Open a listener for the debug endpoint only when an address is configured.
if config.Debug.Address != "" {
l, err := sys.GetLocalListener(config.Debug.Address, config.Debug.Uid, config.Debug.Gid)
}
// Metrics are served over TCP only when an address is configured.
if config.Metrics.Address != "" {
l, err := net.Listen("tcp", config.Metrics.Address)

serve(log.WithModule(ctx, "metrics"), l, server.ServeMetrics)
}

// Main GRPC listener, created with the configured uid/gid.
l, err := sys.GetLocalListener(address, config.GRPC.Uid, config.GRPC.Gid)

serve(log.WithModule(ctx, "grpc"), l, server.ServeGRPC)

// The Action's result is whatever handleSignals returns.
return handleSignals(ctx, signals, server)
}


    2.1.1 New 函数创建以及初始化
containerd server:

创建 /var/lib/containerd 目录
从 /var/lib/containerd/plugins 加载 plugin
建立 GRPC server
初始化各个插件
最后注册服务就是各个插件里面的实现的接口 Register

// New creates and initializes a new containerd server.
//
// It creates config.Root, applies the configuration, loads the plugin
// registrations, initializes each plugin in turn, and finally registers every
// instance that implements plugin.Service on the GRPC server.
//
// NOTE(review): abridged excerpt. The declarations of s, initialized, services
// and interceptor, and most error checks, are not shown in these lines.
func New(ctx context.Context, config *Config) (*Server, error) {
if err := os.MkdirAll(config.Root, 0711); err != nil {
return nil, err
}
if err := apply(ctx, config); err != nil {
return nil, err
}
plugins, err := loadPlugins(config)

rpc := grpc.NewServer(
grpc.UnaryInterceptor(interceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
)

for _, p := range plugins {
id := p.URI()

// Each plugin gets its own init context carrying the shared set of
// already-initialized plugins, the server root and the plugin's URI.
initContext := plugin.NewContext(
ctx,
initialized,
config.Root,
id,
)
initContext.Events = s.events
initContext.Address = config.GRPC.Address

// load the plugin specific configuration if it is provided
if p.Config != nil {
pluginConfig, err := config.Decode(p.ID, p.Config)

initContext.Config = pluginConfig
}
instance, err := p.Init(initContext)

// Record the instance under its type and ID, creating the per-type map on
// first use.
if types, ok := initialized[p.Type]; ok {
types[p.ID] = instance
} else {
initialized[p.Type] = map[string]interface{}{
p.ID: instance,
}
}
// check for grpc services that should be registered with the server
if service, ok := instance.(plugin.Service); ok {
services = append(services, service)
}
}
// register services after all plugins have been initialized
for _, service := range services {
if err := service.Register(rpc); err != nil {
return nil, err
}
}
return s, nil
}


     Init 函数 以及 Register 函数在第四章后讲解,各个插件初始,实例化并注册服务

三. plugin

 

Registration 数据结构:

// Registration describes a single plugin: its type, ID, optional
// configuration, the plugin types it requires, and the function that builds
// it.
type Registration struct {
// Type is the plugin's category (a PluginType value).
Type     PluginType
// ID names the plugin.
ID       string
// Config is an optional plugin-specific configuration value.
Config   interface{}
// Requires lists plugin types this plugin depends on.
Requires []PluginType
// Init constructs the plugin instance from an InitContext.
Init     func(*InitContext) (interface{}, error)

// NOTE(review): the use of added is not visible in this excerpt.
added bool
}


     
init 函数初始化类型包括

// Plugin type identifiers. Each value is a dot-separated name ending in a
// version suffix.
const (
RuntimePlugin     PluginType = "io.containerd.runtime.v1"
GRPCPlugin        PluginType = "io.containerd.grpc.v1"
SnapshotPlugin    PluginType = "io.containerd.snapshotter.v1"
TaskMonitorPlugin PluginType = "io.containerd.monitor.v1"
DiffPlugin        PluginType = "io.containerd.differ.v1"
MetadataPlugin    PluginType = "io.containerd.metadata.v1"
ContentPlugin     PluginType = "io.containerd.content.v1"
)


四. plugin:io.containerd.grpc.v1.containers

路径: services/containers/service.go,ID 为 containers,提供基础元素的存储

funcinit() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID:   "containers",
Requires: []plugin.PluginType{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*bolt.DB), ic.Events), nil
},
})
}


实例化为 Service 结构体

// Service holds the bolt database and the event publisher it is constructed
// with.
type Service struct {
db        *bolt.DB
publisher events.Publisher
}

// NewService builds a Service around the given database and event publisher
// and returns it as an api.ContainersServer.
func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer {
	svc := new(Service)
	svc.db = db
	svc.publisher = publisher
	return svc
}


Register 接口实现,注册 GRPC

// Register attaches this Service to the given GRPC server as the containers
// API implementation. It always returns nil.
func (s *Service) Register(server *grpc.Server) error {
	api.RegisterContainersServer(server, s)
	return nil
}


可以有如下 GRPC API 方法

// ContainersServer is the server-side interface of the containers GRPC API:
// Get, List, Create, Update and Delete, each taking a context and a request
// message.
type ContainersServer interface {
Get(context.Context, *GetContainerRequest) (*GetContainerResponse, error)
List(context.Context, *ListContainersRequest) (*ListContainersResponse, error)
Create(context.Context, *CreateContainerRequest) (*CreateContainerResponse, error)
Update(context.Context, *UpdateContainerRequest) (*UpdateContainerResponse, error)
Delete(context.Context, *DeleteContainerRequest) (*google_protobuf2.Empty, error)
}


根据下面这个对应哪个 handler 处理

// _Containers_serviceDesc is the GRPC service descriptor for the containers
// service. It pairs each method name with its handler function and declares
// no streams.
var _Containers_serviceDesc = grpc.ServiceDesc{
ServiceName: "containerd.services.containers.v1.Containers",
HandlerType: (*ContainersServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Get",
Handler:    _Containers_Get_Handler,
},
{
MethodName: "List",
Handler:    _Containers_List_Handler,
},
{
MethodName: "Create",
Handler:    _Containers_Create_Handler,
},
{
MethodName: "Update",
Handler:    _Containers_Update_Handler,
},
{
MethodName: "Delete",
Handler:    _Containers_Delete_Handler,
},
},
Streams:  []grpc.StreamDesc{},
Metadata: "github.com/containerd/containerd/api/services/containers/v1/containers.proto",
}


五. plugin:io.containerd.snapshotter.v1.btrfs

路径:snapshot/btrfs/btrfs.go,ID 为 btrfs

funcinit() {
plugin.Register(&plugin.Registration{
ID:   "btrfs",
Type: plugin.SnapshotPlugin,
Init: func(ic *plugin.InitContext) (interface{}, error) {
return NewSnapshotter(ic.Root)
},
})
}

    
实例化为 snapshotter 结构体

// snapshotter is the btrfs snapshotter's state: the device backing the root,
// the root path, and a metadata store handle.
type snapshotter struct {
device string // device of the root
root   string // root provides paths for internal storage.
ms     *storage.MetaStore
}


路径 snapshot/overlay/overlay.go,注册插件 overlayfs

funcinit() {
plugin.Register(&plugin.Registration{
Type: plugin.SnapshotPlugin,
ID:   "overlayfs",
Init: func(ic *plugin.InitContext) (interface{}, error) {
return NewSnapshotter(ic.Root)
},
})
}


io.containerd.differ.v1 类型

    路径 differ/differ.go,注册插件 base-diff

// init registers the "base-diff" diff plugin, which requires the content and
// metadata plugins. Its Init looks both up, combines them into a
// metadata-backed content store and passes that store to NewBaseDiff.
func init() {
	baseDiff := &plugin.Registration{
		Type: plugin.DiffPlugin,
		ID:   "base-diff",
		Requires: []plugin.PluginType{
			plugin.ContentPlugin,
			plugin.MetadataPlugin,
		},
		Init: func(ic *plugin.InitContext) (interface{}, error) {
			contentPlugin, err := ic.Get(plugin.ContentPlugin)
			if err != nil {
				return nil, err
			}
			metadataPlugin, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			store := metadata.NewContentStore(
				metadataPlugin.(*bolt.DB),
				contentPlugin.(content.Store),
			)
			return NewBaseDiff(store)
		},
	}
	plugin.Register(baseDiff)
}


六. plugin:io.containerd.runtime.v1.linux

路径:linux/runtime.go,ID 为 linux

func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID:   "linux",
Init: New,
Requires: []plugin.PluginType{
plugin.TaskMonitorPlugin,
plugin.MetadataPlugin,
},
Config: &Config{
Shim:    defaultShim,
Runtime: defaultRuntime,
},
})
}


init 函数为 New

// New builds the linux Runtime from a plugin init context.
//
// It creates ic.Root, looks up the task monitor and metadata plugins,
// assembles a Runtime from the plugin's *Config, and then adds every task
// returned by restoreTasks to the runtime's task list under that task's
// namespace. The first error from any step is returned with a nil instance.
func New(ic *plugin.InitContext) (interface{}, error) {
	err := os.MkdirAll(ic.Root, 0711)
	if err != nil {
		return nil, err
	}

	taskMonitor, err := ic.Get(plugin.TaskMonitorPlugin)
	if err != nil {
		return nil, err
	}

	metaDB, err := ic.Get(plugin.MetadataPlugin)
	if err != nil {
		return nil, err
	}

	conf := ic.Config.(*Config)
	rt := &Runtime{
		root: ic.Root,
		// A shim is used unless the configuration opts out via NoShim.
		remote:    !conf.NoShim,
		shim:      conf.Shim,
		shimDebug: conf.ShimDebug,
		runtime:   conf.Runtime,
		monitor:   taskMonitor.(runtime.TaskMonitor),
		tasks:     runtime.NewTaskList(),
		db:        metaDB.(*bolt.DB),
		address:   ic.Address,
		events:    ic.Events,
	}

	restored, err := rt.restoreTasks(ic.Context)
	if err != nil {
		return nil, err
	}
	for i := range restored {
		task := restored[i]
		if err = rt.tasks.AddWithNamespace(task.namespace, task); err != nil {
			return nil, err
		}
	}
	return rt, nil
}


返回结构体 Runtime

// Runtime is the linux runtime plugin instance. It holds settings taken from
// the plugin configuration and init context, plus the task monitor, task
// list, bolt database and event exchange it works with.
type Runtime struct {
root      string
shim      string
shimDebug bool
runtime   string
remote    bool
address   string

monitor runtime.TaskMonitor
tasks   *runtime.TaskList
db      *bolt.DB
events  *events.Exchange
}


   所有插件就不一一列举了,同样方法执行,至此建立 GRPC server 启动完毕。

总结:
   加载插件,根据插件建立 GRPC API 服务
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: