Doris Routine Load正则表达实战
Doris Routine Load正则表达实战
1. Kafka安装
#1.下载安装包
wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
#2. 解析安装包
$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0
#3. 启动zookeeper
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#4. 启动kafka
$ nohup bin/kafka-server-start.sh config/server.properties &
#5. 创建topic
$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
2. Doirs库、表、Routine Load任务创建
# 创建数据库
create database kafka_doris;
#切换数据库
use kafka_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS kafka_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` VARCHAR(20) NOT NULL COMMENT "点击类型",
`id` VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
创建有正则表达式的Routine Load任务:
CREATE ROUTINE LOAD kafka_doris.load_from_kafka ON clicklog
COLUMNS(clickTime,id,type,user=regexp_extract(type, '([[:lower:]]+)C([[:lower:]]+)', 2))
PROPERTIES
("desired_concurrent_number"="1","max_batch_interval" = "5","max_batch_rows" = "300000","max_batch_size" = "209715200","strict_mode" = "false","format" = "json"
)
FROM KAFKA
("kafka_broker_list" = "127.0.0.1:9092","kafka_topic" = "test","property.group.id" = "doris");
- kafka_doris :Routine Load 任务所在的数据库
- load_from_kafka_test:Routine Load 任务名称
- clicklog:Routine Load 任务的目标表,也就是配置Routine Load 任务将数据导入到Doris哪个表中。
- strict_mode:导入是否为严格模式,这里设置为false。
- format:导入数据的类型,这里配置为json。
- kafka_broker_list:kafka broker服务的地址
- kafka_broker_list:kafka topic名称,也就是同步哪个topic上的数据。
- property.group.id:消费组id
其中user=regexp_extract(type, ‘([[:lower:]]+)C([[:lower:]]+)’, 2)将type字段中的数据按照正则表达函数抽取到user字段。
3.正则表达式验证
mysql> SELECT regexp_extract('AbCdE', '([[:lower:]]+)C([[:lower:]]+)', 2);
+-------------------------------------------------------------+
| regexp_extract('AbCdE', '([[:lower:]]+)C([[:lower:]]+)', 2) |
+-------------------------------------------------------------+
| d |
具体Doirs 的regexp_extract函数参照: https://doris.apache.org/zh-CN/docs/sql-manual/sql-functions/string-functions/regexp/regexp_extract?_highlight=regexp_extract
4.Kafka数据生产
通过kafka-console-producer向kafka集群发送数据。
[root@17a5da45700b kafka_2.12-2.8.0]# ./bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>
>{"id":"1","id":"user","type":"AbCdE","clickTime":"2022-06-17 01:08:21"}
5. 数据验证
数据验证:
mysql> select * from clicklog;
+---------------------+---------+------+------+------+
| clickTime | type | id | user | city |
+---------------------+---------+------+------+------+
| 2022-06-17 01:08:21 | AbCdE | 1 | d | NULL |
+---------------------+---------+------+------+------+
可以看到user字段的值是对type执行正则表达式【user=regexp_extract(type, ‘([[:lower:]]+)C([[:lower:]]+)’, 2)】抽取的结果。
任务查看:
mysql> SHOW ALL ROUTINE LOAD FOR load_from_kafka \G;
*************************** 1. row ***************************Id: 1884278Name: load_from_kafka_testCreateTime: 2022-08-23 13:16:38PauseTime: NULLEndTime: NULLDbName: default_cluster:kafka_dorisTableName: clicklogState: RUNNINGDataSourceType: KAFKACurrentTaskNum: 1JobProperties: {"timezone":"Europe/London","send_batch_parallelism":"1","load_to_single_tablet":"false","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","currentTaskConcurrentNum":"1","fuzzy_parse":"false","partitions":"*","columnToColumnExpr":"clickTime,id,type,user=regexp_extract(`type`, '([[:lower:]]+)C([[:lower:]]+)', 2)","maxBatchIntervalS":"5","whereExpr":"*","dataFormat":"json","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","deleteCondition":"*","desireTaskConcurrentNum":"1","maxErrorNum":"0","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END"}Statistic: {"receivedBytes":71,"runningTxns":[],"errorRows":0,"committedTaskNum":7,"loadedRows":1,"loadRowsRate":0,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":1,"unselectedRows":0,"receivedBytesRate":0,"taskExecuteTimeMs":35621}Progress: {"0":"49"}Lag: {"0":0}
ReasonOfStateChanged: ErrorLogUrls: OtherMsg:
1 row in set (0.00 sec)ERROR:
No query specified
从任务状态可看出
6. 新书宣传
最后宣传下我的书:《Spark内核和应用实战》,可以购买我的新书。
京东地址: https://item.jd.com/13613302.html
Doris Routine Load正则表达实战相关推荐
- Doris Routine Load数据导入实战【每秒导入16w】
Doris Routine Load实战[每秒导入16w] 1. Kafka安装 #1.下载安装包 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2. ...
- Doris Routine Load接入Kafka0.8.0实战
Doris Routine Load接入Kafka0.8.0实战 想要更全面了解Spark内核和应用实战,可以购买我的新书. <图解Spark 大数据快速分析实战>(王磊) 1. 问题产生 ...
- 数据导入 - Kafka 结合Doris Routine load 任务导入
背景 参与项目有关数据采集,采集数据同步到数据库之前是使用sql的形式去进行同步,考虑到全表同步数据时数据量过大导致mybatis批量插入数据内存异常,原始解决方案采取分批次进行导入,但是同步数据速度 ...
- doris routine load
https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/routine-load-manual/ json (无嵌套) C ...
- Apache Doris Routine Load数据导入使用方法
Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星 1.概要 Routine load 功能为用户提供了一种自动从指定数据源进行数据导入的功能. R ...
- Apache Doris 系列: 基础篇-Routine Load
简介 Routine Load 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中. 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入 CS ...
- Doris之Routine Load
Routine Load 例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能. 本文档主要介绍该功能的实现原理.使用方式以及最佳实践. FE:Frontend, ...
- Doris系列13-数据导入之Routine Load
文章目录 一. Routine Load 概述 二. Kafka 例行导入 2.1 创建例行导入任务 2.1.1 columns_mapping 2.1.2 where_predicates 2.1. ...
- MySQL 学习笔记(2)— 通配符/正则表达/运算符
本文继续对 MySQL 中的通配符过滤.正则表达式.运算符进行分类总结. 1. 通配符 LIKE 指示 MySQL,后跟的搜索模式利用通配符匹配而不是直接相等匹配进行比较,其中分为 % 和 _ 通配符 ...
最新文章
- springboot启动异常java.lang.NoSuchFieldError: DEFAULT_INCOMPATIBLE_IMPROVEMENTS
- 二十四、爬取古诗网中的100首古诗文
- 为节约而生:从标准Attention到稀疏Attention
- 单图说TDSQL;OceanBase 2.2 事务引擎核心功能;穿云箭2.0版发布;RMAN DUPLICATE配置19C DG;外键上有无索引的影响;MySQL8.0 索引新功能;GaussDB C
- Java 堆内存是线程共享的!面试官:你确定吗?
- Linux内核源代码分析——中断(一鞭一条痕)(上)
- 【开发随笔】以强化学习环境 gym 库为例:为什么日常中我应该试图标准化接口?
- Promise.all和Promise.race区别,和使用场景
- 系统辨识理论及应用_液压系统故障智能诊断方法(2)
- java万年历算法_寿星万年历---java算法实现
- Java面试----2018年最新Struts2面试题
- react antd select默认选中第一项
- 华为以“平台应变”之道角逐数字化转型“深水区”
- PDF文件如何转JPG图片?简单三步轻松转换
- SpringBoot框架下使用过滤器Filter
- Vue 视频音频播放
- 填坑Ⅰ | 简单的数据结构
- python3 用socket编写ftp
- STAR法则修改简历
- redis 学习曲线及记录笔记
热门文章
- 如何将png转换为其他格式?格式转换器工具怎么用?
- Splinter入门(七) Interacting with elements 元素交互(二)
- Solidity constant view pure关键字的区别
- 如何将 EXE 文件转换为 APK 文件以在 Android 上运行
- 图像对比度提高的两种方法
- 【企业资源计划ERP】,描述该供应商背景;阐述其ERP产品的各项功能,并画出功能结构图;阐述其产品特点及适用行业(至少3个),并针对每个行业至少举出一个具体的案例。
- C语言学习(一.C语言概述)
- 手把手搭建简易负载均衡集群
- Ubuntu-while loading shared libraries: libXm.so.4: No such file or directory
- xcode 调试器 LLDB