1 系统介绍

WhereHows是linkedin开源的数据发现平台,它从各种数据源收集元数据,标准化处理,提供统一的元数据服务。WhereHows的名字与warehouse谐音。同时,还包含了两层含义:

  • Where:数据在哪,即元数据收集。
  • How:数据的前世今生,生产者、消费者是谁,即血缘。

在统一元数据的基础上,平台提供了一系列功能:

  • 数据发现:搜索数据。
  • 数据血缘:跟踪数据集和任务的上下游。
  • 工作流发现:通过项目、流、子流、任务的树结构,查看任务血缘。
  • 提供页面访问及后台API。

2 架构介绍

2.1 总览

官方架构图:

上图中,除了缺少Kafka部分外,还是比较准确的。可以看到,核心是akka分布式框架。通过akka actor执行各个存储系统和调度系统元数据的收集,并转换为统一模型存入WhereHows的mysql中。而不同存储系统和调度系统的元数据的收集、转换、存储的逻辑存在差异,所以具体的ETL逻辑是通过jython脚本编写的,这样就实现了面对不同存储和引擎的扩展性。新增存储系统或者计算调度系统,只需要对应的ETL job jython脚本。

2.2 数据模型

官方数据模型:

数据模型分为四部分:

  • 数据集:数据集schema、备注、样本等。
  • 操作数据:流组、流、任务、任务依赖、执行、ID映射表等。
  • 血缘数据:任务的输入和输出、依赖、分区、位点等。
  • ETL和Web服务配置。

dataset_前缀的表与数据集相关;flow_和job_前缀的表与流和任务相关;cfg_前缀开头的表与物理配置相关;wh_前缀开头的表与内部定时执行的元数据收集任务相关,负责记录定时执行的任务信息,以及任务执行状态等。

最顶层的抽象是dataset数据集的概念,以URN作为数据集唯一标识,schema抽象了不同的数据结构。除此之外还包括app应用系统信息(cfg_application)和db数据库系统信息(cfg_database)。

dataset作为顶层抽象,通过dataset_id关联到底层实际物理元数据信息(dict_dataset_instance、cfg_object_name_map、cfg_database等),包括hdfs、hive、teradata、dalids等。其中有DB模型的(hive、teradata、dalids),也有非DB模型的(hdfs)。cfg_object_name_map.sub_type区分具体的物理类型。

任务执行信息和数据集之间通过job_execution_data_lineage关联,即血缘表。数据集和操作数据是两个端点,而血缘数据就是连接两者的桥梁。用户可以从一个数据集出发,找到它的生产者任务或消费者任务;也可以从一个任务触发,找到这个任务读\写的数据集。

2.3 ETL

WhereHows使用java和jython语言。以java构建框架,定义每个ETL阶段的输入和输出;使用jython资源文件实现ETL处理过程。这使得结构更加清晰,对于需要自定义的内容一目了然。同时,提高了引入新的源系统的灵活性和扩展性。框架定义的元数据收集步骤按照ETL三阶段执行:extract、transform、load。

2.3.1 数据集ETL

数据集元数据包括:schema、path、partition、sample data、watermark、owner等等。WhereHows收集了HDFS、Teradata等数据集元数据。

HDFS:

使用具有读权限的账号,扫描白名单下的目录,并收集数据集信息。程序将schemaFetch.jar文件copy到hadoop gateway上,执行扫描,再将结果copy回本地,经过转换存储到mysql。

Teradata:

直接在jython脚本中查询库中的元数据。

2.3.2 操作数据ETL

操作数据包括:流组定义、流定义、流执行信息、任务定义、任务执行信息、owner信息、调度信息等。WhereHows的操作数据ETL定时从调度系统的数据库中爬取信息到WhereHows的数据库中。

  • 痛点1:不同的调度系统具有不同的数据模型,需要将他们转换成统一的模型。例如:Azkaban的执行是在流级别,流执行具有唯一ID,但是任务执行没有唯一ID。反观Oozie,每个任务执行都具有唯一ID。为了解决这种情况,WhereHows结合流执行ID与任务在流中的顺序为Azkaban任务生成任务ID。同时,将所有的唯一ID映射为整型来适配相同的模型。每当添加调度系统,都要添加不同的映射规则将原生数据模型转换成WhereHows数据模型。
  • 痛点2:很多源系统没有跟踪版本,或者版本很不直观。例如,Oozie和Azkaban这些调度系统没有流版本。在这种情况下,WhereHows收集流元数据之后,与老的DAG结构进行比较,在内部推导维护版本信息。WhereHows将这种推导的DAG版本用作所有操作数据的通用版本。

2.3.3 血缘ETL

血缘信息包括:任务和数据集之间的依赖关系,读还是写,涉及多少条记录等。

How:

基本策略是从日志和配置文件中收集血缘信息。比如:一个Pig任务有很多Hadoop任务,在Hadoop任务的配置文件中可以看到“mapreduce.input.fileinputformat.inputdir”一类的属性,可以代表Hadoop任务的血缘。WhereHows收集所有子Hadoop任务的输入和输出,将它们聚合到父级别任务,并存储下来。

When:

血缘ETL任务需要一个入口点,即任务ID信息列表,根据这个列表提取血缘信息。WhereHows作为一个元数据仓库,延迟容忍度高,5到10分钟的延迟是可以接受的,所以采用“微批拉取”的解决方案。通过一个拉取任务,检查最近30分钟(窗口可配)有哪些任务结束,然后以此作为入口点,拉取它们的日志和配置文件分析血缘。

3 代码解读

3.1 调度

WhereHows使用了akka分布式并发框架,其中三个核心的actor包括:SchedulerActor、EtlJobActor、TreeBuilderActor。这三个actor以静态变量的形式注册在ActorRegistry中:

public class ActorRegistry {public static Scheduler scheduler = actorSystem.scheduler();public static ActorRef schedulerActor = actorSystem.actorOf(Props.create(SchedulerActor.class), "SchedulerActor");public static ActorRef etlJobActor =actorSystem.actorOf(new SmallestMailboxPool(ETL_POOL_SIZE).props(Props.create(EtlJobActor.class)), "EtlJobActor");public static ActorRef treeBuilderActor =actorSystem.actorOf(Props.create(TreeBuilderActor.class), "TreeBuilderActor");
}

通过SchedulerUtil启动SchedulerActor,则启动了整个调度引擎。定时向SchedulerActor发送“checking”消息触发调度检查:

public class SchedulerUtil {public static synchronized void start(Long mins) {if (schedulerRef != null) {schedulerRef.cancel();}schedulerRef = ActorRegistry.scheduler.schedule(Duration.create(0, TimeUnit.MILLISECONDS), Duration.create(mins, TimeUnit.MINUTES),ActorRegistry.schedulerActor, "checking", ActorRegistry.dispatcher, null);}}

3.1.1 SchedulerActor

SchedulerActor在收到“checking”消息后,从应用配置的目录位置,读取所有需要执行的job文件:

并从DB中读取已经记录的定时调度job(包括job的下一次调度时间)。

遍历所有的job文件,将每个job的下一次调度时间更新到DB中。如果已经记录的job中存在当前job,则判断是否到了调度时间,到了就执行,没到则跳过。而触发job执行就是将对应的job信息作为消息发送给EtlJobActor。

  public void onReceive(Object message) throws Exception {if (message.equals("checking")) {runDueJobs();}}private void runDueJobs() throws Exception {Map<String, Properties> enabledJobs = JobsUtil.getScheduledJobs(ETL_JOBS_DIR);Map<String, Long> scheduledJobs = getScheduledJobs();long now = System.currentTimeMillis() / 1000;for (Map.Entry<String, Properties> entry : enabledJobs.entrySet()) {String etlJobName = entry.getKey();Properties properties = entry.getValue();EtlJobMessage etlMsg = new EtlJobMessage(etlJobName, properties);// Schedule next run if a cron expr is defined.String cronExpr = etlMsg.getCronExpr();if (cronExpr != null) {EtlJobDao.updateNextRun(etlJobName, cronExpr, new Date());}if (scheduledJobs.getOrDefault(etlJobName, Long.MAX_VALUE) > now) {continue;}Long whExecId = EtlJobDao.insertNewRun(etlJobName);etlMsg.setWhEtlExecId(whExecId);if (Global.getCurrentRunningJob().contains(etlJobName)) {EtlJobDao.endRun(etlMsg.getWhEtlExecId(), JobStatus.ERROR, "Previous is still running, Aborted!");} else {Global.getCurrentRunningJob().add(etlJobName);ActorRegistry.etlJobActor.tell(etlMsg, getSelf());}}}

3.1.2 EtlJobActor

EtlJobActor的任务是启动一个新的进程执行SchedulerActor发送的job。这个新的进程的执行框架在Launcher中定义。

3.1.3 Launcher

Launcher负责job的ETL的触发。ETL的实际处理class在job文件中定义,以HDFS_METADATA_ETL.job文件为例,该job用来提取HDFS数据集的元数据:

# Fetch HDFS dataset metadata

# Common ETL configs

job.class=metadata.etl.dataset.hdfs.HdfsMetadataEtl

job.cron.expr=0 0 5 ? * TUE,THU,SUN *

job.timeout=3000

job.ref.id=11

# Cluster name

hdfs.cluster=your_cluster_name

# Run job on remote machine, true/false

hdfs.remote.mode=false

# Remote Hadoop gateway machine name (could be localhost)

hdfs.remote.machine=

# Private key used to log in to the remote machine

hdfs.private_key_location=

# JAR file location on the remote machine

hdfs.remote.jar=

# User login on remote machine

hdfs.remote.user=

# Metadata JSON file location on remote machine

hdfs.remote.raw_metadata=

# Sample data CSV file location on remote machine

hdfs.remote.sample=

# Place to store field metadata file

hdfs.local.field_metadata=

# Place to store metadata CSV file

hdfs.local.metadata=hdfs_metadata.csv

# Place to store metadata JSON file

hdfs.local.raw_metadata=hdfs_raw_metadata.json

# Place to store sample file

hdfs.local.sample=hdfs_sample.dat

# The whitelist of folder to collect metadata

hdfs.white_list=

# Optional. number of threads you want to scrape the HDFS

hdfs.num_of_thread=1

# Optional. The map of file path regex and dataset source. e.g. [{"/data/tracking.":"Kafka"},{"/data/retail.":"Teradata"}]

hdfs.file_path_regex_source_map=

文件中的“job.class”属性定义了实际执行的class为“metadata.etl.dataset.hdfs.HdfsMetadataEtl”。Launcher在实例化具体的job class之后,调用其run方法触发ETL执行。

3.2 ETL

所有的ETL类都继承EtlJob。EtlJob定义了extract、transform、load三个抽象方法,分别对应着ETL的三个执行步骤,由具体的实现类实现。在EtlJob.run()方法中顺序执行三个方法:

  public abstract void extract() throws Exception;public abstract void transform() throws Exception;public abstract void load() throws Exception;public void run() throws Exception {setup();extract();transform();load();close();}

以实现类HdfsMetadataEtl为例:

(1)extract

将schemaFetch.jar拷贝到HDFS文件系统上,并执行。执行过程中,扫描白名单中的路径,将所有文件的元数据信息记录在HDFS文件系统的指定路径。执行结束后,将结果拷贝回本地。

(2)transform

根据名为“HdfsTransform.py”的jython脚本中的逻辑,对上一步中拷贝到本地的数据进行转换处理:

  public void transform() throws Exception {InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsTransform.py");interpreter.execfile(inputStream);inputStream.close();}

(3)load

在名为“HdfsLoad.py”的jython脚本中,将transform转换结果存入DB:

  public void load() throws Exception {InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsLoad.py");interpreter.execfile(inputStream);inputStream.close();}

3.3 Kafka

WhereHows集成了kafka,可以通过消息触发元数据和血缘的更新。kafka的consumer和producer同样封装为akka的actor。

3.3.1 KafkaClientMaster

KafkaClientMaster是主actor,在ApplicationStart中启动:

  public static void main(String[] args) {final ActorSystem actorSystem = ActorSystem.create("KAFKA");String kafkaJobDir = config.getString("kafka.jobs.dir");actorSystem.actorOf(Props.create(KafkaClientMaster.class, kafkaJobDir), "KafkaMaster");}

其中,配置属性“kafka.jobs.dir”代表kafka消费/生产配置文件的目录。

KafkaClientMaster在preStart阶段从“kafka.jobs.dir”获取所有的kafka任务文件,遍历文件,构造对应的KafkaWorker(actor)并启动。在构造KafkaWorker的过程中,根据文件中的consumer topic和consumer属性构造kafka consumer,作为KafkaWorker的数据来源;根据“kafka.processor”属性实例化处理数据的Processor class;如果processor在处理过程中需要发送消息,则根据producer topic和producer属性构造kafka producer。通过向KafkaWorker发送“WORKER_START”消息触发kafka consumer的消费。

KafkaClientMaster由于在preStart阶段已经启动了所有worker,不需要接收、处理消息。

  public void preStart() throws Exception {_kafkaJobList = JobsUtil.getEnabledJobs(KAFKA_JOB_DIR);log.info("Kafka jobs: {}", _kafkaJobList.keySet());if (_kafkaJobList.size() == 0) {context().stop(getSelf());return;}log.info("Starting KafkaClientMaster...");for (Map.Entry<String, Properties> entry : _kafkaJobList.entrySet()) {// handle one kafka topicfinal String kafkaJobName = entry.getKey();final Properties props = entry.getValue();final int numberOfWorkers = Integer.parseInt(props.getProperty(Constant.KAFKA_WORKER_COUNT, "1"));log.info("Create Kafka client with config: " + props);try {// create workerfor (int i = 0; i < numberOfWorkers; i++) {ActorRef worker = makeKafkaWorker(kafkaJobName, props);_kafkaWorkers.add(worker);worker.tell(KafkaWorker.WORKER_START, getSelf());log.info("Started Kafka worker #{} for job {}", i, kafkaJobName);}} catch (Exception e) {log.error("Error starting Kafka job: " + kafkaJobName, e);}}}

3.3.2 KafkaWorker

KafkaWorker收到“WORKER_START”消息后,开始不断从kafka consumer拉取数据,并传给processor处理:

  public void onReceive(@Nonnull Object message) throws Exception {if (!message.equals(WORKER_START)) {log.warn("Must send WORKER_START message first!");unhandled(message);}while (RUNNING) {ConsumerRecords<String, IndexedRecord> records;try {records = _consumer.poll(POLL_TIMEOUT_MS);} catch (SerializationException e) {log.error("Serialization Error: ", e);moveOffset(extractPartition(e), 1);continue;}try {process(records);} catch (Exception e) {log.error("Unhandled processing exception: ", e);break;}}getContext().stop(getSelf());}private void process(@Nonnull ConsumerRecords<String, IndexedRecord> records) {for (ConsumerRecord<String, IndexedRecord> record : records) {_receivedRecordCount++;_processor.process(record.value());if (_receivedRecordCount % 1000 == 0) {log.info("{}: received {} messages", _topic, _receivedRecordCount);}}_consumer.commitAsync();}

3.3.3 KafkaMessageProcessor

上面提到“kafka.processor”属性指定实际处理消息的class。所有处理kafka消息的class都继承抽象类KafkaMessageProcessor,并实现process抽象方法。WhereHows提供了三个KafkaMessageProcessor实现类:MetadataChangeProcessor、MetadataInventoryProcessor、MetadataLineageProcessor。

MetadataChangeProcessor

从命名可以看出来,MetadataChangeProcessor处理单个数据集元数据的变更,输入消息为MetadataChangeEvent。它解析消息内容,并将变更作用于存储中。MetadataChangeEvent的changeType字段代表操作类型,包括创建、更新、删除。

处理失败的消息再通过kafka发送出去,便于记录、跟踪和重试。

MetadataInventoryProcessor

MetadataInventoryProcessor负责某个集群的数据集的批量处理。目前WhereHows代码中,只实现了删除。MetadataInventoryProcessor处理的消息为MetadataInventoryEvent,其中包含某个集群当前所有的数据集名称(nativeNames)。MetadataInventoryProcessor先根据消息中的集群信息,查询该集群在WhereHows中已存的数据集名称,再将存量数据集名称与消息中的数据集名称列表做减法,得到需要从元数据中删除的数据集名称。最后将这些需要删除的数据集包装为MetadataChangeEvent通过kafka发送出去,由上面的MetadataChangeProcessor进行消费处理。

MetadataLineageProcessor

MetadataLineageProcessor处理的消息是MetadataLineageEvent。MetadataLineageProcessor遍历消息中的血缘列表,对每个血缘的source列表和destination列表进行去重和校验,然后将血缘列表插入到数据库中。不过入库这一步暂未实现。

4 总结

WhereHows作为一个post-hoc型数据集管理系统,可以认为是GOODS思想的一个具体实现,高扩展性是其一个优点。引入新的源系统,可以通过jython脚本动态引入ETL逻辑,但仍要写java代码实现ETL job,灵活性在这里受到了限制。

5 参考资料

[1] WhereHows官方:https://github.com/linkedin/WhereHows

WhereHows解读相关推荐

  1. Python Re 模块超全解读!详细

    内行必看!Python Re 模块超全解读! 2019.08.08 18:59:45字数 953阅读 121 re模块下的函数 compile(pattern):创建模式对象 > import ...

  2. Bert系列(二)——源码解读之模型主体

    本篇文章主要是解读模型主体代码modeling.py.在阅读这篇文章之前希望读者们对bert的相关理论有一定的了解,尤其是transformer的结构原理,网上的资料很多,本文内容对原理部分就不做过多 ...

  3. Bert系列(三)——源码解读之Pre-train

    https://www.jianshu.com/p/22e462f01d8c pre-train是迁移学习的基础,虽然Google已经发布了各种预训练好的模型,而且因为资源消耗巨大,自己再预训练也不现 ...

  4. NLP突破性成果 BERT 模型详细解读 bert参数微调

    https://zhuanlan.zhihu.com/p/46997268 NLP突破性成果 BERT 模型详细解读 章鱼小丸子 不懂算法的产品经理不是好的程序员 ​关注她 82 人赞了该文章 Goo ...

  5. 解读模拟摇杆原理及实验

    解读模拟摇杆原理及实验 Interpreting Analog Sticks 当游戏支持控制器时,玩家可能会一直使用模拟摇杆.在整个体验过程中,钉住输入处理可能会对质量产生重大影响.让来看一些核心概念 ...

  6. 自监督学习(Self-Supervised Learning)多篇论文解读(下)

    自监督学习(Self-Supervised Learning)多篇论文解读(下) 之前的研究思路主要是设计各种各样的pretext任务,比如patch相对位置预测.旋转预测.灰度图片上色.视频帧排序等 ...

  7. 自监督学习(Self-Supervised Learning)多篇论文解读(上)

    自监督学习(Self-Supervised Learning)多篇论文解读(上) 前言 Supervised deep learning由于需要大量标注信息,同时之前大量的研究已经解决了许多问题.所以 ...

  8. 可视化反投射:坍塌尺寸的概率恢复:ICCV9论文解读

    可视化反投射:坍塌尺寸的概率恢复:ICCV9论文解读 Visual Deprojection: Probabilistic Recovery of Collapsed Dimensions 论文链接: ...

  9. 从单一图像中提取文档图像:ICCV2019论文解读

    从单一图像中提取文档图像:ICCV2019论文解读 DewarpNet: Single-Image Document Unwarping With Stacked 3D and 2D Regressi ...

  10. 点云配准的端到端深度神经网络:ICCV2019论文解读

    点云配准的端到端深度神经网络:ICCV2019论文解读 DeepVCP: An End-to-End Deep Neural Network for Point Cloud Registration ...

最新文章

  1. Maven工程构建时报编码警告的解决办法
  2. 工具在软件过程改进中的重要作用
  3. MyEclipse 如何使用断点调试
  4. 2018黄河奖设计大赛获奖_宣布我们的freeCodeCamp 2018杰出贡献者奖获奖者
  5. python tkinter 窗口禁止编辑_python tkinter禁用文本窗口中的换行
  6. 图标插件java_java – Eclipse插件:标记的自定义图标
  7. matlab中unique的作用,matlab中的unique函数详解
  8. 3t硬盘 xp_如何在Windows XP SP3 32位系统下识别3T容量GPT格式硬盘
  9. 详细解析Photoshop10个必学的抠图技巧
  10. k8s的命令行管理工具
  11. UOJ #60 [UR #5] 怎样提高智商
  12. termux python turtle_如何在termux上安装Python的turtle库?
  13. 第二章 Dubbo框架
  14. c++11新特性std::is_trivial
  15. 2015年全部企业校园招聘情况+薪资水平!
  16. 借助小程序·云开发制作校园导览小程序丨实战
  17. Python系列英文原版电子书
  18. 【linux】lsb_release -a命令
  19. add_months()函数介绍
  20. Golang 从入门到放弃

热门文章

  1. 信息处理技术员的作用
  2. 服务器时间修改方法,修改云服务器时间设置方法
  3. 基于slicer开发(简化功能与插件开发)
  4. python程序设计实训报告-Python编程实践(1)
  5. 啊哈算法——第一章第二节:冒泡排序
  6. 记一次PSP游戏文件(iso)提取BGM(cpk文件处理,无后缀音频文件格式转换,pmf文件转换)
  7. 计算地球经纬度两点之间的弧长
  8. vue视频保存不下来_vue视频如何导出? 视频导出方法攻略解答!
  9. 生信-使用NCBI进行目的基因的引物设计
  10. 如何准备计算机二级-Java