Author: 石蓓蓓 (Shi Beibei)

R&D engineer at Actionsky (爱可生), mainly responsible for developing the tree module of Actionsky's Cloud DMP product.

Source: original contribution

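*Produced by the Actionsky Open Source Community. Original content may not be used without authorization; to republish, please contact the editor and credit the source.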


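AlertManager handles alerts sent by client applications such as the Prometheus server. Incoming alerts go through grouping, inhibition, silencing, and deduplication, and are finally sent to receivers (email and so on).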

The AlertManager architecture diagram is shown below:

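This article walks through the Pipeline-related code in AlertManager. The pipeline processes alerts after they have been grouped: it applies inhibition, silencing, and deduplication, and then sends the notifications.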

When the pipeline is created, AlertManager builds the following stages: GossipSettleStage, MuteStage (used for both inhibition and silencing), WaitStage, DedupStage, RetryStage, and SetNotifiesStage.

// New returns a map of receivers to Stages.
func (pb *PipelineBuilder) New(
	receivers map[string][]Integration,
	wait func() time.Duration,
	inhibitor *inhibit.Inhibitor,
	silencer *silence.Silencer,
	notificationLog NotificationLog,
	peer *cluster.Peer,
) RoutingStage {
	rs := make(RoutingStage, len(receivers))

	ms := NewGossipSettleStage(peer)
	is := NewMuteStage(inhibitor)
	ss := NewMuteStage(silencer)

	for name := range receivers {
		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
		rs[name] = MultiStage{ms, is, ss, st}
	}
	return rs
}
// createReceiverStage creates a pipeline of stages for a receiver.
func createReceiverStage(
	name string,
	integrations []Integration,
	wait func() time.Duration,
	notificationLog NotificationLog,
	metrics *metrics,
) Stage {
	var fs FanoutStage
	for i := range integrations {
		recv := &nflogpb.Receiver{
			GroupName:   name,
			Integration: integrations[i].Name(),
			Idx:         uint32(integrations[i].Index()),
		}
		var s MultiStage
		s = append(s, NewWaitStage(wait))
		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
		s = append(s, NewRetryStage(integrations[i], name, metrics))
		s = append(s, NewSetNotifiesStage(notificationLog, recv))

		fs = append(fs, s)
	}
	return fs
}

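As the code shows, alerts for a receiver pass through seven stages: GossipSettleStage, two MuteStage instances (one for inhibition, one for silencing), and then WaitStage, DedupStage, RetryStage, and SetNotifiesStage. The stages inside each MultiStage run in order. When a receiver has several integrations, the FanoutStage holds one Wait/Dedup/Retry/SetNotifies pipeline per integration and runs these pipelines concurrently.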

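Executing a pipeline (a MultiStage) means iterating over its stages and calling each stage's Exec method. The alert list returned by one stage becomes the input of the next: the statement `ctx, alerts, err = s.Exec(ctx, l, alerts...)` overwrites `alerts` with the stage's output, so the next Exec call receives the filtered list. The final result is the list that survived every filter, and execution stops early as soon as the list becomes empty or a stage returns an error.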

func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var err error
	for _, s := range ms {
		if len(alerts) == 0 {
			return ctx, nil, nil
		}

		ctx, alerts, err = s.Exec(ctx, l, alerts...)
		if err != nil {
			return ctx, nil, err
		}
	}
	return ctx, alerts, nil
}

GossipSettle

Waits until the cluster is ready (gossip has settled), then passes the alerts on unchanged.

func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if n.peer != nil {
		n.peer.WaitReady()
	}
	return ctx, alerts, nil
}

Inhibitor (inhibition)

Inhibition is applied in MuteStage's Exec: an alert that matches an inhibition rule is not sent. The matching is done by the `n.muter.Mutes(a.Labels)` call:

func (n *MuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var filtered []*types.Alert
	for _, a := range alerts {
		// TODO(fabxc): increment total alerts counter.
		// Do not send the alert if muted.
		if !n.muter.Mutes(a.Labels) {
			filtered = append(filtered, a)
		} else {
			n.postMuteHandle(a)
		}
		// TODO(fabxc): increment muted alerts counter if muted.
	}

	return ctx, filtered, nil
}

How are inhibition conditions matched?

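An inhibition rule has a source side and a target side. Once the Inhibitor is running, it subscribes to incoming alerts and checks each one against the source side of every rule. If an alert's labels satisfy the source condition (checked by `r.SourceMatchers.Match(a.Labels)`, or by `r.SourceExpMatcher.Match(a.Labels)` for expression-based rules), the alert is stored in that rule's source cache via `r.scache.Set(a)`.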

func (ih *Inhibitor) run(ctx context.Context) {
	it := ih.alerts.Subscribe()
	defer it.Close()

	for {
		select {
		case <-ctx.Done():
			return
		case a := <-it.Next():
			if err := it.Err(); err != nil {
				level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
				continue
			}
			// Update the inhibition rules' cache.
			for _, r := range ih.rules {
				if r.IsExpressionMatch {
					if matched, err := r.SourceExpMatcher.Match(a.Labels); err != nil {
						level.Error(ih.logger).Log("msg", "Error expression match alerts", "err", err)
						continue
					} else if matched {
						if err := r.scache.Set(a); err != nil {
							level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
						}
					}
				} else if r.SourceMatchers.Match(a.Labels) {
					if err := r.scache.Set(a); err != nil {
						level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
					}
				}
			}
		}
	}
}

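When a new alert then matches the target side of a rule, and a cached source alert agrees with it on the rule's `equal` labels (the `r.hasEqual` check), the new alert is marked as inhibited via `SetInhibited`. The marker also records the fingerprint of the source alert that inhibits it.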

// Mutes returns true if the given label set is muted. It implements the Muter
// interface.
func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
	fp := lset.Fingerprint()

	for _, r := range ih.rules {
		if r.IsExpressionMatch {
			if targetMatched, err := r.TargetExpMatcher.Match(lset); err != nil {
				level.Error(ih.logger).Log("msg", "Error inhibitor expression match alerts", "err", err)
				continue
			} else {
				if !targetMatched {
					continue
				}
			}
		} else {
			if !r.TargetMatchers.Match(lset) {
				// If target side of rule doesn't match, we don't need to look any further.
				continue
			}
		}
		// If we are here, the target side matches. If the source side matches, too, we
		// need to exclude inhibiting alerts for which the same is true.
		sourceMatched := false
		if r.IsExpressionMatch {
			if matched, err := r.SourceExpMatcher.Match(lset); err != nil {
				level.Error(ih.logger).Log("msg", "Error inhibitor expression match alerts", "err", err)
				continue
			} else {
				sourceMatched = matched
			}
		} else {
			sourceMatched = r.SourceMatchers.Match(lset)
		}
		if inhibitedByFP, eq := r.hasEqual(ih.logger, lset, sourceMatched); eq {
			ih.marker.SetInhibited(fp, inhibitedByFP.String())
			return true
		}
	}
	ih.marker.SetInhibited(fp)

	return false
}
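The source-cache and target-matching logic can be sketched as follows. This is a simplified model under several assumptions: only equality matchers are supported (no `IsExpressionMatch` path), the cache is a plain slice, and `fingerprint`, `Observe`, and the `inhibitedBy` map are hypothetical replacements for `Fingerprint()`, `Inhibitor.run`, and the marker.

```go
package main

import (
	"sort"
	"strings"
)

type LabelSet map[string]string

// fingerprint is a hypothetical, readable substitute for LabelSet.Fingerprint().
func fingerprint(l LabelSet) string {
	keys := make([]string, 0, len(l))
	for k := range l {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, len(keys))
	for i, k := range keys {
		parts[i] = k + "=" + l[k]
	}
	return strings.Join(parts, ",")
}

// matches reports whether every equality matcher in m holds for l.
func matches(m, l LabelSet) bool {
	for k, v := range m {
		if l[k] != v {
			return false
		}
	}
	return true
}

type InhibitRule struct {
	SourceMatch LabelSet
	TargetMatch LabelSet
	Equal       []string
	scache      []LabelSet // alerts that matched the source side
}

// hasEqual looks for a cached source alert that agrees with lset on all Equal labels.
func (r *InhibitRule) hasEqual(lset LabelSet, excludeTwoSided bool) (string, bool) {
Outer:
	for _, src := range r.scache {
		for _, n := range r.Equal {
			if src[n] != lset[n] {
				continue Outer
			}
		}
		// An alert matching both sides must not be inhibited by another such alert.
		if excludeTwoSided && matches(r.TargetMatch, src) {
			continue
		}
		return fingerprint(src), true
	}
	return "", false
}

type Inhibitor struct {
	rules       []*InhibitRule
	inhibitedBy map[string]string // target fingerprint -> source fingerprint
}

// Observe plays the role of Inhibitor.run: cache alerts matching a rule's source side.
func (ih *Inhibitor) Observe(lset LabelSet) {
	for _, r := range ih.rules {
		if matches(r.SourceMatch, lset) {
			r.scache = append(r.scache, lset)
		}
	}
}

// Mutes reports whether lset is inhibited by any rule and records the inhibiting alert.
func (ih *Inhibitor) Mutes(lset LabelSet) bool {
	fp := fingerprint(lset)
	for _, r := range ih.rules {
		if !matches(r.TargetMatch, lset) {
			continue
		}
		if by, ok := r.hasEqual(lset, matches(r.SourceMatch, lset)); ok {
			ih.inhibitedBy[fp] = by
			return true
		}
	}
	delete(ih.inhibitedBy, fp)
	return false
}
```

For example, with a rule "critical inhibits warning when `instance` is equal", a critical `NodeDown` on `db-1` mutes warnings on `db-1` but not on `db-2`.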

Silencer (silencing)

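Silencing also goes through MuteStage's Exec. When the labels of a new alert match an active silence, the alert is silenced: it is marked via `SetSilenced`, which records the IDs of the matching silences (together with the version of the silence set that was queried).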

// Mutes implements the Muter interface.
func (s *Silencer) Mutes(lset model.LabelSet) bool {
	fp := lset.Fingerprint()
	ids, markerVersion, _ := s.marker.Silenced(fp)

	var (
		err        error
		sils       []*pb.Silence
		newVersion = markerVersion
	)
	if markerVersion == s.silences.Version() {
		// No new silences added, just need to check which of the old
		// silences are still relevant.
		if len(ids) == 0 {
			// Super fast path: No silences ever applied to this
			// alert, none have been added. We are done.
			return false
		}
		// This is still a quite fast path: No silences have been added,
		// we only need to check which of the applicable silences are
		// currently active. Note that newVersion is left at
		// markerVersion because the Query call might already return a
		// newer version, which is not the version our old list of
		// applicable silences is based on.
		sils, _, err = s.silences.Query(
			QIDs(ids...),
			QState(types.SilenceStateActive),
		)
	} else {
		// New silences have been added, do a full query.
		sils, newVersion, err = s.silences.Query(
			QState(types.SilenceStateActive),
			QMatches(lset),
		)
	}
	if err != nil {
		level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
	}
	if len(sils) == 0 {
		s.marker.SetSilenced(fp, newVersion)
		return false
	}
	idsChanged := len(sils) != len(ids)
	if !idsChanged {
		// Length is the same, but is the content the same?
		for i, s := range sils {
			if ids[i] != s.Id {
				idsChanged = true
				break
			}
		}
	}
	if idsChanged {
		// Need to recreate ids.
		ids = make([]string, len(sils))
		for i, s := range sils {
			ids[i] = s.Id
		}
		sort.Strings(ids) // For comparability.
	}
	if idsChanged || newVersion != markerVersion {
		// Update marker only if something changed.
		s.marker.SetSilenced(fp, newVersion, ids...)
	}
	return true
}

WaitStage

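WaitStage simply sleeps for the duration returned by the `wait` function, or returns early if the context is cancelled. In a cluster, this delay grows with the instance's position among the peers, so that the notification log entries written by the instance that notifies first have time to gossip to the other instances, which can then deduplicate.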

// Exec implements the Stage interface.
func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	select {
	case <-time.After(ws.wait()):
	case <-ctx.Done():
		return ctx, nil, ctx.Err()
	}
	return ctx, alerts, nil
}

DedupStage

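DedupStage deduplicates by hashing each alert. It splits the current alerts into firing and resolved hash sets and stores them in the context for later stages, then queries the notification log (nflog) for the last entry recorded for this group key and receiver. The alerts are passed on only if `needsUpdate` decides that a notification is due; otherwise the stage returns an empty list, which stops the pipeline.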

func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("group key missing")
	}

	repeatInterval, ok := RepeatInterval(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("repeat interval missing")
	}

	firingSet := map[uint64]struct{}{}
	resolvedSet := map[uint64]struct{}{}
	firing := []uint64{}
	resolved := []uint64{}

	var hash uint64
	for _, a := range alerts {
		hash = n.hash(a)
		if a.Resolved() {
			resolved = append(resolved, hash)
			resolvedSet[hash] = struct{}{}
		} else {
			firing = append(firing, hash)
			firingSet[hash] = struct{}{}
		}
	}

	ctx = WithFiringAlerts(ctx, firing)
	ctx = WithResolvedAlerts(ctx, resolved)

	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
	if err != nil && err != nflog.ErrNotFound {
		return ctx, nil, err
	}

	var entry *nflogpb.Entry
	switch len(entries) {
	case 0:
	case 1:
		entry = entries[0]
	default:
		return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
	}

	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
		return ctx, alerts, nil
	}
	return ctx, nil, nil
}

RetryStage

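RetryStage sends the alerts through the receiver's integration (email, webhook, and so on). If `send_resolved` is disabled for the integration, resolved alerts are removed from the batch, and if nothing is firing at all, the alerts are reported as successfully notified without calling the integration. A failed attempt is retried with exponential backoff until it succeeds, the integration reports an unrecoverable error, or the context is cancelled.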

func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var sent []*types.Alert

	// If we shouldn't send notifications for resolved alerts, but there are only
	// resolved alerts, report them all as successfully notified (we still want the
	// notification log to log them for the next run of DedupStage).
	if !r.integration.SendResolved() {
		firing, ok := FiringAlerts(ctx)
		if !ok {
			return ctx, nil, fmt.Errorf("firing alerts missing")
		}
		if len(firing) == 0 {
			return ctx, alerts, nil
		}
		for _, a := range alerts {
			if a.Status() != model.AlertResolved {
				sent = append(sent, a)
			}
		}
	} else {
		sent = alerts
	}

	var (
		i    = 0
		b    = backoff.NewExponentialBackOff()
		tick = backoff.NewTicker(b)
		iErr error
	)
	defer tick.Stop()

	for {
		i++
		// Always check the context first to not notify again.
		select {
		case <-ctx.Done():
			if iErr != nil {
				return ctx, nil, iErr
			}
			return ctx, nil, ctx.Err()
		default:
		}

		select {
		case <-tick.C:
			now := time.Now()
			retry, err := r.integration.Notify(ctx, sent...)
			r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
			r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc()
			if err != nil {
				r.metrics.numFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
				level.Debug(l).Log("msg", "Notify attempt failed", "attempt", i, "integration", r.integration.Name(), "receiver", r.groupName, "err", err)
				if !retry {
					return ctx, alerts, fmt.Errorf("cancelling notify retry for %q due to unrecoverable error: %s", r.integration.Name(), err)
				}

				// Save this error to be able to return the last seen error by an
				// integration upon context timeout.
				iErr = err
			} else {
				return ctx, alerts, nil
			}
		case <-ctx.Done():
			if iErr != nil {
				return ctx, nil, iErr
			}
			return ctx, nil, ctx.Err()
		}
	}
}

SetNotifiesStage

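SetNotifiesStage is reached only after RetryStage has delivered the notification successfully. It records the firing and resolved alert hashes for this group key and receiver in AlertManager's notification log (nflog), which DedupStage queries on later runs.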

func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	gkey, ok := GroupKey(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("group key missing")
	}

	firing, ok := FiringAlerts(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("firing alerts missing")
	}

	resolved, ok := ResolvedAlerts(ctx)
	if !ok {
		return ctx, nil, fmt.Errorf("resolved alerts missing")
	}

	return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved)
}
