一文读懂 SuperEdge 分布式健康检查 (边端)
作者:杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事镜像仓库、Kubernetes集群高可用&备份还原,以及边缘计算相关研发工作。
前言
SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制,极大简化了应用从云端部署到边缘端的过程。同时SuperEdge设计了分布式健康检查机制规避了云边网络不稳定造成的大量pod迁移和重建,保证了服务的稳定。
边缘计算场景下,边缘节点与云端的网络环境十分复杂,连接并不可靠,在原生 Kubernetes 集群中,会造成 apiserver 和节点连接的中断,节点状态的异常,最终导致pod的驱逐和 endpoint 的缺失,造成服务的中断和波动,具体来说原生 Kubernetes 处理如下:
- 失联的节点被置为 ConditionUnknown 状态,并被添加 NoSchedule 和 NoExecute 的 taints
- 失联的节点上的 pod 被驱逐,并在其他节点上进行重建
- 失联的节点上的 pod 从 Service 的 Endpoint 列表中移除
因此,边缘计算场景仅仅依赖边端和 apiserver 的连接情况是不足以判断节点是否异常的,会因为网络的不可靠造成误判,影响正常服务。而相较于云端和边缘端的连接,显然边端节点之间的连接更为稳定,具有更高的参考价值,因此 superedge 提出了边缘分布式健康检查机制。该机制中节点状态判定除了要考虑 apiserver 的因素外,还引入了节点的评估因素,进而对节点进行更为全面的状态判断。通过这个功能,能够避免由于云边网络不可靠造成的大量的pod迁移和重建,保证服务的稳定
具体来说,主要通过如下三个层面增强节点状态判断的准确性:
- 每个节点定期探测其他节点健康状态
- 集群内所有节点定期投票决定各节点的状态
- 云端和边端节点共同决定节点状态
而分布式健康检查最终的判断处理如下:
edge-health-daemon 源码分析
在深入源码之前先介绍一下分布式健康检查的实现原理,其架构图如下所示:
Kubernetes 每个 node 在 kube-node-lease namespace 下会对应一个 Lease object,kubelet 每隔 node-status-update-frequency 时间(默认10s)会更新对应node的 Lease object
node-controller 会每隔 node-monitor-period 时间(默认5s)检查 Lease object 是否更新,如果超过 node-monitor-grace-period 时间(默认40s)没有发生过更新,则认为这个 node 不健康,会更新 NodeStatus(ConditionUnknown)
而当节点心跳超时(ConditionUnknown)之后,node controller 会给该 node 添加如下 taints:
spec:...taints:- effect: NoSchedulekey: node.kubernetes.io/unreachabletimeAdded: "2020-07-02T03:50:47Z"- effect: NoExecutekey: node.kubernetes.io/unreachabletimeAdded: "2020-07-02T03:50:53Z"
同时,endpoint controller 会从 endpoint backend 中踢掉该母机上的所有 pod
对于打上 NoSchedule taint 的母机,Scheduler 不会调度新的负载在该 node 上了;而对于打上 NoExecute(node.kubernetes.io/unreachable) taint 的母机,node controller 会在节点心跳超时之后一段时间(默认5mins)驱逐该节点上的 pod
分布式健康检查边端的 edge-health-daemon 组件会对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation)
此外,为了实现在云边断连且分布式健康检查状态正常的情况下:
- 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
- 失联的节点上的 pod 不会被驱逐
还需要在云端运行 edge-health-admission(Kubernetes mutating admission webhook),不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉NoExecute taint)以及 endpoints (将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现云端和边端共同决定节点状态
本章将主要介绍 edge-health-daemon 原理,如下为 edge-health-daemon 的相关数据结构:
type EdgeHealthMetadata struct {*NodeMetadata*CheckMetadata
}
type NodeMetadata struct {NodeList []v1.Nodesync.RWMutex
}
type CheckMetadata struct {CheckInfo map[string]map[string]CheckDetail // Checker ip:{Checked ip:Check detail}CheckPluginScoreInfo map[string]map[string]float64 // Checked ip:{Plugin name:Check score}sync.RWMutex
}
type CheckDetail struct {Normal boolTime time.Time
}
type CommunInfo struct {SourceIP string // ClientIP,Checker ipCheckDetail map[string]CheckDetail // Checked ip:Check detailHmac string
}
含义如下:
- NodeMetadata:为了实现分区域分布式健康检查机制而维护的边缘节点 cache,其中包含该区域内的所有边缘节点列表 NodeList
- CheckMetadata:存放健康检查的结果,具体来说包括两个数据结构:
- CheckPluginScoreInfo:为
Checked ip:{Plugin name:Check score}
组织形式。第一级 key 表示:被检查的ip;第二级 key 表示:检查插件的名称;value 表示:检查分数 - CheckInfo:为
Checker ip:{Checked ip:Check detail}
组织形式。第一级key表示:执行检查的ip;第二级key表示:被检查的ip;value表示检查结果 CheckDetail
- CheckPluginScoreInfo:为
- CheckDetail:代表健康检查的结果
- Normal:Normal 为 true 表示检查结果正常;false 表示异常
- Time:表示得出该结果时的时间,用于结果有效性的判断(超过一段时间没有更新的结果将无效)
- CommunInfo:边缘节点向其它节点发送健康检查结果时使用的数据,其中包括:
- SourceIP:表示执行检查的ip
- CheckDetail:为
Checked ip:Check detail
组织形式,包含被检查的ip以及检查结果 - Hmac:SourceIP 以及 CheckDetail 进行 hmac 得到,用于边缘节点通信过程中判断传输数据的有效性(是否被篡改)
edge-health-daemon 主体逻辑包括四部分功能:
- SyncNodeList:根据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata相关数据
- ExecuteCheck:对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线得出节点是否健康的结果
- Commun:将本节点对其它各节点健康检查的结果发送给其它节点
- Vote:对所有节点健康检查的结果分类,如果某个节点被大多数(>1/2)节点判定为正常,则对该节点添加
superedgehealth/node-health:true
annotation,表明该节点分布式健康检查结果为正常;否则,对该节点添加superedgehealth/node-health:false
annotation,表明该节点分布式健康检查结果为异常
下面依次对上述功能进行源码分析:
1、SyncNodeList
SyncNodeList 每隔 HealthCheckPeriod 秒(health-check-period 选项)执行一次,会按照如下情况分类刷新 node cache:
- 如果 kube-system namespace 下不存在名为 edge-health-zone-config的configmap,则没有开启多地域探测,因此会获取所有边缘节点列表并刷新 node cache
- 否则,如果 edge-health-zone-config 的 configmap 数据部分 TaintZoneAdmission 为 false,则没有开启多地域探测,因此会获取所有边缘节点列表并刷新 node cache
- 如果 TaintZoneAdmission 为 true,且 node 有"superedgehealth/topology-zone"标签(标示区域),则获取"superedgehealth/topology-zone" label value 相同的节点列表并刷新 node cache
- 如果 node 没有"superedgehealth/topology-zone" label,则只会将边缘节点本身添加到分布式健康检查节点列表中并刷新 node cache(only itself)
func (ehd *EdgeHealthDaemon) SyncNodeList() {// Only sync nodes when self-located foundvar host *v1.Nodeif host = ehd.metadata.GetNodeByName(ehd.cfg.Node.HostName); host == nil {klog.Errorf("Self-hostname %s not found", ehd.cfg.Node.HostName)return}// Filter cloud nodes and retain edge onesmasterRequirement, err := labels.NewRequirement(common.MasterLabel, selection.DoesNotExist, []string{})if err != nil {klog.Errorf("New masterRequirement failed %+v", err)return}masterSelector := labels.NewSelector()masterSelector = masterSelector.Add(*masterRequirement)if mrc, err := ehd.cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.TaintZoneConfigMap); err != nil {if apierrors.IsNotFound(err) { // multi-region configmap not foundif NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap not found and get nodes err %+v", err)return} else {ehd.metadata.SetByNodeList(NodeList)}} else {klog.Errorf("Get multi-region configmap err %+v", err)return}} else { // multi-region configmap foundmrcv := mrc.Data[common.TaintZoneConfigMapKey]klog.V(4).Infof("Multi-region value is %s", mrcv)if mrcv == "false" { // close multi-region checkif NodeList, err := ehd.nodeLister.List(masterSelector); err != nil {klog.Errorf("Multi-region configmap exist but disabled and get nodes err %+v", err)return} else {ehd.metadata.SetByNodeList(NodeList)}} else { // open multi-region checkif hostZone, existed := host.Labels[common.TopologyZone]; existed {klog.V(4).Infof("Host %s has HostZone %s", host.Name, hostZone)zoneRequirement, err := labels.NewRequirement(common.TopologyZone, selection.Equals, []string{hostZone})if err != nil {klog.Errorf("New masterZoneRequirement failed: %+v", err)return}masterZoneSelector := labels.NewSelector()masterZoneSelector = masterZoneSelector.Add(*masterRequirement, *zoneRequirement)if nodeList, err := ehd.nodeLister.List(masterZoneSelector); err != nil {klog.Errorf("TopologyZone label for hostname %s but get nodes err: %+v", host.Name, err)return} else {ehd.metadata.SetByNodeList(nodeList)}} else { // Only check itself if there is no TopologyZone labelklog.V(4).Infof("Only check itself since there is no TopologyZone label for hostname %s", host.Name)ehd.metadata.SetByNodeList([]*v1.Node{host})}}}// Init check plugin scoreipList := make(map[string]struct{})for _, node := range ehd.metadata.Copy() {for _, addr := range node.Status.Addresses {if addr.Type == v1.NodeInternalIP {ipList[addr.Address] = struct{}{}ehd.metadata.InitCheckPluginScore(addr.Address)}}}// Delete redundant check plugin scorefor _, checkedIp := range ehd.metadata.CopyCheckedIp() {if _, existed := ipList[checkedIp]; !existed {ehd.metadata.DeleteCheckPluginScore(checkedIp)}}// Delete redundant check infofor checkerIp := range ehd.metadata.CopyAll() {if _, existed := ipList[checkerIp]; !existed {ehd.metadata.DeleteByIp(ehd.cfg.Node.LocalIp, checkerIp)}}klog.V(4).Infof("SyncNodeList check info %+v successfully", ehd.metadata)
}
...
func (cm *CheckMetadata) DeleteByIp(localIp, ip string) {cm.Lock()defer cm.Unlock()delete(cm.CheckInfo[localIp], ip)delete(cm.CheckInfo, ip)
}
在按照如上逻辑更新node cache之后,会初始化CheckMetadata.CheckPluginScoreInfo,将节点ip赋值给CheckPluginScoreInfo key(Checked ip
:被检查的ip)
另外,会删除CheckMetadata.CheckPluginScoreInfo以及CheckMetadata.CheckInfo中多余的items(不属于该边缘节点检查范围)
2、ExecuteCheck
ExecuteCheck也是每隔HealthCheckPeriod秒(health-check-period选项)执行一次,会对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线HealthCheckScoreLine(health-check-scoreline选项)得出节点是否健康的结果
func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)})klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {totalScore := 0.0for _, score := range pluginScores {totalScore += score}if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})} else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})}}klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}
这里会调用 ParallelizeUntil 并发执行各检查插件,edge-health 目前支持 ping 以及 kubelet 两种检查插件,在 checkplugin 目录(github.com/superedge/superedge/pkg/edge-health/checkplugin),通过 Register 注册到 PluginInfo 单例(plugin列表)中,如下:
// TODO: handle flag parse errors
func (pcp *PingCheckPlugin) Set(s string) error {var err errorfor _, para := range strings.Split(s, ",") {if len(para) == 0 {continue}arr := strings.Split(para, "=")trimKey := strings.TrimSpace(arr[0])switch trimKey {case "timeout":timeout, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.HealthCheckoutTimeOut = timeoutcase "retries":retries, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.HealthCheckRetries = retriescase "weight":weight, _ := strconv.ParseFloat(strings.TrimSpace(arr[1]), 64)pcp.Weight = weightcase "port":port, _ := strconv.Atoi(strings.TrimSpace(arr[1]))pcp.Port = port}}PluginInfo = NewPlugin()PluginInfo.Register(pcp)return err
}
func (p *Plugin) Register(plugin CheckPlugin) {p.Plugins = append(p.Plugins, plugin)klog.V(4).Info("Register check plugin: %+v", plugin)
}
...
var (PluginOnce sync.OncePluginInfo Plugin
)
type Plugin struct {Plugins []CheckPlugin
}
func NewPlugin() Plugin {PluginOnce.Do(func() {PluginInfo = Plugin{Plugins: []CheckPlugin{},}})return PluginInfo
}
每种插件具体执行健康检查的逻辑封装在 CheckExecute 中,这里以 ping plugin 为例:
// github.com/superedge/superedge/pkg/edge-health/checkplugin/pingcheck.go
func (pcp *PingCheckPlugin) CheckExecute(checkMetadata *metadata.CheckMetadata) {copyCheckedIp := checkMetadata.CopyCheckedIp()util.ParallelizeUntil(context.TODO(), 16, len(copyCheckedIp), func(index int) {checkedIp := copyCheckedIp[index]var err errorfor i := 0; i < pcp.HealthCheckRetries; i++ {if _, err := net.DialTimeout("tcp", checkedIp+":"+strconv.Itoa(pcp.Port), time.Duration(pcp.HealthCheckoutTimeOut)*time.Second); err == nil {break}}if err == nil {klog.V(4).Infof("Edge ping health check plugin %s for ip %s succeed", pcp.Name(), checkedIp)checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMax)} else {klog.Warning("Edge ping health check plugin %s for ip %s failed, possible reason %s", pcp.Name(), checkedIp, err.Error())checkMetadata.SetByPluginScore(checkedIp, pcp.Name(), pcp.GetWeight(), common.CheckScoreMin)}})
}
// CheckPluginScoreInfo relevant functions
func (cm *CheckMetadata) SetByPluginScore(checkedIp, pluginName string, weight float64, score int) {cm.Lock()defer cm.Unlock()if _, existed := cm.CheckPluginScoreInfo[checkedIp]; !existed {cm.CheckPluginScoreInfo[checkedIp] = make(map[string]float64)}cm.CheckPluginScoreInfo[checkedIp][pluginName] = float64(score) * weight
}
CheckExecute 会对同区域每个节点执行 ping 探测(net.DialTimeout),如果失败,则给该节点打 CheckScoreMin 分(0);否则,打 CheckScoreMax 分(100)
每种检查插件会有一个 Weight 参数,表示了该检查插件分数的权重值,所有权重参数之和应该为1,对应基准分数线 HealthCheckScoreLine 范围0-100。因此这里在设置分数时,会乘以权重
回到 ExecuteCheck 函数,在调用各插件执行健康检查得出权重分数(CheckPluginScoreInfo)后,还需要将该分数与基准线 HealthCheckScoreLine 对比:如果高于(>=)分数线,则认为该节点本次检查正常;否则异常
func (ehd *EdgeHealthDaemon) ExecuteCheck() {util.ParallelizeUntil(context.TODO(), 16, len(ehd.checkPlugin.Plugins), func(index int) {ehd.checkPlugin.Plugins[index].CheckExecute(ehd.metadata.CheckMetadata)})klog.V(4).Infof("CheckPluginScoreInfo is %+v after health check", ehd.metadata.CheckPluginScoreInfo)for checkedIp, pluginScores := range ehd.metadata.CopyCheckPluginScore() {totalScore := 0.0for _, score := range pluginScores {totalScore += score}if totalScore >= ehd.cfg.Check.HealthCheckScoreLine {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: true})} else {ehd.metadata.SetByCheckDetail(ehd.cfg.Node.LocalIp, checkedIp, metadata.CheckDetail{Normal: false})}}klog.V(4).Infof("CheckInfo is %+v after health check", ehd.metadata.CheckInfo)
}
3、Commun
在对同区域各边缘节点执行健康检查后,需要将检查的结果传递给其它各节点,这也就是 commun 模块负责的事情:
func (ehd *EdgeHealthDaemon) Run(stopCh <-chan struct{}) {// Execute edge health prepare and checkehd.PrepareAndCheck(stopCh)// Execute votevote := vote.NewVoteEdge(&ehd.cfg.Vote)go vote.Vote(ehd.metadata, ehd.cfg.Kubeclient, ehd.cfg.Node.LocalIp, stopCh)// Execute communicationcommunEdge := commun.NewCommunEdge(&ehd.cfg.Commun)communEdge.Commun(ehd.metadata.CheckMetadata, ehd.cmLister, ehd.cfg.Node.LocalIp, stopCh)<-stopCh
}
既然是互相传递结果给其它节点,则必然会有接受和发送模块:
func (c *CommunEdge) Commun(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string, stopCh <-chan struct{}) {go c.communReceive(checkMetadata, cmLister, stopCh)wait.Until(func() {c.communSend(checkMetadata, cmLister, localIp)}, time.Duration(c.CommunPeriod)*time.Second, stopCh)
}
其中 communSend 负责向其它节点发送本节点对它们的检查结果;而 communReceive 负责接受其它边缘节点的检查结果。下面依次分析:
func (c *CommunEdge) communSend(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, localIp string) {copyLocalCheckDetail := checkMetadata.CopyLocal(localIp)var checkedIps []stringfor checkedIp := range copyLocalCheckDetail {checkedIps = append(checkedIps, checkedIp)}util.ParallelizeUntil(context.TODO(), 16, len(checkedIps), func(index int) {// Only send commun information to other edge nodes(excluding itself)dstIp := checkedIps[index]if dstIp == localIp {return}// Send commun informationcommunInfo := metadata.CommunInfo{SourceIP: localIp, CheckDetail: copyLocalCheckDetail}if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communSend: generateHmac err %+v", err)return} else {communInfo.Hmac = hmac}commonInfoBytes, err := json.Marshal(communInfo)if err != nil {log.Errorf("communSend: json.Marshal commun info err %+v", err)return}commonInfoReader := bytes.NewReader(commonInfoBytes)for i := 0; i < c.CommunRetries; i++ {req, err := http.NewRequest("PUT", "http://"+dstIp+":"+strconv.Itoa(c.CommunServerPort)+"/result", commonInfoReader)if err != nil {log.Errorf("communSend: NewRequest for remote edge node %s err %+v", dstIp, err)continue}if err = util.DoRequestAndDiscard(c.client, req); err != nil {log.Errorf("communSend: DoRequestAndDiscard for remote edge node %s err %+v", dstIp, err)} else {log.V(4).Infof("communSend: put commun info %+v to remote edge node %s successfully", communInfo, dstIp)break}}})
}
发送逻辑如下:
- 构建 CommunInfo 结构体,包括:
- SourceIP:表示执行检查的ip
- CheckDetail:为 Checked ip:Check detail 组织形式,包含被检查的ip以及检查结果
- 调用 GenerateHmac 构建 Hmac:实际上是以 kube-system 下的 hmac-config configmap hmackey 字段为 key,对 SourceIP 以及 CheckDetail进行 hmac 得到,用于判断传输数据的有效性(是否被篡改)
func GenerateHmac(communInfo metadata.CommunInfo, cmLister corelisters.ConfigMapLister) (string, error) {addrBytes, err := json.Marshal(communInfo.SourceIP)if err != nil {return "", err}detailBytes, _ := json.Marshal(communInfo.CheckDetail)if err != nil {return "", err}hmacBefore := string(addrBytes) + string(detailBytes)if hmacConf, err := cmLister.ConfigMaps(metav1.NamespaceSystem).Get(common.HmacConfig); err != nil {return "", err} else {return GetHmacCode(hmacBefore, hmacConf.Data[common.HmacKey])}
}
func GetHmacCode(s, key string) (string, error) {h := hmac.New(sha256.New, []byte(key))if _, err := io.WriteString(h, s); err != nil {return "", err}return fmt.Sprintf("%x", h.Sum(nil)), nil
}
- 发送上述构建的 CommunInfo 给其它边缘节点(DoRequestAndDiscard)
communReceive逻辑也很清晰:
// TODO: support changeable server listen port
func (c *CommunEdge) communReceive(checkMetadata *metadata.CheckMetadata, cmLister corelisters.ConfigMapLister, stopCh <-chan struct{}) {svr := &http.Server{Addr: ":" + strconv.Itoa(c.CommunServerPort)}svr.ReadTimeout = time.Duration(c.CommunTimeout) * time.Secondsvr.WriteTimeout = time.Duration(c.CommunTimeout) * time.Secondhttp.HandleFunc("/debug/flags/v", pkgutil.UpdateLogLevel)http.HandleFunc("/result", func(w http.ResponseWriter, r *http.Request) {var communInfo metadata.CommunInfoif r.Body == nil {http.Error(w, "Invalid commun information", http.StatusBadRequest)return}err := json.NewDecoder(r.Body).Decode(&communInfo)if err != nil {http.Error(w, fmt.Sprintf("Invalid commun information %+v", err), http.StatusBadRequest)return}log.V(4).Infof("Received common information from %s : %+v", communInfo.SourceIP, communInfo.CheckDetail)if _, err := io.WriteString(w, "Received!\n"); err != nil {log.Errorf("communReceive: send response err %+v", err)http.Error(w, fmt.Sprintf("Send response err %+v", err), http.StatusInternalServerError)return}if hmac, err := util.GenerateHmac(communInfo, cmLister); err != nil {log.Errorf("communReceive: server GenerateHmac err %+v", err)http.Error(w, fmt.Sprintf("GenerateHmac err %+v", err), http.StatusInternalServerError)return} else {if hmac != communInfo.Hmac {log.Errorf("communReceive: Hmac not equal, hmac is %s but received commun info hmac is %s", hmac, communInfo.Hmac)http.Error(w, "Hmac not match", http.StatusForbidden)return}}log.V(4).Infof("communReceive: Hmac match")checkMetadata.SetByCommunInfo(communInfo)log.V(4).Infof("After communicate, check info is %+v", checkMetadata.CheckInfo)})go func() {if err := svr.ListenAndServe(); err != http.ErrServerClosed {log.Fatalf("Server: exit with error %+v", err)}}()for {select {case <-stopCh:ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)defer cancel()if err := svr.Shutdown(ctx); err != nil {log.Errorf("Server: program exit, server exit error %+v", err)}returndefault:}}
}
负责接受其它边缘节点的检查结果,并写入自身检查结果 CheckInfo,流程如下:
通过
/result
路由接受请求,并将请求内容解析成 CommunInfo对 CommunInfo 执行 GenerateHmac 获取hmac值,并与 CommunInfo.Hmac 字段进行对比,检查接受数据的有效性
最后将 CommunInfo 检查结果写入 CheckInfo,注意:CheckDetail.Time 设置为写入时的时间
// CheckInfo relevant functions func (cm *CheckMetadata) SetByCommunInfo(c CommunInfo) {cm.Lock()defer cm.Unlock()if _, existed := cm.CheckInfo[c.SourceIP]; !existed {cm.CheckInfo[c.SourceIP] = make(map[string]CheckDetail)}for k, detail := range c.CheckDetail {// Update time to local timestamp since different machines have different onesdetail.Time = time.Now()c.CheckDetail[k] = detail}cm.CheckInfo[c.SourceIP] = c.CheckDetail }
最后在接受到 stopCh 信号时,通过 svr.Shutdown 平滑关闭服务
4、Vote
在接受到其它节点的健康检查结果后,vote 模块会对结果进行统计得出最终判决,并向 apiserver 报告:
func (v *VoteEdge) Vote(edgeHealthMetadata *metadata.EdgeHealthMetadata, kubeclient clientset.Interface,localIp string, stopCh <-chan struct{}) {go wait.Until(func() {v.vote(edgeHealthMetadata, kubeclient, localIp, stopCh)}, time.Duration(v.VotePeriod)*time.Second, stopCh)
}
首先根据检查结果统计出状态正常以及异常的节点列表:
type votePair struct {pros intcons int
}
...
var (prosVoteIpList, consVoteIpList []string// Init votePair since cannot assign to struct field voteCountMap[checkedIp].pros in mapvp votePair
)
voteCountMap := make(map[string]votePair) // {"127.0.0.1":{"pros":1,"cons":2}}
copyCheckInfo := edgeHealthMetadata.CopyAll()
// Note that voteThreshold should be calculated by checked instead of checker
// since checked represents the total valid edge health nodes while checker may contain partly ones.
voteThreshold := (edgeHealthMetadata.GetCheckedIpLen() + 1) / 2
for _, checkedDetails := range copyCheckInfo {for checkedIp, checkedDetail := range checkedDetails {if !time.Now().After(checkedDetail.Time.Add(time.Duration(v.VoteTimeout) * time.Second)) {if _, existed := voteCountMap[checkedIp]; !existed {voteCountMap[checkedIp] = votePair{0, 0}}vp = voteCountMap[checkedIp]if checkedDetail.Normal {vp.pros++if vp.pros >= voteThreshold {prosVoteIpList = append(prosVoteIpList, checkedIp)}} else {vp.cons++if vp.cons >= voteThreshold {consVoteIpList = append(consVoteIpList, checkedIp)}}voteCountMap[checkedIp] = vp}}
}
log.V(4).Infof("Vote: voteCountMap is %+v", voteCountMap)
...
其中状态判断的逻辑如下:
- 如果超过一半(>)的节点对该节点的检查结果为正常,则认为该节点状态正常(注意时间差在 VoteTimeout 内)
- 如果超过一半(>)的节点对该节点的检查结果为异常,则认为该节点状态异常(注意时间差在 VoteTimeout 内)
- 除开上述情况,认为节点状态判断无效,对这些节点不做任何处理(可能存在脑裂的情况)
对状态正常的节点做如下处理:
...
// Handle prosVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(prosVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(prosVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote pros to edge node %s begin ...", node.Name)nodeCopy := node.DeepCopy()needUpdated := falseif nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationPros,}needUpdated = true} else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {if healthy != common.NodeHealthAnnotationPros {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationProsneedUpdated = true}} else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationProsneedUpdated = true}}if index, existed := admissionutil.TaintExistsPosition(nodeCopy.Spec.Taints, common.UnreachableNoExecuteTaint); existed {nodeCopy.Spec.Taints = append(nodeCopy.Spec.Taints[:index], nodeCopy.Spec.Taints[index+1:]...)needUpdated = true}if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update pros vote to edge node %s error %+v ", nodeCopy.Name, err)} else {log.V(2).Infof("Vote: update pros vote to edge node %s successfully", nodeCopy.Name)}}} else {log.Warningf("Vote: edge node addr %s not found", prosVoteIpList[index])}
})
...
- 添加或者更新"superedgehealth/node-health" annotation 值为"true",表明分布式健康检查判断该节点状态正常。
- 如果node存在 NoExecute(node.kubernetes.io/unreachable) taint,则将其去掉,并更新 node.
而对状态异常的节点会添加或者更新"superedgehealth/node-health" annotation值为"false",表明分布式健康检查判断该节点状态异常:
// Handle consVoteIpList
util.ParallelizeUntil(context.TODO(), 16, len(consVoteIpList), func(index int) {if node := edgeHealthMetadata.GetNodeByAddr(consVoteIpList[index]); node != nil {log.V(4).Infof("Vote: vote cons to edge node %s begin ...", node.Name)nodeCopy := node.DeepCopy()needUpdated := falseif nodeCopy.Annotations == nil {nodeCopy.Annotations = map[string]string{common.NodeHealthAnnotation: common.NodeHealthAnnotationCons,}needUpdated = true} else {if healthy, existed := nodeCopy.Annotations[common.NodeHealthAnnotation]; existed {if healthy != common.NodeHealthAnnotationCons {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationConsneedUpdated = true}} else {nodeCopy.Annotations[common.NodeHealthAnnotation] = common.NodeHealthAnnotationConsneedUpdated = true}}if needUpdated {if _, err := kubeclient.CoreV1().Nodes().Update(context.TODO(), nodeCopy, metav1.UpdateOptions{}); err != nil {log.Errorf("Vote: update cons vote to edge node %s error %+v ", nodeCopy.Name, err)} else {log.V(2).Infof("Vote: update cons vote to edge node %s successfully", nodeCopy.Name)}}} else {log.Warningf("Vote: edge node addr %s not found", consVoteIpList[index])}
})
在边端 edge-health-daemon 向 apiserver 发送节点健康结果后,云端运行 edge-health-admission(Kubernetes mutating admission webhook),会不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉NoExecute taint) 以及 endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现即便云边断连,但是分布式健康检查状态正常的情况下:
- 失联的节点上的 pod 不会从 Service 的 Endpoint 列表中移除
- 失联的节点上的 pod 不会被驱逐
总结
- 分布式健康检查对于云边断连情况的处理区别原生 Kubernetes 如下:
- 原生 Kubernetes:
- 失联的节点被置为 ConditionUnknown 状态,并被添加 NoSchedule 和 NoExecute 的 taints
- 失联的节点上的pod被驱逐,并在其他节点上进行重建
- 失联的节点上的pod从 Service 的 Endpoint 列表中移除
- 分布式健康检查:
- 原生 Kubernetes:
- 分布式健康检查主要通过如下三个层面增强节点状态判断的准确性:
- 每个节点定期探测其他节点健康状态
- 集群内所有节点定期投票决定各节点的状态
- 云端和边端节点共同决定节点状态
- 分布式健康检查功能由边端的 edge-health-daemon 以及云端的 edge-health-admission 组成,功能分别如下:
- edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation),主体逻辑包括四部分功能:
- SyncNodeList:根据边缘节点所在的 zone 刷新 node cache,同时更新 CheckMetadata 相关数据
- ExecuteCheck:对每个边缘节点执行若干种类的健康检查插件(ping,kubelet等),并将各插件检查分数汇总,根据用户设置的基准线得出节点是否健康的结果
- Commun:将本节点对其它各节点健康检查的结果发送给其它节点
- Vote:对所有节点健康检查的结果分类,如果某个节点被大多数(>1/2)节点判定为正常,则对该节点添加 superedgehealth/node-health:true annotation,表明该节点分布式健康检查结果为正常;否则,对该节点添加 superedgehealth/node-health:false annotation,表明该节点分布式健康检查结果为异常
- edge-health-admission(Kubernetes mutating admission webhook):不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉 NoExecute taint)以及endpoints(将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses中),从而实现云端和边端共同决定节点状态
- edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation),主体逻辑包括四部分功能:
duyanghao kubernetes-reading-notes
【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!
一文读懂 SuperEdge 分布式健康检查 (边端)相关推荐
- 腾讯资深架构师干货总结:一文读懂大型分布式系统设计的方方面面
1.引言 我们常常会听说,某个互联网应用的服务器端系统多么牛逼,比如QQ.微信.淘宝.那么,一个大型互联网应用的服务器端系统,到底牛逼在什么地方?为什么海量的用户访问,会让一个服务器端系统变得更复杂? ...
- 一文读懂:Kafka(分布式消息队列)的基础概念,教程
[提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...
- 独家 | 一文读懂Corda分布式记账技术
作者:Dan Newton 翻译:申利彬 校对:丁楠雅 本文约2600字,建议阅读10分钟. 本文为你介绍借鉴了区块链的部分特性的分布式记账技术,并分析其背后的原因. 什么是Corda? 最近我开始了 ...
- 从根上理解高性能、高并发(七):深入操作系统,一文读懂进程、线程、协程
本文引用了"一文读懂什么是进程.线程.协程"一文的主要内容,感谢原作者的无私分享. 1.系列文章引言 1.1 文章目的 作为即时通讯技术的开发者来说,高性能.高并发相关的技术概念早 ...
- 即时通讯新手入门:一文读懂什么是Nginx?它能否实现IM的负载均衡?
本文引用了"蔷薇Nina"的"Nginx 相关介绍(Nginx是什么?能干嘛?)"一文部分内容,感谢作者的无私分享. 1.引言 Nginx(及其衍生产品)是目前 ...
- 一文读懂什么是分布式文件系统
一文读懂什么是分布式文件系统 什么是分布式文件系统 一般文件系统 什么是分布式文件系统 分布式文件系统与一般文件系统的对比 什么是分布式文件系统 本篇来讲讲什么是一般文件系统,什么是分布式文件系统,以 ...
- AI洞观 | 一文读懂英特尔的AI之路
AI洞观 | 一文读懂英特尔的AI之路 https://mp.weixin.qq.com/s/E9NqeywzQ4H2XCFFOFcKXw 11月13日-14日,英特尔人工智能大会(AIDC)在北京召 ...
- 一文读懂BERT(原理篇)
一文读懂BERT(原理篇) 2018年的10月11日,Google发布的论文<Pre-training of Deep Bidirectional Transformers for Langua ...
- 一文读懂:什么是区块链
今天写一个能够一文读懂区块链的文章,以后谁再问我区块链是什么东西,我就把这篇文章发给他. -------- 个人技术公众号:解决方案工程师 欢迎同领域的朋友关注.相互交流. -------- 区块链技 ...
最新文章
- 如何将深度学习研究论文实现为代码的几个要点
- C++判断一个数是否为回文数palindrome的算法(附完整源码)
- java里的字符流_javaIO流中字符流的应用
- History(历史)命令用法 15 例
- mysql8优化实战
- 7-7 念数字 (10 分)
- 电商无线页面设计手机移动端的设计模板
- C#中引用第三方ocx控件引发的问题以及解决办法
- 学习笔记(01):5天Python闯关训练营-103期-re模块使用案例
- RK3288_Android7.1平台基于DRM框架的LCD开发
- Modbus通讯协议详解与RTU通信实例演示
- 鱼之死,越狱章鱼和雾霾黑客
- Excel 制作色卡
- 【WiFi 6E】WiFi 6E信道分布
- Photoshop CC 2019 软件安装教程
- MYSQL 查询某个月有多少天数?
- sublime text 添加到鼠标右键功能
- PicPick软件免费版与正式版区别
- 使用跟踪查看器查看 ASP.NET 跟踪信息
- Linux文件打补丁