k8s源码解析(6)--kubelet

kubelet

kubelet是k8s集群中一个组件,其作为一个agent的角色分布在各个节点上,主要功能有:

  • 节点状态同步:kublet给api-server同步当前节点的状态,会同步当前节点的CPU,内存及磁盘空间等资源到api-server,为scheduler调度pod时提供基础数据支撑

  • Pod的启停及状态管理:kubelet会启动经scheduler调度到本节点的pod,同步它的状态保障它运行,当Pod关闭时负责资源回收

1. 端口

在node1节点进行端口查询:

[root@node1 ~]# netstat -anp|grep kubelet
tcp        0      0 127.0.0.1:10248         0.0.0.0:*               LISTEN      1718/kubelet                
tcp6       0      0 :::10250                :::*                    LISTEN      1718/kubelet 

我们可以看到监听两个端口:

  • 10250: kubelet与apiserver通信的端口,定期请求apiserver获取自己所应当处理的任务,通过该端口可以访问获取node资源以及状态

  • 10248: 通过访问该端口可以判断kubelet是否正常工作,比如:

    [root@node1 ~]# curl http://127.0.0.1:10248/healthz
    ok
    

2. 启动

查看kubelet的配置

[root@node1 ~]# cat /var/lib/kubelet/config.yaml
apiVersion: kubelet.config.k8s.io/v1beta1
authentication:
  anonymous:
    enabled: false
  webhook:
    cacheTTL: 0s
    enabled: true
  x509:
    clientCAFile: /etc/kubernetes/pki/ca.crt
authorization:
  mode: Webhook
  webhook:
    cacheAuthorizedTTL: 0s
    cacheUnauthorizedTTL: 0s
cgroupDriver: systemd
clusterDNS:
- 10.96.0.10
clusterDomain: cluster.local
cpuManagerReconcilePeriod: 0s
evictionPressureTransitionPeriod: 0s
failSwapOn: false
fileCheckFrequency: 0s
healthzBindAddress: 127.0.0.1
healthzPort: 10248
httpCheckFrequency: 0s
imageMinimumGCAge: 0s
kind: KubeletConfiguration
logging:
  flushFrequency: 0
  options:
    json:
      infoBufferSize: "0"
  verbosity: 0
memorySwap: {}
nodeStatusReportFrequency: 0s
nodeStatusUpdateFrequency: 0s
rotateCertificates: true
runtimeRequestTimeout: 0s
shutdownGracePeriod: 0s
shutdownGracePeriodCriticalPods: 0s
staticPodPath: /etc/kubernetes/manifests
streamingConnectionIdleTimeout: 0s
syncFrequency: 0s
volumeStatsAggPeriod: 0s

2.1 入口

//cmd/kubelet/kubelet.go
func main() {
 command := app.NewKubeletCommand()

 // kubelet uses a config file and does its own special
 // parsing of flags and that config file. It initializes
 // logging after it is done with that. Therefore it does
 // not use cli.Run like other, simpler commands.
 code := run(command)
 os.Exit(code)
}

同样是cobra这个框架搭建。

同理最终运行的Run。

func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
 // To help debugging, immediately log version
 klog.InfoS("Kubelet version", "kubeletVersion", version.Get())

 klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
 //不同的操作系统可能需要做不同的初始化操作
 if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
  return fmt.Errorf("failed OS init: %w", err)
 }
 if err := run(ctx, s, kubeDeps, featureGate); err != nil {
  return fmt.Errorf("failed to run Kubelet: %w", err)
 }
 return nil
}

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
 // Set global feature gates based on the value on the initial KubeletServer
    //基于kubelet初始化配置设置kubelet的特性开关
 err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
 if err != nil {
  return err
 }
 // validate the initial KubeletServer (we set feature gates first, because this validation depends on feature gates)
    //校验kubelet的配置参数等
 if err := options.ValidateKubeletServer(s); err != nil {
  return err
 }

 // Warn if MemoryQoS enabled with cgroups v1
 if utilfeature.DefaultFeatureGate.Enabled(features.MemoryQoS) &&
  !isCgroup2UnifiedMode() {
  klog.InfoS("Warning: MemoryQoS feature only works with cgroups v2 on Linux, but enabled with cgroups v1")
 }
 // Obtain Kubelet Lock File
 if s.ExitOnLockContention && s.LockFilePath == "" {
  return errors.New("cannot exit on lock file contention: no lock file specified")
 }
 done := make(chan struct{})
 if s.LockFilePath != "" {
  klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
  if err := flock.Acquire(s.LockFilePath); err != nil {
   return fmt.Errorf("unable to acquire file lock on %q: %w", s.LockFilePath, err)
  }
  if s.ExitOnLockContention {
   klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
   if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
    return err
   }
  }
 }

 // Register current configuration with /configz endpoint
    //初始化configz endpoint
 err = initConfigz(&s.KubeletConfiguration)
 if err != nil {
  klog.ErrorS(err, "Failed to register kubelet configuration with configz")
 }

 if len(s.ShowHiddenMetricsForVersion) > 0 {
  metrics.SetShowHidden()
 }

 // About to get clients and such, detect standaloneMode
 standaloneMode := true
 if len(s.KubeConfig) > 0 {
  standaloneMode = false
 }

 if kubeDeps == nil {
  kubeDeps, err = UnsecuredDependencies(s, featureGate)
  if err != nil {
   return err
  }
 }
 //初始化CloudProvider
 if kubeDeps.Cloud == nil {
  if !cloudprovider.IsExternal(s.CloudProvider) {
   cloudprovider.DeprecationWarningForProvider(s.CloudProvider)
   cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
   if err != nil {
    return err
   }
   if cloud != nil {
    klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
   }
   kubeDeps.Cloud = cloud
  }
 }

 hostName, err := nodeutil.GetHostname(s.HostnameOverride)
 if err != nil {
  return err
 }
 nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
 if err != nil {
  return err
 }

 // if in standalone mode, indicate as much by setting all clients to nil
    //初始化KubeClient, EventClient, HeartbeatClient
 switch {
 case standaloneMode:
  kubeDeps.KubeClient = nil
  kubeDeps.EventClient = nil
  kubeDeps.HeartbeatClient = nil
  klog.InfoS("Standalone mode, no API client")

 case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
  clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
  if err != nil {
   return err
  }
  if onHeartbeatFailure == nil {
   return errors.New("onHeartbeatFailure must be a valid function other than nil")
  }
  kubeDeps.OnHeartbeatFailure = onHeartbeatFailure

  kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
  if err != nil {
   return fmt.Errorf("failed to initialize kubelet client: %w", err)
  }

  // make a separate client for events
  eventClientConfig := *clientConfig
  eventClientConfig.QPS = float32(s.EventRecordQPS)
  eventClientConfig.Burst = int(s.EventBurst)
  kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
  if err != nil {
   return fmt.Errorf("failed to initialize kubelet event client: %w", err)
  }

  // make a separate client for heartbeat with throttling disabled and a timeout attached
  heartbeatClientConfig := *clientConfig
  heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
  // The timeout is the minimum of the lease duration and status update frequency
  leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
  if heartbeatClientConfig.Timeout > leaseTimeout {
   heartbeatClientConfig.Timeout = leaseTimeout
  }

  heartbeatClientConfig.QPS = float32(-1)
  kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
  if err != nil {
   return fmt.Errorf("failed to initialize kubelet heartbeat client: %w", err)
  }
 }

 if kubeDeps.Auth == nil {
        //创建Auth
  auth, runAuthenticatorCAReload, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
  if err != nil {
   return err
  }
  kubeDeps.Auth = auth
  runAuthenticatorCAReload(ctx.Done())
 }

 var cgroupRoots []string
 nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
 cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
 kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
 if err != nil {
  klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
 } else if kubeletCgroup != "" {
  cgroupRoots = append(cgroupRoots, kubeletCgroup)
 }

 if err != nil {
  klog.InfoS("Failed to get the container runtime's cgroup. Runtime system container metrics may be missing.", "err", err)
 } else if s.RuntimeCgroups != "" {
  // RuntimeCgroups is optional, so ignore if it isn't specified
  cgroupRoots = append(cgroupRoots, s.RuntimeCgroups)
 }

 if s.SystemCgroups != "" {
  // SystemCgroups is optional, so ignore if it isn't specified
  cgroupRoots = append(cgroupRoots, s.SystemCgroups)
 }

 if kubeDeps.CAdvisorInterface == nil {
  imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.RemoteRuntimeEndpoint)
        //创建cadvisor组件
  kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.RemoteRuntimeEndpoint))
  if err != nil {
   return err
  }
 }

 // Setup event recorder if required.
 makeEventRecorder(kubeDeps, nodeName)

 if kubeDeps.ContainerManager == nil {
  if s.CgroupsPerQOS && s.CgroupRoot == "" {
   klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
   s.CgroupRoot = "/"
  }

  machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
  if err != nil {
   return err
  }
  reservedSystemCPUs, err := getReservedCPUs(machineInfo, s.ReservedSystemCPUs)
  if err != nil {
   return err
  }
  if reservedSystemCPUs.Size() > 0 {
   // at cmd option validation phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
   klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
   if s.KubeReserved != nil {
    delete(s.KubeReserved, "cpu")
   }
   if s.SystemReserved == nil {
    s.SystemReserved = make(map[string]string)
   }
   s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
   klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
  }

  kubeReserved, err := parseResourceList(s.KubeReserved)
  if err != nil {
   return err
  }
  systemReserved, err := parseResourceList(s.SystemReserved)
  if err != nil {
   return err
  }
  var hardEvictionThresholds []evictionapi.Threshold
  // If the user requested to ignore eviction thresholds, then do not set valid values for hardEvictionThresholds here.
  if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
   hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
   if err != nil {
    return err
   }
  }
  experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
  if err != nil {
   return err
  }

  devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)

  var cpuManagerPolicyOptions map[string]string
  if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
   if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
    cpuManagerPolicyOptions = s.CPUManagerPolicyOptions
   } else if s.CPUManagerPolicyOptions != nil {
    return fmt.Errorf("CPU Manager policy options %v require feature gates %q, %q enabled",
     s.CPUManagerPolicyOptions, features.CPUManager, features.CPUManagerPolicyOptions)
   }
  }
  //创建ContainerManager组件,ContaienrManager是kubelet最为重要的组件之一
  kubeDeps.ContainerManager, err = cm.NewContainerManager(
   kubeDeps.Mounter,
   kubeDeps.CAdvisorInterface,
   cm.NodeConfig{
    RuntimeCgroupsName:    s.RuntimeCgroups,
    SystemCgroupsName:     s.SystemCgroups,
    KubeletCgroupsName:    s.KubeletCgroups,
    KubeletOOMScoreAdj:    s.OOMScoreAdj,
    CgroupsPerQOS:         s.CgroupsPerQOS,
    CgroupRoot:            s.CgroupRoot,
    CgroupDriver:          s.CgroupDriver,
    KubeletRootDir:        s.RootDirectory,
    ProtectKernelDefaults: s.ProtectKernelDefaults,
    NodeAllocatableConfig: cm.NodeAllocatableConfig{
     KubeReservedCgroupName:   s.KubeReservedCgroup,
     SystemReservedCgroupName: s.SystemReservedCgroup,
     EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
     KubeReserved:             kubeReserved,
     SystemReserved:           systemReserved,
     ReservedSystemCPUs:       reservedSystemCPUs,
     HardEvictionThresholds:   hardEvictionThresholds,
    },
    QOSReserved:                             *experimentalQOSReserved,
    ExperimentalCPUManagerPolicy:            s.CPUManagerPolicy,
    ExperimentalCPUManagerPolicyOptions:     cpuManagerPolicyOptions,
    ExperimentalCPUManagerReconcilePeriod:   s.CPUManagerReconcilePeriod.Duration,
    ExperimentalMemoryManagerPolicy:         s.MemoryManagerPolicy,
    ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
    ExperimentalPodPidsLimit:                s.PodPidsLimit,
    EnforceCPULimits:                        s.CPUCFSQuota,
    CPUCFSQuotaPeriod:                       s.CPUCFSQuotaPeriod.Duration,
    ExperimentalTopologyManagerPolicy:       s.TopologyManagerPolicy,
    ExperimentalTopologyManagerScope:        s.TopologyManagerScope,
   },
   s.FailSwapOn,
   devicePluginEnabled,
   kubeDeps.Recorder)

  if err != nil {
   return err
  }
 }

 // TODO(vmarmol): Do this through container config.
 oomAdjuster := kubeDeps.OOMAdjuster
 if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
  klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
 }
 //初始化RuntimeService  设置CRI
 err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint)
 if err != nil {
  return err
 }
 //执行RunKubelet函数
 if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
  return err
 }

 if s.HealthzPort > 0 {
  mux := http.NewServeMux()
  healthz.InstallHandler(mux)
  go wait.Until(func() {
   err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
   if err != nil {
    klog.ErrorS(err, "Failed to start healthz server")
   }
  }, 5*time.Second, wait.NeverStop)
 }

 if s.RunOnce {
  return nil
 }

 // If systemd is used, notify it that we have started
 go daemon.SdNotify(false, "READY=1")

 select {
 case <-done:
  break
 case <-ctx.Done():
  break
 }

 return nil
}

上图是kubelet组件中的模块以及模块间的划分。

注意,上图是1.24版本之前的

  • PLEG(Pod Lifecycle Event Generator)

    PLEG 是 kubelet 的核心模块,PLEG 会一直调用 container runtime 获取本节点 containers/sandboxes 的信息,并与自身维护的 pods cache 信息进行对比,生成对应的 PodLifecycleEvent,然后输出到 eventChannel 中,通过 eventChannel 发送到 kubelet syncLoop 进行消费,然后由 kubelet syncPod 来触发 pod 同步处理过程,最终达到用户的期望状态。

  • cAdvisor

    cAdvisor(https://github.com/google/cadvisor)是 google 开发的容器监控工具,集成在 kubelet 中,起到收集本节点和容器的监控信息,大部分公司对容器的监控数据都是从 cAdvisor 中获取的 ,cAvisor 模块对外提供了 interface 接口

  • OOMWatcher

    系统 OOM 的监听器,会与 cadvisor 模块之间建立 SystemOOM,通过 Watch方式从 cadvisor 那里收到的 OOM 信号,并产生相关事件。

  • probeManager

    probeManager 依赖于 statusManager,livenessManager,containerRefManager,会定时去监控 pod 中容器的健康状况,当前支持两种类型的探针:livenessProbe 和readinessProbe。 livenessProbe:用于判断容器是否存活,如果探测失败,kubelet 会 kill 掉该容器,并根据容器的重启策略做相应的处理。

    readinessProbe:用于判断容器是否启动完成,将探测成功的容器加入到该 pod 所在 service 的 endpoints 中,反之则移除。

    readinessProbe 和 livenessProbe 有三种实现方式:http、tcp 以及 cmd。

  • statusManager

    statusManager 负责维护状态信息,并把 pod 状态更新到 apiserver,但是它并不负责监控 pod 状态的变化,而是提供对应的接口供其他组件调用,比如 probeManager。

  • containerRefManager

    容器引用的管理,相对简单的Manager,用来报告容器的创建,失败等事件,通过定义 map 来实现了 containerID 与 v1.ObjectReferece 容器引用的映射。

  • evictionManager

    当节点的内存、磁盘或 inode 等资源不足时,达到了配置的 evict 策略, node 会变为 pressure 状态,此时 kubelet 会按照 qosClass 顺序来驱赶 pod,以此来保证节点的稳定性。可以通过配置 kubelet 启动参数 --eviction-hard= 来决定 evict 的策略值。

  • imageGC

    imageGC 负责 node 节点的镜像回收,当本地的存放镜像的本地磁盘空间达到某阈值的时候,会触发镜像的回收,删除掉不被 pod 所使用的镜像,回收镜像的阈值可以通过 kubelet 的启动参数 --image-gc-high-threshold--image-gc-low-threshold 来设置。

  • containerGC

    containerGC 负责清理 node 节点上已消亡的 container,具体的 GC 操作由runtime 来实现。

  • imageManager

    调用 kubecontainer 提供的PullImage/GetImageRef/ListImages/RemoveImage/ImageStates 方法来保证pod 运行所需要的镜像。

  • volumeManager

    负责 node 节点上 pod 所使用 volume 的管理,volume 与 pod 的生命周期关联,负责 pod 创建删除过程中 volume 的 mount/umount/attach/detach 流程,kubernetes 采用 volume Plugins 的方式,实现存储卷的挂载等操作,内置几十种存储插件。

  • containerManager

    负责 node 节点上运行的容器的 cgroup 配置信息,kubelet 启动参数如果指定 --cgroups-per-qos 的时候,kubelet 会启动 goroutine 来周期性的更新 pod 的 cgroup 信息,维护其正确性,该参数默认为 true,实现了 pod 的Guaranteed/BestEffort/Burstable 三种级别的 Qos。

  • runtimeManager

    containerRuntime 负责 kubelet 与不同的 runtime 实现进行对接,实现对于底层 container 的操作,初始化之后得到的 runtime 实例将会被之前描述的组件所使用。可以通过 kubelet 的启动参数 --container-runtime 来定义是使用docker 还是 rkt,默认是 docker

  • podManager

    podManager 提供了接口来存储和访问 pod 的信息,维持 static pod 和 mirror pods 的关系,podManager 会被statusManager/volumeManager/runtimeManager 所调用,podManager 的接口处理流程里面会调用 secretManager 以及 configMapManager。

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
 hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
 if err != nil {
  return err
 }
 // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
 nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
 if err != nil {
  return err
 }
 hostnameOverridden := len(kubeServer.HostnameOverride) > 0
 // Setup event recorder if required.
 makeEventRecorder(kubeDeps, nodeName)

 var nodeIPs []net.IP
 if kubeServer.NodeIP != "" {
  for _, ip := range strings.Split(kubeServer.NodeIP, ",") {
   parsedNodeIP := netutils.ParseIPSloppy(strings.TrimSpace(ip))
   if parsedNodeIP == nil {
    klog.InfoS("Could not parse --node-ip ignoring", "IP", ip)
   } else {
    nodeIPs = append(nodeIPs, parsedNodeIP)
   }
  }
 }

 if len(nodeIPs) > 2 || (len(nodeIPs) == 2 && netutils.IsIPv6(nodeIPs[0]) == netutils.IsIPv6(nodeIPs[1])) {
  return fmt.Errorf("bad --node-ip %q; must contain either a single IP or a dual-stack pair of IPs", kubeServer.NodeIP)
 } else if len(nodeIPs) == 2 && kubeServer.CloudProvider != "" {
  return fmt.Errorf("dual-stack --node-ip %q not supported when using a cloud provider", kubeServer.NodeIP)
 } else if len(nodeIPs) == 2 && (nodeIPs[0].IsUnspecified() || nodeIPs[1].IsUnspecified()) {
  return fmt.Errorf("dual-stack --node-ip %q cannot include '0.0.0.0' or '::'", kubeServer.NodeIP)
 }

 capabilities.Initialize(capabilities.Capabilities{
  AllowPrivileged: true,
 })

 credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
 klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)

 if kubeDeps.OSInterface == nil {
  kubeDeps.OSInterface = kubecontainer.RealOS{}
 }

 if kubeServer.KubeletConfiguration.SeccompDefault && !utilfeature.DefaultFeatureGate.Enabled(features.SeccompDefault) {
  return fmt.Errorf("the SeccompDefault feature gate must be enabled in order to use the SeccompDefault configuration")
 }
 // 创建并实例化kubelet
 k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
  kubeDeps,
  &kubeServer.ContainerRuntimeOptions,
  hostname,
  hostnameOverridden,
  nodeName,
  nodeIPs,
  kubeServer.ProviderID,
  kubeServer.CloudProvider,
  kubeServer.CertDirectory,
  kubeServer.RootDirectory,
  kubeServer.ImageCredentialProviderConfigFile,
  kubeServer.ImageCredentialProviderBinDir,
  kubeServer.RegisterNode,
  kubeServer.RegisterWithTaints,
  kubeServer.AllowedUnsafeSysctls,
  kubeServer.ExperimentalMounterPath,
  kubeServer.KernelMemcgNotification,
  kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
  kubeServer.MinimumGCAge,
  kubeServer.MaxPerPodContainerCount,
  kubeServer.MaxContainerCount,
  kubeServer.MasterServiceNamespace,
  kubeServer.RegisterSchedulable,
  kubeServer.KeepTerminatedPodVolumes,
  kubeServer.NodeLabels,
  kubeServer.NodeStatusMaxImages,
  kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault,
 )
 if err != nil {
  return fmt.Errorf("failed to create kubelet: %w", err)
 }

 // NewMainKubelet should have set up a pod source config if one didn't exist
 // when the builder was run. This is just a precaution.
 if kubeDeps.PodConfig == nil {
  return fmt.Errorf("failed to create kubelet, pod source config was nil")
 }
 podCfg := kubeDeps.PodConfig

 if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
  klog.ErrorS(err, "Failed to set rlimit on max file handles")
 }

 // process pods and exit.
 if runOnce {
  if _, err := k.RunOnce(podCfg.Updates()); err != nil {
   return fmt.Errorf("runonce failed: %w", err)
  }
  klog.InfoS("Started kubelet as runonce")
 } else {
        // 开始运行kubelet
  startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
  klog.InfoS("Started kubelet")
 }
 return nil
}

3. 启动Pod

上面的启动最终会启动一个syncLoop的事件循环。

当一个Pod被Scheduler调度到某个Node之后,就会触发到kubelet.syncLoop里面的事件,经过一系列的操作,最后达到Pod正常跑起来。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
 klog.InfoS("Starting kubelet main sync loop")
 // The syncTicker wakes up kubelet to checks if there are any pod workers
 // that need to be sync'd. A one-second period is sufficient because the
 // sync interval is defaulted to 10s.
    //这里意思是每秒检测是否有需要同步的pod workers
 syncTicker := time.NewTicker(time.Second)
 defer syncTicker.Stop()
    //housekeepingPeriod 2秒 每两秒检测一次是否有需要清理的 pod
 housekeepingTicker := time.NewTicker(housekeepingPeriod)
 defer housekeepingTicker.Stop()
 plegCh := kl.pleg.Watch()
 const (
  base   = 100 * time.Millisecond
  max    = 5 * time.Second
  factor = 2
 )
 duration := base
 // Responsible for checking limits in resolv.conf
 // The limits do not have anything to do with individual pods
 // Since this is called in syncLoop, we don't need to call it anywhere else
 if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
  kl.dnsConfigurer.CheckLimitsForResolvConf()
 }
 //在这里我们看到一个for循环
 for {
        //如果在每次循环过程中出现比较严重的错误,kubelet 会记录到 runtimeState 中
        //遇到错误 那么就200毫秒-5秒 等待
  if err := kl.runtimeState.runtimeErrors(); err != nil {
   klog.ErrorS(err, "Skipping pod synchronization")
   // exponential backoff
   time.Sleep(duration)
   duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
   continue
  }
  // reset backoff if we have a success
  duration = base

  kl.syncLoopMonitor.Store(kl.clock.Now())
        //Iteration迭代 相当于一直调用syncLoopIteration方法
  if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
   break
  }
  kl.syncLoopMonitor.Store(kl.clock.Now())
 }
}

3.1 syncLoopIteration

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    //对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理
    
 select {
 case u, open := <-configCh: //将有配置变化的pod进行分发
  // Update from a config source; dispatch it to the right handler
  // callback.
  if !open {
   klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
   return false
  }

  switch u.Op {
  case kubetypes.ADD: // pod新增事件
   klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
   // After restarting, kubelet will get all existing pods through
   // ADD as if they are new pods. These pods will then go through the
   // admission process and *may* be rejected. This can be resolved
   // once we have checkpointing.
   handler.HandlePodAdditions(u.Pods)
  case kubetypes.UPDATE: // pod更新事件
   klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
   handler.HandlePodUpdates(u.Pods)
  case kubetypes.REMOVE: // pod移除事件
   klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjs(u.Pods))
   handler.HandlePodRemoves(u.Pods)
  case kubetypes.RECONCILE: // pod RECONCILE事件
   klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
   handler.HandlePodReconcile(u.Pods)
  case kubetypes.DELETE: // pod删除事件
   klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjs(u.Pods))
   // DELETE is treated as a UPDATE because of graceful deletion.
   handler.HandlePodUpdates(u.Pods)
  case kubetypes.SET: // pod set 事件
   // TODO: Do we want to support this?
   klog.ErrorS(nil, "Kubelet does not support snapshot update")
  default:
   klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
  }

  kl.sourcesReady.AddSource(u.Source)

 case e := <-plegCh: //更新runtime cache 同步pod
  if e.Type == pleg.ContainerStarted {
   // record the most recent time we observed a container start for this pod.
   // this lets us selectively invalidate the runtimeCache when processing a delete for this pod
   // to make sure we don't miss handling graceful termination for containers we reported as having started.
   kl.lastContainerStartedTime.Add(e.ID, time.Now())
  }
  if isSyncPodWorthy(e) {
   // PLEG event for a pod; sync it.
   if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
    klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
    handler.HandlePodSyncs([]*v1.Pod{pod})
   } else {
    // If the pod no longer exists, ignore the event.
    klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
   }
  }

  if e.Type == pleg.ContainerDied {
   if containerID, ok := e.Data.(string); ok {
    kl.cleanUpContainersInPod(e.ID, containerID)
   }
  }
 case <-syncCh: //每秒一次 同步pod
  // Sync pods waiting for sync
  podsToSync := kl.getPodsToSync()
  if len(podsToSync) == 0 {
   break
  }
  klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", klog.KObjs(podsToSync))
  handler.HandlePodSyncs(podsToSync)
 case update := <-kl.livenessManager.Updates(): //存活探针
  if update.Result == proberesults.Failure {
   handleProbeSync(kl, update, handler, "liveness", "unhealthy")
  }
 case update := <-kl.readinessManager.Updates(): //判断容器是否完成
  ready := update.Result == proberesults.Success
  kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)

  status := ""
  if ready {
   status = "ready"
  }
  handleProbeSync(kl, update, handler, "readiness", status)
 case update := <-kl.startupManager.Updates():
  started := update.Result == proberesults.Success
  kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)

  status := "unhealthy"
  if started {
   status = "started"
  }
  handleProbeSync(kl, update, handler, "startup", status)
 case <-housekeepingCh: //检查是否有需要清理的pod
  if !kl.sourcesReady.AllReady() {
   // If the sources aren't ready or volume manager has not yet synced the states,
   // skip housekeeping, as we may accidentally delete pods from unready sources.
   klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
  } else {
   start := time.Now()
   klog.V(4).InfoS("SyncLoop (housekeeping)")
   if err := handler.HandlePodCleanups(); err != nil {
    klog.ErrorS(err, "Failed cleaning pods")
   }
   duration := time.Since(start)
   if duration > housekeepingWarningDuration {
    klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds())
   }
   klog.V(4).InfoS("SyncLoop (housekeeping) end")
  }
 }
 return true
}

3.2 pod新增事件


// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 start := kl.clock.Now()
    //将pods按照创建日期排列,保证最先创建的 pod 会最先被处理
 sort.Sort(sliceutils.PodsByCreationTime(pods))
 for _, pod := range pods {
  existingPods := kl.podManager.GetPods()
  // Always add the pod to the pod manager. Kubelet relies on the pod
  // manager as the source of truth for the desired state. If a pod does
  // not exist in the pod manager, it means that it has been deleted in
  // the apiserver and no action (other than cleanup) is required.
        // 把 pod 加入到 podManager 中。statusManager,volumeManager,runtimeManager都依赖于这个podManager
  kl.podManager.AddPod(pod)
  //处理静态pod,实际上内部同样是调用了kl.dispatchWork,这里主要跳过了拒绝掉pod的判断
  if kubetypes.IsMirrorPod(pod) {
   kl.handleMirrorPod(pod, start)
   continue
  }

  // Only go through the admission process if the pod is not requested
  // for termination by another part of the kubelet. If the pod is already
  // using resources (previously admitted), the pod worker is going to be
  // shutting it down. If the pod hasn't started yet, we know that when
  // the pod worker is invoked it will also avoid setting up the pod, so
  // we simply avoid doing any work.
  if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
   // We failed pods that we rejected, so activePods include all admitted
   // pods that are alive.
   activePods := kl.filterOutInactivePods(existingPods)

   // Check if we can admit the pod; if not, reject it.
            //验证 pod 是否能在该节点运行,如果不可以直接拒绝;
   if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
    kl.rejectPod(pod, reason, message)
    continue
   }
  }
  mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
 }
}

3.2.1 UpdatePod


// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if deleted on the apiserver, it is
// discovered to have a terminal phase (Succeeded or Failed), or if it is evicted by the kubelet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
  // start the pod worker goroutine if it doesn't exist
    //通过一个podUpdates这个map标记是否有创建过协程,然后通过status.working标记是否有运行,没有运行的往通道里面传递,让managePodLoop得以执行
 podUpdates, exists := p.podUpdates[uid]
 if !exists {
  // We need to have a buffer here, because checkForUpdates() method that
  // puts an update into channel is called from the same goroutine where
  // the channel is consumed. However, it is guaranteed that in such case
  // the channel is empty, so buffer of size 1 is enough.
  podUpdates = make(chan podWork, 1)
  p.podUpdates[uid] = podUpdates

  // ensure that static pods start in the order they are received by UpdatePod
  if kubetypes.IsStaticPod(pod) {
   p.waitingToStartStaticPodsByFullname[status.fullname] =
    append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
  }

  // allow testing of delays in the pod update channel
  var outCh <-chan podWork
  if p.workerChannelFn != nil {
   outCh = p.workerChannelFn(uid, podUpdates)
  } else {
   outCh = podUpdates
  }

  // Creating a new pod worker either means this is a new pod, or that the
  // kubelet just restarted. In either case the kubelet is willing to believe
  // the status of the pod for the first pod worker sync. See corresponding
  // comment in syncPod.
  go func() {
   defer runtime.HandleCrash()
   p.managePodLoop(outCh)
  }()
 }
 // dispatch a request to the pod worker if none are running
 if !status.IsWorking() {
  status.working = true
  podUpdates <- work
  return
 }
}

3.2.2 managePodLoop

func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    ...
    //最终调用到 syncPodFn这个方法
    isTerminal, err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
    ...
}

3.2.3 syncPod

syncPodFn这个方法 实际就是 syncPod这个方法

func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
 klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
 defer func() {
  klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID, "isTerminal", isTerminal)
 }()

 // Latency measurements for the main workflow are relative to the
 // first time the pod was seen by kubelet.
 var firstSeenTime time.Time
 if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
  firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
 }

 // Record pod worker start latency if being created
 // TODO: make pod workers record their own latencies
    //如果是 pod 创建事件,会记录一些 pod latency 相关的 metrics;
 if updateType == kubetypes.SyncPodCreate {
  if !firstSeenTime.IsZero() {
   // This is the first time we are syncing the pod. Record the latency
   // since kubelet first saw the pod if firstSeenTime is set.
   metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
  } else {
   klog.V(3).InfoS("First seen time not recorded for pod",
    "podUID", pod.UID,
    "pod", klog.KObj(pod))
  }
 }

 // Generate final API pod status with pod and status manager status
    //生成一个 v1.PodStatus 对象,Pod的状态包括这些 Pending Running Succeeded Failed Unknown
 apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
 // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
 // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
 // set pod IP to hostIP directly in runtime.GetPodStatus
 podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
 for _, ipInfo := range apiPodStatus.PodIPs {
  podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
 }
 if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
  podStatus.IPs = []string{apiPodStatus.PodIP}
 }

 // If the pod is terminal, we don't need to continue to setup the pod
    //PodStatus 生成之后,将发送给 Pod statusManager
 if apiPodStatus.Phase == v1.PodSucceeded || apiPodStatus.Phase == v1.PodFailed {
  kl.statusManager.SetPodStatus(pod, apiPodStatus)
  isTerminal = true
  return isTerminal, nil
 }

 // If the pod should not be running, we request the pod's containers be stopped. This is not the same
 // as termination (we want to stop the pod, but potentially restart it later if soft admission allows
 // it later). Set the status and phase appropriately
    //运行一系列 admission handlers
 runnable := kl.canRunPod(pod)
 if !runnable.Admit {
  // Pod is not runnable; and update the Pod and Container statuses to why.
  if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
   apiPodStatus.Phase = v1.PodPending
  }
  apiPodStatus.Reason = runnable.Reason
  apiPodStatus.Message = runnable.Message
  // Waiting containers are not creating.
  const waitingReason = "Blocked"
  for _, cs := range apiPodStatus.InitContainerStatuses {
   if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
   }
  }
  for _, cs := range apiPodStatus.ContainerStatuses {
   if cs.State.Waiting != nil {
    cs.State.Waiting.Reason = waitingReason
   }
  }
 }

 // Record the time it takes for the pod to become running
 // since kubelet first saw the pod if firstSeenTime is set.
 existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
 if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
  !firstSeenTime.IsZero() {
  metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
 }
 //PodStatus 生成之后,将发送给 Pod status manager
 kl.statusManager.SetPodStatus(pod, apiPodStatus)

 // Pods that are not runnable must be stopped - return a typed error to the pod worker
 if !runnable.Admit {
  klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
  var syncErr error
  p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
  if err := kl.killPod(pod, p, nil); err != nil {
   kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
   syncErr = fmt.Errorf("error killing pod: %v", err)
   utilruntime.HandleError(syncErr)
  } else {
   // There was no error killing the pod, but the pod cannot be run.
   // Return an error to signal that the sync loop should back off.
   syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
  }
  return false, syncErr
 }

 // If the network plugin is not ready, only start the pod if it uses the host network
 if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
  kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
  return false, fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
 }

 // ensure the kubelet knows about referenced secrets or configmaps used by the pod
 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
  if kl.secretManager != nil {
   kl.secretManager.RegisterPod(pod)
  }
  if kl.configMapManager != nil {
   kl.configMapManager.RegisterPod(pod)
  }
 }

 // Create Cgroups for the pod and apply resource parameters
 // to them if cgroups-per-qos flag is enabled.
 pcm := kl.containerManager.NewPodContainerManager()
 // If pod has already been terminated then we need not create
 // or update the pod's cgroup
 // TODO: once context cancellation is added this check can be removed
 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
  // When the kubelet is restarted with the cgroups-per-qos
  // flag enabled, all the pod's running containers
  // should be killed intermittently and brought back up
  // under the qos cgroup hierarchy.
  // Check if this is the pod's first sync
  firstSync := true
  for _, containerStatus := range apiPodStatus.ContainerStatuses {
   if containerStatus.State.Running != nil {
    firstSync = false
    break
   }
  }
  // Don't kill containers in pod if pod's cgroups already
  // exists or the pod is running for the first time
  podKilled := false
  if !pcm.Exists(pod) && !firstSync {
   p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
   if err := kl.killPod(pod, p, nil); err == nil {
    podKilled = true
   } else {
    klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
   }
  }
  // Create and Update pod's Cgroups
  // Don't create cgroups for run once pod if it was killed above
  // The current policy is not to restart the run once pods when
  // the kubelet is restarted with the new flag as run once pods are
  // expected to run only once and if the kubelet is restarted then
  // they are not expected to run again.
  // We don't create and apply updates to cgroup if its a run once pod and was killed above
  if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
   if !pcm.Exists(pod) {
                //kubelet 将为这个 pod 创建 cgroups
    if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
     klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
    }
    if err := pcm.EnsureExists(pod); err != nil {
     kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
     return false, fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
    }
   }
  }
 }

 // Create Mirror Pod for Static Pod if it doesn't already exist
 if kubetypes.IsStaticPod(pod) {
  deleted := false
  if mirrorPod != nil {
   if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
    // The mirror pod is semantically different from the static pod. Remove
    // it. The mirror pod will get recreated later.
    klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
    podFullName := kubecontainer.GetPodFullName(pod)
    var err error
    deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
    if deleted {
     klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
    } else if err != nil {
     klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
    }
   }
  }
  if mirrorPod == nil || deleted {
   node, err := kl.GetNode()
   if err != nil || node.DeletionTimestamp != nil {
    klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
   } else {
    klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
    if err := kl.podManager.CreateMirrorPod(pod); err != nil {
     klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
    }
   }
  }
 }

 // Make data directories for the pod
    //创建容器目录
 if err := kl.makePodDataDirs(pod); err != nil {
  kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
  klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
  return false, err
 }

 // Volume manager will not mount volumes for terminating pods
 // TODO: once context cancellation is added this check can be removed
 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
  // Wait for volumes to attach/mount
        //等待挂载volumes
  if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
   kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
   klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
   return false, err
  }
 }

 // Fetch the pull secrets for the pod
    //从 apiserver 获取 Spec.ImagePullSecrets 中指定的 secrets,注入容器
 pullSecrets := kl.getPullSecretsForPod(pod)

 // Ensure the pod is being probed
 kl.probeManager.AddPod(pod)

 // Call the container runtime's SyncPod callback
    //容器运行时(runtime)创建容器
 result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
 kl.reasonCache.Update(pod.UID, result)
 if err := result.Error(); err != nil {
  // Do not return error if the only failures were pods in backoff
  for _, r := range result.SyncResults {
   if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
    // Do not record an event here, as we keep all event logging for sync pod failures
    // local to container runtime, so we get better errors.
    return false, err
   }
  }

  return false, nil
 }

 return false, nil
}
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.  计算sandbox和container变化
//  2. Kill pod sandbox if necessary.  如果sandbox变更了就要把pod kill了
//  3. Kill any containers that should not be running. kill掉pod中没有运行的container
//  4. Create sandbox if necessary. 要创建sandbox的就创建 这里就是容器运行时环境去创建容器了
//  5. Create ephemeral containers. 创建临时容器
//  6. Create init containers. 创建init容器
//  7. Create normal containers. 创建正常(业务)容器
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
}

3.2.4 startContainer


// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image  拉镜像
// * create the container  创建容器
// * start the container 启动容器
// * run the post start lifecycle hooks (if applicable) 执行post start hook
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
 
}

4. 关闭Pod

  • 删除一个Pod后系统默认给30s的宽限期,并将它的状态设置成Terminating

  • kublectl发现Pod状态为Terminating则尝试执行preStop生命周期勾子,并可多给2s的宽限期

  • 同时控制面将Pod中svc的endpoint中去除

  • 宽限期到则发送TERM信号

  • Pod还不关闭再发送SIGKILL强制关闭,并清理sandbox

  • kubelet删除Pod资源对象

syncPod:

func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    
    ...
    if !runnable.Admit {
  klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
  var syncErr error
  p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
        //关闭Pod
  if err := kl.killPod(pod, p, nil); err != nil {
   kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
   syncErr = fmt.Errorf("error killing pod: %v", err)
   utilruntime.HandleError(syncErr)
  } else {
   // There was no error killing the pod, but the pod cannot be run.
   // Return an error to signal that the sync loop should back off.
   syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
  }
  return false, syncErr
 }
    ...
}

// killPodWithSyncResult kills a runningPod and returns SyncResult.
// Note: The pod passed in could be *nil* when kubelet restarted.
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { 
    //kill 属于该pod的容器
 killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
 for _, containerResult := range killContainerResults {
  result.AddSyncResult(containerResult)
 }

 // stop sandbox, the sandbox will be removed in GarbageCollect
 killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
 result.AddSyncResult(killSandboxResult)
 // Stop all sandboxes belongs to same pod
    //停止属于该pod的sandbox
 for _, podSandbox := range runningPod.Sandboxes {
  if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil && !crierror.IsNotFound(err) {
   killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
   klog.ErrorS(nil, "Failed to stop sandbox", "podSandboxID", podSandbox.ID)
  }
 }

 return
}

// killContainer kills a container through the following steps:
// * Run the pre-stop lifecycle hooks (if applicable).
// * Stop the container.
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, reason containerKillReason, gracePeriodOverride *int64) error {
   var containerSpec *v1.Container
   if pod != nil {
      if containerSpec = kubecontainer.GetContainerSpec(pod, containerName); containerSpec == nil {
         return fmt.Errorf("failed to get containerSpec %q (id=%q) in pod %q when killing container for reason %q",
            containerName, containerID.String(), format.Pod(pod), message)
      }
   } else {
      // Restore necessary information if one of the specs is nil.
      restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)
      if err != nil {
         return err
      }
      pod, containerSpec = restoredPod, restoredContainer
   }

   // From this point, pod and container must be non-nil.
    //宽限时间设置
   gracePeriod := int64(minimumGracePeriodInSeconds)
   switch {
   case pod.DeletionGracePeriodSeconds != nil:
      gracePeriod = *pod.DeletionGracePeriodSeconds
   case pod.Spec.TerminationGracePeriodSeconds != nil:
      gracePeriod = *pod.Spec.TerminationGracePeriodSeconds

      switch reason {
      case reasonStartupProbe:
         if containerSpec.StartupProbe != nil && containerSpec.StartupProbe.TerminationGracePeriodSeconds != nil {
            gracePeriod = *containerSpec.StartupProbe.TerminationGracePeriodSeconds
         }
      case reasonLivenessProbe:
         if containerSpec.LivenessProbe != nil && containerSpec.LivenessProbe.TerminationGracePeriodSeconds != nil {
            gracePeriod = *containerSpec.LivenessProbe.TerminationGracePeriodSeconds
         }
      }
   }

   if len(message) == 0 {
      message = fmt.Sprintf("Stopping container %s", containerSpec.Name)
   }
   m.recordContainerEvent(pod, containerSpec, containerID.ID, v1.EventTypeNormal, events.KillingContainer, message)

   // Run internal pre-stop lifecycle hook
  
   if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
      return err
   }

   // Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
     //执行preStop钩子
   if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
       //这里执行完会返回剩余的宽限时间
      gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
   }
   // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
  //宽限时间不够可以再多给2s
    if gracePeriod < minimumGracePeriodInSeconds {
      gracePeriod = minimumGracePeriodInSeconds
   }
   if gracePeriodOverride != nil {
      gracePeriod = *gracePeriodOverride
      klog.V(3).InfoS("Killing container with a grace period override", "pod", klog.KObj(pod), "podUID", pod.UID,
         "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
   }

   klog.V(2).InfoS("Killing container with a grace period", "pod", klog.KObj(pod), "podUID", pod.UID,
      "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
 //停止容器
   err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
   if err != nil && !crierror.IsNotFound(err) {
      klog.ErrorS(err, "Container termination failed with gracePeriod", "pod", klog.KObj(pod), "podUID", pod.UID,
         "containerName", containerName, "containerID", containerID.String(), "gracePeriod", gracePeriod)
      return err
   }
   klog.V(3).InfoS("Container exited normally", "pod", klog.KObj(pod), "podUID", pod.UID,
      "containerName", containerName, "containerID", containerID.String())

   return nil
}

4.1 后续清理资源

停止pod后,要进行清理,是由statusManger执行的

//pkg/kubelet/status/status_manager.go
func (m *manager) Start() {
 // Don't start the status manager if we don't have a client. This will happen
 // on the master, where the kubelet is responsible for bootstrapping the pods
 // of the master components.
 if m.kubeClient == nil {
  klog.InfoS("Kubernetes client is nil, not starting status manager")
  return
 }

 klog.InfoS("Starting to sync pod status with apiserver")

 //nolint:staticcheck // SA1015 Ticker can leak since this is only called once and doesn't handle termination.
 syncTicker := time.NewTicker(syncPeriod).C

 // syncPod and syncBatch share the same go routine to avoid sync races.
 go wait.Forever(func() {
  for {
   select {
   case syncRequest := <-m.podStatusChannel:
    klog.V(5).InfoS("Status Manager: syncing pod with status from podStatusChannel",
     "podUID", syncRequest.podUID,
     "statusVersion", syncRequest.status.version,
     "status", syncRequest.status.status)
    m.syncPod(syncRequest.podUID, syncRequest.status)
   case <-syncTicker:
    klog.V(5).InfoS("Status Manager: syncing batch")
    // remove any entries in the status channel since the batch will handle them
    for i := len(m.podStatusChannel); i > 0; i-- {
     <-m.podStatusChannel
    }
    m.syncBatch()
   }
  }
 }, 0)
}

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
 // We don't handle graceful deletion of mirror pods.
 if m.canBeDeleted(pod, status.status) {
  deleteOptions := metav1.DeleteOptions{
   GracePeriodSeconds: new(int64),
   // Use the pod UID as the precondition for deletion to prevent deleting a
   // newly created pod with the same name and namespace.
   Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
  }
  err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
  if err != nil {
   klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err)
   return
  }
  klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod))
  m.deletePodStatus(uid)
 }
}

canDelete是判断是否能执行pod资源删除,最终会调用

func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
 if kl.podWorkers.CouldHaveRunningContainers(pod.UID) {
  // We shouldn't delete pods that still have running containers
  klog.V(3).InfoS("Pod is terminated, but some containers are still running", "pod", klog.KObj(pod))
  return false
 }
 if count := countRunningContainerStatus(status); count > 0 {
  // We shouldn't delete pods until the reported pod status contains no more running containers (the previous
  // check ensures no more status can be generated, this check verifies we have seen enough of the status)
  klog.V(3).InfoS("Pod is terminated, but some container status has not yet been reported", "pod", klog.KObj(pod), "running", count)
  return false
 }
 if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes {
  // We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
  klog.V(3).InfoS("Pod is terminated, but some volumes have not been cleaned up", "pod", klog.KObj(pod))
  return false
 }
 if kl.kubeletConfiguration.CgroupsPerQOS {
  pcm := kl.containerManager.NewPodContainerManager()
  if pcm.Exists(pod) {
   klog.V(3).InfoS("Pod is terminated, but pod cgroup sandbox has not been cleaned up", "pod", klog.KObj(pod))
   return false
  }
 }

 // Note: we leave pod containers to be reclaimed in the background since dockershim requires the
 // container for retrieving logs and we want to make sure logs are available until the pod is
 // physically deleted.

 klog.V(3).InfoS("Pod is terminated and all resources are reclaimed", "pod", klog.KObj(pod))
 return true
}


k8s源码解析(6)--kubelet
http://47.123.5.226:8090//archives/k8syuan-ma-jie-xi-6---kubelet
作者
pony
发布于
2024年05月09日
许可协议