文章目录

  • 一. Routine Load 概述
  • 二. Kafka 例行导入
    • 2.1 创建例行导入任务
      • 2.1.1 columns_mapping
      • 2.1.2 where_predicates
      • 2.1.3 desired_concurrent_number
      • 2.1.4 max_batch_interval/max_batch_rows/max_batch_size
      • 2.1.5 max_error_number
      • 2.1.6 data_source_properties
      • 2.1.7 strict_mode
      • 2.1.8 merge_type
      • 2.1.9 strict mode 与 source data 的导入关系
    • 2.2 查看导入作业状态
    • 2.3 修改作业属性
    • 2.4 作业控制
  • 三. 相关参数
  • 四. 案例
  • 参考:

一. Routine Load 概述

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。

名词解释:

  1. FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。
  2. BE:Backend,Doris 的后端节点。负责查询执行和数据存储。
  3. RoutineLoadJob:用户提交的一个例行导入作业。
  4. JobScheduler:例行导入作业调度器,用于调度和拆分一个 RoutineLoadJob 为多个 Task。
  5. Task:RoutineLoadJob 被 JobScheduler 根据规则拆分的子任务。
  6. TaskScheduler:任务调度器。用于调度 Task 的执行。

原理:

         +---------+|  Client |+----+----+|
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +---+--+  |
|    |         |        |     |
+-----------------------------+|         |        |v         v        v+---+--+   +--+---+   ++-----+|  BE  |   |  BE  |   |  BE  |+------+   +------+   +------+

如上图,Client 向 FE 提交一个例行导入作业。

FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。

整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。

二. Kafka 例行导入

当前我们仅支持从 Kafka 系统进行例行导入。该部分会详细介绍 Kafka 例行导入使用方式和最佳实践。

使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。
  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

2.1 创建例行导入任务

创建例行导入任务的的详细语法可以连接到 Doris 后,执行 HELP ROUTINE LOAD; 查看语法帮助。这里主要详细介绍,创建作业时的注意事项。

2.1.1 columns_mapping

columns_mapping 主要用于指定表结构和 message 中的列映射关系,以及一些列的转换。如果不指定,Doris 会默认 message 中的列和表结构的列按顺序一一对应。虽然在正常情况下,如果源数据正好一一对应,则不指定也可以进行正常的数据导入。但是我们依然强烈建议用户显式的指定列映射关系。这样当表结构发生变化(比如增加一个 nullable 的列),或者源文件发生变化(比如增加了一列)时,导入任务依然可以继续进行。否则,当发生上述变动后,因为列映射关系不再一一对应,导入将报错。

在 columns_mapping 中我们同样可以使用一些内置函数进行列的转换。但需要注意函数参数对应的实际列类型。举例说明:
假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的 null 值转换为 0。该功能可以通过 ifnull 函数实现。正确的使用方式如下:

COLUMNS (xx, k1=ifnull(xx, "0"))

注意这里我们使用 “0” 而不是 0,虽然 k1 的类型为 int。因为对于导入任务来说,源数据中的列类型都为 varchar,所以这里 xx 虚拟列的类型也为 varchar。所以我们需要使用 “0” 来进行对应的匹配,否则 ifnull 函数无法找到参数为 (varchar, int) 的函数签名,将出现错误。

再举例,假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以 100。这个功能可以通过 case when 函数实现,正确写法应如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)

注意这里我们需要将 case when 中所有的参数都最终转换为 varchar,才能得到期望的结果。

2.1.2 where_predicates

where_predicates 中的的列的类型,已经是实际的列类型了,所以无需向 columns_mapping 那样强制的转换为 varchar 类型。按照实际的列类型书写即可。

2.1.3 desired_concurrent_number

desired_concurrent_number 用于指定一个例行作业期望的并发度。即一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:

Min(partition num, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)

其中 Config.max_routine_load_task_concurrrent_num 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。

其中 partition num 指订阅的 Kafka topic 的 partition 数量。alive_backend_num 是当前正常的 BE 节点数。

2.1.4 max_batch_interval/max_batch_rows/max_batch_size

这三个参数用于控制单个任务的执行时间。其中任意一个阈值达到,则任务结束。其中 max_batch_rows 用于记录从 Kafka 中读取到的数据行数。max_batch_size 用于记录从 Kafka 中读取到的数据量,单位是字节。目前一个任务的消费速率大约为 5-10MB/s。

那么假设一行数据 500B,用户希望每 100MB 或 10 秒为一个 task。100MB 的预期处理时间是 10-20 秒,对应的行数约为 200000 行。则一个合理的配置为:

"max_batch_interval" = "10",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600"

以上示例中的参数也是这些配置的默认参数。

2.1.5 max_error_number

max_error_number 用于控制错误率。在错误率过高的时候,作业会自动暂停。因为整个作业是面向数据流的,且由于数据流的无边界性,我们无法像其他导入任务一样,通过一个错误比例来计算错误率。因此这里提供了一种新的计算方式,来计算数据流中的错误比例。

我们设定了一个采样窗口。窗口的大小为 max_batch_rows * 10。在一个采样窗口内,如果错误行数超过 max_error_number,则作业被暂停。如果没有超过,则下一个窗口重新开始计算错误行数。

我们假设 max_batch_rows 为 200000,则窗口大小为 2000000。设 max_error_number 为 20000,即用户预期每 2000000 行的错误行为 20000。即错误率为 1%。但是因为不是每批次任务正好消费 200000 行,所以窗口的实际范围是 [2000000, 2200000],即有 10% 的统计误差。

错误行不包括通过 where 条件过滤掉的行。但是包括没有对应的 Doris 表中的分区的行。

2.1.6 data_source_properties

data_source_properties 中可以指定消费具体的 Kafka partition。如果不指定,则默认消费所订阅的 topic 的所有 partition。

注意,当显式的指定了 partition,则导入作业不会再动态的检测 Kafka partition 的变化。如果没有指定,则会根据 kafka partition 的变化,动态调整需要消费的 partition。

2.1.7 strict_mode

Routine load 导入可以开启 strict mode 模式。开启方式为在 job_properties 中增加 “strict_mode” = “true” 。默认的 strict mode 为关闭。

strict mode 模式的意思是:对于导入过程中的列类型转换进行严格过滤。严格过滤的策略如下:

  1. 对于列类型转换来说,如果 strict mode 为true,则错误的数据将被 filter。这里的错误数据是指:原始数据并不为空值,在参与列类型转换后结果为空值的这一类数据。

  2. 对于导入的某列由函数变换生成时,strict mode 对其不产生影响。

  3. 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,strict mode 对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。

2.1.8 merge_type

数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理.

2.1.9 strict mode 与 source data 的导入关系

这里以列类型为 TinyInt 来举例

这里以列类型为 Decimal(1,0) 举例

2.2 查看导入作业状态

查看作业状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD; 命令查看。

查看任务运行状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD TASK; 命令查看。

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

2.3 修改作业属性

用户可以修改已经创建的作业。具体说明可以通过 HELP ALTER ROUTINE LOAD; 命令查看。或参阅 ALTER ROUTINE LOAD。

2.4 作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。可以通过 HELP STOP ROUTINE LOAD;, HELP PAUSE ROUTINE LOAD; 以及 HELP RESUME ROUTINE LOAD; 三个命令查看帮助和示例。

三. 相关参数

一些系统配置参数会影响例行导入的使用。

  1. max_routine_load_task_concurrent_num
    FE 配置项,默认为 5,可以运行时修改。该参数限制了一个例行导入作业最大的子任务并发数。建议维持默认值。设置过大,可能导致同时并发的任务数过多,占用集群资源。

  2. max_routine_load_task_num_per_be
    FE 配置项,默认为5,可以运行时修改。该参数限制了每个 BE 节点最多并发执行的子任务个数。建议维持默认值。如果设置过大,可能导致并发任务数过多,占用集群资源。

  3. max_routine_load_job_num
    FE 配置项,默认为100,可以运行时修改。该参数限制的例行导入作业的总数,包括 NEED_SCHEDULED, RUNNING, PAUSE 这些状态。超过后,不能在提交新的作业。

  4. max_consumer_num_per_group
    BE 配置项,默认为 3。该参数表示一个子任务中最多生成几个 consumer 进行数据消费。对于 Kafka 数据源,一个 consumer 可能消费一个或多个 kafka partition。假设一个任务需要消费 6 个 kafka partition,则会生成 3 个 consumer,每个 consumer 消费 2 个 partition。如果只有 2 个 partition,则只会生成 2 个 consumer,每个 consumer 消费 1 个 partition。

  5. push_write_mbytes_per_sec
    BE 配置项。默认为 10,即 10MB/s。该参数为导入通用参数,不限于例行导入作业。该参数限制了导入数据写入磁盘的速度。对于 SSD 等高性能存储设备,可以适当增加这个限速。

  6. max_tolerable_backend_down_num
    FE 配置项,默认值是0。在满足某些条件下,Doris可PAUSED的任务重新调度,即变成RUNNING。该参数为0代表只有所有BE节点是alive状态才允许重新调度。

  7. period_of_auto_resume_min
    FE 配置项,默认是5分钟。Doris重新调度,只会在5分钟这个周期内,最多尝试3次. 如果3次都失败则锁定当前任务,后续不在进行调度。但可通过人为干预,进行手动恢复。

四. 案例

创建表:

CREATE TABLE IF NOT EXISTS kafka_test1
(siteid INT DEFAULT '10',citycode SMALLINT,username VARCHAR(32) DEFAULT '',pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "3");

创建Kafka主题,并录入数据:

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic doriscd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/
sh kafka-console-producer.sh --broker-list 10.31.1.124:9092 --topic doris
6|12|pp|123
7|32|ww|231
8|12|ee|213
9|12|ff|213
10|12|gg|215

数据导入到doris:

CREATE ROUTINE LOAD example_db.kafka_test6 ON kafka_test1 COLUMNS TERMINATED BY "|",COLUMNS(siteid,citycode,username,pv)PROPERTIES("desired_concurrent_number"="1","max_batch_rows"="200000","max_batch_size"="104857600")FROM KAFKA("kafka_broker_list"="10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092","kafka_topic"="doris","property.group.id"="gid6","property.clinet.id"="cid6","property.kafka_default_offsets"="OFFSET_BEGINNING");

Kafka日志报错:

我们来查看10.31.1.121这个BE的 日志:

我命令用的是IP,为什么这个地方反向给我整成域名了呢?
可能是之前测试环境配置过一些域名的信息吧。

在10.31.1.123里面加上,问题解决。

10.31.1.123             hp1
10.31.1.124             hp2
10.31.1.125             hp3
10.31.1.126             hp4

参考:

  1. https://doris.apache.org/master/zh-CN/administrator-guide/load-data/routine-load-manual.html

Doris系列13-数据导入之Routine Load相关推荐

  1. 2021-02-23 Matlab数据导入--importdata和load函数

    Matlab数据导入--importdata和load函数 importdata和load函数 1.引言 在使用matlab将数据导入到工作空间的时候,经常会使用到两个函数,一个是importdata ...

  2. Neo4j 数据导入案例NorthWind load csv

    介绍如何从关系数据库,以csv的文件格式,导入数据到neo4j数据库.重点理解关系数据库和图形数据库建模的联系. 一. Northwind 数据库介绍 社区版本的数据样例,主要用来练习sql语句的查询 ...

  3. 8s数据导入导出的load和unload解析

    GBase 8s支持使用 SQL LOAD 和 UNLOAD 语句来移动数据. LOAD 语句速度较快且较易于使用,但它只接受指定的数据格式.通常可将使用 UNLOAD 语句准备好的数据用于 LOAD ...

  4. MYSQL批量数据导入方法之一 LOAD DATA

    1.local_infile服务器变量设置为ON. SHOW VARIABLES LIKE '%local%'; SET GLOBAL local_infile=1 2.load data infil ...

  5. Doris之Routine Load

    Routine Load 例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能. 本文档主要介绍该功能的实现原理.使用方式以及最佳实践. FE:Frontend, ...

  6. Shell_mysql命令以及将数据导入Mysql数据库

    连接MYSQL数据库 mysql -h${db_ip} -u${db_user} -p${db_pawd} -P${db_port} -D${db_name} -s -e "${sql}&q ...

  7. Apache Doris Routine Load数据导入使用方法

    Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星 1.概要 Routine load 功能为用户提供了一种自动从指定数据源进行数据导入的功能. R ...

  8. Doris Routine Load数据导入实战【每秒导入16w】

    Doris Routine Load实战[每秒导入16w] 1. Kafka安装 #1.下载安装包 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2. ...

  9. 数据导入 - Kafka 结合Doris Routine load 任务导入

    背景 参与项目有关数据采集,采集数据同步到数据库之前是使用sql的形式去进行同步,考虑到全表同步数据时数据量过大导致mybatis批量插入数据内存异常,原始解决方案采取分批次进行导入,但是同步数据速度 ...

最新文章

  1. uniapp 长链接 socket 封装
  2. mysql删除有空格字符名称的触发器
  3. VC调用matlab中定义的.m文件中的函数的实例
  4. win7内部版本7601副本不是正版
  5. Error: could not open `C:\Java\jre7\lib\i386\jvm.cfg
  6. C#的二进制序列化组件MessagePack介绍
  7. C#制作WinForm控件
  8. 一起学习C语言:初谈指针(三)
  9. Java实训项目:GUI学生信息管理系统(2019)【中】
  10. Jupyter.net:使用Jupyter进行交互式计算的Windows应用程序
  11. 使用Tomcat发布war包
  12. 在html中写python代码的语法和特点-----基于webpy的httpserver
  13. Bilinear Pairing双线性配对的解释
  14. 中石油职称计算机报名,中石油职称计算机考试题库(单选).doc
  15. 【不忘初心】Win10 20H2 19042.964_X64_四合一太阳谷图标_[纯净精简版][2.83G](2021.5.1)
  16. office相关文件转pdf的几种方式
  17. Linux中断子系统(一)中断控制器GIC架构
  18. 纯css实现文字跳动的动画效果
  19. 正则表达式匹配任意字符串
  20. t型三电平matlab仿真,T型三电平逆变器在不间断电源中的实现

热门文章

  1. 浅浅时光,几许温暖,拥一份恬静安然、守住一颗宁静的心,不染悲伤。
  2. 《收件人列表》:生成相应的收件人列表
  3. anchorPoint随记
  4. Java集合框架图解
  5. html如何判断素数,判断素数.html
  6. C++public、protected和private的区别
  7. 电商平台母婴商品销售数据分析 - Excel
  8. less模块——color函数
  9. kmeans python_二分kmeans python实现
  10. matlab 判断奇异矩阵,[转载]matlab错误 关于奇异矩阵