在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector。所以有必要对接口IOutputCollector进行解读和分析.IOutputCollector的源码如下:

/*** @ClassName:  IOutputCollector接口扩展了IErrorReporter,* 并定义了一些基本方法*/
public interface IOutputCollector extends IErrorReporter {/***  用来向外发送数据,它的返回值是该消息所有发送目标的TaskId集合*  输入参数:*  streamId:消息将被输出到的流;*  anchors:输出消息的标记,通常代表该条消息是由哪些消息产生的,主要用于消息的Ack系统;*  tuple:要输出的消息,为一个Object列表*/List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);/*** 与上述方法emit类似,区别在于,emitDirect方法发送的消息只有指定taskId的Task才可以接收.* 该方法要求streamId对应的流必须为直接流而且接收端的Task必须通过直接分组的方式来接收消息,否则会抛出异常.* 这也就意味着:如果没有下游节点接收该消息,则此类消息其实并没有真正被发送*/void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);/*** ack和fail方法用来记录消息是否被成功处理*/void ack(Tuple input);void fail(Tuple input);
}

IOutputCollector 默认实现类OutputCollector,它实际上是一个代理类,持有IOutputCollector 类型的对象。emit、emitDirect等方法具体的执行都是通过IOutputCollector类型的对象调用相应的方法来实现的.
其定义如下:

/*** OutputCollector实现了接口IOutputCollector,* 其主要作用是用于从IRichBolt向外发送数据;* OutputCollector是一个代理类,它持有一个IOutputCollector类型的实例(该对象在Clojure中定义)*/
public class OutputCollector implements IOutputCollector {/*** 持有一个IOutputCollector类型的实例*/private IOutputCollector _delegate;public OutputCollector(IOutputCollector delegate) {_delegate = delegate;}/*** 定义了emit的各类重载方法* 说明如下:* 若未传入streamId,则使用default作为流序号;* 若未传入anchor,则使用null作为标记;* 若传入的anchor不是列表对象,则将其转化为列表对象.*/public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {return emit(streamId, Arrays.asList(anchor), tuple);}public List<Integer> emit(String streamId, List<Object> tuple) {return emit(streamId, (List) null, tuple);}public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);}public List<Integer> emit(Tuple anchor, List<Object> tuple) {return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);}public List<Integer> emit(List<Object> tuple) {return emit(Utils.DEFAULT_STREAM_ID, tuple);}/*** 定义了emitDirect的各类重载方法* 说明如下:* 若未传入streamId,则使用default作为流序号;* 若未传入anchor,则使用null作为标记;* 若传入的anchor不是列表对象,则将其转化为列表对象.*/public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> tuple) {emitDirect(taskId, streamId, Arrays.asList(anchor), tuple);}public void emitDirect(int taskId, String streamId, List<Object> tuple) {emitDirect(taskId, streamId, (List) null, tuple);}public void emitDirect(int taskId, Collection<Tuple> anchors, List<Object> tuple) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchors, tuple);}public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchor, tuple);}public void emitDirect(int taskId, List<Object> tuple) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);}/*** 实现了IOutputCollector接口,在代理对象中实现了这些方法*/@Overridepublic List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {return _delegate.emit(streamId, anchors, tuple);}@Overridepublic void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {_delegate.emitDirect(taskId, streamId, anchors, tuple);}@Overridepublic void ack(Tuple input) {_delegate.ack(input);}@Overridepublic void fail(Tuple input) {_delegate.fail(input);}@Overridepublic void reportError(Throwable error) {_delegate.reportError(error);}
}

注:学习李明等老师Storm源码分析笔记整理。
欢迎关注下面二维码进行技术交流:

JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector相关推荐

  1. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  2. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  3. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  4. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  5. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  6. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  7. JStorm与Storm源码分析(二)--任务分配,assignmen

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: 1 ;;参数nimbus为ni ...

  8. JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式

    本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式. 首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明 ...

  9. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

最新文章

  1. 常见证书格式及相互转换
  2. 1. redis简介
  3. C#怎么遍历一个对象里面的全部属性?
  4. rabbitmq-死信队列
  5. 我们如何体验 Line 的生活方式
  6. 【万里征程——Windows App开发】应用栏
  7. mysql 执行存储过程_MySQL存储过程的3种循环
  8. python两个一维数组合并_python:16.合并两个排序的链表
  9. 建立了索引怎么使用_对MySQL索引的认识
  10. pthread 线程冲突一个简单例子
  11. 计算机系统基础(1)
  12. 图灵奖得主David Patterson出席2022全球软件研发技术大会
  13. 凑个热闹 谈谈网红沈大师
  14. 银行用户画像建设步骤
  15. 计算机网络的ask表示啥,计算机网络理论知识:物理层
  16. 2021年起重机司机(限桥式起重机)考试APP及起重机司机(限桥式起重机)免费试题
  17. 训练好的深度学习模型是怎么部署的?
  18. JavaWeb 入门级项目实战 -- 文章发布系统 (第六节)
  19. 如何利用大数据实现精准扶贫?
  20. python程序化 k线指定时间更新_Python项目Keylogger(定时器)

热门文章

  1. vscode:解决操作git总让输入用户名及密码问题
  2. 如何使用多个参数调用Angular.js过滤器?
  3. 接口或抽象类:使用哪一个?
  4. 如何为HTML容器分配多个类?
  5. 在Java中有类似.NET的NotImplementedException吗?
  6. 携程Apollo(阿波罗)配置中心本地开发模式不接入配置中心进行本地开发
  7. Elasticsearch的Shield插件
  8. SelectBox插件
  9. 用 Docker 构建、运行、发布来一个 Spring Boot 应用
  10. SUSE Linux ntp 升级报错(MAKE [1]:*** 【all】Error 2)