Kubernetes component source code analysis: the kubelet startup flow
2017-10-03 11:50
1. Introduction to the kubelet
The kubelet is the primary "node agent" that runs on each node.
The kubelet works in terms of a PodSpec, a description of a pod.
It takes a set of PodSpecs provided through various mechanisms (primarily through the apiserver) and ensures that the containers described in those PodSpecs are running and healthy.
The kubelet does not manage containers that were not created by Kubernetes.
Besides PodSpecs from the apiserver, there are three other ways a container manifest can be provided to the kubelet:
File: a path passed as a flag on the command line. Files under this path are monitored periodically for updates; the monitoring period defaults to 20 seconds and is configurable via a flag. (An example manifest is shown after this list.)
HTTP endpoint: an HTTP endpoint passed as a parameter on the command line. This endpoint is checked every 20 seconds (also configurable with a flag).
HTTP server: the kubelet can also listen on HTTP and respond to a simple API for submitting new manifests.
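For the file source, a minimal static-pod manifest might look like the sketch below. This is illustrative only: drop a file like it into the directory the kubelet watches, which in v1.5 is given by the --pod-manifest-path flag (formerly --config), and the kubelet will run the pod even with no apiserver present.

```yaml
# Illustrative static pod manifest for the kubelet's file source.
apiVersion: v1
kind: Pod
metadata:
  name: static-nginx
spec:
  containers:
  - name: nginx
    image: nginx:1.11
    ports:
    - containerPort: 80
```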
2. The code version analyzed
v1.5.0
3. The KubeletServer configuration object
Like the other components, the kubelet's entry file lives under the cmd/ directory, while the real implementation lives under pkg/.
```console
➜ kubernetes git:(master) ✗ git checkout v1.5.0
Note: checking out 'v1.5.0'.
➜ kubernetes git:(58b7c16a52) ✗
➜ cmd git:(58b7c16a52) ✗ tree kubelet
kubelet
├── BUILD
├── OWNERS
├── app
│   ├── BUILD
│   ├── auth.go
│   ├── bootstrap.go
│   ├── bootstrap_test.go
│   ├── options
│   │   ├── BUILD
│   │   └── options.go
│   ├── plugins.go
│   ├── server.go
│   ├── server_linux.go
│   ├── server_test.go
│   └── server_unsupported.go
└── kubelet.go

2 directories, 14 files
```
The kubelet main function entry point:
```go
package main

import (
	"fmt"
	"os"

	"k8s.io/kubernetes/cmd/kubelet/app"
	"k8s.io/kubernetes/cmd/kubelet/app/options"
	_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
	"k8s.io/kubernetes/pkg/util/flag"
	"k8s.io/kubernetes/pkg/util/logs"
	_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
	"k8s.io/kubernetes/pkg/version/verflag"

	"github.com/spf13/pflag"
)

func main() {
	s := options.NewKubeletServer()
	s.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	verflag.PrintAndExitIfRequested()

	if err := app.Run(s, nil); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
```
What this code does:
1. Create a KubeletServer object, which holds all of the configuration the kubelet needs to run.
2. Parse the command line and update the KubeletServer according to the flags.
3. Run the real kubelet program based on the KubeletServer configuration.
options.NewKubeletServer() initializes the configuration parameters required to start the kubelet:
```go
// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
	componentconfig.KubeletConfiguration

	KubeConfig          flag.StringFlag
	BootstrapKubeconfig string

	// If true, an invalid KubeConfig will result in the Kubelet exiting with an error.
	RequireKubeConfig bool
	AuthPath          flag.StringFlag // Deprecated -- use KubeConfig instead
	APIServerList     []string        // Deprecated -- use KubeConfig instead

	// Insert a probability of random errors during calls to the master.
	ChaosChance float64
	// Crash immediately, rather than eating panics.
	ReallyCrashForTesting bool

	// TODO(mtaufen): It is increasingly looking like nobody actually uses the
	// Kubelet's runonce mode anymore, so it may be a candidate
	// for deprecation and removal.
	// If runOnce is true, the Kubelet will check the API server once for pods,
	// run those in addition to the pods specified by the local manifest, and exit.
	RunOnce bool
}

// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
	versioned := &v1alpha1.KubeletConfiguration{}
	api.Scheme.Default(versioned)
	config := componentconfig.KubeletConfiguration{}
	api.Scheme.Convert(versioned, &config, nil)
	return &KubeletServer{
		KubeConfig:           flag.NewStringFlag("/var/lib/kubelet/kubeconfig"),
		RequireKubeConfig:    false, // in 1.5, default to true
		KubeletConfiguration: config,
	}
}
```
There is far too much in there to cover (DockerEndpoint and so on); readers who are interested can explore the source themselves, so we will not go through it here.
app.Run(s, nil) is what actually creates and starts the kubelet instance:
```go
func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
	if err := run(s, kubeDeps); err != nil {
		return fmt.Errorf("failed to run Kubelet: %v", err)
	}
	return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
	// TODO: this should be replaced by a --standalone flag
	standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)

	if s.ExitOnLockContention && s.LockFilePath == "" {
		return errors.New("cannot exit on lock file contention: no lock file specified")
	}

	done := make(chan struct{})
	if s.LockFilePath != "" {
		glog.Infof("acquiring file lock on %q", s.LockFilePath)
		if err := flock.Acquire(s.LockFilePath); err != nil {
			return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
		}
		if s.ExitOnLockContention {
			glog.Infof("watching for inotify events for: %v", s.LockFilePath)
			if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
				return err
			}
		}
	}

	// Set feature gates based on the value in KubeletConfiguration
	err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
	if err != nil {
		return err
	}

	// Register current configuration with /configz endpoint
	cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
	if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
		// Look for config on the API server. If it exists, replace s.KubeletConfiguration
		// with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.

		// Don't do dynamic Kubelet configuration in runonce mode
		if s.RunOnce == false {
			remoteKC, err := initKubeletConfigSync(s)
			if err == nil {
				// Update s (KubeletServer) with new config from API server
				s.KubeletConfiguration = *remoteKC
				// Ensure that /configz is up to date with the new config
				if cfgzErr != nil {
					glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
				} else {
					setConfigz(cfgz, &s.KubeletConfiguration)
				}
				// Update feature gates from the new config
				err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
				if err != nil {
					return err
				}
			}
		}
	}

	if kubeDeps == nil {
		var kubeClient, eventClient *clientset.Clientset
		var cloud cloudprovider.Interface

		if s.CloudProvider != componentconfigv1alpha1.AutoDetectCloudProvider {
			cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
			if err != nil {
				return err
			}
			if cloud == nil {
				glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			} else {
				glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
			}
		}

		if s.BootstrapKubeconfig != "" {
			nodeName, err := getNodeName(cloud, nodeutil.GetHostname(s.HostnameOverride))
			if err != nil {
				return err
			}
			if err := bootstrapClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
				return err
			}
		}

		clientConfig, err := CreateAPIServerClientConfig(s)
		if err == nil {
			kubeClient, err = clientset.NewForConfig(clientConfig)
			if err != nil {
				glog.Warningf("New kubeClient from clientConfig error: %v", err)
			}
			// make a separate client for events
			eventClientConfig := *clientConfig
			eventClientConfig.QPS = float32(s.EventRecordQPS)
			eventClientConfig.Burst = int(s.EventBurst)
			eventClient, err = clientset.NewForConfig(&eventClientConfig)
		}
		if err != nil {
			if s.RequireKubeConfig {
				return fmt.Errorf("invalid kubeconfig: %v", err)
			}
			if standaloneMode {
				glog.Warningf("No API client: %v", err)
			}
		}

		kubeDeps, err = UnsecuredKubeletDeps(s)
		if err != nil {
			return err
		}

		kubeDeps.Cloud = cloud
		kubeDeps.KubeClient = kubeClient
		kubeDeps.EventClient = eventClient
	}

	if kubeDeps.Auth == nil {
		nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
		if err != nil {
			return err
		}
		auth, err := buildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
	}

	if kubeDeps.CAdvisorInterface == nil {
		kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
		if err != nil {
			return err
		}
	}

	if kubeDeps.ContainerManager == nil {
		if s.SystemCgroups != "" && s.CgroupRoot == "" {
			return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
		}
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.ExperimentalCgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				EnableCRI:             s.EnableCRI,
			},
			s.ExperimentalFailSwapOn)
		if err != nil {
			return err
		}
	}

	if err := checkPermissions(); err != nil {
		glog.Error(err)
	}

	utilruntime.ReallyCrash = s.ReallyCrashForTesting

	rand.Seed(time.Now().UTC().UnixNano())

	// TODO(vmarmol): Do this through container config.
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
		glog.Warning(err)
	}

	if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
		return err
	}

	if s.HealthzPort > 0 {
		healthz.DefaultHealthz()
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
			if err != nil {
				glog.Errorf("Starting health server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	if s.RunOnce {
		return nil
	}

	<-done
	return nil
}
```
KubeletDeps contains many components; a few are listed below (a trimmed sketch of the struct follows the list):
- CAdvisorInterface: the component that provides the cAdvisor interface, used to fetch monitoring data
- DockerClient: the docker client, used to interact with docker
- KubeClient: the apiserver client, used to communicate with the API server
- Mounter: performs mount-related operations
- NetworkPlugins: network plugins, which perform network setup
- VolumePlugins: volume plugins, which perform volume setup
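For orientation, the KubeletDeps struct in pkg/kubelet/kubelet.go looks roughly like this in v1.5. It is trimmed by hand from memory, so treat the exact field list and types as approximate and check the source for the full definition:

```go
// KubeletDeps (trimmed): the injectable dependencies of the kubelet.
type KubeletDeps struct {
	Builder           KubeletBuilder              // constructs the Kubelet; defaults to CreateAndInitKubelet
	CAdvisorInterface cadvisor.Interface          // monitoring data
	Cloud             cloudprovider.Interface     // cloud provider integration
	ContainerManager  cm.ContainerManager         // node-level cgroup management
	DockerClient      dockertools.DockerInterface // talks to docker
	EventClient       *clientset.Clientset        // dedicated client for events
	KubeClient        *clientset.Clientset        // talks to the apiserver
	Mounter           mount.Interface             // mount operations
	NetworkPlugins    []network.NetworkPlugin     // network setup
	OOMAdjuster       *oom.OOMAdjuster            // OOM score adjustment
	OSInterface       kubecontainer.OSInterface   // OS abstraction
	PodConfig         *config.PodConfig           // merged pod sources (file, URL, apiserver)
	Recorder          record.EventRecorder        // event recording
	VolumePlugins     []volume.VolumePlugin       // volume setup
	......
}
```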
The run method allows the kubeDeps passed in to be nil; in that case it generates a default kubeDeps object itself, which is exactly the logic in the code above. We will skip the code that runs the HTTP server for now and come back to it later. Let's move on to RunKubelet, whose code looks like this:
```go
func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
	hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
	// Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	if err != nil {
		return err
	}

	eventBroadcaster := record.NewBroadcaster()
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: string(nodeName)})
	eventBroadcaster.StartLogging(glog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		glog.V(4).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		glog.Warning("No api server defined - no events will be sent to API server.")
	}

	// TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
	// so that I could remove the associated fields from KubeletConfig. I would
	// prefer this to be done as part of an independent validation step on the
	// KubeletConfiguration. But as far as I can tell, we don't have an explicit
	// place for validation of the KubeletConfiguration yet.
	hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
	if err != nil {
		return err
	}
	hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
	if err != nil {
		return err
	}
	hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
	if err != nil {
		return err
	}

	privilegedSources := capabilities.PrivilegedSources{
		HostNetworkSources: hostNetworkSources,
		HostPIDSources:     hostPIDSources,
		HostIPCSources:     hostIPCSources,
	}
	capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)

	credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
	glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)

	builder := kubeDeps.Builder
	if builder == nil {
		builder = CreateAndInitKubelet
	}
	if kubeDeps.OSInterface == nil {
		kubeDeps.OSInterface = kubecontainer.RealOS{}
	}

	k, err := builder(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	// NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil!")
	}
	podCfg := kubeDeps.PodConfig

	rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

	// TODO(dawnchen): remove this once we deprecated old debian containervm images.
	// This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
	// The current chosen number is consistent with most of other os dist.
	const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
	const minKeys uint64 = 1000000
	key, err := ioutil.ReadFile(maxkeysPath)
	if err != nil {
		glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
	} else {
		fields := strings.Fields(string(key))
		nkey, _ := strconv.ParseUint(fields[0], 10, 64)
		if nkey < minKeys {
			glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
			err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
			if err != nil {
				glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
			}
		}
	}
	const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
	const minBytes uint64 = 25000000
	bytes, err := ioutil.ReadFile(maxbytesPath)
	if err != nil {
		glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
	} else {
		fields := strings.Fields(string(bytes))
		nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
		if nbyte < minBytes {
			glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
			err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
			if err != nil {
				glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
			}
		}
	}

	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		glog.Infof("Started kubelet %s as runonce", version.Get().String())
	} else {
		err := startKubelet(k, podCfg, kubeCfg, kubeDeps)
		if err != nil {
			return err
		}
		glog.Infof("Started kubelet %s", version.Get().String())
	}
	return nil
}
```
RunKubelet can be broken into three parts:
1. Initialize various objects, e.g. the eventBroadcaster, so that kubelet events can be sent to the apiserver.
2. Create the Kubelet through a builder.
3. Run the Kubelet according to the run mode.
The creation work is done by the line k, err := builder(kubeCfg, kubeDeps, standaloneMode); the default builder is CreateAndInitKubelet:
```go
func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations
	k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
	if err != nil {
		return nil, err
	}
	k.BirthCry()
	k.StartGarbageCollection()
	return k, nil
}
```
Creating the kubelet
```go
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
	......
	// PodConfig is crucial: it is the source of pod information. The kubelet supports
	// three sources -- file, URL and apiserver -- and PodConfig merges them and
	// delivers updates through a channel.
	if kubeDeps.PodConfig == nil {
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
	}
	......
	// The exec handler: how commands are executed inside a container. The nsenter
	// command line used to be the mechanism; later docker provided `docker exec`,
	// which is now the default.
	var dockerExecHandler dockertools.ExecHandler
	switch kubeCfg.DockerExecHandlerName {
	case "native":
		dockerExecHandler = &dockertools.NativeExecHandler{}
	case "nsenter":
		dockerExecHandler = &dockertools.NsenterExecHandler{}
	default:
		glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
		dockerExecHandler = &dockertools.NativeExecHandler{}
	}

	// Use a reflector to keep the service information obtained via ListWatch synced
	// into the serviceStore object in real time
	serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	if kubeClient != nil {
		serviceLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "services", api.NamespaceAll, fields.Everything())
		cache.NewReflector(serviceLW, &api.Service{}, serviceStore, 0).Run()
	}
	serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}

	// Use a reflector to keep the node information obtained via ListWatch synced
	// into the nodeStore object in real time
	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	if kubeClient != nil {
		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
		nodeLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "nodes", api.NamespaceAll, fieldSelector)
		cache.NewReflector(nodeLW, &api.Node{}, nodeStore, 0).Run()
	}
	nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
	nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}
	......

	// Create the Kubelet instance from the configuration and the objects built above
	klet := &Kubelet{
		hostname:     hostname,
		nodeName:     nodeName,
		dockerClient: kubeDeps.DockerClient,
		kubeClient:   kubeClient,
		......
		clusterDomain:                  kubeCfg.ClusterDomain,
		clusterDNS:                     net.ParseIP(kubeCfg.ClusterDNS),
		serviceLister:                  serviceLister,
		nodeLister:                     nodeLister,
		nodeInfo:                       nodeInfo,
		masterServiceNamespace:         kubeCfg.MasterServiceNamespace,
		streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
		recorder:                       kubeDeps.Recorder,
		cadvisor:                       kubeDeps.CAdvisorInterface,
		diskSpaceManager:               diskSpaceManager,
		......
	}
	......

	// Initialize the network plugin
	if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
		return nil, err
	} else {
		klet.networkPlugin = plug
	}

	// Fetch information about the current machine from cAdvisor
	machineInfo, err := klet.GetCachedMachineInfo()
	......

	procFs := procfs.NewProcFS()
	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	klet.livenessManager = proberesults.NewManager()

	// podManager manages the pod information on this node; it stores the content of
	// all pods, including static pods. The kubelet gets pod content from three
	// places: local files, a network endpoint and the apiserver.
	klet.podCache = kubecontainer.NewCache()
	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
	......

	// Create the runtime object. Eventually the CRI interface will be used to talk
	// to the runtime; for now DockerManager is used.
	if kubeCfg.EnableCRI {
		......
	} else {
		switch kubeCfg.ContainerRuntime {
		case "docker":
			runtime := dockertools.NewDockerManager(
				kubeDeps.DockerClient,
				kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
				klet.livenessManager,
				containerRefManager,
				klet.podManager,
				machineInfo,
				kubeCfg.PodInfraContainerImage,
				float32(kubeCfg.RegistryPullQPS),
				int(kubeCfg.RegistryBurst),
				ContainerLogsDir,
				kubeDeps.OSInterface,
				klet.networkPlugin,
				klet,
				klet.httpClient,
				dockerExecHandler,
				kubeDeps.OOMAdjuster,
				procFs,
				klet.cpuCFSQuota,
				imageBackOff,
				kubeCfg.SerializeImagePulls,
				kubeCfg.EnableCustomMetrics,
				klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
				kubeCfg.SeccompProfileRoot,
				kubeDeps.ContainerRuntimeOptions...,
			)
			klet.containerRuntime = runtime
			klet.runner = kubecontainer.DirectStreamingRunner(runtime)
		case "rkt":
			......
		default:
			return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
		}
	}
	......

	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.updatePodCIDR(kubeCfg.PodCIDR)

	// Create the containerGC object, which periodically cleans up containers
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC
	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

	// Create the imageManager object, which manages images
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	// statusManager watches the status of pods on this node in real time and
	// updates the corresponding pods in the apiserver
	klet.statusManager = status.NewManager(kubeClient, klet.podManager)

	// probeManager checks pod status and pushes updates through statusManager
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)

	// volumeManager manages the volumes on this node
	klet.volumePluginMgr, err = NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
	if err != nil {
		return nil, err
	}
	......

	// setup volumeManager
	klet.volumeManager, err = volumemanager.NewVolumeManager(
		kubeCfg.EnableControllerAttachDetach,
		nodeName,
		klet.podManager,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime,
		kubeDeps.Mounter,
		klet.getPodsDir(),
		kubeDeps.Recorder,
		kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount)

	// runtimeCache holds the pods currently running on this node
	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache
	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)

	// podWorkers are the actual executors
	klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
	......

	klet.kubeletConfiguration = *kubeCfg
	return klet, nil
}
```
NewMainKubelet does exactly what its name suggests: its main job is to create the Kubelet object, which holds every object the kubelet needs at runtime. The code above is the initialization and assignment of all of those objects; only a few particularly important ones are introduced here:
- podConfig: aggregates the pods this node should run from the three sources (file, network and apiserver) and sends them out over a channel; reading that channel yields the latest pod configuration in real time (a toy version of this channel is sketched after this list)
- serviceLister: reads service information from kubernetes
- nodeLister: reads node information from the apiserver
- diskSpaceManager: reports information about container storage space
- podManager: caches pod information; everything that needs that information goes through it
- runtime: the container runtime, a wrapper over the container engine (docker or rkt) that calls the engine's interfaces to manage container state, e.g. starting, pausing and killing containers
- probeManager: if a pod has health checks configured, probeManager periodically checks whether the pod is working properly and updates its status to the apiserver through statusManager
- volumeManager: manages the volumes containers need, e.g. checking whether a volume is already mounted and fetching the volumes a pod uses
- podWorkers: the actual executors; whenever a pod needs updating, the update is sent to them
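To make the podConfig channel concrete, here is a self-contained toy of the pattern. The real types live in pkg/kubelet/types (kubetypes.PodUpdate) and pkg/kubelet/config; the names below only mirror them loosely, and the Pod stub is ours:

```go
package main

import "fmt"

// Stand-in for api.Pod; the real PodUpdate carries []*api.Pod.
type Pod struct{ Name string }

// PodOperation loosely mirrors kubetypes.PodOperation.
type PodOperation int

const (
	SET    PodOperation = iota // the full set of pods from a source
	ADD                        // pods added by a source
	UPDATE                     // pods updated by a source
	REMOVE                     // pods removed by a source
)

// PodUpdate loosely mirrors kubetypes.PodUpdate: a batch of pods, what
// happened to them, and which source ("file", "http" or "api") reported it.
type PodUpdate struct {
	Pods   []*Pod
	Op     PodOperation
	Source string
}

func main() {
	updates := make(chan PodUpdate, 1)

	// A source (file watcher, HTTP poller or apiserver reflector) pushes batches in.
	updates <- PodUpdate{Pods: []*Pod{{Name: "nginx"}}, Op: ADD, Source: "api"}
	close(updates)

	// A consumer drains the channel and dispatches on the operation, which is
	// essentially what the kubelet does with podCfg.Updates().
	for u := range updates {
		switch u.Op {
		case ADD:
			fmt.Printf("source %s added %d pod(s)\n", u.Source, len(u.Pods))
		case UPDATE, REMOVE, SET:
			// hand off to the matching handler
		}
	}
}
```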
We will not unpack the implementation and exact function of every object here; later articles will analyze some of them further.
Running the kubelet
```go
func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error {
	// start the kubelet
	go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

	// start the kubelet server
	if kubeCfg.EnableServer {
		go wait.Until(func() {
			k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers)
		}, 0, wait.NeverStop)
	}
	if kubeCfg.ReadOnlyPort > 0 {
		go wait.Until(func() {
			k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
		}, 0, wait.NeverStop)
	}

	return nil
}
```
Running the kubelet starts two main functions: k.Run() enters the main loop, and k.ListenAndServe() starts the kubelet's API service. The latter is not the focus of this article, so let's look at the former. Its entry point is k.Run(podCfg.Updates()). As mentioned earlier, podCfg.Updates() is a channel that delivers the latest pod configuration in real time; how that works is a topic for another time, and for now knowing what it does is enough. The code of the Run method is as follows:
```go
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	if kl.logServer == nil {
		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
	}
	if kl.kubeClient == nil {
		glog.Warning("No api server defined - no node status update will be sent.")
	}
	if err := kl.initializeModules(); err != nil {
		kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		glog.Error(err)
		kl.runtimeState.setInitError(err)
	}

	// Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	if kl.kubeClient != nil {
		// Start syncing node status immediately, this may set up things the runtime needs to run.
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
	}
	go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	// Start loop to sync iptables util rules
	if kl.makeIPTablesUtilChains {
		go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
	}

	// Start a goroutine responsible for killing pods (that are not properly
	// handled by pod workers).
	go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

	// Start component sync loops.
	kl.statusManager.Start()
	kl.probeManager.Start()

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
	kl.syncLoop(updates, kl)
}
```
This is basically the startup of the kubelet's various components, each running as its own goroutine, so we will not dwell on it. The final line, kl.syncLoop(updates, kl), is the main loop that handles all pod updates: it picks up pod changes (creations, modifications and deletions) and calls the corresponding handlers to keep the containers on the node in line with the pod configuration.
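As a rough mental model of that main loop, here is a minimal, self-contained sketch. This is not the real code: the actual v1.5 syncLoop also selects on PLEG events, liveness-probe results and housekeeping ticks, and dispatches through a SyncHandler interface.

```go
package main

import (
	"fmt"
	"time"
)

type PodUpdate struct {
	Op     string // "ADD", "UPDATE", "REMOVE", ...
	Source string
}

// syncLoop sketch: block on the update channel plus a periodic tick, and
// dispatch each event to a handler (e.g. HandlePodAdditions / HandlePodUpdates
// / HandlePodRemoves in the real kubelet).
func syncLoop(updates <-chan PodUpdate, handle func(PodUpdate)) {
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
	for {
		select {
		case u, open := <-updates:
			if !open {
				return // the pod config channel was closed
			}
			handle(u)
		case <-syncTicker.C:
			// periodically re-sync pods that need it even without new updates
		}
	}
}

func main() {
	updates := make(chan PodUpdate, 1)
	updates <- PodUpdate{Op: "ADD", Source: "api"}
	close(updates)
	syncLoop(updates, func(u PodUpdate) { fmt.Println(u.Op, "from", u.Source) })
}
```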
Reference (original article):
http://cizixs.com/2017/06/06/kubelet-source-code-analysis-part-1