本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式。
首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明了以下3个抽象方法用来约束ISpoutOutputCollector的实现类。接口定义与方法说明如下:

/*** ISpoutOutputCollector:Spout输出收集器接口*/
public interface ISpoutOutputCollector {/*** 改方法用来向外发送数据,它的返回值是该消息所有发送目标的taskID集合;* 参数:* streamId:消息Tuple将要被输出到的流* tuple:要输出的消息,是一个Object列表* messageId:输出消息的标记信息,如果messageId被设置为null,则Storm不会追踪该消息,* 否则它会被用来追踪所发出的消息处理情况*/List<Integer> emit(String streamId, List<Object> tuple, Object messageId);/*** 该方法与上面emit方法类似,区别在于:* 1.数据(消息)只由所指定taskId的Task接收;(这就意味着如果没有下游节点接收该消息,则该消息就没有被真正发送)* 2.该方法要求参数streamId所对应的流必须为直接流,接收端的Task必须以直接分组的方式来接收消息,* 否则会抛出异常.*/void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);/*** 用来处理异常*/void reportError(Throwable error);
}

Storm提供了接口ISpoutOutputCollector的默认类SpoutOutputCollector,这个类实际上是一个代理类,该类持有一个ISpoutOutputCollector类型的对象,所有的操作实际上都过该对象来实现的。SpoutOutputCollector定义如下:

public class SpoutOutputCollector implements ISpoutOutputCollector {/*** 持有SpoutOutputCollector要代理的对象*/ISpoutOutputCollector _delegate;public SpoutOutputCollector(ISpoutOutputCollector delegate) {_delegate = delegate;}/*** 实现了接口中的emit方法,并且提供了它的几个重载方法* eg.如果不指定streamId,默认使用default,如果不指定messageId,则默认使用空(null)*/public List<Integer> emit(String streamId, List<Object> tuple, Object messageId){return _delegate.emit(streamId, tuple, messageId);}public List<Integer> emit(List<Object> tuple, Object messageId) {return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);}public List<Integer> emit(List<Object> tuple) {return emit(tuple, null);}public List<Integer> emit(String streamId, List<Object> tuple) {return emit(streamId, tuple, null);}/*** 实现了接口中的emitDirect方法,同时也提供了几个重载方法,与上面emit方法一致.*/public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {_delegate.emitDirect(taskId, streamId, tuple, messageId);}public void emitDirect(int taskId, List<Object> tuple, Object messageId) {emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);}public void emitDirect(int taskId, String streamId, List<Object> tuple) {emitDirect(taskId, streamId, tuple, null);}public void emitDirect(int taskId, List<Object> tuple) {emitDirect(taskId, tuple, null);}/*** 处理异常方法的实现*/@Overridepublic void reportError(Throwable error) {_delegate.reportError(error);}
}

PS:
代理模式主要分为两种:静态代理和动态代理

静态代理:
在程序运行前代理类与委托类的关系在运行前就确定,即在程序运行前就已经存在代理类的字节码文件了.
代理模式角色:
Subject(抽象主题角色):可以是抽象类也可以是接口,声明了被委托角色和委托类共有的处理方法;
RealSubject(具体主题角色):又称被委托角色、被代理角色,是业务逻辑的具体执行者;
ProxySubject(代理主题角色):又称委托类、代理类,负责对真实角色的应用,
把所有抽象主题类定义的方法限制委托给具体主题角色来实现,并且在具体主题角色处理完毕前后做预处理和善后处理.

静态代理模式案例如下:

//抽象主题
public interface Subject {public void process(String taskName);
}

被代理角色:

public class RealSubject implements Subject {@Overridepublic void process(String taskName) {System.out.println("正在执行任务:"+taskName);try {Thread.sleep(500);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}

代理类:

public class ProxySubject implements Subject {//代理类持有一个委托类的对象引用private Subject delegate;public ProxySubject(Subject delegate){this.delegate=delegate;}@Overridepublic void process(String taskName) {//预处理this.before();//将请求分派给委托类处理delegate.process(taskName);//善后处理this.after();}private void before(){System.out.println("预处理!");}private void after(){System.out.println("善后处理!");}
}

测试代码:

public class Test {public static void main(String[] args) {RealSubject subject = new RealSubject();ProxySubject p = new ProxySubject(subject);p.process("排水");}
}

测试结果:

预处理!
正在执行任务:排水
善后处理!

静态代理类的优缺点:
优点:
业务类只需关注业务逻辑本身,这样就保证了业务类的重用性.
缺点:
代理对象的一个接口只服务于一种类型的对象.当要代理的方法很多,就要为每一种方法进行代理。因此静态代理在程序规模变大时就无法很好地胜任工作了.

动态代理:
代理类和委托类的关系在程序运行时才确定的.动态代理类的源码是在程序运行期间由JVM根据反射等机制动态生成,所以不存在代理类的字节码文件.

动态代理模式案例如下:

public interface Service {//目标方法 public void process();
}
public class UserServiceImpl implements Service {@Overridepublic void process() {System.out.println("用户service处理");  }
}

动态代理实现实例

public class MyInvocatioHandler implements InvocationHandler {private Object target;public MyInvocatioHandler(Object target) {this.target = target;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args)throws Throwable {//System.out.println("-----before-----");this.before();Object result = method.invoke(target, args);// System.out.println("-----end-----");this.after();return result;}// 生成代理对象public Object getProxy() {ClassLoader loader = Thread.currentThread().getContextClassLoader();Class<?>[] interfaces = target.getClass().getInterfaces();return Proxy.newProxyInstance(loader, interfaces, this);}private void before(){System.out.println("预处理!");}private void after(){System.out.println("善后处理!");}
}

案列测试

public class ProxyTest {public static void main(String[] args) {Service service = new UserServiceImpl();MyInvocatioHandler handler = new MyInvocatioHandler(service);Service serviceProxy = (Service)handler.getProxy();serviceProxy.process();}
}

测试结果:

预处理!
用户service处理
善后处理!

动态代理的优缺点:
优点:
接口中的所有方法都被转移到调用处理器一个集中的方法中在方法“运行时”动态的加入,决定你是什么类型,较灵活
缺点:
1. 与静态代理相比,效率降低了
2. JDK动态代理只能对实现了接口的类进行代理

欢迎关注下面二维码进行技术交流:

JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式相关推荐

  1. JStorm与Storm源码分析(一)--nimbus-data

    Nimbus里定义了一些共享数据结构,比如nimbus-data. nimbus-data结构里定义了很多公用的数据,请看下面代码: (defn nimbus-data [conf inimbus]( ...

  2. JStorm与Storm源码分析(八)--计时器工具-mk-timer

    Storm使用计时器线程来处理一些周期性调度事件. 与计时器相关的操作主要有:创建计时器线程.查看线程是否活跃.向线程中加入新的待调度事件.取消计时器线程 mk-timer方法用于创建一个计时器线程. ...

  3. JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

    在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器 ...

  4. JStorm与Storm源码分析(六)--收集器 IOutputCollector 、OutputCollector

    在Storm中,多个地方使用了IOutputCollector收集器接口,收集器OutputCollector的接口就是IOutputCollector.所以有必要对接口IOutputCollecto ...

  5. JStorm与Storm源码分析(三)--Scheduler,调度器

    Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public ...

  6. JStorm与Storm源码分析(二)--任务分配,assignment

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: ;;参数nimbus为nimb ...

  7. JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

    EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口,  由下面代码可以看出: (ns backtype.storm.scheduler.EvenSch ...

  8. JStorm与Storm源码分析(二)--任务分配,assignmen

    mk-assignments主要功能就是产生Executor与节点+端口的对应关系,将Executor分配到某个节点的某个端口上,以及进行相应的调度处理.代码注释如下: 1 ;;参数nimbus为ni ...

  9. Storm源码分析之四: Trident源码分析

    Storm源码分析之四: Trident源码分析 @(STORM)[storm] Storm源码分析之四 Trident源码分析 一概述 0小结 1简介 2关键类 1Spout的创建 2spout的消 ...

最新文章

  1. [iOS]调和 pop 手势导致 AVPlayer 播放卡顿
  2. C++关键字static
  3. Petrozavodsk Summer-2016. Ural FU Dandelion Contest
  4. CentoS 下安装gitlab
  5. C# 线程的定义和使用
  6. H.264 Profile、Level、Encoder三张简图
  7. linux进程通信发送方式,Linux服务器编程——Linux系统编程之进程通信
  8. webpack4.0各个击破(7)—— plugin篇
  9. 求解平稳分布matlab,计算二阶马尔可夫信源的熵
  10. centos系统的Visual Studio code卡死,无法选择或者输入
  11. Rman 非catalog恢复
  12. R语言ggplot2包之画折线图
  13. IDEA中单元测试使用Scanner控制台无法输入
  14. 深度学习后向算法的直观理解_强化学习的直观介绍
  15. Tcpping 替代ping测试网路延迟
  16. java jdk 文档下载_JDK8 API文档(下载)
  17. 2022年国内最牛的Java面试八股文合集(MCA版),不接受反驳
  18. FPGA之IP核的使用、搞懂功能仿真和时序仿真
  19. 马丁福勒《UML精粹》读书笔记_第一章
  20. Linux命令(5):pwd

热门文章

  1. 解决ping 127.0.0.1 一般故障 问题
  2. No input file specified.问题的解决
  3. on a null object reference 问题的解决办法
  4. Joda-Time中两个日期之间的天数
  5. 我怎样才能找到带有Mathematica的Waldo?
  6. Spring Boot 与DBunit 配合使用
  7. 工程项目如何实现采购效益最大化?
  8. vs2015_ef 连接mysql
  9. linux的crash之hardlock排查记录
  10. css中.和#的区别 不写时代表什么