背景
apiserver因各种原因重启后,在cache尚未就绪期间,原本的很多list请求都会直接访问etcd(如满足resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact条件的list请求),这对etcd和apiserver的稳定性都是巨大的挑战。
引入KEP-4568(Resilient Watchcache Initialization,对应ResilientWatchCacheInitialization特性)后,当cache未就绪时:
针对本应由cache处理的list请求会返回429(label/field selector均为空且limit大于0的请求除外,这类请求会被转发给etcd)
针对get请求直接转发给etcd
针对watch请求会返回429
具体设计可参考KEP-4568: https://kep.k8s.io/4568
源码
apiserver PostStartHook
需要开启apiserver的WatchCacheInitializationPostStartHook特性
给cache一定的时间(默认1分钟)进行初始化
pkg/controlplane/apiserver/server.go中
构建apiserver
func (c completedConfig) New(name string, delegationTarget genericapiserver.DelegationTarget) (*Server, error) {
...
注册存储就绪的PostStartHook
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.WatchCacheInitializationPostStartHook) {
s.GenericAPIServer.AddPostStartHookOrDie("storage-readiness", s.GenericAPIServer.StorageReadinessHook.Hook)
}
...
}
staging/src/k8s.io/apiserver/pkg/server/storage_readiness_hook.go中
注册存储就绪检查
// RegisterStorage records the readiness check of the storage serving gvr,
// keyed by gvr.String(), while holding h.lock.
// The method value storage.ReadinessCheck is stored, not called, so readiness
// is evaluated later. NOTE(review): presumably by h.check() when the
// storage-readiness post-start hook polls — confirm.
// Registering the same GVR twice keeps the first check and only logs an error.
func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) {
h.lock.Lock()
defer h.lock.Unlock()
// Insert only when no check exists yet for this GVR; duplicates are reported, never overwritten.
if _, ok := h.checks[gvr.String()]; !ok {
h.checks[gvr.String()] = storage.ReadinessCheck
} else {
// NOTE(review): the format string ends in ": " with nothing after it.
klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String())
}
}
存储就绪的hook
// Hook is the storage-readiness post-start hook. It waits, for at most
// h.timeout, until h.check() reports that storage is ready (presumably an
// aggregate of the checks added via RegisterStorage — confirm).
// It never fails server startup: it always returns nil, and hitting the
// deadline is only logged as a warning.
func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error {
	// Bound the wait by h.timeout. NOTE(review): presumably h.timeout comes from
	// Config.StorageInitializationTimeout, which NewConfig defaults to time.Minute — confirm.
	deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout)
	defer cancel()
	// Poll h.check() every 100ms, starting immediately (immediate=true), until it
	// reports ready or deadlineCtx is done.
	err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
		func(_ context.Context) (bool, error) {
			if ok := h.check(); ok {
				return true, nil
			}
			return false, nil
		})
	// Hitting the deadline is deliberately not an error: log it and let startup
	// proceed. Any other error from the poll is dropped as well, since nil is
	// returned unconditionally.
	if errors.Is(err, context.DeadlineExceeded) {
		klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring")
	}
	return nil
}
staging/src/k8s.io/apiserver/pkg/server/config.go中
构建apiserver配置
func NewConfig(codecs serializer.CodecFactory) *Config {
...
存储初始化超时时间
StorageInitializationTimeout: time.Minute,
...
}
get请求
需要开启apiserver的ResilientWatchCacheInitialization特性
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中
func (c *CacheDelegator) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
...
转发给etcd
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.cacher.Ready() {
// If Cache is not initialized, delegator Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
return c.storage.Get(ctx, key, opts, objPtr)
}
}
...
}
list请求
需要开启apiserver的ResilientWatchCacheInitialization特性
staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go中
func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
...
只有在cache未就绪,且同时满足label selector为空、field selector为空、limit大于0时,
才会将请求转发到etcd
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.cacher.Ready() && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegator List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
...
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
list请求
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
...
cache未就绪则返回429
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if downtime, err := c.ready.check(); err != nil {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
...
}
watch请求
需要开启apiserver的ResilientWatchCacheInitialization特性
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go中
Watch请求
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
...
如果cache没就绪则返回429
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var err error
var downtime time.Duration
readyGeneration, downtime, err = c.ready.checkAndReadGeneration()
if err != nil {
return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime))
}
...
}












网友评论