本系列准备介绍微服务框架的相关内容,以我目前在用的kite框架为栗子,也扩展一些业界常见的实现,主要包括一下的部分:

  1. rpc框架基础
  2. kite的具体实现
  3. 服务治理概述
  4. 其他实现细节及优化

文章目录

  • 服务端路径
    • 初始化processor
    • 初始化服务器
    • 注册中间件
    • 运行服务器
    • overLoader
    • 服务端调用关系图
  • 客户端路径
    • 生成客户端实例
    • 客户端方法的内部实现
    • 客户端调用关系图
  • 小结
  • 更新
    • 服务端

以一个简单的demo示例,从服务端和客户端两部分来进行解读。
thrift文件如下,定义了itemService,其接收一个带id的request,返回相应的response。

include "base.thrift"namespace go kite.example.itemstruct Item {1: required i64 id,2: required string title,3: required string content,10: optional map<string, string> extra,
}struct GetItemRequest {1: required i64 id,255: optional base.Base Base,
}struct GetItemResponse {1: required Item item,255: optional base.BaseResp BaseResp,
}service ItemService {GetItemResponse GetItem (1: GetItemRequest req),
}

从客户端和服务端两部分来进行解读。

服务端路径

  1. 初始化processor

kite.go文件中在init()中初始化了processor,我们写的业务函数(handler)就是通过processor挂载到kite框架上。

func init() {kite.Processor = item.NewItemServiceProcessor(&ItemServiceHandler{})
}// ItemServiceHandler具备GetItem方法
type ItemServiceHandler struct{}// ItemServiceHandler的GetItem方法实际上是将mkGetItem方法包装后(KiteMW是用chainMiddleware装饰)进行调用
func (h *ItemServiceHandler) GetItem(ctx context.Context, r *item.GetItemRequest) (*item.GetItemResponse, error) {resp, err := kite.KiteMW(mkGetItem())(kite.InitMethodContext(ctx, "GetItem"), &KiteGetItemRequest{r})if resp == nil {return nil, err}return resp.(endpoint.KiteResponse).RealResponse().(*item.GetItemResponse), err
}// mkGetItem实际就是返回一个EndPoint,这个函数调了handler中实现的GetItem;
func mkGetItem() endpoint.EndPoint {return func(ctx context.Context, request interface{}) (interface{}, error) {req := request.(endpoint.KiteRequest).RealRequest()r := req.(*item.GetItemRequest)// 此处的GetItem为在handler中实现的resp, err := GetItem(ctx, r)if resp == nil {return nil, err}return &KiteGetItemResponse{resp}, err}
}
  1. 初始化服务器

func main() {// 初始化服务器kite.Init()logs.Error("%v", kite.Run())logs.Stop()
}

初始化的过程中主要是初始化配置,其中配置优先级file > envs > args,单例生成RPCServer实例。
生成rpc主要是实例化了transport和protocol,rpc必备的东西。

// NewRpcServer create the global server instance
func NewRpcServer() *RpcServer {// Using buffered transport and binary protocol as default,// buffer size is 4096var s *RpcServeroriTransport := thrift.NewTBufferedTransportFactory(DefaultTransportBufferedSize)transport := thrift.NewHeaderTransportFactory(oriTransport)protocol := thrift.NewHeaderProtocolFactory(thrift.ProtocolIDBinary)s = &RpcServer{transportFactory: transport,protocolFactory:  protocol,shmipcDone:       make(chan struct{}),}// FIXME:when mesh mode, we don't need remoteConfiger&overloader,// but there are more functions depend with those,// so replace a empty kvstore with ETCDif ServiceMeshMode {s.remoteConfiger = newRemoteConfiger(s, newEmptyStorer())s.overloader = newOverloader(1<<20, 1<<20, limitQPSInterval)} else {s.remoteConfiger = newRemoteConfiger(s, kvstore.NewETCDStorer())s.overloader = newOverloader(limitMaxConns, limitQPS, limitQPSInterval)}return s
}
  1. 注册中间件

调用kite.Use方法注册中间件,例如PanicAndTimeoutRecovrey就是一个实现超时控制的中间件,先注册的中间件是放在最外层,后注册的中间价放在内层。
kite.Use注册的中间件会放到userMW中,另外暴露给用户的添加中间件方式除了kite.Use还有Kite.AddMethodMW,此方法可以选择性地对某个方法注册中间件,放在mMap中。

// AddMethodMW use a middleware for a define method.
// Note: this function must not be called after kite.Init() is invoked.
func AddMethodMW(m string, mws ...endpoint.Middleware) {if len(mws) >= 1 {if mw, ok := mMap[m]; ok {mMap[m] = endpoint.Chain(mw, mws...)} else {mMap[m] = endpoint.Chain(mws[0], mws[1:]...)}}
}// Use middlewares will enable for all this service's method.
// Note: this function must not be called after kite.Init() is invoked.
func Use(mws ...endpoint.Middleware) {if len(mws) >= 1 {if userMW != nil {userMW = endpoint.Chain(userMW, mws...)} else {userMW = endpoint.Chain(mws[0], mws[1:]...)}}
}
  1. 运行服务器

func main() {kite.Init()// 运行服务器logs.Error("%v", kite.Run())logs.Stop()
}

Run内部其实主要是ListenAndServe(),简单来说就是起了一个socket,ip和port是init时读取的,然后监听socket调用processor进行处理。当然里面还有更多的细节,有空再补。

func Run() error {errCh := make(chan error, 1)go func() { errCh <- RPCServer.ListenAndServe() }()if err := waitSignal(errCh); err != nil {return err}return RPCServer.Stop()
}
  1. overLoader

overLoader是用来防止kite的过载,其主要包含了connLimiter和qpsLimiter。

// overloader protect kite from overload
type overloader struct {connLimiter            *ratelimit.ConcurrencyLimterqpsLimiter             *ratelimit.QPSLimiterendpointQPSLimiter     map[string]endpointQPSLimterendpointQPSLimiterLock sync.RWMutex // for update endpointLimiter
}

connLimiter用来保证连接数不超过limit,其包含limit、now、tmp三个成员变量,tmp用来表示过渡状态。其通过atomic来保证并发环境下的有效性。整体比较简单,不做赘述。

// ConcurrencyLimter .
type ConcurrencyLimter struct {lim int64now int64tmp int64
}// TakeOne .
func (ml *ConcurrencyLimter) TakeOne() bool {x := atomic.AddInt64(&ml.tmp, 1)if x <= atomic.LoadInt64(&ml.lim) {atomic.AddInt64(&ml.now, 1)return true}atomic.AddInt64(&ml.tmp, -1)return false
}

qpsLimiter实际是一个简化的令牌桶,其牺牲了一定的精确程度来兼顾性能。其包含limit、tockens、intervals、once、ticker等成员变量。其直接另起一个协程来更新tockens,每隔intervals增加once数量的tockens,上限不超过limit。在更新tocken数量的时候直接将更新的tocken保存,而没有采取CAS的验证手段,会导致一定的误差,但是能提升性能。当interval越大,误差会越小。

type QPSLimiter struct {limit    int32 // 应该没有需求承受大于21亿qps的吧……tokens   int32interval time.Durationonce     int32 // 每一个interval补充多少ticker   *time.Ticker
}func (l *QPSLimiter) updateToken() {var v int32v = atomic.LoadInt32(&l.tokens)if v < 0 {v = l.once} else if v+l.once > l.limit {v = l.limit} else {v = v + l.once}atomic.StoreInt32(&l.tokens, v)
}

服务端调用关系图

客户端路径

  1. 生成客户端实例

调用导入的客户端的包生成客户端实例;

client, err = ItemClient.NewClient("kite.example.item",           #set PSMkitc.WithInstances(ins...),    #set instance
)

NewClient调用了kitc的相应方法,并多传一个&KitcItemServiceClient{},其在服务端的client包中实现的;

func NewClient(PSM string, options ...kitc.Option) (*Client, error) {kc, err := kitc.NewWithThriftClient(PSM, &KitcItemServiceClient{}, options...)return &Client{kc}, err
}

客户端最常见的对象为kc对象,即kitc.kitcClient,好多对象只是在其基础上的封装。

 resp, err := client.GetItem(ctx, req)
  1. 客户端方法的内部实现

  • 首先会通过MethodInit方法创建context,在MethodInit中首先会initMWChain,使用Once.Do保证单次执行,初始化rpc调用的中间件,rpc的日志输出、超时控制、降级、服务发现、LB等都是通过这些中间件来实现的。
  • 然后MethodInit会调用initRPCInfo来创建rpcInfo。rpcInfo是比较关键的一个对象,里面包括了rpc相关的所有信息,包括rpcMeta、rpcConf、logId、instances、net.conn等对象。这里instances一般为空的,是通过服务发现来发现下游实例,本demo中传入instance是因为本地起的客户端没有服务发现。
  • 调用mkGetItem返回next,next就是裸的rpc调用,next内部会在rpcInfo.conn基础上开启transport,然后远程调用。next要放到MethodCall中用中间件包一下。
func (c *Client) GetItem(ctx context.Context, r *item.GetItemRequest) (*KitcGetItemResponse, error) {if c.kc == nil {return nil, errors.New("Client is not initialized")}// 将"GetItem"(method)、r(request)、以及rpcinfo(关于rpc调用的信息)放入ctxctx, err := c.kc.MethodInit("GetItem", ctx, r)if err != nil {return nil, err}// NewTItemServiceClientBuilder 其实只是在kc对象上包了一层;// 关键在于调用mkGetItem方法,其返回的next为一个函数,会在MethodCall中调用;next := mkGetItem(NewTItemServiceClientBuilder(c.kc))request := &KiteGetItemRequest{r}// 之所以要用MethodCall,是因为传入的next需要先chan一下(用中间件包一下);resp, err := c.kc.MethodCall(next, ctx, request)kitcResp, ok := resp.(*KitcGetItemResponse)if !ok {if resp != nil && err == nil {return nil, fmt.Errorf("Response type assertion failure: %T to %v", resp, "*KitcGetItemResponse")}return nil, err}return kitcResp, err// mkGetItem 返回Endpoint(格式定义好的函数),其会开启连接,关键在于transport.OpenWithContext(ctx),前面说过rpcinfo放在context中,并调用相关方法;
func mkGetItem(cb ItemServiceClientBuilder) endpoint.EndPoint {return func(ctx context.Context, request interface{}) (interface{}, error) {client := cb.New()transport := client.Transport.(kitc.Transport)err := transport.OpenWithContext(ctx)if err != nil {return nil, err}defer transport.Close()resp, err := client.GetItem(request.(endpoint.KitcCallRequest).RealRequest().(*item.GetItemRequest))addr := transport.RemoteAddr()return &KitcGetItemResponse{resp, addr}, err}
}

客户端调用关系图

有空再画。

小结

RPC框架应该有的功能:简化远程调用,像本地调用一样调用远程接口;服务注册和发现;序列化;网络通信;负载均衡;健康检查;限流和降级;超时控制等。在前文的kite调用链的梳理中,大部分都涉及到了,服务注册和发现是通过consul来实现的,服务注册是TCE平台做的,服务发现是通过客户端的中间件来实现的;简化远程调用,是通过客户端的封装实现,这部分kite帮我们做了;包括LB、降级、rpc超时等也是通过客户端的中间件来实现的。然后关于序列化和网络通信部分只是带过,基本都是基于thrift实现的,可以参考Thrift详解 ,后面有时间的话再把一些相关内容补充进来吧。

更新

时隔几个月后又回来更新kite的内容,再看之前写的内容略显青涩。言归正传,这次更新kite的主要契机是我们的服务接入了service mesh,想看下kite如何接入service mesh,比如:接入mesh后框架大部分的服务治理就不需要做了,同样接入mesh后如何处理到mesh的连接。然后写完这部分内容后,会再补充一下服务端和客户端的调用路径,主要集中在之前没有的如何处理连接等。

服务端

服务端启动时,首先会调用kite.Init(),该方法会初始化一些配置,其中配置优先级file > envs > args,service mesh的开关应该是在envs中,主要是通过环境变量来控制,这样才能实现业务方的无感知的切换。其中涉及到的环境变量如下。

if !ServiceMeshMode && os.Getenv("SERVICE_MESH_INGRESS_ADDR") != "" {ServiceMeshMode = trueServiceMeshIngressAddr = os.Getenv("SERVICE_MESH_INGRESS_ADDR")// Load shmipc config if and only if service mesh mode is enabledif shmIPCAddr := os.Getenv("SHMIPC_INGRESS_ADDR"); shmIPCAddr != "" {ServiceMeshShmIPCAddr = shmIPCAddr}if shmIPCPath := os.Getenv("SHMIPC_PATH"); shmIPCPath != "" {ServiceMeshShmIPCConfig.ShareMemoryPathPrefix = shmIPCPath}if shmIPCBufferCap := os.Getenv("SHMIPC_BUFFER_CAPACITY"); shmIPCBufferCap != "" {n, err := strconv.ParseUint(shmIPCBufferCap, 10, 32)if err == nil {ServiceMeshShmIPCConfig.ShareMemoryBufferCap = uint32(n)}}
}

在byteMesh中,业务进程和proxy的通信不是通过iptables来进行的,而是使用unix域套接字和共享内存实现。其中数据传递主要是通过共享内存,通过共享内存的传递效率比较高,unix域套接字主要用来通知共享内存中有数据需要处理。

func (p *RpcServer) Serve(ln net.Listener) error {if p.l != nil {panic("KITE: Listener not nil")}p.l = lnlogs.Info("KITE: server listening on %s", ln.Addr())if Processor == nil {panic("KITE: Processor is nil")}p.processorFactory = thrift.NewTProcessorFactory(Processor)p.startDaemonRoutines()// mesh和非mesh在处理底层连接上有所区别if ServiceMeshShmIPCAddr != "" {go p.shmIPCServe()defer p.shmipcDoneOnce.Do(func() {close(p.shmipcDone)})}return p.acceptAndHandle(p.l.Accept, false)
}

之后的处理流程上,mesh和非mesh都一样,都是accept得到connection,起一个协程处理该连接:在connection上包装transport,在transport上封装protocol,然后processor进行处理。前文的服务端调用路径中补充如下。

【微服务系列】kite的具体实现相关推荐

  1. 微服务系列(五):事件驱动的数据管理

    编者的话|本文来自 Nginx 官方博客,是「Chris Richardson 微服务」系列的第五篇文章.第一篇文章介绍了微服务架构模式,并且讨论了使用微服务的优缺点:第二和第三篇描述了微服务架构模块 ...

  2. 微服务系列(七):将单体应用改造为微服务

    编者的话|本文来自 Nginx 官方博客,是「Chris Richardson 微服务」系列的第五篇文章.第一篇文章介绍了微服务架构模式,并且讨论了使用微服务的优缺点:第二和第三篇描述了微服务架构模块 ...

  3. 「微服务系列 13」熔断限流隔离降级

    我们知道微服务分布式依赖关系错综复杂,比方说前端的请求转化为后端调用的服务请求,一个前端请求会转为成很多个后端调用的服务请求,那么这个时候后台的服务出现不稳定或者延迟,如果没有好的限流熔断措施,可能会 ...

  4. Spring Cloud微服务系列文,服务调用框架Feign

    之前博文的案例中,我们是通过RestTemplate来调用服务,而Feign框架则在此基础上做了一层封装,比如,可以通过注解等方式来绑定参数,或者以声明的方式来指定请求返回类型是JSON.    这种 ...

  5. 01.微服务系列介绍

    微服务系列实践 .NET CORE 在开始之前呢,还是得废话一下,毕竟还是需要介绍一下这个系列我们要实现什么样的一套服务架构,也让大家能初步的有一个了解,后续实践起来也有一个完整的概念,相对也会容易的 ...

  6. 微服务系列实践 .NET CORE

    从事这个行业转眼已经6年了,从当初刚毕业的在北京朝八晚十,从二环到五环,仍每天精力充沛的小愤青:再到深圳一点一滴的辛勤在软件行业的耕种,从当初单体应用架构到现在微服务架构的经历,回想起来自己的收获倒是 ...

  7. 微服务系列:服务注册与发现的实现原理、及实现优劣势比较

    服务注册与发现的来源 首先,服务注册与发现是来自于微服务架构的产物. 在传统的服务架构中,服务的规模处于运维人员的可控范围内.当部署服务的多个节点时,一般使用静态配置的方式实现服务信息的设定.而在微服 ...

  8. 微服务系列:Dubbo与SpringCloud的Ribbon、Hystrix、Feign的优劣势比较

    在微服务架构中,分布式通信.分布式事务.分布式锁等问题是亟待解决的几个重要问题. Spring Cloud是一套完整的微服务解决方案,基于 Spring Boot 框架.确切的说,Spring Clo ...

  9. Mysql保存是事件驱动吗_【CHRIS RICHARDSON 微服务系列】事件驱动的数据管理-5

    编者的话 |本文来自 Nginx 官方博客,是「Chris Richardson 微服务」系列的第五篇文章.第一篇文章介绍了微服务架构模式,并且讨论了使用微服务的优缺点:第二和第三篇描述了微服务架构模 ...

  10. 微服务系列 —— 一小时搞定Eureka

    微服务系列  -- 一小时搞定Eureka 一.什么是Eureka Eureka是Netflix公司开源的产品,它是一种基于REST( Representational State Transfer  ...

最新文章

  1. 《c++语言导学》——1.7 常量
  2. 弹性盒子内容体居右对其_CSS怎么实现弹性盒中的元素居中对齐
  3. hdu 3786 寻找直系亲属
  4. 好像是第一次在公司外的论坛上公开演讲
  5. 二层交换机、三层交换机与路由器的比较
  6. jmap查看内存使用情况与生成heapdump--转
  7. 用稳压管保护单片机引脚_零基础入门单片机(2)学会控制IO引脚你就入门啦
  8. shell编程之条件判断语句和流程控制语句
  9. IOS15使用Masonry和自动计算Cell的高度
  10. java 异常怎么划分_java异常的分类
  11. linux下各种解压方法
  12. 手机微信如何设置浏览器打开时提醒
  13. noip2017普及组
  14. 《学习笔记》使用AngularJS模板来创建视图
  15. source insight护眼模式颜色
  16. 现在,让客服接管数字化企业
  17. 高等代数 具有度量的线性空间(第10章)5 正交空间与辛空间
  18. [ubuntu14.04]linux 开发装机必备
  19. 用于图像分割的卷积神经网络:从R-CNN到Mask R-CNN
  20. 区块链公司依靠电信主网颠覆汇款行业

热门文章

  1. 《中国航海》期刊从投递到录用指南
  2. 一键Ghost恢复系统的操作步骤
  3. Excel计算BMI
  4. FTP无法连接解决办法
  5. 什么是平均负载,以及影响平均负载的因素
  6. 机器算法有哪几种 python_机器学习10种经典算法的Python实现
  7. sqlserver 内连接 左右连接 自动连接
  8. CSS浮动的基本概念和运用
  9. 后台的作用以及如何设计后台。
  10. [经验]Class MediaTransCoder is implemented in both One of the two will be used. Which one is undefined