动态

详情 返回 返回

HAMi vGPU 原理分析 Part4:Spread&Binpack 高級調度策略實現 - 动态 详情

hami-analyze-4-spread-binpark-policy.png

上篇我們分析了 hami-scheduler 工作流程,知道了 hami-webhook、hami-scheduler 是怎麼配合工作的。

本文為 HAMi 原理分析的第四篇,分析 hami-scheduler 在調度時是如何選擇節點的,即:Spread、Binpack 等高級調度策略是怎麼實現的。

<!--more-->

這篇文章我們解決最後一個問題:_Spread、Binpack 等高級調度策略是怎麼實現的_

以下分析基於 HAMi v2.4.0

這裏在貼一下上一篇總結的 HAMi Webhook 、Scheduler 工作流程:

hami-scheduler.png

  • 1)用户創建 Pod 並在 Pod 中申請了 vGPU 資源
  • 2)kube-apiserver 根據 MutatingWebhookConfiguration 配置請求 HAMi-Webhook
  • 3)HAMi-Webhook 檢測 Pod 中的 Resource,發現是申請的由 HAMi 管理的 vGPU 資源,因此把 Pod 中的 SchedulerName 改成了 hami-scheduler,這樣這個 Pod 就會由 hami-scheduler 進行調度了。

    • 對於特權模式的 Pod,Webhook 會直接跳過不處理
    • 對於使用 vGPU 資源但指定了 nodeName 的 Pod,Webhook 會直接拒絕
  • 4)hami-scheduler 進行 Pod 調度,不過就是用的 k8s 的默認 kube-scheduler 鏡像,因此調度邏輯和默認的 default-scheduler 是一樣的,kube-scheduler 根據 KubeSchedulerConfiguration 配置,調用 Extender Scheduler 插件

    • 這個 Extender Scheduler 就是 hami-scheduler Pod 中的另一個 Container,該 Container 同時提供了 Webhook 和 Scheduler 相關 API。
    • 當 Pod 申請了 vGPU 資源時,kube-scheduler 就會根據配置以 HTTP 形式調用 Extender Scheduler 插件,這樣就實現了自定義調度邏輯
  • 5)Extender Scheduler 插件包含了真正的 hami 調度邏輯, 調度時根據節點剩餘資源量進行打分選擇節點
  • 6)異步任務,包括 GPU 感知邏輯

    • devicePlugin 中的後台 Goroutine 定時上報 Node 上的 GPU 資源並寫入到 Node 的 Annoations
    • Extender Scheduler 插件根據 Node Annoations 解析出 GPU 資源總量、從 Node 上已經運行的 Pod 的 Annoations 中解析出 GPU 使用量,計算出每個 Node 剩餘的可用資源保存到內存供調度時使用

1. 配置調度策略

hami-scheduler 提供了兩種不同級別的調度策略:

  • 節點調度策略:作用於調度過程中如何為 Pod 選擇節點
  • GPU 調度策略:作用於選擇節點後,節點存在多 GPU 時如何為 Pod 選擇 GPU

根據部署文檔,我們可以在部署時指定調度策略

  • scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy: 字符串類型,預設值為"binpack",表示 GPU 節點調度策略。

    • "binpack"表示儘量將任務分配到同一個 GPU 節點上
    • "spread"表示儘量將任務分配到不同 GPU 節點上。
  • scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy: 字符串類型,預設值為"spread", 表示 GPU 調度策略。

    • "binpack"表示儘量將任務分配到同一個 GPU 上
    • "spread"表示儘量將任務分配到不同 GPU 上。

就像這樣:

helm install vgpu vgpu-charts/vgpu --set scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy=binpark --set scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy=spread

部署後,這兩個配置作用域 hami-scheduler 上具體如下:

kk get deploy vgpu-hami-scheduler -oyaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vgpu-hami-scheduler
  namespace: kube-system
spec:
  template:
    spec:
      containers:
      - command:
        - scheduler
        - --resource-name=nvidia.com/gpu
        - --resource-mem=nvidia.com/gpumem
        - --resource-cores=nvidia.com/gpucores
        - --resource-mem-percentage=nvidia.com/gpumem-percentage
        - --resource-priority=nvidia.com/priority
        - --http_bind=0.0.0.0:443
        - --cert_file=/tls/tls.crt
        - --key_file=/tls/tls.key
        - --scheduler-name=hami-scheduler
        - --metrics-bind-address=:9395
        - --default-mem=0
        - --default-gpu=1
        - --default-cores=0
        - --iluvatar-memory=iluvatar.ai/vcuda-memory
        - --iluvatar-cores=iluvatar.ai/vcuda-core
        - --cambricon-mlu-name=cambricon.com/vmlu
        - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory
        - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore
        - --ascend-name=huawei.com/Ascend910
        - --ascend-memory=huawei.com/Ascend910-memory
        - --ascend310p-name=huawei.com/Ascend310P
        - --ascend310p-memory=huawei.com/Ascend310P-memory
        - --overwrite-env=false
        - --node-scheduler-policy=binpack
        - --gpu-scheduler-policy=spread

就是這兩個參數

- --node-scheduler-policy=binpack
- --gpu-scheduler-policy=spread

2. Node 調度策略原理

這部分比較簡單,選擇節點的邏輯就在 Filter 接口中。

// pkg/scheduler/scheduler.go#L444
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
    nums := k8sutil.Resourcereqs(args.Pod)
    total := 0
    for _, n := range nums {
        for _, k := range n {
            total += int(k.Nums)
        }
    }
    if total == 0 {
        klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
        return &extenderv1.ExtenderFilterResult{
            NodeNames:   args.NodeNames,
            FailedNodes: nil,
            Error:       "",
        }, nil
    }
    annos := args.Pod.Annotations
    s.delPod(args.Pod)
    nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
    if err != nil {
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        return nil, err
    }
    if len(failedNodes) != 0 {
        klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
    }
    nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
    if err != nil {
        err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        return nil, err
    }
    if len((*nodeScores).NodeList) == 0 {
        klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
        return &extenderv1.ExtenderFilterResult{
            FailedNodes: failedNodes,
        }, nil
    }
    klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
    sort.Sort(nodeScores)
    m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
    klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
    annotations := make(map[string]string)
    annotations[util.AssignedNodeAnnotations] = m.NodeID
    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    for _, val := range device.GetDevices() {
        val.PatchAnnotations(&annotations, m.Devices)
    }

    //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
    //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
    //maps.Copy(annotations, InRequestDevices)
    //maps.Copy(annotations, supportDevices)
    s.addPod(args.Pod, m.NodeID, m.Devices)
    err = util.PatchPodAnnotations(args.Pod, annotations)
    if err != nil {
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        s.delPod(args.Pod)
        return nil, err
    }
    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
    res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
    return &res, nil
}

主要就是下面這幾句:

//計算得分,拿到所有滿足條件的節點
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)

// 排序
sort.Sort(nodeScores)
// 直接選擇最後一個節點
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]

// 返回結果
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil

可以分為兩個部分:

  • 1)為所有節點計算得分
  • 2)根據調度策略選擇最合適的節點

計算得分

得分計算邏輯在 calcScore 方法裏:

// pkg/scheduler/score.go#L185
func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
    userNodePolicy := config.NodeSchedulerPolicy
    if annos != nil {
        if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
            userNodePolicy = value
        }
    }
    res := policy.NodeScoreList{
        Policy:   userNodePolicy,
        NodeList: make([]*policy.NodeScore, 0),
    }

    //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
    //    res := make(NodeScoreList, 0, len(*nodes))
    for nodeID, node := range *nodes {
        viewStatus(*node)
        score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0}
        score.ComputeScore(node.Devices)

        //This loop is for different container request
        ctrfit := false
        for ctrid, n := range nums {
            sums := 0
            for _, k := range n {
                sums += int(k.Nums)
            }

            if sums == 0 {
                for idx := range score.Devices {
                    if len(score.Devices[idx]) <= ctrid {
                        score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
                    }
                    score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
                    continue
                }
            }
            klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
            fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
            ctrfit = fit
            if !fit {
                klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
                break
            }
        }

        if ctrfit {
            res.NodeList = append(res.NodeList, &score)
        }
    }
    return &res, nil
}

具體算法在 ComputeScore 中:

// pkg/scheduler/policy/node_policy.go#L53
func (ns *NodeScore) ComputeScore(devices DeviceUsageList) {
    // current user having request resource
    used, usedCore, usedMem := int32(0), int32(0), int32(0)
    for _, device := range devices.DeviceLists {
        used += device.Device.Used
        usedCore += device.Device.Usedcores
        usedMem += device.Device.Usedmem
    }
    klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)

    total, totalCore, totalMem := int32(0), int32(0), int32(0)
    for _, deviceLists := range devices.DeviceLists {
        total += deviceLists.Device.Count
        totalCore += deviceLists.Device.Totalcore
        totalMem += deviceLists.Device.Totalmem
    }
    useScore := float32(used) / float32(total)
    coreScore := float32(usedCore) / float32(totalCore)
    memScore := float32(usedMem) / float32(totalMem)
    ns.Score = float32(Weight) * (useScore + coreScore + memScore)
    klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score)
}

具體打分邏輯則是根據每個節點上的已經使用的 GPU Core、GPU Memory 資源和總的 GPU Core、GPU Memory 的比值,根據權重歸一化處理後得到最終的得分。

總的來説就是:節點上 GPU Core 和 GPU Memory 資源剩餘越少,得分越高

乍一看這個邏輯有點反直覺了,不是應該資源越多得分越高嗎。

問題不大,等看完後續 根據策略選擇節點 章節就清楚了。

過濾節點

打分之後還需要根據 Pod 申請的 GPU 信息,過濾掉不滿足條件的節點。

比如:Pod 申請 2 vGPU,Node 上只有一張卡,肯定是不行的。

解析 Pod 申請的 GPU 信息

首先是從 Pod 信息中解析出申請的 GPU 信息:

// pkg/scheduler/scheduler.go#L444
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    nums := k8sutil.Resourcereqs(args.Pod)
}

Resourcereqs 內容如下:

// pkg/k8sutil/pod.go#L27
func Resourcereqs(pod *corev1.Pod) (counts util.PodDeviceRequests) {
    counts = make(util.PodDeviceRequests, len(pod.Spec.Containers))
    //Count Nvidia GPU
    for i := 0; i < len(pod.Spec.Containers); i++ {
        devices := device.GetDevices()
        counts[i] = make(util.ContainerDeviceRequests)
        for idx, val := range devices {
            request := val.GenerateResourceRequests(&pod.Spec.Containers[i])
            if request.Nums > 0 {
                counts[i][idx] = val.GenerateResourceRequests(&pod.Spec.Containers[i])
            }
        }
    }
    klog.InfoS("collect requestreqs", "counts", counts)
    return counts
}

GenerateResourceRequests 是 Interface,以 NVIDIA 實現為例

// pkg/device/nvidia/device.go#L264
func (dev *NvidiaGPUDevices) GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest {
    resourceName := corev1.ResourceName(ResourceName)
    resourceMem := corev1.ResourceName(ResourceMem)
    resourceMemPercentage := corev1.ResourceName(ResourceMemPercentage)
    resourceCores := corev1.ResourceName(ResourceCores)
    v, ok := ctr.Resources.Limits[resourceName]
    if !ok {
        v, ok = ctr.Resources.Requests[resourceName]
    }
    if ok {
        if n, ok := v.AsInt64(); ok {
            memnum := 0
            mem, ok := ctr.Resources.Limits[resourceMem]
            if !ok {
                mem, ok = ctr.Resources.Requests[resourceMem]
            }
            if ok {
                memnums, ok := mem.AsInt64()
                if ok {
                    memnum = int(memnums)
                }
            }
            mempnum := int32(101)
            mem, ok = ctr.Resources.Limits[resourceMemPercentage]
            if !ok {
                mem, ok = ctr.Resources.Requests[resourceMemPercentage]
            }
            if ok {
                mempnums, ok := mem.AsInt64()
                if ok {
                    mempnum = int32(mempnums)
                }
            }
            if mempnum == 101 && memnum == 0 {
                if config.DefaultMem != 0 {
                    memnum = int(config.DefaultMem)
                } else {
                    mempnum = 100
                }
            }
            corenum := config.DefaultCores
            core, ok := ctr.Resources.Limits[resourceCores]
            if !ok {
                core, ok = ctr.Resources.Requests[resourceCores]
            }
            if ok {
                corenums, ok := core.AsInt64()
                if ok {
                    corenum = int32(corenums)
                }
            }
            return util.ContainerDeviceRequest{
                Nums:             int32(n),
                Type:             NvidiaGPUDevice,
                Memreq:           int32(memnum),
                MemPercentagereq: int32(mempnum),
                Coresreq:         int32(corenum),
            }
        }
    }
    return util.ContainerDeviceRequest{}
}

邏輯也比較簡單,就是從 Container 的 Resources 中根據名稱解析拿到申請的 gpu、gpucore、gpumem 等信息。

過濾節點

邏輯同樣在 calcScore 方法中,具體如下:

        ctrfit := false
        for ctrid, n := range nums {
            sums := 0
            for _, k := range n {
                sums += int(k.Nums)
            }

            if sums == 0 {
                for idx := range score.Devices {
                    if len(score.Devices[idx]) <= ctrid {
                        score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
                    }
                    score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
                    continue
                }
            }
            klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
            fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
            ctrfit = fit
            if !fit {
                klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
                break
            }
        }

        if ctrfit {
            res.NodeList = append(res.NodeList, &score)
        }
  • nums 就是上一步解析出來的 Pod 的 GPU 申請情況
  • score.Devices:就是當前節點上的 GPU 設備

具體過濾規則在這裏:

fit, _ := fitInDevices(node, n, annos, task, &score.Devices)

fitInDevices 內容如下:

內容比較多,去掉了其他無關內容,主要就是做了這幾個判斷,如果都滿足則記錄對應的 GPU 信息並返回 true,否則返回 false,表示該節點無法調度。

func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) {
        // ....
     for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
          if node.Devices.DeviceLists[i].Device.Totalmem-node.Devices.DeviceLists[i].Device.Usedmem < memreq {
            continue
        }
        if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
            continue
        }
        // Coresreq=100 indicates it want this card exclusively
        if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
            continue
        }
        // You can't allocate core=0 job to an already full GPU
        if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
            continue
        }
        if k.Nums > 0 {
            klog.InfoS("first fitted", "pod", klog.KObj(pod), "device", node.Devices.DeviceLists[i].Device.ID)
            k.Nums--
            tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
                Idx:       int(node.Devices.DeviceLists[i].Device.Index),
                UUID:      node.Devices.DeviceLists[i].Device.ID,
                Type:      k.Type,
                Usedmem:   memreq,
                Usedcores: k.Coresreq,
            })
        }
        if k.Nums == 0 {
            klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
            return true, tmpDevs
        }
    }
    return false, tmpDevs
}

這樣,我們就把不滿足條件的節點給過濾掉了,剩下的節點都是可以正常調度 Pod 的,不過具體選擇哪個節點還需要依賴於配置的調度策略。

根據策略選擇節點

上一步計算出了每個節點的得分之後,就可以根據策略進行選擇了。

//計算得分,拿到所有滿足條件的節點
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)

// 排序
sort.Sort(nodeScores)
// 直接選擇最後一個節點
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]

// 返回結果
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil

具體的選擇邏輯在這裏:

// 排序
sort.Sort(nodeScores)
// 直接選擇最後一個節點
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]

對得分數據排序後,直接就選擇了最後一個節點。

初次看到這裏時也有點懵,想不明白這怎麼和調度策略牽扯到一起的。

實際上具體邏輯就在 sort 這裏,NodeScoreList 要實現 sort 接口才能進行排序,因此看下是怎麼實現的:

// pkg/scheduler/policy/node_policy.go#L32
type NodeScoreList struct {
    NodeList []*NodeScore
    Policy   string
}

func (l NodeScoreList) Len() int {
    return len(l.NodeList)
}

func (l NodeScoreList) Swap(i, j int) {
    l.NodeList[i], l.NodeList[j] = l.NodeList[j], l.NodeList[i]
}

func (l NodeScoreList) Less(i, j int) bool {
    if l.Policy == NodeSchedulerPolicySpread.String() {
       return l.NodeList[i].Score > l.NodeList[j].Score
    }
    // default policy is Binpack
    return l.NodeList[i].Score < l.NodeList[j].Score
}

核心部分:

func (l NodeScoreList) Less(i, j int) bool {
    if l.Policy == NodeSchedulerPolicySpread.String() {
       return l.NodeList[i].Score > l.NodeList[j].Score
    }
    // default policy is Binpack
    return l.NodeList[i].Score < l.NodeList[j].Score
}

根據我們的 Policy 不同,有兩種排序方式,而且排序正好相反。

// NodeSchedulerPolicyBinpack is node use binpack scheduler policy.
NodeSchedulerPolicyBinpack SchedulerPolicyName = "binpack"
// NodeSchedulerPolicySpread is node use spread scheduler policy.
NodeSchedulerPolicySpread SchedulerPolicyName = "spread"

這裏涉及到 sort.Sort() 的實現,簡單來説:

  • 如果Less()方法中使用大於(>)比較,最終排序結果將是降序。
  • 如果Less()方法中使用小於(<)比較,最終排序結果將是升序。

對應到調度策略:

  • Binpack 策略使用 小於(<)比較,最終排序結果將是升序
  • Spread 策略使用 大於(>)比較,最終排序結果將是降序

又因為前面打分時的規則是:剩餘資源越少,得分越低,再加上我們會選擇排序後的最後一個節點

至此,邏輯就清晰了。

  • Binpack 策略選擇最後一個節點,因為升序排列,最後一個 Node 得分最高,即:空閒資源最少
  • Spread 策略選擇最後一個節點,因為降序排列,最後一個 Node 得分最低,即:空閒資源最多

正好符合了策略的原本含義:

  • Binpack 則是讓所有 Pod 儘量調度到同一個節點,優先把一個節點資源用完,然後再使用其他節點。
  • Spread 則是相反,儘量讓 Pod 分散到所有節點上去。

調度策略是哪兒來的

這部分邏輯實際上是在 calScore 方法裏:

func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
    userNodePolicy := config.NodeSchedulerPolicy
    if annos != nil {
        if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
            userNodePolicy = value
        }
    }
    res := policy.NodeScoreList{
        Policy:   userNodePolicy,
        NodeList: make([]*policy.NodeScore, 0),
    }
}

首先使用默認的調度策略,當然默認調度策略也是會被參數覆蓋的:

    rootCmd.Flags().StringVar(&config.NodeSchedulerPolicy, "node-scheduler-policy", policy.NodeSchedulerPolicyBinpack.String(), "node scheduler policy")
    rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", policy.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")

然後解析 Pod 的 Annoations,如果有指定hami.io/node-scheduler-policy 就使用 Pod 上指定的調度策略。

至此,Node 調度策略就分析完成了。

3. GPU 調度策略原理

當 Node 選好之後,Node 上有多塊 GPU,Pod 還分配哪塊呢? 這時候就該 GPU 調度策略生效了。

實際上選擇 GPU 的邏輯也暗含在 Filter 方法裏了。

// pkg/scheduler/scheduler.go#L444
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
    nums := k8sutil.Resourcereqs(args.Pod)
    total := 0
    for _, n := range nums {
        for _, k := range n {
            total += int(k.Nums)
        }
    }
    if total == 0 {
        klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
        return &extenderv1.ExtenderFilterResult{
            NodeNames:   args.NodeNames,
            FailedNodes: nil,
            Error:       "",
        }, nil
    }
    annos := args.Pod.Annotations
    s.delPod(args.Pod)
    nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
    if err != nil {
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        return nil, err
    }
    if len(failedNodes) != 0 {
        klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
    }
    nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
    if err != nil {
        err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        return nil, err
    }
    if len((*nodeScores).NodeList) == 0 {
        klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
        return &extenderv1.ExtenderFilterResult{
            FailedNodes: failedNodes,
        }, nil
    }
    klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
    sort.Sort(nodeScores)
    m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
    klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
    annotations := make(map[string]string)
    annotations[util.AssignedNodeAnnotations] = m.NodeID
    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    for _, val := range device.GetDevices() {
        val.PatchAnnotations(&annotations, m.Devices)
    }

    //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
    //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
    //maps.Copy(annotations, InRequestDevices)
    //maps.Copy(annotations, supportDevices)
    s.addPod(args.Pod, m.NodeID, m.Devices)
    err = util.PatchPodAnnotations(args.Pod, annotations)
    if err != nil {
        s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
        s.delPod(args.Pod)
        return nil, err
    }
    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
    res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
    return &res, nil
}

過濾 GPU

解析 Pod 申請的 GPU 信息

首先是從 Pod 信息中解析出申請的 GPU 信息:

// pkg/scheduler/scheduler.go#L444
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    nums := k8sutil.Resourcereqs(args.Pod)
}

和 Node 調度策略中的一樣,這裏就不在贅述了。

過濾 GPU

這部分邏輯隱藏比較深刻,在fitInDevices 方法中

// pkg/scheduler/score.go#L185
func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {

    for nodeID, node := range *nodes {
            fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
  }
}

fitInDevices 內容如下:

func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
    //devmap := make(map[string]util.ContainerDevices)
    devs := util.ContainerDevices{}
    total, totalCore, totalMem := int32(0), int32(0), int32(0)
    free, freeCore, freeMem := int32(0), int32(0), int32(0)
    sums := 0
    // computer all device score for one node
    for index := range node.Devices.DeviceLists {
        node.Devices.DeviceLists[index].ComputeScore(requests)
    }
    //This loop is for requests for different devices
    for _, k := range requests {
        sums += int(k.Nums)
        if int(k.Nums) > len(node.Devices.DeviceLists) {
            klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
            return false, 0
        }
        sort.Sort(node.Devices)
        fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
        if fit {
            for _, val := range tmpDevs[k.Type] {
                total += node.Devices.DeviceLists[val.Idx].Device.Count
                totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore
                totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem
                free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used
                freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores
                freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem

                node.Devices.DeviceLists[val.Idx].Device.Used++
                node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores
                node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem
            }
            devs = append(devs, tmpDevs[k.Type]...)
        } else {
            return false, 0
        }
        (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
    }
    return true, 0
}

核心部分:

for _, k := range requests {
      sort.Sort(node.Devices)
        fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
      if fit {
            devs = append(devs, tmpDevs[k.Type]...)
        } else {
            return false, 0
        }
        (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
}

這裏又出現了 sort.Sort 是不是有點熟悉,不過暫時先不管,還是先分析怎麼過濾 GPU 的。

核心部分在fitInCertainDevice 中,根據 Pod 申請的 GPU 信息找出當前節點上所有滿足條件的 GPU

fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)

fitInCertainDevice 在前面過濾 Node 時也分析過,這裏就簡單看下

func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) {
     for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
            continue
        }
        if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
            continue
        }
        // Coresreq=100 indicates it want this card exclusively
        if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
            continue
        }
        // You can't allocate core=0 job to an already full GPU
        if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
            continue
        }
        if k.Nums > 0 {

            k.Nums--
            tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
                Idx:       int(node.Devices.DeviceLists[i].Device.Index),
                UUID:      node.Devices.DeviceLists[i].Device.ID,
                Type:      k.Type,
                Usedmem:   memreq,
                Usedcores: k.Coresreq,
            })
        }
        if k.Nums == 0 {
            klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
            return true, tmpDevs
        }
    }
    return false, tmpDevs
  }
}

如果某個 GPU 能滿足這些條件就認為該 GPU 可以分配給對應 Container。

又回到前面的核心邏輯,對於滿足條件的 GPU,這裏使用了 devinput 對象進行記錄。

for _, k := range requests {
      sort.Sort(node.Devices)
        fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
      if fit {
            devs = append(devs, tmpDevs[k.Type]...)
        } else {
            return false, 0
        }
        (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
}

這裏的 devinput 實際上就是前面傳進來的 Score 對象。

type NodeScore struct {
    NodeID  string
    Devices util.PodDevices
    // Score recode every node all device user/allocate score
    Score float32
}

標記到 Pod 上

hami 為了讓後續的 DevicePlugin 能夠知道要把哪些 GPU 分配給該 Pod,是直接將其記錄到 Pod 的 Annoations 上的。

// pkg/scheduler/scheduler.go
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    sort.Sort(nodeScores)
    m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
    klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
    annotations := make(map[string]string)
    annotations[util.AssignedNodeAnnotations] = m.NodeID
    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    for _, val := range device.GetDevices() {
        val.PatchAnnotations(&annotations, m.Devices)
    }
}

選擇到節點之後,還把 m.Devices 信息記錄到了 Pod 的 Annoations 上。

    annotations := make(map[string]string)
    annotations[util.AssignedNodeAnnotations] = m.NodeID
    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    for _, val := range device.GetDevices() {
        val.PatchAnnotations(&annotations, m.Devices)
    }

這裏的 m.Devices 實際上就是前面我們過濾出來的滿足條件的 GPU。

Annoations 大概是這樣的:

root@test:~/lixd/hami# k get po hami-30 -oyaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    hami.io/bind-phase: allocating
    hami.io/bind-time: "1732072495"
    hami.io/vgpu-devices-allocated: GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,NVIDIA,20000,30:;
    hami.io/vgpu-devices-to-allocate: GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,NVIDIA,20000,30:;
    hami.io/vgpu-node: test
    hami.io/vgpu-time: "1732072495"
  • hami.io/vgpu-devices-to-allocate 是 Scheduler 為 Pod 選擇的目標 GPU
  • hami.io/vgpu-devices-allocated 是當前已經分配的
ps:對於已經調度的 Pod hami.io/vgpu-devices-to-allocate 會被清空

調度完成後,DevicePlugin 直接讀取 hami.io/vgpu-devices-to-allocate 就知道要為該 Pod 分配哪些 GPU 了。

根據策略選擇 GPU

前面都已經選出了滿足條件的 GPU 甚至都記錄到了 Pod 的 Annoations 上了,那麼 GPU 調度策略是什麼時候生效的呢?

GPU 打分

邏輯和 Node 打分邏輯基本一致:都是剩餘資源越多,得分越低

func (ds *DeviceListsScore) ComputeScore(requests util.ContainerDeviceRequests) {
    request, core, mem := int32(0), int32(0), int32(0)
    // Here we are required to use the same type device
    for _, container := range requests {
        request += container.Nums
        core += container.Coresreq
        if container.MemPercentagereq != 0 && container.MemPercentagereq != 101 {
            mem += ds.Device.Totalmem * (container.MemPercentagereq / 100.0)
            continue
        }
        mem += container.Memreq
    }
    klog.V(2).Infof("device %s user %d, userCore %d, userMem %d,", ds.Device.ID, ds.Device.Used, ds.Device.Usedcores, ds.Device.Usedmem)

    usedScore := float32(request+ds.Device.Used) / float32(ds.Device.Count)
    coreScore := float32(core+ds.Device.Usedcores) / float32(ds.Device.Totalcore)
    memScore := float32(mem+ds.Device.Usedmem) / float32(ds.Device.Totalmem)
    ds.Score = float32(Weight) * (usedScore + coreScore + memScore)
    klog.V(2).Infof("device %s computer score is %f", ds.Device.ID, ds.Score)
}

排序

這部分也在 fitInDevices 方法中

// pkg/scheduler/score.go#L144
func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
  for _, k := range requests {
      sort.Sort(node.Devices)
      fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
      if fit {
        devs = append(devs, tmpDevs[k.Type]...)
      } else {
        return false, 0
      }
      (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
  }
}

核心就是這個 sort 方法

sort.Sort(node.Devices)

又出現了 sort.Sort 是不是想到了什麼。

前面選擇節點的時候也是這樣實現的,把具體邏輯放在 sort 接口實現上。看看 GPU 的 Sort 接口怎麼實現的:

func (l DeviceUsageList) Len() int {
    return len(l.DeviceLists)
}

func (l DeviceUsageList) Swap(i, j int) {
    l.DeviceLists[i], l.DeviceLists[j] = l.DeviceLists[j], l.DeviceLists[i]
}

func (l DeviceUsageList) Less(i, j int) bool {
    if l.Policy == GPUSchedulerPolicyBinpack.String() {
        if l.DeviceLists[i].Device.Numa == l.DeviceLists[j].Device.Numa {
            return l.DeviceLists[i].Score < l.DeviceLists[j].Score
        }
        return l.DeviceLists[i].Device.Numa > l.DeviceLists[j].Device.Numa
    }
    // default policy is spread
    if l.DeviceLists[i].Device.Numa == l.DeviceLists[j].Device.Numa {
        return l.DeviceLists[i].Score > l.DeviceLists[j].Score
    }
    return l.DeviceLists[i].Device.Numa < l.DeviceLists[j].Device.Numa
}

果然又是這樣的,根據不同的 GPU 調度策略,Less 方法返回不同結果以控制排序結果是降序還是升序。

選擇 GPU

然後後續再選擇 GPU 的時候的代碼如下:

func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) {
     for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
            continue
        }
        if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
            continue
        }
        // Coresreq=100 indicates it want this card exclusively
        if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
            continue
        }
        // You can't allocate core=0 job to an already full GPU
        if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
            continue
        }
        if k.Nums > 0 {
            k.Nums--
            tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
                Idx:       int(node.Devices.DeviceLists[i].Device.Index),
                UUID:      node.Devices.DeviceLists[i].Device.ID,
                Type:      k.Type,
                Usedmem:   memreq,
                Usedcores: k.Coresreq,
            })
        }
        if k.Nums == 0 {
            klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
            return true, tmpDevs
        }
    }
    return false, tmpDevs
  }
}

核心是這個 for 循環

for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
}

也是從最後一個 GPU 開始的,也就是如果排在後面的 GPU 滿足條件就會直接被選中,不會再去選擇前面的了。

  • Binpack 策略:結果為升序,越往後的 GPU 空閒資源越少
  • Spread 策略:結果為降序,越往後的 GPU 空閒資源越多

同樣也是符合對應策略含義的。

至此,GPU 調度策略也分析完了。

DevicePlugin 解析 GPU 信息

在調度時,我們把最終選擇的 GPU 記錄到了 Pod 的 Annoations 上,DevicePlugin 這邊就不需要選擇 GPU 了,從 Annoations 上解析即可

// pkg/util/util.go#L281
func GetNextDeviceRequest(dtype string, p corev1.Pod) (corev1.Container, ContainerDevices, error) {
    pdevices, err := DecodePodDevices(InRequestDevices, p.Annotations)
    if err != nil {
       return corev1.Container{}, ContainerDevices{}, err
    }
    klog.Infof("pod annotation decode vaule is %+v", pdevices)
    res := ContainerDevices{}

    pd, ok := pdevices[dtype]
    if !ok {
       return corev1.Container{}, res, errors.New("device request not found")
    }
    for ctridx, ctrDevice := range pd {
       if len(ctrDevice) > 0 {
          return p.Spec.Containers[ctridx], ctrDevice, nil
       }
    }
    return corev1.Container{}, res, errors.New("device request not found")
}

// pkg/util/util.go#L254
func DecodePodDevices(checklist map[string]string, annos map[string]string) (PodDevices, error) {
    klog.V(5).Infof("checklist is [%+v], annos is [%+v]", checklist, annos)
    if len(annos) == 0 {
       return PodDevices{}, nil
    }
    pd := make(PodDevices)
    for devID, devs := range checklist {
       str, ok := annos[devs]
       if !ok {
          continue
       }
       pd[devID] = make(PodSingleDevice, 0)
       for _, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) {
          cd, err := DecodeContainerDevices(s)
          if err != nil {
             return PodDevices{}, nil
          }
          if len(cd) == 0 {
             continue
          }
          pd[devID] = append(pd[devID], cd)
       }
    }
    klog.InfoS("Decoded pod annos", "poddevices", pd)
    return pd, nil
}

具體的解析邏輯如下,就是按照預設規則,以冒號,逗號進行切分

// pkg/util/util.go#L223
func DecodeContainerDevices(str string) (ContainerDevices, error) {
    if len(str) == 0 {
       return ContainerDevices{}, nil
    }
    cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol)
    contdev := ContainerDevices{}
    tmpdev := ContainerDevice{}
    klog.V(5).Infof("Start to decode container device %s", str)
    if len(str) == 0 {
       return ContainerDevices{}, nil
    }
    for _, val := range cd {
       if strings.Contains(val, ",") {
          //fmt.Println("cd is ", val)
          tmpstr := strings.Split(val, ",")
          if len(tmpstr) < 4 {
             return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task")
          }
          tmpdev.UUID = tmpstr[0]
          tmpdev.Type = tmpstr[1]
          devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32)
          tmpdev.Usedmem = int32(devmem)
          devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32)
          tmpdev.Usedcores = int32(devcores)
          contdev = append(contdev, tmpdev)
       }
    }
    klog.V(5).Infof("Finished decoding container devices. Total devices: %d", len(contdev))
    return contdev, nil
}

至此,hami 提供的 Node、GPU 級別的 Spread、Binpack 高級調度策略就分析完成了。


【Kubernetes 系列】持續更新中,搜索公眾號【探索雲原生】訂閲,閲讀更多文章。


4. 小結

調度策略配置

hami-scheduler 提供了兩種不同級別的調度策略:

  • 節點調度策略:作用於調度過程中如何為 Pod 選擇節點
  • GPU 調度策略:作用於選擇節點後,節點存在多 GPU 時如何為 Pod 選擇 GPU

二者都支持 Spread 和 Binpack 兩種配置:

  • Spread 表示儘量將任務分配到不同 Node 或 GPU 上,讓集羣中的 Node 或 GPU 負載儘量保持相同水位線。
  • Binpack 表示儘量將任務分配到同一 Node 或者 GPU 上,儘量先佔滿一個 Node 或者 GPU 後再使用別的

具體 Node、GPU 調度策略實現都可以分為以下幾步

  • 1)給 Node、GPU 打分
  • 2)過濾掉不滿足條件的 Node、GPU
  • 3)根據調度策略選擇出最優 Node、GPU

    • 具體邏輯都在 sort.Sort 接口的 Less 方法實現的
    • 對於 Spread 策略就選擇剩餘資源多的 Node、GPU,Binpack 策略就選擇剩餘資源少的 Node、GPU
  • 4)對結果進行記錄

    • Node 則是通過 Bind 結果直接和 Pod 綁定
    • GPU 則是記錄到 Pod 的 Annoations 上

所有邏輯都在 Filter 方法裏,Node 的調度策略還算比較清晰,除了 sort.Sort 這個點需要多看會之外,其他都還好。

GPU 調度策略就複雜了一點,所有邏輯都混在一起的,不是很容易區分,需要慢慢分析。

user avatar free_like_bird 头像 rancherlabs 头像 yian 头像 ydswin 头像 dolphinscheduler 头像 gouguoyin 头像 weiwudejiqimao 头像 mimangdeyangcong 头像 gangyidesongshu 头像
点赞 9 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.