
k8s Source Code: Unveiling the Scheduler's Algorithms (Part 1)

Original article
Author: ascehuang
Published 2019-12-07 22:40:39
Part of the column: k8s源码解析 (k8s source code analysis)

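The previous article walked through the scheduler's scheduling flow in the source code, including predicates (filtering), priorities (scoring), and preemption. It did not describe the individual predicate and priority policies in detail, so this article takes them apart one by one.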

1. Scheduling policy overview

Predicates (filtering)

  • CheckNodeUnschedulablePred: whether the node is schedulable
  • GeneralPred: whether resources are sufficient and the pod's host, ports, and selector match
  • HostNamePred: whether the node name specified by the pod equals the node's name
  • PodFitsHostPortsPred: whether the host ports requested by the pod are already in use on the node
  • MatchNodeSelectorPred: node selector and node affinity matching (label matching)
  • PodFitsResourcesPred: resource check
  • NoDiskConflictPred: whether the volumes to mount conflict with volumes already present
  • PodToleratesNodeTaintsPred: whether the pod's tolerations tolerate the taints on the node
  • PodToleratesNodeNoExecuteTaintsPred
  • CheckNodeLabelPresencePred: whether a node label is present
  • CheckServiceAffinityPred (-)
  • MaxEBSVolumeCountPred (deprecated)
  • MaxGCEPDVolumeCountPred (deprecated)
  • MaxCSIVolumeCountPred: whether the node's volume count exceeds the maximum
  • MaxAzureDiskVolumeCountPred (deprecated)
  • MaxCinderVolumeCountPred (deprecated)
  • CheckVolumeBindingPred: whether the node's PVs satisfy the PVCs
  • NoVolumeZoneConflictPred: whether the volume's zone conflicts
  • EvenPodsSpreadPred: whether the node satisfies the topology spread constraints
  • MatchInterPodAffinityPred: whether pod affinity or anti-affinity would be violated

Priorities (scoring)

  • EqualPriority
  • MostRequestedPriority
  • RequestedToCapacityRatioPriority
  • SelectorSpreadPriority
  • ServiceSpreadingPriority
  • InterPodAffinityPriority
  • LeastRequestedPriority
  • BalancedResourceAllocation
  • NodePreferAvoidPodsPriority
  • NodeAffinityPriority
  • TaintTolerationPriority
  • ImageLocalityPriority
  • ResourceLimitsPriority
  • EvenPodsSpreadPriority

Figure: algorithm directory structure

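Both the predicate and the priority algorithms live under the pkg/scheduler/algorithm package. The sibling package algorithmprovider registers the default algorithms (in effect, it maps each algorithm name to a function) by calling the factory algorithm_factory.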

Figure: registration entry point
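
To make the "name to function" idea concrete, here is a minimal sketch of such a registry. The types and the simplified `FitPredicate` signature are assumptions made for illustration; they are not the scheduler's real definitions, which take a pod, metadata, and node info.

```go
package main

import "fmt"

// FitPredicate is a simplified, hypothetical stand-in for the scheduler's predicate type.
type FitPredicate func(podName, nodeName string) bool

// registry maps an algorithm name to its implementation.
var registry = map[string]FitPredicate{}

// RegisterFitPredicate stores a predicate under its name and returns the name.
func RegisterFitPredicate(name string, p FitPredicate) string {
	registry[name] = p
	return name
}

func main() {
	RegisterFitPredicate("AlwaysFits", func(_, _ string) bool { return true })
	RegisterFitPredicate("NeverFits", func(_, _ string) bool { return false })

	// Later, the scheduler only needs the name to find the function.
	fmt.Println(registry["AlwaysFits"]("pod-a", "node-1"))
	fmt.Println(registry["NeverFits"]("pod-a", "node-1"))
}
```

Keeping the lookup keyed by name is what lets a configuration file enable or disable individual algorithms without touching the code.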

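2. Predicate algorithms

Source file: pkg/scheduler/algorithm/predicates/predicates.go

First, look at the order in which the predicates are defined. Recall that during filtering, generic_scheduler.go#podFitsOnNode iterates over a string slice. For each name it checks whether the configuration enables that predicate, and if so, calls the predicate function.
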
for _, predicateKey := range predicates.Ordering() {
	var (
		fit     bool
		reasons []predicates.PredicateFailureReason
		err     error
	)

	if predicate, exist := g.predicates[predicateKey]; exist {
		fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
		if err != nil {
			return false, []predicates.PredicateFailureReason{}, nil, err
		}

		if !fit {
			// eCache is available and valid, and predicates result is unfit, record the fail reasons
			failedPredicates = append(failedPredicates, reasons...)
			// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
			if !alwaysCheckAllPredicates {
				klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
					"evaluation is short circuited and there are chances " +
					"of other predicates failing as well.")
				break
			}
		}
	}
}

Now look at the slice itself. It is ordered, and the predicates are called in that order.

var (
	predicatesOrdering = []string{CheckNodeUnschedulablePred,
		GeneralPred, HostNamePred, PodFitsHostPortsPred,
		MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
		PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
		CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
		MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
		EvenPodsSpreadPred, MatchInterPodAffinityPred}
)

// Ordering returns the ordering of predicates.
func Ordering() []string {
	return predicatesOrdering
}
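
The interplay between the fixed ordering, the configured set, and the short circuit can be sketched as follows. This is a simplified model under assumed types (`predicate`, `runInOrder`, and the string reasons are all hypothetical), not the scheduler's actual code.

```go
package main

import "fmt"

// predicate is a simplified, hypothetical predicate: it reports fit and a failure reason.
type predicate func(node string) (fit bool, reason string)

// runInOrder walks the fixed ordering and calls only the predicates that are configured.
// When checkAll is false it stops at the first failure (the short circuit).
func runInOrder(ordering []string, configured map[string]predicate, node string, checkAll bool) (bool, []string) {
	var failed []string
	for _, key := range ordering {
		p, ok := configured[key]
		if !ok {
			continue // this predicate is not enabled in the configuration
		}
		if fit, reason := p(node); !fit {
			failed = append(failed, reason)
			if !checkAll {
				break
			}
		}
	}
	return len(failed) == 0, failed
}

func main() {
	ordering := []string{"A", "B", "C"}
	configured := map[string]predicate{
		"A": func(string) (bool, string) { return false, "A failed" },
		"C": func(string) (bool, string) { return false, "C failed" },
	}
	fmt.Println(runInOrder(ordering, configured, "node-1", false)) // stops after A
	fmt.Println(runInOrder(ordering, configured, "node-1", true))  // collects A and C
}
```

Because the ordering is fixed, placing cheap and frequently failing checks first means the short circuit saves the most work.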
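
The sections below go through the code of each policy. For methods that are too complex, code that does not affect understanding has been removed.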

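2.1 CheckNodeUnschedulablePred: whether the node is schedulable

  • The first step of every predicate is to fetch the node info. If it cannot be fetched, this predicate returns ErrNodeUnknownCondition, an error you have probably seen often after a node goes offline.
  • Check whether the node is marked unschedulable (node.Spec.Unschedulable).
  • Check whether the pod's tolerations tolerate the unschedulable taint; if they do, the pod may still be scheduled.

Registration:
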
scheduler.RegisterMandatoryFitPredicate(predicates.CheckNodeUnschedulablePred, predicates.CheckNodeUnschedulablePredicate)

Method source:

func CheckNodeUnschedulablePredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
	}

	// If pod tolerate unschedulable taint, it's also tolerate `node.Spec.Unschedulable`.
	podToleratesUnschedulable := v1helper.TolerationsTolerateTaint(pod.Spec.Tolerations, &v1.Taint{
		Key:    v1.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	})

	// TODO (k82cn): deprecates `node.Spec.Unschedulable` in 1.13.
	if nodeInfo.Node().Spec.Unschedulable && !podToleratesUnschedulable {
		return false, []PredicateFailureReason{ErrNodeUnschedulable}, nil
	}

	return true, nil, nil
}

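2.2 GeneralPred: whether resources are sufficient and the pod's host, ports, and selector match

This predicate is split into two steps, each calling one method: noncriticalPredicates and EssentialPredicates.

  • noncriticalPredicates: calls PodFitsResources, which is described in detail later.
  • EssentialPredicates: the filtering that essentially every pod has to pass. It consists of three checks: PodFitsHost, PodFitsHostPorts, and PodMatchNodeSelector, each of which is described in detail later.

Registration and method source:
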
scheduler.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates)

func GeneralPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	for _, predicate := range []FitPredicate{noncriticalPredicates, EssentialPredicates} {
		fit, reasons, err := predicate(pod, meta, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}
	}

	return len(predicateFails) == 0, predicateFails, nil
}

// noncriticalPredicates are the predicates that only non-critical pods need.
func noncriticalPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	fit, reasons, err := PodFitsResources(pod, meta, nodeInfo)
	if err != nil {
		return false, predicateFails, err
	}
	if !fit {
		predicateFails = append(predicateFails, reasons...)
	}

	return len(predicateFails) == 0, predicateFails, nil
}

// EssentialPredicates are the predicates that all pods, including critical pods, need.
func EssentialPredicates(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var predicateFails []PredicateFailureReason
	// TODO: PodFitsHostPorts is essential for now, but kubelet should ideally
	//       preempt pods to free up host ports too
	for _, predicate := range []FitPredicate{PodFitsHost, PodFitsHostPorts, PodMatchNodeSelector} {
		fit, reasons, err := predicate(pod, meta, nodeInfo)
		if err != nil {
			return false, predicateFails, err
		}
		if !fit {
			predicateFails = append(predicateFails, reasons...)
		}
	}

	return len(predicateFails) == 0, predicateFails, nil
} 

2.3 HostNamePred: whether the node name specified by the pod equals the node's name

Registration and method source:

scheduler.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)

func PodFitsHost(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if len(pod.Spec.NodeName) == 0 {
		return true, nil, nil
	}
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if pod.Spec.NodeName == node.Name {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrPodNotMatchHostName}, nil
}

2.4 PodFitsHostPortsPred: whether the host ports requested by the pod are already in use on the node

Registration and method source:

scheduler.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)

func PodFitsHostPorts(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var wantPorts []*v1.ContainerPort
	if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsHostPortsMetadata != nil {
		wantPorts = predicateMeta.podFitsHostPortsMetadata.podPorts
	} else {
		// We couldn't parse metadata - fallback to computing it.
		wantPorts = schedutil.GetContainerPorts(pod)
	}
	if len(wantPorts) == 0 {
		return true, nil, nil
	}

	existingPorts := nodeInfo.UsedPorts()

	// try to see whether existingPorts and  wantPorts will conflict or not
	if portsConflict(existingPorts, wantPorts) {
		return false, []PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
	}

	return true, nil, nil
}
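
The conflict test itself can be sketched as below. This is a simplified model: the `hostPort` type and the function body are assumptions for illustration, and the real check also takes the host IP into account.

```go
package main

import "fmt"

// hostPort is a simplified (protocol, port) pair; the host IP is left out on purpose.
type hostPort struct {
	Protocol string
	Port     int32
}

// portsConflict reports whether any wanted host port is already in use on the node.
func portsConflict(used map[hostPort]bool, wanted []hostPort) bool {
	for _, p := range wanted {
		if p.Port <= 0 {
			continue // no host port requested, so nothing to conflict with
		}
		if used[p] {
			return true
		}
	}
	return false
}

func main() {
	used := map[hostPort]bool{{"TCP", 8080}: true}
	fmt.Println(portsConflict(used, []hostPort{{"TCP", 8080}})) // same protocol and port
	fmt.Println(portsConflict(used, []hostPort{{"UDP", 8080}})) // different protocol
}
```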

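2.5 MatchNodeSelectorPred: node selector and node affinity matching

  • The affinity check source is copied here as well. If no required node affinity is specified, it returns true directly. If the affinity is not satisfied, the pod is not scheduled onto the node. If the affinity stops being satisfied later, for example because the node is updated, the pod is not evicted either.

Registration and method source:
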
scheduler.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)

func PodMatchNodeSelector(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrNodeSelectorNotMatch}, nil
}

func PodMatchesNodeSelectorAndAffinityTerms(pod *v1.Pod, node *v1.Node) bool {
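	// Note: the pod.Spec.NodeSelector label check that precedes this in the full source is omitted in this excerpt.
	nodeAffinityMatches := true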
	affinity := pod.Spec.Affinity
	if affinity != nil && affinity.NodeAffinity != nil {
		nodeAffinity := affinity.NodeAffinity
		// if no required NodeAffinity requirements, will do no-op, means select all nodes.
		// TODO: Replace next line with subsequent commented-out line when implement RequiredDuringSchedulingRequiredDuringExecution.
		if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
			// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution == nil && nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
			return true
		}

		// Match node selector for requiredDuringSchedulingRequiredDuringExecution.
		// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
		// if nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution != nil {
		// 	nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingRequiredDuringExecution.NodeSelectorTerms
		// 	klog.V(10).Infof("Match for RequiredDuringSchedulingRequiredDuringExecution node selector terms %+v", nodeSelectorTerms)
		// 	nodeAffinityMatches = nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
		// }

		// Match node selector for requiredDuringSchedulingIgnoredDuringExecution.
		if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
			nodeSelectorTerms := nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
			klog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", nodeSelectorTerms)
			nodeAffinityMatches = nodeAffinityMatches && nodeMatchesNodeSelectorTerms(node, nodeSelectorTerms)
		}

	}
	return nodeAffinityMatches
}
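
The plain nodeSelector part of this predicate boils down to a subset test on labels. A minimal sketch, assuming labels are simple string maps (the function name is hypothetical):

```go
package main

import "fmt"

// matchesSelector reports whether every key/value pair in selector is present in nodeLabels.
// An empty selector matches every node.
func matchesSelector(selector, nodeLabels map[string]string) bool {
	for k, want := range selector {
		if got, ok := nodeLabels[k]; !ok || got != want {
			return false
		}
	}
	return true
}

func main() {
	node := map[string]string{"disktype": "ssd", "zone": "z1"}
	fmt.Println(matchesSelector(map[string]string{"disktype": "ssd"}, node)) // subset of labels
	fmt.Println(matchesSelector(map[string]string{"disktype": "hdd"}, node)) // value differs
	fmt.Println(matchesSelector(nil, node))                                  // empty selector
}
```

Node affinity terms are richer than this (they support operators such as In and NotIn), so this sketch covers only the equality-based selector.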

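2.6 PodFitsResourcesPred: resource check

  • This method is fairly long and was mentioned earlier. In short, it checks whether the node has enough resources, including CPU, memory, ephemeral storage, scalar resources such as GPUs, and the number of pods the node allows.
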
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var predicateFails []PredicateFailureReason
	allowedPodNumber := nodeInfo.AllowedPodNumber()
	if len(nodeInfo.Pods())+1 > allowedPodNumber {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
	}

	// No extended resources should be ignored by default.
	ignoredExtendedResources := sets.NewString()

	var podRequest *schedulernodeinfo.Resource
	if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
		podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
		if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
			ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
		}
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podRequest = GetResourceRequest(pod)
	}
	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return len(predicateFails) == 0, predicateFails, nil
	}

	allocatable := nodeInfo.AllocatableResource()
	if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
	}
	if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
	}
	if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
	}

	for rName, rQuant := range podRequest.ScalarResources {
		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be
			// ignored, we will skip checking it.
			if ignoredExtendedResources.Has(string(rName)) {
				continue
			}
		}
		if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
			predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
		}
	}

	if klog.V(10) {
		if len(predicateFails) == 0 {
			// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
			// not logged. There is visible performance gain from it.
			klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
				podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
		}
	}
	return len(predicateFails) == 0, predicateFails, nil
}
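
The core of the check is the comparison allocatable < pod request + already requested, repeated per resource. A minimal sketch with assumed types and made-up numbers (`resources` and `insufficient` are hypothetical names):

```go
package main

import "fmt"

// resources is a simplified stand-in for the scheduler's Resource type.
type resources struct {
	MilliCPU int64 // CPU in millicores
	Memory   int64 // memory in bytes
}

// insufficient returns the names of resources for which
// allocatable < podRequest + alreadyRequested.
func insufficient(podRequest, alreadyRequested, allocatable resources) []string {
	var short []string
	if allocatable.MilliCPU < podRequest.MilliCPU+alreadyRequested.MilliCPU {
		short = append(short, "cpu")
	}
	if allocatable.Memory < podRequest.Memory+alreadyRequested.Memory {
		short = append(short, "memory")
	}
	return short
}

func main() {
	const gi = int64(1) << 30
	allocatable := resources{MilliCPU: 4000, Memory: 8 * gi}
	requested := resources{MilliCPU: 3500, Memory: 2 * gi}
	pod := resources{MilliCPU: 1000, Memory: 1 * gi}

	// 3500m + 1000m exceeds 4000m, while 2Gi + 1Gi fits into 8Gi.
	fmt.Println(insufficient(pod, requested, allocatable))
}
```

Note that the comparison uses requests, not actual usage: a node can be rejected even when its live CPU utilization is low.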

2.7 NoDiskConflictPred: whether the volumes to mount conflict with volumes already present

scheduler.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict)

func NoDiskConflict(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	for _, v := range pod.Spec.Volumes {
		for _, ev := range nodeInfo.Pods() {
			if isVolumeConflict(v, ev) {
				return false, []PredicateFailureReason{ErrDiskConflict}, nil
			}
		}
	}
	return true, nil, nil
}

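2.8 PodToleratesNodeTaintsPred: whether the pod's tolerations tolerate the taints on the node

  • A toleration check was already made in CheckNodeUnschedulablePred. Here it is split out into a predicate of its own, which ultimately calls the v1helper methods.
  • It iterates over all taints on the node and checks, one by one, whether the tolerations tolerate each taint. There are three steps:
    1) If the toleration has an effect, check whether it equals the taint's effect; if not, the taint is not tolerated.
    2) Check whether the keys are equal; if not, the taint is not tolerated.
    3) Check the operator. If it is Exists, the taint is tolerated. If it is Equal (an empty operator also means Equal), compare the values.
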
scheduler.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)

func PodToleratesNodeTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	if nodeInfo == nil || nodeInfo.Node() == nil {
		return false, []PredicateFailureReason{ErrNodeUnknownCondition}, nil
	}

	return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
		// PodToleratesNodeTaints is only interested in NoSchedule and NoExecute taints.
		return t.Effect == v1.TaintEffectNoSchedule || t.Effect == v1.TaintEffectNoExecute
	})
}

func podToleratesNodeTaints(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, filter func(t *v1.Taint) bool) (bool, []PredicateFailureReason, error) {
	taints, err := nodeInfo.Taints()
	if err != nil {
		return false, nil, err
	}

	if v1helper.TolerationsTolerateTaintsWithFilter(pod.Spec.Tolerations, taints, filter) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrTaintsTolerationsNotMatch}, nil
}

func (t *Toleration) ToleratesTaint(taint *Taint) bool {
	if len(t.Effect) > 0 && t.Effect != taint.Effect {
		return false
	}

	if len(t.Key) > 0 && t.Key != taint.Key {
		return false
	}

	// TODO: Use proper defaulting when Toleration becomes a field of PodSpec
	switch t.Operator {
	// empty operator means Equal
	case "", TolerationOpEqual:
		return t.Value == taint.Value
	case TolerationOpExists:
		return true
	default:
		return false
	}
}
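
The three steps, together with the filtered loop over the node's taints, can be sketched in a self-contained form. The `taint` and `toleration` structs below are simplified stand-ins using plain strings, and `toleratesAll` is a hypothetical name for the "every filtered taint must be tolerated by at least one toleration" rule.

```go
package main

import "fmt"

type taint struct{ Key, Value, Effect string }

type toleration struct{ Key, Operator, Value, Effect string }

// tolerates applies the three checks in order: effect, key, operator.
func tolerates(t toleration, tn taint) bool {
	if t.Effect != "" && t.Effect != tn.Effect {
		return false
	}
	if t.Key != "" && t.Key != tn.Key {
		return false
	}
	switch t.Operator {
	case "", "Equal": // an empty operator means Equal
		return t.Value == tn.Value
	case "Exists":
		return true
	default:
		return false
	}
}

// toleratesAll reports whether every taint selected by filter is tolerated
// by at least one toleration.
func toleratesAll(tols []toleration, taints []taint, filter func(taint) bool) bool {
	for _, tn := range taints {
		if filter != nil && !filter(tn) {
			continue // this predicate does not care about the taint
		}
		ok := false
		for _, t := range tols {
			if tolerates(t, tn) {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	taints := []taint{{"dedicated", "db", "NoSchedule"}, {"hint", "x", "PreferNoSchedule"}}
	hard := func(tn taint) bool { return tn.Effect == "NoSchedule" || tn.Effect == "NoExecute" }

	fmt.Println(toleratesAll(nil, taints, hard))
	fmt.Println(toleratesAll([]toleration{{Key: "dedicated", Operator: "Exists"}}, taints, hard))
}
```

The PreferNoSchedule taint in the example is skipped by the filter, which is why a single toleration is enough for the second call.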

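2.9 PodToleratesNodeNoExecuteTaintsPred: no call site found; has it been removed?

This directly calls the podToleratesNodeTaints helper shown in 2.8, but I could not find where it is registered, so it has probably been deprecated.
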
func PodToleratesNodeNoExecuteTaints(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	return podToleratesNodeTaints(pod, nodeInfo, func(t *v1.Taint) bool {
		return t.Effect == v1.TaintEffectNoExecute
	})
}

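2.10 CheckNodeLabelPresencePred: whether a node label is present; no corresponding implementation

Only the name is registered here, and the check is handed to a plugin; the source file has no corresponding method. Label matching is already implemented in MatchNodeSelectorPred.

2.11 CheckServiceAffinityPred

There is little material on ServiceAffinity. Judging from the code and its comments, if a pod of a Service has been scheduled onto a node that carries, say, the label "region=shanghai", the remaining pods of that Service are also scheduled onto nodes with that label. There is a concept of service affinity labels here, but the 1.16 API Reference has no ServiceAffinity field. Tracing the call chain shows that it is passed in when the default registry is initialized; I have not yet worked out the relationship.
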
func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	var services []*v1.Service
	var pods []*v1.Pod
	if pm, ok := meta.(*predicateMetadata); ok && pm.serviceAffinityMetadata != nil && (pm.serviceAffinityMetadata.matchingPodList != nil || pm.serviceAffinityMetadata.matchingPodServices != nil) {
		services = pm.serviceAffinityMetadata.matchingPodServices
		pods = pm.serviceAffinityMetadata.matchingPodList
	} else {
		// Make the predicate resilient in case metadata is missing.
		pm = &predicateMetadata{pod: pod}
		s.serviceAffinityMetadataProducer(pm)
		pods, services = pm.serviceAffinityMetadata.matchingPodList, pm.serviceAffinityMetadata.matchingPodServices
	}
	filteredPods := nodeInfo.FilterOutPods(pods)
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	// check if the pod being scheduled has the affinity labels specified in its NodeSelector
	affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))
	// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
	if len(s.labels) > len(affinityLabels) {
		if len(services) > 0 {
			if len(filteredPods) > 0 {
				nodeWithAffinityLabels, err := s.nodeInfoLister.Get(filteredPods[0].Spec.NodeName)
				if err != nil {
					return false, nil, err
				}
				AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Node().Labels))
			}
		}
	}
	// Step 2: Finally complete the affinity predicate based on whatever set of predicates we were able to find.
	if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
		return true, nil, nil
	}
	return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
}

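2.12 MaxCSIVolumeCountPred: whether the node's volume count exceeds the maximum

CSI stands for Container Storage Interface. This predicate checks whether the number of volumes attached to the node, plus the new pod's volumes, would exceed the node's limit. If not, it returns true; if the limit would be exceeded, it returns a failure.
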
func (c *CSIMaxVolumeLimitChecker) attachableLimitPredicate(
	pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If the new pod doesn't have any volume attached to it, the predicate will always be true
	if len(pod.Spec.Volumes) == 0 {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	// If CSINode doesn't exist, the predicate may read the limits from Node object
	csiNode, err := c.csiNodeLister.Get(node.Name)
	if err != nil {
		// TODO: return the error once CSINode is created by default (2 releases)
		klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
	}

	newVolumes := make(map[string]string)
	if err := c.filterAttachableVolumes(csiNode, pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
		return false, nil, err
	}

	// If the pod doesn't have any new CSI volumes, the predicate will always be true
	if len(newVolumes) == 0 {
		return true, nil, nil
	}

	// If the node doesn't have volume limits, the predicate will always be true
	nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
	if len(nodeVolumeLimits) == 0 {
		return true, nil, nil
	}

	attachedVolumes := make(map[string]string)
	for _, existingPod := range nodeInfo.Pods() {
		if err := c.filterAttachableVolumes(csiNode, existingPod.Spec.Volumes, existingPod.Namespace, attachedVolumes); err != nil {
			return false, nil, err
		}
	}

	attachedVolumeCount := map[string]int{}
	for volumeUniqueName, volumeLimitKey := range attachedVolumes {
		if _, ok := newVolumes[volumeUniqueName]; ok {
			// Don't count single volume used in multiple pods more than once
			delete(newVolumes, volumeUniqueName)
		}
		attachedVolumeCount[volumeLimitKey]++
	}

	newVolumeCount := map[string]int{}
	for _, volumeLimitKey := range newVolumes {
		newVolumeCount[volumeLimitKey]++
	}

	for volumeLimitKey, count := range newVolumeCount {
		maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
		if ok {
			currentVolumeCount := attachedVolumeCount[volumeLimitKey]
			if currentVolumeCount+count > int(maxVolumeLimit) {
				return false, []PredicateFailureReason{ErrMaxVolumeCountExceeded}, nil
			}
		}
	}

	return true, nil, nil
}
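
The counting logic at the end of the method can be isolated into a small sketch. The maps go from a unique volume name to a limit key, as in the source above; the function name `exceedsLimit` and the sample data are assumptions for illustration.

```go
package main

import "fmt"

// exceedsLimit reports whether attaching newVols on top of attached would exceed
// any per-key limit. Both maps go from unique volume name to limit key.
func exceedsLimit(attached, newVols map[string]string, limits map[string]int) bool {
	// Copy newVols so the caller's map is not modified.
	pending := make(map[string]string, len(newVols))
	for name, key := range newVols {
		pending[name] = key
	}

	attachedCount := map[string]int{}
	for name, key := range attached {
		delete(pending, name) // a volume shared with an existing pod is counted once
		attachedCount[key]++
	}

	newCount := map[string]int{}
	for _, key := range pending {
		newCount[key]++
	}

	for key, n := range newCount {
		if limit, ok := limits[key]; ok && attachedCount[key]+n > limit {
			return true
		}
	}
	return false
}

func main() {
	attached := map[string]string{"vol-a": "csi-x", "vol-b": "csi-x"}
	wanted := map[string]string{"vol-b": "csi-x", "vol-c": "csi-x"}

	fmt.Println(exceedsLimit(attached, wanted, map[string]int{"csi-x": 3})) // 2 attached + 1 new
	fmt.Println(exceedsLimit(attached, wanted, map[string]int{"csi-x": 2})) // over the limit
}
```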

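2.13 CheckVolumeBindingPred: whether the node's PVs satisfy the PVCs

  • If the pod has no PVC, return true directly.
  • If it has PVCs, check whether the node has PVs that satisfy them:
    1) whether the volume mode matches
    2) whether the volume is being deleted
    3) whether the node satisfies the PV's NodeAffinity
    4) whether the volume is already bound to another PVC
    5) whether the volume is available
    6) whether the volume's labels satisfy the PVC's selector
    7) whether the access modes match
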
func (c *VolumeBindingChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If pod does not request any PVC, we don't need to do anything.
	if !podHasPVCs(pod) {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	unboundSatisfied, boundSatisfied, err := c.binder.Binder.FindPodVolumes(pod, node)
	if err != nil {
		return false, nil, err
	}

	failReasons := []PredicateFailureReason{}
	if !boundSatisfied {
		klog.V(5).Infof("Bound PVs not satisfied for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeNodeConflict)
	}

	if !unboundSatisfied {
		klog.V(5).Infof("Couldn't find matching PVs for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
		failReasons = append(failReasons, ErrVolumeBindConflict)
	}

	if len(failReasons) > 0 {
		return false, failReasons, nil
	}

	// All volumes bound or matching PVs found for all unbound PVCs
	klog.V(5).Infof("All PVCs found matches for pod %v/%v, node %q", pod.Namespace, pod.Name, node.Name)
	return true, nil, nil
}

func FindMatchingVolume(
	claim *v1.PersistentVolumeClaim,
	volumes []*v1.PersistentVolume,
	node *v1.Node,
	excludedVolumes map[string]*v1.PersistentVolume,
	delayBinding bool) (*v1.PersistentVolume, error) {

	var smallestVolume *v1.PersistentVolume
	var smallestVolumeQty resource.Quantity
	requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
	requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)

	var selector labels.Selector
	if claim.Spec.Selector != nil {
		internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
		if err != nil {
			// should be unreachable code due to validation
			return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
		}
		selector = internalSelector
	}

	// Go through all available volumes with two goals:
	// - find a volume that is either pre-bound by user or dynamically
	//   provisioned for this claim. Because of this we need to loop through
	//   all volumes.
	// - find the smallest matching one if there is no volume pre-bound to
	//   the claim.
	for _, volume := range volumes {
		if _, ok := excludedVolumes[volume.Name]; ok {
			// Skip volumes in the excluded list
			continue
		}

		volumeQty := volume.Spec.Capacity[v1.ResourceStorage]

		// filter out mismatching volumeModes
		if CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
			continue
		}

		// check if PV's DeletionTimeStamp is set, if so, skip this volume.
		if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
			if volume.ObjectMeta.DeletionTimestamp != nil {
				continue
			}
		}

		nodeAffinityValid := true
		if node != nil {
			// Scheduler path, check that the PV NodeAffinity
			// is satisfied by the node
			err := volumeutil.CheckNodeAffinity(volume, node.Labels)
			if err != nil {
				nodeAffinityValid = false
			}
		}

		if IsVolumeBoundToClaim(volume, claim) {
			// this claim and volume are pre-bound; return
			// the volume if the size request is satisfied,
			// otherwise continue searching for a match
			if volumeQty.Cmp(requestedQty) < 0 {
				continue
			}

			// If PV node affinity is invalid, return no match.
			// This means the prebound PV (and therefore PVC)
			// is not suitable for this node.
			if !nodeAffinityValid {
				return nil, nil
			}

			return volume, nil
		}

		if node == nil && delayBinding {
			// PV controller does not bind this claim.
			// Scheduler will handle binding unbound volumes
			// Scheduler path will have node != nil
			continue
		}

		// filter out:
		// - volumes in non-available phase
		// - volumes bound to another claim
		// - volumes whose labels don't match the claim's selector, if specified
		// - volumes in Class that is not requested
		// - volumes whose NodeAffinity does not match the node
		if volume.Status.Phase != v1.VolumeAvailable {
			// We ignore volumes in non-available phase, because volumes that
			// satisfies matching criteria will be updated to available, binding
			// them now has high chance of encountering unnecessary failures
			// due to API conflicts.
			continue
		} else if volume.Spec.ClaimRef != nil {
			continue
		} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
			continue
		}
		if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
			continue
		}
		if !nodeAffinityValid {
			continue
		}

		if node != nil {
			// Scheduler path
			// Check that the access modes match
			if !CheckAccessModes(claim, volume) {
				continue
			}
		}

		if volumeQty.Cmp(requestedQty) >= 0 {
			if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
				smallestVolume = volume
				smallestVolumeQty = volumeQty
			}
		}
	}

	if smallestVolume != nil {
		// Found a matching volume
		return smallestVolume, nil
	}

	return nil, nil
}
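
Once the filters have been applied, FindMatchingVolume keeps the smallest volume that still satisfies the request. That selection step can be sketched on its own; the `volume` struct with a GiB capacity and a single `Available` flag is a simplification standing in for all the filters above.

```go
package main

import "fmt"

// volume is a simplified PV: a name, a capacity, and whether it passed all filters.
type volume struct {
	Name        string
	CapacityGiB int64
	Available   bool
}

// smallestFit returns the smallest available volume whose capacity covers the request.
func smallestFit(volumes []volume, requestGiB int64) (volume, bool) {
	var best volume
	found := false
	for _, v := range volumes {
		if !v.Available || v.CapacityGiB < requestGiB {
			continue
		}
		if !found || v.CapacityGiB < best.CapacityGiB {
			best, found = v, true
		}
	}
	return best, found
}

func main() {
	pvs := []volume{
		{"pv-100", 100, true},
		{"pv-20", 20, true},
		{"pv-10", 10, false}, // filtered out, for example bound to another claim
		{"pv-5", 5, true},    // too small
	}
	v, ok := smallestFit(pvs, 10)
	fmt.Println(v.Name, ok)
}
```

Choosing the smallest fit avoids wasting a large PV on a small claim when a tighter match exists.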

2.14 NoVolumeZoneConflictPred: whether the volume's zone conflicts

This applies when nodes and PVs carry zone or region labels. If the labels conflict, the predicate returns false.

func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	// If a pod doesn't have any volume attached to it, the predicate will always be true.
	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
	if len(pod.Spec.Volumes) == 0 {
		return true, nil, nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	nodeConstraints := make(map[string]string)
	for k, v := range node.ObjectMeta.Labels {
		if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
			continue
		}
		nodeConstraints[k] = v
	}

	if len(nodeConstraints) == 0 {
		// The node has no zone constraints, so we're OK to schedule.
		// In practice, when using zones, all nodes must be labeled with zone labels.
		// We want to fast-path this case though.
		return true, nil, nil
	}

	namespace := pod.Namespace
	manifest := &(pod.Spec)
	for i := range manifest.Volumes {
		volume := &manifest.Volumes[i]
		if volume.PersistentVolumeClaim != nil {
			pvcName := volume.PersistentVolumeClaim.ClaimName
			if pvcName == "" {
				return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
			}
			pvc, err := c.pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
			if err != nil {
				return false, nil, err
			}

			if pvc == nil {
				return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
			}

			pvName := pvc.Spec.VolumeName
			if pvName == "" {
				scName := v1helper.GetPersistentVolumeClaimClass(pvc)
				if len(scName) > 0 {
					class, _ := c.scLister.Get(scName)
					if class != nil {
						if class.VolumeBindingMode == nil {
							return false, nil, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", scName)
						}
						if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
							// Skip unbound volumes
							continue
						}
					}
				}
				return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
			}

			pv, err := c.pvLister.Get(pvName)
			if err != nil {
				return false, nil, err
			}

			if pv == nil {
				return false, nil, fmt.Errorf("PersistentVolume was not found: %q", pvName)
			}

			for k, v := range pv.ObjectMeta.Labels {
				if k != v1.LabelZoneFailureDomain && k != v1.LabelZoneRegion {
					continue
				}
				nodeV, _ := nodeConstraints[k]
				volumeVSet, err := volumehelpers.LabelZonesToSet(v)
				if err != nil {
					klog.Warningf("Failed to parse label for %q: %q. Ignoring the label. err=%v. ", k, v, err)
					continue
				}

				if !volumeVSet.Has(nodeV) {
					klog.V(10).Infof("Won't schedule pod %q onto node %q due to volume %q (mismatch on %q)", pod.Name, node.Name, pvName, k)
					return false, []PredicateFailureReason{ErrVolumeZoneConflict}, nil
				}
			}
		}
	}

	return true, nil, nil
}
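
The label comparison at the heart of this predicate can be sketched as follows. Two assumptions are made here: the label keys are the beta failure-domain keys in use at the time, and a PV label value may list several zones separated by "__". The function name `zoneConflict` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Label keys assumed for this sketch.
var zoneKeys = []string{
	"failure-domain.beta.kubernetes.io/zone",
	"failure-domain.beta.kubernetes.io/region",
}

// zoneConflict reports whether a PV's zone or region labels exclude the node.
func zoneConflict(nodeLabels, pvLabels map[string]string) bool {
	hasConstraint := false
	for _, k := range zoneKeys {
		if _, ok := nodeLabels[k]; ok {
			hasConstraint = true
		}
	}
	if !hasConstraint {
		return false // the node carries no zone labels, so nothing can conflict
	}
	for _, k := range zoneKeys {
		pvVal, ok := pvLabels[k]
		if !ok {
			continue
		}
		match := false
		for _, z := range strings.Split(pvVal, "__") { // assumed multi-zone delimiter
			if z == nodeLabels[k] {
				match = true
				break
			}
		}
		if !match {
			return true
		}
	}
	return false
}

func main() {
	node := map[string]string{"failure-domain.beta.kubernetes.io/zone": "z1"}
	fmt.Println(zoneConflict(node, map[string]string{"failure-domain.beta.kubernetes.io/zone": "z1__z2"}))
	fmt.Println(zoneConflict(node, map[string]string{"failure-domain.beta.kubernetes.io/zone": "z3"}))
}
```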

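2.15 EvenPodsSpreadPred: whether the node satisfies the topology spread constraints

This constraint was newly added in 1.16: topology spread constraints. It limits how unevenly matching pods may be distributed across topology domains such as zones or nodes, so that a failure of one zone or node affects only part of the pods. Users can also define their own topology domains.
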
func EvenPodsSpreadPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var epsMeta *evenPodsSpreadMetadata
	if predicateMeta, ok := meta.(*predicateMetadata); ok {
		epsMeta = predicateMeta.evenPodsSpreadMetadata
	} else { // We don't have precomputed metadata. We have to follow a slow path to check spread constraints.
		// TODO(autoscaler): get it implemented
		return false, nil, errors.New("metadata not pre-computed for EvenPodsSpreadPredicate")
	}

	if epsMeta == nil || len(epsMeta.tpPairToMatchNum) == 0 || len(epsMeta.constraints) == 0 {
		return true, nil, nil
	}

	podLabelSet := labels.Set(pod.Labels)
	for _, c := range epsMeta.constraints {
		tpKey := c.topologyKey
		tpVal, ok := node.Labels[c.topologyKey]
		if !ok {
			klog.V(5).Infof("node '%s' doesn't have required label '%s'", node.Name, tpKey)
			return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
		}

		selfMatchNum := int32(0)
		if c.selector.Matches(podLabelSet) {
			selfMatchNum = 1
		}

		pair := topologyPair{key: tpKey, value: tpVal}
		paths, ok := epsMeta.tpKeyToCriticalPaths[tpKey]
		if !ok {
			// error which should not happen
			klog.Errorf("internal error: get paths from key %q of %#v", tpKey, epsMeta.tpKeyToCriticalPaths)
			continue
		}
		// judging criteria:
		// 'existing matching num' + 'if self-match (1 or 0)' - 'global min matching num' <= 'maxSkew'
		minMatchNum := paths[0].matchNum
		matchNum := epsMeta.tpPairToMatchNum[pair]
		skew := matchNum + selfMatchNum - minMatchNum
		if skew > c.maxSkew {
			klog.V(5).Infof("node '%s' failed spreadConstraint[%s]: matchNum(%d) + selfMatchNum(%d) - minMatchNum(%d) > maxSkew(%d)", node.Name, tpKey, matchNum, selfMatchNum, minMatchNum, c.maxSkew)
			return false, []PredicateFailureReason{ErrTopologySpreadConstraintsNotMatch}, nil
		}
	}

	return true, nil, nil
}
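
The judging criterion in the code comment, 'existing matching num' + 'self-match (1 or 0)' - 'global min matching num' <= 'maxSkew', is easy to check by hand. A minimal sketch with a hypothetical helper and made-up pod counts:

```go
package main

import "fmt"

// violatesMaxSkew applies the criterion from the predicate:
// matchNum + selfMatch(1 or 0) - minMatchNum > maxSkew means the node is rejected.
func violatesMaxSkew(matchNum, minMatchNum int32, selfMatch bool, maxSkew int32) bool {
	self := int32(0)
	if selfMatch {
		self = 1
	}
	return matchNum+self-minMatchNum > maxSkew
}

func main() {
	// Zone A already runs 2 matching pods, zone B runs 1, so the global minimum is 1.
	// With maxSkew = 1, a matching pod may go to zone B but not to zone A.
	fmt.Println(violatesMaxSkew(2, 1, true, 1)) // zone A: 2 + 1 - 1 = 2 > 1
	fmt.Println(violatesMaxSkew(1, 1, true, 1)) // zone B: 1 + 1 - 1 = 1 <= 1
}
```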

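2.16 MatchInterPodAffinityPred: whether pod affinity or anti-affinity would be violated

  • If the pod is scheduled onto this node, is the anti-affinity of the pods already on the node still satisfied?
  • If the pod is scheduled onto this node, is the pod's own affinity or anti-affinity satisfied?
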
func (c *PodAffinityChecker) InterPodAffinityMatches(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if failedPredicates, error := c.satisfiesExistingPodsAntiAffinity(pod, meta, nodeInfo); failedPredicates != nil {
		failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
		return false, failedPredicates, error
	}

	// Now check if <pod> requirements will be satisfied on this node.
	affinity := pod.Spec.Affinity
	if affinity == nil || (affinity.PodAffinity == nil && affinity.PodAntiAffinity == nil) {
		return true, nil, nil
	}
	if failedPredicates, error := c.satisfiesPodsAffinityAntiAffinity(pod, meta, nodeInfo, affinity); failedPredicates != nil {
		failedPredicates := append([]PredicateFailureReason{ErrPodAffinityNotMatch}, failedPredicates)
		return false, failedPredicates, error
	}

	if klog.V(10) {
		// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
		// not logged. There is visible performance gain from it.
		klog.Infof("Schedule Pod %+v on Node %+v is allowed, pod (anti)affinity constraints satisfied",
			podName(pod), node.Name)
	}
	return true, nil, nil
}

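3 Postscript

I originally planned to cover everything in one part, but there turned out to be too much material. This part went through the predicate algorithms; the next part will analyze the source code of the priority algorithms.

Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.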
