kite 使用 go

从我担任软件工程师的第一天起,我总是听到很多方面相同要求:

我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置我们的应用程序。

我也喜欢这种通用范围,但是众所周知,软件系统的适应性不强,客户的需求也不稳定。

在过去的几年中,我们已经使用传统的框架/技术(JMX,分布式缓存,Spring或JEE等)构建了此类可配置应用程序(并非100%可配置)。

近年来,我们的体系结构中还必须包含一个附加概念,这就是大数据 (或3V或4V或任何更合适的词)的概念。 这个概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。

有趣的是,我很多次都和十年前一样。 这是软件开发的规则,它永远不会结束,因此个人才能和新冒险也永远不会结束:-)

主要问题仍然是相同的,即如何构建可配置的ETL分布式应用程序

因此,我构建了一个小型的适应性解决方案,该解决方案在许多使用案例中可能会有所帮助。 我在大数据世界中使用了3种常用工具: JavaApache StormKite SDK Morplines 。 Java是主要的编程语言, Apache Storm是分布式流处理引擎,而Kite SDK Morphlines是可配置的ETL引擎。

风筝SDK Morplines

从其描述复制: Morphlines是一个开源框架,可减少构建和更改Hadoop ETL流处理应用程序所需的时间和精力,该应用程序可将数据提取,转换并加载到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件,可轻松定义一个转换链,该转换链可使用来自任何类型数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程,并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。

除了内置命令外 ,您还可以轻松实现自己的命令 ,并在吗啉配置文件中使用它。

示例Morphline配置读取一个JSON字符串,解析它,然后只记录一个特定的JSON元素:

morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}]
}]

风暴变身螺栓

为了在Storm中使用Morphlines,我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是:

  • 通过配置文件初始化Morphlines处理程序
  • 初始化映射说明:
    a)从元组到Morphline输入,以及
    b)从Morphline输出到新的输出元组
  • 使用已初始化的Morplines上下文处理每个传入事件
  • 如果Bolt不是Terminal ,则使用提供的Mapper (类型“ b”),使用Morphline执行的输出发出一个新的Tuple。

简单的可配置ETL拓扑

为了测试自定义MorphlinesBolt ,我编写了2个简单的测试。 在这些测试中,您可以看到MorphlinesBolt是如何初始化的,然后是每次执行的结果。 作为输入,我使用了一个自定义的Spout(RandomJsonTestSpout),它仅每100毫秒发出一次新的JSON字符串(可配置)。

DummyJsonTerminalLogTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为端子螺栓,这意味着对于每个输入Tuple不会发出新的Tuple。

public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}}
}

DummyJson2StringTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为普通螺栓,这意味着对于每个输入Tuple,它都会发出一个新的Tuple。

public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}}
}

最后的想法

MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分(作为单处理Bolt,作为终端Bolt,作为复杂管道的一部分,等等)。

我在github中的示例项目集合中,源代码作为maven模块( sv-etl-storm-morphlines )提供。

最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑!!!
我还没有添加为选项,以便保持更少的依赖关系(我可能添加了范围“ test”)。

该模块不是最终模块,我将尝试对其进行改进,因此许多人会在第一个实现中发现各种错误。

对于任何其他想法或澄清,请写评论:)

这是我2016年的第一篇文章! 希望您身体健康,思想和行动更好。 一切事物的首要美德/价值是人类以及对我们所有人赖以生存的环境(社会,地球,动物,植物等)的尊重。 所有其他都是次要优先事项,不应破坏优先事项所隐含的内容。 始终牢记最重要的美德,并在您采取的任何行动或思想中考虑它们。

翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html

kite 使用 go

kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理相关推荐

  1. 使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

    从我担任软件工程师的第一天起,我总是听到很多方面的相同要求: " 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置 ...

  2. Spotify如何对Apache Storm进行规模扩展

    [编者的话]Spotify是一家音乐流媒体服务商,最新的数据显示他们已经有6000万用户.Spotify内部使用Apache Storm来构建实时类系统,包括广告定位.音乐推荐以及数据可视化等.本文来 ...

  3. Apache Storm 实时流处理系统ACK机制以及源码分析

    1.ACK机制简介 Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间(timeout)内被完全处理.完全处理的意思是该MessageId绑定的源Tuple以及由该源Tupl ...

  4. Apache Storm 实时流处理系统通信机制源码分析

    我们今天就来仔细研究一下Apache Storm 2.0.0-SNAPSHOT的通信机制.下面我将从大致思想以及源码分析,然后我们细致分析实时流处理系统中源码通信机制研究. 1. 简介 Worker间 ...

  5. BigData之Storm:Apache Storm的简介、深入理解、下载、案例应用之详细攻略

    BigData之Storm:Apache Storm的简介.深入理解.下载.案例应用之详细攻略 目录 Apache Storm的简介 Apache Storm的深入理解 1.Storm与hadoop ...

  6. storm apache java_Apache Ignite与Apache Storm(深入)

    Apache Ignite和Apache Storm在很多方面都是两种截然不同的技术 - 特别是因为Storm有一个非常具体的用例,而Ignite在同一个屋檐下有相当多的工具 . 据我了解,Ignit ...

  7. apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)

    apache ignite 在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处 ...

  8. 使用Apache Storm和Apache Ignite进行复杂的事件处理(CEP)

    在本文中, "使用Apache Ignite进行高性能内存计算"一书的作者将讨论使用Apache Strom和Apache Ignite进行复杂的事件处理. 本文的一部分摘自 书 ...

  9. Apache Storm:如何使用Flux配置KafkaBolt

    微型框架中的助焊剂可以帮助我们定义和部署Storm拓扑. Flux有各种包装器,可帮助您定义所需的流并初始化Bolts和Spouts(使用带有或不带有参数的构造函数,并通过反射自动调用自定义配置方法) ...

最新文章

  1. SAP MM MB21创建预留单据报错- Error during conversion to alternative units of measure -
  2. 数据库MySQL关系模型之基本概念
  3. pcie和usb哪个带宽高_了解数字示波器采样率和模拟带宽的规格
  4. 6计算机系统的组成是,计算机系统的组成(范文)(6页)-原创力文档
  5. redis入门——安装篇
  6. Python与自然语言处理搭建环境
  7. Java 25天基础-DAY 05-面向对象-构造函数
  8. LAMP兄弟连网络基础视频地址全集!!!
  9. oracle工具sql loader,Oracle sql loader简单使用
  10. MacBook M1苹果电脑安装 SVN 以及 简单使用SVN
  11. 《分布式微服务电商源码》-项目简介
  12. 顽固文件删除终极武器
  13. iOS证书的种类和其作用
  14. 创建学生表,课程表,班级表,班级课程表
  15. 从懵逼到恍然大悟之Java中RMI的使用
  16. 微信小程序开发过程中出现的内存泄漏问题
  17. java 小数乘法_java复习题69151-_人人文库网
  18. selenium+python 的微博自动转赞评功能实现
  19. Linux下python环境搭建
  20. Linux基础——makefile编写

热门文章

  1. UOJ#33-[UR #2]树上GCD【长链剖分,根号分治】
  2. jzoj6307-安排【归并排序】
  3. 主席树 - 可持久化线段树
  4. 4152. [AMPPZ2014]The Captain(稠密图最短路)
  5. 网络流及建模专题(下)
  6. 动态规划训练5 [回文词]
  7. Javafx的WebEgine执行window对象设置属性后为undefined
  8. IntelliJ IDEA 2018.2 发布,支持 Java 11
  9. 关于Tomcat与MySQL连接池问题的详解
  10. JAVA实现汉字转换为拼音 pinyin4j/JPinyin