简介

Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CSV 或 Json 格式的数据。

接下来通过一个案例介绍 Routine Load 的使用。
案例内容:

  1. 部署单节点Kafka
  2. 准备测试数据并导入kafka
  3. 导入数据到 Doris

部署单节点Kafka

下载

https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

部署

  1. 解压
tar xvf  kafka_2.11-2.4.1.tgz
# 创建软连接
ln -s kafka_2.11-2.4.1 kafka
cd kafka

2)启动zookeeper和kafka

# 启动 zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  # 启动 kafka server
bin/kafka-server-start.sh -daemon config/server.properties
  1. 查看日志
tail -f logs/server.log

出现一下日志说明Kafka server已经启动成功了。

[2022-09-13 06:38:03,596] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

准备测试数据并导入kafka

{"order_date":"2022-09-07","order_id":"10001","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:00","update_time":"2022-09-07 09:00:00"}
{"order_date":"2022-09-07","order_id":"10002","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:01","update_time":"2022-09-07 09:00:01"}
{"order_date":"2022-09-07","order_id":"10003","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:02","update_time":"2022-09-07 09:00:02"}
{"order_date":"2022-09-07","order_id":"10004","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:03","update_time":"2022-09-07 09:00:03"}
{"order_date":"2022-09-07","order_id":"10005","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:04","update_time":"2022-09-07 09:00:04"}
{"order_date":"2022-09-07","order_id":"10006","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:05","update_time":"2022-09-07 09:00:05"}

创建kafka topic

创建名叫 order_info 的kafka topic, 3个分区,1个副本

 bin/kafka-topics.sh --create --topic order_info --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

导入数据到 Kafka topic

 bin/kafka-console-producer.sh --topic order_info --broker-list localhost:9092 < /tmp/data.txt

查看导入的数据

bin/kafka-console-consumer.sh --topic order_info --bootstrap-server localhost:9092 --from-beginning{"order_date":"2022-09-07","order_id":"10001","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:00","update_time":"2022-09-07 09:00:00"}
{"order_date":"2022-09-07","order_id":"10002","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:01","update_time":"2022-09-07 09:00:01"}
{"order_date":"2022-09-07","order_id":"10003","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:02","update_time":"2022-09-07 09:00:02"}
{"order_date":"2022-09-07","order_id":"10004","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:03","update_time":"2022-09-07 09:00:03"}
{"order_date":"2022-09-07","order_id":"10005","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:04","update_time":"2022-09-07 09:00:04"}
{"order_date":"2022-09-07","order_id":"10006","buy_num":"2","user_id":"30001","create_time":"2022-09-07 09:00:05","update_time":"2022-09-07 09:00:05"}

导入数据到 Doris

登录mysql客户端

创建 Doris 数据表

CREATE TABLE IF NOT EXISTS test.order_info_example
(  `order_date` DATE NOT NULL COMMENT "下单日期",  `order_id` INT NOT NULL COMMENT "订单id",  `buy_num` TINYINT COMMENT "购买件数",  `user_id` INT COMMENT "[-9223372036854775808, 9223372036854775807]",  `create_time` DATETIME COMMENT "创建时间",  `update_time` DATETIME COMMENT "更新时间"
)
ENGINE=olap
DUPLICATE KEY(`order_date`, `order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 32
PROPERTIES (  "replication_num" = "1");

创建 Doris ROUTINE LOAD

语法:

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]  
CREATE ROUTINE LOAD test.order_info_routine_load_example ON order_info_example
WITH APPEND
COLUMNS(order_date,order_id,buy_num,user_id,create_time,update_time)
PROPERTIES
(  "desired_concurrent_number"="3",    "max_batch_interval" = "20",    "max_batch_rows" = "300000",    "max_batch_size" = "209715200",    "strict_mode" = "false",    "format" = "json")
FROM KAFKA
(  "kafka_broker_list" = "localhost:9092",    "kafka_topic" = "order_info",   "property.group.id" = "order_info_routine_load_example",  "property.kafka_default_offsets" = "OFFSET_BEGINNING");  

merge_type: 数据合并类型,默认为 WITH APPEND,表示导入的数据都是普通的追加写操作。MERGE 和 DELETE 类型仅适用于 Unique Key 模型表。
strict_mode: 是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤
format: 指定导入数据格式,默认是csv,支持json格式。
更多参数请参考ROUTINE LOAD

可通过show routine load查看任务的运行情况

查看数据

mysql> select * from order_info_example;
±-----------±---------±--------±--------±--------------------±--------------------+
| order_date | order_id | buy_num | user_id | create_time | update_time |
±-----------±---------±--------±--------±--------------------±--------------------+
| 2022-09-07 | 10006 | 2 | 30001 | 2022-09-07 09:00:05 | 2022-09-07 09:00:05 |
| 2022-09-07 | 10005 | 2 | 30001 | 2022-09-07 09:00:04 | 2022-09-07 09:00:04 |
| 2022-09-07 | 10004 | 2 | 30001 | 2022-09-07 09:00:03 | 2022-09-07 09:00:03 |
| 2022-09-07 | 10002 | 2 | 30001 | 2022-09-07 09:00:01 | 2022-09-07 09:00:01 |
| 2022-09-07 | 10003 | 2 | 30001 | 2022-09-07 09:00:02 | 2022-09-07 09:00:02 |
| 2022-09-07 | 10001 | 2 | 30001 | 2022-09-07 09:00:00 | 2022-09-07 09:00:00 |
±-----------±---------±--------±--------±--------------------±--------------------+
6 rows in set (0.05 sec)

至此,数据已经成功写入到Doris。

Apache Doris 系列: 基础篇-Routine Load相关推荐

  1. SQL Server调优系列基础篇(子查询运算总结)

    前言 前面我们的几篇文章介绍了一系列关于运算符的介绍,以及各个运算符的优化方式和技巧.其中涵盖:查看执行计划的方式.几种数据集常用的连接方式.联合运算符方式.并行运算符等一系列的我们常见的运算符.有兴 ...

  2. SQL Server调优系列基础篇(联合运算符总结)

    前言 上两篇文章我们介绍了查看查询计划的方式,以及一些常用的连接运算符的优化技巧,本篇我们总结联合运算符的使用方式和优化技巧. 废话少说,直接进入本篇的主题. 技术准备 基于SQL Server200 ...

  3. SQL Server 调优系列基础篇 - 子查询运算总结

    前言 前面我们的几篇文章介绍了一系列关于运算符的介绍,以及各个运算符的优化方式和技巧.其中涵盖:查看执行计划的方式.几种数据集常用的连接方式.联合运算符方式.并行运算符等一系列的我们常见的运算符.有兴 ...

  4. SQL Server调优系列基础篇(常用运算符总结)

    原文:SQL Server调优系列基础篇(常用运算符总结) 前言 上一篇我们介绍了如何查看查询计划,本篇将介绍在我们查看的查询计划时的分析技巧,以及几种我们常用的运算符优化技巧,同样侧重基础知识的掌握 ...

  5. Ansible系列-基础篇-Ansible Inventory的合理化配置

    欢迎关注个人公众号 DailyJobOps 原文地址:Ansible系列-基础篇-Ansible Inventory的合理化配置 这里写目录标题 Ansible Inventory内置参数 Inven ...

  6. 一起学Pandas系列基础篇---loc和iloc

    一起学Pandas系列基础篇-loc和iloc 一起学Pandas系列基础篇---loc和iloc 一起学Pandas系列基础篇---loc和iloc 本篇学习内容介绍 一.loc 1. 选择索引为0 ...

  7. 一起学Pandas系列基础篇---at和iat

    一起学Pandas系列基础篇-数据选择之at和iat 一起学Pandas系列基础篇---at和iat 一起学Pandas系列基础篇---数据选择之at和iat 本篇学习内容介绍 一.at 1. 选择B ...

  8. Apache Doris 系列: 基础篇-Flink SQL写入Doris

    简介 本文介绍 Flink SQL如何流式写入 Apache Doris,分为一下几个部分: Flink Doris connector Doris FE 节点配置 Flink SQL 写 Doris ...

  9. MySQL调优系列基础篇

    前言 有一段时间没有写博客了,整天都在忙,上班,录制课程,恰巧最近一段时间比较清闲,打算弄弄MYSQL数据库. 关于MySQL数据库,这里就不做过多的介绍,开源.免费等特性深受各个互联网行业喜爱,尤其 ...

最新文章

  1. kafka高可用(集群)
  2. linux 内核round-robin scheduler代码,LINUX源代码阅读报告
  3. 客户端安装服务器的路径查找文件,柴少鹏的官方网站-puppet系列(一)之puppet的部署、配置文件以及命令详解...
  4. 未能在全局命名空间中找到类型或命名空间名称“Wuqi”
  5. C语言读入全都的文件内容2
  6. golang源码安装和学习环境搭建
  7. python tkinter button_[转载]Python Tkinter之Button(转载)
  8. 使用JWT的ASP.NET CORE令牌身份验证和授权(无Cookie)——第1部分
  9. python怎么编辑文件_关于python:如何在Google Colab中编辑和保存文本文件(.py)?
  10. ChaiNext:比特币再度考验5W关口
  11. ELK下钉钉邮件告警通知
  12. MonoCSharp Evaluator Extension
  13. Linux 如何打开pyo文件,Python的文件类型
  14. EPlan 下载和破解
  15. 青米母公司动力未来登陆新三板 小米生态链企业首个挂牌上市
  16. 电子工程师最全面试题大全
  17. 计算机视觉教程 章毓晋 pdf,计算机视觉教程 教学课件 章毓晋 CCV08.pdf
  18. 最全的PS快捷键大全!
  19. JPA学习(基于hibernate)
  20. 全网最全HTML基础

热门文章

  1. IOS开发时必须知道的哪些事。。。
  2. 敬业!华为23级大佬消耗巨资整理出2000页网络协议最全笔记
  3. 各种说明方法的答题格式_说明文说明方法答题格式x
  4. linux svn配置提交输入备注,linux下svn服务强制添加备注(注释)提交
  5. CSRF token is incorrect 问题解决
  6. 手机操作计算机考试题,中招计算机信息技术考试训练题库,MS Word操作题一
  7. position与anchorPoint理解(一)
  8. C-V2X车联网安全通信方案研究
  9. 深圳礼品展:节庆馈赠食品“卷”出新高地,美味又走心
  10. Web安全测试工具介绍