在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。

服务端监听某个 key 或某个范围 key 变化,当被监听的 key 的值有任何变动,都会主动通知客户端(变更的版本号>指定版本号)。

主要文件

/etcd/clientv3/watch.go

结构图

主要数据结构

//对外提供 Watch 接口,同时负责关闭 grpcStream 的流程。
type watcher struct {
//访问服务的RPC客户端remote   pb.WatchClient //etcdserver/etcdserverpb/rpc.pb.go...
// key=>grpcStream
streams map[string]*watchGrpcStreamlg      *zap.Logger
}
/*
1.实例化过程中,会创建 WatchClient, 这个 WatchClient 用于和服务端进行数据传输。同时会启动一个 goroutine 用于接收服务端的消息。
2.管理 watcherStream 关闭,创建等流程,
3.启动一个 goroutine 用于处理 reqc、respc 等管道的消息。
4.分发事件给所有监听者。
*/
type watchGrpcStream struct {owner    *watcherremote   pb.WatchClient
....//watchID=>watcherStream, 一个长连接substreams map[int64]*watcherStream//等待与服务器建立连接的 watcherStreamresuming []*watcherStream//客户端发起 watch 请求会被追加到管道reqc chan watchStreamRequest//服务端的响应追加到该管道respc chan *pb.WatchResponse....
resumec chan struct{}
}
/*
主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
管道outc、recvc、buf 之间的关系是:(1)watchClient会有个goroutine 用于接收服务端的消息,并将这个消息追加到recvc 管道,(2)如果 recvc 中有数据,会读取出来然后追加到buf缓冲中。(3)如果 buf 不为空,会将消息发送给 outc, 客户端通过该管道,接收服务端的响应。
*/type watcherStream struct {//记录用户的请求实例initReq watchRequest//服务端响应会被追加到这个管道,outc chan WatchResponse//接收服务端的响应recvc chan *WatchResponse...//响应缓冲池buf []*WatchResponse
}
/*请求结构体
一个watcherRequest 代表一个监听请求。
*/
type watchRequest struct {ctx context.Context/*key 和 end 共同组成监控范围rev 是被监控的版本*/key stringend stringrev int64...//最终客户会通过该通道接收服务端的响应retc chan chan WatchResponse
}
//响应结构体
type WatchResponse struct {Header pb.ResponseHeader
Events []*EventCompactRevision int64
...
}

主要函数

监控key

/*
主要流程:
1.请求数据构建
2.生成 grpcStream 的在 streams 中的标识
3.如果标识不存在于 streams,创建一个 grpcStream
4.发起请求,等待请求响应
5.返回监听者接收消息的管道
*/
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {var filters []pb.WatchCreateRequest_FilterType...//组装请求wr := &watchRequest{...}ok := false//获取streamKeyctxKey := streamKeyFromCtx(ctx)w.mu.Lock()if w.streams == nil { //streams 未初始化,有问题// closedw.mu.Unlock()ch := make(chan WatchResponse)close(ch)return ch}//如果已存在直接获取,不存则申请一个grpcStreamwgs := w.streams[ctxKey]if wgs == nil {wgs = w.newWatcherGrpcStream(ctx)w.streams[ctxKey] = wgs}...// couldn't create channel; return closed channelcloseCh := make(chan WatchResponse, 1)// 发送请求select {case reqc <- wr:ok = true....}//等待与服务器建立链接,同时获取接收消息的管道,并返回给监听者if ok {select {case ret := <-wr.retc:return ret...}...
}

创建GrpcStream 实例

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {ctx, cancel := context.WithCancel(&valCtx{inctx})wgs := &watchGrpcStream{...}go wgs.run()return wgs
}
/*
流程:
1.创建与服务交互的客户端实例
2.处理监听者请求
3.处理服务端消息
*/func (w *watchGrpcStream) run() {...defer func() {//关闭所有stream...}()//创建与服务端交互的客户端if wc, closeErr = w.newWatchClient(); closeErr != nil {return}...var cur *pb.WatchResponsefor {select {//监听请求case req := <-w.reqc:switch wreq := req.(type) {//监听请求case *watchRequest:outc := make(chan WatchResponse, 1)ws := &watcherStream{...}//启动一个 goroutine 用于处理 watcherStream 管道之间的数据传输go w.serveSubstream(ws, w.resumec)//等待与服务器建立连接的 ws//系统保证 watchRequest 会按照先进先出的规则与服务器建立连接w.resuming = append(w.resuming, ws)if len(w.resuming) == 1 { //如果只有一个,说明当前 ws 可以马上建立连接if err := wc.Send(ws.initReq.toPB()); err != nil {lg.Warningf("error when sending request: %v", err)}}//????case *progressRequest:if err := wc.Send(wreq.toPB()); err != nil {lg.Warningf("error when sending request: %v", err)}}// 处理服务端的消息case pbresp := <-w.respc:...switch {case pbresp.Created:// response to head of queue creationif ws := w.resuming[0]; ws != nil {//注册 WathchId=>ws 的关系w.addSubstream(pbresp, ws)//通知所有的监听者w.dispatchEvent(pbresp)w.resuming[0] = nil}//处理下一个需要与服务器交互的请求if ws := w.nextResume(); ws != nil {if err := wc.Send(ws.initReq.toPB()); err != nil {...}}case pbresp.Canceled && pbresp.CompactRevision == 0:case cur.Fragment:continuedefault:}// watch client failed on Recv; spawn another if possiblecase err := <-w.errc:case <-w.ctx.Done():case ws := <-w.closingc:}}
}

watchStream 消息处理

/*
1.如果 buf 有消息,先发送到对应的管道
2.处理服务消息
(1)如果创建类型,需要看看是否给监听者发送过消息接收的管道,如果没有,就发送,同时设置已发送。
(2)更新下一次监听的版本号
(3)将需要发送给监听者的消息暂存到 buf 中
*/
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {//下一次监听的版本号nextRev := ws.initReq.rev...emptyWr := &WatchResponse{}for {curWr := emptyWroutc := ws.outcif len(ws.buf) > 0 { //如果缓冲中还有消息,先发送给监听者curWr = ws.buf[0]} else {outc = nil}select {case outc <- *curWr: //发送服务消息给监听者...case wr, ok := <-ws.recvc: //如果服务有消息发送过来if !ok {// shutdown from closeSubstreamreturn}//创建监听请求类型if wr.Created {if ws.initReq.retc != nil { //如果不为 nil,说明还没给监听者发送接收消息的 消息管道ws.initReq.retc <- ws.outc //给监听者发送接收消息管道ws.initReq.retc = nil //表示已经发送消息管道if ws.initReq.createdNotify { //发送消息该监听者ws.outc <- *wr}if ws.initReq.rev == 0 { //设置最新版本号nextRev = wr.Header.Revision}}} else { //更新下次接收变化的版本号nextRev = wr.Header.Revision}if wr.Created {continue}//将消息缓存到 buf 中ws.buf = append(ws.buf, wr)case <-w.ctx.Done():returncase <-ws.initReq.ctx.Done():returncase <-resumec:resuming = truereturn}}
}

消息分发

func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {events := make([]*Event, len(pbresp.Events))for i, ev := range pbresp.Events {events[i] = (*Event)(ev)}wr := &WatchResponse{...}if wr.IsProgressNotify() && pbresp.WatchId == -1 {return w.broadcastResponse(wr)}return w.unicastResponse(wr, pbresp.WatchId)}//将消息广播给所有监听者
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {for _, ws := range w.substreams {select {case ws.recvc <- wr:case <-ws.donec:}}return true
}
//将消息发送给特定监听者
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {ws, ok := w.substreams[watchId]if !ok {return false}select {case ws.recvc <- wr:case <-ws.donec:return false}return true
}
客户端实例:
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {...//打开 or 创建 客户端实例wc, err := w.openWatchClient() ...//goroutine 用于接收服务端消息go w.serveWatchClient(wc)return wc, nil
}func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {for {resp, err := wc.Recv()...select {case w.respc <- resp:...}
}

总结

1.监听通过 watcher、 watchGrpcStream 、watchStream、watcherRequest 和 WatchResponse 等数据结构实现,其中 wathcer 主要对外提供 Watch 接口和处理关闭流程,每一个 watcherRequest 会构建 一个 watchGrpcStream,watchGrpcStream 用于管理 watchStream 和各种管道消息处理,watchStream 主要用于管理 watcheRequest 与各个管道之间的数据传输关系。

2.总共有3种后台 goroutine,watchGrpcStream.run 处理监听请求和服务端消息。 watchGrpcStream.serveSubstream(每个 watcherRequest 都会有),作为服务端和监听者的中间层,将服务端的消息分发给监听者。watchGrpcStream.serveWatchClient 用于接收服务端的消息,并将消息发送给 w.respc, 等待 serveSubstream 处理。

3.Watch 主要实现是通过Stream 实现,只有其中的内部原理,有待研究。

ETCD 源码学习--Watch(client)相关推荐

  1. ETCD 源码学习--lease(一)

    在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习. 租赁,在 etcd 中主要用于设置 key 的有效期, ...

  2. ETCD 源码学习--WAL 实现

    在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习. 预写式日志(Write Ahead Log, WAL) ...

  3. ETCD 源码学习--Raft 中 progress 的 inFlight 实现(九)

    首先需要搞清什么是 inFlight,inFlight 在 Raft 中存储的是已发送给 Follower 的 MsgApp 消息,但没有收到 MsgAppResp 的消息 Index  值.简单的说 ...

  4. 菜鸟学源码之Nacos v1.1.3源码学习-Client模块(1):NacosNamingService初始化

    摘要: 本文是Nacos源码学习的第一篇,基于Nacos v1.1.3版本对Nacos源码进行学习,本片主要从exmaple的App示例入手,切入Nacos客户端NacosNamingService的 ...

  5. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  6. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

  7. Etcd源码分析-存储3

    Etcd是分布式存储系统,当leader有数据变化,要及时更新到其他节点,这里就涉及到数据同步. 一.数据同步 上一篇介绍,Etcd接收到客户端的请求,会把相关数据传递到Raft状态机中,那么进入状态 ...

  8. client-go源码学习(二):Reflector、DeltaFIFO

    本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4 3.Informer机制 在Kubernetes系统中,组件之间通过HTTP协议进行通信,在 ...

  9. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

最新文章

  1. 转: 从微信的故障谈谈服务可用性
  2. centos6.0下ffmpeg的安装编译经历
  3. 页面用ajax实现简单的连接数据库
  4. 数据建模-聚类分析-K-Means算法
  5. Ubuntu 16.04 安装CUDA8.0+Cudnn6.0+TensorFlow+Caffe安装
  6. python保存与加载LGBM模型,并解决报错TypeError: Need at least one training dataset or model file or model string..
  7. u盘启动linux hp服务器,分享下用U盘启动给服务器【HP DL165 G7】安装原版2003系统的经验...
  8. 考研:研究生考试(十五天学完)之《高等数学-上册/下册》研究生学霸重点知识点总结之目录(函数与极限、导数与微分、微分中值定理与导数、不定积分、定积分及其应用、微分方程、空间解析几何与向量代数、多元函数
  9. vue项目中用Iconfont阿里巴巴矢量图标库解析失败的解决办法
  10. 线性回归 T检验P值计算
  11. WebDAV之葫芦儿·派盘+Solid Explorer = 全能 Android 文件管理器
  12. JavaScript - canvas - 使用鼠标画线,带撤销与重做功能
  13. MMdetection2测试voc数据获取precision
  14. cadence 17.2 入门学习2 allegro
  15. 学习天才犹太人的经商法则
  16. 质数的判断(Python)找到100(1000)以内的所有质数
  17. 四个程序员编辑器,编程必备!!!
  18. 如何运营好微信公众号
  19. 服务器omv系统,开源NAS系统OpenMediaVault安装与体验
  20. Qt使用两组RadioButton,两组之间相互独立

热门文章

  1. OpenCV实现击中击不中变换和形态学细化
  2. 如何用计算机将图片整成手绘画,【新手教程】如何将手绘作品转变成电子档,并让其更像“作品”?...
  3. excel表格怎么拆分成多个表格?
  4. 【黄啊码】vue配合PHP实现导出excel进度条显示
  5. 李沐动手学深度学习第四章-4.9.环境和分布偏移
  6. 爆料称macOS Ventura正式版预计将于10月最后一周推出 支持新款14/16英寸MacBook Pro
  7. 第十三届蓝桥杯省赛C++B组题解
  8. web前端开发浏览器兼容性 - 持续更新
  9. [Excel VBA]如何批量產出QRcode?
  10. [欧美音乐]Tamas Wells -《A Plea en Vendredi》[MP3+FLAC]