来源:SpringForAll社区

1 概述

Spring Cloud Data Flow是一个用于构建实时数据管道和批处理过程的云原生工具包。 Spring Cloud Data Flow已准备好用于一系列数据处理用例,如简单的导入/导出,ETL处理,事件流和预测分析。

在本教程中,我们将学习使用流管道实时提取转换和加载(ETL)的示例,该管道从JDBC数据库中提取数据,将其转换为简单的POJO并将其加载到MongoDB中。

2. ETL and Event-Stream Processing

ETL - 提取,转换和加载 -通常被认为将数据从多个数据库和系统批量加载到公共数据仓库中的过程。在此数据仓库中,可以在不影响系统整体性能的情况下进行大量数据分析处理。

然而,新趋势正在改变如何做到这一点的方式。 ETL仍然可以将数据传输到数据仓库和数据池。

现在,可以借助Spring Cloud Data Flow的事件流体系架构使用流来完成此操作。

3. Spring Cloud Data Flow

借助Spring Cloud Data Flow(SCDF),开发人员可以创建两种风格的数据管道:

  • 使用Spring Cloud Stream的长效实时流应用程序

  • 使用Spring Cloud Task的批处理短期任务应用程序

在本文中,我们将介绍第一个,基于Spring Cloud Stream的长效流媒体应用程序。

3.1. Spring Cloud Stream Applications

SCDF管道流由不同的步骤组成,其中每一步都是使用Spring Cloud Stream微框架以Spring Boot样式构建的应用程序。这些应用程序集成了像Apache Kafka或RabbitMQ等的消息中间件。

这些应用程序分为源,处理器和接收器。与ETL过程相比,我们可以说源是“提取”,处理器是“转换器”,接收器是“加载”部分。

在某些情况下,我们可以在管道的一个或多个步骤中使用应用程序启动器。这意味着我们不需要为每一步实现新的应用程序,而是配置已实现的现有应用程序启动器。

可以在此处找到应用程序启动器列表。

3.2. Spring Cloud Data Flow Server


4. 环境设置(Environment Setup)

在开始之前,我们需要选择这个复杂部署的部分。要定义的第一部分是SCDF服务器。

为了进行测试,我们将使用SCDF Server Local进行本地开发。对于生产部署,我们稍后可以选择云本机运行时,如SCDF Server Kubernetes。我们可以在这里找到服务器运行列表。

现在,检查系统要求是否满足运行此服务器。

4.1. 系统要求(System Requirements)

要运行SCDF服务器,我们必须定义并设置两个依赖项:

  • 消息中间件

  • 关系数据库管理系统(the RDBMS)

我们将使用RabbitMQ作为消息中间件,我们选择PostgreSQL作为RDBMS来存储我们的管道流定义。

为了运行RabbitMQ,可以在此处下载最新版本并使用默认配置启动RabbitMQ实例或运行以下Docker命令:

  1. docker run --name dataflow-rabbit -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

最后的设置步骤:在默认端口5432上安装并运行PostgreSQL RDBMS。之后,创建一个数据库,SCDF可以使用以下脚本存储其流定义:

  1. CREATE DATABASE dataflow;

4.2. 本地Spring Cloud数据流服务器(Spring Cloud Data Flow Server Local)

我们可以选择使用docker-compose启动服务器,或者将其作为Java应用程序启动来运行SCDF Server Local。

在这里,我们将SCDF Server Local作为Java应用程序运行。为了配置应用程序,我们必须将配置定义为Java应用程序参数。我们在系统路径中需要配置Java 8。

为了托管jar和依赖项,我们需要为SCDF Server创建一个主文件夹,并将SCDF Server Local发行版下载到此文件夹中。您可以在此处下载SCDF Server Local最新分行版。

此外,我们需要创建一个lib文件夹并在其中放置JDBC驱动程序。这里提供了最新版本的PostgreSQL驱动程序。

最后,运行SCDF本地服务器:

  1. $java -Dloader.path=lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \

  2.    --spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/dataflow \

  3.    --spring.datasource.username=postgres_username \

  4.    --spring.datasource.password=postgres_password \

  5.    --spring.datasource.driver-class-name=org.postgresql.Driver \

  6.    --spring.rabbitmq.host=127.0.0.1 \

  7.    --spring.rabbitmq.port=5672 \

  8.    --spring.rabbitmq.username=guest \

  9.    --spring.rabbitmq.password=guest

我们可以通过查看此URL来检查它是否正在运行:http://localhost:9393/dashboard

4.3. Spring Cloud Data Flow Shell

SCDF Shell是一个命令行工具,可以轻松组合和部署我们的应用程序和管道。这些Shell命令在Spring Cloud Data Flow Server REST API上运行。

在此处获得最新版本的jar,并且下载到SCDF主文件夹中。完成后,运行以下命令(根据需要更新版本):

  1. $ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar

  2.  ____                              ____ _                __

  3. / ___| _ __  _ __(_)_ __   __ _   / ___| | ___  _   _  __| |

  4. \___ \| '_ \| '__| | '_ \ / _` | | |   | |/ _ \| | | |/ _` |

  5.  ___) | |_) | |  | | | | | (_| | | |___| | (_) | |_| | (_| |

  6. |____/| .__/|_|  |_|_| |_|\__, |  \____|_|\___/ \__,_|\__,_|

  7.  ____ |_|    _          __|___/                 __________

  8. |  _ \  __ _| |_ __ _  |  ___| | _____      __  \ \ \ \ \ \

  9. | | | |/ _` | __/ _` | | |_  | |/ _ \ \ /\ / /   \ \ \ \ \ \

  10. | |_| | (_| | || (_| | |  _| | | (_) \ V  V /    / / / / / /

  11. |____/ \__,_|\__\__,_| |_|   |_|\___/ \_/\_/    /_/_/_/_/_/

  12. Welcome to the Spring Cloud Data Flow shell. For assistance hit TAB or type "help".

  13. dataflow:>

如果最后一行中获得“server-unknown:>”而不是“dataflow:>”,则表示您没有在localhost上运行SCDF服务器。在这种情况下,请运行以下命令以连接到另一台主机:

  1. server-unknown:>dataflow config server http://{host}

现在,Shell连接到SCDF服务器,我们可以运行我们的命令。

我们在Shell中需要做的第一件事就是导入应用程序启动器。在Spring Boot 2.0.x中找到RabbitMQ + Maven的最新版本,并运行以下命令(再次声明,根据需要更新版本,此处为“Darwin-SR1”):

  1. $ dataflow:>app import --uri http://bit.ly/Darwin-SR1-stream-applications-rabbit-maven

检查应用程序是否安装完成,请运行以下Shell命令:

  1. $ dataflow:> app list

因此,我们应该看到一个包含所有已安装应用程序的表。

此外,SCDF提供了一个名为Flo的图形界面,我们可以通过以下地址访问:http://localhost:9393/dashboard。但是,它的使用不在本文的范围内。

5 编写ETL管道(Composing an ETL Pipeline)

我们现在创建我们的流管道。为此,我们将使用JDBC Source应用启动程序从关系数据库中提取信息。

此外,我们将创建一个自定义处理器,用于转换信息结构和一个自定义接收器,将数据加载到MongoDB中。

5.1 提取-准备关系数据库以进行提取

创建一个名为crm的数据库和一个名为customer的表:

  1. CREATE DATABASE crm;

  2. CREATE TABLE customer (

  3.    id bigint NOT NULL,

  4.    imported boolean DEFAULT false,

  5.    customer_name character varying(50),

  6.    PRIMARY KEY(id)

  7. )

请注意,我们正在使用导入的标志,该标志将存储已导入的记录。如有必要,我们还可以将此信息存储在另一个表中。

现在,插入一些数据:

  1. INSERT INTO customer(id, customer_name, imported) VALUES (1, 'John Doe', false);

5.2 转换-JDBC字段映射到MongoDB字段结构

对于转换步骤,我们将把源表customer_name字段简单转换为新字段名称。其他转换可以在这里完成,但尽量保持简短例子。

为此,我们将创建一个名为customer-transform的新项目。最简单的方法是使用Spring Initializr站点创建项目。看到网站后,选择一个Group和一个Artifact名称。我们将分别使用com.customer和customer-transform。

完成后,单击“生成项目”按钮下载项目。然后,解压缩项目并将其导入喜欢的IDE,并将以下依赖项添加到pom.xml:

  1. <dependency>

  2.    <groupId>org.springframework.cloud</groupId>

  3.    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>

  4. </dependency>

现在我们开始为字段名称转换进行编码。为此,我们将创建Customer类作为适配器。此类将通过setName()方法接收customer_name,并将通过getName方法输出其值。

@JsonProperty注释在JSON反序列化到Java时执行转换:

  1. public class Customer {

  2.    private Long id;

  3.    private String name;

  4.    @JsonProperty("customer_name")

  5.    public void setName(String name) {

  6.        this.name = name;

  7.    }

  8.    @JsonProperty("name")

  9.    public String getName() {

  10.        return name;

  11.    }

  12.    // Getters and Setters

  13. }

处理器需要从输入接收数据,进行转换并将结果绑定到输出通道。创建一个类来执行此操作:

  1. import org.springframework.cloud.stream.annotation.EnableBinding;

  2. import org.springframework.cloud.stream.messaging.Processor;

  3. import org.springframework.integration.annotation.Transformer;

  4. @EnableBinding(Processor.class)

  5. public class CustomerProcessorConfiguration {

  6.    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)

  7.    public Customer convertToPojo(Customer payload) {

  8.        return payload;

  9.    }

  10. }

在上面的代码中,我们可以观察到转换是自动发生的。输入接收JSON数据,Jackson使用set方法将其反序列化为Customer对象。

与输入相反,输出使用get方法将数据序列化为JSON。

5.3 加载-MongoDB接收器

与转换步骤类似,我们将创建另一个maven项目,名为customer-mongodb-sink。再次,访问Spring Initializr,Group名为com.customer,Artifact名为customer-mongodb-sink。然后,在依赖项搜索框中键入“MongoDB”并下载项目。

接下来,项目解压缩并导入IDE.

然后,添加与customer-transform项目中相同的额外依赖项。

现在我们将创建另一个Customer类,用于在此步骤中接收输入:

  1. import org.springframework.data.mongodb.core.mapping.Document;

  2. @Document(collection="customer")

  3. public class Customer {

  4.    private Long id;

  5.    private String name;

  6.    // Getters and Setters

  7. }

为了接收Customer,我们将创建一个Listener类,它将使用CustomerRepository保存客户实体:

  1. @EnableBinding(Sink.class)

  2. public class CustomerListener {

  3.    @Autowired

  4.    private CustomerRepository repository;

  5.    @StreamListener(Sink.INPUT)

  6.    public void save(Customer customer) {

  7.        repository.save(customer);

  8.    }

  9. }

在这种情况下,CustomerRepository是Spring Data的MongoRepository:

  1. import org.springframework.data.mongodb.repository.MongoRepository;

  2. import org.springframework.stereotype.Repository;

  3. @Repository

  4. public interface CustomerRepository extends MongoRepository<Customer, Long> {

  5. }

5.4 流定义

现在,两个自定义应用程序都可以在SCDF服务器上注册。为了实现这个目标,先使用Maven命令mvn install编译这两个项目。

之后,使用Spring Cloud Data Flow Shell注册它们:

  1. app register --name customer-transform --type processor --uri maven://com.customer:customer-transform:0.0.1-SNAPSHOT

  2. app register --name customer-mongodb-sink --type sink --uri maven://com.customer:customer-mongodb-sink:jar:0.0.1-SNAPSHOT

最后,检查应用程序是否存储在SCDF中,在shell中运行application list命令:

  1. app list

因此,我们应该在结果表中看到这两个应用程序。

5.4.1 流管道领域特定语言-DSL

DSL定义应用程序之间的配置和数据流。 SCDF DSL很简单。在第一个单词中,我们定义应用程序的名称,然后是配置。

此外,语法是受Unix启发的Pipeline语法,它使用垂直条(也称为“管道”)来连接多个应用程序:

  1. http --port=8181 | log

创建端口是8181HTTP应用程序,该应用程序将收到的任何正文有效负载发送到日志。

现在,让我们看看如何创建JDBC Source的DSL流定义。

5.4.2. JDBC Source流定义

JDBC Source的关键配置是查询和更新。查询将选择未读记录,而更新将更改标志以防止重新读取当前记录。

此外,我们将定义JDBC Source以30秒的固定延迟轮询并轮询最多1000行。最后,我们将定义连接的配置,如驱动程序,用户名,密码和连接URL:

  1. jdbc

  2.    --query='SELECT id, customer_name FROM public.customer WHERE imported = false'

  3.    --update='UPDATE public.customer SET imported = true WHERE id in (:id)'

  4.    --max-rows-per-poll=1000

  5.    --fixed-delay=30 --time-unit=SECONDS

  6.    --driver-class-name=org.postgresql.Driver

  7.    --url=jdbc:postgresql://localhost:5432/crm

  8.    --username=postgres

  9.    --password=postgres

可以在此处找到更多JDBC Source配置属性。

5.4.3 Customer MongoDB Sink流定义

由于我们没有在customer-mongodb-sink的application.properties中定义连接配置,我们将通过DSL参数进行配置。

我们的应用程序完全基于MongoDataAutoConfiguration。您可以在此处查看其他可能的配置。基本上,我们将定义spring.data.mongodb.uri:

  1. customer-mongodb-sink --spring.data.mongodb.uri=mongodb://localhost/main

5.4.4 创建和部署定义

首先,要创建最终的流定义,请返回到Shell并执行以下命令(没有换行符,刚刚插入它们以便于阅读):

  1. stream create --name jdbc-to-mongodb

  2.  --definition "jdbc

  3.  --query='SELECT id, customer_name FROM public.customer WHERE imported=false'

  4.  --fixed-delay=30

  5.  --max-rows-per-poll=1000

  6.  --update='UPDATE customer SET imported=true WHERE id in (:id)'

  7.  --time-unit=SECONDS

  8.  --password=postgres

  9.  --driver-class-name=org.postgresql.Driver

  10.  --username=postgres

  11.  --url=jdbc:postgresql://localhost:5432/crm | customer-transform | customer-mongodb-sink

  12.  --spring.data.mongodb.uri=mongodb://localhost/main"

此DSL流被定义名为jdbc-to-mongodb。接下来,我们将按名称部署流:

  1. stream deploy --name jdbc-to-mongodb

最后,我们应该在日志输出中看到所有可用日志的位置:

  1. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink

  2. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-transform

  3. Logs will be in {PATH_TO_LOG}/spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6 结束语

在本文中,我们已经看到了使用Spring Cloud Data Flow的ETL数据管道的完整示例。

最值得注意的是,我们看到了应用启动程序的配置,使用Spring Cloud Data Flow Shell创建了一个ETL流管道,并为我们的读取,转换和写数据实现了自定义应用程序。

与往常一样,示例代码可以在GitHub中找到。

原文链接:https://www.baeldung.com/spring-cloud-data-flow-etl

作者:Norberto Ritzmann

译者:Emma

-END-

 近期热文:

  • 如何判断一个元素在亿级数据中是否存在?

  • Minor GC、Major GC和Full GC之间的区别

  • 15个Spring的核心注释示例

  • Log4j2的性能为什么这么好?

  • 微服务架构如何保障双11狂欢下的99.99%高可用?

  • Spring Cloud Stream如何消费自己生产的消息?

关注我

点击“阅读原文”,看本号其他精彩内容

Spring Cloud Data Flow 中的 ETL相关推荐

  1. Spring Cloud Data Flow

    Spring Cloud Data Flow 1 Spring Cloud Data Flow 介绍 2 Local Server for development 3 Data Flow Server ...

  2. Spring Cloud Data Flow 简介

    Spring Cloud Data Flow 介绍 1.Data flow 是一个用于开发和执行大范围数据处理其模式包括ETL,批量运算和持续运算的统一编程模型和托管服务. 2.对于在现代运行环境中可 ...

  3. Spring Cloud Data Flow手动安装

    前言 Spring Cloud Data Flow 2.4.2 win7 简介 Microservice based Streaming and Batch data processing for C ...

  4. Spring系列学习之Spring Cloud Data Flow 微服务数据流

    英文原文:https://cloud.spring.io/spring-cloud-dataflow/ 目录 Spring Cloud数据流 概览 社区实现 快速开始 ?构建Spring Spring ...

  5. Pivotal发布Spring Cloud Data Flow 1.5版本

    Pivotal发布了Spring Cloud Data Flow 1.5版本,这是一款用于构建实时数据服务的项目,该版本的新功能包括: \\ 对用户界面改进\\t 更新的Spring Cloud St ...

  6. Spring Cloud Data Flow流处理入门-5

    流处理入门 Spring Cloud Data Flow 提供了 70 多个预构建的流应用程序,您可以立即使用它们来实现常见的流用例.在本指南中,我们使用其中两个应用程序来构建一个简单的数据管道,该管 ...

  7. Spring Cloud Data Flow整合UAA之使用LDAP进行账号管理

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 Spring Cloud Data Flow整合UAA的文章已经写了两篇,之前的方案是把用户信息保存在数据库 ...

  8. Spring Cloud Data Flow系列教程简介-1

    简介 前言 Spring Cloud data flow 为基于微服务的分布式流处理和批处理数据通道提供了一系列模型和最佳实践. Spring Cloud Data Flow 提供了为流和批处理数据管 ...

  9. Spring Cloud Data Flow整合UAA使用外置数据库和API接口

    我最新最全的文章都在 南瓜慢说 www.pkslow.com ,欢迎大家来喝茶! 1 前言 之前的文章<Spring Cloud Data Flow整合Cloudfoundry UAA服务做权限 ...

最新文章

  1. sh计算机,计算机教程:shellII(sh).pdf
  2. c++编程例子_如何开始厉害的C语言编程?大神都是这样开始的!
  3. 编译linux内核成vmlinuz,编译一个内核 - no bzImage/vmlinuz生成
  4. PHP 5.6 开启CURL HTTPS 类型
  5. Linux系统安装Apache 2.4.6
  6. .net 微服务实践
  7. 前端错误日志收集方案
  8. @PathVariable为空时指定默认值
  9. ubuntu之间传输文件
  10. Linux检测内存泄露的脚本
  11. 中级软件设计师真题与答案(2009到2018)
  12. 平板电脑的桌面计算机图标,苹果平板电脑桌面图标删除不了怎么办
  13. 分享几个超好用的矢量图标网站
  14. 鸟哥的linux私房菜——蔡德明
  15. 12张大数据图看看2016年世界各地发生大事件!
  16. 语义分割的评价指标——IoU
  17. Esx host补丁更新
  18. IO那些事01-IO总述和文件描述符
  19. 我们是如何改进YOLOv3进行红外小目标检测的?
  20. 磁场强度单位T(特斯拉)和高斯

热门文章

  1. linux 系统邮件 查看清空
  2. password is not set 问题解决
  3. redis启动问题:/var/redis/run/redis_6379.pid exists, process is already running or crashed
  4. linux内存管理基本概念
  5. docker --restart=always 参数 docker重启容器自动重启
  6. Android开发工具——ADB(Android Debug Bridge) 二HOST端
  7. 分支优化:neg+sbb算术运算代替逻辑跳
  8. Windows 中 FS 段寄存器
  9. 编程之美2.1 求二进制中1的个数
  10. Linux Shell 脚本攻略学习--四