discovery是B站开源的类Eurekad的一款服务注册与发现框架,简单介绍如下:

1. 实现AP类型服务注册发现系统,在可用性极极极极强的情况下,努力保证数据最终一致性
2. 与公司k8s平台深度结合,注册打通、发布平滑、naming service等等
3. 网络闪断等异常情况,可自我保护,保证每个节点可用
4. 基于HTTP协议实现接口,简单易用,维护各流行语言SDK## 相对Netflix Eureka的改进* 长轮询监听应用变更(Eureka定期30s拉取一次)
* 只拉取感兴趣的AppID实例(Eureka一拉就是全部,无法区分)
* 合并node之间的同步请求/(ㄒoㄒ)/~~其实还没实现,是个TODO
* Dashboard骚操作~
* 多注册中心信息同步支持
* 更完善的日志记录

下面简单就discovery的源码进项分析,首先先熟悉一下框架的一些基本概念:

0. 通过AppID(服务名)和hostname定位实例
1. Node: discovery server节点
2. Provider: 服务提供者,目前托管给k8s平台,容器启动后发起register请求给Discover server,后定期(30s)心跳一次
3. Consumer: 启动时拉取node节点信息,后随机选择一个node发起long polling(30s一次)拉取服务instances列表
4. Instance: 保存在node内存中的AppID对应的容器节点信息,包含hostname/ip/service等

比较重要的一些特色是:

1. 心跳复制(Peer to Peer),数据一致性的保障:* AppID注册时根据当前时间生成dirtyTimestamp,nodeA向nodeB同步(register)时,nodeB可能有以下两种情况:* 返回-404 则nodeA携带dirtyTimestamp向nodeB发起注册请求,把最新信息同步:1. nodeB中不存在实例2. nodeB中dirtyTimestamp较小* 返回-409 nodeB不同意采纳nodeA信息,且返回自身信息,nodeA使用该信息更新自身* AppID注册成功后,Provider每(30s)发起一次heartbeat请求,处理流程如上
2. Instance管理* 正常检测模式,随机分批踢掉无心跳Instance节点,尽量避免单应用节点被一次全踢* 网络闪断和分区时自我保护模式* 60s内丢失大量(小于Instance总数*2*0.85)心跳数,“好”“坏”Instance信息都保留* 所有node都会持续提供服务,单个node的注册和发现功能不受影响* 最大保护时间,防止分区恢复后大量原先Instance真的已经不存在时,一直处于保护模式
3. Consumer客户端* 长轮询+node推送,服务发现准实时* 订阅式,只需要关注想要关注的AppID的Instance列表变化* 缓存实例Instance列表信息,保证与node网络不通等无法访问到node情况时原先的Instance可用

下面开始进行源码分析:

discovery的入口文件非常简单:

func main() {//解析配置文件flag.Parse()if err := conf.Init(); err != nil {log.Error("conf.Init() error(%v)", err)panic(err)}fmt.Println("conf", conf.Conf)log.Init(conf.Conf.Log)//开始一个新的discovery中心dis, cancel := discovery.New(conf.Conf)//本地实例的http监听端口,提供了一系列的http接口,比如注册,更新和下线接口等http.Init(conf.Conf, dis)// init signalc := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)for {s := <-clog.Info("discovery get a signal %s", s.String())switch s {//如果监听到停止信号,则进行收尾工作处理,在cancel函数中具体说明case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:cancel()time.Sleep(time.Second)log.Info("discovery quit !!!")returncase syscall.SIGHUP:default:return}}
}

其中最核心的函数是dis, cancel := discovery.New(conf.Conf),下面详细分析这个函数:

// New get a discovery.
func New(c *conf.Config) (d *Discovery, cancel context.CancelFunc) {//构建一个新的Discoveryd = &Discovery{protected: c.EnableProtect,c:         c,client:    http.NewClient(c.HTTPClient),//初始化服务注册中心,并开始协程进行保护模式下的逻辑处理registry: registry.NewRegistry(c),}//读取配置文件中的zone和node相关的配置读取出来,返回Nodes结构体// Nodes is helper to manage lifecycle of a collection of Nodes.//type Nodes struct {//   nodes    []*Node  //Node切片数组//  zones    map[string][]*Node//   selfAddr string  //本地http监听的地址//}d.nodes.Store(registry.NewNodes(c))//将除了本地http监听端口的host之外其他的host,拉取出其他host的所有实例,在本地进行注册d.syncUp()//注册自己本身,并每隔30s进行一次心跳检测cancel = d.regSelf()//开始协程进行长轮训,实现服务注册进来后准实时发现go d.nodesproc()//如果成功进行了两轮renew循环,则关闭保护模式//保护模式下不再接受其他Discovery中心的拉取和推送请求,之前提供的http接口也失效,但是本地Discovery还是可以正常提供服务注册go d.exitProtect()return
}

这个函数中包括了一系列的逻辑,先来看d.syncUp()函数

// syncUp populates the registry information from a peer eureka node.
func (d *Discovery) syncUp() {nodes := d.nodes.Load().(*registry.Nodes)//将配置文件中的所有node拿出来,循环处理for _, node := range nodes.AllNodes() {log.Info("syncUp nodes are %v", node)//如果是自己本身,则跳过if nodes.Myself(node.Addr) {continue}uri := fmt.Sprintf(_fetchAllURL, node.Addr)var res struct {Code int                          `json:"code"`Data map[string][]*model.Instance `json:"data"`}//获取其他节点的所有instance实例,如果其他节点没起来或者fetch的时候发生异常,则跳过这个节点的处理if err := d.client.Get(context.TODO(), uri, "", nil, &res); err != nil {log.Error("d.client.Get(%v) error(%v)", uri, err)continue}log.Info("fetch res are %v", res, res.Code)if res.Code != 0 {log.Error("service syncup from(%s) failed ", uri)continue}// sync success from other node,exit protected mode//如果能正常从其他节点拉取到他们的实例信息,则表示节点之间的通讯是正常的,如果配置文件中是开启保护模式的话,这个时候就可以关闭保护模式了d.protected = falsefor _, is := range res.Data {for _, i := range is {//将每个节点中的实例注册到本地_ = d.registry.Register(i, i.LatestTimestamp)}}// NOTE: no return, make sure that all instances from other nodes register into self.}//将本地node状态置为正常nodes.UP()
}

上面的流程主要是将其他节点的实例拉过来,然后注册到本地,所以下面我们来看本地注册干了些啥

/ Register a new instance.
func (r *Registry) Register(ins *model.Instance, latestTime int64) (err error) {//初始化一个APP实例,APP的结构在这里放一下,方便后面的理解/*// App Instances distinguished by hostnametype App struct {AppID           stringZone            stringinstances       map[string]*InstancelatestTimestamp int64 lock sync.RWMutex}*/a := r.newApp(ins)//将传入的ins传入后,copy后返回一个全新的instance,类似于快照,记录这个时刻的insi, ok := a.NewInstance(ins, latestTime)if ok {//如果注册成功,则将expPerMin+2(+2是因为每分钟discovery会renew两次,也计算出相应的expThreshold,方便后面的保护模式的计算)r.gd.incrExp()}// NOTE: make sure free poll before update appid latest timestamp.//既然有新的实例注册进来了,当然要广播出去,让本地去更新实例缓存了r.broadcast(i.Env, i.AppID)return
}

先来看r.newApp(ins)这个函数

func (r *Registry) newApp(ins *model.Instance) (a *model.App) {//先实例化一个appsas, _ := r.newapps(ins.AppID, ins.Env)//然后开始搞一个新的App实例a, _ = as.NewApp(ins.Zone, ins.AppID, ins.LatestTimestamp)return
}func (r *Registry) newapps(appid, env string) (a *model.Apps, ok bool) {//key是appid-env的stringkey := appsKey(appid, env)r.aLock.Lock()//先看下这个key是否有了Apps了/*// Apps app distinguished by zonetype Apps struct {apps            map[string]*Applock            sync.RWMutexlatestTimestamp int64}*/if a, ok = r.appm[key]; !ok {a = model.NewApps()r.appm[key] = a}r.aLock.Unlock()return
}// NewApp news a app by appid. If ok=false, returns the app of already exist.
func (p *Apps) NewApp(zone, appid string, lts int64) (a *App, new bool) {p.lock.Lock()a, ok := p.apps[zone]if !ok {a = NewApp(zone, appid)p.apps[zone] = a}if lts <= p.latestTimestamp {// insure increaselts = p.latestTimestamp + 1}//注意这里,在注册的逻辑里,如果有新的实例注册进来的话,latestTimestamp这个字段是有更新的//正常情况下,会用注册的实例的注册时间进行更新p.latestTimestamp = ltsp.lock.Unlock()new = !okreturn
}

通过上面返回的App实例,初始化Instance信息

// NewInstance new a instance.
func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) {i = new(Instance)a.lock.Lock()oi, ok := a.instances[ni.Hostname]if ok {ni.UpTimestamp = oi.UpTimestampif ni.DirtyTimestamp < oi.DirtyTimestamp {log.Warn("register exist(%v) dirtyå timestamp over than caller(%v)", oi, ni)ni = oi}}a.instances[ni.Hostname] = nia.updateLatest(latestTime)*i = *nia.lock.Unlock()ok = !okreturn
}

实例已经注册好了,下面就广播出去吧

// broadcast on poll by chan.
// NOTE: make sure free poll before update appid latest timestamp.
func (r *Registry) broadcast(env, appid string) {key := pollKey(env, appid)r.cLock.Lock()conns, ok := r.conns[key]//如果是刚启动的时候将其他节点的实例注册进来的话,这里是空的,在这里直接返回fmt.Println("conns", conns)if !ok {fmt.Println("no co")r.cLock.Unlock()return}
第一次广播的时候,直接返回
.......
}

至此,其他节点的处理流程就处理完了,下面我们看下将自己本身注册进来

func (d *Discovery) regSelf() context.CancelFunc {ctx, cancel := context.WithCancel(context.Background())now := time.Now().UnixNano()ins := &model.Instance{Region:   d.c.Env.Region,Zone:     d.c.Env.Zone,Env:      d.c.Env.DeployEnv,Hostname: d.c.Env.Host,AppID:    model.AppID,Addrs: []string{"http://" + d.c.HTTPServer.Addr,},Status:          model.InstanceStatusUP,RegTimestamp:    now,UpTimestamp:     now,LatestTimestamp: now,RenewTimestamp:  now,DirtyTimestamp:  now,}//将自己注册进来,并将自身的实例信息同步到其他的节点进行注册d.Register(ctx, ins, now, false, false)go func() {ticker := time.NewTicker(30 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:arg := &model.ArgRenew{AppID:    ins.AppID,Zone:     d.c.Env.Zone,Env:      d.c.Env.DeployEnv,Hostname: d.c.Env.Host,}//每隔30秒renew一下if _, err := d.Renew(ctx, arg); err != nil && err == ecode.NothingFound {log.Info("renew self err is %v", err)d.Register(ctx, ins, now, false, false)}case <-ctx.Done():arg := &model.ArgCancel{AppID:    model.AppID,Zone:     d.c.Env.Zone,Env:      d.c.Env.DeployEnv,Hostname: d.c.Env.Host,}if err := d.Cancel(context.Background(), arg); err != nil {log.Error("d.Cancel(%+v) error(%v)", arg, err)}return}}}()return cancel
}

这里的主要逻辑是将自身注册进来,并每隔30秒renew一次,将自己的信息同步到其他节点,下面分析一下renew的逻辑

// Renew marks the given instance of the given app name as renewed, and also marks whether it originated from replication.
func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Instance, err error) {log.Info("renew args are %v", arg)//获取自身实例信息i, ok := d.registry.Renew(arg)if !ok {err = ecode.NothingFoundlog.Error("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env)return}//同步信息到其他节点(如果需要的话,这种情况下同步后直接退出了)if !arg.Replication {_ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Renew, i, arg.Zone != d.c.Env.Zone)return}//如果renew的DirtyTimestamp大于实例的DirtyTimestamp,返回-404,满足如下条件中的第二种条件// * 返回-404 则nodeA携带dirtyTimestamp向nodeB发起注册请求,把最新信息同步://            1. nodeB中不存在实例//            2. nodeB中dirtyTimestamp较小if arg.DirtyTimestamp > i.DirtyTimestamp {err = ecode.NothingFound} else if arg.DirtyTimestamp < i.DirtyTimestamp {err = ecode.Conflict}return
}//同步逻辑
// Replicate replicate information to all nodes except for this node.
func (ns *Nodes) Replicate(c context.Context, action model.Action, i *model.Instance, otherZone bool) (err error) {log.Warn("nodes is %v,len is %v", ns.nodes, len(ns.nodes))if len(ns.nodes) == 0 {return}eg, c := errgroup.WithContext(c)for _, n := range ns.nodes {log.Warn("Replicate node is %v", n)//将自身实例同步到其他节点if !ns.Myself(n.addr) {ns.action(c, eg, action, n, i)}}if !otherZone {for _, zns := range ns.zones {if n := len(zns); n > 0 {ns.action(c, eg, action, zns[rand.Intn(n)], i)}}}err = eg.Wait()return
}//action函数具体逻辑
func (ns *Nodes) action(c context.Context, eg *errgroup.Group, action model.Action, n *Node, i *model.Instance) {log.Info("action arg is %v", i)switch action {case model.Register:eg.Go(func() error {_ = n.Register(c, i)return nil})case model.Renew://开启协程去renew到其他节点eg.Go(func() error {_ = n.Renew(c, i)return nil})case model.Cancel:eg.Go(func() error {_ = n.Cancel(c, i)return nil})}
}//n.Renew函数的逻辑
// Renew send the heartbeat information of Instance receiving by this node to the peer node represented.
// If the instance does not exist the node, the instance registration information is sent again to the peer node.
func (n *Node) Renew(c context.Context, i *model.Instance) (err error) {var res *model.Instanceerr = n.call(c, model.Renew, i, n.renewURL, &res)log.Info("renew other node info are %v,url is %v,res is %v", i, n.renewURL, err)if err == ecode.ServerErr {log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)n.status = model.NodeStatusLostreturn}n.status = model.NodeStatusUPif err == ecode.NothingFound {log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err)err = n.call(c, model.Register, i, n.registerURL, nil)return}// NOTE: register response instance whitch in conflict with peer nodeif err == ecode.Conflict && res != nil {err = n.call(c, model.Register, res, n.pRegisterURL, nil)}return
}//重点在于n.call(c, model.Renew, i, n.renewURL, &res)
func (n *Node) call(c context.Context, action model.Action, i *model.Instance, uri string, data interface{}) (err error) {params := url.Values{}params.Set("region", i.Region)params.Set("zone", i.Zone)params.Set("env", i.Env)params.Set("appid", i.AppID)params.Set("hostname", i.Hostname)params.Set("from_zone", "true")//同步到其他节点的时候,一般是走入为true的逻辑分支if n.otherZone {params.Set("replication", "false")} else {params.Set("replication", "true")}switch action {case model.Register:params.Set("addrs", strings.Join(i.Addrs, ","))params.Set("status", strconv.FormatUint(uint64(i.Status), 10))params.Set("version", i.Version)meta, _ := json.Marshal(i.Metadata)params.Set("metadata", string(meta))params.Set("reg_timestamp", strconv.FormatInt(i.RegTimestamp, 10))params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))case model.Renew:params.Set("dirty_timestamp", strconv.FormatInt(i.DirtyTimestamp, 10))case model.Cancel:params.Set("latest_timestamp", strconv.FormatInt(i.LatestTimestamp, 10))}var res struct {Code int             `json:"code"`Data json.RawMessage `json:"data"`}//请求其他节点的renewUrl(http接口)if err = n.client.Post(c, uri, "", params, &res); err != nil {log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err)return}if res.Code != 0 {log.Error("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code)if err = ecode.Int(res.Code); err == ecode.Conflict {_ = json.Unmarshal([]byte(res.Data), data)}}return
}

renew的时候的逻辑是根据renew的参数找到自身实例,再将自身信息同步到其他节点

重点讲解一下renew的逻辑:

在将自身信息注册到其他的discovery之后,会有两种情况发生

1.其他节点向本地发起renew请求

先是从本地缓存中查找到renew请求arg中的zone,env和appid找到之前注册在本地的app信息,再通过arg中的host找到app中的instances(map)中的instance信息(在这个过程中会更新实例的renewTimestamp),正常情况下就没有其他的逻辑了,因为其他的分支都进不去

2.本地节点更新

每隔30秒本地会拿着conf文件中的zone,env,host和在本地注册时候的AppID作为参数进行renew,

也是先找出本地缓存中的实例信息,然后同步给其他node(http POST请求其他discovery节点的接口地址),在同步其他节点的时候,会从zones(初始化注册时候的其他zone的节点信息)切片中随机选取一个node进行同步

上述注册其他节点和自身节点完成后,就是一个长轮训,实现服务发现准实时

func (d *Discovery) nodesproc() {var (lastTs int64)for {arg := &model.ArgPolls{AppID:           []string{model.AppID},Env:             d.c.Env.DeployEnv,Hostname:        d.c.Env.Host,LatestTimestamp: []int64{lastTs},}log.Info("polls times is %v", time.Now().Format("2006-01-02 15:04:05"))//返回一个消费者chanch, _, _, err := d.registry.Polls(arg)if err != nil && err != ecode.NotModified {log.Error("d.registry(%v) error(%v)", arg, err)time.Sleep(time.Second)continue}log.Info("wait for ch out")apps := <-chins, ok := apps[model.AppID]fmt.Println("go process ins", ins)if !ok || ins == nil {return}var (nodes []stringzones = make(map[string][]string))for _, ins := range ins.Instances {for _, in := range ins {log.Info("range ins are %v", in)for _, addr := range in.Addrs {u, err := url.Parse(addr)if err == nil && u.Scheme == "http" {if in.Zone == d.c.Env.Zone {nodes = append(nodes, u.Host)} else {zones[in.Zone] = append(zones[in.Zone], u.Host)}}}}}lastTs = ins.LatestTimestampc := new(conf.Config)*c = *d.cc.Nodes = nodesc.Zones = zonesns := registry.NewNodes(c)ns.UP()d.nodes.Store(ns)log.Info("discovery changed nodes:%v zones:%v", nodes, zones)}
}

这个准实时的实现我的理解是:

1.在d.registry.Polls(arg)方法调用后会返回一个消费者channel,然后会在for循环中会阻塞在这个channel里,一旦有新的节点注册进来的时候(我们可以看一下,新实例注册的时候,会进行广播通知,这里有个

会往这个channel中写入新注册的instance信息。)就会立马收到注册信息,然后更新本地实例缓存,做到服务发现近乎实时。

还有discovery的自保护模式,在下一篇中进行分析了。

服务注册与发现框架discovery源码解析相关推荐

  1. 【SpringCloud系列】服务注册与发现 - Eureka Server源码分析(2)

    3.6.Eureka Server 源码分析 上一篇文章简单介绍了 Eureka 的一些概念,今天咱们来看看其原理和源码,首先先看 Eureka Server 的原理. 3.6.1.Eureka Se ...

  2. 【开源项目】动态线程池框架Hippo4j源码解析

    动态线程池框架Hippo4j源码解析 项目简介 Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力. 快速开始 https://hippo4 ...

  3. Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计

    降级熔断框架 Hystrix 源码解析:滑动窗口统计 概述 Hystrix 是一个开源的降级熔断框架,用于提高服务可靠性,适用于依赖大量外部服务的业务系统.什么是降级熔断呢? 降级 业务降级,是指牺牲 ...

  4. 深度学习框架Caffe源码解析

    作者:薛云峰(https://github.com/HolidayXue),主要从事视频图像算法的研究, 本文来源微信公众号:深度学习大讲堂.  原文:深度学习框架Caffe源码解析  欢迎技术投稿. ...

  5. Android之图片加载框架Picasso源码解析

    转载请标明出处: http://blog.csdn.net/hai_qing_xu_kong/article/details/76645535 本文出自:[顾林海的博客] 个人开发的微信小程序,目前功 ...

  6. react相关代码库以及框架的源码解析

    持续更新中react相关库源码浅析, react ts3 项目 ???对react相关代码库以及框架的源码进行了一定的分析 ?react16.6 View contents 源码实例分析:可见runl ...

  7. 迷你 JS 框架 Hyperapp 源码解析

    Hyperapp 是最近热度颇高的一款迷你 JS 框架,其源码不到 400 行,压缩 gzip 后只有 1kB,却具有相当高的完成度,拿来实现简单的 web 应用也不在话下.整体实现上,Hyperap ...

  8. android网络框架retrofit源码解析二

    注:源码解析文章参考了该博客:http://www.2cto.com/kf/201405/305248.html 前一篇文章讲解了retrofit的annotation,既然定义了,那么就应该有解析的 ...

  9. Android 图片加载框架Gilde源码解析

    1.使用Gilde显示一张图片 Glide.with(this).load("https://cn.bing.com/sa/simg/hpb/xxx.jpg").into(imag ...

最新文章

  1. 【MATLAB】数据类型 ( 执行代码 | 清空命令 | 注释 | 数字 | 字符 | 字符串 )
  2. 【GDOI2014模拟】旅行 题解代码
  3. mstsc /console 远程命令
  4. 订单数据持久化和验证相关解决方案
  5. Java 并发(JUC 包-04)
  6. 基于layui的框架模版,采用模块化设计,接口分离,组件化思想
  7. CCNP精粹系列之十三-----OSPF路由汇总
  8. VC2005中将Picture控件显示图片保存为BMP,JPG等格式
  9. Servlet开发(1)-----基础及MVC设计模式
  10. android 手机头提示消息,正确的手机头部声明(android,iphone)
  11. C#调用谷歌翻译API
  12. sql server 代理权限问题
  13. 高效程序员的7个技能
  14. 尚德机构季报图解:净利1.79亿 实现连续四个季度盈利
  15. android sms 接收短信,Android SMS 短信操作
  16. 小白求答疑,在vs连接数据库的一段配置代码有问题
  17. 史上最恐怖的10篇超短篇鬼故事(转…
  18. react and reduct 学习手记1
  19. 第二章:并行硬件和并行软件
  20. Linux上安装Mysql

热门文章

  1. Citrix 服务器虚拟化之三 Xenserver 网络管理
  2. C++多线程----进程与线程区别
  3. 分布式理论,看完这篇你定能有所获
  4. 关于JNI local ref未释放可在tombstones中去查看具体函数
  5. 线上渠道拓展:澳洲爱他美产品如何利用新媒体平台宣传?
  6. Nessus详细使用教程
  7. day_04_资源和图像、目录和定时器、鼠标和键盘
  8. JSP怎么给手机发短信对接验证码短信接口DEMO示例
  9. 5个理由告诉你:为什么你的工资只能叫收入,别人家的却叫高薪!
  10. mysql fulltext类型,mysql – 使用的表类型不支持FULLTEXT索引