Doris Routine Load正则表达实战

1. Kafka安装

#1.下载安装包
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
#2. 解析安装包
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
#3. 启动zookeeper
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#4. 启动kafka
$ nohup bin/kafka-server-start.sh config/server.properties  &
#5. 创建topic
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

2. Doirs库、表、Routine Load任务创建

# 创建数据库
create database kafka_doris;
#切换数据库
use kafka_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS kafka_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` VARCHAR(20) NOT NULL COMMENT "点击类型",
`id`  VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

创建有正则表达式的Routine Load任务:

CREATE ROUTINE LOAD kafka_doris.load_from_kafka ON clicklog
COLUMNS(clickTime,id,type,user=regexp_extract(type, '([[:lower:]]+)C([[:lower:]]+)', 2))
PROPERTIES
("desired_concurrent_number"="1","max_batch_interval" = "5","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM KAFKA
("kafka_broker_list" = "127.0.0.1:9092","kafka_topic" = "test","property.group.id" = "doris");
  1. kafka_doris :Routine Load 任务所在的数据库
  2. load_from_kafka_test:Routine Load 任务名称
  3. clicklog:Routine Load 任务的目标表,也就是配置Routine Load 任务将数据导入到Doris哪个表中。
  4. strict_mode:导入是否为严格模式,这里设置为false。
  5. format:导入数据的类型,这里配置为json。
  6. kafka_broker_list:kafka broker服务的地址
  7. kafka_broker_list:kafka topic名称,也就是同步哪个topic上的数据。
  8. property.group.id:消费组id

其中user=regexp_extract(type, ‘([[:lower:]]+)C([[:lower:]]+)’, 2)将type字段中的数据按照正则表达函数抽取到user字段。

3.正则表达式验证

mysql> SELECT regexp_extract('AbCdE', '([[:lower:]]+)C([[:lower:]]+)', 2);
+-------------------------------------------------------------+
| regexp_extract('AbCdE', '([[:lower:]]+)C([[:lower:]]+)', 2) |
+-------------------------------------------------------------+
| d                                                           |

具体Doirs 的regexp_extract函数参照: https://doris.apache.org/zh-CN/docs/sql-manual/sql-functions/string-functions/regexp/regexp_extract?_highlight=regexp_extract

4.Kafka数据生产

通过kafka-console-producer向kafka集群发送数据。

[root@17a5da45700b kafka_2.12-2.8.0]# ./bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>
>{"id":"1","id":"user","type":"AbCdE","clickTime":"2022-06-17 01:08:21"}

5. 数据验证

数据验证:

mysql> select * from clicklog;
+---------------------+---------+------+------+------+
| clickTime           | type    | id   | user | city |
+---------------------+---------+------+------+------+
| 2022-06-17 01:08:21 | AbCdE   | 1    | d    | NULL |
+---------------------+---------+------+------+------+

可以看到user字段的值是对type执行正则表达式【user=regexp_extract(type, ‘([[:lower:]]+)C([[:lower:]]+)’, 2)】抽取的结果。
任务查看:

mysql>  SHOW ALL ROUTINE LOAD FOR load_from_kafka   \G;
*************************** 1. row ***************************Id: 1884278Name: load_from_kafka_testCreateTime: 2022-08-23 13:16:38PauseTime: NULLEndTime: NULLDbName: default_cluster:kafka_dorisTableName: clicklogState: RUNNINGDataSourceType: KAFKACurrentTaskNum: 1JobProperties: {"timezone":"Europe/London","send_batch_parallelism":"1","load_to_single_tablet":"false","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","currentTaskConcurrentNum":"1","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"clickTime,id,type,user=regexp_extract(`type`, '([[:lower:]]+)C([[:lower:]]+)', 2)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"0","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END"}Statistic: {"receivedBytes":71,"runningTxns":[],"errorRows":0,"committedTaskNum":7,"loadedRows":1,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":1,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":35621}Progress: {"0":"49"}Lag: {"0":0}
ReasonOfStateChanged: ErrorLogUrls: OtherMsg:
1 row in set (0.00 sec)ERROR:
No query specified

从任务状态可看出

6. 新书宣传

最后宣传下我的书:《Spark内核和应用实战》,可以购买我的新书。
京东地址: https://item.jd.com/13613302.html

Doris Routine Load正则表达实战相关推荐

  1. Doris Routine Load数据导入实战【每秒导入16w】

    Doris Routine Load实战[每秒导入16w] 1. Kafka安装 #1.下载安装包 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2. ...

  2. Doris Routine Load接入Kafka0.8.0实战

    Doris Routine Load接入Kafka0.8.0实战 想要更全面了解Spark内核和应用实战,可以购买我的新书. <图解Spark 大数据快速分析实战>(王磊) 1. 问题产生 ...

  3. 数据导入 - Kafka 结合Doris Routine load 任务导入

    背景 参与项目有关数据采集,采集数据同步到数据库之前是使用sql的形式去进行同步,考虑到全表同步数据时数据量过大导致mybatis批量插入数据内存异常,原始解决方案采取分批次进行导入,但是同步数据速度 ...

  4. doris routine load

    https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/routine-load-manual/ json (无嵌套) C ...

  5. Apache Doris Routine Load数据导入使用方法

    Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星 1.概要 Routine load 功能为用户提供了一种自动从指定数据源进行数据导入的功能. R ...

  6. Apache Doris 系列: 基础篇-Routine Load

    简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...

  7. Doris之Routine Load

    Routine Load 例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能. 本文档主要介绍该功能的实现原理.使用方式以及最佳实践. FE:Frontend, ...

  8. Doris系列13-数据导入之Routine Load

    文章目录 一. Routine Load 概述 二. Kafka 例行导入 2.1 创建例行导入任务 2.1.1 columns_mapping 2.1.2 where_predicates 2.1. ...

  9. MySQL 学习笔记(2)— 通配符/正则表达/运算符

    本文继续对 MySQL 中的通配符过滤.正则表达式.运算符进行分类总结. 1. 通配符 LIKE 指示 MySQL,后跟的搜索模式利用通配符匹配而不是直接相等匹配进行比较,其中分为 % 和 _ 通配符 ...

最新文章

  1. springboot启动异常java.lang.NoSuchFieldError: DEFAULT_INCOMPATIBLE_IMPROVEMENTS
  2. 二十四、爬取古诗网中的100首古诗文
  3. 为节约而生:从标准Attention到稀疏Attention
  4. 单图说TDSQL;OceanBase 2.2 事务引擎核心功能;穿云箭2.0版发布;RMAN DUPLICATE配置19C DG;外键上有无索引的影响;MySQL8.0 索引新功能;GaussDB C
  5. Java 堆内存是线程共享的!面试官:你确定吗?
  6. Linux内核源代码分析——中断(一鞭一条痕)(上)
  7. 【开发随笔】以强化学习环境 gym 库为例:为什么日常中我应该试图标准化接口?
  8. Promise.all和Promise.race区别,和使用场景
  9. 系统辨识理论及应用_液压系统故障智能诊断方法(2)
  10. java万年历算法_寿星万年历---java算法实现
  11. Java面试----2018年最新Struts2面试题
  12. react antd select默认选中第一项
  13. 华为以“平台应变”之道角逐数字化转型“深水区”
  14. PDF文件如何转JPG图片?简单三步轻松转换
  15. SpringBoot框架下使用过滤器Filter
  16. Vue 视频音频播放
  17. 填坑Ⅰ | 简单的数据结构
  18. python3 用socket编写ftp
  19. STAR法则修改简历
  20. redis 学习曲线及记录笔记

热门文章

  1. 如何将png转换为其他格式?格式转换器工具怎么用?
  2. Splinter入门(七) Interacting with elements 元素交互(二)
  3. Solidity constant view pure关键字的区别
  4. 如何将 EXE 文件转换为 APK 文件以在 Android 上运行
  5. 图像对比度提高的两种方法
  6. 【企业资源计划ERP】,描述该供应商背景;阐述其ERP产品的各项功能,并画出功能结构图;阐述其产品特点及适用行业(至少3个),并针对每个行业至少举出一个具体的案例。
  7. C语言学习(一.C语言概述)
  8. 手把手搭建简易负载均衡集群
  9. Ubuntu-while loading shared libraries: libXm.so.4: No such file or directory
  10. xcode 调试器 LLDB