Metrics scraped by Prometheus through the scrapeManager can be stored in the local TSDB time-series database, which is simple and efficient but does not provide durable long-term persistence, so local or remote storage can be chosen as needed. This article does not cover the storage implementations themselves; instead it analyzes the validity checks applied to metrics before they are stored, i.e. the metric cache layer (scrapeCache).

  As described in the earlier post in this series on metric collection (scrapeManager), the scrapeLoop struct contains a scrapeCache, and the scrapeLoop.append method handles metric storage. Inside append, each metric is passed through the corresponding scrapeCache methods for validation, filtering, and storage.

  • The scrapeCache struct and its constructor:

prometheus/scrape/scrape.go

// scrapeCache tracks mappings of exposed metric strings to label sets and
// storage references. Additionally, it tracks staleness of series between
// scrapes.
type scrapeCache struct {
	iter uint64 // Current scrape iteration.

	// Parsed string to an entry with information about the actual label set
	// and its storage reference.
	series map[string]*cacheEntry // key: metric string, value: *cacheEntry

	// Cache of dropped metric strings and their iteration. The iteration must
	// be a pointer so we can update it without setting a new entry with an unsafe
	// string in addDropped().
	droppedSeries map[string]*uint64 // caches invalid (dropped) metrics

	// seriesCur and seriesPrev store the labels of series that were seen
	// in the current and previous scrape.
	// We hold two maps and swap them out to save allocations.
	seriesCur  map[uint64]labels.Labels // metrics seen in the current scrape
	seriesPrev map[uint64]labels.Labels // metrics seen in the previous scrape

	metaMtx  sync.Mutex            // guards metadata
	metadata map[string]*metaEntry // metric metadata
}

func newScrapeCache() *scrapeCache {
	return &scrapeCache{
		series:        map[string]*cacheEntry{},
		droppedSeries: map[string]*uint64{},
		seriesCur:     map[uint64]labels.Labels{},
		seriesPrev:    map[uint64]labels.Labels{},
		metadata:      map[string]*metaEntry{},
	}
}
  • scrapeCache provides several methods (listed in the order they appear in append):

  (1) Check whether a metric is already known to be invalid

prometheus/scrape/scrape.go

func (c *scrapeCache) getDropped(met string) bool {
	// droppedSeries maps metric string -> iteration number for metrics
	// that were previously dropped as invalid.
	iterp, ok := c.droppedSeries[met]
	if ok {
		*iterp = c.iter
	}
	return ok
}

  (2) Look up the cacheEntry for a metric

prometheus/scrape/scrape.go

func (c *scrapeCache) get(met string) (*cacheEntry, bool) {
	// series maps metric string -> *cacheEntry.
	e, ok := c.series[met]
	if !ok {
		return nil, false
	}
	e.lastIter = c.iter
	return e, true
}

The cacheEntry struct holds the following fields for a metric:

prometheus/scrape/scrape.go

type cacheEntry struct {
	// ref is the value returned when the sample was added to local or remote
	// storage:
	// A reference number is returned which can be used to add further samples
	// in the same or later transactions.
	// Returned reference numbers are ephemeral and may be rejected in calls
	// to AddFast() at any point. Adding the sample via Add() returns a new
	// reference number.
	// If the reference is 0 it must not be used for caching.
	ref      uint64
	lastIter uint64        // last iteration the entry was seen in
	hash     uint64        // hash of the label set
	lset     labels.Labels // the metric's labels
}

(3) Track a metric without an explicit timestamp in the seriesCur map, keyed by the hash of its label set

prometheus/scrape/scrape.go

func (c *scrapeCache) trackStaleness(hash uint64, lset labels.Labels) {
	c.seriesCur[hash] = lset
}

  (4) Record an invalid metric in the droppedSeries map

prometheus/scrape/scrape.go

func (c *scrapeCache) addDropped(met string) {
	iter := c.iter
	// droppedSeries maps metric string -> iteration number.
	c.droppedSeries[met] = &iter
}
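The interaction between getDropped and addDropped can be exercised with a minimal, self-contained sketch. The dropCache type below is invented for illustration and mimics only the two relevant fields of scrapeCache; the key point is that the map value is a *uint64, so a cache hit can refresh the stored iteration in place without re-inserting the key.

```go
package main

import "fmt"

// dropCache is a simplified stand-in for the droppedSeries portion of scrapeCache.
type dropCache struct {
	iter    uint64
	dropped map[string]*uint64
}

// addDropped records a metric as invalid at the current iteration.
func (c *dropCache) addDropped(met string) {
	iter := c.iter
	c.dropped[met] = &iter
}

// getDropped reports whether a metric is known-invalid and, on a hit,
// refreshes the iteration it was last seen in (via the pointer value).
func (c *dropCache) getDropped(met string) bool {
	iterp, ok := c.dropped[met]
	if ok {
		*iterp = c.iter
	}
	return ok
}

func main() {
	c := &dropCache{iter: 1, dropped: map[string]*uint64{}}
	c.addDropped(`bad_metric{foo="bar"}`)
	c.iter = 2
	fmt.Println(c.getDropped(`bad_metric{foo="bar"}`)) // true
	fmt.Println(*c.dropped[`bad_metric{foo="bar"}`])   // 2: refreshed in place
	fmt.Println(c.getDropped(`good_metric`))           // false
}
```

Because the stored iteration is refreshed on every hit, the cleanup in iterDone can safely expire entries that have not been seen recently.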

  (5) Add a cacheEntry for a metric

prometheus/scrape/scrape.go

func (c *scrapeCache) addRef(met string, ref uint64, lset labels.Labels, hash uint64) {
	if ref == 0 {
		return
	}
	// series maps metric string -> *cacheEntry.
	c.series[met] = &cacheEntry{ref: ref, lastIter: c.iter, lset: lset, hash: hash}
}

  (6) Compare the seriesPrev and seriesCur maps to find stale series

prometheus/scrape/scrape.go

func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
	for h, lset := range c.seriesPrev {
		// A series from the previous scrape that is absent from
		// seriesCur is stale.
		if _, ok := c.seriesCur[h]; !ok {
			if !f(lset) {
				break
			}
		}
	}
}
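The staleness check is a plain set difference over the two maps. Here is a runnable sketch of the same logic with labels simplified to strings (Prometheus uses labels.Labels keyed by the label-set hash):

```go
package main

import "fmt"

// forEachStale calls f for every series hash present in prev but absent
// from cur, stopping early if f returns false (mirroring the real method).
func forEachStale(prev, cur map[uint64]string, f func(string) bool) {
	for h, lset := range prev {
		if _, ok := cur[h]; !ok {
			if !f(lset) {
				break
			}
		}
	}
}

func main() {
	prev := map[uint64]string{1: `up{job="a"}`, 2: `up{job="b"}`}
	cur := map[uint64]string{1: `up{job="a"}`}
	forEachStale(prev, cur, func(lset string) bool {
		fmt.Println("stale:", lset) // only up{job="b"} is reported
		return true
	})
}
```

In append, this callback is where a StaleNaN sample is written for each series that disappeared between scrapes.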

  (7) Clean up the maps held by scrapeCache

prometheus/scrape/scrape.go

func (c *scrapeCache) iterDone() {
	// All caches may grow over time through series churn
	// or multiple string representations of the same metric. Clean up entries
	// that haven't appeared in the last scrape.

	// Drop series entries not seen within the last two iterations.
	for s, e := range c.series {
		if c.iter-e.lastIter > 2 {
			delete(c.series, s)
		}
	}
	// Likewise for droppedSeries.
	for s, iter := range c.droppedSeries {
		if c.iter-*iter > 2 {
			delete(c.droppedSeries, s)
		}
	}
	c.metaMtx.Lock()
	for m, e := range c.metadata {
		// Keep metadata around for 10 scrapes after its metric disappeared.
		if c.iter-e.lastIter > 10 {
			delete(c.metadata, m)
		}
	}
	c.metaMtx.Unlock()

	// Swap current and previous series.
	c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev

	// We have to delete every single key in the map.
	for k := range c.seriesCur {
		delete(c.seriesCur, k)
	}

	// Advance the iteration counter.
	c.iter++
}
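The swap-and-clear idiom at the end of iterDone is worth isolating: the two maps trade places, then the map now holding the older scrape is emptied in place so the next scrape reuses its storage instead of allocating a fresh map. A minimal sketch (before Go 1.21's clear builtin, a delete loop was the standard way to empty a map):

```go
package main

import "fmt"

// swapAndClear swaps prev and cur, then empties the map that now holds
// the older data, returning the pair ready for the next iteration.
func swapAndClear(prev, cur map[uint64]string) (newPrev, newCur map[uint64]string) {
	prev, cur = cur, prev
	// Delete every single key so the map can be reused without reallocating.
	for k := range cur {
		delete(cur, k)
	}
	return prev, cur
}

func main() {
	prev := map[uint64]string{}
	cur := map[uint64]string{10: "metric_a", 20: "metric_b"}
	prev, cur = swapAndClear(prev, cur)
	fmt.Println(len(prev), len(cur)) // 2 0
}
```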
  • Using the scrapeCache methods above, append validates, filters, and stores metrics.

There are two storage paths:

(1) If a cacheEntry for the metric is found in scrapeCache, the metric has been added before, so the AddFast path is taken.

(2) If no cacheEntry is found, the metric's labels, hash, and ref must be computed, the sample is stored via the Add method, and a cacheEntry for the metric is then added to scrapeCache.
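The two-path decision can be sketched with a small self-contained demo. The fakeAppender below is a stand-in invented for illustration (the real storage.Appender's Add/AddFast take labels, timestamps, and values); only the caching logic is shown.

```go
package main

import "fmt"

// cacheEntry keeps just the storage reference, as in the real cache.
type cacheEntry struct{ ref uint64 }

// fakeAppender is a hypothetical stand-in for the storage appender.
type fakeAppender struct{ nextRef uint64 }

// Add simulates a full add: it returns a fresh storage reference.
func (a *fakeAppender) Add() uint64 { a.nextRef++; return a.nextRef }

// AddFast simulates the fast path that reuses a cached reference.
func (a *fakeAppender) AddFast(ref uint64) {}

// store picks AddFast when a cacheEntry exists, otherwise does a full
// Add and caches the returned ref for subsequent scrapes.
func store(series map[string]*cacheEntry, app *fakeAppender, met string) string {
	if ce, ok := series[met]; ok {
		app.AddFast(ce.ref)
		return "AddFast"
	}
	ref := app.Add()
	series[met] = &cacheEntry{ref: ref}
	return "Add"
}

func main() {
	series := map[string]*cacheEntry{}
	app := &fakeAppender{}
	fmt.Println(store(series, app, "up")) // Add (first time seen)
	fmt.Println(store(series, app, "up")) // AddFast (ref cached)
}
```

The real code additionally falls back from AddFast to Add when storage rejects the cached reference (storage.ErrNotFound), which the sketch omits.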

The implementation:

func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added int, err error) {
	var (
		app            = sl.appender()                 // storage appender (local or remote), chosen at startup
		p              = textparse.New(b, contentType) // parser over all metrics scraped from one target
		defTime        = timestamp.FromTime(ts)
		numOutOfOrder  = 0
		numDuplicates  = 0
		numOutOfBounds = 0
	)
	var sampleLimitErr error

loop:
	for {
		var et textparse.Entry
		if et, err = p.Next(); err != nil {
			if err == io.EOF {
				err = nil
			}
			break // all metrics from this target have been processed
		}
		switch et {
		case textparse.EntryType:
			sl.cache.setType(p.Type())
			continue
		case textparse.EntryHelp:
			sl.cache.setHelp(p.Help())
			continue
		case textparse.EntryUnit:
			sl.cache.setUnit(p.Unit())
			continue
		case textparse.EntryComment:
			continue
		default:
		}
		total++

		t := defTime
		met, tp, v := p.Series() // one metric's raw bytes, optional timestamp, and value
		if tp != nil {
			t = *tp
		}

		// Skip metrics already known to be invalid.
		if sl.cache.getDropped(yoloString(met)) {
			continue
		}
		// Check whether a cacheEntry already exists for this metric.
		ce, ok := sl.cache.get(yoloString(met))
		if ok {
			// Fast path: store the sample using the cached reference.
			switch err = app.AddFast(ce.lset, ce.ref, t, v); err {
			case nil:
				if tp == nil {
					sl.cache.trackStaleness(ce.hash, ce.lset)
				}
			// Error handling.
			case storage.ErrNotFound:
				ok = false
			case storage.ErrOutOfOrderSample:
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				// Keep on parsing output if we hit the limit, so we report the correct
				// total number of samples scraped.
				sampleLimitErr = err
				added++
				continue
			default:
				break loop
			}
		}
		if !ok {
			var lset labels.Labels

			// Parse the raw metric bytes into a label set (the bytes are
			// converted to a string and split into label names and values;
			// details omitted here).
			mets := p.Metric(&lset)
			// Hash the label set: a separator byte (255) follows each label
			// name and value, and the result is fed through xxhash.
			hash := lset.Hash()

			// Hash label set as it is seen local to the target. Then add target labels
			// and relabeling and store the final label set.
			// Relabeling follows sp.config.HonorLabels and sp.config.MetricRelabelConfigs.
			lset = sl.sampleMutator(lset)

			// The label set may be set to nil to indicate dropping.
			if lset == nil {
				sl.cache.addDropped(mets)
				continue
			}

			var ref uint64
			// Slow path: full Add against local or remote storage; the
			// returned ref is used by later AddFast calls.
			ref, err = app.Add(lset, t, v)
			// TODO(fabxc): also add a dropped-cache?
			switch err {
			case nil:
			case storage.ErrOutOfOrderSample:
				err = nil
				numOutOfOrder++
				level.Debug(sl.l).Log("msg", "Out of order sample", "series", string(met))
				targetScrapeSampleOutOfOrder.Inc()
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				err = nil
				numDuplicates++
				level.Debug(sl.l).Log("msg", "Duplicate sample for timestamp", "series", string(met))
				targetScrapeSampleDuplicate.Inc()
				continue
			case storage.ErrOutOfBounds:
				err = nil
				numOutOfBounds++
				level.Debug(sl.l).Log("msg", "Out of bounds metric", "series", string(met))
				targetScrapeSampleOutOfBounds.Inc()
				continue
			case errSampleLimit:
				sampleLimitErr = err
				added++
				continue
			default:
				level.Debug(sl.l).Log("msg", "unexpected error", "series", string(met), "err", err)
				break loop
			}
			if tp == nil {
				// Bypass staleness logic if there is an explicit timestamp.
				sl.cache.trackStaleness(hash, lset)
			}
			// Cache a cacheEntry for this metric.
			sl.cache.addRef(mets, ref, lset, hash)
		}
		added++
	}
	// Error accounting.
	if sampleLimitErr != nil {
		if err == nil {
			err = sampleLimitErr
		}
		// We only want to increment this once per scrape, so this is Inc'd outside the loop.
		targetScrapeSampleLimit.Inc()
	}
	if numOutOfOrder > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting out-of-order samples", "num_dropped", numOutOfOrder)
	}
	if numDuplicates > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples with different value but same timestamp", "num_dropped", numDuplicates)
	}
	if numOutOfBounds > 0 {
		level.Warn(sl.l).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "num_dropped", numOutOfBounds)
	}
	if err == nil {
		sl.cache.forEachStale(func(lset labels.Labels) bool {
			// Series no longer exposed, mark it stale.
			_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
			switch err {
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// Do not count these in logging, as this is expected if a target
				// goes away and comes back again with a new scrape loop.
				err = nil
			}
			return err == nil
		})
	}
	// On error, roll back the storage transaction.
	if err != nil {
		app.Rollback()
		return total, added, err
	}
	// Otherwise, commit.
	if err := app.Commit(); err != nil {
		return total, added, err
	}
	// Clean up the maps held by scrapeCache.
	sl.cache.iterDone()
	return total, added, nil
}

Example (values inspected with delve):
(dlv) p lset
github.com/prometheus/prometheus/pkg/labels.Labels len: 4, cap: 4, [{Name: "__name__",Value: "go_gc_duration_seconds",},{Name: "instance",Value: "localhost:9090",},{Name: "job",Value: "prometheus",},{Name: "quantile",Value: "0.25",},
]
(dlv) p mets
"go_gc_duration_seconds{quantile=\"0.25\"}"
(dlv) p hash
10509196582921338824
(dlv) p ref
2

This concludes the analysis of the metric cache (scrapeCache).
