This article mainly looks at the finishBatch method of Storm's TridentBoltExecutor.

MasterBatchCoordinator.nextTuple

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java

    public void nextTuple() {
        sync();
    }

    private void sync() {
        // note that sometimes the tuples active may be less than max_spout_pending, e.g.
        // max_spout_pending = 3
        // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
        // and there won't be a batch for tx 4 because there's max_spout_pending tx active
        TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
        if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
            maybeCommit.status = AttemptStatus.COMMITTING;
            _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
            LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);
        }

        if(_active) {
            if(_activeTx.size() < _maxTransactionActive) {
                Long curr = _currTransaction;
                for(int i=0; i<_maxTransactionActive; i++) {
                    if(!_activeTx.containsKey(curr) && isReady(curr)) {
                        // by using a monotonically increasing attempt id, downstream tasks
                        // can be memory efficient by clearing out state for old attempts
                        // as soon as they see a higher attempt id for a transaction
                        Integer attemptId = _attemptIds.get(curr);
                        if(attemptId==null) {
                            attemptId = 0;
                        } else {
                            attemptId++;
                        }
                        _attemptIds.put(curr, attemptId);
                        for(TransactionalState state: _states) {
                            state.setData(CURRENT_ATTEMPTS, _attemptIds);
                        }
                        TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
                        final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);
                        _activeTx.put(curr, newTransactionStatus);
                        _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
                        LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);
                        _throttler.markEvent();
                    }
                    curr = nextTransactionId(curr);
                }
            }
        }
    }
  • MasterBatchCoordinator is the real spout of the whole Trident topology; its nextTuple method (via sync()) emits a tuple to TridentSpoutCoordinator on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), keeping at most _maxTransactionActive transactions in flight (a simplified model of this windowing follows below)
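
At the heart of sync() is a sliding window of at most _maxTransactionActive in-flight transactions, where every re-emission of a transaction gets a monotonically increasing attempt id. The following standalone sketch models just that bookkeeping; it is an illustrative model, not Storm code, and the class name PendingBatchTracker and all of its members are made up:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    // Illustrative model (not Storm code) of how MasterBatchCoordinator.sync() keeps at most
    // `maxActive` transactions in flight and assigns each emission of a transaction an attempt id.
    public class PendingBatchTracker {
        private final int maxActive;
        private long currTransaction = 1;                                // oldest uncommitted txid
        private final Map<Long, Integer> attemptIds = new HashMap<>();   // txid -> last attempt id
        private final TreeMap<Long, Integer> activeTx = new TreeMap<>(); // txid -> active attempt id

        public PendingBatchTracker(int maxActive) {
            this.maxActive = maxActive;
        }

        // Called on every nextTuple(): (re)emit a batch instruction for every txid in the
        // window [currTransaction, currTransaction + maxActive - 1] that is not active yet.
        public void sync() {
            long curr = currTransaction;
            for (int i = 0; i < maxActive; i++) {
                if (!activeTx.containsKey(curr)) {
                    Integer attemptId = attemptIds.get(curr);
                    attemptId = (attemptId == null) ? 0 : attemptId + 1;
                    attemptIds.put(curr, attemptId);
                    activeTx.put(curr, attemptId);
                    System.out.println("emit on $batch: tx=" + curr + " attempt=" + attemptId);
                }
                curr++;
            }
        }

        // Called when the oldest transaction has been processed and committed:
        // slide the window forward so sync() can start the next transaction.
        public void success(long txid) {
            activeTx.remove(txid);
            if (txid == currTransaction) {
                currTransaction = txid + 1;
            }
        }

        // Called when an attempt of txid fails: drop it so sync() re-emits it with attempt id + 1.
        public void fail(long txid) {
            activeTx.remove(txid);
        }
    }

For example, with maxActive = 3 a first sync() emits tx 1, 2 and 3; after success(1) the next sync() emits tx 4, and after fail(2) the next sync() re-emits tx 2 with attempt id 1.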

TridentSpoutCoordinator.execute

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);

        if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            _state.cleanupBefore(attempt.getTransactionId());
            _coord.success(attempt.getTransactionId());
        } else {
            long txid = attempt.getTransactionId();
            Object prevMeta = _state.getPreviousState(txid);
            Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
            _state.overrideState(txid, meta);
            collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
        }
    }
  • TridentSpoutCoordinator receives the tuple that MasterBatchCoordinator emitted on MasterBatchCoordinator.BATCH_STREAM_ID ($batch), asks the user spout's BatchCoordinator to initialize the transaction's metadata, and then sends the batch instruction on to the TridentBoltExecutor that wraps the user spout (a sketch of such a coordinator follows below)
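
The meta that TridentSpoutCoordinator forwards together with the attempt is produced by the user spout's BatchCoordinator. A minimal sketch of such a coordinator, assuming the ITridentSpout.BatchCoordinator interface of storm-core 1.2.2 and using a made-up OffsetRange metadata type, could look like this:

    import org.apache.storm.trident.spout.ITridentSpout;

    // Hypothetical coordinator whose per-batch metadata is an offset range to read for the batch.
    // RangeBatchCoordinator, OffsetRange and batchSize are invented for illustration.
    public class RangeBatchCoordinator implements ITridentSpout.BatchCoordinator<RangeBatchCoordinator.OffsetRange> {

        public static class OffsetRange {
            final long start;
            final long end;
            OffsetRange(long start, long end) { this.start = start; this.end = end; }
        }

        private final long batchSize = 100;

        // Called (through TridentSpoutCoordinator) for every transaction attempt; prevMetadata is the
        // metadata of the previous txid, currMetadata is a previous attempt of this txid, if any.
        @Override
        public OffsetRange initializeTransaction(long txid, OffsetRange prevMetadata, OffsetRange currMetadata) {
            if (currMetadata != null) {
                return currMetadata;  // a replay of the same txid must cover exactly the same range
            }
            long start = (prevMetadata == null) ? 0 : prevMetadata.end;
            return new OffsetRange(start, start + batchSize);
        }

        @Override
        public void success(long txid) {
            // txid has been fully committed; cleanup or checkpointing could happen here
        }

        @Override
        public boolean isReady(long txid) {
            return true;  // always ready to start the next batch
        }

        @Override
        public void close() {
        }
    }

The emitter of the same ITridentSpout later receives this metadata in emitBatch and emits the tuples it describes, which is what makes a replayed attempt reproducible.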

TridentBoltExecutor(TridentSpoutExecutor)

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id +
//                    "\nwith group " + batchGroup
//                    + "\n");
//
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • TridentBoltExecutor.execute first creates and initializes a TrackedBatch if one does not yet exist; when it then receives the batch instruction it increments tracked.receivedTuples and calls _bolt.execute(tracked.info, tuple)
  • For the spout, the _bolt here is TridentSpoutExecutor, whose execute method emits a batch of tuples to the downstream TridentBoltExecutor; because the spout's expectedTaskReports == 0, finishBatch is called immediately after TridentSpoutExecutor has finished emitting the batch's tuples
  • The finishBatch operation emits [id, count] data on COORD_STREAM to the downstream TridentBoltExecutor, telling each downstream task how many tuples were emitted to it for this batch (a simplified model of this bookkeeping is sketched after this list)
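
The per-task count in those [id, count] tuples comes from bookkeeping that is done on every single emit of a batch tuple. The standalone sketch below models that sender side of the COORD_STREAM handshake; it is an illustrative model, not Storm code, and BatchEmitAccounting with its method names is invented:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative model (not Storm code) of the sender side of the coordination handshake:
    // count how many tuples of the current batch went to each downstream task, then tell every
    // target task its own count via a direct [batchId, count] message when the batch is finished.
    public class BatchEmitAccounting {

        private final Map<Integer, Integer> taskEmittedTuples = new HashMap<>(); // task id -> tuples sent

        // Record that one tuple of the current batch was routed to `task`
        // (the coordinated collector does this on every regular emit).
        public void recordEmit(int task) {
            taskEmittedTuples.merge(task, 1, Integer::sum);
        }

        // On finishBatch: build one [batchId, count] coordination message per target task.
        // Tasks that received nothing still get a count of 0 so that they can also finish the batch.
        public List<String> buildCoordMessages(Object batchId, List<Integer> targetTasks) {
            List<String> messages = new ArrayList<>();
            for (Integer task : targetTasks) {
                int count = taskEmittedTuples.getOrDefault(task, 0);
                messages.add("emitDirect(task=" + task + ", coordStream, [" + batchId + ", " + count + "])");
            }
            return messages;
        }
    }

In the real executor this corresponds to tracked.taskEmittedTuples and the emitDirect loop in the finishBatch listing above.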

TridentBoltExecutor(SubtopologyBolt)

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java

    @Override
    public void execute(Tuple tuple) {
        if(TupleUtils.isTick(tuple)) {
            long now = System.currentTimeMillis();
            if(now - _lastRotate > _messageTimeoutMs) {
                _batches.rotate();
                _lastRotate = now;
            }
            return;
        }
        String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());
        if(batchGroup==null) {
            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
            _coordCollector.setCurrBatch(null);
            _bolt.execute(null, tuple);
            _collector.ack(tuple);
            return;
        }
        IBatchID id = (IBatchID) tuple.getValue(0);
        //get transaction id
        //if it already exists and attempt id is greater than the attempt there
        TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId());
//        if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) {
//            System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()
//                    + " (" + _batches.size() + ")" +
//                    "\ntuple: " + tuple +
//                    "\nwith tracked " + tracked +
//                    "\nwith id " + id +
//                    "\nwith group " + batchGroup
//                    + "\n");
//
//        }
        //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());

        // this code here ensures that only one attempt is ever tracked for a batch, so when
        // failures happen you don't get an explosion in memory usage in the tasks
        if(tracked!=null) {
            if(id.getAttemptId() > tracked.attemptId) {
                _batches.remove(id.getId());
                tracked = null;
            } else if(id.getAttemptId() < tracked.attemptId) {
                // no reason to try to execute a previous attempt than we've already seen
                return;
            }
        }

        if(tracked==null) {
            tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());
            _batches.put(id.getId(), tracked);
        }
        _coordCollector.setCurrBatch(tracked);

        //System.out.println("TRACKED: " + tracked + " " + tuple);

        TupleType t = getTupleType(tuple, tracked);
        if(t==TupleType.COMMIT) {
            tracked.receivedCommit = true;
            checkFinish(tracked, tuple, t);
        } else if(t==TupleType.COORD) {
            int count = tuple.getInteger(1);
            tracked.reportedTasks++;
            tracked.expectedTupleCount+=count;
            checkFinish(tracked, tuple, t);
        } else {
            tracked.receivedTuples++;
            boolean success = true;
            try {
                _bolt.execute(tracked.info, tuple);
                if(tracked.condition.expectedTaskReports==0) {
                    success = finishBatch(tracked, tuple);
                }
            } catch(FailedException e) {
                failBatch(tracked, e);
            }
            if(success) {
                _collector.ack(tuple);
            } else {
                _collector.fail(tuple);
            }
        }
        _coordCollector.setCurrBatch(null);
    }

    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
        if(tracked.failed) {
            failBatch(tracked);
            _collector.fail(tuple);
            return;
        }
        CoordCondition cond = tracked.condition;
        boolean delayed = tracked.delayedAck==null &&
                              (cond.commitStream!=null && type==TupleType.COMMIT
                               || cond.commitStream==null);
        if(delayed) {
            tracked.delayedAck = tuple;
        }
        boolean failed = false;
        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
            if(tracked.receivedTuples == tracked.expectedTupleCount) {
                finishBatch(tracked, tuple);
            } else {
                //TODO: add logging that not all tuples were received
                failBatch(tracked);
                _collector.fail(tuple);
                failed = true;
            }
        }

        if(!delayed && !failed) {
            _collector.ack(tuple);
        }
    }

    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
        boolean success = true;
        try {
            _bolt.finishBatch(tracked.info);
            String stream = COORD_STREAM(tracked.info.batchGroup);
            for(Integer task: tracked.condition.targetTasks) {
                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
            }
            if(tracked.delayedAck!=null) {
                _collector.ack(tracked.delayedAck);
                tracked.delayedAck = null;
            }
        } catch(FailedException e) {
            failBatch(tracked, e);
            success = false;
        }
        _batches.remove(tracked.info.batchId.getId());
        return success;
    }
  • TridentBoltExecutor(SubtopologyBolt) is the bolt downstream of the spout; its _bolt is SubtopologyBolt, and its tracked.condition.expectedTaskReports is non-zero, so it only performs the checkFinish operation when it receives a TupleType.COORD tuple (the TupleType.COMMIT case is ignored here)
  • Since BoltExecutor consumes its receiveQueue one element at a time via Utils.asyncLoop, and emitBatch likewise delivers the batch's tuples one by one, the [id, count] tuple that TridentBoltExecutor(TridentSpoutExecutor) sends on COORD_STREAM during finishBatch is received last (note that COORD_STREAM is emitted direct to each task, so if the TridentBoltExecutor runs with multiple parallel tasks, each task receives its own count)
  • TridentBoltExecutor(SubtopologyBolt) therefore processes each tuple of the batch first and only then reaches the TupleType.COORD tuple, which triggers checkFinish; when there is no commitStream, tracked.receivedCommit defaults to true, so as long as the number of received tuples equals the expected tuple count it calls _bolt.finishBatch to complete the batch and then emits the [id, count] tuples its own downstream TridentBoltExecutor expects (the completion check is sketched after this list)
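
The condition that checkFinish evaluates on the receiving side boils down to two counters per tracked batch. The sketch below models it in isolation; it is an illustrative model, not Storm code, and BatchCompletionCheck and its fields are invented:

    // Illustrative model (not Storm code) of the receiver side of the handshake that
    // TridentBoltExecutor.checkFinish performs for a tracked batch.
    public class BatchCompletionCheck {

        private final int expectedTaskReports;  // upstream tasks that must report a count
        private int reportedTasks = 0;          // [id, count] coordination tuples received so far
        private int expectedTupleCount = 0;     // sum of the counts announced by upstream tasks
        private int receivedTuples = 0;         // regular batch tuples actually processed

        public BatchCompletionCheck(int expectedTaskReports) {
            this.expectedTaskReports = expectedTaskReports;
        }

        // A regular batch tuple was processed by the wrapped bolt.
        public void onBatchTuple() {
            receivedTuples++;
        }

        // A COORD tuple [id, count] arrived from one upstream task. Returns true once every
        // upstream task has reported and all announced tuples have already been processed
        // (the real executor fails the batch if all tasks reported but the counts do not match).
        public boolean onCoordTuple(int count) {
            reportedTasks++;
            expectedTupleCount += count;
            return reportedTasks == expectedTaskReports
                    && receivedTuples == expectedTupleCount;
        }
    }

With expectedTaskReports == 1, processing three batch tuples and then a COORD tuple carrying count 3 returns true and the batch can be finished; if only two tuples had arrived, the real executor would fail the batch instead.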

Summary

  • For Trident, the real spout is MasterBatchCoordinator: its nextTuple triggers the emission of a batch by sending the batch instruction to TridentSpoutCoordinator, which in turn triggers the execute method of TridentBoltExecutor(TridentSpoutExecutor) and thereby the emitBatch of the ITridentSpout's emitter, so that one batch of data is sent
  • TridentBoltExecutor(TridentSpoutExecutor) has expectedTaskReports == 0, so as soon as TridentSpoutExecutor has emitted the batch's tuples it calls finishBatch, which emits [id, count] data on COORD_STREAM to the downstream TridentBoltExecutor, telling it how many tuples were emitted
  • The bolt downstream of the spout is TridentBoltExecutor(SubtopologyBolt); its tracked.condition.expectedTaskReports is non-zero, so it only performs checkFinish when it receives a TupleType.COORD tuple (again ignoring TupleType.COMMIT here). Because the spout first runs emitBatch and only sends the [id, count] data in the final finishBatch, under normal conditions the tuples enter TridentBoltExecutor(SubtopologyBolt)'s receiveQueue in that order; it consumes them one by one, calling SubtopologyBolt.execute for each, handles the [id, count] data last, and triggers checkFinish. As long as the number of received tuples equals the expected count, it runs SubtopologyBolt.finishBatch to complete the batch and then emits the [id, count] tuples its own downstream TridentBoltExecutor expects (a hypothetical end-to-end trace of this handshake follows below)
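
As a recap, the following hypothetical trace (all numbers invented) walks one batch through the handshake summarized above, with one spout-side task feeding one downstream task:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical end-to-end trace of the coordination handshake for one batch: one spout-side
    // TridentBoltExecutor(TridentSpoutExecutor) feeding one TridentBoltExecutor(SubtopologyBolt) task.
    public class BatchHandshakeDemo {
        public static void main(String[] args) {
            int downstreamTask = 7;  // made-up task id
            Map<Integer, Integer> taskEmittedTuples = new HashMap<>();

            // 1. spout side: emitBatch sends 3 tuples of batch tx=1 to task 7
            for (int i = 0; i < 3; i++) {
                taskEmittedTuples.merge(downstreamTask, 1, Integer::sum);
                System.out.println("spout -> task " + downstreamTask + ": batch tuple " + i + " (tx=1)");
            }

            // 2. spout side: expectedTaskReports == 0, so finishBatch runs right away and emits
            //    [tx=1, count=3] direct to task 7 on the coordination stream
            int announced = taskEmittedTuples.get(downstreamTask);
            System.out.println("spout -> task " + downstreamTask + ": COORD [tx=1, " + announced + "]");

            // 3. downstream side: the 3 batch tuples are consumed first, then the COORD tuple
            int receivedTuples = 3;
            int reportedTasks = 1, expectedTaskReports = 1;
            boolean finished = reportedTasks == expectedTaskReports && receivedTuples == announced;
            System.out.println("downstream checkFinish: finished=" + finished
                    + " -> SubtopologyBolt.finishBatch, then emit its own [tx=1, count] downstream");
        }
    }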

doc

  • Trident Tutorial
  • On the executor and task of a Storm worker (聊聊storm worker的executor与task)
  • On Storm's AggregateProcessor execute and finishBatch methods (聊聊storm的AggregateProcessor的execute及finishBatch方法)
