S4(Simple Scalable Stream System) 流数据处理系统是Yahoo!公司提出的,在2011年的时候成为Apache软件基金下的一个孵化项目,可惜的是在2014年的时候该孵化项目“退休”了,具体原因未知!!从这里可以了解它当前的状态信息:link. 阅读了所发表的论文S4:Distributed Stream Computing Platform之后,发现该系统模型有其独特之处,总结起来就是

  • 灵活
  • 系统架构模型和编程接口简单
  • 可拓展性强

虽然该项目已经退休,但是因为其”系统架构模型和编程接口简单“,其论文通俗易懂,并且源码开放,因此决定从系统模型架构和源码两个方面较为深入地学习它。

文章目录

  • 系统模型架构
  • programer的编程模型
  • 系统实现逻辑
  • Event数据处理
  • 多线程的使用
  • 关键组建的关系
  • 问题

接下来先讲系统模型架构,然后针对几个问题通过源码来寻找答案,以此来了解S4的实现过程。

系统模型架构

S4系统把流处理的逻辑过程表示成一个或者多个有向无环图,如上图所示,在图中有两个关键组建,Stream和ProcessingElement(PE)。在上图中Stream表示成带箭头的线,负责数据的传递;PE表示一个节点,代表一个处理单元,也是用户业务逻辑所在地。Stream中传递的主要为以Event为基类的对象,也就是事件对象。什么是事件对象呢?从编程实现的角度来讲,一个事件对象有其时间和其他属性,这里的时间通常表示事件所发现的时间。每个Event对象能够通过两个字段信息一起确定其所属的Stream,分别是1. Event的类型,在Java中表示为继承了Event类的子类的Class类型,2. Event的Key值,这个可以为null。这里的Key和Value跟Hadoop的Key和Value是一个设计思想。举一个例子,看上面的图片,WordEvent统一表示单词这样的事件类型,因为单词事件类型中有很多中不同的单词,因此使用Key来表示每一个单词,使用Value表示每种单词的个数。

PE作为用户的逻辑处理单元,需要用户编程实现或者重写下面几个主要方法。系统会为用户隐藏其他细节,比如PE如何接收数据,如何发送处理完的中间数据等等。

 @Overrideprotected void onCreate() {}@Overrideprotected void onRemove() {}public void onEvent(Event event) {}public void onTrigger(Event event) {}

这样的系统模型,看起来非常简单且灵活,但是需要考虑并解决一些问题。
一个Stream只会传输一种类型的Event对象或者同种类型的Key相同的Event对象。然后可能向一个或者多个PE发送Event数据。一台机器中可能有一个或者多个PE,整个系统使用Zookeeper来协调整合。

下面这张图片是系统的整体架构图,Processing Node表示一台机器,在机器接收到一个事件之后,需要根据事件的类型和key值来判断该事件应该由哪一个PE来处理。在PE处理之后,由Emiter发送。了解了系统模型架构,下面通过源码来学习具体的实现。

源码下载地址:download, 建议使用Intelij IDEA导入。

主要包含三个小模块,分别为s4-base, s4-comm和s4-core,第二个模块用于数据传输,我们的源码基本指涉及到s4-base和s4-core.

programer的编程模型

在看源码之前,非常有必要了解编程人员如何编程构建和使用PE。用户需要继承ProcessingElement类,然后覆写和实现onEvent和onTrigger等方法。其中onEvent是事件到来时的处理逻辑,onTrigger是要触发输出处理结果的逻辑。触发条件有两个,一个是事件数量触发,另一个是时间触发。在下面的代码onEvent方法中,每来一个事件就计数加一,在onTrigger中,每来一个事件就将结果放到输出流中。

public class CounterPE extends ProcessingElement {private Stream<CountEvent>[] countStream;public CounterPE(App app) {super(app);}public Stream<CountEvent>[] getCountStream() {return countStream;}public void setCountStream(Stream<CountEvent>... countStream) {this.countStream = countStream;}private long counter = 0;public void onEvent(Event event) {counter += 1;}public void onTrigger(Event event) {CountEvent countEvent = new CountEvent(getId(), counter);emit(countEvent, countStream);}@Overrideprotected void onCreate() {}@Overrideprotected void onRemove() {}
}

系统实现逻辑

  • Event类是所有事件类的基类,是该系统的基础类。一个事件主要包含事件和其他保存在Map中的属性。
public class Event {final private long time;private String streamName;private int appId;private Map<String, Data<?>> map;

在Stream中,Event对象进一步被封装成EventMessage对象,顾名思义,EventMessage对象类似于互联网的一个消息报文,相比于Event对象,其主要是增加了数据在不同PE、不同机器上面传输所需要的定位属性。从下面源码的定义看出,一个EventMessage能够根据appName和streamName来定位数据的接收者。

public class EventMessage {private String appName;private String streamName;private byte[] serializedEvent;

一个Stream对象只容纳一种类型的事件数据,使用的数据结构为ArrayBlockingQueue阻塞队列,

protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);

其特点是,当队列为空的时候,如果要获取,阻塞take的线程,直到有新的数据;如果队列满了,阻塞put的线程,直到队列不满。

  • Stream类主要字段为如下所示,具体请看注释。一个stream只存储一种类型的事件数据,保存在一个阻塞队列中,专门使用一个线程不断地读取队列中地数据并调用PE的方法处理。下面我们来具体看Stream类如何将事件数据保存到队列中,以及如何读取和处理队列中的数据。
public class Stream<T extends Event> implements Runnable, Streamable {protected Key<T> key; //该对象定义了获取Event对象的key的方法private ProcessingElement[] targetPEs;//一个Stream中的数据可能流向多个PEprotected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);private Thread thread;//新开一个线程不断地从阻塞队列中读取事件private Class<T> eventType = null;//保存该Stream的事件类型

Stream作为连接不同PE或者说不同节点的组建,所以它具有两个重要的功能,1是接收数据,2是发送由PE处理后的数据。

  • 接收数据
    Stream中的接收event的方法很简单,只是简单地把event对象放入阻塞队列中。因为会有很多个Stream对象,因此使用了Reciever类来统一管理所有的Stream类的receiveEvent方法。也就是下面的receiveEvent方法在Reciever类中被调用。
 public void receiveEvent(EventMessage event) {try {queue.put(event);} catch (InterruptedException e) {logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());Thread.currentThread().interrupt();}}

Reciever类实现了Runnable接口,下面的方法是Reciever类的run方法。在该方法中,首先通过appName和streamName来找到对应的Stream对象,然后调用其receiveEvent方法。

public void run() {// TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener// here?while (!Thread.interrupted()) {byte[] message = listener.recv();EventMessage event = (EventMessage) serDeser.deserialize(message);int appId = Integer.valueOf(event.getAppName());String streamId = event.getStreamName();/** Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:* make this more efficient for the case in which we send the same event to multiple PEs.*/try {streams.get(appId).get(streamId).receiveEvent(event);} catch (NullPointerException e) {logger.error("Could not find target stream for event with appId={} and streamId={}", appId, streamId);}}}

小结:统一使用Reciever来接收event对象,通过appName和streamName找到对应的Stream对象,然后调用Stream对象的recieveEvent方法,将event对象放入queue中。

  • 发送数据

下面的函数用于发送一个Event对象给拓扑图中下一层节点,这里的下一层节点有可能包含当前Stream类的阻塞队列queue。Stream中的put方法表示了该发送的逻辑。在该方法中,首先设置事件的streamName和App Name。然后检测该Stream是否跟远程的节点关联,如果没有,直接把event放入当前的queue中,如果有,使用sender.sendToRemotePartitions(event)来将event发送到远程队列中,同时put到本地的当前的queue中。下面将会讲sender.sendToRemotePartitions(event)方法。

 public void put(Event event) {try {event.setStreamId(getName());event.setAppId(app.getId());/** Events may be sent to local or remote partitions or both. The following code implements the logic.*/if (key != null) {/** We send to a specific PE instance using the key but we don't know if the target partition is remote* or local. We need to ask the sender.*/if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {/** Sender checked and decided that the target is local so we simply put the event in the queue and* we save the trip over the network.*/queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser().serialize(event)));}} else {/** We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every* node. The sender method takes care of the remote partitions an we take care of putting the event into* the queue.*/sender.sendToRemotePartitions(event);queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser().serialize(event)));}} catch (InterruptedException e) {logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());Thread.currentThread().interrupt();}
}

该方法用于讲一个event对象发送到拓扑图中的下一层的节点。如何定位下一层的节点呢?这里只用到了AppID (跟App Name相同)和stream Name,这两个信息便可以定位了。每一个Event对象都包含这两个属性。

emitter.send方法将event对象发送到下一个远程节点,目前有TCP和UDP两种实现。

 public void sendToRemotePartitions(Event event) {for (int i = 0; i < emitter.getPartitionCount(); i++) {/* Don't use the comm layer when we send to the same partition. */if (localPartitionId != i)emitter.send(i, new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser  .serialize(event)));}}

讲到这里,有一个问题,Stream类的发送evnet对象的方法put方法最后由谁来调用呢?答案是先由PE来调用,最终由用户在PE继承类中调用。下面是ProcessingElement类emit方法的代码。该方法没有使用到PE类的成员变量,这个需要注意,一个PE类没有将Stream作为成员变量。emit方法只是简单地遍历所传入的Stream对象数组,然后调用它们的put方法,该方法在上面讲过。而最终这个emit方法将由用户调用,在其所继承的ProcessingElement类中,具体请看上一节代码例子。

 //Helper method to be used by PE implementation classes. Sends an event to all the target streams.protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {for (int i = 0; i < streamArray.length; i++) {streamArray[i].put(event);}}

现在,我们知道了一个流如何获取event数据以及如何发送event数据,现在数据已经保存在一个阻塞队列中了,那么它是怎么从队列中获取数据然后处理的呢?

Event数据处理

Stream类中有一个成员变量Thread thread, 用于不断地从queue中获取和处理数据。因此,处理逻辑放在run方法中,下面是run方法源码,其主要是采用阻塞的方式不断从queue中获取数据,然后调用相应的PE的pe.handleInputEvent(event)方法。那么pe.handleInputEvent(event);方法又做了什么呢?

 @Overridepublic void run() {while (true) {try {/* Get oldest event in queue. */EventMessage eventMessage = queue.take();@SuppressWarnings("unchecked")T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());/* Send event to each target PE. */for (int i = 0; i < targetPEs.length; i++) {if (key == null) {/* Broadcast to all PE instances! *//* STEP 1: find all PE instances. */Collection<ProcessingElement> pes = targetPEs[i].getInstances();/* STEP 2: iterate and pass event to PE instance. */for (ProcessingElement pe : pes) {pe.handleInputEvent(event);}} else {/* We have a key, send to target PE. *//* STEP 1: find the PE instance for key. */ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));/* STEP 2: pass event to PE instance. */pe.handleInputEvent(event);}}} catch (InterruptedException e) {logger.info("Closing stream {}.", name);receiver.removeStream(this);Thread.currentThread().interrupt();return;}}}

PE的handleInputEvent方法主要做了两件事,1是在第一次调用该方法的时候,判断是否有该PE的持久化历史纪录,如果有的话就恢复;2是调用OverloadDispatcher类的dispatchEvent和dispatchTrigger两个方法。这两个方法将调用由用户实现的onEvent和onTrigger方法。onEvent和onTrigger方法在上一节讲过,它们是用户的逻辑处理单元。那么,dispatchEvent和dispatchTrigger两个方法是什么样子的呢?

protected void handleInputEvent(Event event) {Object object;if (isThreadSafe) {object = new Object(); // a dummy object TODO improve this.} else {object = this;}synchronized (object) {if (!recoveryAttempted) {recover();recoveryAttempted = true;}/* Dispatch onEvent() method. */overloadDispatcher.dispatchEvent(this, event);/* Dispatch onTrigger() method. */if (haveTriggers && isTrigger(event)) {overloadDispatcher.dispatchTrigger(this, event);}eventCount++;dirty = true;if (isCheckpointable()) {checkpoint();}}}

进入OverloadDispatcher 的定义,发现它是一个接口,而且没有找到实现类。有这么一行注释Implementations of this interface are typically generated at runtime.。原来该接口的实现类是动态实现的。使用的第三方工具包为org.objectweb.asm。我们现在已经明确,下面的两个方法将会调用由用户定义的onEvent和onTrigger方法,调用的方法体是动态实现的。那么问题有:为什么使用动态的方式生成它们的实现方法呢?

public interface OverloadDispatcher {public void dispatchEvent(ProcessingElement pe, Event event);public void dispatchTrigger(ProcessingElement pe, Event event);
}

在回答这个问题之前,我们需要考虑另外一个问题,编程人员定义了一个继承了ProcessingElement的类MyPE,实现了onEvent和onTrigger方法,S4系统编程人员在发布系统之前是不知道用户的PE继承类的,那么系统如何知道所继承的类是MyPE,并且如何找到并加载MyPE类并调用onEvent和onTrigger方法呢?为了解决这个问题,S4使用了动态生成代码的方法。动态生成代码的类为org.apache.s4.core.gen.OverloadDispatcherGenerator。这样动态生成实现类的方式到底优雅不优雅呢?性能如何?有待考量。

多线程的使用

系统中使用到的多线程主要有:

  • 读取Stream类中的阻塞队列中的数据的时候,使用了专门的线程
  • Reciever类实现了Runnable接口,用于接收数据,并把数据保存到对应的Stream中。
  • 发送数据的时候,使用到了NIO之Socket NIO,使用到了Netty框架。

关键组建的关系

一个Stream对象包含多个目标PE,对于每一个到来的event对象,根据事件类型或者事件类型+事件的key来确定相应的PE。然后调用PE的处理逻辑方法。

Reciever对象保存了单个node下的所有的Stream对象引用,它统一接收event数据,然后根据事件类型或者事件类型+事件的key来确定相应的Stream对象。

问题

  • 问题: 一个PE能否处理多中Event?
    回答:一个PE只能处理一种类型的Event,但是可以处理同种类型的不同key的event。

  • 问题:如果按照上面第一张图片所示的例子,从上往下第二次拓扑中,一个单词生成对应的一个PE,那么最终会生成大量的PE,所占用的内存也是很可观的。有一些PE在生成之后很可能只用到了几次,一直占用这内存。

  • Event类是该系统最基础的类,其使用到了Map作为成员变量,而map在put一个数据之后就会有一个默认初始化大小,如果map中只有一个数据,那么意味着存在多个空的内存占用。这导致一个Event的内存使用率很低。这样的设计是不是有失优雅呢?

public class Event {final private long time;private String streamName;private int appId;private Map<String, Data<?>> map;

谢谢!

Stream Processing: S4系统模型分析和关键源码读解相关推荐

  1. [Linux 基础] -- V4L2 实例分析 —— vivi.c 源码详解(深度好文)

    本文主要是通过对虚拟视频驱动程序 vivi.c 源码分析,加深对 V4L2 框架的理解.转载于:https://blog.csdn.net/yanbixing123/article/details/5 ...

  2. 深入理解HashMap(三): 关键源码逐行分析之构造函数

    前言 系列文章目录 上一篇我们说明了HashMap的hash算法, 说到HashMap在构造时会自动将table设为2的整数次幂. 本篇我们就来聊聊HashMap的构造函数. 本文的源码基于 jdk8 ...

  3. 【多输入模型 Multiple-Dimension 数学原理分析以及源码详解 深度学习 Pytorch笔记 B站刘二大人 (6/10)】

    多输入模型 Multiple-Dimension 数学原理分析以及源码源码详解 深度学习 Pytorch笔记 B站刘二大人(6/10) 数学推导 在之前实现的模型普遍都是单输入单输出模型,显然,在现实 ...

  4. euler欧拉系统尝试用yum、源码安装pacemaker失败

    euler欧拉系统尝试用yum.源码安装pacemaker失败 用yum安装pacemaker ====================================== [root@euler ~ ...

  5. 基于SSM + MySQL的服饰服装购物平台系统的设计与实现源码+论文+包安装配置+讲解视频

    下载:https://download.csdn.net/download/nike8x/83598568 项目介绍: 基于SSM + MySQL的服饰服装购物平台系统的设计与实现源码+论文+包安装配 ...

  6. 手把手教你SSM整合开发办公系统(OA)——报销单(含源码)

    文章目录 前言 项目展示 技能要求 一.开始前的准备 1.OA系统是什么? 2.人员权利与报销流程 3.数据库设计 4.创建项目及作用说明 5.包与全局变量配置 6.编写过滤器 7.静态资源的复制与请 ...

  7. Android 事件分发机制分析及源码详解

    Android 事件分发机制分析及源码详解 文章目录 Android 事件分发机制分析及源码详解 事件的定义 事件分发序列模型 分发序列 分发模型 事件分发对象及相关方法 源码分析 事件分发总结 一般 ...

  8. Java生鲜电商平台-电商会员体系系统的架构设计与源码解析

    Java生鲜电商平台-电商会员体系系统的架构设计与源码解析 说明:Java生鲜电商平台中会员体系作为电商平台的基础设施,重要性不容忽视.我去年整理过生鲜电商中的会员系统,但是比较粗,现在做一个最好的整 ...

  9. 带着问题分析Framework层源码(一):按键音声音太小,我们该如何增大?

    作为一名Android开发人员,对源码的阅读是必不可少的.但是Android源码那么庞大,从何开始阅读,如何开始阅读,很多人都会感觉无从下手,今天我来带着问题,去带大家分析一下Android源码,并解 ...

最新文章

  1. 用矩阵内积的办法构造迭代次数受控的神经网络1:0.6:0.1=4:3:2
  2. c语言删除s字符串中所有子串t,从串s中删除所有和串t相同的子串的算法
  3. C#LeetCode刷题-字典树
  4. Interesting Finds: 2007.12.19
  5. ffmpeg(5):SDL相关学习
  6. 分享一个在线301跳转代码生成工具
  7. 1和new Number(1)的区别
  8. java多文件生成zip_如何在java中创建多部分压缩zip文件
  9. Compose Modifier修饰符详细解说
  10. POI实现Excel模板下载
  11. 技术人如何自我成长?
  12. ip网络广播系统服务器,IP网络广播服务器
  13. SQL Server视频
  14. 敢用这张图片做“壁纸”手机秒变砖!
  15. Power BI-销售业绩分析展示
  16. Win软件 - 这可能是首款能在电脑上控制 iPhone 的工具
  17. 客户端负载均衡与服务端负载均衡
  18. Anaconda开始菜单快捷方式打不开或找不到——解决办法
  19. flask写接口文件打包成exe
  20. fnd_lookup_values与fnd_lookup_types

热门文章

  1. php 查oracle 表不存在报错处理,Oracle ORA-08104报错处理方法及注意事项
  2. hexo 搜索_Hexo--本地搜索localsearch之url路径问题
  3. python字典作业_python第一天作业:字典
  4. 利用结构体输入输出员工信息
  5. sentinel限流_微服务架构进阶:Sentinel实现服务限流、熔断与降级
  6. 阿里技术人的第一节课
  7. LayerMask小结
  8. 11.保存登录状态与注销功能
  9. ASP.NET MVC使用Bootstrap系列(1)——开始使用Bootstrap
  10. AngularJs中的directives(指令part1)