Stream Processing: S4系统模型分析和关键源码读解
S4(Simple Scalable Stream System) 流数据处理系统是Yahoo!公司提出的,在2011年的时候成为Apache软件基金下的一个孵化项目,可惜的是在2014年的时候该孵化项目“退休”了,具体原因未知!!从这里可以了解它当前的状态信息:link. 阅读了所发表的论文S4:Distributed Stream Computing Platform之后,发现该系统模型有其独特之处,总结起来就是
- 灵活
- 系统架构模型和编程接口简单
- 可拓展性强
虽然该项目已经退休,但是因为其”系统架构模型和编程接口简单“,其论文通俗易懂,并且源码开放,因此决定从系统模型架构和源码两个方面较为深入地学习它。
文章目录
- 系统模型架构
- programer的编程模型
- 系统实现逻辑
- Event数据处理
- 多线程的使用
- 关键组建的关系
- 问题
接下来先讲系统模型架构,然后针对几个问题通过源码来寻找答案,以此来了解S4的实现过程。
系统模型架构
S4系统把流处理的逻辑过程表示成一个或者多个有向无环图,如上图所示,在图中有两个关键组建,Stream和ProcessingElement(PE)。在上图中Stream表示成带箭头的线,负责数据的传递;PE表示一个节点,代表一个处理单元,也是用户业务逻辑所在地。Stream中传递的主要为以Event为基类的对象,也就是事件对象。什么是事件对象呢?从编程实现的角度来讲,一个事件对象有其时间和其他属性,这里的时间通常表示事件所发现的时间。每个Event对象能够通过两个字段信息一起确定其所属的Stream,分别是1. Event的类型,在Java中表示为继承了Event类的子类的Class类型,2. Event的Key值,这个可以为null。这里的Key和Value跟Hadoop的Key和Value是一个设计思想。举一个例子,看上面的图片,WordEvent统一表示单词这样的事件类型,因为单词事件类型中有很多中不同的单词,因此使用Key来表示每一个单词,使用Value表示每种单词的个数。
PE作为用户的逻辑处理单元,需要用户编程实现或者重写下面几个主要方法。系统会为用户隐藏其他细节,比如PE如何接收数据,如何发送处理完的中间数据等等。
@Overrideprotected void onCreate() {}@Overrideprotected void onRemove() {}public void onEvent(Event event) {}public void onTrigger(Event event) {}
这样的系统模型,看起来非常简单且灵活,但是需要考虑并解决一些问题。
一个Stream只会传输一种类型的Event对象或者同种类型的Key相同的Event对象。然后可能向一个或者多个PE发送Event数据。一台机器中可能有一个或者多个PE,整个系统使用Zookeeper来协调整合。
下面这张图片是系统的整体架构图,Processing Node表示一台机器,在机器接收到一个事件之后,需要根据事件的类型和key值来判断该事件应该由哪一个PE来处理。在PE处理之后,由Emiter发送。了解了系统模型架构,下面通过源码来学习具体的实现。
源码下载地址:download, 建议使用Intelij IDEA导入。
主要包含三个小模块,分别为s4-base, s4-comm和s4-core,第二个模块用于数据传输,我们的源码基本指涉及到s4-base和s4-core.
programer的编程模型
在看源码之前,非常有必要了解编程人员如何编程构建和使用PE。用户需要继承ProcessingElement类,然后覆写和实现onEvent和onTrigger等方法。其中onEvent是事件到来时的处理逻辑,onTrigger是要触发输出处理结果的逻辑。触发条件有两个,一个是事件数量触发,另一个是时间触发。在下面的代码onEvent方法中,每来一个事件就计数加一,在onTrigger中,每来一个事件就将结果放到输出流中。
public class CounterPE extends ProcessingElement {private Stream<CountEvent>[] countStream;public CounterPE(App app) {super(app);}public Stream<CountEvent>[] getCountStream() {return countStream;}public void setCountStream(Stream<CountEvent>... countStream) {this.countStream = countStream;}private long counter = 0;public void onEvent(Event event) {counter += 1;}public void onTrigger(Event event) {CountEvent countEvent = new CountEvent(getId(), counter);emit(countEvent, countStream);}@Overrideprotected void onCreate() {}@Overrideprotected void onRemove() {}
}
系统实现逻辑
- Event类是所有事件类的基类,是该系统的基础类。一个事件主要包含事件和其他保存在Map中的属性。
public class Event {final private long time;private String streamName;private int appId;private Map<String, Data<?>> map;
在Stream中,Event对象进一步被封装成EventMessage对象,顾名思义,EventMessage对象类似于互联网的一个消息报文,相比于Event对象,其主要是增加了数据在不同PE、不同机器上面传输所需要的定位属性。从下面源码的定义看出,一个EventMessage能够根据appName和streamName来定位数据的接收者。
public class EventMessage {private String appName;private String streamName;private byte[] serializedEvent;
一个Stream对象只容纳一种类型的事件数据,使用的数据结构为ArrayBlockingQueue阻塞队列,
protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
其特点是,当队列为空的时候,如果要获取,阻塞take的线程,直到有新的数据;如果队列满了,阻塞put的线程,直到队列不满。
- Stream类主要字段为如下所示,具体请看注释。一个stream只存储一种类型的事件数据,保存在一个阻塞队列中,专门使用一个线程不断地读取队列中地数据并调用PE的方法处理。下面我们来具体看Stream类如何将事件数据保存到队列中,以及如何读取和处理队列中的数据。
public class Stream<T extends Event> implements Runnable, Streamable {protected Key<T> key; //该对象定义了获取Event对象的key的方法private ProcessingElement[] targetPEs;//一个Stream中的数据可能流向多个PEprotected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);private Thread thread;//新开一个线程不断地从阻塞队列中读取事件private Class<T> eventType = null;//保存该Stream的事件类型
Stream作为连接不同PE或者说不同节点的组建,所以它具有两个重要的功能,1是接收数据,2是发送由PE处理后的数据。
- 接收数据
Stream中的接收event的方法很简单,只是简单地把event对象放入阻塞队列中。因为会有很多个Stream对象,因此使用了Reciever类来统一管理所有的Stream类的receiveEvent方法。也就是下面的receiveEvent方法在Reciever类中被调用。
public void receiveEvent(EventMessage event) {try {queue.put(event);} catch (InterruptedException e) {logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());Thread.currentThread().interrupt();}}
Reciever类实现了Runnable接口,下面的方法是Reciever类的run方法。在该方法中,首先通过appName和streamName来找到对应的Stream对象,然后调用其receiveEvent方法。
public void run() {// TODO: this thread never seems to get interrupted. SHould we catch an interrupted exception from listener// here?while (!Thread.interrupted()) {byte[] message = listener.recv();EventMessage event = (EventMessage) serDeser.deserialize(message);int appId = Integer.valueOf(event.getAppName());String streamId = event.getStreamName();/** Match appId and streamId in event to the target stream and pass the event to the target stream. TODO:* make this more efficient for the case in which we send the same event to multiple PEs.*/try {streams.get(appId).get(streamId).receiveEvent(event);} catch (NullPointerException e) {logger.error("Could not find target stream for event with appId={} and streamId={}", appId, streamId);}}}
小结:统一使用Reciever来接收event对象,通过appName和streamName找到对应的Stream对象,然后调用Stream对象的recieveEvent方法,将event对象放入queue中。
- 发送数据
下面的函数用于发送一个Event对象给拓扑图中下一层节点,这里的下一层节点有可能包含当前Stream类的阻塞队列queue。Stream中的put方法表示了该发送的逻辑。在该方法中,首先设置事件的streamName和App Name。然后检测该Stream是否跟远程的节点关联,如果没有,直接把event放入当前的queue中,如果有,使用sender.sendToRemotePartitions(event)
来将event发送到远程队列中,同时put到本地的当前的queue中。下面将会讲sender.sendToRemotePartitions(event)
方法。
public void put(Event event) {try {event.setStreamId(getName());event.setAppId(app.getId());/** Events may be sent to local or remote partitions or both. The following code implements the logic.*/if (key != null) {/** We send to a specific PE instance using the key but we don't know if the target partition is remote* or local. We need to ask the sender.*/if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {/** Sender checked and decided that the target is local so we simply put the event in the queue and* we save the trip over the network.*/queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser().serialize(event)));}} else {/** We are broadcasting this event to all PE instance. In a cluster, we need to send the event to every* node. The sender method takes care of the remote partitions an we take care of putting the event into* the queue.*/sender.sendToRemotePartitions(event);queue.put(new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), app.getSerDeser().serialize(event)));}} catch (InterruptedException e) {logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());Thread.currentThread().interrupt();}
}
该方法用于讲一个event对象发送到拓扑图中的下一层的节点。如何定位下一层的节点呢?这里只用到了AppID (跟App Name相同)和stream Name,这两个信息便可以定位了。每一个Event对象都包含这两个属性。
emitter.send
方法将event对象发送到下一个远程节点,目前有TCP和UDP两种实现。
public void sendToRemotePartitions(Event event) {for (int i = 0; i < emitter.getPartitionCount(); i++) {/* Don't use the comm layer when we send to the same partition. */if (localPartitionId != i)emitter.send(i, new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser .serialize(event)));}}
讲到这里,有一个问题,Stream类的发送evnet对象的方法put方法最后由谁来调用呢?答案是先由PE来调用,最终由用户在PE继承类中调用。下面是ProcessingElement类emit方法的代码。该方法没有使用到PE类的成员变量,这个需要注意,一个PE类没有将Stream作为成员变量。emit方法只是简单地遍历所传入的Stream对象数组,然后调用它们的put方法,该方法在上面讲过。而最终这个emit方法将由用户调用,在其所继承的ProcessingElement类中,具体请看上一节代码例子。
//Helper method to be used by PE implementation classes. Sends an event to all the target streams.protected <T extends Event> void emit(T event, Stream<T>[] streamArray) {for (int i = 0; i < streamArray.length; i++) {streamArray[i].put(event);}}
现在,我们知道了一个流如何获取event数据以及如何发送event数据,现在数据已经保存在一个阻塞队列中了,那么它是怎么从队列中获取数据然后处理的呢?
Event数据处理
Stream类中有一个成员变量Thread thread, 用于不断地从queue中获取和处理数据。因此,处理逻辑放在run方法中,下面是run方法源码,其主要是采用阻塞的方式不断从queue中获取数据,然后调用相应的PE的pe.handleInputEvent(event)
方法。那么pe.handleInputEvent(event);
方法又做了什么呢?
@Overridepublic void run() {while (true) {try {/* Get oldest event in queue. */EventMessage eventMessage = queue.take();@SuppressWarnings("unchecked")T event = (T) app.getSerDeser().deserialize(eventMessage.getSerializedEvent());/* Send event to each target PE. */for (int i = 0; i < targetPEs.length; i++) {if (key == null) {/* Broadcast to all PE instances! *//* STEP 1: find all PE instances. */Collection<ProcessingElement> pes = targetPEs[i].getInstances();/* STEP 2: iterate and pass event to PE instance. */for (ProcessingElement pe : pes) {pe.handleInputEvent(event);}} else {/* We have a key, send to target PE. *//* STEP 1: find the PE instance for key. */ProcessingElement pe = targetPEs[i].getInstanceForKey(key.get(event));/* STEP 2: pass event to PE instance. */pe.handleInputEvent(event);}}} catch (InterruptedException e) {logger.info("Closing stream {}.", name);receiver.removeStream(this);Thread.currentThread().interrupt();return;}}}
PE的handleInputEvent
方法主要做了两件事,1是在第一次调用该方法的时候,判断是否有该PE的持久化历史纪录,如果有的话就恢复;2是调用OverloadDispatcher类的dispatchEvent和dispatchTrigger两个方法。这两个方法将调用由用户实现的onEvent和onTrigger方法。onEvent和onTrigger方法在上一节讲过,它们是用户的逻辑处理单元。那么,dispatchEvent和dispatchTrigger两个方法是什么样子的呢?
protected void handleInputEvent(Event event) {Object object;if (isThreadSafe) {object = new Object(); // a dummy object TODO improve this.} else {object = this;}synchronized (object) {if (!recoveryAttempted) {recover();recoveryAttempted = true;}/* Dispatch onEvent() method. */overloadDispatcher.dispatchEvent(this, event);/* Dispatch onTrigger() method. */if (haveTriggers && isTrigger(event)) {overloadDispatcher.dispatchTrigger(this, event);}eventCount++;dirty = true;if (isCheckpointable()) {checkpoint();}}}
进入OverloadDispatcher 的定义,发现它是一个接口,而且没有找到实现类。有这么一行注释Implementations of this interface are typically generated at runtime.
。原来该接口的实现类是动态实现的。使用的第三方工具包为org.objectweb.asm
。我们现在已经明确,下面的两个方法将会调用由用户定义的onEvent和onTrigger方法,调用的方法体是动态实现的。那么问题有:为什么使用动态的方式生成它们的实现方法呢?
public interface OverloadDispatcher {public void dispatchEvent(ProcessingElement pe, Event event);public void dispatchTrigger(ProcessingElement pe, Event event);
}
在回答这个问题之前,我们需要考虑另外一个问题,编程人员定义了一个继承了ProcessingElement的类MyPE,实现了onEvent和onTrigger方法,S4系统编程人员在发布系统之前是不知道用户的PE继承类的,那么系统如何知道所继承的类是MyPE,并且如何找到并加载MyPE类并调用onEvent和onTrigger方法呢?为了解决这个问题,S4使用了动态生成代码的方法。动态生成代码的类为org.apache.s4.core.gen.OverloadDispatcherGenerator
。这样动态生成实现类的方式到底优雅不优雅呢?性能如何?有待考量。
多线程的使用
系统中使用到的多线程主要有:
- 读取Stream类中的阻塞队列中的数据的时候,使用了专门的线程
- Reciever类实现了Runnable接口,用于接收数据,并把数据保存到对应的Stream中。
- 发送数据的时候,使用到了NIO之Socket NIO,使用到了Netty框架。
关键组建的关系
一个Stream对象包含多个目标PE,对于每一个到来的event对象,根据事件类型或者事件类型+事件的key来确定相应的PE。然后调用PE的处理逻辑方法。
Reciever对象保存了单个node下的所有的Stream对象引用,它统一接收event数据,然后根据事件类型或者事件类型+事件的key来确定相应的Stream对象。
问题
问题: 一个PE能否处理多中Event?
回答:一个PE只能处理一种类型的Event,但是可以处理同种类型的不同key的event。问题:如果按照上面第一张图片所示的例子,从上往下第二次拓扑中,一个单词生成对应的一个PE,那么最终会生成大量的PE,所占用的内存也是很可观的。有一些PE在生成之后很可能只用到了几次,一直占用这内存。
Event类是该系统最基础的类,其使用到了Map作为成员变量,而map在put一个数据之后就会有一个默认初始化大小,如果map中只有一个数据,那么意味着存在多个空的内存占用。这导致一个Event的内存使用率很低。这样的设计是不是有失优雅呢?
public class Event {final private long time;private String streamName;private int appId;private Map<String, Data<?>> map;
谢谢!
Stream Processing: S4系统模型分析和关键源码读解相关推荐
- [Linux 基础] -- V4L2 实例分析 —— vivi.c 源码详解(深度好文)
本文主要是通过对虚拟视频驱动程序 vivi.c 源码分析,加深对 V4L2 框架的理解.转载于:https://blog.csdn.net/yanbixing123/article/details/5 ...
- 深入理解HashMap(三): 关键源码逐行分析之构造函数
前言 系列文章目录 上一篇我们说明了HashMap的hash算法, 说到HashMap在构造时会自动将table设为2的整数次幂. 本篇我们就来聊聊HashMap的构造函数. 本文的源码基于 jdk8 ...
- 【多输入模型 Multiple-Dimension 数学原理分析以及源码详解 深度学习 Pytorch笔记 B站刘二大人 (6/10)】
多输入模型 Multiple-Dimension 数学原理分析以及源码源码详解 深度学习 Pytorch笔记 B站刘二大人(6/10) 数学推导 在之前实现的模型普遍都是单输入单输出模型,显然,在现实 ...
- euler欧拉系统尝试用yum、源码安装pacemaker失败
euler欧拉系统尝试用yum.源码安装pacemaker失败 用yum安装pacemaker ====================================== [root@euler ~ ...
- 基于SSM + MySQL的服饰服装购物平台系统的设计与实现源码+论文+包安装配置+讲解视频
下载:https://download.csdn.net/download/nike8x/83598568 项目介绍: 基于SSM + MySQL的服饰服装购物平台系统的设计与实现源码+论文+包安装配 ...
- 手把手教你SSM整合开发办公系统(OA)——报销单(含源码)
文章目录 前言 项目展示 技能要求 一.开始前的准备 1.OA系统是什么? 2.人员权利与报销流程 3.数据库设计 4.创建项目及作用说明 5.包与全局变量配置 6.编写过滤器 7.静态资源的复制与请 ...
- Android 事件分发机制分析及源码详解
Android 事件分发机制分析及源码详解 文章目录 Android 事件分发机制分析及源码详解 事件的定义 事件分发序列模型 分发序列 分发模型 事件分发对象及相关方法 源码分析 事件分发总结 一般 ...
- Java生鲜电商平台-电商会员体系系统的架构设计与源码解析
Java生鲜电商平台-电商会员体系系统的架构设计与源码解析 说明:Java生鲜电商平台中会员体系作为电商平台的基础设施,重要性不容忽视.我去年整理过生鲜电商中的会员系统,但是比较粗,现在做一个最好的整 ...
- 带着问题分析Framework层源码(一):按键音声音太小,我们该如何增大?
作为一名Android开发人员,对源码的阅读是必不可少的.但是Android源码那么庞大,从何开始阅读,如何开始阅读,很多人都会感觉无从下手,今天我来带着问题,去带大家分析一下Android源码,并解 ...
最新文章
- 用矩阵内积的办法构造迭代次数受控的神经网络1:0.6:0.1=4:3:2
- c语言删除s字符串中所有子串t,从串s中删除所有和串t相同的子串的算法
- C#LeetCode刷题-字典树
- Interesting Finds: 2007.12.19
- ffmpeg(5):SDL相关学习
- 分享一个在线301跳转代码生成工具
- 1和new Number(1)的区别
- java多文件生成zip_如何在java中创建多部分压缩zip文件
- Compose Modifier修饰符详细解说
- POI实现Excel模板下载
- 技术人如何自我成长?
- ip网络广播系统服务器,IP网络广播服务器
- SQL Server视频
- 敢用这张图片做“壁纸”手机秒变砖!
- Power BI-销售业绩分析展示
- Win软件 - 这可能是首款能在电脑上控制 iPhone 的工具
- 客户端负载均衡与服务端负载均衡
- Anaconda开始菜单快捷方式打不开或找不到——解决办法
- flask写接口文件打包成exe
- fnd_lookup_values与fnd_lookup_types
热门文章
- php 查oracle 表不存在报错处理,Oracle ORA-08104报错处理方法及注意事项
- hexo 搜索_Hexo--本地搜索localsearch之url路径问题
- python字典作业_python第一天作业:字典
- 利用结构体输入输出员工信息
- sentinel限流_微服务架构进阶:Sentinel实现服务限流、熔断与降级
- 阿里技术人的第一节课
- LayerMask小结
- 11.保存登录状态与注销功能
- ASP.NET MVC使用Bootstrap系列(1)——开始使用Bootstrap
- AngularJs中的directives(指令part1)