上篇笔记中梳理了一把 resolver 和 balancer,这里顺着前面的流程走一遍入口的 ClientConn 对象。

ClientConn

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {ctx    context.Contextcancel context.CancelFunctarget       stringparsedTarget resolver.Targetauthority    stringdopts        dialOptionscsMgr        *connectivityStateManagerbalancerBuildOpts balancer.BuildOptionsblockingpicker    *pickerWrappermu              sync.RWMutexresolverWrapper *ccResolverWrappersc              *ServiceConfigconns           map[*addrConn]struct{}// Keepalive parameter can be updated if a GoAway is received.mkp             keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapperretryThrottler  atomic.ValuefirstResolveEvent *grpcsync.EventchannelzID int64 // channelz unique identification numberczData     *channelzData
}

首先是 ctx 和 cancel 两个字段,之前好像有看到什么最佳实战说不要把 context 字段放在 struct 里传递而要放在 func 里传递,但是这里确实属于一个非常合理的场景:管理连接的生命周期,这个 ctx 和 cancel 都是来自建立连接时的 DialContext,标准库的 net.Conn 的结构体中也有同样的两个字段,这样请求上下文中建立的连接,可以在请求结束时安全释放掉。ClientConn 中派生出的 goroutine,也能通过 cancel 函数安全地关闭掉。

target、parsedTarget、authority、dopts 似乎都属于比较原始的参数。

csMgr 用于管理 ClientConn 总体的连接状态,先放一下,后面详细看。

resolverWrapper、conns、curBalancerName、balancerWrapper、firstResolveEvent 跟名字解析、负载均衡相关,上一篇笔记中简单看过一点。retryThrottler 大约是重试的退避策略,还没有了解过。

sc *ServiceConfig 是服务端给出的服务参数信息,大约是 maxRequestMessageBytes、timeout 之类的控制信息,可以具体到接口级别。mkp keepalive.ClientParameters 也是参数信息,与 keepalive 相关。

channelzID 和 czData 与 channelz 的信息相关,channelz 是 grpc 内部的一些埋点监控性质的信息,大体上是一个异步的 AddTraceEvent 然后汇聚数值,看代码的时候应该可以忽略这部分。

ClientConn 与 resolverWrapper / balancerWrapper 的交互

clientConn 与 resolver / balancer 之间的交互在上一篇笔记中简单梳理过,好处是接口比较明确,所以交互比较清晰。clientConn 与 resolverWrapper / balancerWrapper 之间的交互都是具体的方法,手工梳理一下。

resolverWrapper 对 clientConn 的调用有 updateResolverState。

clientConn 对 resolverWrapper 的调用有 resolveNow。

clientConn 对 balancerWrapper 的调用有:

  • resolveError:调用来自 clientConn 的 updateResolverState 方法,该方法是被 resolverWrapper 所调用的。
  • handleSubConnStateChange,调用来自 clientConn 的 handleSubConnStateChange 方法,该方法又是被 addrConn 的 updateConnectivityState 调用的。
  • updateClientConnState,调用来自 clientConn 的 updateResolverState,用于传递名字解析的更新。

balancerWrapper 对 clientConn 的调用有:

  • newAddrConn、removeAddrConn:大体上与 NewSubConn 和 RemoveSubConn 相映射,addrConn 是具体的 SubConn 的实现。
  • blockingPicker.updatePicker、csMgr.updateState:皆在 UpdateBalancerState 时调用,将 balancer.State 中的 picker 与总连接状态设置给 clientConn。
  • resolveNow:来自 ResolveNow,向 clientConn 发起 resolver 的解析。

画一张图:

交互的过程感觉有点像 k8s 那种侦听结构体的字段变动做收敛逻辑的意思,比如 resolver 给出后端地址、ServiceConfig、附加元信息的 State 结构体,ClientConn 跟 balancer 都拿这一个结构体中自己关心的字段做自己的逻辑,整个流程都异步做。

这张图里只有 handleSubConnStateChange 的来源没标注。它是来自 addrConn 的回调,后面再展开梳理。

ClientConn 的初始化

名字解析与负载均衡都是持续动态刷新的过程,那么整个流程是怎样启动的?裁剪一下 DialContext 函数:

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target:            target,csMgr:             &connectivityStateManager{},conns:             make(map[*addrConn]struct{}),dopts:             defaultDialOptions(),blockingpicker:    newPickerWrapper(),czData:            new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}// 好像是初始化什么钩子chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}()if channelz.IsOn() {// ... 初始化 channelz}if !cc.dopts.insecure {// ... tlz 相关参数检查}if cc.dopts.defaultServiceConfigRawJSON != nil {// ... 解析参数指定的默认 ServiceConfig 的 JSON}cc.mkp = cc.dopts.copts.KeepaliveParamsif cc.dopts.copts.Dialer == nil {// ... 默认 Dialer 函数}if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 配置 Dial 的超时if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}// 退出函数时,如果 DialContext 的 ctx 如果中途撤销或者超时了,则返回 ctx.Err()defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()// 从 scChan 中侦听接收 serviceConfig 信息scSet := falseif cc.dopts.scChan != nil {// Try to get an initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &scscSet = true}default:}}// 默认取指数退避if cc.dopts.bs == nil {cc.dopts.bs = backoff.DefaultExponential}// 根据名字的 Scheme 选择 resolverBuilder// Determine the resolver to use.cc.parsedTarget = parseTarget(cc.target)grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)if resolverBuilder == nil {// .. 如果没有找到则按默认的 resolverBuilder}creds := cc.dopts.copts.TransportCredentials// ..  初始化 cc.authority// 阻塞等待 scChanif cc.dopts.scChan != nil && !scSet {// Blocking wait for the initial service config.select {case sc, ok := <-cc.dopts.scChan:if ok {cc.sc = &sc}case <-ctx.Done():return nil, ctx.Err()}}if cc.dopts.scChan != nil {go cc.scWatcher()}// 初始化 balancervar credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}cc.balancerBuildOpts = balancer.BuildOptions{DialCreds:        credsClone,CredsBundle:      cc.dopts.copts.CredsBundle,Dialer:           cc.dopts.copts.Dialer,ChannelzParentID: cc.channelzID,Target:           cc.parsedTarget,}// Build the resolver.rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}return cc, nil
}

cc.dopts.scChan 这里有一些逻辑,再就是在 dopts.block 时,有主动等连接的逻辑。

顺着 cc.dopts.scChan 找过去,发现参数定义的 dialoptions 里面有这一段:

// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
// Deprecated: service config should be received through name resolver or via
// WithDefaultServiceConfig, as specified at
// <https://github.com/grpc/grpc/blob/master/doc/service_config.md>.  Will be
// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {return newFuncDialOption(func(o *dialOptions) {o.scChan = c})
}

说 scChan 这个字段要废弃了,要么换 WithDefaultServiceConfig 传一个默认的 json,要么通过 resolver 的 UpdateState 中 State 结构体里的 ServiceConfig 字段去动态拿。

ServiceConfig 比想象中更神通广大一点,ClientConn 中有个 applyServiceConfigAndBalancer 方法,甚至会根据动态下发的 ServiceConfig 来调用 switchBalancer 动态切换 balancer 策略。

csMgr 与 WaitForStateChange

回去单独看一下 cc.dopts.block 的逻辑:

// A blocking dial blocks until the clientConn is ready.if cc.dopts.block {for {s := cc.GetState()if s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {// ctx got timeout or canceled.return nil, ctx.Err()}}}

大约是一个死循环连接状态直到 Ready 为止,ClientConn 的连接状态来自 cc.csMgr 做管理,而 csMgr 中的连接状态来自 balancer 对 ClientConn 的 UpdateState 的回调。balancer 的连接状态是对多个连接的连接状态的汇聚,大约是只要有一个连接 Ready,便将 balancer 的连接状态视为 Ready。之前看 balancer 做汇聚连接状态还不大清楚这个的用处,现在看应该主要是为 WaitForStateChange 这个方法服务的,而且这个方法是公共方法,是 ClientConn 的对外 API。

工程上如果开启 cc.dopts.block,似乎配合一个 cc.dopts.timeout 比较好,这样能超时退出。

csMgr 主要做的事情是辅助 ClientConn 实现 connectivity.Reporter 接口,尤其是 WaitForStateChange 方法:

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {mu         sync.Mutexstate      connectivity.StatenotifyChan chan struct{}channelzID int64
}// ...// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = stateif channelz.IsOn() {// ...}if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {csm.mu.Lock()defer csm.mu.Unlock()if csm.notifyChan == nil {csm.notifyChan = make(chan struct{})}return csm.notifyChan
}// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {ch := cc.csMgr.getNotifyChan()if cc.csMgr.getState() != sourceState {return true}select {case <-ctx.Done():return falsecase <-ch:return true}
}

notifyChan 这个 channel 仅通过 close 做广播性的通知。每当 state 状态变化会惰性产生新的 notifyChan,当这个 notifyChan 被关闭时就意味着状态有变化了,起到一个类似条件变量的作用。

blockingpicker

除了 balancerWrapper、resolverWrapper,ClientConn 中还有一个 pickerWrapper 类型的 blockingPicker 字段,本体也是同样主要是并发同步为主。

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {mu         sync.Mutexdone       boolblockingCh chan struct{}picker     balancer.V2Picker// The latest connection error.  TODO: remove when V1 picker is deprecated;// balancer should be responsible for providing the error.*connErr
}type connErr struct {mu  sync.Mutexerr error
}

大约是初始化时生成一个 blockingCh,随后每当 updatePickerV2 改动 picker 时,则关闭旧 blockingCh 同时生成一个新的 blockingCh。

pickerWrapper 对外的主要功能入口是 pick 方法,先看它的注释:

// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {// ...

这些阻塞唯有 balancer 生成新的 picker 对象交给 ClientConn 才能解除。实现风格上,与 WaitForStateChange 类似,每当状态变化时关闭旧 chan、生成新 chan,上锁确保状态变化与更替 chan 两步操作的原子性,对方阻塞等待 chan 的关闭。

picker.Pick() 方法本身是线程安全的,不是很清楚每个 SubConn 能否被多个 goroutine 使用,后面再确认一下这点。

先看到这里,下面是 addrConn,也就是 SubConn 的实现。

grpc 传递上下文_grpc 源码笔记 02:ClientConn相关推荐

  1. SpringBoot源码笔记分析

    SpringBoot源码笔记分析 1.基础 1.配置SpringBoot热部署 1.引入依赖 <dependency><groupId>org.springframework. ...

  2. redis源码笔记 - 刘浩de技术博客 - 博客园

    redis源码笔记 - 刘浩de技术博客 - 博客园 redis源码笔记 - 刘浩de技术博客 - 博客园 redis源码笔记 记录发现的一个hiredis的bug 摘要: hiredis是redis ...

  3. Dubbo-go 源码笔记(二)客户端调用过程

    作者 | 李志信 导读:有了上一篇文章<Dubbo-go 源码笔记(一)Server 端开启服务过程>的铺垫,可以类比客户端启动于服务端的启动过程.其中最大的区别是服务端通过 zk 注册服 ...

  4. phpcms 指定id范围 调用_Dubbogo 源码笔记(二)客户端调用过程

    作者 | 李志信 导读:有了上一篇文章<Dubbo-go 源码笔记(一)Server 端开启服务过程>的铺垫,可以类比客户端启动于服务端的启动过程.其中最大的区别是服务端通过 zk 注册服 ...

  5. 面试大厂被MyBatis问到“哑口无言”?这份MyBatis源码笔记助你吊打面试官!

    写在前面 随着手机.平板电脑等移动终端的广泛应用,移动互联网时代已经到来.在这个时代里,构建一个高效的平台并提供服务是移动互联网的基础,在众多的网站服务中,使用Java构建网站的不在少数,移动互联网的 ...

  6. web前端源码笔记_canvas【爱创课堂专业前端培训】

    爱创课堂前端源码笔记--canvas 一.canvas canvas是HTML5新增的标签用于提供"画布" 可以通过canvas元素获取对应的"上下文"(可以理 ...

  7. Web前端全栈开发_node源码笔记【爱创课堂】

    一.NodeJS简单复习 NodeJS是模块化开发的,有许多内置模块.HTTP模块用于搭建服务器.FS模块用于操作文件和文件夹.URL模块用于URL字符串和URL对象的转换.QueryStrings模 ...

  8. Kernel源码笔记之调度:3.CFS

    Kernel源码笔记目录 简介 主要介绍CFS(Completely Fair Scheduler)完全公平调度器. 代码基于4.19. 两个核心数据结构 // kernel/sched/sched. ...

  9. Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

    文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...

最新文章

  1. JavaScript异步史
  2. 图形上下文的栈操作(保存和恢复)
  3. 【Spark Summit EU 2016】在在线学习中使用Structured Streaming流数据处理引擎
  4. 演示方法:有抱负的分析师
  5. docker export_Docker 几个相似命令的区别
  6. 位图引起的内存溢出OutOfMemory解决方案
  7. 免费python课程排行榜-用python爬取2017年中国最好大学排名
  8. windows 截屏快捷键x220_电脑截屏快捷键是什么啊
  9. numpy.mgrid的用法图解
  10. sql 插入多行数据
  11. 在你学习计算机的路上,哪些书籍对你的帮助最大?
  12. Linux之DNS域名解析
  13. PID控制,matlab/simulink
  14. Excel常见技巧GIF示例
  15. 互联网内容安全中的音频审核应该怎么做好
  16. 如何使用Element Plus 提供的Icon图标库
  17. dvb-c usb android,安卓智能DVB-C高清数字机顶盒
  18. 《组织能力的杨三角》——企业持续成功的秘诀(杨国安)读书摘要
  19. PHP5.4 如何连接MS Sql Server
  20. 图象淡入淡出(VB6)

热门文章

  1. Error:warning: Ignoring InnerClasses attribute for an anonymous inner class
  2. MySQL Antelope和Barracuda的区别分析
  3. PL/SQL Developer(解压版)连接64位的Oracle11g
  4. vb socket的使用
  5. PHP实现XML传输
  6. 【转】Maven Jetty 插件的问题(css/js等目录死锁)的解决
  7. 新的小游戏发布啦。Pop Jungle
  8. POJ1088(滑雪)
  9. IEs 4 Linux 新版支撑 IE 7
  10. 幼儿园语言活动包括哪几类_幼儿园教育:《一起玩》语言活动教案