Go動態感知資源變更的技術實踐,你指定用過!
最近在倒騰“AI大模型基礎設施”, 宏觀目標是做一個基于云原生的AI算力平臺,目前因公司隱私暫不能公開宏觀背景和技術方案, 姑且記錄實踐中遇到的一些技能點。

前文已經記錄了第1步: 使用arena 提交訓練任務的實踐。
今天我們記錄聊一聊平臺側另一個核心能力:
動態納管云原生k8s集群,并監聽AI/ML訓練任務的狀態變化。也就是上圖的第4步。
作為面向算法開發者的云原生saas平臺,平臺在界面上提供了納管集群的交互入口,平臺啟動后會去監聽pytorch,mpi訓練任務的狀態變更,并回顯到界面(并給開發者發送飛書變更通知)。
本文核心關注:
- 如何動態納管k8s集群
- 如何重建k8s informer監聽
這里我提供我的實踐, 請看下圖:

- 程序啟動,加載初始k8s集群,informer監聽訓練任務狀態,并綁定informer停止信號(stopCh)、重建信號(rebuildCh)
package main
import "k8s.io/client-go/rest"
type StartInformerFunc func(clusterId string, restConf *rest.Config) (stopCh chan struct{}, err error)
type InformerManager struct {
clusterConfigs map[string]string
gvr map[string]StartInformerFunc
stopCh chan struct{} // informer 需要用到
rebuildCh chan struct{}
}
var InformerManagerInstance *InformerManager
func NewInFormerManager() *InformerManager {
InformerManagerInstance = &InformerManager{
clusterConfigs: map[string]string{
"id1": "kubeconfig1",
"id2": "kubeconfig2",
},
gvr: map[string]StartInformerFunc{
"pytorchjob": startPytorchjobInformer,
"mpijob": startMpijobInformer,
"job": startRawjobInformer,
},
stopCh: make(chan struct{}),
rebuildCh: make(chan struct{}, 1),
}
return InformerManagerInstance
}
- 開協程定時任務去輪循落盤的待納管k8s集群記錄
- 考慮納管的k8s集群數據可控,變更時機可控,采用md5校驗的方式判斷是否發生集群變更
下面的k8s.CheckClusterChanged(mgr.clusterConfigs) 是利用對kube-configs做md5, 前后對比判斷集群是否發生變更。
func (mgr *InformerManager) monitorcLusterChanged() bool {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if k8s.CheckClusterChanged(mgr.clusterConfigs) {
fmt.Println("cluster changed")
mgr.rebuildCh <- struct{}{}
} else {
fmt.Println("cluster no change")
}
case <-mgr.rebuildCh:
fmt.Println("rebuild informer")
mgr.clusterConfigs = k8s.GetInitclusters()
for k, v := range mgr.clusterConfigs {
rc, err := k8s.ConvertToRestConfig([]byte(v))
if err != nil {
fmt.Println("convert to rest config failed")
continue
}
for gvr, StartInformerFunc := range mgr.gvr {
go func(k string, rc *rest.Config) {
newStopch, err := StartInformerFunc(k, rc)
if err != nil {
return
}
if mgr.stopCh != nil {
close(mgr.stopCh)
}
mgr.stopCh = newStopch
fmt.Printf("%s informer started for cluster %s \n", gvr, k)
}(k, rc)
}
}
}
}
}
- 利用簡單的鏈表指針,重置informer監聽信道
func (mgr *InformerManager) Run() {
for k, v := range mgr.clusterConfigs {
rc, err := k8s.ConvertToRestConfig([]byte(v))
if err != nil {
fmt.Println("failed to convert kubeconfig to rest config")
continue
}
for gvr, StartInformerFunc := range mgr.gvr {
go func(k string, rc *rest.Config) {
newStopch, err := StartInformerFunc(k, rc)
if err != nil {
return
}
if mgr.stopCh != nil {
close(mgr.stopCh)
}
mgr.stopCh = newStopch
fmt.Printf("start %s informer for cluster %s \n", gvr,k)
}(k, rc)
}
go mgr.monitorClusterChanged()
}
}
本文記錄了使用定時任務感知資源變更,并利用golang信道作為變更信號的姿勢,可作為golang中動態感知資源變化的常規技能實踐。
本文來自博客園,作者:{有態度的馬甲},轉載請注明原文鏈接:http://www.rzrgm.cn/JulianHuang/p/19023906
歡迎關注我的原創技術、職場公眾號, 加好友談天說地,一起進化

浙公網安備 33010602011771號