A Brief Analysis of the k8s kubelet Source Code

Author: wwq2020 | Published 2020-07-16 17:24

Introduction

The kubelet watches for resource changes and runs pods through the container runtime.
It performs health checks on pods and writes their status back to etcd through the apiserver API.
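
Before walking through the source, here is a minimal, self-contained sketch of that flow. It is not actual Kubernetes code: PodUpdate, containerRuntime, and statusReporter below are simplified stand-ins for kubetypes.PodUpdate, the container runtime, and the status manager, used only to illustrate the receive-updates / sync-pods / report-status loop that the real code below implements.

// Conceptual sketch only, not Kubernetes source code.
package main

import "fmt"

// PodUpdate is a simplified stand-in for kubetypes.PodUpdate: a batch of pods
// plus the operation (ADD/UPDATE/REMOVE/...) and the source that produced it.
type PodUpdate struct {
    Op     string
    Source string
    Pods   []string
}

// containerRuntime and statusReporter are hypothetical interfaces standing in
// for the container runtime (SyncPod) and the status manager (SetPodStatus).
type containerRuntime interface{ SyncPod(pod string) error }
type statusReporter interface{ SetPodStatus(pod, status string) }

// syncLoop mirrors the shape of Kubelet.syncLoop: drain updates from the
// channel, hand each pod to the runtime, and report the observed status.
func syncLoop(updates <-chan PodUpdate, rt containerRuntime, status statusReporter) {
    for u := range updates {
        for _, pod := range u.Pods {
            if err := rt.SyncPod(pod); err != nil {
                status.SetPodStatus(pod, "Failed: "+err.Error())
                continue
            }
            status.SetPodStatus(pod, "Running")
        }
    }
}

type fakeRuntime struct{}

func (fakeRuntime) SyncPod(pod string) error { return nil }

type fakeStatus struct{}

func (fakeStatus) SetPodStatus(pod, status string) { fmt.Println(pod, "->", status) }

func main() {
    updates := make(chan PodUpdate, 1)
    updates <- PodUpdate{Op: "ADD", Source: "api", Pods: []string{"nginx"}}
    close(updates)
    syncLoop(updates, fakeRuntime{}, fakeStatus{})
}

In the real kubelet these responsibilities are split across syncLoop/syncLoopIteration, the pod workers, the container runtime's SyncPod, and the status manager, which is exactly the call chain traced in the rest of this article.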

In cmd/kubelet/kubelet.go:

func main() {
    ...
    command := app.NewKubeletCommand()
    ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

In cmd/kubelet/app/server.go:

func NewKubeletCommand() *cobra.Command {
    ...
    kubeletServer := &options.KubeletServer{
        KubeletFlags:         *kubeletFlags,
        KubeletConfiguration: *kubeletConfig,
    }

    kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
    if err != nil {
        klog.Fatal(err)
    }
    ...
    if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
        klog.Fatal(err)
    }
    ...
}

func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.FeatureGate) (*kubelet.Dependencies, error) {
    // Initialize the TLS Options
    tlsOptions, err := InitializeTLS(&s.KubeletFlags, &s.KubeletConfiguration)
    if err != nil {
        return nil, err
    }

    mounter := mount.New(s.ExperimentalMounterPath)
    subpather := subpath.New(mounter)
    hu := hostutil.NewHostUtil()
    var pluginRunner = exec.New()

    var dockerOptions *kubelet.DockerOptions
    if s.ContainerRuntime == kubetypes.DockerContainerRuntime {
        dockerOptions = &kubelet.DockerOptions{
            DockerEndpoint:            s.DockerEndpoint,
            RuntimeRequestTimeout:     s.RuntimeRequestTimeout.Duration,
            ImagePullProgressDeadline: s.ImagePullProgressDeadline.Duration,
        }
    }

    plugins, err := ProbeVolumePlugins(featureGate)
    if err != nil {
        return nil, err
    }
    return &kubelet.Dependencies{
        Auth:                nil, // default does not enforce auth[nz]
        CAdvisorInterface:   nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
        Cloud:               nil, // cloud provider might start background processes
        ContainerManager:    nil,
        DockerOptions:       dockerOptions,
        KubeClient:          nil,
        HeartbeatClient:     nil,
        EventClient:         nil,
        HostUtil:            hu,
        Mounter:             mounter,
        Subpather:           subpather,
        OOMAdjuster:         oom.NewOOMAdjuster(),
        OSInterface:         kubecontainer.RealOS{},
        VolumePlugins:       plugins,
        DynamicPluginProber: GetDynamicPluginProber(s.VolumePluginDir, pluginRunner),
        TLSOptions:          tlsOptions}, nil
}

func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())
    if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
    ...
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }
    ...
}

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
    if err != nil {
        return err
    }
    // Query the cloud provider for our node name, default to hostname if kubeDeps.Cloud == nil
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
        return err
    }
    hostnameOverridden := len(kubeServer.HostnameOverride) > 0
    // Setup event recorder if required.
    makeEventRecorder(kubeDeps, nodeName)

    capabilities.Initialize(capabilities.Capabilities{
        AllowPrivileged: true,
    })

    credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
    klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)

    if kubeDeps.OSInterface == nil {
        kubeDeps.OSInterface = kubecontainer.RealOS{}
    }

    k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
        kubeDeps,
        &kubeServer.ContainerRuntimeOptions,
        kubeServer.ContainerRuntime,
        hostname,
        hostnameOverridden,
        nodeName,
        kubeServer.NodeIP,
        kubeServer.ProviderID,
        kubeServer.CloudProvider,
        kubeServer.CertDirectory,
        kubeServer.RootDirectory,
        kubeServer.RegisterNode,
        kubeServer.RegisterWithTaints,
        kubeServer.AllowedUnsafeSysctls,
        kubeServer.ExperimentalMounterPath,
        kubeServer.KernelMemcgNotification,
        kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
        kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
        kubeServer.MinimumGCAge,
        kubeServer.MaxPerPodContainerCount,
        kubeServer.MaxContainerCount,
        kubeServer.MasterServiceNamespace,
        kubeServer.RegisterSchedulable,
        kubeServer.KeepTerminatedPodVolumes,
        kubeServer.NodeLabels,
        kubeServer.SeccompProfileRoot,
        kubeServer.BootstrapCheckpointPath,
        kubeServer.NodeStatusMaxImages)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    // NewMainKubelet should have set up a pod source config if one didn't exist
    // when the builder was run. This is just a precaution.
    if kubeDeps.PodConfig == nil {
        return fmt.Errorf("failed to create kubelet, pod source config was nil")
    }
    podCfg := kubeDeps.PodConfig

    if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
        klog.Errorf("Failed to set rlimit on max file handles: %v", err)
    }

    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        klog.Info("Started kubelet as runonce")
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
        klog.Info("Started kubelet")
    }
    return nil
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
    // start the kubelet
    go k.Run(podCfg.Updates())

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}


func createAndInitKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *kubelet.Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIP string,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    seccompProfileRoot string,
    bootstrapCheckpointPath string,
    nodeStatusMaxImages int32) (k kubelet.Bootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations

    k, err = kubelet.NewMainKubelet(kubeCfg,
        kubeDeps,
        crOptions,
        containerRuntime,
        hostname,
        hostnameOverridden,
        nodeName,
        nodeIP,
        providerID,
        cloudProvider,
        certDirectory,
        rootDirectory,
        registerNode,
        registerWithTaints,
        allowedUnsafeSysctls,
        experimentalMounterPath,
        kernelMemcgNotification,
        experimentalCheckNodeCapabilitiesBeforeMount,
        experimentalNodeAllocatableIgnoreEvictionThreshold,
        minimumGCAge,
        maxPerPodContainerCount,
        maxContainerCount,
        masterServiceNamespace,
        registerSchedulable,
        keepTerminatedPodVolumes,
        nodeLabels,
        seccompProfileRoot,
        bootstrapCheckpointPath,
        nodeStatusMaxImages)
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}

In pkg/kubelet/kubelet.go:

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
    kubeDeps *Dependencies,
    crOptions *config.ContainerRuntimeOptions,
    containerRuntime string,
    hostname string,
    hostnameOverridden bool,
    nodeName types.NodeName,
    nodeIP string,
    providerID string,
    cloudProvider string,
    certDirectory string,
    rootDirectory string,
    registerNode bool,
    registerWithTaints []api.Taint,
    allowedUnsafeSysctls []string,
    experimentalMounterPath string,
    kernelMemcgNotification bool,
    experimentalCheckNodeCapabilitiesBeforeMount bool,
    experimentalNodeAllocatableIgnoreEvictionThreshold bool,
    minimumGCAge metav1.Duration,
    maxPerPodContainerCount int32,
    maxContainerCount int32,
    masterServiceNamespace string,
    registerSchedulable bool,
    keepTerminatedPodVolumes bool,
    nodeLabels map[string]string,
    seccompProfileRoot string,
    bootstrapCheckpointPath string,
    nodeStatusMaxImages int32) (*Kubelet, error) {
    ...
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.startupManager,
        klet.runner,
        kubeDeps.Recorder)
    ...
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
    ...
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    ...
    kl.probeManager.Start()
    ...
    kl.syncLoop(updates, kl)
}

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    klog.Info("Starting kubelet main sync loop.")
    // The syncTicker wakes up kubelet to check if there are any pod workers
    // that need to be sync'd. A one-second period is sufficient because the
    // sync interval is defaulted to 10s.
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    // Responsible for checking limits in resolv.conf
    // The limits do not have anything to do with individual pods
    // Since this is called in syncLoop, we don't need to call it anywhere else
    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
        kl.dnsConfigurer.CheckLimitsForResolvConf()
    }

    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            klog.Errorf("skipping pod synchronization - %v", err)
            // exponential backoff
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        // reset backoff if we have a success
        duration = base

        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
        // Update from a config source; dispatch it to the right handler
        // callback.
        if !open {
            klog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
            // After restarting, kubelet will get all existing pods through
            // ADD as if they are new pods. These pods will then go through the
            // admission process and *may* be rejected. This can be resolved
            // once we have checkpointing.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
            // DELETE is treated as a UPDATE because of graceful deletion.
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
            // These are pods restored from the checkpoint. Treat them as new
            // pods.
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
            // TODO: Do we want to support this?
            klog.Errorf("Kubelet does not support snapshot update")
        }

        if u.Op != kubetypes.RESTORE {
            // If the update type is RESTORE, it means that the update is from
            // the pod checkpoints and may be incomplete. Do not mark the
            // source as ready.

            // Mark the source ready after receiving at least one update from the
            // source. Once all the sources are marked ready, various cleanup
            // routines will start reclaiming resources. It is important that this
            // takes place only after kubelet calls the update handler to process
            // the update to ensure the internal pod cache is up-to-date.
            kl.sourcesReady.AddSource(u.Source)
        }
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            // The liveness manager detected a failure; sync the pod.

            // We should not use the pod from livenessManager, because it is never updated after
            // initialization.
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                break
            }
            klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            klog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}


func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    start := kl.clock.Now()
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        existingPods := kl.podManager.GetPods()
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)

        if kubetypes.IsMirrorPod(pod) {
            kl.handleMirrorPod(pod, start)
            continue
        }

        if !kl.podIsTerminated(pod) {
            // Only go through the admission process if the pod is not
            // terminated.

            // We failed pods that we rejected, so activePods include all admitted
            // pods that are alive.
            activePods := kl.filterOutTerminatedPods(existingPods)

            // Check if we can admit the pod; if not, reject it.
            if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                kl.rejectPod(pod, reason, message)
                continue
            }
        }
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // check whether we are ready to delete the pod from the API server (all status up to date)
    containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod)
    if pod.DeletionTimestamp != nil && containersTerminal {
        klog.V(4).Infof("Pod %q has completed execution and should be deleted from the API server: %s", format.Pod(pod), syncType)
        kl.statusManager.TerminatePod(pod)
        return
    }

    // optimization: avoid invoking the pod worker if no further changes are possible to the pod definition
    if podWorkerTerminal {
        klog.V(4).Infof("Pod %q has completed, ignoring remaining sync work: %s", format.Pod(pod), syncType)
        return
    }

    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
            }
        },
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}


func (kl *Kubelet) syncPod(o syncPodOptions) error {
    // pull out the required options
    pod := o.pod
    mirrorPod := o.mirrorPod
    podStatus := o.podStatus
    updateType := o.updateType

    // if we want to kill a pod, do it now!
    if updateType == kubetypes.SyncPodKill {
        killPodOptions := o.killPodOptions
        if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
            return fmt.Errorf("kill pod options are required if update type is kill")
        }
        apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
        kl.statusManager.SetPodStatus(pod, apiPodStatus)
        // we kill the pod with the specified grace period since this is a termination
        if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            // there was an error killing the pod, so we return that error directly
            utilruntime.HandleError(err)
            return err
        }
        return nil
    }

    // Latency measurements for the main workflow are relative to the
    // first time the pod was seen by the API server.
    var firstSeenTime time.Time
    if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
        firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
    }

    // Record pod worker start latency if being created
    // TODO: make pod workers record their own latencies
    if updateType == kubetypes.SyncPodCreate {
        if !firstSeenTime.IsZero() {
            // This is the first time we are syncing the pod. Record the latency
            // since kubelet first saw the pod if firstSeenTime is set.
            metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
        } else {
            klog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
        }
    }

    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
    // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
    // set pod IP to hostIP directly in runtime.GetPodStatus
    podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
    for _, ipInfo := range apiPodStatus.PodIPs {
        podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
    }

    if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
        podStatus.IPs = []string{apiPodStatus.PodIP}
    }

    // Record the time it takes for the pod to become running.
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning &&
        !firstSeenTime.IsZero() {
        metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
    }

    runnable := kl.canRunPod(pod)
    if !runnable.Admit {
        // Pod is not runnable; update the Pod and Container statuses to why.
        apiPodStatus.Reason = runnable.Reason
        apiPodStatus.Message = runnable.Message
        // Waiting containers are not creating.
        const waitingReason = "Blocked"
        for _, cs := range apiPodStatus.InitContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
        for _, cs := range apiPodStatus.ContainerStatuses {
            if cs.State.Waiting != nil {
                cs.State.Waiting.Reason = waitingReason
            }
        }
    }

    // Update status in the status manager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)

    // Kill pod if it should not be running
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
            syncErr = fmt.Errorf("error killing pod: %v", err)
            utilruntime.HandleError(syncErr)
        } else {
            if !runnable.Admit {
                // There was no error killing the pod, but the pod cannot be run.
                // Return an error to signal that the sync loop should back off.
                syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
            }
        }
        return syncErr
    }

    // If the network plugin is not ready, only start the pod if it uses the host network
    if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
        return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
    }

    // Create Cgroups for the pod and apply resource parameters
    // to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // If pod has already been terminated then we need not create
    // or update the pod's cgroup
    if !kl.podIsTerminated(pod) {
        // When the kubelet is restarted with the cgroups-per-qos
        // flag enabled, all the pod's running containers
        // should be killed intermittently and brought back up
        // under the qos cgroup hierarchy.
        // Check if this is the pod's first sync
        firstSync := true
        for _, containerStatus := range apiPodStatus.ContainerStatuses {
            if containerStatus.State.Running != nil {
                firstSync = false
                break
            }
        }
        // Don't kill containers in pod if pod's cgroups already
        // exists or the pod is running for the first time
        podKilled := false
        if !pcm.Exists(pod) && !firstSync {
            if err := kl.killPod(pod, nil, podStatus, nil); err == nil {
                podKilled = true
            }
        }
        // Create and Update pod's Cgroups
        // Don't create cgroups for run once pod if it was killed above
        // The current policy is not to restart the run once pods when
        // the kubelet is restarted with the new flag as run once pods are
        // expected to run only once and if the kubelet is restarted then
        // they are not expected to run again.
        // We don't create and apply updates to cgroup if its a run once pod and was killed above
        if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
            if !pcm.Exists(pod) {
                if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
                    klog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
                }
                if err := pcm.EnsureExists(pod); err != nil {
                    kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
                    return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
                }
            }
        }
    }

    // Create Mirror Pod for Static Pod if it doesn't already exist
    if kubetypes.IsStaticPod(pod) {
        podFullName := kubecontainer.GetPodFullName(pod)
        deleted := false
        if mirrorPod != nil {
            if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
                // The mirror pod is semantically different from the static pod. Remove
                // it. The mirror pod will get recreated later.
                klog.Infof("Trying to delete pod %s %v", podFullName, mirrorPod.ObjectMeta.UID)
                var err error
                deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
                if deleted {
                    klog.Warningf("Deleted mirror pod %q because it is outdated", format.Pod(mirrorPod))
                } else if err != nil {
                    klog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
                }
            }
        }
        if mirrorPod == nil || deleted {
            node, err := kl.GetNode()
            if err != nil || node.DeletionTimestamp != nil {
                klog.V(4).Infof("No need to create a mirror pod, since node %q has been removed from the cluster", kl.nodeName)
            } else {
                klog.V(4).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
                if err := kl.podManager.CreateMirrorPod(pod); err != nil {
                    klog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
                }
            }
        }
    }

    // Make data directories for the pod
    if err := kl.makePodDataDirs(pod); err != nil {
        kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
        klog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
        return err
    }

    // Volume manager will not mount volumes for terminated pods
    if !kl.podIsTerminated(pod) {
        // Wait for volumes to attach/mount
        if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
            kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
            klog.Errorf("Unable to attach or mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
            return err
        }
    }

    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)

    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    kl.reasonCache.Update(pod.UID, result)
    if err := result.Error(); err != nil {
        // Do not return error if the only failures were pods in backoff
        for _, r := range result.SyncResults {
            if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
                // Do not record an event here, as we keep all event logging for sync pod failures
                // local to container runtime so we get better errors
                return err
            }
        }

        return nil
    }

    return nil
}

In pkg/kubelet/pod_workers.go:

func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
    resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
    return &podWorkers{
        podUpdates:                map[types.UID]chan UpdatePodOptions{},
        isWorking:                 map[types.UID]bool{},
        lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
        syncPodFn:                 syncPodFn,
        recorder:                  recorder,
        workQueue:                 workQueue,
        resyncInterval:            resyncInterval,
        backOffPeriod:             backOffPeriod,
        podCache:                  podCache,
    }
}

func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    var podUpdates chan UpdatePodOptions
    var exists bool

    p.podLock.Lock()
    defer p.podLock.Unlock()
    if podUpdates, exists = p.podUpdates[uid]; !exists {
        // We need to have a buffer here, because checkForUpdates() method that
        // puts an update into channel is called from the same goroutine where
        // the channel is consumed. However, it is guaranteed that in such case
        // the channel is empty, so buffer of size 1 is enough.
        podUpdates = make(chan UpdatePodOptions, 1)
        p.podUpdates[uid] = podUpdates

        // Creating a new pod worker either means this is a new pod, or that the
        // kubelet just restarted. In either case the kubelet is willing to believe
        // the status of the pod for the first pod worker sync. See corresponding
        // comment in syncPod.
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    if !p.isWorking[pod.UID] {
        p.isWorking[pod.UID] = true
        podUpdates <- *options
    } else {
        // if a request to kill a pod is pending, we do not let anything overwrite that request.
        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
        if !found || update.UpdateType != kubetypes.SyncPodKill {
            p.lastUndeliveredWorkUpdate[pod.UID] = *options
        }
    }
}

func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
    var lastSyncTime time.Time
    for update := range podUpdates {
        err := func() error {
            podUID := update.Pod.UID
            // This is a blocking call that would return only if the cache
            // has an entry for the pod that is newer than minRuntimeCache
            // Time. This ensures the worker doesn't start syncing until
            // after the cache is at least newer than the finished time of
            // the previous sync.
            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
            if err != nil {
                // This is the legacy event thrown by manage pod loop
                // all other events are now dispatched from syncPodFn
                p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                return err
            }
            err = p.syncPodFn(syncPodOptions{
                mirrorPod:      update.MirrorPod,
                pod:            update.Pod,
                podStatus:      status,
                killPodOptions: update.KillPodOptions,
                updateType:     update.UpdateType,
            })
            lastSyncTime = time.Now()
            return err
        }()
        // notify the call-back function if the operation succeeded or not
        if update.OnCompleteFunc != nil {
            update.OnCompleteFunc(err)
        }
        if err != nil {
            // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
            klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
        }
        p.wrapUp(update.Pod.UID, err)
    }
}

In pkg/kubelet/config/config.go:

func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
    updates := make(chan kubetypes.PodUpdate, 50)
    storage := newPodStorage(updates, mode, recorder)
    podConfig := &PodConfig{
        pods:    storage,
        mux:     config.NewMux(storage),
        updates: updates,
        sources: sets.String{},
    }
    return podConfig
}

func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
    return &podStorage{
        pods:        make(map[string]map[types.UID]*v1.Pod),
        mode:        mode,
        updates:     updates,
        sourcesSeen: sets.String{},
        recorder:    recorder,
    }
}

func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
    return c.updates
}

func (c *PodConfig) Channel(source string) chan<- interface{} {
    c.sourcesLock.Lock()
    defer c.sourcesLock.Unlock()
    c.sources.Insert(source)
    return c.mux.Channel(source)
}

func NewMux(merger Merger) *Mux {
    mux := &Mux{
        sources: make(map[string]chan interface{}),
        merger:  merger,
    }
    return mux
}

func (m *Mux) Channel(source string) chan interface{} {
    if len(source) == 0 {
        panic("Channel given an empty name")
    }
    m.sourceLock.Lock()
    defer m.sourceLock.Unlock()
    channel, exists := m.sources[source]
    if exists {
        return channel
    }
    newChannel := make(chan interface{})
    m.sources[source] = newChannel
    go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
    return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}

func (s *podStorage) Merge(source string, change interface{}) error {
    s.updateLock.Lock()
    defer s.updateLock.Unlock()

    seenBefore := s.sourcesSeen.Has(source)
    adds, updates, deletes, removes, reconciles, restores := s.merge(source, change)
    firstSet := !seenBefore && s.sourcesSeen.Has(source)

    // deliver update notifications
    switch s.mode {
    case PodConfigNotificationIncremental:
        if len(removes.Pods) > 0 {
            s.updates <- *removes
        }
        if len(adds.Pods) > 0 {
            s.updates <- *adds
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }
        if len(restores.Pods) > 0 {
            s.updates <- *restores
        }
        if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
            // Send an empty update when first seeing the source and there are
            // no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
            // the source is ready.
            s.updates <- *adds
        }
        // Only add reconcile support here, because kubelet doesn't support Snapshot update now.
        if len(reconciles.Pods) > 0 {
            s.updates <- *reconciles
        }

    case PodConfigNotificationSnapshotAndUpdates:
        if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }
        if len(updates.Pods) > 0 {
            s.updates <- *updates
        }
        if len(deletes.Pods) > 0 {
            s.updates <- *deletes
        }

    case PodConfigNotificationSnapshot:
        if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
            s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*v1.Pod), Op: kubetypes.SET, Source: source}
        }

    case PodConfigNotificationUnknown:
        fallthrough
    default:
        panic(fmt.Sprintf("unsupported PodConfigNotificationMode: %#v", s.mode))
    }

    return nil
}

In pkg/kubelet/config/apiserver.go:

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
    send := func(objs []interface{}) {
        var pods []*v1.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
    go r.Run(wait.NeverStop)
}

In k8s.io/client-go/tools/cache/reflector.go:

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    realClock := &clock.RealClock{}
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
        // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
        // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
        backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
        resyncPeriod:      resyncPeriod,
        clock:             realClock,
        watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
    }
    r.setExpectedType(expectedType)
    return r
}

func (r *Reflector) Run(stopCh <-chan struct{}) {
    klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
    wait.BackoffUntil(func() {
        if err := r.ListAndWatch(stopCh); err != nil {
            r.watchErrorHandler(r, err)
        }
    }, r.backoffManager, true, stopCh)
    klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                //
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                //
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0
            }

            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
        }

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        //
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true
        }

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v: %v", list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
        }
        initTrace.Step("Objects extracted")
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("unable to sync list result: %v", err)
        }
        initTrace.Step("SyncWith done")
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any watchers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server does not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if utilnet.IsConnectionRefused(err) {
                time.Sleep(time.Second)
                continue
            }
            return err
        }

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

In pkg/kubelet/prober/prober_manager.go:

func NewManager(
    statusManager status.Manager,
    livenessManager results.Manager,
    startupManager results.Manager,
    runner kubecontainer.CommandRunner,
    recorder record.EventRecorder) Manager {

    prober := newProber(runner, recorder)
    readinessManager := results.NewManager()
    return &manager{
        statusManager:    statusManager,
        prober:           prober,
        readinessManager: readinessManager,
        livenessManager:  livenessManager,
        startupManager:   startupManager,
        workers:          make(map[probeKey]*worker),
    }
}

func (m *manager) Start() {
    // Start syncing readiness.
    go wait.Forever(m.updateReadiness, 0)
    // Start syncing startup.
    go wait.Forever(m.updateStartup, 0)
}

func (m *manager) updateReadiness() {
    update := <-m.readinessManager.Updates()

    ready := update.Result == results.Success
    m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

func (m *manager) updateStartup() {
    update := <-m.startupManager.Updates()

    started := update.Result == results.Success
    m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
}

func (m *manager) AddPod(pod *v1.Pod) {
    m.workerLock.Lock()
    defer m.workerLock.Unlock()

    key := probeKey{podUID: pod.UID}
    for _, c := range pod.Spec.Containers {
        key.containerName = c.Name

        if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
            key.probeType = startup
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Startup probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, startup, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.ReadinessProbe != nil {
            key.probeType = readiness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Readiness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, readiness, pod, c)
            m.workers[key] = w
            go w.run()
        }

        if c.LivenessProbe != nil {
            key.probeType = liveness
            if _, ok := m.workers[key]; ok {
                klog.Errorf("Liveness probe already exists! %v - %v",
                    format.Pod(pod), c.Name)
                return
            }
            w := newWorker(m, liveness, pod, c)
            m.workers[key] = w
            go w.run()
        }
    }
}

In pkg/kubelet/status/status_manager.go:

func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager, podDeletionSafety PodDeletionSafetyProvider) Manager {
    return &manager{
        kubeClient:        kubeClient,
        podManager:        podManager,
        podStatuses:       make(map[types.UID]versionedPodStatus),
        podStatusChannel:  make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
        apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
        podDeletionSafety: podDeletionSafety,
    }
}
func (m *manager) Start() {
    // Don't start the status manager if we don't have a client. This will happen
    // on the master, where the kubelet is responsible for bootstrapping the pods
    // of the master components.
    if m.kubeClient == nil {
        klog.Infof("Kubernetes client is nil, not starting status manager.")
        return
    }

    klog.Info("Starting to sync pod status with apiserver")
    //lint:ignore SA1015 Ticker can leak since this is only called once and doesn't handle termination.
    syncTicker := time.Tick(syncPeriod)
    // syncPod and syncBatch share the same go routine to avoid sync races.
    go wait.Forever(func() {
        for {
            select {
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                klog.V(5).Infof("Status Manager: syncing batch")
                // remove any entries in the status channel since the batch will handle them
                for i := len(m.podStatusChannel); i > 0; i-- {
                    <-m.podStatusChannel
                }
                m.syncBatch()
            }
        }
    }, 0)
}
func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    pod, ok := m.podManager.GetPodByUID(podUID)
    if !ok {
        klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
        return
    }

    oldStatus, found := m.podStatuses[pod.UID]
    if !found {
        klog.Warningf("Container startup changed before pod has synced: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

    // Find the container to update.
    containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    if !ok {
        klog.Warningf("Container startup changed for unknown container: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

    if containerStatus.Started != nil && *containerStatus.Started == started {
        klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
            format.Pod(pod), containerID.String())
        return
    }

    // Make sure we're not updating the cached version.
    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Started = &started

    m.updateStatusInternal(pod, status, false)
}

func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUpdate bool) bool {
    var oldStatus v1.PodStatus
    cachedStatus, isCached := m.podStatuses[pod.UID]
    if isCached {
        oldStatus = cachedStatus.status
    } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
        oldStatus = mirrorPod.Status
    } else {
        oldStatus = pod.Status
    }

    // Check for illegal state transition in containers
    if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }

    // Set ContainersReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)

    // Set ReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodReady)

    // Set InitializedCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)

    // Set PodScheduledCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)

    // ensure that the start time does not change across updates.
    if oldStatus.StartTime != nil && !oldStatus.StartTime.IsZero() {
        status.StartTime = oldStatus.StartTime
    } else if status.StartTime.IsZero() {
        // if the status has no start time, we need to set an initial time
        now := metav1.Now()
        status.StartTime = &now
    }

    normalizeStatus(pod, &status)
    // The intent here is to prevent concurrent updates to a pod's status from
    // clobbering each other so the phase of a pod progresses monotonically.
    if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
        klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
        return false // No new status.
    }

    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1,
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    m.podStatuses[pod.UID] = newStatus

    select {
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        return true
    default:
        // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }
}
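
Note the send at the end: updateStatusInternal runs with podStatusesLock held, so it must not block; the select with a default case drops the notification when the channel is full and leaves the update for the next periodic syncBatch. A small, self-contained sketch of that non-blocking-send-under-a-lock pattern (illustrative names, not the kubelet's):

package main

import (
    "fmt"
    "sync"
)

type store struct {
    mu      sync.Mutex
    pending chan string
}

// enqueue records an update and tries to notify the worker without blocking,
// because the mutex is held and a blocked send could stall the caller.
func (s *store) enqueue(update string) bool {
    s.mu.Lock()
    defer s.mu.Unlock()

    select {
    case s.pending <- update:
        return true // worker notified immediately
    default:
        // Channel full: skip the notification; a periodic batch pass
        // will pick the update up later.
        return false
    }
}

func main() {
    s := &store{pending: make(chan string, 1)}
    fmt.Println(s.enqueue("a")) // true: buffered slot available
    fmt.Println(s.enqueue("b")) // false: channel full, deferred to the batch
}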

func (m *manager) syncBatch() {
    var updatedStatuses []podStatusSyncRequest
    podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
    func() { // Critical section
        m.podStatusesLock.RLock()
        defer m.podStatusesLock.RUnlock()

        // Clean up orphaned versions.
        for uid := range m.apiStatusVersions {
            _, hasPod := m.podStatuses[types.UID(uid)]
            _, hasMirror := mirrorToPod[uid]
            if !hasPod && !hasMirror {
                delete(m.apiStatusVersions, uid)
            }
        }

        for uid, status := range m.podStatuses {
            syncedUID := kubetypes.MirrorPodUID(uid)
            if mirrorUID, ok := podToMirror[kubetypes.ResolvedPodUID(uid)]; ok {
                if mirrorUID == "" {
                    klog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
                    continue
                }
                syncedUID = mirrorUID
            }
            if m.needsUpdate(types.UID(syncedUID), status) {
                updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
            } else if m.needsReconcile(uid, status.status) {
                // Delete the apiStatusVersions here to force an update on the pod status
                // In most cases the deleted apiStatusVersions here should be filled
                // soon after the following syncPod() [If the syncPod() sync an update
                // successfully].
                delete(m.apiStatusVersions, syncedUID)
                updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
            }
        }
    }()

    for _, update := range updatedStatuses {
        klog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
        m.syncPod(update.podUID, update.status)
    }
}

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }

    // TODO: make me easier to express from client code
    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
        // If the Pod is deleted the status will be cleared in
        // RemoveOrphanedStatuses, so we just ignore the update here.
        return
    }
    if err != nil {
        klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
        return
    }

    translatedUID := m.podManager.TranslatePodUID(pod.UID)
    // Type convert original uid just for the purpose of comparison.
    if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {
        klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
        m.deletePodStatus(uid)
        return
    }

    oldStatus := pod.Status.DeepCopy()
    newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
    if err != nil {
        klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
        return
    }
    if unchanged {
        klog.V(3).Infof("Status for pod %q is up-to-date: (%d)", format.Pod(pod), status.version)
    } else {
        klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
        pod = newPod
    }

    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

    // We don't handle graceful deletion of mirror pods.
    if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.DeleteOptions{
            GracePeriodSeconds: new(int64),
            // Use the pod UID as the precondition for deletion to prevent deleting a
            // newly created pod with the same name and namespace.
            Preconditions: metav1.NewUIDPreconditions(string(pod.UID)),
        }
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
        if err != nil {
            klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
            return
        }
        klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
        m.deletePodStatus(uid)
    }
}
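
syncPod only calls the apiserver when needsUpdate sees a cached status version newer than the one recorded in apiStatusVersions, and it advances that recorded version after a successful patch (the real check also consults podDeletionSafety, omitted here). A minimal sketch of the version-gating idea, with illustrative types:

package main

import "fmt"

type versionedStatus struct {
    version uint64
    phase   string
}

type syncer struct {
    // Last version successfully written to the apiserver, per pod UID.
    apiVersions map[string]uint64
}

// needsUpdate reports whether the cached status is newer than what was last synced.
func (s *syncer) needsUpdate(uid string, status versionedStatus) bool {
    latest, ok := s.apiVersions[uid]
    return !ok || latest < status.version
}

func (s *syncer) syncPod(uid string, status versionedStatus) {
    if !s.needsUpdate(uid, status) {
        fmt.Printf("pod %s: version %d already synced, skipping\n", uid, status.version)
        return
    }
    // ... patch the pod status via the apiserver here ...
    s.apiVersions[uid] = status.version
    fmt.Printf("pod %s: synced version %d (%s)\n", uid, status.version, status.phase)
}

func main() {
    s := &syncer{apiVersions: map[string]uint64{}}
    s.syncPod("pod-a", versionedStatus{version: 1, phase: "Running"})
    s.syncPod("pod-a", versionedStatus{version: 1, phase: "Running"}) // skipped
    s.syncPod("pod-a", versionedStatus{version: 2, phase: "Succeeded"})
}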

In pkg/kubelet/prober/worker.go

func newWorker(
    m *manager,
    probeType probeType,
    pod *v1.Pod,
    container v1.Container) *worker {

    w := &worker{
        stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
        pod:          pod,
        container:    container,
        probeType:    probeType,
        probeManager: m,
    }

    switch probeType {
    case readiness:
        w.spec = container.ReadinessProbe
        w.resultsManager = m.readinessManager
        w.initialValue = results.Failure
    case liveness:
        w.spec = container.LivenessProbe
        w.resultsManager = m.livenessManager
        w.initialValue = results.Success
    case startup:
        w.spec = container.StartupProbe
        w.resultsManager = m.startupManager
        w.initialValue = results.Unknown
    }

    basicMetricLabels := metrics.Labels{
        "probe_type": w.probeType.String(),
        "container":  w.container.Name,
        "pod":        w.pod.Name,
        "namespace":  w.pod.Namespace,
        "pod_uid":    string(w.pod.UID),
    }

    w.proberResultsSuccessfulMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsSuccessfulMetricLabels["result"] = probeResultSuccessful

    w.proberResultsFailedMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsFailedMetricLabels["result"] = probeResultFailed

    w.proberResultsUnknownMetricLabels = deepCopyPrometheusLabels(basicMetricLabels)
    w.proberResultsUnknownMetricLabels["result"] = probeResultUnknown

    return w
}

func (w *worker) run() {
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}
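
run sleeps for a random fraction of the probe period before starting its ticker, so that after a kubelet restart the probe workers do not all fire at the same instant. A self-contained sketch of that jittered probe loop, with a much shorter period for illustration:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func probeLoop(name string, period time.Duration, stop <-chan struct{}) {
    // Jitter: wait a random fraction of the period so workers spread out.
    time.Sleep(time.Duration(rand.Float64() * float64(period)))

    ticker := time.NewTicker(period)
    defer ticker.Stop()

    for {
        fmt.Println(name, "probing at", time.Now().Format("15:04:05.000"))
        select {
        case <-stop:
            return
        case <-ticker.C:
            // next probe
        }
    }
}

func main() {
    stop := make(chan struct{})
    for i := 0; i < 3; i++ {
        go probeLoop(fmt.Sprintf("worker-%d", i), 300*time.Millisecond, stop)
    }
    time.Sleep(time.Second)
    close(stop)
    time.Sleep(100 * time.Millisecond) // let workers observe the stop signal
}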

func (w *worker) doProbe() (keepGoing bool) {
    defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
    defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }

    // Worker should terminate if pod is terminated.
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }

    c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // We've got a new container; resume probing.
        w.onHold = false
    }

    if w.onHold {
        // Worker is on hold until there is a new container.
        return true
    }

    if c.State.Running == nil {
        klog.V(3).Infof("Non-running container probed: %v - %v",
            format.Pod(w.pod), w.container.Name)
        if !w.containerID.IsEmpty() {
            w.resultsManager.Set(w.containerID, results.Failure, w.pod)
        }
        // Abort if the container will not be restarted.
        return c.State.Terminated == nil ||
            w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
    }

    // Probe disabled for InitialDelaySeconds.
    if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

    if c.Started != nil && *c.Started {
        // Stop probing for startup once container has started.
        if w.probeType == startup {
            return true
        }
    } else {
        // Disable other probes until container has started.
        if w.probeType != startup {
            return true
        }
    }

    // TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
    // the full container environment here, OR we must make a call to the CRI in order to get those environment
    // values from the running container.
    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        return true
    }

    w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // The container fails a liveness/startup check, it will need to be restarted.
        // Stop probing until we see a new container ID. This is to reduce the
        // chance of hitting #21751, where running `docker exec` when a
        // container is being stopped may lead to corrupted container state.
        w.onHold = true
        w.resultRun = 0
    }

    return true
}
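
A single probe result does not immediately change the reported state: w.resultRun counts how many identical results have arrived in a row, and only once FailureThreshold (or SuccessThreshold) is reached is the result pushed to the results manager. The thresholding logic in isolation looks roughly like this (illustrative types, not the kubelet's):

package main

import "fmt"

type result string

const (
    success result = "success"
    failure result = "failure"
)

type thresholder struct {
    successThreshold int
    failureThreshold int

    lastResult result
    resultRun  int
    reported   result
}

// observe records one probe result and returns the state to report,
// which only flips after a long enough streak of identical results.
func (t *thresholder) observe(r result) result {
    if t.lastResult == r {
        t.resultRun++
    } else {
        t.lastResult = r
        t.resultRun = 1
    }

    if (r == failure && t.resultRun < t.failureThreshold) ||
        (r == success && t.resultRun < t.successThreshold) {
        return t.reported // below threshold: keep the previous state
    }
    t.reported = r
    return t.reported
}

func main() {
    t := &thresholder{successThreshold: 1, failureThreshold: 3, reported: success}
    for _, r := range []result{failure, failure, success, failure, failure, failure} {
        fmt.Printf("observed %-7s -> reported %s\n", r, t.observe(r))
    }
}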

In pkg/kubelet/kuberuntime/kuberuntime_manager.go

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
        }
        if podContainerChanges.SandboxID != "" {
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            klog.V(4).Infof("SyncPod received new pod %q, will create a sandbox for it", format.Pod(pod))
        }
    }

    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.KillPod {
        if podContainerChanges.CreateSandbox {
            klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
        } else {
            klog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))
        }

        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            klog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
            killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
            result.AddSyncResult(killContainerResult)
            if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
                killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
                klog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
                return
            }
        }
    }

    // Keep terminated init containers fairly aggressively controlled
    // This is an optimization because container removals are typically handled
    // by container garbage collector.
    m.pruneInitContainersBeforeStart(pod, podStatus)

    // We pass the value of the PRIMARY podIP and list of podIPs down to
    // generatePodSandboxConfig and generateContainerConfig, which in turn
    // passes it to various other functions, in order to facilitate functionality
    // that requires this value (hosts file and downward API) and avoid races determining
    // the pod IP in cases where a container requires restart but the
    // podIP isn't in the status manager yet. The list of podIPs is used to
    // generate the hosts file.
    //
    // We default to the IPs in the passed-in pod status, and overwrite them if the
    // sandbox needs to be (re)started.
    var podIPs []string
    if podStatus != nil {
        podIPs = podStatus.IPs
    }

    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID := podContainerChanges.SandboxID
    if podContainerChanges.CreateSandbox {
        var msg string
        var err error

        klog.V(4).Infof("Creating PodSandbox for pod %q", format.Pod(pod))
        createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
        result.AddSyncResult(createSandboxResult)
        podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
        if err != nil {
            createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
            klog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedCreatePodSandBox, "Failed to create pod sandbox: %v", err)
            return
        }
        klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

        podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
        if err != nil {
            ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
            if referr != nil {
                klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), referr)
            }
            m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedStatusPodSandBox, "Unable to get pod sandbox status: %v", err)
            klog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
            result.Fail(err)
            return
        }

        // If we ever allow updating a pod from non-host-network to
        // host-network, we may use a stale IP.
        if !kubecontainer.IsHostNetworkPod(pod) {
            // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox.
            podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus)
            klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod))
        }
    }

    // the start containers routines depend on pod ip(as in primary pod ip)
    // instead of trying to figure out if we have 0 < len(podIPs)
    // everytime, we short circuit it here
    podIP := ""
    if len(podIPs) != 0 {
        podIP = podIPs[0]
    }

    // Get podSandboxConfig for containers to start.
    configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
    result.AddSyncResult(configPodSandboxResult)
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message)
        return
    }

    // Helper containing boilerplate common to starting all types of containers.
    // typeName is a label used to describe this type of container in log messages,
    // currently: "container", "init container" or "ephemeral container"
    start := func(typeName string, spec *startSpec) error {
        startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
        result.AddSyncResult(startContainerResult)

        isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
        if isInBackOff {
            startContainerResult.Fail(err, msg)
            klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
            return err
        }

        klog.V(4).Infof("Creating %v %+v in pod %v", typeName, spec.container, format.Pod(pod))
        // NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
        if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
            startContainerResult.Fail(err, msg)
            // known errors that are logged in other places are logged at higher levels here to avoid
            // repetitive log spam
            switch {
            case err == images.ErrImagePullBackOff:
                klog.V(3).Infof("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg)
            default:
                utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
            }
            return err
        }

        return nil
    }

    // Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            start("ephemeral container", ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        start("container", containerStartSpec(&pod.Spec.Containers[idx]))
    }

    return
}
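
Each step of SyncPod records its outcome on the returned kubecontainer.PodSyncResult, so even when the function bails out early the caller can still see which step failed. A simplified, self-contained sketch of that result-aggregation pattern; the types below are illustrative stand-ins, not the real kubecontainer ones:

package main

import (
    "errors"
    "fmt"
)

// syncResult records the outcome of one step of a pod sync.
type syncResult struct {
    action string
    target string
    err    error
}

func (r *syncResult) fail(err error) { r.err = err }

// podSyncResult aggregates the per-step results for one SyncPod call.
type podSyncResult struct {
    results []*syncResult
}

func (p *podSyncResult) add(r *syncResult) { p.results = append(p.results, r) }

// Error returns the first recorded failure, if any.
func (p *podSyncResult) Error() error {
    for _, r := range p.results {
        if r.err != nil {
            return fmt.Errorf("%s %q: %w", r.action, r.target, r.err)
        }
    }
    return nil
}

func main() {
    var result podSyncResult

    createSandbox := &syncResult{action: "CreatePodSandbox", target: "nginx"}
    result.add(createSandbox)

    startContainer := &syncResult{action: "StartContainer", target: "nginx"}
    result.add(startContainer)
    startContainer.fail(errors.New("image pull back-off"))

    fmt.Println(result.Error()) // StartContainer "nginx": image pull back-off
}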
