1 概述:

1.1 源码环境

版本信息如下:
a、thanos组件版本:v0.16.0

1.2 Thanos Sidecar的作用

Thanos Query组件和prometheus实例绑定在一起,三大作用:
1)作为访问代理,对客户端暴露grpc接口,业务逻辑是访问其绑定的prometheus实例的http接口,从而获取metrics和rule数据,最终返回给客户端。
2)如果开启对象存储功能,会将promethues tsdb目录下的所有block目录上传至指定的对象存储系统中。
3)监听promethues配置文件的变化,发现文件变化后也是访问prometheus实例的http接口让prometheus重载配置。

2 源码简析:

使用github.com/oklog/run包来启动一组协程,这些协程的逻辑主要是启动了http server、grpc server、动态发现位于下游的实现STORE API的组件等。

2.1 main方法

Thanos的启动命令格式如下,格式都是thanos开头(因为是同一个可执行二进制文件)。启动哪个组件,在于第一个参数,在本例子中是sidecar,因此这条命令是启动sidecar组件的逻辑。

thanos sidecar \
--prometheus.url=http://localhost:9090/ \
--tsdb.path=/prometheus \
--grpc-address=[$(POD_IP)]:10901 \
--http-address=[$(POD_IP)]:10902 \

来具体看看main方法。创建app对象,app对象包含了所有Thanos组件的启动函数,但真正启动时只从map中取出一个函数进行启动,取出哪个函数取决于启动命令。

func main() {/*其他代码*/app := extkingpin.NewApp(kingpin.New(filepath.Base(os.Args[0]), "A block storage based long-term storage for Prometheus").Version(version.Print("thanos")))/*其他代码*/// 把所有组件的启动逻辑都放进app对象中的setups列表中registerSidecar(app)registerStore(app)registerQuery(app)registerRule(app)registerCompact(app)registerTools(app)registerReceive(app)registerQueryFrontend(app)// 根据命令行的信息,从app对象的setups列表中取出一个组件逻辑cmd, setup := app.Parse()logger := logging.NewLogger(*logLevel, *logFormat, *debugName)/*其他代码*/var g run.Groupvar tracer opentracing.Tracer/*tracing相关的代码*/reloadCh := make(chan struct{}, 1)// 启动特定的一个组件(sidecar、query、store等组件中的一种),底层还是执行g.Add(...)if err := setup(&g, logger, metrics, tracer, reloadCh, *logLevel == "debug"); err != nil {        os.Exit(1)}// 监听来自系统的杀死信号.{cancel := make(chan struct{})g.Add(func() error {return interrupt(logger, cancel)}, func(error) {close(cancel)})}// 监听来配置重载的信号{cancel := make(chan struct{})g.Add(func() error {return reload(logger, cancel, reloadCh)}, func(error) {close(cancel)})}// 阻塞地等待所有协程中的退出// 有一个协程返回,其他协程也会返回if err := g.Run(); err != nil {level.Error(logger).Log("err", fmt.Sprintf("%+v", errors.Wrapf(err, "%s command failed", cmd)))os.Exit(1)}// 到达此处,说明整个程序结束了。level.Info(logger).Log("msg", "exiting")
}

2.2 registerQuery方法


func registerSidecar(app *extkingpin.App) {cmd := app.Command(component.Sidecar.String(), "Sidecar for Prometheus server")conf := &sidecarConfig{}// 解析命令行参数conf.registerFlag(cmd)// Setup()的入参方法,会被放入app对象的setups列表中// 最核心的是runSidecar()方法cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {rl := reloader.New(log.With(logger, "component", "reloader"),extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),&reloader.Options{ReloadURL:     reloader.ReloadURLFromBase(conf.prometheus.url),CfgFile:       conf.reloader.confFile,CfgOutputFile: conf.reloader.envVarConfFile,WatchedDirs:   conf.reloader.ruleDirectories,WatchInterval: conf.reloader.watchInterval,RetryInterval: conf.reloader.retryInterval,})return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf)})
}

2.3 runSidecar方法

使用run.Group对象来启动http server、grpc server、传输block目录至对象存储的协程、监听prometheus配置文件的协程、定期检测prometheus实例存活的协程。

细节说明:
1)查看prometheus实例的心跳机制是通过/api/v1/status/config接口。
2)监听prometheus配置文件变化的工具包是github.com/fsnotify/fsnotify。
3)开启上传block功能,则每30s遍历prometheus tsdb目录下的所有的block目录(已上传的block或空block会被忽略,默认情况下被压缩过的block也会被忽略),并上传相应的文件至对象存储。
4)获取不到prometheus实例的external label或者prometheus没有配置external label,会导致sidecar启动失败。

func runSidecar(g *run.Group,logger log.Logger,reg *prometheus.Registry,tracer opentracing.Tracer,reloader *reloader.Reloader,comp component.Component,conf sidecarConfig,
) error {// 用一个结构体来保存prometheus实例的url、prometheus实例的external label、prometheus client等信息。var m = &promMetadata{promURL: conf.prometheus.url,mint: conf.limitMinTime.PrometheusTimestamp(),maxt: math.MaxInt64,limitMinTime: conf.limitMinTime,client:       promclient.NewWithTracingClient(logger, "thanos-sidecar"),}// 获取对象存储的配置信息,如果有,说明是开启上传block至对象存储的功能。confContentYaml, err := conf.objStore.Content()if err != nil {return errors.Wrap(err, "getting object store config")}var uploads = trueif len(confContentYaml) == 0 {level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")uploads = false}grpcProbe := prober.NewGRPC()httpProbe := prober.NewHTTP()statusProber := prober.Combine(httpProbe,grpcProbe,prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),)// 创建http server,并启动server(只有/metrics、/-/healthy、/-/ready等接口)srv := httpserver.New(logger, reg, comp, httpProbe,httpserver.WithListen(conf.http.bindAddress),httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),)g.Add(func() error {statusProber.Healthy()return srv.ListenAndServe()}, func(err error) {statusProber.NotReady(err)defer statusProber.NotHealthy(err)srv.Shutdown(err)})// 获取promehtues实例的external label,并做心跳{// promUp记录promehtues是否正常,0表示不正常,1表示正常promUp := promauto.With(reg).NewGauge(prometheus.GaugeOpts{Name: "thanos_sidecar_prometheus_up",Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.",})// lastHeartbeat记录最后一次心跳时间lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{Name: "thanos_sidecar_last_heartbeat_success_time_seconds",Help: "Timestamp of the last successful heartbeat in seconds.",})ctx, cancel := context.WithCancel(context.Background())// 获取prometheus实例的external label(/api/v1/status/config接口),并通过定期(30s)做这件事情来做心跳g.Add(func() error {/*检查性代码*/// 获取prometheus实例的external labelerr := runutil.Retry(2*time.Second, ctx.Done(), func() error {// m.UpdateLabels(ctx)去访问prometheus实例的/api/v1/status/config接口,并将返回的数据设置到自己的属性labelsif err := m.UpdateLabels(ctx); err != nil {                     promUp.Set(0)statusProber.NotReady(err)return err}          promUp.Set(1)statusProber.Ready()// 记录心跳时间lastHeartbeat.SetToCurrentTime()return nil})// 拿不到prometheus实例的external label或者prometheus没有配置external label则退出if err != nil {return errors.Wrap(err, "initial external labels query")}         if len(m.Labels()) == 0 {return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.")}// 每个30s从prometheus实例获取exterlan label,通过此方式来记录心跳时间return runutil.Repeat(30*time.Second, ctx.Done(), func() error {               /*其他代码*/if err := m.UpdateLabels(iterCtx); err != nil {level.Warn(logger).Log("msg", "heartbeat failed", "err", err)promUp.Set(0)} else {promUp.Set(1)// 记录心跳时间lastHeartbeat.SetToCurrentTime()}return nil})}, func(error) {cancel()})}// 使用github.com/fsnotify/fsnotify包监听prometheus实例的配置文件的变化// 如果文件发生变化则发送一个POST请求给prometheus实例,让它重新加载配置文件{ctx, cancel := context.WithCancel(context.Background())g.Add(func() error {return reloader.Watch(ctx)}, func(error) {cancel()})}{t := exthttp.NewTransport()t.MaxIdleConnsPerHost = conf.connection.maxIdleConnsPerHostt.MaxIdleConns = conf.connection.maxIdleConnsc := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent)promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps)if err != nil {return errors.Wrap(err, "create Prometheus store")}tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"),conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA)if err != nil {return errors.Wrap(err, "setup gRPC server")}// 创建并grpc servers := grpcserver.New(logger, reg, tracer, comp, grpcProbe,// 注册grpc handler(通过http client从prometheus实例中获取指标数据)grpcserver.WithServer(store.RegisterStoreServer(promStore)), // 注册grpc handler(通过http client从prometheus实例中获取rule数据)grpcserver.WithServer(rules.RegisterRulesServer(rules.NewPrometheus(conf.prometheus.url, c, m.Labels))), grpcserver.WithListen(conf.grpc.bindAddress),grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),grpcserver.WithTLSConfig(tlsCfg),)g.Add(func() error {statusProber.Ready()return s.ListenAndServe()}, func(err error) {statusProber.NotReady(err)s.Shutdown(err)})}// 若开启了上传block功能,则定期遍历prometehus tsdb目录下的所有block目录并上传文件至对象存储。if uploads {// 获取一个对象存储bucketbkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())if err != nil {return err}/*其他代码*/ctx, cancel := context.WithCancel(context.Background())g.Add(func() error {/*其他代码*//*拿不到prometheus实例的external label或者prometheus没有配置external label则退出*/s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)// 每30执行一次s.Sync(ctx)// s.Sync(ctx)会遍历prometheus tsdb目录下的所有block目录(已上传的block或空block会被忽略,默认情况下被压缩过的block也会被忽略),并上传相应的文件return runutil.Repeat(30*time.Second, ctx.Done(), func() error {if uploaded, err := s.Sync(ctx); err != nil {// 至少有一个block上传失败,则打印日志}/*其他代码*/return nil})}, func(error) {cancel()})}level.Info(logger).Log("msg", "starting sidecar")return nil
}

3 总结:

Thanos Sidecar组件的代码逻辑简单、易懂,通过http协议访问与其绑定的prometheus实例,从prometheus实例中获取到的数据则通过grpc接口对外进行暴露,遍历所有block目录进行文件上传,还有监听promethues配置文件变化的小功能。

[ thanos源码分析系列 ]thanos sidecar组件源码简析相关推荐

  1. 查询已有链表的hashmap_源码分析系列1:HashMap源码分析(基于JDK1.8)

    1.HashMap的底层实现图示 如上图所示: HashMap底层是由  数组+(链表)=(红黑树) 组成,每个存储在HashMap中的键值对都存放在一个Node节点之中,其中包含了Key-Value ...

  2. 源码分析系列1:HashMap源码分析(基于JDK1.8)

    1.HashMap的底层实现图示 如上图所示: HashMap底层是由  数组+(链表)+(红黑树) 组成,每个存储在HashMap中的键值对都存放在一个Node节点之中,其中包含了Key-Value ...

  3. 【Android源码分析】Android系统关键服务启动简析

    一.关于Android系统重要的进程 (1).init进程:init进程是Linux内核启动完成之后,启动的第一个用户进程,Android系统就是在这个进程的基础上启动起来的,进程pid为1.init ...

  4. java底层app_Java底层类和源码分析系列-ArrayBlockingQueue底层架构和源码分析

    ArrayBlockingQueue是一个基于数组实现的有界的阻塞队列. 几个要点 ArrayBlockingQueue是一个用数组实现的队列,所以在效率上比链表结构的LinkedBlockingQu ...

  5. jQuery源码分析系列

    声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://git ...

  6. [转]jQuery源码分析系列

    文章转自:jQuery源码分析系列-Aaron 版本截止到2013.8.24 jQuery官方发布最新的的2.0.3为准 附上每一章的源码注释分析 :https://github.com/JsAaro ...

  7. vue源码分析系列二:$mount()和new Watcher()的执行过程

    续vue源码分析系列一:new Vue的初始化过程 在initMixin()里面调用了$mount() if (vm.$options.el) {vm.$mount(vm.$options.el);/ ...

  8. Tomcat8源码分析系列-spring boot集成tomcat

    前言 本文基于 spring boot 1.5.9 spring boot 支持目前主流的 servlet 容器,包括 tomcat.jetty.undertow,可以在我们的项目中方便地集成这些 s ...

  9. MyBatis 源码分析系列文章合集

    1.简介 我从七月份开始阅读MyBatis源码,并在随后的40天内陆续更新了7篇文章.起初,我只是打算通过博客的形式进行分享.但在写作的过程中,发现要分析的代码太多,以至于文章篇幅特别大.在这7篇文章 ...

  10. MyBatis 源码分析系列文章导读

    1.本文速览 本篇文章是我为接下来的 MyBatis 源码分析系列文章写的一个导读文章.本篇文章从 MyBatis 是什么(what),为什么要使用(why),以及如何使用(how)等三个角度进行了说 ...

最新文章

  1. 助力区域性银行突破困局,网易云信入选爱分析报告典型案例
  2. spring boot controller构造方法_面试前突击Spring,我只需要十分钟,那么你呢?
  3. Edit Control中追加文字
  4. bzoj 3036: 绿豆蛙的归宿(Dp)
  5. elementui中下拉菜单需要传入多个参数的处理
  6. Ceph添加、删除osd及故障硬盘更换
  7. 如何开发Android安卓APP读写NFC Ntag
  8. 路由器去广告 去除 免刷路由系统 手机网页去广告 安卓去广告 苹果iOS去广告
  9. 原生JS路由实现页面跳转
  10. 数字孪生新型智慧城市一网统管云平台建设方案(44页PPT)
  11. Java永久保存数据_java怎么保存数据
  12. Table [xx] contains physical column name referred to by multiple physical column names 错误处理
  13. 数据分析报告入门(3)
  14. Android 4.4(KitKat)窗口管理子系统 - 体系框架
  15. ElasticSearch.bat 文件闪退 解决
  16. ZLIB 压缩的数据格式规范
  17. 写一副对子_一副对子的传奇故事
  18. 计算机毕业设计Java南京新东方学校家校通系统(源码+系统+mysql数据库+lw文档)
  19. EMC常见术语-dB、dBm、dBw以及如何计算
  20. 建oracle簇表,详解ORACLE簇表、堆表、IOT表、分区表

热门文章

  1. JavaAwt子部件定位设置大小,setBounds(x, y, w, h);setLocation(x, y); setSize(w, h); 一开始不起作用,加个延时起作用了
  2. IDEA2017配置springmvc遇到的错误
  3. Excel自动化教程之通过python将Excel与Word集成无缝生成自动报告
  4. 【EXCEL绘制地图】获取地图图标信息时遇到问题。请确保处于联机状态,然后重试
  5. python背单词游戏,python背单词小程序
  6. 征途服务器修改,征途【改版教程】-装备程序的修改-转载于-喜欢玩网游单机站...
  7. 正则表达式在一个字符串上多次搜索、正则表达式匹配书名等
  8. hdu 1290 (切西瓜问题)
  9. U盘安装Windows 11正式版绕过TPM检查
  10. 哥德巴赫猜想python