欢迎转载,转载请注明出处源自徽沪一郎。本文尝试分析tuple发送时的具体细节,本博的另一篇文章《bolt消息传递路径之源码解读》主要从消息接收方面来阐述问题,两篇文章互为补充。

worker进程内消息接收与处理全景图

先上幅图简要勾勒出worker进程接收到tuple消息之后的处理全过程

IConnection的建立与使用

话说在mk-threads :bolt函数的实现中有这么一段代码,其主要功能是实现tuple的emit功能

bolt-emit (fn [stream anchors values task](let [out-tasks (if task(tasks-fn task stream values)(tasks-fn stream values))](fast-list-iter [t out-tasks](let [anchors-to-ids (HashMap.)](fast-list-iter [^TupleImpl a anchors](let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)](when (pos? (count root-ids))(let [edge-id (MessageId/generateId rand)](.updateAckVal a edge-id)(fast-list-iter [root-id root-ids](put-xor! anchors-to-ids root-id edge-id))))))(transfer-fn t(TupleImpl. worker-contextvaluestask-idstream(MessageId/makeId anchors-to-ids)))))(or out-tasks [])))

加亮为蓝色的部分实现的功能是另外发送tuple,那么transfer-fn函数的定义在哪呢?见mk-threads的let部分,能见到下述一行代码

:transfer-fn (mk-executor-transfer-fn batch-transfer->worker)

在继续往下看每个函数实现之前,先确定一下这节代码阅读的目的。storm在线程之间使用disruptor进行通讯,在进程之间进行消息通讯使用的是zeromq或netty, 所以需要从transfer-fn追踪到使用zeromq或netty api的位置。

再看mk-executor-transfer-fn函数实现

(defn mk-executor-transfer-fn [batch-transfer->worker](fn this([task tuple block? ^List overflow-buffer](if (and overflow-buffer (not (.isEmpty overflow-buffer)))(.add overflow-buffer [task tuple])(try-cause(disruptor/publish batch-transfer->worker [task tuple] block?)(catch InsufficientCapacityException e(if overflow-buffer(.add overflow-buffer [task tuple])(throw e))))))([task tuple overflow-buffer](this task tuple (nil? overflow-buffer) overflow-buffer))([task tuple](this task tuple nil))))

disruptor/publish表示将消息从本线程发送出去,至于谁是该消息的接收者,请继续往下看。

worker进程中,有一个receiver-thread是用来专门接收来自外部进程的消息,那么与之相对的是有一个transfer-thread用来将本进程的消息发送给外部进程。所以刚才的disruptor/publish发送出来的消息应该被transfer-thread接收到。

在transfer-thread中,能找到这行下述一行代码

transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)

对于接收到来自本进程中其它线程发送过来的消息利用transfer-tuples进行处理,transfer-tuples使用mk-transfer-tuples-handler来创建,所以需要看看mk-transfer-tuples-handler能否与zeromq或netty联系上呢?

(defn mk-transfer-tuples-handler [worker](let [^DisruptorQueue transfer-queue (:transfer-queue worker)drainer (ArrayList.)node+port->socket (:cached-node+port->socket worker)task->node+port (:cached-task->node+port worker)endpoint-socket-lock (:endpoint-socket-lock worker)](disruptor/clojure-handler(fn [packets _ batch-end?](.addAll drainer packets)(when batch-end?(read-locked endpoint-socket-lock(let [node+port->socket @node+port->sockettask->node+port @task->node+port];; consider doing some automatic batching here (would need to not be serialized at this point to remo;; try using multipart messages ... first sort the tuples by the target node (without changing the lo17(fast-list-iter [[task ser-tuple] drainer];; TODO: consider write a batch of tuples here to every target worker;; group by node+port, do multipart send(let [node-port (get task->node+port task)](when node-port (.send ^IConnection (get node+port->socket node-port) task ser-tuple))))))(.clear drainer))))))

上述代码中出现了与zeromq可能有联系的部分了即加亮为红色的一行。

那凭什么说加亮的IConnection一行与zeromq有关系的,这话得慢慢说起,需要从配置文件开始。

在storm.yaml中有这么一行配置项,即

storm.messaging.transport: "backtype.storm.messaging.zmq"

这个配置项与worker中的mqcontext相对应,所以在worker中以mqcontext为线索,就能够一步步找到IConnection的实现。connections在函数mk-refresh-connections中建立

refresh-connections (mk-refresh-connections worker)

mk-refresh-connection函数中与mq-context相关联的一部分代码如下所示

(swap! (:cached-node+port->socket worker)#(HashMap. (merge (into {} %1) %2))(into {}(dofor [endpoint-str new-connections:let [[node port] (string->endpoint endpoint-str)]][endpoint-str(.connect^IContext (:mq-context worker)storm-id((:node->host assignment) node)port)])))

注意加亮部分,利用mq-conext中connect函数来创建IConnection. 当打开zmq.clj时候,就能验证我们的猜测。

(^IConnection connect [this ^String storm-id ^String host ^int port](require 'backtype.storm.messaging.zmq)(-> context(mq/socket mq/push)(mq/set-hwm hwm)(mq/set-linger linger-ms)(mq/connect (get-connect-zmq-url local? host port))mk-connection))

代码走到这里,IConnection什么时候建立起来的谜底就揭开了,消息是如何从bolt或spout线程传递到transfer-thread,再由zeromq将tuple发送给下跳的路径打通了。

tuple的分发策略 grouping

从一个bolt中产生的tuple可以有多个bolt接收,到底发送给哪一个bolt呢?这牵扯到分发策略问题,其实在twitter storm中有两个层面的分发策略问题,一个是对于task level的,在讲topology submit的时候已经涉及到。另一个就是现在要讨论的针对tuple level的分发。

再次将视线拉回到bolt-emit中,这次将目光集中在变量t的前前后后。

  (let [out-tasks (if task            (tasks-fn task stream values)       (tasks-fn stream values))]  (fast-list-iter [t out-tasks]       (let [anchors-to-ids (HashMap.)]            (fast-list-iter [^TupleImpl a anchors]                    (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]                    (when (pos? (count root-ids))                   (let [edge-id (MessageId/generateId rand)]                      (.updateAckVal a edge-id)                     (fast-list-iter [root-id root-ids]                            (put-xor! anchors-to-ids root-id edge-id))                    )))) (transfer-fn t                (TupleImpl. worker-context                        values                        task-id                       stream                        (MessageId/makeId anchors-to-ids)))))

上述代码显示t从out-tasks来,而out-tasks是tasks-fn的返回值

    tasks-fn (:tasks-fn task-data)

一谈tasks-fn,原来从未涉及的文件task.clj这次被挂上了,task-data与由task/mk-task创建。将中间环节跳过,调用关系如下所列。

  • mk-task
  • mk-task-data
  • mk-tasks-fn

tasks-fn中会使用到grouping,处理代码如下

fn ([^Integer out-task-id ^String stream ^List values](when debug?(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))(let [target-component (.getComponentId worker-context out-task-id)component->grouping (get stream->component->grouper stream)  grouping (get component->grouping target-component)out-task-id (if grouping out-task-id)](when (and (not-nil? grouping) (not= :direct grouping))(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))(when (emit-sampler)(builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)(stats/emitted-tuple! executor-stats stream)(if out-task-id(stats/transferred-tuples! executor-stats stream 1)(builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))(if out-task-id [out-task-id])))

而每个topology中的grouping策略又是如何被executor知道的呢,这从另一端executor-data说起。

在mk-executor-data中有下面一行代码

:stream->component->grouper (outbound-components worker-context component-id)

outbound-components的定义如下

(defn outbound-components"Returns map of stream id to component id to grouper"[^WorkerTopologyContext worker-context component-id](->> (.getTargets worker-context component-id)clojurify-structure(map (fn [[stream-id component->grouping]][stream-id(outbound-groupingsworker-contextcomponent-idstream-id(.getComponentOutputFields worker-context component-id stream-id)component->grouping)]))(into {})(HashMap.)))

转载于:https://www.cnblogs.com/hseagle/p/3436304.html

twitter storm源码走读之2 -- tuple消息发送场景分析相关推荐

  1. twitter storm源码走读(二)

    topology提交过程分析 概要 storm cluster可以想像成为一个工厂,nimbus主要负责从外部接收订单和任务分配.除了从外部接单,nimbus还要将这些外部订单转换成为内部工作分配,这 ...

  2. twitter storm源码走读之1 -- nimbus启动场景分析

    欢迎转载,转载时请注明作者徽沪一郎及出处,谢谢. 本文详细介绍了twitter storm中的nimbus节点的启动场景,分析nimbus是如何一步步实现定义于storm.thrift中的servic ...

  3. twitter storm源码走读(五)

    TridentTopology创建过程详解 从用户层面来看TridentTopology,有两个重要的概念一是Stream,另一个是作用于Stream上的各种Operation.在实现层面来看,无论是 ...

  4. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  5. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  6. storm源码之storm代码结构【译】

    说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助 ...

  7. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

  8. Apache Storm源码阅读笔记

    欢迎转载,转载请注明出处. 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比 ...

  9. 【原】storm源码之一个class解决nimbus单点问题

    [原]storm源码之一个class解决nimbus单点问题 参考文章: (1)[原]storm源码之一个class解决nimbus单点问题 (2)https://www.cnblogs.com/yu ...

最新文章

  1. python中文名字叫什么-什么是Python
  2. 查看DLL 及LIB 库导出函数方法
  3. 【NLP】新闻上的文本分类:机器学习大乱斗
  4. SAP Fiori Elements 公开课第一单元概要介绍
  5. 开源GIS解决方案,暨GeoServer+OpenLayer结合开发总结
  6. matlab busy 如何看进度,matlab solve 之后不出结果不报错,状态一直显示busy
  7. 记一道简单的Java面试题,但答错率很高!
  8. Python 变量 字符串 运算
  9. 【bzoj4355】Play with sequence 线段树区间最值操作
  10. UEditor实战分享(二)定制
  11. [原创]独立模式安装Hive
  12. CSS引用LCD 字体 简单倒计时功能
  13. java导出出行客人到Excel
  14. ubuntu-20.04.3-详细安装教程(图文)附下载地址
  15. react 类暴露_React 组件暴露自身 API 的方法
  16. onnxruntime.capi.onnxruntime_pybind11_state.InvalidProtobuf: [ONNXRuntimeError] : 7 : INVALID_PROTOB
  17. iOS知识分享 — iOS 13上的暗模式
  18. HGOI 20190821 慈溪一中互测
  19. Python 函数 | filter 函数详解
  20. 曲线拟合最小二乘法优缺点_对最小二乘法拟合曲线的简单说明

热门文章

  1. Golang——数组遍历、最大值、求和、多维数组
  2. java 正则表达式 反向_正则表达式中的数量表示符、反向引用、零宽断言、以及java中的用法...
  3. pythonopencv的配置_python配置与使用OpenCV
  4. Linux E325: ATTENTION Found a swap file by the name “./.backu.sh.swp“
  5. 2022年中国在线视频行业研究报告
  6. 互联网晚报 | 1月25日 星期二 | 知乎首次举办上星晚会;微信视频号上线首个付费直播间;淘宝天猫“春节不打烊”活动正式上线...
  7. 最新遇到的面试题20210319
  8. 面试题,客户经常变更需求该如何处理?
  9. 为什么需要建设中台?
  10. 不属于python循环结构的是( )_Python语句print(type(['a','1',2,3]))的输出结果是哪一项?_学小易找答案...