
Kubernetes Component kubelet Source Code Analysis: Startup Flow

2017-10-03 11:50
1. Introduction to kubelet

The kubelet is the primary "node agent" that runs on each node.

The kubelet works in terms of a PodSpec, a YAML or JSON object that describes a pod.

The kubelet takes a set of PodSpecs provided through various mechanisms (primarily through the apiserver) and ensures that the containers described in those PodSpecs are running and healthy.

The kubelet does not manage containers that were not created by Kubernetes.

Other than the PodSpecs from the apiserver, there are three ways a container manifest can be provided to the kubelet:

File: a path passed as a flag on the command line. Files under this path are monitored periodically for updates; the monitoring period defaults to 20 seconds and is configurable via a flag (a toy polling sketch follows this list).

HTTP endpoint: an HTTP endpoint passed as a parameter on the command line. This endpoint is checked every 20 seconds (also configurable with a flag).

HTTP server: the kubelet can also listen for HTTP and respond to a simple API to submit a new manifest.
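
To make the file mechanism concrete, here is a toy, self-contained polling sketch. It is not the kubelet's actual implementation, and the manifest directory path is only an example; it just shows the shape of "check a directory every period":

package main

import (
    "fmt"
    "io/ioutil"
    "os"
    "time"
)

// watchManifestDir lists dir once per period, which is roughly how the
// kubelet's file source notices manifest updates.
func watchManifestDir(dir string, period time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            entries, err := ioutil.ReadDir(dir)
            if err != nil {
                fmt.Fprintf(os.Stderr, "read %s: %v\n", dir, err)
                continue
            }
            for _, e := range entries {
                fmt.Println("manifest:", e.Name())
            }
        }
    }
}

func main() {
    stop := make(chan struct{})
    // 20 seconds mirrors the kubelet's default file-check period; the
    // directory below is only an example path.
    go watchManifestDir("/etc/kubernetes/manifests", 20*time.Second, stop)
    time.Sleep(time.Minute)
    close(stop)
}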

2. Source Version Analyzed
v1.5.0


3. The KubeletServer Configuration Object

Like the other components, the kubelet's entry point is under the cmd/ directory, while the real implementation lives in /pkg.

➜  kubernetes git:(master) ✗ git checkout v1.5.0
Note: checking out 'v1.5.0'.
➜  kubernetes git:(58b7c16a52) ✗


➜  cmd git:(58b7c16a52) ✗ tree kubelet
kubelet
├── BUILD
├── OWNERS
├── app
│   ├── BUILD
│   ├── auth.go
│   ├── bootstrap.go
│   ├── bootstrap_test.go
│   ├── options
│   │   ├── BUILD
│   │   └── options.go
│   ├── plugins.go
│   ├── server.go
│   ├── server_linux.go
│   ├── server_test.go
│   └── server_unsupported.go
└── kubelet.go

2 directories, 14 files


The kubelet main function entry point is in cmd/kubelet/kubelet.go:

package main

import (
    "fmt"
    "os"

    "k8s.io/kubernetes/cmd/kubelet/app"
    "k8s.io/kubernetes/cmd/kubelet/app/options"
    _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
    "k8s.io/kubernetes/pkg/util/flag"
    "k8s.io/kubernetes/pkg/util/logs"
    _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
    "k8s.io/kubernetes/pkg/version/verflag"

    "github.com/spf13/pflag"
)

func main() {
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}


This code does three things:

1. Create a KubeletServer object, which holds all of the configuration the kubelet needs to run
2. Parse the command line and update the KubeletServer according to the flags
3. Run the actual kubelet program based on the KubeletServer configuration


options.NewKubeletServer() mainly initializes the configuration parameters needed to start the kubelet:

// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
    componentconfig.KubeletConfiguration

    KubeConfig          flag.StringFlag
    BootstrapKubeconfig string

    // If true, an invalid KubeConfig will result in the Kubelet exiting with an error.
    RequireKubeConfig bool
    AuthPath          flag.StringFlag // Deprecated -- use KubeConfig instead
    APIServerList     []string        // Deprecated -- use KubeConfig instead

    // Insert a probability of random errors during calls to the master.
    ChaosChance float64
    // Crash immediately, rather than eating panics.
    ReallyCrashForTesting bool

    // TODO(mtaufen): It is increasingly looking like nobody actually uses the
    //                Kubelet's runonce mode anymore, so it may be a candidate
    //                for deprecation and removal.
    // If runOnce is true, the Kubelet will check the API server once for pods,
    // run those in addition to the pods specified by the local manifest, and exit.
    RunOnce bool
}

// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
    versioned := &v1alpha1.KubeletConfiguration{}
    api.Scheme.Default(versioned)
    config := componentconfig.KubeletConfiguration{}
    api.Scheme.Convert(versioned, &config, nil)
    return &KubeletServer{
        KubeConfig:           flag.NewStringFlag("/var/lib/kubelet/kubeconfig"),
        RequireKubeConfig:    false, // in 1.5, default to true
        KubeletConfiguration: config,
    }
}
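
NewKubeletServer uses a common Kubernetes pattern: defaults are applied to the versioned configuration type, which is then converted to the internal type. The following is a minimal, generic illustration of that pattern with hypothetical types; it is not the actual api.Scheme machinery:

package main

import "fmt"

// VersionedConfig mimics a versioned config type: fields are pointers so
// that "unset" can be distinguished from a zero value.
type VersionedConfig struct {
    SyncFrequencySeconds *int
}

// InternalConfig mimics the internal type: plain fields, always populated.
type InternalConfig struct {
    SyncFrequencySeconds int
}

// setDefaults plays the role of api.Scheme.Default: fill in unset fields.
func setDefaults(v *VersionedConfig) {
    if v.SyncFrequencySeconds == nil {
        def := 60 // hypothetical default
        v.SyncFrequencySeconds = &def
    }
}

// convert plays the role of api.Scheme.Convert: versioned -> internal.
func convert(v *VersionedConfig) InternalConfig {
    return InternalConfig{SyncFrequencySeconds: *v.SyncFrequencySeconds}
}

func main() {
    versioned := &VersionedConfig{}
    setDefaults(versioned)
    fmt.Printf("%+v\n", convert(versioned)) // {SyncFrequencySeconds:60}
}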


There are too many fields to cover here (DockerEndpoint and so on); interested readers can explore the source themselves, so we won't go through them one by one.

app.Run(s, nil) is what actually creates and starts the kubelet instance:

func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
    if err := run(s, kubeDeps); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
    // TODO: this should be replaced by a --standalone flag
    standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)

    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }

    done := make(chan struct{})
    if s.LockFilePath != "" {
        glog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            glog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }

    // Set feature gates based on the value in KubeletConfiguration
    err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }

    // Register current configuration with /configz endpoint
    cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
    if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
        // Look for config on the API server. If it exists, replace s.KubeletConfiguration
        // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.

        // Don't do dynamic Kubelet configuration in runonce mode
        if s.RunOnce == false {
            remoteKC, err := initKubeletConfigSync(s)
            if err == nil {
                // Update s (KubeletServer) with new config from API server
                s.KubeletConfiguration = *remoteKC
                // Ensure that /configz is up to date with the new config
                if cfgzErr != nil {
                    glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
                } else {
                    setConfigz(cfgz, &s.KubeletConfiguration)
                }
                // Update feature gates from the new config
                err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
                if err != nil {
                    return err
                }
            }
        }
    }

    if kubeDeps == nil {
        var kubeClient, eventClient *clientset.Clientset
        var cloud cloudprovider.Interface

        if s.CloudProvider != componentconfigv1alpha1.AutoDetectCloudProvider {
            cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            if cloud == nil {
                glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            } else {
                glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            }
        }

        if s.BootstrapKubeconfig != "" {
            nodeName, err := getNodeName(cloud, nodeutil.GetHostname(s.HostnameOverride))
            if err != nil {
                return err
            }
            if err := bootstrapClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
                return err
            }
        }

        clientConfig, err := CreateAPIServerClientConfig(s)
        if err == nil {
            kubeClient, err = clientset.NewForConfig(clientConfig)
            if err != nil {
                glog.Warningf("New kubeClient from clientConfig error: %v", err)
            }
            // make a separate client for events
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = float32(s.EventRecordQPS)
            eventClientConfig.Burst = int(s.EventBurst)
            eventClient, err = clientset.NewForConfig(&eventClientConfig)
        }
        if err != nil {
            if s.RequireKubeConfig {
                return fmt.Errorf("invalid kubeconfig: %v", err)
            }
            if standaloneMode {
                glog.Warningf("No API client: %v", err)
            }
        }

        kubeDeps, err = UnsecuredKubeletDeps(s)
        if err != nil {
            return err
        }

        kubeDeps.Cloud = cloud
        kubeDeps.KubeClient = kubeClient
        kubeDeps.EventClient = eventClient
    }

    if kubeDeps.Auth == nil {
        nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
        if err != nil {
            return err
        }
        auth, err := buildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
    }

    if kubeDeps.CAdvisorInterface == nil {
        kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
        if err != nil {
            return err
        }
    }

    if kubeDeps.ContainerManager == nil {
        if s.SystemCgroups != "" && s.CgroupRoot == "" {
            return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
        }
        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
            cm.NodeConfig{
                RuntimeCgroupsName:    s.RuntimeCgroups,
                SystemCgroupsName:     s.SystemCgroups,
                KubeletCgroupsName:    s.KubeletCgroups,
                ContainerRuntime:      s.ContainerRuntime,
                CgroupsPerQOS:         s.ExperimentalCgroupsPerQOS,
                CgroupRoot:            s.CgroupRoot,
                CgroupDriver:          s.CgroupDriver,
                ProtectKernelDefaults: s.ProtectKernelDefaults,
                EnableCRI:             s.EnableCRI,
            },
            s.ExperimentalFailSwapOn)
        if err != nil {
            return err
        }
    }

    if err := checkPermissions(); err != nil {
        glog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    rand.Seed(time.Now().UTC().UnixNano())

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        glog.Warning(err)
    }

    if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
        return err
    }

    if s.HealthzPort > 0 {
        healthz.DefaultHealthz()
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    <-done
    return nil
}
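
Note how the health server is launched: wait.Until re-invokes the function with a 5-second period until the stop channel closes, so if http.ListenAndServe returns, it is simply restarted. Below is a tiny self-contained analogue of that helper; it is a sketch of the idea, not the k8s implementation:

package main

import (
    "fmt"
    "time"
)

// until runs f, then waits period, and repeats until stop is closed --
// the same restart-forever idea behind wait.Until above.
func until(f func(), period time.Duration, stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        default:
        }
        f() // if f returns (e.g. the server crashed), it will be re-run
        select {
        case <-stop:
            return
        case <-time.After(period):
        }
    }
}

func main() {
    stop := make(chan struct{})
    go until(func() { fmt.Println("serving (restarted whenever it returns)") }, 100*time.Millisecond, stop)
    time.Sleep(350 * time.Millisecond)
    close(stop)
    time.Sleep(50 * time.Millisecond) // give the goroutine time to observe stop
}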


KubeletDeps contains many components; some of them are listed below (an abridged sketch of the struct follows the list):

CAdvisorInterface: provides the cAdvisor interface, used to collect monitoring data
DockerClient: the docker client, used to interact with docker
KubeClient: the apiserver client, used to communicate with the API server
Mounter: performs mount-related operations
NetworkPlugins: network plugins, which perform network setup
VolumePlugins: volume plugins, which perform volume setup
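
For reference, here is an abridged sketch of the KubeletDeps struct, reconstructed from the fields that appear in the code in this article; the field types may be simplified, and pkg/kubelet/kubelet.go in v1.5.0 is the authoritative definition:

// Abridged sketch; see pkg/kubelet/kubelet.go for the full type.
type KubeletDeps struct {
    Builder           KubeletBuilder          // builds the Kubelet; defaults to CreateAndInitKubelet
    Auth              server.AuthInterface    // authn/authz for the kubelet API
    CAdvisorInterface cadvisor.Interface      // monitoring data for the node and containers
    Cloud             cloudprovider.Interface // cloud provider integration
    ContainerManager  cm.ContainerManager     // node-level cgroup management
    DockerClient      dockertools.DockerInterface
    EventClient       *clientset.Clientset    // separate client for events
    KubeClient        *clientset.Clientset    // client for talking to the apiserver
    Mounter           mount.Interface         // mount operations
    NetworkPlugins    []network.NetworkPlugin // network setup
    OOMAdjuster       *oom.OOMAdjuster        // OOM score adjustment
    OSInterface       kubecontainer.OSInterface
    PodConfig         *config.PodConfig       // merged pod sources
    Recorder          record.EventRecorder    // event recorder
    TLSOptions        *server.TLSOptions
    VolumePlugins     []volume.VolumePlugin   // volume setup
}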


The run function allows the kubeDeps passed in to be nil; in that case it builds the default kubeDeps object itself, which is exactly the logic in the code above. We will skip the code that runs the HTTP server for now and come back to it later. Let's continue with RunKubelet, whose code is as follows:

func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
    hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
    // Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
        return err
    }

    eventBroadcaster := record.NewBroadcaster()
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: string(nodeName)})
    eventBroadcaster.StartLogging(glog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        glog.V(4).Infof("Sending events to api server.")
        eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        glog.Warning("No api server defined - no events will be sent to API server.")
    }

    // TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
    //                so that I could remove the associated fields from KubeletConfig. I would
    //                prefer this to be done as part of an independent validation step on the
    //                KubeletConfiguration. But as far as I can tell, we don't have an explicit
    //                place for validation of the KubeletConfiguration yet.
    hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
    if err != nil {
        return err
    }

    hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
    if err != nil {
        return err
    }

    hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
    if err != nil {
        return err
    }

    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: hostNetworkSources,
        HostPIDSources:     hostPIDSources,
        HostIPCSources:     hostIPCSources,
    }
    capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)

    credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
    glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)

    builder := kubeDeps.Builder
    if builder == nil {
        builder = CreateAndInitKubelet
    }
    if kubeDeps.OSInterface == nil {
        kubeDeps.OSInterface = kubecontainer.RealOS{}
    }
    k, err := builder(kubeCfg, kubeDeps, standaloneMode)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    // NewMainKubelet should have set up a pod source config if one didn't exist
    // when the builder was run. This is just a precaution.
    if kubeDeps.PodConfig == nil {
        return fmt.Errorf("failed to create kubelet, pod source config was nil!")
    }
    podCfg := kubeDeps.PodConfig

    rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

    // TODO(dawnchen): remove this once we deprecated old debian containervm images.
    // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
    // The current chosen number is consistent with most of other os dist.
    const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
    const minKeys uint64 = 1000000
    key, err := ioutil.ReadFile(maxkeysPath)
    if err != nil {
        glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
    } else {
        fields := strings.Fields(string(key))
        nkey, _ := strconv.ParseUint(fields[0], 10, 64)
        if nkey < minKeys {
            glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
            err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
            }
        }
    }
    const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
    const minBytes uint64 = 25000000
    bytes, err := ioutil.ReadFile(maxbytesPath)
    if err != nil {
        glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
    } else {
        fields := strings.Fields(string(bytes))
        nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
        if nbyte < minBytes {
            glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
            err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
            }
        }
    }

    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        err := startKubelet(k, podCfg, kubeCfg, kubeDeps)
        if err != nil {
            return err
        }
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}


RunKubelet can be divided into three parts:

1. Initialize the various objects, e.g. the eventBroadcaster, so that kubelet events can be sent to the apiserver

2. Create the Kubelet through the builder

3. Run the Kubelet according to the run mode

The creation happens in the line k, err := builder(kubeCfg, kubeDeps, standaloneMode); the default builder is CreateAndInitKubelet:

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations

    k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}
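
CreateAndInitKubelet thus does three things: it builds the Kubelet through NewMainKubelet, calls BirthCry (which records an event announcing that the kubelet has started), and calls StartGarbageCollection (which starts the goroutines that periodically garbage-collect dead containers and unused images).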


Creating the kubelet

func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
    ......

    // PodConfig is very important: it is the source of pod information. The kubelet
    // supports three channels (file, URL, and apiserver); PodConfig merges them all
    // and delivers the result through a channel
    if kubeDeps.PodConfig == nil {
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
    }
    ......

    // The exec handler is the way commands are executed inside a container. This used
    // to be done with the nsenter command line; after docker added `docker exec`, the
    // latter became the default
    var dockerExecHandler dockertools.ExecHandler
    switch kubeCfg.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }

    // Use a reflector to keep the service information obtained via ListWatch synced
    // into the serviceStore object in real time
    serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    if kubeClient != nil {
        serviceLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "services", api.NamespaceAll, fields.Everything())
        cache.NewReflector(serviceLW, &api.Service{}, serviceStore, 0).Run()
    }
    serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}

    // Use a reflector to keep the node information obtained via ListWatch synced
    // into the nodeStore object in real time
    nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "nodes", api.NamespaceAll, fieldSelector)
        cache.NewReflector(nodeLW, &api.Node{}, nodeStore, 0).Run()
    }
    nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
    nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

    ......

    // Create the Kubelet instance from the configuration and the objects built above
    klet := &Kubelet{
        hostname:                       hostname,
        nodeName:                       nodeName,
        dockerClient:                   kubeDeps.DockerClient,
        kubeClient:                     kubeClient,
        ......
        clusterDomain:                  kubeCfg.ClusterDomain,
        clusterDNS:                     net.ParseIP(kubeCfg.ClusterDNS),
        serviceLister:                  serviceLister,
        nodeLister:                     nodeLister,
        nodeInfo:                       nodeInfo,
        masterServiceNamespace:         kubeCfg.MasterServiceNamespace,
        streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
        recorder:                       kubeDeps.Recorder,
        cadvisor:                       kubeDeps.CAdvisorInterface,
        diskSpaceManager:               diskSpaceManager,
        ......
    }

    ......

    // Initialize the network plugin
    if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
        return nil, err
    } else {
        klet.networkPlugin = plug
    }

    // Get information about the current machine from cAdvisor
    machineInfo, err := klet.GetCachedMachineInfo()
    ......

    procFs := procfs.NewProcFS()
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()

    // podManager manages the pod information on this node; it stores the content of
    // all pods, including static pods. The kubelet gets pod content from three
    // places: local files, a network endpoint, and the apiserver
    klet.podCache = kubecontainer.NewCache()
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
    ......

    // Create the runtime object. Interaction with the runtime will later move to the
    // CRI interface; for now DockerManager is used
    if kubeCfg.EnableCRI {
        ......
    } else {
        switch kubeCfg.ContainerRuntime {
        case "docker":
            runtime := dockertools.NewDockerManager(
                kubeDeps.DockerClient,
                kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
                klet.livenessManager,
                containerRefManager,
                klet.podManager,
                machineInfo,
                kubeCfg.PodInfraContainerImage,
                float32(kubeCfg.RegistryPullQPS),
                int(kubeCfg.RegistryBurst),
                ContainerLogsDir,
                kubeDeps.OSInterface,
                klet.networkPlugin,
                klet,
                klet.httpClient,
                dockerExecHandler,
                kubeDeps.OOMAdjuster,
                procFs,
                klet.cpuCFSQuota,
                imageBackOff,
                kubeCfg.SerializeImagePulls,
                kubeCfg.EnableCustomMetrics,
                klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
                kubeCfg.SeccompProfileRoot,
                kubeDeps.ContainerRuntimeOptions...,
            )
            klet.containerRuntime = runtime
            klet.runner = kubecontainer.DirectStreamingRunner(runtime)
        case "rkt":
            ......
        default:
            return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
        }
    }

    ......

    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.updatePodCIDR(kubeCfg.PodCIDR)

    // Create the containerGC object, which performs periodic container cleanup
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // Create the imageManager object, which manages images
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    // statusManager watches the status of pods on the node in real time and pushes
    // updates to the corresponding pods in the apiserver
    klet.statusManager = status.NewManager(kubeClient, klet.podManager)

    // probeManager probes pod health and reports results through statusManager
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)

    // volumeManager manages the volumes on the node
    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
    if err != nil {
        return nil, err
    }
    ......
    // setup volumeManager
    klet.volumeManager, err = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount)

    // runtimeCache holds information about the pods currently running on the node
    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)

    // podWorkers are the actual executors that sync pods
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    ......
    klet.kubeletConfiguration = *kubeCfg
    return klet, nil
}


NewMainKubelet does exactly what its name suggests: its main job is to create the Kubelet object, which holds every object the kubelet needs at runtime. The code above is the initialization and wiring of all those objects. Here we introduce only a few of the most important ones:

podConfig: merges the pods this node should run from three sources (file, network, and apiserver) and sends the result out through a channel; reading that channel yields the latest pod configuration in real time (see the fan-in sketch after this list)
serviceLister: reads service information in Kubernetes
nodeLister: reads node information from the apiserver
diskSpaceManager: returns information about container storage space
podManager: caches pod information; everything that needs that information goes here
runtime: the container runtime, a layer over the container engine (docker or rkt) that calls the engine's interfaces to manage container state, e.g. starting, pausing, and killing containers
probeManager: if a pod has health checks configured, probeManager periodically checks whether the pod is working properly and updates its status to the apiserver through statusManager
volumeManager: manages the volumes containers need, e.g. checking whether a volume is already mounted and finding the volumes a pod uses
podWorkers: the actual executors; every pod that needs updating is dispatched to them
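
To illustrate the channel-based design behind podConfig, here is a minimal, self-contained fan-in sketch. The types and functions are purely illustrative, not the actual PodConfig implementation; it only shows how several pod sources can feed one merged updates channel:

package main

import "fmt"

// PodUpdate is an illustrative stand-in for the update type a pod source emits.
type PodUpdate struct {
    Source string // which source produced the update
    Op     string // e.g. ADD, UPDATE, REMOVE
}

// merge fans several source channels into one updates channel, the way
// PodConfig merges the file, HTTP, and apiserver sources.
func merge(sources ...<-chan PodUpdate) <-chan PodUpdate {
    out := make(chan PodUpdate)
    for _, src := range sources {
        go func(c <-chan PodUpdate) {
            for u := range c {
                out <- u
            }
        }(src)
    }
    return out
}

func main() {
    file := make(chan PodUpdate)
    apiserver := make(chan PodUpdate)
    updates := merge(file, apiserver)

    go func() { file <- PodUpdate{Source: "file", Op: "ADD"} }()
    go func() { apiserver <- PodUpdate{Source: "apiserver", Op: "ADD"} }()

    // A consumer such as syncLoop just reads the merged channel.
    for i := 0; i < 2; i++ {
        fmt.Println(<-updates)
    }
}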


We won't walk through the implementation and exact responsibilities of every object here; later articles will continue the analysis of some of them.

Running the kubelet

func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error {
    // start the kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // start the kubelet server
    if kubeCfg.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers)
        }, 0, wait.NeverStop)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }, 0, wait.NeverStop)
    }

    return nil
}


Running the kubelet mainly starts two things: k.Run() enters the main loop, and k.ListenAndServe() starts the kubelet's API server. The latter is not the focus of this article, so let's look at the former. Its entry point is k.Run(podCfg.Updates()). As mentioned earlier, podCfg.Updates() is a channel that delivers the latest pod configuration in real time; how that works will be covered later, and for now it is enough to know what it does. The Run method's code is as follows:

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}


This is essentially the startup of the kubelet's various components, each running as a goroutine, so we won't dwell on it. The final line, kl.syncLoop(updates, kl), is the main loop that processes all pod updates: it receives pod changes (creations, modifications, and deletions) and calls the corresponding handlers so that the containers on the node match the pod configuration. A simplified sketch of the loop's shape follows.
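
Here is that simplified, self-contained sketch of the loop's shape. The operation names and handler bodies are illustrative only; the real syncLoop also handles housekeeping, PLEG events, reconciliation, and more:

package main

import (
    "fmt"
    "time"
)

// PodUpdate is an illustrative update type; the real loop consumes
// kubetypes.PodUpdate values from the PodConfig channel.
type PodUpdate struct{ Op string }

// syncLoop reads pod updates from the merged channel and dispatches on the
// operation, with a periodic housekeeping branch.
func syncLoop(updates <-chan PodUpdate, housekeeping <-chan time.Time) {
    for {
        select {
        case u, ok := <-updates:
            if !ok {
                return // channel closed, stop the loop
            }
            switch u.Op {
            case "ADD":
                fmt.Println("create containers for new pods")
            case "UPDATE":
                fmt.Println("reconcile changed pods")
            case "REMOVE":
                fmt.Println("kill containers of removed pods")
            }
        case <-housekeeping:
            fmt.Println("clean up orphaned pod resources")
        }
    }
}

func main() {
    updates := make(chan PodUpdate, 3)
    updates <- PodUpdate{"ADD"}
    updates <- PodUpdate{"UPDATE"}
    updates <- PodUpdate{"REMOVE"}
    close(updates)
    syncLoop(updates, time.Tick(time.Second))
}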

Original article:

http://cizixs.com/2017/06/06/kubelet-source-code-analysis-part-1
Tags: kubernetes