逆熵(anti-entropy) 指的是 agent 本地定期向 consul server 同步信息,以及拉取 consul server 中的信息然后合并的过程。

熵是系统变得越来越无序的趋势。consul 的逆熵机制旨在对抗这种趋势,即使在组件发生故障的情况下也能保持集群的有序状态。

有两个关键概念需要区分:

agent - 即 consul agent,agent 中的信息由 consul client 自己维护。

catalog - 是集群中所有 agent 中信息的抽象。catalog 中的信息由 consul servers 进行维护。

比如

http://10.41.12.170:8500/v1/agent/service/wax-delivery 得到的是注册在 10.41.12.170 上服务名为 wax-delivery 的节点(1个);

curl http://10.41.12.170:8500/v1/catalog/service/wax-delivery 得到的是整个集群中服务名为 wax-delivery 的节点(36个)。

即当使用 /v1/agent 路径时,agent 会进行本地处理,然后把处理结果同步给 consul server;而当使用 /v1/catalog 路径时,agent 会将请求转发给 consul server,由 consul server 进行处理。

换句话说,即使在某一个 consul client 所在的 agent 请求 /v1/catalog/register 注册某 A 服务,也不会在该 agent 的 /v1/agent/services 响应中包含 A 服务。

例如,当用户向 agent 注册新服务时,agent 会通知 catalog 更改。类似地,当从 agent 中注销服务时,也会同步给 catalog。

注意,agent 中的信息具有权威性。即当 agent 拉取到 server 中的信息和本地不同时,以 agent 为准。


实现

在 agent 启动时,会开始进行同步任务:

cmd/agent/agent.go

func (a *Agent) Start(ctx context.Context) error {// ...a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)// c.AEInterval consul 内部写死 1 分钟。a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)// ...// Setup either the client or the server.if c.ServerMode {server, err := consul.NewServer(consulCfg, options...)if err != nil {return fmt.Errorf("Failed to start Consul server: %v", err)}a.delegate = server} else {client, err := consul.NewClient(consulCfg, options...)if err != nil {return fmt.Errorf("Failed to start Consul client: %v", err)}a.delegate = client}// the staggering of the state syncing depends on the cluster size.a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }// ...
}

其中 a.State 实现了如下的 SyncState 接口,即 a.sync 的数据存储为 a.State,并通过 SyncState 接口 提供的方法进行同步操作。

type SyncState interface {SyncChanges() errorSyncFull() error
}

a.sync 实例化之后运行 Run 方法:

func (s *StateSyncer) Run() {if s.ClusterSize == nil {panic("ClusterSize not set")}s.resetNextFullSyncCh() // s.runFSM(fullSyncState, s.nextFSMState)
}

s.resetNextFullSyncCh 初始化同步开始的时刻,内部使用了随机时刻,避免同时开始带来的服务器压力(thundering herd)。

func (s *StateSyncer) resetNextFullSyncCh() {if s.stagger != nil {s.nextFullSyncCh = time.After(s.Interval + s.stagger(s.Interval))} else {s.nextFullSyncCh = time.After(s.Interval)}
}
func (s *StateSyncer) staggerFn(d time.Duration) time.Duration {f := scaleFactor(s.ClusterSize()) // 计算基准时间 f分钟return libRandomStagger(time.Duration(f) * d) // 在 0秒 ~ f分钟 之间随机选一个时间
}

上面代码实现了官网中的“节点数 - 同步间隔”表。

// runFSM runs the state machine.
func (s *StateSyncer) runFSM(fs fsmState, next func(fsmState) fsmState) {for {if fs = next(fs); fs == doneState {return}}
}// nextFSMState determines the next state based on the current state.
func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {switch fs {case fullSyncState:if s.Paused() {return retryFullSyncState}err := s.State.SyncFull() // 调用 State 接口的 SyncFull 方法if err != nil {s.Logger.Error("failed to sync remote state", "error", err)return retryFullSyncState}return partialSyncStatecase retryFullSyncState:e := s.retrySyncFullEvent()switch e {case syncFullNotifEvent, syncFullTimerEvent:return fullSyncStatecase shutdownEvent:return doneStatedefault:panic(fmt.Sprintf("invalid event: %s", e))}case partialSyncState:e := s.syncChangesEvent()switch e {case syncFullNotifEvent, syncFullTimerEvent:return fullSyncStatecase syncChangesNotifEvent:if s.Paused() {return partialSyncState}err := s.State.SyncChanges() // 调用 State 接口的 SyncChanges 方法if err != nil {s.Logger.Error("failed to sync changes", "error", err)}return partialSyncStatecase shutdownEvent:return doneStatedefault:panic(fmt.Sprintf("invalid event: %s", e))}default:panic(fmt.Sprintf("invalid state: %s", fs))}
}

下面是 State 的实现,即上面 SyncFull 和 SyncChanges 的实现。

agent/local/state.go

func (l *State) SyncFull() error {if err := l.updateSyncState(); err != nil {return err}return l.SyncChanges()
}

这里重点关注 updateSyncState 方法:

// updateSyncState does a read of the server state, and updates
// the local sync status as appropriate
func (l *State) updateSyncState() error {// 获取远端服务列表req := structs.NodeSpecificRequest{Datacenter: l.config.Datacenter,Node:       l.config.NodeName,QueryOptions: structs.QueryOptions{Token:            l.tokens.AgentToken(),AllowStale:       true,MaxStaleDuration: fullSyncReadMaxStale,},EnterpriseMeta: *structs.WildcardEnterpriseMeta(),}// 这里的 Delegate 实现可以是 client 或者 server// client 的实现是一次真正的远程调用// server 的实现其实是本地函数调用l.Delegate.RPC("Catalog.NodeServiceList", &req, &out1)for _, svc := range out1.NodeServices.Services {remoteServices[svc.CompoundServiceID()] = svc}// ...// 遍历本地服务列表for id, s := range l.services {if remoteServices[id] == nil {s.InSync = false // 标记本地刚注册的服务}}// 遍历远端服务列表,并合并到本地for id, rs := range remoteServices {ls := l.services[id] // ls => localServiceif ls == nil { // 本地没有的服务标记为已删除// ...l.services[id] = &ServiceState{Deleted: true}continue}if ls.Deleted {continue}// ...ls.InSync = ls.Service.IsSame(rs) // 比较本地服务是否和远端获取的这个服务相同}// ...
}
func (l *State) SyncChanges() error {// ...// 遍历刚合并过远程拉取信息的本地服务for id, s := range l.services {var err errorswitch {case s.Deleted: // 本地已删除的服务err = l.deleteService(id) // 从远端删除服务case !s.InSync: // 本地刚注册的服务还未同步到远端err = l.syncService(id) // 将服务添加到远端default:l.logger.Debug("Service in sync", "service", id.String())}if err != nil {return err}}// ...return nil
}

什么时候会 pause ?

当向 agent 发起请求,比如注册服务时会触发 pause:

func (l *State) setServiceStateLocked(s *ServiceState) {s.WatchCh = make(chan struct{}, 1)key := s.Service.CompoundServiceID()old, hasOld := l.services[key]l.services[key] = s // 将新注册的服务写入到本地服务列表// ...
}

至此一个状态更新回路完成:

注意上面的 agent 指的是运行在 client mode 下的 agent,对于运行在 server mode 下的 agent 情况也是类似的,只是 RPC 的实现过程不同其余的都是一样。


参考:

  • https://www.consul.io/docs/architecture/anti-entropy

Consul arch(二) 逆熵 anti-entropy相关推荐

  1. 平均符号熵的计算公式_交叉熵(Cross Entropy)从原理到代码解读

    交叉熵(Cross Entropy)是Shannon(香浓)信息论中的一个概念,在深度学习领域中解决分类问题时常用它作为损失函数. 原理部分:要想搞懂交叉熵需要先清楚一些概念,顺序如下:==1.自信息 ...

  2. 图像的一维熵和二维熵

    图像的一维熵和二维熵 图像的熵是一种特征的统计形式,它反映了图像中平均信息量的多少.图像的一维熵表示图像中灰度分布的聚集特征所包含的信息量,令Pi表示图像中灰度值为i的像素所占的比例,则定义灰度图象的 ...

  3. 灵遁者哲学书籍《 重构世界》:意识是物质逆熵存在的本征

    <重构世界>为灵遁者创作的哲学书籍,首发于豆瓣阅读 [1] .该文以人为自己的"局限"出发,重新定义意识,即意识是物质逆熵存在的本征,从而以更广的思维角度来" ...

  4. 什么是熵(entropy)?

    熵的概念最早起源于物理学,用于度量一个热力学系统的无序程度.在信息论里面,熵是对不确定性的测量. 1.1 熵的引入 事实上,熵的英文原文为entropy,最初由德国物理学家鲁道夫·克劳修斯提出,其表达 ...

  5. 熵(entropy)的定义

    熵的概念最早起源于物理学,用于度量一个热力学系统的无序程度.在信息论里面,熵是对不确定性的测量. 1.1 熵的引入 事实上,熵的英文原文为entropy,最初由德国物理学家鲁道夫·克劳修斯提出,其表达 ...

  6. 熵(entropy)到底是什么?科普

    上一篇博客熵(entropy):宇宙的终极规则 比较有哲理性,但对于熵具体的解释没有太多,有的地方可能不太明白,比如冰变水应该是一个吸收热量的过程,文中举得例子冰变成水,从有序变成无序,是一个能量释放 ...

  7. 崩坏3rd逆熵小说成就脚本,10秒全成就

    首先感谢@povsister大佬写的脚本,技术宅拯救世界,我只是在github上偶然看到了这个脚本,忍不住想安利给更多人.我是搬运工,大佬永远是大佬. 下面来介绍这个脚本的用法,实际非常简单-.几句话 ...

  8. 熵(entropy)、交叉熵(cross-entropy)

    原文:一文搞懂熵(Entropy),交叉熵(Cross-Entropy) 一.熵 1.混乱程度,不确定性,信息量? 不同的人对熵有不同的解释:混乱程度,不确定性,惊奇程度,不可预测性,信息量等等,面对 ...

  9. 熵(Entropy)、信息熵增益、信息熵增率和基尼(Gini)指数

    文章中的这些概念为衡量特征(属性)选择的方法,特征选择在于选取对训练数据具有分类能力的特征,提高决策树学习的效率,特征选择是决定用哪个特征来划分特征空间. 文章目录 信息熵(information e ...

最新文章

  1. 吴恩达《Machine Learning》Jupyter Notebook 版笔记发布!图解、公式、习题都有了
  2. ubuntu下mysql中文乱码_Ubuntu的MySQL中文乱码问题--自己躺坑
  3. [转]C++和C#编写并且相互调用COM组件
  4. linux nexus状态,linux 启动 nexus
  5. vector 指针 的指针
  6. 养蛙游戏刷爆朋友圈,养蛙成功“反杀”传统手游?
  7. docker 简版教程
  8. Android仿美团加载数据、小人奔跑进度动画对话框(附顺丰快递员奔跑效果)
  9. 个人管理 - Learn More,Study Less!
  10. Unity如何刚体控制物体的移动以及旋转
  11. 数据库索引实现(B+,B-,hash)
  12. VS调试按钮和运行按钮无法使用
  13. 高等数学导数公式与积分表
  14. 生产环境下服务器台账
  15. Smobiler 仿得到APP个人主页
  16. html简繁体转换,在线繁体字转换工具
  17. 打印机设备与计算机连接类型,电脑打印机的连接方式 打印机的种类有哪些
  18. JavaScript——Symbol类型
  19. MySQL的循环语句使用总结
  20. matlab赌徒破产模型转移矩阵,[转载]【转】生成土地利用变化转移矩阵的方法

热门文章

  1. python中对字典的循环遍历的方式
  2. Win10 Version 1803 四月更新正式版 ISO 镜像下载
  3. mmap和shmget的区别
  4. 老雷socket编程之认识常用协议
  5. informatica优化
  6. ButterKnife与Fragment的爱恨情仇(java.lang.IllegalStateException: Bindings already cleared.)
  7. 最强神作!Crysis深度剖析与优化指南(1-8)
  8. 计算机专业英语09章在线测,審计学第09章在线测试.doc
  9. RSS应用现状以及我的一些想法
  10. 北大计算机专业年薪,清华、北大毕业生的年薪全国最高吗?一般能达到多少?...