作者:杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事镜像仓库、Kubernetes集群高可用&备份还原,以及边缘计算相关研发工作。

前言

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制,极大简化了应用从云端部署到边缘端的过程。同时SuperEdge设计了分布式健康检查机制规避了云边网络不稳定造成的大量pod迁移和重建,保证了服务的稳定。

边缘计算场景下,边缘节点与云端的网络环境十分复杂,连接并不可靠,在原生 Kubernetes 集群中,会造成 apiserver 和节点连接的中断,节点状态的异常,最终导致pod的驱逐和 endpoint 的缺失,造成服务的中断和波动,具体来说原生 Kubernetes 处理如下:

  • 失联的节点被置为 ConditionUnknown 状态,并被添加 NoSchedule 和 NoExecute 的 taints
  • 失联的节点上的 pod 被驱逐,并在其他节点上进行重建
  • 失联的节点上的 pod 从 Service 的 Endpoint 列表中移除

因此,边缘计算场景仅仅依赖边端和 apiserver 的连接情况是不足以判断节点是否异常的,会因为网络的不可靠造成误判,影响正常服务。而相较于云端和边缘端的连接,显然边端节点之间的连接更为稳定,具有更高的参考价值,因此 superedge 提出了边缘分布式健康检查机制。该机制中节点状态判定除了要考虑 apiserver 的因素外,还引入了节点的评估因素,进而对节点进行更为全面的状态判断。通过这个功能,能够避免由于云边网络不可靠造成的大量的pod迁移和重建,保证服务的稳定

具体来说,主要通过如下三个层面增强节点状态判断的准确性:

  • 每个节点定期探测其他节点健康状态
  • 集群内所有节点定期投票决定各节点的状态
  • 云端和边端节点共同决定节点状态

而分布式健康检查最终的判断处理如下:

edge-health-daemon 源码分析

在深入源码之前先介绍一下分布式健康检查的实现原理,其架构图如下所示:

Kubernetes 每个 node 在 kube-node-lease namespace 下会对应一个 Lease object,kubelet 每隔 node-status-update-frequency 时间(默认10s)会更新对应node的 Lease object

node-controller 会每隔 node-monitor-period 时间(默认5s)检查 Lease object 是否更新,如果超过 node-monitor-grace-period 时间(默认40s)没有发生过更新,则认为这个 node 不健康,会更新 NodeStatus(ConditionUnknown)

而当节点心跳超时(ConditionUnknown)之后,node controller 会给该 node 添加如下 taints:

spec:...taints:- effect: NoSchedulekey: node.kubernetes.io/unreachabletimeAdded: "2020-07-02T03:50:47Z"- effect: NoExecutekey: node.kubernetes.io/unreachabletimeAdded: "2020-07-02T03:50:53Z"

同时,endpoint controller 会从 endpoint backend 中踢掉该母机上的所有 pod

对于打上 NoSchedule taint 的母机,Scheduler 不会调度新的负载在该 node 上了;而对于打上 NoExecute(node.kubernetes.io/unreachable) taint 的母机,node controller 会在节点心跳超时之后一段时间(默认5mins)驱逐该节点上的 pod

分布式健康检查边端的 edge-health-daemon 组件会对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation)

此外,为了实现在云边断连且分布式健康检查状态正常的情况下:

  • 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
  • 失联的节点上的 pod 不会被驱逐

还需要在云端运行 edge-health-admission(Kubernetes mutating admission webhook),不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉NoExecute taint)以及 endpoints (将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现云端和边端共同决定节点状态

本章将主要介绍 edge-health-daemon 原理,如下为 edge-health-daemon 的相关数据结构:

type EdgeHealthMetadata struct {*NodeMetadata*CheckMetadata
}
type NodeMetadata struct {NodeList []v1.Nodesync.RWMutex
}
type CheckMetadata struct {CheckInfo            map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail}CheckPluginScoreInfo map[string]map[string]float64     // Checked ip:{Plugin name:Check score}sync.RWMutex
}
type CheckDetail struct {Normal boolTime   time.Time
}
type CommunInfo struct {SourceIP    string                 // ClientIP,Checker ipCheckDetail map[string]CheckDetail // Checked ip:Check detailHmac        string
}

含义如下:

  • NodeMetadata:为了实现分区域分布式健康检查机制而维护的边缘节点 cache,其中包含该区域内的所有边缘节点列表 NodeList
  • CheckMetadata:存放健康检查的结果,具体来说包括两个数据结构:
    • CheckPluginScoreInfo:为Checked ip:{Plugin name:Check score}组织形式。第一级 key 表示:被检查的ip;第二级 key 表示:检查插件的名称;value 表示:检查分数
    • CheckInfo:为Checker ip:{Checked ip:Check detail}组织形式。第一级key表示:执行检查的ip;第二级key表示:被检查的ip;value表示检查结果 CheckDetail
  • CheckDetail:代表健康检查的结果
    • Normal:Normal 为 true 表示检查结果正常;false 表示异常
    • Time:表示得出该结果时的时间,用于结果有效性的判断(超过一段时间没有更新的结果将无效)
  • CommunInfo:边缘节点向其它节点发送健康检查结果时使用的数据,其中包括:
    • SourceIP:表示执行检查的ip
    • CheckDetail:为Checked ip:Check detail组织形式,包含被检查的ip以及检查结果
    • Hmac:SourceIP 以及 CheckDetail 进行 hmac 得到,用于边缘节点通信过程中判断传输数据的有效性(是否被篡改)

edge-health-daemon 主体逻辑包括四部分功能:

  • SyncNodeList:根据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata相关数据
  • ExecuteCheck:对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线得出节点是否健康的结果
  • Commun:将本节点对其它各节点健康检查的结果发送给其它节点
  • Vote:对所有节点健康检查的结果分类,如果某个节点被大多数(>1/2)节点判定为正常,则对该节点添加superedgehealth/node-health:true annotation,表明该节点分布式健康检查结果为正常;否则,对该节点添加superedgehealth/node-health:false annotation,表明该节点分布式健康检查结果为异常

下面依次对上述功能进行源码分析:

1、SyncNodeList

SyncNodeList 每隔 HealthCheckPeriod 秒(health-check-period 选项)执行一次,会按照如下情况分类刷新 node cache:

  • 如果 kube-system namespace 下不存在名为 edge-health-zone-config的configmap,则没有开启多地域探测,因此会获取所有边缘节点列表并刷新 node cache
  • 否则,如果 edge-health-zone-config 的 configmap 数据部分 TaintZoneAdmission 为 false,则没有开启多地域探测,因此会获取所有边缘节点列表并刷新 node cache
  • 如果 TaintZoneAdmission 为 true,且 node 有"superedgehealth/topology-zone"标签(标示区域),则获取"superedgehealth/topology-zone" label value 相同的节点列表并刷新 node cache
  • 如果 node 没有"superedgehealth/topology-zone" label,则只会将边缘节点本身添加到分布式健康检查节点列表中并刷新 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() {// Only sync nodes when self-located foundvar host *v1.Nodeif host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil {klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName)return}// Filter cloud nodes and retain edge onesmasterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{})if err != nil {klog.Errorf("New masterRequirement failed %+v", err)return}masterSelector := labels.NewSelector()masterSelector = masterSelector.Add(*masterRequirement)if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil {if apierrors.IsNotFound(err) { // multi-region configmap not foundif NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap not found and get nodes err %+v", err)return} else {ehd.metadata.SetByNodeList(NodeList)}} else {klog.Errorf("Get multi-region configmap err %+v", err)return}} else { // multi-region configmap foundmrcv := mrc.Data[common.TaintZoneConfigMapKey]klog.V(4).Infof("Multi-region value is %s", mrcv)if mrcv == "false" { // close multi-region checkif NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err)return} else {ehd.metadata.SetByNodeList(NodeList)}} else { // open multi-region checkif hostZone, existed := host.Labels[common.TopologyZone]; existed {klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone)zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone})if err != nil {klog.Errorf("New masterZoneRequirement failed: %+v", err)return}masterZoneSelector := labels.NewSelector()masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement)if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil {klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err)return} else {ehd.metadata.SetByNodeList(nodeList)}} else { // Only check itself if there is no TopologyZone labelklog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name)ehd.metadata.SetByNodeList([]*v1.Node{host})}}}// Init check plugin scoreipList := make(map[string]struct{})for _, node := range ehd.metadata.Copy() {for _, addr := range node.Status.Addresses {if addr.Type == v1.NodeInternalIP {ipList[addr.Address] = struct{}{}ehd.metadata.InitCheckPluginScore(addr.Address)}}}// Delete redundant check plugin scorefor _, checkedIp := range ehd.metadata.CopyCheckedIp() {if _, existed := ipList[checkedIp]; !existed {ehd.metadata.DeleteCheckPluginScore(checkedIp)}}// Delete redundant check infofor checkerIp := range ehd.metadata.CopyAll() {if _, existed := ipList[checkerIp]; !existed {ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp)}}klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata)
}
...
func (cm *CheckMetadata) DeleteByIp(localIp, ip string) {cm.Lock()defer cm.Unlock()delete(cm.CheckInfo[localIp], ip)delete(cm.CheckInfo, ip)
}

在按照如上逻辑更新node cache之后,会初始化CheckMetadata.CheckPluginScoreInfo,将节点ip赋值给CheckPluginScoreInfo key(Checked ip:被检查的ip)

另外,会删除CheckMetadata.CheckPluginScoreInfo以及CheckMetadata.CheckInfo中多余的items(不属于该边缘节点检查范围)

2、ExecuteCheck

ExecuteCheck也是每隔HealthCheckPeriod秒(health-check-period选项)执行一次,会对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线HealthCheckScoreLine(health-check-scoreline选项)得出节点是否健康的结果

func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)})klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {totalScore := 0.0for _, score := range pluginScores {totalScore += score}if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})} else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})}}klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

这里会调用 ParallelizeUntil 并发执行各检查插件,edge-health 目前支持 ping 以及 kubelet 两种检查插件,在 checkplugin 目录(github.com/superedge/superedge/pkg/edge-health/checkplugin),通过 Register 注册到 PluginInfo 单例(plugin列表)中,如下:

// TODO: handle flag parse errors
func (pcp *PingCheckPlugin) Set(s string) error {var err errorfor _, para := range strings.Split(s, ",") {if len(para) == 0 {continue}arr := strings.Split(para, "=")trimKey := strings.TrimSpace(arr[0])switch trimKey {case "timeout":timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.HealthCheckoutTimeOut = timeoutcase "retries":retries, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.HealthCheckRetries = retriescase "weight":weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64)pcp.Weight = weightcase "port":port, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.Port = port}}PluginInfo = NewPlugin()PluginInfo.Register(pcp)return err
}
func (p *Plugin) Register(plugin CheckPlugin) {p.Plugins = append(p.Plugins, plugin)klog.V(4).Info("Register check plugin: %+v", plugin)
}
...
var (PluginOnce sync.OncePluginInfo Plugin
)
type Plugin struct {Plugins []CheckPlugin
}
func NewPlugin() Plugin {PluginOnce.Do(func() {PluginInfo = Plugin{Plugins: []CheckPlugin{},}})return PluginInfo
}

每种插件具体执行健康检查的逻辑封装在 CheckExecute 中,这里以 ping plugin 为例:

// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.go
func (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) {copyCheckedIp := checkMetadata.CopyCheckedIp()util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) {checkedIp := copyCheckedIp[index]var err errorfor i := 0; i < pcp.HealthCheckRetries; i++ {if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil {break}}if err == nil {klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp)checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax)} else {klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error())checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin)}})
}
// CheckPluginScoreInfo relevant functions
func (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) {cm.Lock()defer cm.Unlock()if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed {cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64)}cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight
}

CheckExecute 会对同区域每个节点执行 ping 探测(net.DialTimeout),如果失败,则给该节点打 CheckScoreMin 分(0);否则,打 CheckScoreMax 分(100)

每种检查插件会有一个 Weight 参数,表示了该检查插件分数的权重值,所有权重参数之和应该为1,对应基准分数线 HealthCheckScoreLine 范围0-100。因此这里在设置分数时,会乘以权重

回到 ExecuteCheck 函数,在调用各插件执行健康检查得出权重分数(CheckPluginScoreInfo)后,还需要将该分数与基准线 HealthCheckScoreLine 对比:如果高于(>=)分数线,则认为该节点本次检查正常;否则异常

func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)})klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {totalScore := 0.0for _, score := range pluginScores {totalScore += score}if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})} else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})}}klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}

3、Commun

在对同区域各边缘节点执行健康检查后,需要将检查的结果传递给其它各节点,这也就是 commun 模块负责的事情:

func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) {// Execute edge health prepare and checkehd.PrepareAndCheck(stopCh)// Execute votevote := vote.NewVoteEdge(&ehd.cfg.Vote)go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh)// Execute communicationcommunEdge := commun.NewCommunEdge(&ehd.cfg.Commun)communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh)<-stopCh
}

既然是互相传递结果给其它节点,则必然会有接受和发送模块:

func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) {go c.communReceive(checkMetadata, cmLister, stopCh)wait.Until(func() {c.communSend(checkMetadata, cmLister, localIp)}, time.Duration(c.CommunPeriod)*time.Second, stopCh)
}

其中 communSend 负责向其它节点发送本节点对它们的检查结果;而 communReceive 负责接受其它边缘节点的检查结果。下面依次分析:

func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) {copyLocalCheckDetail := checkMetadata.CopyLocal(localIp)var checkedIps []stringfor checkedIp := range copyLocalCheckDetail {checkedIps = append(checkedIps, checkedIp)}util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) {// Only send commun information to other edge nodes(excluding itself)dstIp := checkedIps[index]if dstIp == localIp {return}// Send commun informationcommunInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail}if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communSend: generateHmac err %+v", err)return} else {communInfo.Hmac = hmac}commonInfoBytes, err := json.Marshal(communInfo)if err != nil {log.Errorf("communSend: json.Marshal commun info err %+v", err)return}commonInfoReader := bytes.NewReader(commonInfoBytes)for i := 0; i < c.CommunRetries; i++ {req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader)if err != nil {log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err)continue}if err = util.DoRequestAndDiscard(c.client, req); err != nil {log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err)} else {log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp)break}}})
}

发送逻辑如下:

  • 构建 CommunInfo 结构体,包括:

    • SourceIP:表示执行检查的ip
    • CheckDetail:为 Checked ip:Check detail 组织形式,包含被检查的ip以及检查结果
  • 调用 GenerateHmac 构建 Hmac:实际上是以 kube-system 下的 hmac-config configmap hmackey 字段为 key,对 SourceIP 以及 CheckDetail进行 hmac 得到,用于判断传输数据的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) {addrBytes, err := json.Marshal(communInfo.SourceIP)if err != nil {return "", err}detailBytes, _ := json.Marshal(communInfo.CheckDetail)if err != nil {return "", err}hmacBefore := string(addrBytes) + string(detailBytes)if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil {return "", err} else {return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey])}
}
func GetHmacCode(s, key string) (string, error) {h := hmac.New(sha256.New, []byte(key))if _, err := io.WriteString(h, s); err != nil {return "", err}return fmt.Sprintf("%x", h.Sum(nil)), nil
}
  • 发送上述构建的 CommunInfo 给其它边缘节点(DoRequestAndDiscard)

communReceive逻辑也很清晰:

// TODO: support changeable server listen port
func (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) {svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)}svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Secondsvr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Secondhttp.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel)http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) {var communInfo metadata.CommunInfoif r.Body == nil {http.Error(w, "Invalid commun information", http.StatusBadRequest)return}err := json.NewDecoder(r.Body).Decode(&communInfo)if err != nil {http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest)return}log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail)if _, err := io.WriteString(w, "Received!\n"); err != nil {log.Errorf("communReceive: send response err %+v", err)http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError)return}if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communReceive: server GenerateHmac err %+v", err)http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError)return} else {if hmac != communInfo.Hmac {log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac)http.Error(w, "Hmac not match", http.StatusForbidden)return}}log.V(4).Infof("communReceive: Hmac match")checkMetadata.SetByCommunInfo(communInfo)log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo)})go func() {if err := svr.ListenAndServe(); err != http.ErrServerClosed {log.Fatalf("Server: exit with error %+v", err)}}()for {select {case <-stopCh:ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()if err := svr.Shutdown(ctx); err != nil {log.Errorf("Server: program exit, server exit error %+v", err)}returndefault:}}
}

负责接受其它边缘节点的检查结果,并写入自身检查结果 CheckInfo,流程如下:

  • 通过/result路由接受请求,并将请求内容解析成 CommunInfo

  • 对 CommunInfo 执行 GenerateHmac 获取hmac值,并与 CommunInfo.Hmac 字段进行对比,检查接受数据的有效性

  • 最后将 CommunInfo 检查结果写入 CheckInfo,注意:CheckDetail.Time 设置为写入时的时间

    // CheckInfo relevant functions
    func (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) {cm.Lock()defer cm.Unlock()if _, existed := cm.CheckInfo[c.SourceIP]; !existed {cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail)}for k, detail := range c.CheckDetail {// Update time to local timestamp since different machines have different onesdetail.Time = time.Now()c.CheckDetail[k] = detail}cm.CheckInfo[c.SourceIP] = c.CheckDetail
    }
    
  • 最后在接受到 stopCh 信号时,通过 svr.Shutdown 平滑关闭服务

4、Vote

在接受到其它节点的健康检查结果后,vote 模块会对结果进行统计得出最终判决,并向 apiserver 报告:

func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface,localIp string, stopCh <-chan struct{}) {go wait.Until(func() {v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh)}, time.Duration(v.VotePeriod)*time.Second, stopCh)
}

首先根据检查结果统计出状态正常以及异常的节点列表:

type votePair struct {pros intcons int
}
...
var (prosVoteIpList, consVoteIpList []string// Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in mapvp votePair
)
voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}}
copyCheckInfo := edgeHealthMetadata.CopyAll()
// Note that voteThreshold should be calculated by checked instead of checker
// since checked represents the total valid edge health nodes while checker may contain partly ones.
voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2
for _, checkedDetails := range copyCheckInfo {for checkedIp, checkedDetail := range checkedDetails {if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) {if _, existed := voteCountMap[checkedIp]; !existed {voteCountMap[checkedIp] = votePair{0, 0}}vp = voteCountMap[checkedIp]if checkedDetail.Normal {vp.pros++if vp.pros >= voteThreshold {prosVoteIpList = append(prosVoteIpList, checkedIp)}} else {vp.cons++if vp.cons >= voteThreshold {consVoteIpList = append(consVoteIpList, checkedIp)}}voteCountMap[checkedIp] = vp}}
}
log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap)
...

其中状态判断的逻辑如下:

  • 如果超过一半(>)的节点对该节点的检查结果为正常,则认为该节点状态正常(注意时间差在 VoteTimeout 内)
  • 如果超过一半(>)的节点对该节点的检查结果为异常,则认为该节点状态异常(注意时间差在 VoteTimeout 内)
  • 除开上述情况,认为节点状态判断无效,对这些节点不做任何处理(可能存在脑裂的情况)

对状态正常的节点做如下处理:

...
// Handle prosVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name)nodeCopy := node.DeepCopy()needUpdated := falseif nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationPros,}needUpdated = true} else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {if healthy != common.NodeHealthAnnotationPros {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationProsneedUpdated = true}} else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationProsneedUpdated = true}}if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed {nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...)needUpdated = true}if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update pros vote to edge node %s error %+v ", nodeCopy.Name, err)} else {log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name)}}} else {log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index])}
})
...
  • 添加或者更新"superedgehealth/node-health" annotation 值为"true",表明分布式健康检查判断该节点状态正常。
  • 如果node存在 NoExecute(node.kubernetes.io/unreachable) taint,则将其去掉,并更新 node.

而对状态异常的节点会添加或者更新"superedgehealth/node-health" annotation值为"false",表明分布式健康检查判断该节点状态异常:

// Handle consVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name)nodeCopy := node.DeepCopy()needUpdated := falseif nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationCons,}needUpdated = true} else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {if healthy != common.NodeHealthAnnotationCons {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationConsneedUpdated = true}} else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationConsneedUpdated = true}}if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update cons vote to edge node %s error %+v ", nodeCopy.Name, err)} else {log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name)}}} else {log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index])}
})

在边端 edge-health-daemon 向 apiserver 发送节点健康结果后,云端运行 edge-health-admission(Kubernetes mutating admission webhook),会不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉NoExecute taint) 以及 endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现即便云边断连,但是分布式健康检查状态正常的情况下:

  • 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
  • 失联的节点上的 pod 不会被驱逐

总结

  • 分布式健康检查对于云边断连情况的处理区别原生 Kubernetes 如下:

    • 原生 Kubernetes:

      • 失联的节点被置为 ConditionUnknown 状态,并被添加 NoSchedule 和 NoExecute 的 taints
      • 失联的节点上的pod被驱逐,并在其他节点上进行重建
      • 失联的节点上的pod从 Service 的 Endpoint 列表中移除
    • 分布式健康检查:
  • 分布式健康检查主要通过如下三个层面增强节点状态判断的准确性:
    • 每个节点定期探测其他节点健康状态
    • 集群内所有节点定期投票决定各节点的状态
    • 云端和边端节点共同决定节点状态
  • 分布式健康检查功能由边端的 edge-health-daemon 以及云端的 edge-health-admission 组成,功能分别如下:
    • edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation),主体逻辑包括四部分功能:

      • SyncNodeList:根据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata 相关数据
      • ExecuteCheck:对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线得出节点是否健康的结果
      • Commun:将本节点对其它各节点健康检查的结果发送给其它节点
      • Vote:对所有节点健康检查的结果分类,如果某个节点被大多数(>1/2)节点判定为正常,则对该节点添加 superedgehealth/node-health:true annotation,表明该节点分布式健康检查结果为正常;否则,对该节点添加 superedgehealth/node-health:false annotation,表明该节点分布式健康检查结果为异常
    • edge-health-admission(Kubernetes mutating admission webhook):不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉 NoExecute taint)以及endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现云端和边端共同决定节点状态

duyanghao kubernetes-reading-notes

【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!

一文读懂 SuperEdge 分布式健康检查 (边端)相关推荐

  1. 腾讯资深架构师干货总结:一文读懂大型分布式系统设计的方方面面

    1.引言 我们常常会听说,某个互联网应用的服务器端系统多么牛逼,比如QQ.微信.淘宝.那么,一个大型互联网应用的服务器端系统,到底牛逼在什么地方?为什么海量的用户访问,会让一个服务器端系统变得更复杂? ...

  2. 一文读懂:Kafka(分布式消息队列)的基础概念,教程

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  3. 独家 | 一文读懂Corda分布式记账技术

    作者:Dan Newton 翻译:申利彬 校对:丁楠雅 本文约2600字,建议阅读10分钟. 本文为你介绍借鉴了区块链的部分特性的分布式记账技术,并分析其背后的原因. 什么是Corda? 最近我开始了 ...

  4. 从根上理解高性能、高并发(七):深入操作系统,一文读懂进程、线程、协程

    本文引用了"一文读懂什么是进程.线程.协程"一文的主要内容,感谢原作者的无私分享. 1.系列文章引言 1.1 文章目的 作为即时通讯技术的开发者来说,高性能.高并发相关的技术概念早 ...

  5. 即时通讯新手入门:一文读懂什么是Nginx?它能否实现IM的负载均衡?

    本文引用了"蔷薇Nina"的"Nginx 相关介绍(Nginx是什么?能干嘛?)"一文部分内容,感谢作者的无私分享. 1.引言 Nginx(及其衍生产品)是目前 ...

  6. 一文读懂什么是分布式文件系统

    一文读懂什么是分布式文件系统 什么是分布式文件系统 一般文件系统 什么是分布式文件系统 分布式文件系统与一般文件系统的对比 什么是分布式文件系统 本篇来讲讲什么是一般文件系统,什么是分布式文件系统,以 ...

  7. AI洞观 | 一文读懂英特尔的AI之路

    AI洞观 | 一文读懂英特尔的AI之路 https://mp.weixin.qq.com/s/E9NqeywzQ4H2XCFFOFcKXw 11月13日-14日,英特尔人工智能大会(AIDC)在北京召 ...

  8. 一文读懂BERT(原理篇)

    一文读懂BERT(原理篇) 2018年的10月11日,Google发布的论文<Pre-training of Deep Bidirectional Transformers for Langua ...

  9. 一文读懂:什么是区块链

    今天写一个能够一文读懂区块链的文章,以后谁再问我区块链是什么东西,我就把这篇文章发给他. -------- 个人技术公众号:解决方案工程师 欢迎同领域的朋友关注.相互交流. -------- 区块链技 ...

最新文章

  1. 如何将深度学习研究论文实现为代码的几个要点
  2. C++判断一个数是否为回文数palindrome的算法(附完整源码)
  3. java里的字符流_javaIO流中字符流的应用
  4. History(历史)命令用法 15 例
  5. mysql8优化实战
  6. 7-7 念数字 (10 分)
  7. 电商无线页面设计手机移动端的设计模板
  8. C#中引用第三方ocx控件引发的问题以及解决办法
  9. 学习笔记(01):5天Python闯关训练营-103期-re模块使用案例
  10. RK3288_Android7.1平台基于DRM框架的LCD开发
  11. Modbus通讯协议详解与RTU通信实例演示
  12. 鱼之死,越狱章鱼和雾霾黑客
  13. Excel 制作色卡
  14. 【WiFi 6E】WiFi 6E信道分布
  15. Photoshop CC 2019 软件安装教程
  16. MYSQL 查询某个月有多少天数?
  17. sublime text 添加到鼠标右键功能
  18. PicPick软件免费版与正式版区别
  19. 使用跟踪查看器查看 ASP.NET 跟踪信息
  20. Linux文件打补丁

热门文章

  1. 商城倒计时(时分秒倒计时、分秒毫秒倒计时)
  2. podman基本设置使用及分发容器镜像与一些常用命令
  3. 关于Headroom电压余量的介绍
  4. excel单元格式设置
  5. 【参考】前端面试攻略(自整理)
  6. Thinkphp5中使用redis队列发送消息
  7. 鐘明系列十:『32阶3次幻方』
  8. Transformations between ECEF and ENU coordinates
  9. DNA 1. Germline Mutation Vs. Somatic Mutation 傻傻分不清楚
  10. 【推荐系统】{1} —— 基于用户的协同过滤算法