Doris Routine Load实战【每秒导入16w】

1. Kafka安装

#1.下载安装包
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
#2. 解析安装包
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
#3. 启动zookeeper
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#4. 启动kafka
$ nohup bin/kafka-server-start.sh config/server.properties  &
#5. 创建topic
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

2. Doirs库、表、Routine Load任务创建。

# 创建数据库
create database kafka_doris;
#切换数据库
use kafka_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS kafka_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` VARCHAR(20) NOT NULL COMMENT "点击类型",
`id`  VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

创建Routine Load任务:

CREATE ROUTINE LOAD kafka_doris.load_from_kafka_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
("desired_concurrent_number"="1","max_batch_interval" = "20","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM KAFKA
("kafka_broker_list" = "127.0.0.1:9092","kafka_topic" = "test","property.group.id" = "doris");
  1. pulsar_doris :Routine Load 任务所在的数据库
  2. load_from_kafka_test:Routine Load 任务名称
  3. clicklog:Routine Load 任务的目标表,也就是配置Routine Load 任务将数据导入到Doris哪个表中。
  4. strict_mode:导入是否为严格模式,这里设置为false。
  5. format:导入数据的类型,这里配置为json。
  6. kafka_broker_list:kafka broker服务的地址
  7. kafka_broker_list:kafka topic名称,也就是同步哪个topic上的数据。
  8. property.group.id:消费组id

3. Kafka数据生产

我写了一个java程序模拟生成kafka数据并发送到kafka,该程序模拟生成5000万条数据并发送到kafka。核心代码如下:

@Component
public class ProducerThread implements CommandLineRunner {@Autowiredprivate Producer producer;@Overridepublic void run(String... args) throws Exception {System.out.println("The Runner start to initialize ...");String strDateFormat = "yyyy-MM-dd HH:mm:ss";try {for(int j =0 ; j<50000;j++){int batchSize = 1000;for(int i = 0 ; i<batchSize ;i++){ClickLog clickLog  = new ClickLog();clickLog.setId(UUID.randomUUID().toString());SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);clickLog.setClickTime(simpleDateFormat.format(new Date()));clickLog.setType("webset");clickLog.setUser("user"+ new Random().nextInt(1000) +i);producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));}// Thread.sleep(50L);Constant.msgCount += batchSize;System.out.println("msgCount:"+Constant.msgCount);}System.out.println("msgCount:"+Constant.msgCount);} catch (Exception e) {e.printStackTrace();}}
}

执行如下命令下载源码并编译执行,注意这些代码中默认的配为

  • spring.kafka.bootstrap-servers=127.0.0.1:9092
  • spring.kafka.consumer.group-id=myGroup
  • topicName = “test”
#下载源码
git clone https://github.com/LOVEGISER/kafka-test
#进行项目
cd kafka-test/java
#编译项目
mvn install
#运行项目:如果需要更多数据写入Kafka。这里可以同时运行多个任务,
nohup java -jar target/kafka-0.0.1-SNAPSHOT.jar &

4. 数据验证

数据验证:

mysql> select count(1) from clicklog ;
+-----------+
| count(1)  |
+-----------+
| 211067116 |
+-----------+
1 row in set (7.75 sec)

任务查看:

mysql> SHOW ALL ROUTINE LOAD FOR load_from_kafka_test   \G;
*************************** 1. row ***************************Id: 11003Name: load_from_kafka_test6CreateTime: 2022-07-29 09:18:23PauseTime: NULLEndTime: NULLDbName: default_cluster:pulsar_dorisTableName: clicklogState: RUNNINGDataSourceType: KAFKACurrentTaskNum: 1JobProperties: {"timezone":"Europe/London","send_batch_parallelism":"1","load_to_single_tablet":"false","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","currentTaskConcurrentNum":"1","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"0","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test0729","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"doris"}Statistic: {"receivedBytes":12113141736,"runningTxns":[],"errorRows":0,"committedTaskNum":352,"loadedRows":105533558,"loadRowsRate":159000,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":105533558,"unselectedRows":0,"receivedBytesRate":18274000,"taskExecuteTimeMs":662829}Progress: {"0":"105533557"}Lag: {"0":0}
ReasonOfStateChanged: ErrorLogUrls: OtherMsg:
1 row in set (0.00 sec)ERROR:
No query specifiedmysql>

从任务状态可看出:662829毫秒写入105533558条数数据,大概每秒写入16w(105533558/662829约等于159216)数据。

  1. “totalRows”:105533558
  2. “taskExecuteTimeMs”:662829

5. 新书宣传

最后宣传下我的书:Spark内核和应用实战,可以购买我的新书。
京东地址: https://item.jd.com/13613302.html

Doris Routine Load数据导入实战【每秒导入16w】相关推荐

  1. Doris Routine Load正则表达实战

    Doris Routine Load正则表达实战 1. Kafka安装 #1.下载安装包 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3. ...

  2. Apache Doris Routine Load数据导入使用方法

    Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星 1.概要 Routine load 功能为用户提供了一种自动从指定数据源进行数据导入的功能. R ...

  3. 数据导入 - Kafka 结合Doris Routine load 任务导入

    背景 参与项目有关数据采集,采集数据同步到数据库之前是使用sql的形式去进行同步,考虑到全表同步数据时数据量过大导致mybatis批量插入数据内存异常,原始解决方案采取分批次进行导入,但是同步数据速度 ...

  4. Doris Routine Load接入Kafka0.8.0实战

    Doris Routine Load接入Kafka0.8.0实战 想要更全面了解Spark内核和应用实战,可以购买我的新书. <图解Spark 大数据快速分析实战>(王磊) 1. 问题产生 ...

  5. doris routine load

    https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/routine-load-manual/ json (无嵌套) C ...

  6. Apache Doris 系列: 基础篇-Routine Load

    简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...

  7. Doris系列13-数据导入之Routine Load

    文章目录 一. Routine Load 概述 二. Kafka 例行导入 2.1 创建例行导入任务 2.1.1 columns_mapping 2.1.2 where_predicates 2.1. ...

  8. 第3.3章:StarRocks数据导入--Routine Load

    Routine Load(例行导入)是StarRocks自带的一种可以从Kafka中持续不断的导入数据的方式,我们可以方便的在StarRocks中通过SQL来控制导入任务的暂停.继续及停止. 关于Ro ...

  9. Doris之Routine Load

    Routine Load 例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能. 本文档主要介绍该功能的实现原理.使用方式以及最佳实践. FE:Frontend, ...

最新文章

  1. 应用程序启动器 标记为信任_为什么您今天不能信任应用程序-以及如何解决它...
  2. 74. Leetcode 501. 二叉搜索树中的众数 (二叉搜索树-中序遍历类)
  3. 为什么说多道程序概念得到了中断和通道技术的支持?
  4. docker 搭建指定版本的cas_Docker搭建-生成SpringBoot项目脚手架-各版本
  5. Qt界面UI之QML初见(学习笔记四)
  6. Java中的异常处理与抛出
  7. 详解样条曲线(上)(包含贝塞尔曲线)
  8. 【python实战】23个爬虫项目源码:微信、淘宝、知乎、微博...
  9. JavaWeb实现的超市收银、基于SSM+mysql的 vue便利店收银管理系统实现【文档】【代码过程】
  10. 深度学习入门——03 MNIST手写数字图像集识别实验
  11. 【数据分析认知课(一):数据分析思维观】——读后感
  12. 无线电波是怎么产生的
  13. MySQL 怎么插入10天前的日期_使用 MySQL 的 SQL_MODE 有哪些坑,你知道么?
  14. 世界弹射物语 模拟抽卡
  15. C与C++中的常用符号与标点用法详解及实例
  16. 无需交 300 元认证费,快速创建已认证的小程序
  17. FPGA 之 SOPC 系列(四)NIOS II 外围设备--标准系统搭建
  18. Unity 屏幕特效 之 简单地调整颜色的 色散效果 的实现
  19. Cryptology Unlocked
  20. leetcode 876.链表中间结点

热门文章

  1. java针刺治疗尿潴留_针刺治疗中风后尿潴留优势病例分析
  2. css选择器重点内容
  3. 规划(计划):以智慧为音符在空间上写就的与时间相关的乐章
  4. biee mysql_【Oracle BIEE学习笔记一】Oracle BIEE简介 | 学步园
  5. 怎样设计才能让文字排版更好看(二)
  6. 家庭电路的三根线:火线、零线和地线
  7. 复杂环境下结构光光条中心的几种提取方法
  8. 同站 和 同源 你理解清楚了么?
  9. 标准库:fileinput
  10. AutoSAR系列讲解(实践篇)9.2-信息发送的Filter机制