A Brief Analysis of the kube-proxy Source Code (k8s)

Author: wwq2020 | Published 2020-07-17 17:00

Overview

kube-proxy watches the apiserver for Endpoints changes and then writes the corresponding rules into iptables. This article walks through that path for the iptables proxy mode.
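
To make the overall shape concrete before diving into the real code, here is a minimal, self-contained sketch of the same control loop: changes arrive as events, each event only requests a sync, and a single loop coalesces those requests and periodically rewrites the rules. None of these names exist in kube-proxy; this only illustrates the pattern the code below implements with informers, the Proxier, and a BoundedFrequencyRunner.

package main

import (
    "fmt"
    "time"
)

func main() {
    events := make(chan string, 16)   // stands in for informer callbacks (OnEndpointsUpdate, ...)
    syncNow := make(chan struct{}, 1) // stands in for the runner's "run" channel

    // Non-blocking "please sync soon" -- the same idiom BoundedFrequencyRunner.Run uses.
    requestSync := func() {
        select {
        case syncNow <- struct{}{}:
        default:
        }
    }

    // Event handler: every change just schedules a sync.
    go func() {
        for ev := range events {
            fmt.Println("endpoints changed:", ev)
            requestSync()
        }
    }()

    // Pretend the apiserver delivers a few Endpoints updates.
    go func() {
        for i := 0; i < 3; i++ {
            events <- fmt.Sprintf("update-%d", i)
            time.Sleep(300 * time.Millisecond)
        }
    }()

    syncRules := func() { fmt.Println("rewriting iptables rules (iptables-restore)") }
    resync := time.NewTicker(2 * time.Second) // periodic full resync, like syncPeriod
    defer resync.Stop()
    deadline := time.After(5 * time.Second)

    for {
        select {
        case <-syncNow:
            syncRules()
        case <-resync.C:
            syncRules()
        case <-deadline:
            return
        }
    }
}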

In cmd/kube-proxy/proxy.go:

func main() {
    ...
    command := app.NewProxyCommand()
    ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

In cmd/kube-proxy/app/server.go:

func NewProxyCommand() *cobra.Command {
    ...
    if err := opts.Run(); err != nil {
        klog.Exit(err)
    }
    ...
}

func (o *Options) Run() error {
    defer close(o.errCh)
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }

    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }

    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }

    o.proxyServer = proxyServer
    return o.runLoop()
}

func (o *Options) runLoop() error {
    if o.watcher != nil {
        o.watcher.Run()
    }

    // run the proxy in a goroutine
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}

func (s *ProxyServer) Run() error {
    ...
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.Proxier)
    go endpointsConfig.Run(wait.NeverStop)
    ...
    go s.Proxier.SyncLoop()

    return <-errCh
}
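
RegisterEventHandler(s.Proxier) works because the iptables Proxier implements the EndpointsHandler interface defined in pkg/proxy/config. Paraphrased for this Kubernetes vintage (check your tree for the exact definition):

import v1 "k8s.io/api/core/v1"

// EndpointsHandler receives notifications about Endpoints changes; the
// Proxier's OnEndpointsAdd/Update/Delete/Synced methods satisfy it.
type EndpointsHandler interface {
    OnEndpointsAdd(endpoints *v1.Endpoints)
    OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
    OnEndpointsDelete(endpoints *v1.Endpoints)
    OnEndpointsSynced()
}

OnEndpointsUpdate is excerpted further down in proxier.go; the add and delete variants feed the same endpointsChanges tracker and then request a sync.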

In cmd/kube-proxy/app/server_others.go:

func NewProxyServer(o *Options) (*ProxyServer, error) {
    return newProxyServer(o.config, o.CleanupAndExit, o.master)
}

func newProxyServer(
    config *proxyconfigapi.KubeProxyConfiguration,
    cleanupAndExit bool,
    master string) (*ProxyServer, error) {
    ...
    if proxyMode == proxyModeIPTables {
        ...
        proxier, err = iptables.NewProxier(
                iptInterface,
                utilsysctl.New(),
                execer,
                config.IPTables.SyncPeriod.Duration,
                config.IPTables.MinSyncPeriod.Duration,
                config.IPTables.MasqueradeAll,
                int(*config.IPTables.MasqueradeBit),
                localDetector,
                hostname,
                nodeIP,
                recorder,
                healthzServer,
                config.NodePortAddresses,
            )
        ...
    }
    ...
    return &ProxyServer{
        Client:                 client,
        EventClient:            eventClient,
        IptInterface:           iptInterface,
        IpvsInterface:          ipvsInterface,
        IpsetInterface:         ipsetInterface,
        execer:                 execer,
        Proxier:                proxier,
        Broadcaster:            eventBroadcaster,
        Recorder:               recorder,
        ConntrackConfiguration: config.Conntrack,
        Conntracker:            &realConntracker{},
        ProxyMode:              proxyMode,
        NodeRef:                nodeRef,
        MetricsBindAddress:     config.MetricsBindAddress,
        BindAddressHardFail:    config.BindAddressHardFail,
        EnableProfiling:        config.EnableProfiling,
        OOMScoreAdj:            config.OOMScoreAdj,
        ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
        HealthzServer:          healthzServer,
        UseEndpointSlices:      utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying),
    }, nil
}

In pkg/proxy/iptables/proxier.go:

func NewProxier(ipt utiliptables.Interface,
    sysctl utilsysctl.Interface,
    exec utilexec.Interface,
    syncPeriod time.Duration,
    minSyncPeriod time.Duration,
    masqueradeAll bool,
    masqueradeBit int,
    localDetector proxyutiliptables.LocalTrafficDetector,
    hostname string,
    nodeIP net.IP,
    recorder record.EventRecorder,
    healthzServer healthcheck.ProxierHealthUpdater,
    nodePortAddresses []string,
) (*Proxier, error) {
    // Set the route_localnet sysctl we need for exposing NodePorts on loopback addresses
    if err := utilproxy.EnsureSysctl(sysctl, sysctlRouteLocalnet, 1); err != nil {
        return nil, err
    }

    // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
    // are connected to a Linux bridge (but not SDN bridges).  Until most
    // plugins handle this, log when config is missing
    if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
        klog.Warning("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
    }

    // Generate the masquerade mark to use for SNAT rules.
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
    klog.V(2).Infof("iptables(%s) masquerade mark: %s", ipt.Protocol(), masqueradeMark)

    endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)

    serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)

    isIPv6 := ipt.IsIPv6()
    var incorrectAddresses []string
    nodePortAddresses, incorrectAddresses = utilproxy.FilterIncorrectCIDRVersion(nodePortAddresses, isIPv6)
    if len(incorrectAddresses) > 0 {
        klog.Warning("NodePortAddresses of wrong family; ", incorrectAddresses)
    }
    proxier := &Proxier{
        portsMap:                 make(map[utilproxy.LocalPort]utilproxy.Closeable),
        serviceMap:               make(proxy.ServiceMap),
        serviceChanges:           proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
        endpointsMap:             make(proxy.EndpointsMap),
        endpointsChanges:         proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled),
        syncPeriod:               syncPeriod,
        iptables:                 ipt,
        masqueradeAll:            masqueradeAll,
        masqueradeMark:           masqueradeMark,
        exec:                     exec,
        localDetector:            localDetector,
        hostname:                 hostname,
        nodeIP:                   nodeIP,
        portMapper:               &listenPortOpener{},
        recorder:                 recorder,
        serviceHealthServer:      serviceHealthServer,
        healthzServer:            healthzServer,
        precomputedProbabilities: make([]string, 0, 1001),
        iptablesData:             bytes.NewBuffer(nil),
        existingFilterChainsData: bytes.NewBuffer(nil),
        filterChains:             bytes.NewBuffer(nil),
        filterRules:              bytes.NewBuffer(nil),
        natChains:                bytes.NewBuffer(nil),
        natRules:                 bytes.NewBuffer(nil),
        nodePortAddresses:        nodePortAddresses,
        networkInterfacer:        utilproxy.RealNetwork{},
    }

    burstSyncs := 2
    klog.V(2).Infof("iptables(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d",
        ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs)
    // We pass syncPeriod to ipt.Monitor, which will call us only if it needs to.
    // We need to pass *some* maxInterval to NewBoundedFrequencyRunner anyway though.
    // time.Hour is arbitrary.
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, time.Hour, burstSyncs)

    go ipt.Monitor(utiliptables.Chain("KUBE-PROXY-CANARY"),
        []utiliptables.Table{utiliptables.TableMangle, utiliptables.TableNAT, utiliptables.TableFilter},
        proxier.syncProxyRules, syncPeriod, wait.NeverStop)

    if ipt.HasRandomFully() {
        klog.V(2).Infof("iptables(%s) supports --random-fully", ipt.Protocol())
    } else {
        klog.V(2).Infof("iptables(%s) does not support --random-fully", ipt.Protocol())
    }

    return proxier, nil
}
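
To see what the masquerade mark computed in NewProxier actually looks like: with the default --iptables-masquerade-bit of 14 (assumed here for the example; it is configurable), the single-bit value is formatted as 0x00004000, which is the string that later appears in the -j MARK and --mark rules:

package main

import "fmt"

func main() {
    masqueradeBit := 14 // kube-proxy's default --iptables-masquerade-bit
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)

    fmt.Println(masqueradeMark)                                  // 0x00004000
    fmt.Printf("--mark %s/%s\n", masqueradeMark, masqueradeMark) // as used in the KUBE-POSTROUTING rules
}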

func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
    if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
        proxier.Sync()
    }
}

func (proxier *Proxier) Sync() {
    if proxier.healthzServer != nil {
        proxier.healthzServer.QueuedUpdate()
    }
    metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
    proxier.syncRunner.Run()
}

func (proxier *Proxier) SyncLoop() {
    // Update healthz timestamp at beginning in case Sync() never succeeds.
    if proxier.healthzServer != nil {
        proxier.healthzServer.Updated()
    }

    // synthesize "last change queued" time as the informers are syncing.
    metrics.SyncProxyRulesLastQueuedTimestamp.SetToCurrentTime()
    proxier.syncRunner.Loop(wait.NeverStop)
}

// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()

    // don't sync rules till we've received services and endpoints
    if !proxier.isInitialized() {
        klog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
        return
    }

    // Keep track of how long syncs take.
    start := time.Now()
    defer func() {
        metrics.SyncProxyRulesLatency.Observe(metrics.SinceInSeconds(start))
        klog.V(2).Infof("syncProxyRules took %v", time.Since(start))
    }()

    localAddrs, err := utilproxy.GetLocalAddrs()
    if err != nil {
        klog.Errorf("Failed to get local addresses during proxy sync: %v, assuming external IPs are not local", err)
    } else if len(localAddrs) == 0 {
        klog.Warning("No local addresses found, assuming all external IPs are not local")
    }

    localAddrSet := utilnet.IPSet{}
    localAddrSet.Insert(localAddrs...)

    nodeAddresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
    if err != nil {
        klog.Errorf("Failed to get node ip address matching nodeport cidrs %v, services with nodeport may not work as intended: %v", proxier.nodePortAddresses, err)
    }

    // We assume that if this was called, we really want to sync them,
    // even if nothing changed in the meantime. In other words, callers are
    // responsible for detecting no-op changes and not calling this function.
    serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
    endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)

    staleServices := serviceUpdateResult.UDPStaleClusterIP
    // merge stale services gathered from updateEndpointsMap
    for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
        if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
            klog.V(2).Infof("Stale %s service %v -> %s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String())
            staleServices.Insert(svcInfo.ClusterIP().String())
            for _, extIP := range svcInfo.ExternalIPStrings() {
                staleServices.Insert(extIP)
            }
        }
    }

    klog.V(2).Info("Syncing iptables rules")

    success := false
    defer func() {
        if !success {
            klog.Infof("Sync failed; retrying in %s", proxier.syncPeriod)
            proxier.syncRunner.RetryAfter(proxier.syncPeriod)
        }
    }()

    // Create and link the kube chains.
    for _, jump := range iptablesJumpChains {
        if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err)
            return
        }
        args := append(jump.extraArgs,
            "-m", "comment", "--comment", jump.comment,
            "-j", string(jump.dstChain),
        )
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err)
            return
        }
    }

    //
    // Below this point we will not return until we try to write the iptables rules.
    //

    // Get iptables-save output so we can check for existing chains and rules.
    // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
    existingFilterChains := make(map[utiliptables.Chain][]byte)
    proxier.existingFilterChainsData.Reset()
    err = proxier.iptables.SaveInto(utiliptables.TableFilter, proxier.existingFilterChainsData)
    if err != nil { // if we failed to get any rules
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { // otherwise parse the output
        existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, proxier.existingFilterChainsData.Bytes())
    }

    // IMPORTANT: existingNATChains may share memory with proxier.iptablesData.
    existingNATChains := make(map[utiliptables.Chain][]byte)
    proxier.iptablesData.Reset()
    err = proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
    if err != nil { // if we failed to get any rules
        klog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { // otherwise parse the output
        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
    }

    // Reset all buffers used later.
    // This is to avoid memory reallocations and thus improve performance.
    proxier.filterChains.Reset()
    proxier.filterRules.Reset()
    proxier.natChains.Reset()
    proxier.natRules.Reset()

    // Write table headers.
    writeLine(proxier.filterChains, "*filter")
    writeLine(proxier.natChains, "*nat")

    // Make sure we keep stats for the top-level chains, if they existed
    // (which most should have because we created them above).
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
        if chain, ok := existingFilterChains[chainName]; ok {
            writeBytesLine(proxier.filterChains, chain)
        } else {
            writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
        }
    }
    for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
        if chain, ok := existingNATChains[chainName]; ok {
            writeBytesLine(proxier.natChains, chain)
        } else {
            writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
        }
    }

    // Install the kubernetes-specific postrouting rules. We use a whole chain for
    // this so that it is easier to flush and change, for example if the mark
    // value should ever change.
    // NB: THIS MUST MATCH the corresponding code in the kubelet
    writeLine(proxier.natRules, []string{
        "-A", string(kubePostroutingChain),
        "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
        "-j", "RETURN",
    }...)
    // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
    writeLine(proxier.natRules, []string{
        "-A", string(kubePostroutingChain),
        // XOR proxier.masqueradeMark to unset it
        "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
    }...)
    masqRule := []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
        "-j", "MASQUERADE",
    }
    if proxier.iptables.HasRandomFully() {
        masqRule = append(masqRule, "--random-fully")
    }
    writeLine(proxier.natRules, masqRule...)

    // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
    // this so that it is easier to flush and change, for example if the mark
    // value should ever change.
    writeLine(proxier.natRules, []string{
        "-A", string(KubeMarkMasqChain),
        "-j", "MARK", "--or-mark", proxier.masqueradeMark,
    }...)

    // Accumulate NAT chains to keep.
    activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set

    // Accumulate the set of local ports that we will be holding open once this update is complete
    replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{}

    // We create these slices once here to avoid memory reallocations
    // in every loop. Note that to reuse the memory, instead of doing:
    //   slice = <some new slice>
    // you should always do one of the below:
    //   slice = slice[:0] // and then append to it
    //   slice = append(slice[:0], ...)
    endpoints := make([]*endpointsInfo, 0)
    endpointChains := make([]utiliptables.Chain, 0)
    // To avoid growing this slice, we arbitrarily set its size to 64,
    // there is never more than that many arguments for a single line.
    // Note that even if we go over 64, it will still be correct - it
    // is just for efficiency, not correctness.
    args := make([]string, 64)

    // Compute total number of endpoint chains across all services.
    proxier.endpointChainsNumber = 0
    for svcName := range proxier.serviceMap {
        proxier.endpointChainsNumber += len(proxier.endpointsMap[svcName])
    }

    // Build rules for each service.
    for svcName, svc := range proxier.serviceMap {
        svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
            continue
        }
        isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP())
        protocol := strings.ToLower(string(svcInfo.Protocol()))
        svcNameString := svcInfo.serviceNameString

        allEndpoints := proxier.endpointsMap[svcName]

        hasEndpoints := len(allEndpoints) > 0

        // Service Topology will not be enabled in the following cases:
        // 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
        // 2. ServiceTopology is not enabled.
        // 3. EndpointSlice is not enabled (service topology depends on endpoint slice
        // to get topology information).
        if !svcInfo.OnlyNodeLocalEndpoints() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
            allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
            hasEndpoints = len(allEndpoints) > 0
        }

        svcChain := svcInfo.servicePortChainName
        if hasEndpoints {
            // Create the per-service chain, retaining counters if possible.
            if chain, ok := existingNATChains[svcChain]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
            }
            activeNATChains[svcChain] = true
        }

        svcXlbChain := svcInfo.serviceLBChainName
        if svcInfo.OnlyNodeLocalEndpoints() {
            // Only for services that request OnlyLocal traffic,
            // create the per-service LB chain, retaining counters if possible.
            if lbChain, ok := existingNATChains[svcXlbChain]; ok {
                writeBytesLine(proxier.natChains, lbChain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain))
            }
            activeNATChains[svcXlbChain] = true
        }

        // Capture the clusterIP.
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                "--dport", strconv.Itoa(svcInfo.Port()),
            )
            if proxier.masqueradeAll {
                writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
            } else if proxier.localDetector.IsImplemented() {
                // This masquerades off-cluster traffic to a service VIP.  The idea
                // is that you can establish a static route for your Service range,
                // routing to any node, and that node will bridge into the Service
                // for you.  Since that might bounce off-node, we masquerade here.
                // If/when we support "Local" policy for VIPs, we should update this.
                writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...)
            }
            writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
        } else {
            // No endpoints.
            writeLine(proxier.filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP()),
                "--dport", strconv.Itoa(svcInfo.Port()),
                "-j", "REJECT",
            )
        }

        // Capture externalIPs.
        for _, externalIP := range svcInfo.ExternalIPStrings() {
            // If the "external" IP happens to be an IP that is local to this
            // machine, hold the local port open so no other process can open it
            // (because the socket might open but it would never work).
            if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
                lp := utilproxy.LocalPort{
                    Description: "externalIP for " + svcNameString,
                    IP:          externalIP,
                    Port:        svcInfo.Port(),
                    Protocol:    protocol,
                }
                if proxier.portsMap[lp] != nil {
                    klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else {
                    socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
                    if err != nil {
                        msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)

                        proxier.recorder.Eventf(
                            &v1.ObjectReference{
                                Kind:      "Node",
                                Name:      proxier.hostname,
                                UID:       types.UID(proxier.hostname),
                                Namespace: "",
                            }, v1.EventTypeWarning, err.Error(), msg)
                        klog.Error(msg)
                        continue
                    }
                    replacementPortsMap[lp] = socket
                }
            }

            if hasEndpoints {
                args = append(args[:0],
                    "-A", string(kubeServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                )

                destChain := svcXlbChain
                // We have to SNAT packets to external IPs if externalTrafficPolicy is cluster.
                if !(utilfeature.DefaultFeatureGate.Enabled(features.ExternalPolicyForExternalIP) && svcInfo.OnlyNodeLocalEndpoints()) {
                    destChain = svcChain
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                }

                // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
                // nor from a local process to be forwarded to the service.
                // This rule roughly translates to "all traffic from off-machine".
                // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
                externalTrafficOnlyArgs := append(args,
                    "-m", "physdev", "!", "--physdev-is-in",
                    "-m", "addrtype", "!", "--src-type", "LOCAL")
                writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(destChain))...)
                dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
                // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
                // This covers cases like GCE load-balancers which get added to the local routing table.
                writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(destChain))...)
            } else {
                // No endpoints.
                writeLine(proxier.filterRules,
                    "-A", string(kubeExternalServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", protocol, "-p", protocol,
                    "-d", utilproxy.ToCIDR(net.ParseIP(externalIP)),
                    "--dport", strconv.Itoa(svcInfo.Port()),
                    "-j", "REJECT",
                )
            }
        }

        // Capture load-balancer ingress.
        fwChain := svcInfo.serviceFirewallChainName
        for _, ingress := range svcInfo.LoadBalancerIPStrings() {
            if ingress != "" {
                if hasEndpoints {
                    // create service firewall chain
                    if chain, ok := existingNATChains[fwChain]; ok {
                        writeBytesLine(proxier.natChains, chain)
                    } else {
                        writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain))
                    }
                    activeNATChains[fwChain] = true
                    // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
                    // This currently works for load balancers that preserve source IPs.
                    // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.

                    args = append(args[:0],
                        "-A", string(kubeServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                    )
                    // jump to service firewall chain
                    writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)

                    args = append(args[:0],
                        "-A", string(fwChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
                    )

                    // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
                    chosenChain := svcXlbChain
                    // If we are proxying globally, we need to masquerade in case we cross nodes.
                    // If we are proxying only locally, we can retain the source IP.
                    if !svcInfo.OnlyNodeLocalEndpoints() {
                        writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                        chosenChain = svcChain
                    }

                    if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
                        // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
                        writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...)
                    } else {
                        // firewall filter based on each source range
                        allowFromNode := false
                        for _, src := range svcInfo.LoadBalancerSourceRanges() {
                            writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                            // ignore error because it has been validated
                            _, cidr, _ := net.ParseCIDR(src)
                            if cidr.Contains(proxier.nodeIP) {
                                allowFromNode = true
                            }
                        }
                        // Generally, an ip route rule is added to intercept requests to the load balancer
                        // VIP from the load balancer's own backend hosts. In that case the request will
                        // not hit the load balancer but loop back directly, so the following rule is
                        // needed to allow requests originating on the host itself.
                        if allowFromNode {
                            writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...)
                        }
                    }

                    // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
                    // It means the packet cannot go thru the firewall, then mark it for DROP
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
                } else {
                    // No endpoints.
                    writeLine(proxier.filterRules,
                        "-A", string(kubeServicesChain),
                        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                        "-m", protocol, "-p", protocol,
                        "-d", utilproxy.ToCIDR(net.ParseIP(ingress)),
                        "--dport", strconv.Itoa(svcInfo.Port()),
                        "-j", "REJECT",
                    )
                }
            }
        }

        // Capture nodeports.  If we had more than 2 rules it might be
        // worthwhile to make a new per-service chain for nodeport rules, but
        // with just 2 rules it ends up being a waste and a cognitive burden.
        if svcInfo.NodePort() != 0 {
            // Hold the local port open so no other process can open it
            // (because the socket might open but it would never work).
            if len(nodeAddresses) == 0 {
                continue
            }

            lps := make([]utilproxy.LocalPort, 0)
            for address := range nodeAddresses {
                lp := utilproxy.LocalPort{
                    Description: "nodePort for " + svcNameString,
                    IP:          address,
                    Port:        svcInfo.NodePort(),
                    Protocol:    protocol,
                }
                if utilproxy.IsZeroCIDR(address) {
                    // Empty IP address means all
                    lp.IP = ""
                    lps = append(lps, lp)
                    // If we encounter a zero CIDR, then there is no point in processing the rest of the addresses.
                    break
                }
                lps = append(lps, lp)
            }

            // For ports on node IPs, open the actual port and hold it.
            for _, lp := range lps {
                if proxier.portsMap[lp] != nil {
                    klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                    replacementPortsMap[lp] = proxier.portsMap[lp]
                } else if svcInfo.Protocol() != v1.ProtocolSCTP {
                    socket, err := proxier.portMapper.OpenLocalPort(&lp, isIPv6)
                    if err != nil {
                        klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                        continue
                    }
                    if lp.Protocol == "udp" {
                        // TODO: We might have multiple services using the same port, and this will clear conntrack for all of them.
                        // This is very low impact. The NodePort range is intentionally obscure, and unlikely to actually collide with real Services.
                        // This only affects UDP connections, which are not common.
                        // See issue: https://github.com/kubernetes/kubernetes/issues/49881
                        err := conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
                        if err != nil {
                            klog.Errorf("Failed to clear udp conntrack for port %d, error: %v", lp.Port, err)
                        }
                    }
                    replacementPortsMap[lp] = socket
                }
            }

            if hasEndpoints {
                args = append(args[:0],
                    "-A", string(kubeNodePortsChain),
                    "-m", "comment", "--comment", svcNameString,
                    "-m", protocol, "-p", protocol,
                    "--dport", strconv.Itoa(svcInfo.NodePort()),
                )
                if !svcInfo.OnlyNodeLocalEndpoints() {
                    // Nodeports need SNAT, unless they're local.
                    writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                    // Jump to the service chain.
                    writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
                } else {
                    // TODO: Make all nodePorts jump to the firewall chain.
                    // Currently we only create it for loadbalancers (#33586).

                    // Fix localhost martian source error
                    loopback := "127.0.0.0/8"
                    if isIPv6 {
                        loopback = "::1/128"
                    }
                    writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...)
                    writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
                }
            } else {
                // No endpoints.
                writeLine(proxier.filterRules,
                    "-A", string(kubeExternalServicesChain),
                    "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                    "-m", "addrtype", "--dst-type", "LOCAL",
                    "-m", protocol, "-p", protocol,
                    "--dport", strconv.Itoa(svcInfo.NodePort()),
                    "-j", "REJECT",
                )
            }
        }

        if !hasEndpoints {
            continue
        }

        // Generate the per-endpoint chains.  We do this in multiple passes so we
        // can group rules together.
        // These two slices parallel each other - keep in sync
        endpoints = endpoints[:0]
        endpointChains = endpointChains[:0]
        var endpointChain utiliptables.Chain
        for _, ep := range allEndpoints {
            epInfo, ok := ep.(*endpointsInfo)
            if !ok {
                klog.Errorf("Failed to cast endpointsInfo %q", ep.String())
                continue
            }

            endpoints = append(endpoints, epInfo)
            endpointChain = epInfo.endpointChain(svcNameString, protocol)
            endpointChains = append(endpointChains, endpointChain)

            // Create the endpoint chain, retaining counters if possible.
            if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
            }
            activeNATChains[endpointChain] = true
        }

        // First write session affinity rules, if applicable.
        if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
            for _, endpointChain := range endpointChains {
                args = append(args[:0],
                    "-A", string(svcChain),
                )
                args = proxier.appendServiceCommentLocked(args, svcNameString)
                args = append(args,
                    "-m", "recent", "--name", string(endpointChain),
                    "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
                    "-j", string(endpointChain),
                )
                writeLine(proxier.natRules, args...)
            }
        }

        // Now write loadbalancing & DNAT rules.
        n := len(endpointChains)
        localEndpointChains := make([]utiliptables.Chain, 0)
        for i, endpointChain := range endpointChains {
            // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic.
            if svcInfo.OnlyNodeLocalEndpoints() && endpoints[i].IsLocal {
                localEndpointChains = append(localEndpointChains, endpointChains[i])
            }

            epIP := endpoints[i].IP()
            if epIP == "" {
                // Error parsing this endpoint has been logged. Skip to next endpoint.
                continue
            }

            // Balancing rules in the per-service chain.
            args = append(args[:0], "-A", string(svcChain))
            args = proxier.appendServiceCommentLocked(args, svcNameString)
            if i < (n - 1) {
                // Each rule is a probabilistic match.
                args = append(args,
                    "-m", "statistic",
                    "--mode", "random",
                    "--probability", proxier.probability(n-i))
            }
            // The final (or only if n == 1) rule is a guaranteed match.
            args = append(args, "-j", string(endpointChain))
            writeLine(proxier.natRules, args...)

            // Rules in the per-endpoint chain.
            args = append(args[:0], "-A", string(endpointChain))
            args = proxier.appendServiceCommentLocked(args, svcNameString)
            // Handle traffic that loops back to the originator with SNAT.
            writeLine(proxier.natRules, append(args,
                "-s", utilproxy.ToCIDR(net.ParseIP(epIP)),
                "-j", string(KubeMarkMasqChain))...)
            // Update client-affinity lists.
            if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
            }
            // DNAT to final destination.
            args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
            writeLine(proxier.natRules, args...)
        }

        // The logic below this applies only if this service is marked as OnlyLocal
        if !svcInfo.OnlyNodeLocalEndpoints() {
            continue
        }

        // First rule in the chain redirects all pod -> external VIP traffic to the
        // Service's ClusterIP instead. This happens whether or not we have local
        // endpoints, but only if localDetector is implemented.
        if proxier.localDetector.IsImplemented() {
            args = append(args[:0],
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
            )
            writeLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...)
        }

        // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
        // This allows traffic originating from the host to be redirected to the service correctly,
        // otherwise traffic to LB IPs are dropped if there are no local endpoints.
        args = append(args[:0], "-A", string(svcXlbChain))
        writeLine(proxier.natRules, append(args,
            "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
            "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...)
        writeLine(proxier.natRules, append(args,
            "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
            "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)

        numLocalEndpoints := len(localEndpointChains)
        if numLocalEndpoints == 0 {
            // Blackhole all traffic since there are no local endpoints
            args = append(args[:0],
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
                "-j",
                string(KubeMarkDropChain),
            )
            writeLine(proxier.natRules, args...)
        } else {
            // First write session affinity rules only over local endpoints, if applicable.
            if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
                for _, endpointChain := range localEndpointChains {
                    writeLine(proxier.natRules,
                        "-A", string(svcXlbChain),
                        "-m", "comment", "--comment", svcNameString,
                        "-m", "recent", "--name", string(endpointChain),
                        "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
                        "-j", string(endpointChain))
                }
            }

            // Setup probability filter rules only over local endpoints
            for i, endpointChain := range localEndpointChains {
                // Balancing rules in the per-service chain.
                args = append(args[:0],
                    "-A", string(svcXlbChain),
                    "-m", "comment", "--comment",
                    fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
                )
                if i < (numLocalEndpoints - 1) {
                    // Each rule is a probabilistic match.
                    args = append(args,
                        "-m", "statistic",
                        "--mode", "random",
                        "--probability", proxier.probability(numLocalEndpoints-i))
                }
                // The final (or only if n == 1) rule is a guaranteed match.
                args = append(args, "-j", string(endpointChain))
                writeLine(proxier.natRules, args...)
            }
        }
    }

    // Delete chains no longer in use.
    for chain := range existingNATChains {
        if !activeNATChains[chain] {
            chainString := string(chain)
            if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
                // Ignore chains that aren't ours.
                continue
            }
            // We must (as per iptables) write a chain-line for it, which has
            // the nice effect of flushing the chain.  Then we can remove the
            // chain.
            writeBytesLine(proxier.natChains, existingNATChains[chain])
            writeLine(proxier.natRules, "-X", chainString)
        }
    }

    // Finally, tail-call to the nodeports chain.  This needs to be after all
    // other service portal rules.
    isIPv6 := proxier.iptables.IsIPv6()
    for address := range nodeAddresses {
        // TODO(thockin, m1093782566): If/when we have dual-stack support we will want to distinguish v4 from v6 zero-CIDRs.
        if utilproxy.IsZeroCIDR(address) {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
                "-m", "addrtype", "--dst-type", "LOCAL",
                "-j", string(kubeNodePortsChain))
            writeLine(proxier.natRules, args...)
            // Nothing else matters after the zero CIDR.
            break
        }
        // Ignore IP addresses with incorrect version
        if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
            klog.Errorf("IP address %s has incorrect IP version", address)
            continue
        }
        // create nodeport rules for each IP one by one
        args = append(args[:0],
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
            "-d", address,
            "-j", string(kubeNodePortsChain))
        writeLine(proxier.natRules, args...)
    }

    // Drop the packets in INVALID state, which would potentially cause
    // unexpected connection reset.
    // https://github.com/kubernetes/kubernetes/issues/74839
    writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "conntrack",
        "--ctstate", "INVALID",
        "-j", "DROP",
    )

    // If the masqueradeMark has been added then we want to forward that same
    // traffic, this allows NodePort traffic to be forwarded even if the default
    // FORWARD policy is not accept.
    writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding rules"`,
        "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
        "-j", "ACCEPT",
    )

    // The following two rules ensure the traffic after the initial packet
    // accepted by the "kubernetes forwarding rules" rule above will be
    // accepted.
    writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`,
        "-m", "conntrack",
        "--ctstate", "RELATED,ESTABLISHED",
        "-j", "ACCEPT",
    )
    writeLine(proxier.filterRules,
        "-A", string(kubeForwardChain),
        "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`,
        "-m", "conntrack",
        "--ctstate", "RELATED,ESTABLISHED",
        "-j", "ACCEPT",
    )

    // Write the end-of-table markers.
    writeLine(proxier.filterRules, "COMMIT")
    writeLine(proxier.natRules, "COMMIT")

    // Sync rules.
    // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table
    proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())

    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        metrics.IptablesRestoreFailuresTotal.Inc()
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }
    success = true

    for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
        for _, lastChangeTriggerTime := range lastChangeTriggerTimes {
            latency := metrics.SinceInSeconds(lastChangeTriggerTime)
            metrics.NetworkProgrammingLatency.Observe(latency)
            klog.V(4).Infof("Network programming of %s took %f seconds", name, latency)
        }
    }

    // Close old local ports and save new ones.
    for k, v := range proxier.portsMap {
        if replacementPortsMap[k] == nil {
            v.Close()
        }
    }
    proxier.portsMap = replacementPortsMap

    if proxier.healthzServer != nil {
        proxier.healthzServer.Updated()
    }
    metrics.SyncProxyRulesLastTimestamp.SetToCurrentTime()

    // Update service healthchecks.  The endpoints list might include services that are
    // not "OnlyLocal", but the services list will not, and the serviceHealthServer
    // will just drop those endpoints.
    if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
        klog.Errorf("Error syncing healthcheck services: %v", err)
    }
    if err := proxier.serviceHealthServer.SyncEndpoints(endpointUpdateResult.HCEndpointsLocalIPSize); err != nil {
        klog.Errorf("Error syncing healthcheck endpoints: %v", err)
    }

    // Finish housekeeping.
    // TODO: these could be made more consistent.
    for _, svcIP := range staleServices.UnsortedList() {
        if err := conntrack.ClearEntriesForIP(proxier.exec, svcIP, v1.ProtocolUDP); err != nil {
            klog.Errorf("Failed to delete stale service IP %s connections, error: %v", svcIP, err)
        }
    }
    proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
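
A note on the load-balancing rules generated above: for a service with n endpoint chains, rule i (0-based) uses -m statistic --mode random --probability 1/(n-i), and the final rule matches unconditionally, so every endpoint ends up receiving 1/n of new connections. The sketch below reproduces that arithmetic; probability mirrors the "%0.10f" formatting kube-proxy precomputes, and the KUBE-SEP-<i> names are stand-ins for the real hashed chain names:

package main

import "fmt"

// probability formats the statistic-match argument the same way kube-proxy does
// (a sketch of proxier.probability, which returns a precomputed string).
func probability(n int) string {
    return fmt.Sprintf("%0.10f", 1.0/float64(n))
}

func main() {
    n := 3 // pretend the service has three ready endpoints
    for i := 0; i < n; i++ {
        if i < n-1 {
            fmt.Printf("rule %d: -m statistic --mode random --probability %s -j KUBE-SEP-%d\n",
                i, probability(n-i), i)
        } else {
            fmt.Printf("rule %d: -j KUBE-SEP-%d (always matches)\n", i, i)
        }
    }
    // Net effect: 1/3, then (1-1/3)*(1/2) = 1/3, then the remaining 1/3.
}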

In pkg/util/async/bounded_frequency_runner.go:

func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
    timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
    <-timer.C()                                  // consume the first tick
    return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}

func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
    if maxInterval < minInterval {
        panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
    }
    if timer == nil {
        panic(fmt.Sprintf("%s: timer must be non-nil", name))
    }

    bfr := &BoundedFrequencyRunner{
        name:        name,
        fn:          fn,
        minInterval: minInterval,
        maxInterval: maxInterval,
        run:         make(chan struct{}, 1),
        retry:       make(chan struct{}, 1),
        timer:       timer,
    }
    if minInterval == 0 {
        bfr.limiter = nullLimiter{}
    } else {
        // allow burst updates in short succession
        qps := float32(time.Second) / float32(minInterval)
        bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
    }
    return bfr
}
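
With the values kube-proxy passes in NewProxier (minInterval = minSyncPeriod, maxInterval = time.Hour, burstRuns = 2), the limiter is a token bucket with qps = 1/minSyncPeriod and a burst of 2: two syncs may happen back to back, after which syncs are spaced at least minSyncPeriod apart, and the timer still forces one run per hour. A usage sketch of the same call pattern (note that k8s.io/kubernetes is awkward to import as a library, so treat this as an illustration rather than a snippet to vendor):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/async"
)

func main() {
    syncRules := func() { fmt.Println("sync at", time.Now().Format(time.StampMilli)) }

    // minInterval=1s, forced run at least every hour, burst of 2 -- the same
    // shape as the sync-runner wiring in NewProxier.
    runner := async.NewBoundedFrequencyRunner("demo-sync", syncRules, time.Second, time.Hour, 2)
    go runner.Loop(wait.NeverStop)

    // Hammer Run(); the token bucket collapses these into roughly two
    // immediate runs followed by one run per second.
    for i := 0; i < 20; i++ {
        runner.Run()
        time.Sleep(100 * time.Millisecond)
    }
    time.Sleep(2 * time.Second)
}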

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    klog.V(3).Infof("%s Loop running", bfr.name)
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            klog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C():
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun()
        case <-bfr.retry:
            bfr.doRetry()
        }
    }
}

func (bfr *BoundedFrequencyRunner) Run() {
    // If it takes a lot of time to run the underlying function, no one is really
    // processing elements from <run> channel. So to avoid blocking here on the
    // putting element to it, we simply skip it if there is already an element
    // in it.
    select {
    case bfr.run <- struct{}{}:
    default:
    }
}

func (bfr *BoundedFrequencyRunner) tryRun() {
    bfr.mu.Lock()
    defer bfr.mu.Unlock()

    if bfr.limiter.TryAccept() {
        // We're allowed to run the function right now.
        bfr.fn()
        bfr.lastRun = bfr.timer.Now()
        bfr.timer.Stop()
        bfr.timer.Reset(bfr.maxInterval)
        klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
        return
    }

    // It can't run right now, figure out when it can run next.
    elapsed := bfr.timer.Since(bfr.lastRun)   // how long since last run
    nextPossible := bfr.minInterval - elapsed // time to next possible run
    nextScheduled := bfr.timer.Remaining()    // time to next scheduled run
    klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)

    // It's hard to avoid race conditions in the unit tests unless we always reset
    // the timer here, even when it's unchanged
    if nextPossible < nextScheduled {
        nextScheduled = nextPossible
    }
    bfr.timer.Stop()
    bfr.timer.Reset(nextScheduled)
}
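
To trace tryRun's scheduling arithmetic with concrete (hypothetical) numbers: suppose minInterval is 10s, the last run was 3s ago, and the periodic maxInterval timer would not fire for another 50 minutes. The limiter rejects the run, nextPossible is 7s, and since that is sooner than the scheduled tick, the timer is reset to 7s:

package main

import (
    "fmt"
    "time"
)

func main() {
    minInterval := 10 * time.Second
    elapsed := 3 * time.Second        // time since lastRun
    nextScheduled := 50 * time.Minute // time until the maxInterval timer fires

    nextPossible := minInterval - elapsed // 7s: earliest the limiter will allow another run
    if nextPossible < nextScheduled {
        nextScheduled = nextPossible
    }
    fmt.Println("timer reset to", nextScheduled) // timer reset to 7s
}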

In pkg/proxy/config/config.go:

func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
    result := &EndpointsConfig{
        listerSynced: endpointsInformer.Informer().HasSynced,
    }

    endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddEndpoints,
            UpdateFunc: result.handleUpdateEndpoints,
            DeleteFunc: result.handleDeleteEndpoints,
        },
        resyncPeriod,
    )

    return result
}

func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
    c.eventHandlers = append(c.eventHandlers, handler)
}

func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
    klog.Info("Starting endpoints config controller")

    if !cache.WaitForNamedCacheSync("endpoints config", stopCh, c.listerSynced) {
        return
    }

    for i := range c.eventHandlers {
        klog.V(3).Infof("Calling handler.OnEndpointsSynced()")
        c.eventHandlers[i].OnEndpointsSynced()
    }
}
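
The informer callbacks registered in NewEndpointsConfig are thin dispatchers: they type-assert the object and fan the event out to every registered handler, which in iptables mode means the Proxier's OnEndpointsAdd/Update/Delete methods. Paraphrased from the same file (exact logging and error handling may differ; the delete handler additionally unwraps cache.DeletedFinalStateUnknown tombstones):

func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
    endpoints, ok := obj.(*v1.Endpoints)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
        return
    }
    for i := range c.eventHandlers {
        klog.V(4).Infof("Calling handler.OnEndpointsAdd")
        c.eventHandlers[i].OnEndpointsAdd(endpoints)
    }
}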
