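Prometheus supports remote storage in addition to local storage. Let's start with remote storage, which sends data through a send queue. Here is how the queue is created: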

func NewQueueManager(cfg QueueManagerConfig) *QueueManager {
	if cfg.QueueCapacity == 0 {
		cfg.QueueCapacity = defaultQueueCapacity
	}
	if cfg.MaxShards == 0 {
		cfg.MaxShards = defaultMaxShards
	}
	if cfg.MaxSamplesPerSend == 0 {
		cfg.MaxSamplesPerSend = defaultMaxSamplesPerSend
	}
	if cfg.BatchSendDeadline == 0 {
		cfg.BatchSendDeadline = defaultBatchSendDeadline
	}

	t := &QueueManager{
		cfg:         cfg,
		queueName:   cfg.Client.Name(),
		logLimiter:  rate.NewLimiter(logRateLimit, logBurst),
		numShards:   1,
		reshardChan: make(chan int),
		quit:        make(chan struct{}),

		samplesIn:          newEWMARate(ewmaWeight, shardUpdateDuration),
		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
	}
	t.shards = t.newShards(t.numShards)
	numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
	queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))

	return t
}

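A queue can have up to 1000 shards, and each shard sends roughly 1000 samples per second, so one queue can push about 1000 × 1000 = 1,000,000 samples per second. Every storage, local or remote, stores data through an Append method, and remote storage is no exception: its Append calls the queue's Append: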

func (t *QueueManager) Append(s *model.Sample) error {
	var snew model.Sample
	snew = *s
	snew.Metric = s.Metric.Clone()

	// Add external labels unless the sample already has a label of that name.
	for ln, lv := range t.cfg.ExternalLabels {
		if _, ok := s.Metric[ln]; !ok {
			snew.Metric[ln] = lv
		}
	}

	// Apply relabeling; a nil result means the sample is dropped.
	snew.Metric = model.Metric(
		relabel.Process(model.LabelSet(snew.Metric), t.cfg.RelabelConfigs...))

	if snew.Metric == nil {
		return nil
	}

	t.shardsMtx.Lock()
	enqueued := t.shards.enqueue(&snew)
	t.shardsMtx.Unlock()

	if enqueued {
		queueLength.WithLabelValues(t.queueName).Inc()
	} else {
		droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
		if t.logLimiter.Allow() {
			log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
		}
	}
	return nil
}
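Before enqueueing, Append copies the sample's labels and adds the configured external labels, keeping any label the sample already sets. Below is a minimal, standard-library-only sketch of that merge. Plain string maps stand in for model.Metric and model.LabelSet (an assumption), relabeling is left out, and withExternalLabels and format are hypothetical helper names:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// withExternalLabels returns a copy of metric with every external label added
// unless the sample already carries a label of that name (the sample wins).
// The input map is never modified, mirroring the Clone() in the original.
func withExternalLabels(metric, external map[string]string) map[string]string {
	out := make(map[string]string, len(metric)+len(external))
	for name, value := range metric {
		out[name] = value
	}
	for name, value := range external {
		if _, ok := metric[name]; !ok {
			out[name] = value
		}
	}
	return out
}

// format renders labels sorted by name so the output is deterministic.
func format(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for n := range labels {
		names = append(names, n)
	}
	sort.Strings(names)
	parts := make([]string, len(names))
	for i, n := range names {
		parts[i] = n + "=" + labels[n]
	}
	return "{" + strings.Join(parts, ",") + "}"
}

func main() {
	sample := map[string]string{"__name__": "up", "region": "us"}
	external := map[string]string{"region": "eu", "replica": "a"}
	fmt.Println(format(withExternalLabels(sample, external))) // {__name__=up,region=us,replica=a}
	fmt.Println(format(sample))                                // {__name__=up,region=us}
}
```

Cloning first matters because the same sample may also go to other storages, which should not see labels added for this remote queue.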

The sample is put on a queue by `enqueued := t.shards.enqueue(&snew)`:

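func (s *shards) enqueue(sample *model.Sample) bool {
	s.qm.samplesIn.incr(1)

	// Pick the shard: series fingerprint modulo the number of shards.
	fp := sample.Metric.FastFingerprint()
	shard := uint64(fp) % uint64(len(s.queues))

	// Non-blocking send: if the shard's queue is full, report false
	// so that the caller drops the sample.
	select {
	case s.queues[shard] <- sample:
		return true
	default:
		return false
	}
}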
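The same pattern (hash the series, take the remainder, try a non-blocking channel send) can be sketched on its own. In this simplified version a string series key stands in for the label set and FNV-1a stands in for FastFingerprint. Both are assumptions, and the sample and shards types here are hypothetical:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// sample is a hypothetical, simplified stand-in for model.Sample.
type sample struct {
	series string // stand-in for the label set
	value  float64
}

// shards holds one bounded queue per shard.
type shards struct {
	queues []chan *sample
}

func newShards(n, capacity int) *shards {
	s := &shards{queues: make([]chan *sample, n)}
	for i := range s.queues {
		s.queues[i] = make(chan *sample, capacity)
	}
	return s
}

// shardFor hashes the series identity and takes the remainder, so every
// sample of one series lands in the same queue.
func (s *shards) shardFor(series string) int {
	h := fnv.New64a()
	h.Write([]byte(series))
	return int(h.Sum64() % uint64(len(s.queues)))
}

// enqueue never blocks: when the target queue is full it returns false.
func (s *shards) enqueue(smp *sample) bool {
	select {
	case s.queues[s.shardFor(smp.series)] <- smp:
		return true
	default:
		return false
	}
}

func main() {
	s := newShards(4, 2)
	accepted := 0
	for i := 0; i < 5; i++ {
		if s.enqueue(&sample{series: `up{job="node"}`, value: float64(i)}) {
			accepted++
		}
	}
	fmt.Println(accepted) // 2: one series fills its shard's 2 slots, the rest are dropped
}
```

Because the send never blocks, a slow remote endpoint cannot stall ingestion; the cost is that samples are dropped (and counted) once a shard's queue is full.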

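This is plain modulo sharding. Would consistent hashing be a better choice here? Either way, the sample ends up in its shard's queue. The send goroutines are started when the QueueManager starts: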

func (s *shards) start() {
	for i := 0; i < len(s.queues); i++ {
		go s.runShard(i)
	}
}

Next, runShard:

func (s *shards) runShard(i int) {
	defer s.wg.Done()
	queue := s.queues[i]

	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
	// If we have fewer samples than that, flush them out after a deadline
	// anyways.
	pendingSamples := model.Samples{}

	for {
		select {
		case sample, ok := <-queue:
			if !ok {
				if len(pendingSamples) > 0 {
					log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
					s.sendSamples(pendingSamples)
					log.Debugf("Done flushing.")
				}
				return
			}

			queueLength.WithLabelValues(s.qm.queueName).Dec()
			pendingSamples = append(pendingSamples, sample)

			for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
				s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
				pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
			}
		case <-time.After(s.qm.cfg.BatchSendDeadline):
			if len(pendingSamples) > 0 {
				s.sendSamples(pendingSamples)
				pendingSamples = pendingSamples[:0]
			}
		}
	}
}

The actual sending happens in sendSamples:

func (s *shards) sendSamples(samples model.Samples) {
	// Samples are sent to the remote storage on a best-effort basis. If a
	// sample isn't sent correctly the first time, it's simply dropped on the
	// floor.
	begin := time.Now()
	err := s.qm.cfg.Client.Store(samples)
	duration := time.Since(begin)

	if err != nil {
		log.Warnf("error sending %d samples to remote storage: %s", len(samples), err)
		failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
	} else {
		sentSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
	}
	sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(duration.Seconds())

	s.qm.samplesOut.incr(int64(len(samples)))
	s.qm.samplesOutDuration.incr(int64(duration))
}

sendSamples ultimately sends the data through Store:

func (c *Client) Store(samples model.Samples) error {
	req := &WriteRequest{
		Timeseries: make([]*TimeSeries, 0, len(samples)),
	}
	for _, s := range samples {
		ts := &TimeSeries{
			Labels: make([]*LabelPair, 0, len(s.Metric)),
		}
		for k, v := range s.Metric {
			ts.Labels = append(ts.Labels,
				&LabelPair{
					Name:  string(k),
					Value: string(v),
				})
		}
		ts.Samples = []*Sample{
			{
				Value:       float64(s.Value),
				TimestampMs: int64(s.Timestamp),
			},
		}
		req.Timeseries = append(req.Timeseries, ts)
	}

	// Serialize to protobuf, then compress with snappy.
	data, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	buf := bytes.Buffer{}
	if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
		return err
	}

	httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
	if err != nil {
		return err
	}
	httpReq.Header.Add("Content-Encoding", "snappy")

	// The quoted code discards the cancel function (ctx, _ := ...);
	// calling it releases the timeout's resources once the request is done.
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()
	httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
	if err != nil {
		return err
	}
	defer httpResp.Body.Close()

	if httpResp.StatusCode/100 != 2 {
		return fmt.Errorf("server returned HTTP status %s", httpResp.Status)
	}
	return nil
}

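Store simply sends the data as an HTTP POST. With remote storage covered, let's turn to local storage. Its design is fairly involved: samples are first kept in memory and then written to disk in batches. Let's start with the memory storage manager: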

type MemorySeriesStorage struct {
	// archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations.
	archiveHighWatermark model.Time    // No archived series has samples after this time.
	numChunksToPersist   int64         // The number of chunks waiting for persistence.
	maxChunksToPersist   int           // If numChunksToPersist reaches this threshold, ingestion will be throttled.
	rushed               bool          // Whether the storage is in rushed mode.
	rushedMtx            sync.Mutex    // Protects entering and exiting rushed mode.
	throttled            chan struct{} // This chan is sent to whenever NeedsThrottling() returns true (for logging).

	fpLocker   *fingerprintLocker
	fpToSeries *seriesMap

	options *MemorySeriesStorageOptions

	loopStopping, loopStopped  chan struct{}
	logThrottlingStopped       chan struct{}
	maxMemoryChunks            int
	dropAfter                  time.Duration
	checkpointInterval         time.Duration
	checkpointDirtySeriesLimit int

	persistence *persistence
	mapper      *fpMapper

	evictList                   *list.List
	evictRequests               chan chunk.EvictRequest
	evictStopping, evictStopped chan struct{}

	quarantineRequests                    chan quarantineRequest
	quarantineStopping, quarantineStopped chan struct{}

	persistErrors                 prometheus.Counter
	queuedChunksToPersist         prometheus.Counter
	numSeries                     prometheus.Gauge
	numHeadChunks                 prometheus.Gauge
	dirtySeries                   prometheus.Gauge
	seriesOps                     *prometheus.CounterVec
	ingestedSamplesCount          prometheus.Counter
	discardedSamplesCount         *prometheus.CounterVec
	nonExistentSeriesMatchesCount prometheus.Counter
	maintainSeriesDuration        *prometheus.SummaryVec
	persistenceUrgencyScore       prometheus.Gauge
	rushedMode                    prometheus.Gauge
}

This is the in-memory storage manager. Like remote storage, it implements Append to store samples:

func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
	for ln, lv := range sample.Metric {
		if len(lv) == 0 {
			delete(sample.Metric, ln)
		}
	}
	rawFP := sample.Metric.FastFingerprint()
	s.fpLocker.Lock(rawFP)
	fp := s.mapper.mapFP(rawFP, sample.Metric)
	defer func() {
		s.fpLocker.Unlock(fp)
	}() // Func wrapper because fp might change below.
	if fp != rawFP {
		// Switch locks.
		s.fpLocker.Unlock(rawFP)
		s.fpLocker.Lock(fp)
	}
	series, err := s.getOrCreateSeries(fp, sample.Metric)
	if err != nil {
		return err // getOrCreateSeries took care of quarantining already.
	}

	if sample.Timestamp == series.lastTime {
		// Don't report "no-op appends", i.e. where timestamp and sample
		// value are the same as for the last append, as they are a
		// common occurrence when using client-side timestamps
		// (e.g. Pushgateway or federation).
		if sample.Timestamp == series.lastTime &&
			series.lastSampleValueSet &&
			sample.Value.Equal(series.lastSampleValue) {
			return nil
		}
		s.discardedSamplesCount.WithLabelValues(duplicateSample).Inc()
		return ErrDuplicateSampleForTimestamp // Caused by the caller.
	}
	if sample.Timestamp < series.lastTime {
		s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc()
		return ErrOutOfOrderSample // Caused by the caller.
	}
	completedChunksCount, err := series.add(model.SamplePair{
		Value:     sample.Value,
		Timestamp: sample.Timestamp,
	})
	if err != nil {
		s.quarantineSeries(fp, sample.Metric, err)
		return err
	}
	s.ingestedSamplesCount.Inc()
	s.incNumChunksToPersist(completedChunksCount)

	return nil
}

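Append first fetches (or creates) the series with getOrCreateSeries. A series is the sequence of samples that share one metric name and label set (identified by its fingerprint); keeping them together makes compression and lookup easier. The sample is then stored with series.add. But that only keeps it in memory, so how does it get persisted?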
When MemorySeriesStorage starts, it runs:

	p, err = newPersistence(
		s.options.PersistenceStoragePath,
		s.options.Dirty,
		s.options.PedanticChecks,
		syncStrategy,
		s.options.MinShrinkRatio,
	)
	if err != nil {
		return err
	}
	s.persistence = p
	// Persistence must start running before loadSeriesMapAndHeads() is called.
	go s.persistence.run()

	...

	go s.loop()

persistence is responsible for writing in-memory data to disk. And in loop:

	for {
		select {
		case <-s.loopStopping:
			break loop
		case fp := <-memoryFingerprints:
			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
				dirty := atomic.AddInt64(&dirtySeriesCount, 1)
				s.dirtySeries.Set(float64(dirty))
				// Check if we have enough "dirty" series so that we need an early checkpoint.
				// However, if we are already behind persisting chunks, creating a checkpoint
				// would be counterproductive, as it would slow down chunk persisting even more,
				// while in a situation like that, where we are clearly lacking speed of disk
				// maintenance, the best we can do for crash recovery is to persist chunks as
				// quickly as possible. So only checkpoint if the urgency score is < 1.
				if dirty >= int64(s.checkpointDirtySeriesLimit) &&
					s.calculatePersistenceUrgencyScore() < 1 {
					checkpointTimer.Reset(0)
				}
			}
		case fp := <-archivedFingerprints:
			s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
		}
	}

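maintainMemorySeries does the per-series maintenance: it closes the head chunk when due, writes chunks out, and archives the series or evicts chunk descriptors as needed: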

func (s *MemorySeriesStorage) maintainMemorySeries(
	fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
	defer func(begin time.Time) {
		s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
			time.Since(begin).Seconds(),
		)
	}(time.Now())

	s.fpLocker.Lock(fp)
	defer s.fpLocker.Unlock(fp)

	series, ok := s.fpToSeries.get(fp)
	if !ok {
		// Series is actually not in memory, perhaps archived or dropped in the meantime.
		return false
	}

	defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()

	closed, err := series.maybeCloseHeadChunk()
	if err != nil {
		s.quarantineSeries(fp, series.metric, err)
		s.persistErrors.Inc()
	}
	if closed {
		s.incNumChunksToPersist(1)
		s.numHeadChunks.Dec()
	}

	seriesWasDirty := series.dirty

	if s.writeMemorySeries(fp, series, beforeTime) {
		// Series is gone now, we are done.
		return false
	}

	iOldestNotEvicted := -1
	for i, cd := range series.chunkDescs {
		if !cd.IsEvicted() {
			iOldestNotEvicted = i
			break
		}
	}

	// Archive if all chunks are evicted. Also make sure the last sample has
	// an age of at least headChunkTimeout (which is very likely anyway).
	if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout {
		s.fpToSeries.del(fp)
		s.numSeries.Dec()
		s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
		s.seriesOps.WithLabelValues(archive).Inc()
		oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
		if oldWatermark < int64(series.lastTime) {
			if !atomic.CompareAndSwapInt64(
				(*int64)(&s.archiveHighWatermark),
				oldWatermark, int64(series.lastTime),
			) {
				panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
			}
		}
		return
	}
	// If we are here, the series is not archived, so check for Chunk.Desc
	// eviction next.
	series.evictChunkDescs(iOldestNotEvicted)

	return series.dirty && !seriesWasDirty
}

writeMemorySeries writes the data to disk; internally it calls persistChunks:

func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk.Chunk) (index int, err error) {
	f, err := p.openChunkFileForWriting(fp)
	if err != nil {
		return -1, err
	}
	defer p.closeChunkFile(f)

	if err := p.writeChunks(f, chunks); err != nil {
		return -1, err
	}

	// Determine index within the file.
	offset, err := f.Seek(0, os.SEEK_CUR)
	if err != nil {
		return -1, err
	}
	index, err = chunkIndexForOffset(offset)
	if err != nil {
		return -1, err
	}
	return index - len(chunks), err
}

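How are these series queried? Prometheus maintains indexes and stores them in the well-known LevelDB, so series can be looked up through the index. LevelDB is a key-value database; the interface is defined in storage/local/index/interface.go: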

type KeyValueStore interface {
	Put(key, value encoding.BinaryMarshaler) error
	// Get unmarshals the result into value. It returns false if no entry
	// could be found for key. If value is nil, Get behaves like Has.
	Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error)
	Has(key encoding.BinaryMarshaler) (bool, error)
	// Delete returns (false, nil) if key does not exist.
	Delete(key encoding.BinaryMarshaler) (bool, error)

	NewBatch() Batch
	Commit(b Batch) error

	// ForEach iterates through the complete KeyValueStore and calls the
	// supplied function for each mapping.
	ForEach(func(kv KeyValueAccessor) error) error

	Close() error
}

The implementation lives in storage/local/index/leveldb.go. It is fairly long, so I won't paste it here.
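To make the interface concrete without LevelDB, here is a hypothetical map-backed type, memKV, covering Put, Get, Has and Delete. It follows the semantics given in the interface comments: a nil value makes Get behave like Has, and Delete reports false for a missing key. Batches, ForEach and Close are left out, so it does not satisfy the full interface. The toy key type text is also hypothetical:

```go
package main

import (
	"encoding"
	"fmt"
)

// memKV stores keys and values in their binary-marshaled form, the same
// representation a LevelDB-backed store would receive.
type memKV struct {
	m map[string][]byte
}

func newMemKV() *memKV { return &memKV{m: map[string][]byte{}} }

func (s *memKV) Put(key, value encoding.BinaryMarshaler) error {
	k, err := key.MarshalBinary()
	if err != nil {
		return err
	}
	v, err := value.MarshalBinary()
	if err != nil {
		return err
	}
	s.m[string(k)] = v
	return nil
}

// Get unmarshals the stored bytes into value; with a nil value it acts like Has.
func (s *memKV) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) {
	k, err := key.MarshalBinary()
	if err != nil {
		return false, err
	}
	v, ok := s.m[string(k)]
	if !ok || value == nil {
		return ok, nil
	}
	return true, value.UnmarshalBinary(v)
}

func (s *memKV) Has(key encoding.BinaryMarshaler) (bool, error) { return s.Get(key, nil) }

// Delete returns (false, nil) if the key does not exist.
func (s *memKV) Delete(key encoding.BinaryMarshaler) (bool, error) {
	k, err := key.MarshalBinary()
	if err != nil {
		return false, err
	}
	if _, ok := s.m[string(k)]; !ok {
		return false, nil
	}
	delete(s.m, string(k))
	return true, nil
}

// text is a toy key/value type that marshals to its own bytes.
type text string

func (t text) MarshalBinary() ([]byte, error) { return []byte(t), nil }

func (t *text) UnmarshalBinary(b []byte) error { *t = text(b); return nil }

func main() {
	kv := newMemKV()
	_ = kv.Put(text("name=up"), text("fp:42"))
	var out text
	found, _ := kv.Get(text("name=up"), &out)
	fmt.Println(found, out) // true fp:42
	deleted, _ := kv.Delete(text("name=up"))
	again, _ := kv.Delete(text("name=up"))
	fmt.Println(deleted, again) // true false
}
```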
