Consul提供了api注册服务的途径,在http_oss文件中给出了接口的注册以及路径。

registerEndpoint("/v1/agent/service/register", []string{"PUT"}, (*HTTPServer).AgentRegisterService)

这里通过registerEndpoint()方法,对注册的路径,方法,已经调用的方法进行了配置。

func registerEndpoint(pattern string, methods []string, fn unboundEndpoint) {if endpoints == nil {endpoints = make(map[string]unboundEndpoint)}if endpoints[pattern] != nil || allowedMethods[pattern] != nil {panic(fmt.Errorf("Pattern %q is already registered", pattern))}endpoints[pattern] = fnallowedMethods[pattern] = methods
}

这里将路径与对应的方法作为键值对存储在map中,在agent调用start()方法启动的时候,通过listenHttp()方法开启对应路径的的监听,保证服务的调用,确保对应的方法能够被调用。

func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {var args structs.ServiceDefinition// Fixup the type decode of TTL or Interval if a check if provided.decodeCB := func(raw interface{}) error {rawMap, ok := raw.(map[string]interface{})if !ok {return nil}// see https://github.com/hashicorp/consul/pull/3557 why we need this// and why we should get rid of it.config.TranslateKeys(rawMap, map[string]string{"enable_tag_override": "EnableTagOverride",})for k, v := range rawMap {switch strings.ToLower(k) {case "check":if err := FixupCheckType(v); err != nil {return err}case "checks":chkTypes, ok := v.([]interface{})if !ok {continue}for _, chkType := range chkTypes {if err := FixupCheckType(chkType); err != nil {return err}}}}return nil}if err := decodeBody(req, &args, decodeCB); err != nil {resp.WriteHeader(http.StatusBadRequest)fmt.Fprintf(resp, "Request decode failed: %v", err)return nil, nil}// Verify the service has a name.if args.Name == "" {resp.WriteHeader(http.StatusBadRequest)fmt.Fprint(resp, "Missing service name")return nil, nil}// Check the service address here and in the catalog RPC endpoint// since service registration isn't synchronous.if ipaddr.IsAny(args.Address) {resp.WriteHeader(http.StatusBadRequest)fmt.Fprintf(resp, "Invalid service address")return nil, nil}// Get the node service.ns := args.NodeService()if err := structs.ValidateMetadata(ns.ServiceMeta, false); err != nil {resp.WriteHeader(http.StatusBadRequest)fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err))return nil, nil}// Verify the check type.chkTypes, err := args.CheckTypes()if err != nil {resp.WriteHeader(http.StatusBadRequest)fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err))return nil, nil}for _, check := range chkTypes {if check.Status != "" && !structs.ValidStatus(check.Status) {resp.WriteHeader(http.StatusBadRequest)fmt.Fprint(resp, "Status for checks must 'passing', 'warning', 'critical'")return nil, nil}}// Get the provided token, if any, and vet against any ACL policies.var token strings.parseToken(req, &token)if err := s.agent.vetServiceRegister(token, ns); err != nil {return nil, err}// Add the service.if err := s.agent.AddService(ns, chkTypes, true, token); err != nil {return nil, err}s.syncChanges()return nil, nil
}

在接受到注册服务的请求的时候,服务的数据结构已经由结构体ServiceDefinition定义。

type ServiceDefinition struct {ID                stringName              stringTags              []stringAddress           stringMeta              map[string]stringPort              intCheck             CheckTypeChecks            CheckTypesToken             stringEnableTagOverride bool
}

这里,也已经能够清晰的得到注册服务所需要的参数。

而后通过decodeBody()方法对request请求中的数据进行解析,得到所需要的服务的具体数据。同时,在调用之前重新定义了decodeCB()方法,并一并随着decodeBody()方法作为参数传入。

func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error) error {var raw interface{}dec := json.NewDecoder(req.Body)if err := dec.Decode(&raw); err != nil {return err}// Invoke the callback prior to decodeif cb != nil {if err := cb(raw); err != nil {return err}}return mapstructure.Decode(raw, out)
}

可以看到在解析初次得到request中的body之后,会调用cb对于特定的字段进行更加复杂的操作,而在此处,生成的decodeCB()方法如下。

decodeCB := func(raw interface{}) error {rawMap, ok := raw.(map[string]interface{})if !ok {return nil}// see https://github.com/hashicorp/consul/pull/3557 why we need this// and why we should get rid of it.config.TranslateKeys(rawMap, map[string]string{"enable_tag_override": "EnableTagOverride",})for k, v := range rawMap {switch strings.ToLower(k) {case "check":if err := FixupCheckType(v); err != nil {return err}case "checks":chkTypes, ok := v.([]interface{})if !ok {continue}for _, chkType := range chkTypes {if err := FixupCheckType(chkType); err != nil {return err}}}}return nil
}

对于check和checks需要进行特殊的处理,而处理方法实现在FixCheckType()当中。

func FixupCheckType(raw interface{}) error {rawMap, ok := raw.(map[string]interface{})if !ok {return nil}// See https://github.com/hashicorp/consul/pull/3557 why we need this// and why we should get rid of it. In Consul 1.0 we also didn't map// Args correctly, so we ended up exposing (and need to carry forward)// ScriptArgs, see https://github.com/hashicorp/consul/issues/3587.config.TranslateKeys(rawMap, map[string]string{"args":                              "ScriptArgs","script_args":                       "ScriptArgs","deregister_critical_service_after": "DeregisterCriticalServiceAfter","docker_container_id":               "DockerContainerID","tls_skip_verify":                   "TLSSkipVerify","service_id":                        "ServiceID",})parseDuration := func(v interface{}) (time.Duration, error) {if v == nil {return 0, nil}switch x := v.(type) {case time.Duration:return x, nilcase float64:return time.Duration(x), nilcase string:return time.ParseDuration(x)default:return 0, fmt.Errorf("invalid format")}}parseHeaderMap := func(v interface{}) (map[string][]string, error) {if v == nil {return nil, nil}vm, ok := v.(map[string]interface{})if !ok {return nil, errInvalidHeaderFormat}m := map[string][]string{}for k, vv := range vm {vs, ok := vv.([]interface{})if !ok {return nil, errInvalidHeaderFormat}for _, vs := range vs {s, ok := vs.(string)if !ok {return nil, errInvalidHeaderFormat}m[k] = append(m[k], s)}}return m, nil}for k, v := range rawMap {switch strings.ToLower(k) {case "header":h, err := parseHeaderMap(v)if err != nil {return fmt.Errorf("invalid %q: %s", k, err)}rawMap[k] = hcase "ttl", "interval", "timeout", "deregistercriticalserviceafter":d, err := parseDuration(v)if err != nil {return fmt.Errorf("invalid %q: %v", k, err)}rawMap[k] = d}}return nil
}

其中,首先对于这里的字段映射将特定的字段改成预定义的格式,而后的根据这里所设定的时间参数进行解析。

在完成对于数据的解析之后,开始对服务的地址进行验证确保是有效的ip地址。

而后将结构体的服务数据ServiceDefinition进一步转化为NodeService,数据并没有变化,而是将服务的主要定位数据保留,不保留check一系列数据。

func (s *ServiceDefinition) NodeService() *NodeService {ns := &NodeService{ID:                s.ID,Service:           s.Name,Tags:              s.Tags,Address:           s.Address,Meta:              s.Meta,Port:              s.Port,EnableTagOverride: s.EnableTagOverride,}if ns.ID == "" && ns.Service != "" {ns.ID = ns.Service}return ns
}

而后对check进行验证,确保实现了至少一种check的方式,并同时保证ttl与interval不会共存。

func (c *CheckType) Validate() error {intervalCheck := c.IsScript() || c.HTTP != "" || c.TCP != "" || c.GRPC != ""if c.Interval > 0 && c.TTL > 0 {return fmt.Errorf("Interval and TTL cannot both be specified")}if intervalCheck && c.Interval <= 0 {return fmt.Errorf("Interval must be > 0 for Script, HTTP, or TCP checks")}if !intervalCheck && c.TTL <= 0 {return fmt.Errorf("TTL must be > 0 for TTL checks")}return nil
}

接下来,对该agent进行验证,通过vetServiceRegister()方法保证在开启了acl验证的情况下该节点是可写的。

最后调用AddService()方法开始正式注册服务。

func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string) error {if service.Service == "" {return fmt.Errorf("Service name missing")}if service.ID == "" && service.Service != "" {service.ID = service.Service}for _, check := range chkTypes {if err := check.Validate(); err != nil {return fmt.Errorf("Check is not valid: %v", err)}}// Warn if the service name is incompatible with DNSif InvalidDnsRe.MatchString(service.Service) {a.logger.Printf("[WARN] agent: Service name %q will not be discoverable "+"via DNS due to invalid characters. Valid characters include "+"all alpha-numerics and dashes.", service.Service)}// Warn if any tags are incompatible with DNSfor _, tag := range service.Tags {if InvalidDnsRe.MatchString(tag) {a.logger.Printf("[DEBUG] agent: Service tag %q will not be discoverable "+"via DNS due to invalid characters. Valid characters include "+"all alpha-numerics and dashes.", tag)}}// Pause the service syncs during modificationa.PauseSync()defer a.ResumeSync()// Take a snapshot of the current state of checks (if any), and// restore them before resuming anti-entropy.snap := a.snapshotCheckState()defer a.restoreCheckState(snap)// Add the servicea.State.AddService(service, token)// Persist the service to a fileif persist && !a.config.DevMode {if err := a.persistService(service); err != nil {return err}}// Create an associated health checkfor i, chkType := range chkTypes {checkID := string(chkType.CheckID)if checkID == "" {checkID = fmt.Sprintf("service:%s", service.ID)if len(chkTypes) > 1 {checkID += fmt.Sprintf(":%d", i+1)}}name := chkType.Nameif name == "" {name = fmt.Sprintf("Service '%s' check", service.Service)}check := &structs.HealthCheck{Node:        a.config.NodeName,CheckID:     types.CheckID(checkID),Name:        name,Status:      api.HealthCritical,Notes:       chkType.Notes,ServiceID:   service.ID,ServiceName: service.Service,ServiceTags: service.Tags,}if chkType.Status != "" {check.Status = chkType.Status}if err := a.AddCheck(check, chkType, persist, token); err != nil {return err}}return nil
}

如果所需要注册的服务并没有设置id,则在这里直接将id赋值为serviceName,并在对check进行一次验证,验证方式与之前的一样。而后确保服务的名称与tags在dns可用。

如果可以,则开始停止节点之间的同步,开始调用State的addService()方法添加服务。

func (l *State) AddService(service *structs.NodeService, token string) error {if service == nil {return fmt.Errorf("no service")}// use the service name as id if the id was omittedif service.ID == "" {service.ID = service.Service}l.SetServiceState(&ServiceState{Service: service,Token:   token,})return nil
}
func (l *State) SetServiceState(s *ServiceState) {l.Lock()defer l.Unlock()l.services[s.Service.ID] = sl.TriggerSyncChanges()
}

所谓添加服务,实则是将服务id与服务作为键值对存储在state中的map中。

之后,通过persistService()方法开始对服务进行持久化。

func (a *Agent) persistService(service *structs.NodeService) error {svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))wrapped := persistedService{Token:   a.State.ServiceToken(service.ID),Service: service,}encoded, err := json.Marshal(wrapped)if err != nil {return err}return writeFileAtomic(svcPath, encoded)
}

创建关于这个服务的json数据,而后创建该服务的tmp文件进行持久化。

此处,关于服务的注册已经完毕,但是,服务的check还没有开始进行,以http为例子。

在完成了持久化之后,根据得到的check数据构造 HealthCheck结构体,来准备开始check的工作。

check := &structs.HealthCheck{Node:        a.config.NodeName,CheckID:     types.CheckID(checkID),Name:        name,Status:      api.HealthCritical,Notes:       chkType.Notes,ServiceID:   service.ID,ServiceName: service.Service,ServiceTags: service.Tags,
}

而后调用addCheck()方法开始check所注册的服务,以http方式为例子。

case chkType.IsHTTP():if existing, ok := a.checkHTTPs[check.CheckID]; ok {existing.Stop()delete(a.checkHTTPs, check.CheckID)}if chkType.Interval < checks.MinInterval {a.logger.Println(fmt.Sprintf("[WARN] agent: check '%s' has interval below minimum of %v",check.CheckID, checks.MinInterval))chkType.Interval = checks.MinInterval}tlsClientConfig, err := a.setupTLSClientConfig(chkType.TLSSkipVerify)if err != nil {return fmt.Errorf("Failed to set up TLS: %v", err)}http := &checks.CheckHTTP{Notify:          a.State,CheckID:         check.CheckID,HTTP:            chkType.HTTP,Header:          chkType.Header,Method:          chkType.Method,Interval:        chkType.Interval,Timeout:         chkType.Timeout,Logger:          a.logger,TLSClientConfig: tlsClientConfig,}http.Start()a.checkHTTPs[check.CheckID] = http

如果设置了http方式的验证,首先在现有的check中根据checkID查找,如果已经存在,那么停止现有的并删除,重新准备加入。而后确保check的间隔不小于配置的最小间隔,否则设置为配置的最小间隔。

而后生成checkHttp,并调用其Start()方法正式准备开始验证,并将该checkHttp以id为键存储在map中。

func (c *CheckHTTP) Start() {c.stopLock.Lock()defer c.stopLock.Unlock()if c.httpClient == nil {// Create the transport. We disable HTTP Keep-Alive's to prevent// failing checks due to the keepalive interval.trans := cleanhttp.DefaultTransport()trans.DisableKeepAlives = true// Take on the supplied TLS client config.trans.TLSClientConfig = c.TLSClientConfig// Create the HTTP client.c.httpClient = &http.Client{Timeout:   10 * time.Second,Transport: trans,}// For long (>10s) interval checks the http timeout is 10s, otherwise the// timeout is the interval. This means that a check *should* return// before the next check begins.if c.Timeout > 0 && c.Timeout < c.Interval {c.httpClient.Timeout = c.Timeout} else if c.Interval < 10*time.Second {c.httpClient.Timeout = c.Interval}}c.stop = falsec.stopCh = make(chan struct{})go c.run()
}

如果之前的httpClient连接参数还没有设置,则在这里设置,如果check所设置的timeout小于设置的interval,在将http连接的timeout设置为check的timeout。又如果check的interval大于十秒,则将http连接的timeout设为check的interval。

而后,建立stopCh作为停止通道,开启协程调用checkHttp的run()方法。

// run is invoked by a goroutine to run until Stop() is called
func (c *CheckHTTP) run() {// Get the randomized initial pause timeinitialPauseTime := lib.RandomStagger(c.Interval)next := time.After(initialPauseTime)for {select {case <-next:c.check()next = time.After(c.Interval)case <-c.stopCh:return}}
}

这里在一个随机时间之后开启根据所设置的interval为间隔的周期http调用,来完成定时的周期http检查。

直到停止通道接收到信号量才停止。

以上,就是服务的注册。

consulAPI服务的注册源码相关推荐

  1. Eureka服务注册源码分析

    本文来说下Eureka服务注册源码 文章目录 Eureka-Client注册服务 啥时候会注册 定时器注册 自动注册 DiscoveryClient.register() Eureka-Server接 ...

  2. (Nacos源码解析五)Nacos服务事件变动源码解析

    Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...

  3. Android 录屏服务使用(源码)

    Android 录屏服务使用(源码) 从Android 5.0开始,可以对手机进行录屏,使用场景:如错误场景的视频上传,简单屏幕获取等,下面贴出使用用例和对使用的类一个简单的介绍 - MediaPro ...

  4. SpringCloud Nacos 心跳机制和服务健康检查源码解析

    1 客户端心跳机制 1.1 客户端注册源码流程 https://blog.csdn.net/qq_34125999/article/details/117566523 1.2 NacosNamingS ...

  5. java springcloud微服务航班管理系统源码+课程报告

    下载地址:https://download.csdn.net/download/qq_31293575/10728702 项目介绍 java springcloud微服务航班管理系统源码+课程报告 主 ...

  6. 视频教程-RPC服务框架(Dubbo)源码分析-Java

    RPC服务框架(Dubbo)源码分析 鲁班学院-子路老师曾就职于谷歌.天猫电商等多家互联网公司,历任java架构师.研发经理等职位,参与并主导千万级并发电商网站与后端供应链研发体系搭建,多次参与电商大 ...

  7. android与html注册登录,Android登录注册源码

    Android登录注册源码 资源下载此资源下载价格为2D币,请先登录 资源文件列表 andoird96pk/.classpath , 348 andoird96pk/.project , 847 an ...

  8. java开源即时通讯软件服务端openfire源码构建

    java开源即时通讯软件服务端openfire源码构建 本文使用最新的openfire主干代码为例,讲解了如何搭建一个openfire开源开发环境,正在实现自己写java聊天软件: 编译环境搭建 调试 ...

  9. java计算机毕业设计郑工社团交流服务信息平台源码+mysql数据库+系统+lw文档+部署

    java计算机毕业设计郑工社团交流服务信息平台源码+mysql数据库+系统+lw文档+部署 java计算机毕业设计郑工社团交流服务信息平台源码+mysql数据库+系统+lw文档+部署 本源码技术栈: ...

最新文章

  1. Python中的变量作用域
  2. POJ 3761:Bubble Sort——组合数学
  3. 前端基础知识 - 收藏集 - 掘金
  4. DataScience:风控场景之金融评分卡模型的构建(逻辑回归)开发(转评分卡)、使用过程(线上实现)之详细攻略
  5. linux 常用命令技巧
  6. 两个相邻盒子的边框怎么只显示一个_一篇文章带你快速理解盒子模型「经典案例」...
  7. 电脑两边黑边怎么还原_Mac电脑录制的视频有黑边?如何解决
  8. java json和对象互相装换
  9. 从UDP/TCP到HTTP/HTTP2,弄清楚网络层面上应该了解的知识。
  10. kmalloc、vmalloc、__get_free_pages()的区别
  11. linux shell中实现循环日期的实例代码
  12. tensorflow在文本处理中的使用——Word2Vec预测
  13. fiddler显示客户端请求时间
  14. GD32F103替换STM32F103
  15. 高通8953烧录之后报ERROR: UFDT apply overlay failed
  16. 密码学实验_7_S盒创建(python 实现)
  17. android 解压rar5,rar解压软件安卓中文
  18. PCM开发板模块实验指导--有刷直流马达正反转实验
  19. 开源3D激光SLAM项目BLAM
  20. 中职网络安全竞赛B模块新题

热门文章

  1. vue中选项和url根据彼此的改变实现高亮显示
  2. SpringBoot集成MyBatis-Plus自定义SQL
  3. 表单提交中文乱码_Java学习路线分享如何处理中文参数
  4. eclipse java不能编译_eclipse里.java可以编译但不能运行??
  5. FastDFS分布式文件系统
  6. 前端每日实战 2018 年 9 月份项目汇总(共 26 个项目)
  7. 90国央行齐聚华盛顿研讨区块链:“这一切意味着什么”
  8. get_class 方法
  9. RestTemplate.getForObject返回List的时候处理方式
  10. linux的文件系统架构