美文网首页
从源码看WatchCacheInitialization

从源码看WatchCacheInitialization

作者: wwq2020 | 来源:发表于2025-04-21 21:43 被阅读0次

背景

apiserver由于各种原因重启后,在cache尚未就绪期间,原来的很多list请求都会直接访问etcd(如resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact的list请求),这对etcd和apiserver的稳定性是巨大的挑战.
引入WatchCacheInitialization后,如果cache未就绪时:
针对list请求会返回429(label selector和field selector均为空且limit大于0的请求除外,这类请求会转发给etcd)
针对get请求直接转发给etcd
针对watch请求会返回429

具体设计可以看https://kep.k8s.io/4568

源码

apiserver PostStartHook

需要开启apiserver的WatchCacheInitializationPostStartHook特性
给cache一定的时间(默认1分钟)进行初始化
pkg/controlplane/apiserver/server.go中

构建apiserver
func (c completedConfig) New(name string, delegationTarget genericapiserver.DelegationTarget) (*Server, error) {
  ...
注册存储就绪的PostStartHook
    if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) {
        s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook)
    }
  ...
}

staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook.go中

注册存储就绪检查
// RegisterStorage stores storage.ReadinessCheck in h.checks, keyed by the
// string form of gvr.
//
// The first registration for a given gvr wins: if an entry already exists it
// is left untouched and an error is logged instead of overwriting it.
func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) {
    // Hold h.lock for the whole call; the deferred Unlock releases it on return.
    h.lock.Lock()
    defer h.lock.Unlock()

    if _, ok := h.checks[gvr.String()]; !ok {
        h.checks[gvr.String()] = storage.ReadinessCheck
    } else {
        // Duplicate registration: keep the existing check and only log.
        klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String())
    }
}

存储就绪的hook
// Hook polls h.check every 100ms until it reports true or until h.timeout
// has elapsed on a context derived from ctx.
//
// It always returns nil: reaching the deadline is only logged as a warning,
// and any other polling error is dropped as well.
func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error {
默认1分钟超时
    // Bound the wait by h.timeout; cancel releases the timer on return.
    deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout)
    defer cancel()
  轮询直到超时
    // NOTE(review): the `true` argument is presumably wait's "immediate" flag,
    // i.e. the first check runs without waiting one interval — confirm against
    // the wait package documentation.
    err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
        func(_ context.Context) (bool, error) {
            // The condition never returns an error, so polling only stops on
            // success or when deadlineCtx is done.
            if ok := h.check(); ok {
                return true, nil
            }
            return false, nil
        })
  超时不返回错误
    // A timeout is deliberately not treated as a failure: warn and carry on.
    if errors.Is(err, context.DeadlineExceeded) {
        klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring")
    }
    return nil
}

staging/src/k8s.io/apiserver/pkg/server/config.go中

构建apiserver配置
func NewConfig(codecs serializer.CodecFactory) *Config {
  ...
存储初始化超时时间
        StorageInitializationTimeout:   time.Minute,
 ...
}

get请求

需要开启apiserver的ResilientWatchCacheInitialization特性
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中

func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
  ...
转发给etcd
    if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
        if !c.cacher.Ready() {
            // If Cache is not initialized, delegator Get requests to storage
            // as described in https://kep.k8s.io/4568
            span.AddEvent("About to Get from underlying storage - cache not initialized")
            return c.storage.Get(ctx, key, opts, objPtr)
        }
    }
 ...
}

list请求

需要开启apiserver的ResilientWatchCacheInitialization特性
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中

func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
  ...
只有在cache未就绪,且同时满足label selector为空、field selector为空、limit大于0时,
才会将请求转发到etcd;其余情况继续走后面的cacher逻辑
    if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
        if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
            // If Cacher is not initialized, delegator List requests to storage
            // as described in https://kep.k8s.io/4568
            return c.storage.GetList(ctx, key, opts, listObj)
        }
  ...
}

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中

list请求
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
  ...
cache未就绪则返回429
    if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
        if downtime, err := c.ready.check(); err != nil {
            // If Cacher is not initialized, reject List requests
            // as described in https://kep.k8s.io/4568
            return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
        }  
...
}

watch请求

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中

Watch请求
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
  ...
    如果cache没就绪则返回429
    if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
        var err error
        var downtime time.Duration
        readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
        if err != nil {
            return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
        }
  ...
}

相关文章

网友评论

      本文标题:从源码看WatchCacheInitialization

      本文链接:https://www.haomeiwen.com/subject/cftibjtx.html