会发现, 现在storm里面有两套metrics系统, metrics framework和stats framework

并且在所有地方都是同时注册两套, 貌似准备用metrics来替代stats, 但当前版本UI仍然使用stats

这个模块统计的数据怎么被使用,

1. 在worker中, 会定期调用do-executor-heartbeats去往zk同步hb
可以看到, stats也会作为hb的一部分被同步到zk上

(defnk do-executor-heartbeats [worker :executors nil];; stats is how we know what executors are assigned to this worker (let [stats (if-not executors(into {} (map (fn [e] {e nil}) (:executors worker)))(->> executors(map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))(apply merge)))zk-hb {:storm-id (:storm-id worker):

executor-stats stats

               :uptime ((:uptime worker)):time-secs (current-time-secs)}];; do the zookeeper heartbeat(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)    ))

2. 现在任何人都可以通过nimbus的thrift接口来得到相关信息

(^TopologyInfo getTopologyInfo [this ^String storm-id]beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))stats (:stats heartbeat))

3. 最直接的用户就是storm UI, 在准备topology page的时候, 就会调用getTopologyInfo来获取数据

(defn topology-page [id window include-sys?](with-nimbus nimbus(let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)]
)

Stats

这个模块用于spout和bolt来抽样统计数据, 需要统计的具体metics如下

(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
(defrecord SpoutExecutorStats [common acked failed complete-latencies])

抽样的比例在storm-conf, TOPOLOGY_STATS_SAMPLE_RATE, 配置

为什么统计时每次加rate, 而不是加1?

因为这里的统计是抽样的, 所以如果抽样比例是10%, 那么发现一个, 应该加1/(10%), 10个

(defn sampling-rate [conf](->> (conf TOPOLOGY-STATS-SAMPLE-RATE)(/ 1)int))

然后统计是基于时间窗口的, 底下是对应默认的bucket和时间窗口的定义

(def NUM-STAT-BUCKETS 20) ;;bucket数
;; 10 minutes, 3 hours, 1 day ;;定义3种时间窗口
(def STAT-BUCKETS [30 540 4320]) ;;bucket大小分别是30,540,4320秒

核心数据结构是RollingWindowSet, 包含:
统计数据需要的函数, updater extractor, 之所以治理也需要是因为需要统计all-time 
一组rolling windows, 默认是3个时间窗, 10 minutes, 3 hours, 1 day
all-time, 在完整的时间区间上的统计结果

(defrecord RollingWindowSet [updater extractor windows all-time])
(defn rolling-window-set [updater merger extractor num-buckets & bucket-sizes](RollingWindowSet. updater extractor (dofor [s bucket-sizes] (rolling-window updater merger extractor s num-buckets)) nil))

继续看看rolling window的定义,
核心数据, buckets, hashmap, {streamid, data}, 初始化为{}
统计data需要的函数, updater merger extractor
时间窗口, buckets大小和buckets个数

(defrecord RollingWindow [updater merger extractor bucket-size-secs num-buckets buckets])
(defn rolling-window [updater merger extractor bucket-size-secs num-buckets](RollingWindow. updater merger extractor bucket-size-secs num-buckets {}))

1. mk-stats

在mk-executedata的时候需要创建stats

mk-executor-stats <> (sampling-rate storm-conf)

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate](stats/mk-spout-stats rate))
(defmethod mk-executor-stats :bolt [_ rate](stats/mk-bolt-stats rate))

第一个参数忽略, 其实就是分别调用stats/mk-spout-stats或stats/mk-bolt-stats, 可见就是对于每个需要统计的数据, 创建一个rolling-windows-set

(defn- mk-common-stats [rate](CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))rate))(defn mk-bolt-stats [rate](BoltExecutorStats. (mk-common-stats rate)(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))(defn mk-spout-stats [rate](SpoutExecutorStats. (mk-common-stats rate)(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))

2. 数据更新

(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms](update-executor-stat! stats :acked stream (stats-rate stats))(update-executor-stat! stats :complete-latencies stream latency-ms))

(defmacro update-executor-stat! [stats path & args](let [path (collectify path)]`(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))

就以update-executor-stat! stats :acked stream (stats-rate stats)为例子看看怎么做的?

SpoutExecutorStats取出用于记录spout acked情况的rolling-windows-set
然后使用update-rolling-window-set来swap这个atom

来看看记录acked的rolling-windows-set是如何定义的?

keyed-counter-rolling-window-set, 预定义了updater merger extractor
updater, incr-val [amap key amt], 把给定的值amt加到amap的对应的key的value上
merger, (partial merge-with +), 用+作为map merge的逻辑, 即出现相同key则相加
extractor, counter-extract, (if v v {}), 有则返回, 无则返回{}
windows, rolling-window的list
all-time, 初始化为nil

(defn keyed-counter-rolling-window-set [num-buckets & bucket-sizes](apply rolling-window-set incr-val (partial merge-with +) counter-extract num-buckets bucket-sizes))

好, 下面就看看, 当spout-acked-tuple!时更新:acked时, 如何update的?

首先更新每个rolling-window, 并把更新过的rolling-window-set更新到:windows
并且更新:all-time, (apply (:updater rws) (:all-time rws) args)
updated, incr-val [amap key amt]
args, steamid, rate
all-time, 是用来记录整个时间区间上的, 某个stream的统计情况

(defn update-rolling-window-set([^RollingWindowSet rws & args](let [now (current-time-secs)new-windows (dofor [w (:windows rws)](apply update-rolling-window w now args))](assoc rws :windows new-windows :all-time (apply (:updater rws) (:all-time rws) args)))))

看下如何更新某个rolling-windw
根据now算出当前属于哪个bucket, time-bucket
取出buckets, 并使用:updater更新相应的bucket, 这里的操作仍然是把rate叠加到streamid的value上

(defn update-rolling-window([^RollingWindow rw time-secs & args];; this is 2.5x faster than using update-in...(let [time-bucket (curr-time-bucket time-secs (:bucket-size-secs rw))buckets (:buckets rw)curr (get buckets time-bucket)           curr (apply (:updater rw) curr args)](assoc rw :buckets (assoc buckets time-bucket curr)))))

转载于:https://www.cnblogs.com/fxjwind/p/3223110.html

Storm-源码分析-Stats (backtype.storm.stats)相关推荐

  1. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  2. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  3. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

  4. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  5. JStorm与Storm源码分析(二)--任务分配,assignmen

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: 1 ;;参数nimbus为ni ...

  6. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  7. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  8. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  9. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  10. storm源码分析研究(五)

    2021SC@SDUSC spout源码分析(四) 2021SC@SDUSC spout: ack机制 为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,当定义Topology ...

最新文章

  1. Django model.py表单设置默认值允许为空
  2. 使用 NCS2 异步推理——人脸识别
  3. 信息系统项目管理师:第二三章:信息系统项目管理基础与立项管理
  4. perl malformed JSON string, neither tag, array, object, number, string or atom, at character offset
  5. 第四范式携手工银科技 首批入驻雄安人工智能算法开放平台
  6. 学生用计算机中sto,STO 文件扩展名: 它是什么以及如何打开它?
  7. 前端学习(3131):react-hello-react之总结ref
  8. prettier 配置参数说明
  9. PHP String
  10. iOS中如何旋转UIView
  11. springMVC web项目 对访问数据库的用户名密码进行加密解密
  12. Java经典编程题50道之三十二
  13. 开关电源设计-基础视频教程(53集全,含配套资料)-道合顺大数据Infinigo
  14. 简述Java运行环境
  15. linux切换任务栏快捷键,ubuntu常用命令及快捷键整理
  16. Python-struct
  17. 领你走进10位管理大师的思想境界
  18. hdu4747-线段树
  19. Bootloader和Linux启动过程总结
  20. 长期一个人独自生活会怎么样?

热门文章

  1. Google C++ 编码风格精简
  2. 网络编程中的缓冲区溢出
  3. 数据结构:复杂度分析以及数据结构整体概览
  4. Nike推Nike Fit可轻松丈量足部尺寸与推荐鞋款
  5. thinkphp-add方法错误
  6. [20171109]缓存命中率神话.txt
  7. springboot+mongodb
  8. 管理员必备的Linux系统监控工具
  9. 用Java调用jdbc接口连接MySQL数据库——实现对数据库的增删改查
  10. php接受post值报错,php接收post参数时报错怎么办