ETCD 源码学习--Watch(client)
在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。
服务端监听某个 key 或某个范围 key 变化,当被监听的 key 的值有任何变动,都会主动通知客户端(变更的版本号>指定版本号)。
主要文件
/etcd/clientv3/watch.go
结构图
主要数据结构
//对外提供 Watch 接口,同时负责关闭 grpcStream 的流程。
type watcher struct {
//访问服务的RPC客户端remote pb.WatchClient //etcdserver/etcdserverpb/rpc.pb.go...
// key=>grpcStream
streams map[string]*watchGrpcStreamlg *zap.Logger
}
/*
1.实例化过程中,会创建 WatchClient, 这个 WatchClient 用于和服务端进行数据传输。同时会启动一个 goroutine 用于接收服务端的消息。
2.管理 watcherStream 关闭,创建等流程,
3.启动一个 goroutine 用于处理 reqc、respc 等管道的消息。
4.分发事件给所有监听者。
*/
type watchGrpcStream struct {owner *watcherremote pb.WatchClient
....//watchID=>watcherStream, 一个长连接substreams map[int64]*watcherStream//等待与服务器建立连接的 watcherStreamresuming []*watcherStream//客户端发起 watch 请求会被追加到管道reqc chan watchStreamRequest//服务端的响应追加到该管道respc chan *pb.WatchResponse....
resumec chan struct{}
}
/*
主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
管道outc、recvc、buf 之间的关系是:(1)watchClient会有个goroutine 用于接收服务端的消息,并将这个消息追加到recvc 管道,(2)如果 recvc 中有数据,会读取出来然后追加到buf缓冲中。(3)如果 buf 不为空,会将消息发送给 outc, 客户端通过该管道,接收服务端的响应。
*/type watcherStream struct {//记录用户的请求实例initReq watchRequest//服务端响应会被追加到这个管道,outc chan WatchResponse//接收服务端的响应recvc chan *WatchResponse...//响应缓冲池buf []*WatchResponse
}
/*请求结构体
一个watcherRequest 代表一个监听请求。
*/
type watchRequest struct {ctx context.Context/*key 和 end 共同组成监控范围rev 是被监控的版本*/key stringend stringrev int64...//最终客户会通过该通道接收服务端的响应retc chan chan WatchResponse
}
//响应结构体
type WatchResponse struct {Header pb.ResponseHeader
Events []*EventCompactRevision int64
...
}
主要函数
监控key
/*
主要流程:
1.请求数据构建
2.生成 grpcStream 的在 streams 中的标识
3.如果标识不存在于 streams,创建一个 grpcStream
4.发起请求,等待请求响应
5.返回监听者接收消息的管道
*/
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {var filters []pb.WatchCreateRequest_FilterType...//组装请求wr := &watchRequest{...}ok := false//获取streamKeyctxKey := streamKeyFromCtx(ctx)w.mu.Lock()if w.streams == nil { //streams 未初始化,有问题// closedw.mu.Unlock()ch := make(chan WatchResponse)close(ch)return ch}//如果已存在直接获取,不存则申请一个grpcStreamwgs := w.streams[ctxKey]if wgs == nil {wgs = w.newWatcherGrpcStream(ctx)w.streams[ctxKey] = wgs}...// couldn't create channel; return closed channelcloseCh := make(chan WatchResponse, 1)// 发送请求select {case reqc <- wr:ok = true....}//等待与服务器建立链接,同时获取接收消息的管道,并返回给监听者if ok {select {case ret := <-wr.retc:return ret...}...
}
创建GrpcStream 实例
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {ctx, cancel := context.WithCancel(&valCtx{inctx})wgs := &watchGrpcStream{...}go wgs.run()return wgs
}
/*
流程:
1.创建与服务交互的客户端实例
2.处理监听者请求
3.处理服务端消息
*/func (w *watchGrpcStream) run() {...defer func() {//关闭所有stream...}()//创建与服务端交互的客户端if wc, closeErr = w.newWatchClient(); closeErr != nil {return}...var cur *pb.WatchResponsefor {select {//监听请求case req := <-w.reqc:switch wreq := req.(type) {//监听请求case *watchRequest:outc := make(chan WatchResponse, 1)ws := &watcherStream{...}//启动一个 goroutine 用于处理 watcherStream 管道之间的数据传输go w.serveSubstream(ws, w.resumec)//等待与服务器建立连接的 ws//系统保证 watchRequest 会按照先进先出的规则与服务器建立连接w.resuming = append(w.resuming, ws)if len(w.resuming) == 1 { //如果只有一个,说明当前 ws 可以马上建立连接if err := wc.Send(ws.initReq.toPB()); err != nil {lg.Warningf("error when sending request: %v", err)}}//????case *progressRequest:if err := wc.Send(wreq.toPB()); err != nil {lg.Warningf("error when sending request: %v", err)}}// 处理服务端的消息case pbresp := <-w.respc:...switch {case pbresp.Created:// response to head of queue creationif ws := w.resuming[0]; ws != nil {//注册 WathchId=>ws 的关系w.addSubstream(pbresp, ws)//通知所有的监听者w.dispatchEvent(pbresp)w.resuming[0] = nil}//处理下一个需要与服务器交互的请求if ws := w.nextResume(); ws != nil {if err := wc.Send(ws.initReq.toPB()); err != nil {...}}case pbresp.Canceled && pbresp.CompactRevision == 0:case cur.Fragment:continuedefault:}// watch client failed on Recv; spawn another if possiblecase err := <-w.errc:case <-w.ctx.Done():case ws := <-w.closingc:}}
}
watchStream 消息处理
/*
1.如果 buf 有消息,先发送到对应的管道
2.处理服务消息
(1)如果创建类型,需要看看是否给监听者发送过消息接收的管道,如果没有,就发送,同时设置已发送。
(2)更新下一次监听的版本号
(3)将需要发送给监听者的消息暂存到 buf 中
*/
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {//下一次监听的版本号nextRev := ws.initReq.rev...emptyWr := &WatchResponse{}for {curWr := emptyWroutc := ws.outcif len(ws.buf) > 0 { //如果缓冲中还有消息,先发送给监听者curWr = ws.buf[0]} else {outc = nil}select {case outc <- *curWr: //发送服务消息给监听者...case wr, ok := <-ws.recvc: //如果服务有消息发送过来if !ok {// shutdown from closeSubstreamreturn}//创建监听请求类型if wr.Created {if ws.initReq.retc != nil { //如果不为 nil,说明还没给监听者发送接收消息的 消息管道ws.initReq.retc <- ws.outc //给监听者发送接收消息管道ws.initReq.retc = nil //表示已经发送消息管道if ws.initReq.createdNotify { //发送消息该监听者ws.outc <- *wr}if ws.initReq.rev == 0 { //设置最新版本号nextRev = wr.Header.Revision}}} else { //更新下次接收变化的版本号nextRev = wr.Header.Revision}if wr.Created {continue}//将消息缓存到 buf 中ws.buf = append(ws.buf, wr)case <-w.ctx.Done():returncase <-ws.initReq.ctx.Done():returncase <-resumec:resuming = truereturn}}
}
消息分发
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {events := make([]*Event, len(pbresp.Events))for i, ev := range pbresp.Events {events[i] = (*Event)(ev)}wr := &WatchResponse{...}if wr.IsProgressNotify() && pbresp.WatchId == -1 {return w.broadcastResponse(wr)}return w.unicastResponse(wr, pbresp.WatchId)}//将消息广播给所有监听者
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {for _, ws := range w.substreams {select {case ws.recvc <- wr:case <-ws.donec:}}return true
}
//将消息发送给特定监听者
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {ws, ok := w.substreams[watchId]if !ok {return false}select {case ws.recvc <- wr:case <-ws.donec:return false}return true
}
客户端实例:
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {...//打开 or 创建 客户端实例wc, err := w.openWatchClient() ...//goroutine 用于接收服务端消息go w.serveWatchClient(wc)return wc, nil
}func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {for {resp, err := wc.Recv()...select {case w.respc <- resp:...}
}
总结
1.监听通过 watcher、 watchGrpcStream 、watchStream、watcherRequest 和 WatchResponse 等数据结构实现,其中 wathcer 主要对外提供 Watch 接口和处理关闭流程,每一个 watcherRequest 会构建 一个 watchGrpcStream,watchGrpcStream 用于管理 watchStream 和各种管道消息处理,watchStream 主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
2.总共有3种后台 goroutine,watchGrpcStream.run 处理监听请求和服务端消息。 watchGrpcStream.serveSubstream(每个 watcherRequest 都会有),作为服务端和监听者的中间层,将服务端的消息分发给监听者。watchGrpcStream.serveWatchClient 用于接收服务端的消息,并将消息发送给 w.respc, 等待 serveSubstream 处理。
3.Watch 主要实现是通过Stream 实现,只有其中的内部原理,有待研究。
ETCD 源码学习--Watch(client)相关推荐
- ETCD 源码学习--lease(一)
在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习. 租赁,在 etcd 中主要用于设置 key 的有效期, ...
- ETCD 源码学习--WAL 实现
在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习. 预写式日志(Write Ahead Log, WAL) ...
- ETCD 源码学习--Raft 中 progress 的 inFlight 实现(九)
首先需要搞清什么是 inFlight,inFlight 在 Raft 中存储的是已发送给 Follower 的 MsgApp 消息,但没有收到 MsgAppResp 的消息 Index 值.简单的说 ...
- 菜鸟学源码之Nacos v1.1.3源码学习-Client模块(1):NacosNamingService初始化
摘要: 本文是Nacos源码学习的第一篇,基于Nacos v1.1.3版本对Nacos源码进行学习,本片主要从exmaple的App示例入手,切入Nacos客户端NacosNamingService的 ...
- 第十四课 k8s源码学习和二次开发原理篇-调度器原理
第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...
- 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理
第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...
- Etcd源码分析-存储3
Etcd是分布式存储系统,当leader有数据变化,要及时更新到其他节点,这里就涉及到数据同步. 一.数据同步 上一篇介绍,Etcd接收到客户端的请求,会把相关数据传递到Raft状态机中,那么进入状态 ...
- client-go源码学习(二):Reflector、DeltaFIFO
本文基于Kubernetes v1.22.4版本进行源码学习,对应的client-go版本为v0.22.4 3.Informer机制 在Kubernetes系统中,组件之间通过HTTP协议进行通信,在 ...
- 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习
第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...
最新文章
- 转: 从微信的故障谈谈服务可用性
- centos6.0下ffmpeg的安装编译经历
- 页面用ajax实现简单的连接数据库
- 数据建模-聚类分析-K-Means算法
- Ubuntu 16.04 安装CUDA8.0+Cudnn6.0+TensorFlow+Caffe安装
- python保存与加载LGBM模型,并解决报错TypeError: Need at least one training dataset or model file or model string..
- u盘启动linux hp服务器,分享下用U盘启动给服务器【HP DL165 G7】安装原版2003系统的经验...
- 考研:研究生考试(十五天学完)之《高等数学-上册/下册》研究生学霸重点知识点总结之目录(函数与极限、导数与微分、微分中值定理与导数、不定积分、定积分及其应用、微分方程、空间解析几何与向量代数、多元函数
- vue项目中用Iconfont阿里巴巴矢量图标库解析失败的解决办法
- 线性回归 T检验P值计算
- WebDAV之葫芦儿·派盘+Solid Explorer = 全能 Android 文件管理器
- JavaScript - canvas - 使用鼠标画线,带撤销与重做功能
- MMdetection2测试voc数据获取precision
- cadence 17.2 入门学习2 allegro
- 学习天才犹太人的经商法则
- 质数的判断(Python)找到100(1000)以内的所有质数
- 四个程序员编辑器,编程必备!!!
- 如何运营好微信公众号
- 服务器omv系统,开源NAS系统OpenMediaVault安装与体验
- Qt使用两组RadioButton,两组之间相互独立
热门文章
- OpenCV实现击中击不中变换和形态学细化
- 如何用计算机将图片整成手绘画,【新手教程】如何将手绘作品转变成电子档,并让其更像“作品”?...
- excel表格怎么拆分成多个表格?
- 【黄啊码】vue配合PHP实现导出excel进度条显示
- 李沐动手学深度学习第四章-4.9.环境和分布偏移
- 爆料称macOS Ventura正式版预计将于10月最后一周推出 支持新款14/16英寸MacBook Pro
- 第十三届蓝桥杯省赛C++B组题解
- web前端开发浏览器兼容性 - 持续更新
- [Excel VBA]如何批量產出QRcode?
- [欧美音乐]Tamas Wells -《A Plea en Vendredi》[MP3+FLAC]