ip integrator

“ Lambda体系结构是一种数据处理体系结构,旨在通过利用批处理和流处理方法来处理大量数据。 这种体系结构方法尝试通过使用批处理提供批处理数据的全面而准确的视图,同时使用实时流处理提供在线数据的视图来平衡延迟 , 吞吐量和容错 。 在演示之前,可以将两个视图输出合并。 lambda体系结构的兴起与大数据的增长,实时分析以及减轻地图缩减延迟的驱动力有关。” –维基百科

以前,我已经写了一些博客,涉及许多用例,这些用例是使用Oracle Data Integrator(ODI)在MapR分发之上进行批处理,以及使用Oracle GoldenGate(OGG)将事务数据流式传输到MapR Streams和其他Hadoop组件中。 最新的ODI(12.2.1.2.6)结合了这两种产品以完全适合lambda架构,同时具有许多新的强大功能,包括能够将Kafka流作为ODI本身的源和目标进行处理。 通过简化我们在一种产品下以相同逻辑设计处理和处理批处理和快速数据的方式,此功能对已经拥有或计划拥有lambda架构的任何人都具有巨大的优势。 现在,如果我们将OGG流传输功能和ODI批处理/流传输功能结合在一起,则可能性是无限的。

在本博客中,我将向您展示如何使用Spark Streaming在Oracle Data Integrator上配置MapR流(又名Kafka)以创建真正的lambda体系结构:补充批处理和服务层的快速层。

在本文中,我将跳过ODI的“赞扬和称赞”部分,但我只想强调一点:自从ODI首次发布以来,为该博客设计的映射就像您要设计的所有其他映射一样,都是您可以立即使用Hadoop / Spark集群上的本机代码运行100%的代码,无需编写零行代码,也不必担心如何以及在何处编码。

我已经在MapR上完成了此操作,因此我可以制作“两只鸟一块石头”。 向您展示MapR Streams步骤和Kafka。 由于两者在概念或API实现上并没有太大差异,因此如果您使用的是Kafka,则可以轻松地应用相同的步骤。

如果您不熟悉MapR Streams和/或Kafka概念,建议您花一些时间来阅读它们。 以下内容假定您知道什么是MapR Streams和Kafka(当然还有ODI)。 否则,您仍然会对可能的功能有个好主意。

准备工作

MapR Streams(aka Kafka)相关的准备工作

显然,我们需要创建MapR Streams路径和主题。 与Kafka不同,MapR通过“ maprcli”命令行实用程序使用自己的API来创建和定义主题。 因此,如果您使用商品Kafka,则此步骤将略有不同。 Web上有很多有关如何创建和配置Kafka主题和服务器的示例,因此您并不孤单。

为了进行此演示,我创建了一个路径和该路径下的两个主题。 我们将让ODI从其中一个主题(注册)进行消费,并生成另一个主题(registrations2)。 这样,您将看到它如何通过ODI起作用。

创建一个名为“ users-stream”的MapR Streams路径和一个名为“ registrations”的主题:

在我之前定义的相同路径上创建第二个主题“ registrations2”:

Hadoop相关准备

由于我正在使用已安装并正在运行MapR的个人预配置VM,因此此处的准备工作不多。 但是,需要一些步骤才能成功完成ODI映射。 如果您想知道我是如何使ODI从事MapR发行的,那么您可以参考此博客文章 。

  • Spark:我已经在Spark 1.6.1上进行了测试,您也应该这样做。 至少不要转到任何较低版本。 此外,您需要针对Spark构建具有特定的标签版本。 我从标签1605(这是MapR发布约定)开始测试,但是我的工作失败了。 究其原因,我发现PySpark库不是MapR Streams API的最新版本。 他们可以使用商品Kafka,但不能使用MapR。 这是我使用过的RPM的链接 。
  • Spark日志记录:在spark路径下,有一个“ config”文件夹,其中包含不同的配置文件。 如果需要的话,我们只对其中一项感兴趣。 文件名为“ log4j.properties”。 您需要确保将“ rootCategory”参数设置为INFO,否则,当您运行提交到Spark的任何ODI映射时,都会出现异常:

  • Hadoop凭证存储:在提交的任何作业中需要特定密码时,ODI将引用Hadoop凭证存储。 这样,我们就不会在参数/属性文件或代码本身中包含任何明确的密码。 在此演示中,我们将在某个时候使用MySQL,因此我需要创建一个存储并为MySQL密码添加别名。 首先,您需要确保core-site.xml中存在凭证存储的条目,然后实际上为密码值创建别名:

上一张图片是我的“ site-core.xml”的摘要,向您显示了我添加的凭据存储。 下一步将是验证商店是否存在,然后为密码值创建别名:

更改之后,即使在编辑core-site.xml之后,也无需重新启动任何hadoop组件。

注意:如果遇到“操作系统异常”(例如137),请确保有足够的可用内存。

ODI相关准备

您将在ODI中进行的常规准备工作。 我将在此博客中显示相关内容。

Hadoop数据服务器

以下配置特定于MapR。 如果使用其他发行版,则需要输入相关的端口号和路径:

Spark-Python数据服务器

在此ODI版本12.2.1.2.6中,如果要使用Spark Streaming和常规Spark服务器/群集,则需要创建多个Spark数据服务器。 在此演示中,我仅创建了Spark Streaming服务器,并将其称为Spark-Async。

您需要将“主群集”值更改为实际使用的值:yarn-client或yarn-cluster,然后选择我们之前创建的Hadoop DataServer。

现在,这里配置的有趣部分是Spark-Async数据服务器的属性:

我已经强调了您需要注意的最重要的部分。 之所以使用ASYNC,是因为我们将使用Spark Streaming。 其余属性与性能有关。

卡夫卡数据服务器

在这里,我们将定义MapR Streams数据服务器:

元数据代理具有一个“虚拟”地址,仅符合Kafka API。 MapR Streams客户端将为您提供连接到MapR Streams所需的服务。 您不能在此处测试数据服务器,因为在MapR上没有运行这样的Kafka服务器。 因此,请安全地忽略此处的测试连接,因为它将失败(这样就可以了)。

对于属性,您需要定义以下内容:

您需要手动定义“ key.deserializer”和“ value.deserializer”。 MapR Streams都需要两者,如果未定义作业,作业将失败。

ODI映射设计

我已经在这里进行了测试,涵盖了五个用例。 但是,我将仅介绍一个完整的内容,并突出显示其他内容,以免您阅读多余和常识性的步骤。

1)MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka):

在此映射中,我们将从先前创建的主题之一中读取流数据,应用一些函数(简单的函数),然后将结果生成到另一个主题。 这是映射的逻辑设计:

我通过复制已经为MySQL反向工程设计的模型之一(结构相同)定义了MapR_Streams_Registrations1模型,但是在这种情况下,当然选择的技术是Kafka。 您将能够选择流数据的格式:Avro,JSON,Parquet或Delimited:

物理设计如下所示:

  • SOURCE_GROUP:这是我们的MapR Streams主题“注册”
  • TRANS_GROUP:这是我们的Spark异步服务器
  • TARGET_GROUP:这是我们的MapR Streams主题“ registrations2”

物理实现的属性为:

您需要选择暂存位置作为Spark Async并启用“流式传输”。

要将主题(注册)中的流数据加载到Spark Streaming,我们需要选择合适的LKM,即LKM Kafka到Spark:

然后从Spark Streaming加载到MapR Stream目标主题registrations2,我们需要选择LKM Spark到Kafka:

2)MapR-FS(HDFS)=> Spark Streaming => MapR Streams(Kafka):

除了使用的知识模块之外,我在这里不会向您展示太多。 要将MapR-FS(HDFS)加载到Spark Streaming,我使用了LKM File来Spark:

为了从Spark Streaming加载到MapR Streams,我像以前的映射一样使用LKM Spark到Kafka。

注意:LKM File to Spark将充当一个流,一个文件流(显然)。 ODI将仅接收任何更新/新文件,而不是静态文件。

3)MapR Streams(Kafka)=> Spark Streaming => MySQL:

要将MapR Streams(Kafka)加载到Spark Streaming,就像在第一个映射中一样,我使用了LKM Kafka到Spark。 然后从Spark Streaming加载到MySQL,我使用了LKM Spark到SQL:

4)MapR流(Kafka)=> Spark流=> MapR-FS(HDFS)

为了从MapR流加载到Spark流,我像以前一样使用LKM Kafka到Spark,然后从Spark流加载到MapR-FS(HDFS),我已经使用LKM Spark到File:

5)MapR Streams(Kafka)和Oracle DB => Spark Streaming => MySQL

这是另一个有趣的用例,您实际上可以在现场将Kafka流与SQL源一起加入。 这仅(当前)适用于查找组件:

请注意,驱动程序源必须是Kafka(在我们的示例中为MapR流),而查找源必须是SQL数据库。 我使用了与以前的映射几乎相同的LKM:从LKM SQL到Spark,从LKM Kafka到Spark和从LKM Spark到SQL。

行刑

我将仅向您展示第一个用例的执行步骤,即MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka)。 为了模拟这种情况,我创建了一个Kafka生产者控制台和另一个Kafka消费者控制台,以便可以监视结果。 查看下面的生产者,我粘贴了一些记录:

我已经突出显示了其中一个URL,以确保您注意到它是小写的。 等待几秒钟,Spark将处理这些消息并将其发送到目标MapR Streams主题:

请注意,所有URL均大写。 成功!

通过映射,结果与预期的一样。 我不会为他们展示测试步骤,因为它们很简单。 这里的想法是向您展示如何使用MapR Streams(Kafka)配置ODI。

最后的话

值得一提的是,在执行任何映射时,您都可以深入查看日志并查看正在发生的事情(生成的代码等)。 此外,您将获得指向工作历史URL的链接,以在Spark UI上进行访问:

打开链接将带我们到Spark UI:

如果要控制流作业可以生存多长时间,则需要增加Spark-Async数据服务器的“ spark.streaming.timeout”属性,或从映射配置本身覆盖它。 您可能还需要创建一个ODI程序包,该程序包具有一个循环和其他有用的组件来满足您的业务需求。

结论

ODI可以处理lambda架构中的两个层:批处理层和快速层。 这不仅是ODI在其非常长的综合功能列表中添加的一项重要功能,而且还将提高从一个统一,易于使用的界面设计数据管道的生产率和效率。 显然,ODI可以像使用商品Kafka一样轻松地与MapR Streams一起使用,这要感谢MapR的二进制文件与Kafka API兼容,以及ODI不需要依赖于一个框架。 这向您保证ODI是与众不同的真正开放和模块化的E-LT工具。

其他一些相关职位:

  • Oracle Data Integrator和MapR融合数据平台:请检查!
  • 使用Oracle GoldenGate将事务数据流式传输到MapR流中
  • 使用Oracle GoldenGate进行MapR-FS实时事务数据提取
  • 带有ODI的逆向工程师MapR-DB

免责声明

这里表达的思想,实践和观点仅是作者的观点,不一定反映Oracle的观点。

翻译自: https://www.javacodegeeks.com/2017/02/perfecting-lambda-architecture-oracle-data-integrator-kafka-mapr-streams.html

ip integrator

ip integrator_使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构相关推荐

  1. 使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构

    " Lambda体系结构是一种数据处理体系结构,旨在通过利用批处理和流处理方法来处理大量数据. 这种体系结构方法试图通过使用批处理提供批处理数据的全面而准确的视图,同时使用实时流处理提供在线 ...

  2. Oracle Data Integrator之代理创建

    本文依次介绍了如何创建ODI 12c的Standalone agent.Physical agent以及Logical agent.三种agent的含义如下图所示: 一.创建Standalone Ag ...

  3. Oracle data integrator 11g安装配置和一个实例应用指南pdf

    <Oracle data integrator 11g安装配置和一个实例应用指南pdf> 下载地址: 网盘下载 转载于:https://www.cnblogs.com/long12365/ ...

  4. Oracle Data Integrator(ODI)简介

        到目前为止,Oracle的ETL工具包括两种,分别是Oracle Warehouse Builder(OWB)和Oracle Data Integrator(ODI).前者是Oracle自己开 ...

  5. ODI(Oracle Data Integrator)基本使用教程(1)

    ODI(Oracle Date Integrator)是Oracle Fusion MiddleWare的一个组件,它可以实现不同以及相同异构数据源之间打的数据同步与集成. 1.主资料档案库的创建 主 ...

  6. Using Oracle Data Integrator Open Tools

    变化数据随时都可能生成,因此需要不断的将新的变化同步过去.有两种方法可以完成这个任务. 第一种办法可以通过计划实现.例如创建一个计划,每半个小时执行一次同步接口.这样可以每半个小时将变化数据同步到目标 ...

  7. oracle Data guard

    DATA GUARD的最主要的功能是冗灾.当然根据配置的不同,DATA GUARD还可以具备以下特点:高可用.性能提升.数据保护以及故障恢复等. DATA GUARD可以分为物理STANDBY和逻辑S ...

  8. Oracle Data Guard简介

    DATA GUARD的最主要的功能是冗灾.当然根据配置的不同,DATA GUARD还可以具备以下特点:高可用.性能提升.数据保护以及故障恢复等. DATA GUARD可以分为物理STANDBY和逻辑S ...

  9. 深入详解Oracle data change notification

    深入详解 Oracle  data change notification   0.什么是 Oracle  data change notification  ? 当有多个应用程序或者进程操作同一个数 ...

最新文章

  1. Servlet 工作原理解析
  2. 【机器学习】LBP特征融合最大灰度差、平均灰度、平均梯度改善SVM检测效果
  3. 【学习笔记】CO-PA 简介
  4. 安装Windows digits问题列表
  5. html dom反选,HTML DOM系列教材 (五)- 事件
  6. Vue 脚手架配置代理
  7. mysql改单行数据编码_mysql数据库字符编码修改
  8. 两个jqgrid 直接互相数据_MySQL数据库锁应该这样用
  9. CI如何接受POST请求中的JSON数据
  10. mysql5.7压缩包安装教程
  11. 右击文件转圈卡住、刷新、白屏、闪退、桌面崩溃的通用解决方法
  12. [论文笔记] Detection of Glottal Closure Instants from Speech Signals: CNN Method
  13. linux下的程序开发实验,Linux程序实验.docx
  14. Android的隐私沙盒,与iOS隐私政策有哪些不同?
  15. 掷骰子java程序_掷骰子游戏窗体实现--Java初级小项目
  16. 如何查看谷歌账户的实际消费金额和扣款金额是否一致?
  17. 继金山WPS,永中Office之后,国产再添全新型办公软件
  18. 女生学大数据好找工作么
  19. Firefox在线安装Firebug插件
  20. implode() 函数

热门文章

  1. 树链剖分概念及模板 + 例题 [POJ3237 tree + 软件包管理器]
  2. AT3860-[AGC020F]Arcs on a Circle【dp】
  3. P7077-函数调用【拓扑排序,dp】
  4. 牛客练习赛69C-旅行【结论,最大生成树】
  5. 51nod-猴猴吃苹果【线段树】
  6. ABC182——F - Valid payments Editorial
  7. 2019.01.27【NOIP普及组】模拟赛C组总结
  8. 动态规划训练20 [Treats for the Cows POJ - 3186 ]
  9. 写一个http服务器
  10. javaweb项目搭建ehcache缓存系统