1. Preface
If you repost this article, please credit the original source and respect the author's work!
This article analyzes the files under kubernetes/pkg/scheduler/internal/cache, namely node_tree.go and cache.go.
Source: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/scheduler/internal/cache
Branch: tming-v1.13 (based on v1.13)
2. node_tree
2.1 nodeArray
type nodeArray struct {
	// all the node names in this zone
	nodes []string
	// index of the next node to visit when iterating over nodes
	lastIndex int
}
nodeArray is simply an array that stores node names, plus a cursor (lastIndex) whose role shows up in next().
// next returns the next node in the nodeArray.
func (na *nodeArray) next() (nodeName string, exhausted bool) {
	if len(na.nodes) == 0 {
		klog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
		return "", false
	}
	if na.lastIndex >= len(na.nodes) {
		return "", true
	}
	nodeName = na.nodes[na.lastIndex]
	na.lastIndex++
	return nodeName, false
}
Once the iteration has moved past the last element, next() returns an empty name together with exhausted=true, signaling that the array has been fully traversed. That is why lastIndex is needed: it records the position of the next node to visit.
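A minimal runnable sketch of the exhaustion semantics (a plain copy of nodeArray with the klog dependency dropped):

package main

import "fmt"

type nodeArray struct {
	nodes     []string
	lastIndex int
}

func (na *nodeArray) next() (string, bool) {
	if na.lastIndex >= len(na.nodes) {
		return "", true // exhausted
	}
	name := na.nodes[na.lastIndex]
	na.lastIndex++
	return name, false
}

func main() {
	na := &nodeArray{nodes: []string{"node-1", "node-2"}}
	for {
		name, exhausted := na.next()
		if exhausted {
			fmt.Println("exhausted")
			return
		}
		fmt.Println(name) // prints node-1, then node-2
	}
}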
2.2 NodeTree
type NodeTree struct {
	// a map from zone (region-zone) to an array of nodes in the zone
	tree map[string]*nodeArray
	// a list of all the zones in the tree (i.e. the keys of tree)
	zones []string
	// index of the next zone to visit when iterating over zones
	zoneIndex int
	// total number of nodes
	NumNodes int
	mu       sync.RWMutex
}
As you can see, NodeTree is essentially a map recording which nodes belong to each zone.
zones: holds all the zones; it always matches the key set of tree.
zoneIndex: like lastIndex in nodeArray, it records the index of the next zone to visit in the zones slice.
Next, let's walk through NodeTree's AddNode method to get a feel for the data structure.
2.2.1 AddNode
func (nt *NodeTree) AddNode(n *v1.Node) {
	nt.mu.Lock()
	defer nt.mu.Unlock()
	nt.addNode(n)
}

/**
1. Get the zone this node belongs to.
2. If the zone does not exist yet, add it to both zones and tree.
3. If the zone already exists:
   3.1 check whether the node is already in the zone's nodeArray;
   3.2 if it is, return directly; otherwise append it.
*/
func (nt *NodeTree) addNode(n *v1.Node) {
	zone := utilnode.GetZoneKey(n)
	if na, ok := nt.tree[zone]; ok {
		for _, nodeName := range na.nodes {
			if nodeName == n.Name {
				klog.Warningf("node %v already exist in the NodeTree", n.Name)
				return
			}
		}
		na.nodes = append(na.nodes, n.Name)
	} else {
		nt.zones = append(nt.zones, zone)
		nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
	}
	klog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
	nt.NumNodes++
}
A very routine add method. The methods for removing nodes and for adding/removing zones are not covered here; they are essentially just map operations. One detail worth a note is how the zone key is derived, sketched below.
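For reference, here is a sketch of how the zone key is derived, mirroring what utilnode.GetZoneKey does in v1.13 (nodes carrying neither of the well-known region/zone labels all land in the "" zone; treat the exact separator as an implementation detail):

package main

import "fmt"

// zoneKey mimics utilnode.GetZoneKey (v1.13): combine the region and zone
// labels into a single key; nodes with neither label share the "" zone.
func zoneKey(labels map[string]string) string {
	region := labels["failure-domain.beta.kubernetes.io/region"]
	zone := labels["failure-domain.beta.kubernetes.io/zone"]
	if region == "" && zone == "" {
		return ""
	}
	// "\x00" cannot appear in a label value, so the key is unambiguous.
	return region + ":\x00:" + zone
}

func main() {
	fmt.Printf("%q\n", zoneKey(map[string]string{
		"failure-domain.beta.kubernetes.io/region": "us-east-1",
		"failure-domain.beta.kubernetes.io/zone":   "us-east-1a",
	})) // "us-east-1:\x00:us-east-1a"
	fmt.Printf("%q\n", zoneKey(nil)) // ""
}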
2.2.2 Next
// resetExhausted starts over from the beginning (the whole map has been traversed).
func (nt *NodeTree) resetExhausted() {
	for _, na := range nt.tree {
		na.lastIndex = 0
	}
	nt.zoneIndex = 0
}
// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
// To put it plainly: picture the whole map flattened into a list and iterate over it,
// taking one node from each zone in turn.
func (nt *NodeTree) Next() string {
	nt.mu.Lock()
	defer nt.mu.Unlock()
	if len(nt.zones) == 0 {
		return ""
	}
	numExhaustedZones := 0
	for {
		if nt.zoneIndex >= len(nt.zones) {
			nt.zoneIndex = 0
		}
		zone := nt.zones[nt.zoneIndex]
		nt.zoneIndex++
		// We do not check the exhausted zones before calling next() on the zone. This ensures
		// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
		nodeName, exhausted := nt.tree[zone].next()
		if exhausted {
			numExhaustedZones++
			if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
				nt.resetExhausted()
			}
		} else {
			return nodeName
		}
	}
}
2.2.3 Summary
As you can see, NodeTree is really just a map structure storing all the nodes of every zone, much like a Map&lt;String, List&lt;String&gt;&gt; in Java.
So why the extra fields? They exist mainly to implement the Next() method, which hands out nodes from the map one at a time; that is what zoneIndex and lastIndex are for, and why the nodeArray struct exists in the first place.
In short, besides the usual methods that maintain its own data (add, remove, update, query), the whole point of NodeTree is the Next method it exposes to callers.
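To see the round-robin order Next produces, here is a self-contained sketch of the traversal (it repeats the nodeArray sketch so it compiles on its own; zone keys are plain strings rather than being derived from node labels, and locking and the reset step are simplified):

package main

import "fmt"

type nodeArray struct {
	nodes     []string
	lastIndex int
}

func (na *nodeArray) next() (string, bool) {
	if na.lastIndex >= len(na.nodes) {
		return "", true
	}
	name := na.nodes[na.lastIndex]
	na.lastIndex++
	return name, false
}

type nodeTree struct {
	tree      map[string]*nodeArray
	zones     []string
	zoneIndex int
}

// next walks the zones round-robin, taking one node from each zone per pass.
func (nt *nodeTree) next() string {
	numExhausted := 0
	for {
		if nt.zoneIndex >= len(nt.zones) {
			nt.zoneIndex = 0
		}
		zone := nt.zones[nt.zoneIndex]
		nt.zoneIndex++
		name, exhausted := nt.tree[zone].next()
		if !exhausted {
			return name
		}
		numExhausted++
		if numExhausted >= len(nt.zones) {
			return "" // all zones exhausted; the real code calls resetExhausted instead
		}
	}
}

func main() {
	nt := &nodeTree{
		tree: map[string]*nodeArray{
			"zone-a": {nodes: []string{"a1", "a2", "a3"}},
			"zone-b": {nodes: []string{"b1"}},
		},
		zones: []string{"zone-a", "zone-b"},
	}
	for name := nt.next(); name != ""; name = nt.next() {
		fmt.Print(name, " ") // a1 b1 a2 a3
	}
	fmt.Println()
}

Note how the zones are interleaved: a1 b1 a2 a3, rather than all of zone-a followed by all of zone-b. This spreads scheduling candidates across zones.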
3. cache
The whole schedulerCache data structure is designed to implement the Cache interface below.
type Cache interface {
	// AssumePod marks the pod as assumed.
	AssumePod(pod *v1.Pod) error
	// FinishBinding sets the pod's bindingFinished to true.
	FinishBinding(pod *v1.Pod) error
	// ForgetPod removes the pod from the cache (the pod must be in the assumed state).
	ForgetPod(pod *v1.Pod) error
	// AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
	// If added back, the pod's information would be added again.
	AddPod(pod *v1.Pod) error
	// UpdatePod may only be called on a pod in the Added state.
	UpdatePod(oldPod, newPod *v1.Pod) error
	// RemovePod may only be called on a pod in the Added state.
	RemovePod(pod *v1.Pod) error
	// GetPod returns the pod from podStates.
	GetPod(pod *v1.Pod) (*v1.Pod, error)
	// IsAssumedPod reports whether the pod is in the assumed state.
	IsAssumedPod(pod *v1.Pod) (bool, error)
	// AddNode adds a node; all of its information is stored.
	AddNode(node *v1.Node) error
	// UpdateNode updates a node.
	UpdateNode(oldNode, newNode *v1.Node) error
	// RemoveNode removes a node.
	RemoveNode(node *v1.Node) error
	// UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
	// The node info contains aggregated information of pods scheduled (including assumed to be)
	// on this node.
	UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error
	// List returns the pods on all nodes.
	List(labels.Selector) ([]*v1.Pod, error)
	// FilteredList returns the pods on all nodes that match the filter and selector.
	FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
	// Snapshot takes a snapshot of the current assumed pods and nodes.
	Snapshot() *Snapshot
	// NodeTree returns the node tree.
	NodeTree() *NodeTree
}
type Snapshot struct {
	AssumedPods map[string]bool
	Nodes       map[string]*schedulercache.NodeInfo
}
In this design a pod has five states: Initial, Assumed, Added, Expired, and Deleted.
//   +-------------------------------------------+  +----+
//   |                            Add            |  |    |
//   |                                           |  |    | Update
//   +      Assume                Add            v  v    |
// Initial +--------> Assumed +------------+---> Added <--+
//   ^                +   +               |       +
//   |                |   |               |       |
//   |                |   |           Add |       | Remove
//   |                |   |               |       |
//   |                |   |               +       |
//   +----------------+   +-----------> Expired  +----> Deleted
//         Forget             Expire
The transitions are as shown above:
Initial becomes Assumed by calling AssumePod.
Initial becomes Added by calling AddPod.
Assumed becomes Added by calling AddPod.
Assumed returns to Initial by calling ForgetPod.
Assumed becomes Expired via expirePod (once its deadline passes).
Expired becomes Added by calling AddPod.
Added stays Added through UpdatePod.
Added becomes Deleted by calling RemovePod.
Note that in the Deleted, Initial, and Expired states the pod does not actually exist in the cache.
The upstream comment below explains all of this quite clearly:
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
// Note that an assumed pod can expire, because if we haven't received Add event notifying us
// for a while, there might be some problems and we shouldn't keep the pod in cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
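To make the transitions concrete, here is a toy Go model of the state machine (illustrative only: the real transitions are implemented by schedulerCache's methods, and the event names simply mirror the diagram above):

package main

import "fmt"

type state string

const (
	initial state = "Initial"
	assumed state = "Assumed"
	added   state = "Added"
	expired state = "Expired"
	deleted state = "Deleted"
)

// transition returns the next state for an event, mirroring the diagram.
func transition(s state, event string) (state, error) {
	switch {
	case s == initial && event == "Assume":
		return assumed, nil
	case s == initial && event == "Add":
		return added, nil
	case s == assumed && event == "Add":
		return added, nil
	case s == assumed && event == "Forget":
		return initial, nil
	case s == assumed && event == "Expire":
		return expired, nil
	case s == expired && event == "Add":
		return added, nil
	case s == added && event == "Update":
		return added, nil
	case s == added && event == "Remove":
		return deleted, nil
	}
	return s, fmt.Errorf("invalid event %q in state %q", event, s)
}

func main() {
	// the happy path: assume, confirm via Add, then update and remove
	s := initial
	for _, ev := range []string{"Assume", "Add", "Update", "Remove"} {
		next, err := transition(s, ev)
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("%s --%s--> %s\n", s, ev, next)
		s = next
	}
}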
3.1 schedulerCache
3.1.1 Structure
This part involves three structs: schedulerCache, podState, and imageState. Within schedulerCache:
assumedPods is a map recording which pods are in the assumed state.
podStates is a map holding each pod's state, including whether its binding has finished.
nodes is a map storing each node's aggregated information and the pods on it.
imageStates is a map storing information about each image, including which nodes have it.
type schedulerCache struct {
	stop <-chan struct{}
	// ttl is how long an assumed pod stays before it expires
	ttl time.Duration
	// period is how often expired assumed pods get cleaned up
	period time.Duration
	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex
	// a set of assumed pod keys.
	// The key could further be used to get an entry in podStates.
	assumedPods map[string]bool
	// a map from pod key to podState
	podStates map[string]*podState
	// per-node aggregated information
	nodes    map[string]*schedulercache.NodeInfo
	nodeTree *NodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}
type podState struct {
	pod *v1.Pod
	// Used by assumedPod to determinate expiration.
	// (the deadline after which an assumed pod expires)
	deadline *time.Time
	// Used to block cache from expiring assumedPod if binding still runs
	// (expiration only kicks in once bindingFinished is true)
	bindingFinished bool
}

type imageState struct {
	// Size of the image
	size int64
	// A set of node names for nodes having this image present
	nodes sets.String
}
3.1.2 AssumePod
As you can see, AssumePod stores the pod into three data structures: podStates, assumedPods, and nodes.
// 1. Compute the pod's key.
// 2. Check podStates for the pod; if it already exists, return an error, because a pod must not be assumed twice.
// 3. Call addPod to add the pod to its node's info.
// 4. Store it in podStates (deadline and bindingFinished are not set at this point).
// 5. Store it in assumedPods to mark it as assumed.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := schedulercache.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}
	cache.addPod(pod)
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps
	cache.assumedPods[key] = true
	return nil
}
// Assumes that lock is already acquired.
// 1. Look up (or create) the NodeInfo for the pod's node in nodes.
// 2. Add the pod to that NodeInfo.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		n = schedulercache.NewNodeInfo()
		cache.nodes[pod.Spec.NodeName] = n
	}
	n.AddPod(pod)
}
The corresponding inverse operation, ForgetPod, removes the pod from schedulerCache entirely.
3.1.3 New and run
var (
	cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a go routine that manages expiration of assumed pods.
// "ttl" is how long the assumed pod will get expired.
// "stop" is the channel that would close the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}

func (cache *schedulerCache) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}
func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	// iterate over the pods in the assumed state
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			panic("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		// skip pods whose binding has not finished yet
		if !ps.bindingFinished {
			klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			continue
		}
		// if the deadline has passed, expire the pod
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}
// 1. Call removePod to remove the pod from its node's info in nodes.
// 2. Delete it from assumedPods.
// 3. Delete it from podStates.
// After this the pod is completely gone from schedulerCache.
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}
So a background goroutine runs every period and expires the assumed pods whose TTL has passed; expiring simply means removing them from the cache entirely. A simplified model of this loop is sketched below.
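Here is a self-contained sketch of that cleanup loop, assuming a simplified podState and using a plain time.Ticker in place of wait.Until (all names are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a stripped-down podState: just a deadline and the binding flag.
type entry struct {
	deadline        time.Time
	bindingFinished bool
}

type toyCache struct {
	mu      sync.Mutex
	assumed map[string]entry
}

// cleanup drops entries whose binding has finished and whose deadline passed,
// mirroring cleanupAssumedPods above.
func (c *toyCache) cleanup(now time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for key, e := range c.assumed {
		if !e.bindingFinished {
			continue // binding still in progress; never expire
		}
		if now.After(e.deadline) {
			delete(c.assumed, key)
			fmt.Printf("expired %s\n", key)
		}
	}
}

func main() {
	c := &toyCache{assumed: map[string]entry{
		"ns/pod-a": {deadline: time.Now().Add(-time.Second), bindingFinished: true},  // expires
		"ns/pod-b": {deadline: time.Now().Add(time.Hour), bindingFinished: true},     // still valid
		"ns/pod-c": {deadline: time.Now().Add(-time.Second), bindingFinished: false}, // binding in progress
	}}
	stop := time.After(250 * time.Millisecond)
	ticker := time.NewTicker(100 * time.Millisecond) // stands in for cache.period
	defer ticker.Stop()
	for {
		select {
		case now := <-ticker.C:
			c.cleanup(now)
		case <-stop:
			return
		}
	}
}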
3.1.4 AddPod
AddPod can be called from three states: Initial, Expired, and Assumed.
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := schedulercache.GetPodKey(pod)
	if err != nil {
		return err
	}
	cache.mu.Lock()
	defer cache.mu.Unlock()
	currState, ok := cache.podStates[key]
	switch {
	// coming from the assumed state
	case ok && cache.assumedPods[key]:
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			// Clean this up.
			// move the pod's information to the right node
			cache.removePod(currState.pod)
			cache.addPod(pod)
		}
		// drop the assumed mark: the pod is now in the Added state
		delete(cache.assumedPods, key)
		// deadline becomes nil, bindingFinished stays false
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		// coming from the Expired state (or the Initial state)
		// Pod was expired. We should add it back.
		cache.addPod(pod)
		ps := &podState{
			pod: pod,
		}
		cache.podStates[key] = ps
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}
After this call the pod is recorded in nodes.
assumed pod -> AddPod: the pod is now actually running on its node, so it moves from Assumed to Added.
expired pod -> AddPod: some events may have been missed (for example due to network issues), so AddPod was never called and the pod expired out of the cache; here it is added back into nodes and podStates.
UpdatePod and RemovePod need no further explanation: they may only be called from the Added state and essentially just update the information in nodes.
3.1.5 AddNode
func (cache *schedulerCache) AddNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	n, ok := cache.nodes[node.Name]
	if !ok {
		n = schedulercache.NewNodeInfo()
		cache.nodes[node.Name] = n
	} else {
		cache.removeNodeImageStates(n.Node())
	}
	// add the node to the nodeTree
	cache.nodeTree.AddNode(node)
	// update imageStates and the NodeInfo
	cache.addNodeImageStates(node, n)
	return n.SetNode(node)
}

func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) {
	newSum := make(map[string]*schedulercache.ImageStateSummary)
	// iterate over all the images on this node
	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			// update the entry in imageStates
			state, ok := cache.imageStates[name]
			if !ok {
				state = &imageState{
					size:  image.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[name] = state
			} else {
				// record that this node also has the image
				state.nodes.Insert(node.Name)
			}
			// create the imageStateSummary for this image
			if _, ok := newSum[name]; !ok {
				newSum[name] = cache.createImageStateSummary(state)
			}
		}
	}
	// attach the per-image summaries to the node's NodeInfo
	nodeInfo.SetImageStates(newSum)
}
This is the bookkeeping for nodes and their images; UpdateNode and RemoveNode follow the same pattern. A sketch of the image bookkeeping follows.
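Below is a self-contained sketch of the imageStates bookkeeping, using a plain map of node-name sets in place of sets.String (type and function names are illustrative):

package main

import "fmt"

// imageInfo is a simplified imageState: the image size and the set of nodes
// that have the image present.
type imageInfo struct {
	size  int64
	nodes map[string]struct{}
}

type imageIndex map[string]*imageInfo

// addNodeImages registers every image reported by a node, mirroring the loop
// in addNodeImageStates above.
func (idx imageIndex) addNodeImages(nodeName string, images map[string]int64) {
	for name, size := range images {
		info, ok := idx[name]
		if !ok {
			info = &imageInfo{size: size, nodes: map[string]struct{}{}}
			idx[name] = info
		}
		info.nodes[nodeName] = struct{}{}
	}
}

func main() {
	idx := imageIndex{}
	idx.addNodeImages("node-1", map[string]int64{"nginx:1.15": 109 << 20})
	idx.addNodeImages("node-2", map[string]int64{"nginx:1.15": 109 << 20, "redis:5": 95 << 20})
	for name, info := range idx {
		fmt.Printf("%s: size=%dB, on %d node(s)\n", name, info.size, len(info.nodes))
	}
}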
3.1.6 List and FilteredList
Note that List and FilteredList return the pods from all the nodes in nodes (FilteredList keeps only those that match the filter and selector).
Snapshot backs up all nodes and assumed pods; its copy-under-lock pattern is sketched below.
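A minimal sketch of that copy-under-lock pattern, with simplified stand-ins for the AssumedPods and Nodes maps (names are illustrative):

package main

import (
	"fmt"
	"sync"
)

// toySnapshotCache models the relevant fields of schedulerCache.
type toySnapshotCache struct {
	mu          sync.RWMutex
	assumedPods map[string]bool
	nodes       map[string]string // nodeName -> summary, standing in for *NodeInfo
}

// snapshot copies both maps under the read lock so the caller gets a stable
// view even while the cache keeps mutating.
func (c *toySnapshotCache) snapshot() (map[string]bool, map[string]string) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	pods := make(map[string]bool, len(c.assumedPods))
	for k, v := range c.assumedPods {
		pods[k] = v
	}
	nodes := make(map[string]string, len(c.nodes))
	for k, v := range c.nodes {
		nodes[k] = v
	}
	return pods, nodes
}

func main() {
	c := &toySnapshotCache{
		assumedPods: map[string]bool{"pod-uid-1": true},
		nodes:       map[string]string{"node-1": "cpu=4"},
	}
	pods, nodes := c.snapshot()
	fmt.Println(pods, nodes)
}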
3.2 Summary
schedulerCache is really a utility data structure that kube-scheduler uses during scheduling; later articles on the scheduling process will show how it is actually used.