简介:本文将主要介绍 FlinkX 读取和写入 Clickhouse 的过程及相关参数,核心内容将围绕以下3个问题:1. FlinkX读写Clickhouse支持哪个版本?、2. ClickHouse读写Clickhouse有哪些参数?、3. ClickHouse读写Clickhouse参数都有哪些说明?

本文将主要介绍 FlinkX 读取和写入 Clickhouse 的过程及相关参数,核心内容将围绕以下3个问题,FlinkX 插件下载:

https://github.com/DTStack/flinkx

  1. FlinkX读写Clickhouse支持哪个版本?
  2. ClickHouse读写Clickhouse有哪些参数?
  3. ClickHouse读写Clickhouse参数都有哪些说明?

ClickHouse 读取

一、插件名称

名称:clickhousereader

二、支持的数据源版本

ClickHouse 19.x及以上

三、参数说明

「jdbcUrl」

  • 描述:针对关系型数据库的jdbc连接字符串
  • jdbcUrl参考文档:clickhouse-jdbc官方文档
  • 必选:是
  • 默认值:无

「username」

  • 描述:数据源的用户名
  • 必选:是
  • 默认值:无

「password」

  • 描述:数据源指定用户名的密码
  • 必选:是
  • 默认值:无

「where」

  • 描述:筛选条件,reader插件根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > time。
  • 注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
  • 必选:否
  • 默认值:无

「splitPk」

  • 描述:当speed配置中的channel大于1时指定此参数,Reader插件根据并发数和此参数指定的字段拼接sql,使每个并发读取不同的数据,提升读取速率。注意:推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,FlinkX将报错!如果channel大于1但是没有配置此参数,任务将置为失败。
  • 必选:否
  • 默认值:无

「fetchSize」

  • 描述:读取时每批次读取的数据条数。
  • 注意:此参数的值不可设置过大,否则会读取超时,导致任务失败。
  • 必选:否
  • 默认值:1000

「queryTimeOut」

  • 描述:查询超时时间,单位秒。
  • 注意:当数据量很大,或者从视图查询,或者自定义sql查询时,可通过此参数指定超时时间。
  • 必选:否
  • 默认值:1000

「customSql」

  • 描述:自定义的查询语句,如果只指定字段不能满足需求时,可通过此参数指定查询的sql,可以是任意复杂的查询语句。注意:只能是查询语句,否则会导致任务失败;查询语句返回的字段需要和column列表里的字段严格对应;当指定了此参数时,connection里指定的table无效;当指定此参数时,column必须指定具体字段信息,不能以*号代替;
  • 必选:否
  • 默认值:无

「column」

  • 描述:需要读取的字段。
  • 格式:支持3种格式

1.读取全部字段,如果字段数量很多,可以使用下面的写法:

"column":["*"]

2.只指定字段名称:

"column":["id","name"]

3.指定具体信息:

"column": [{"name": "col","type": "datetime","format": "yyyy-MM-dd hh:mm:ss","value": "value"
}]

属性说明:

  1. name:字段名称
  2. type:字段类型,可以和数据库里的字段类型不一样,程序会做一次类型转换
  3. format:如果字段是时间字符串,可以指定时间的格式,将字段类型转为日期格式返回
  4. value:如果数据库里不存在指定的字段,则会报错。如果指定的字段存在,当指定字段的值为null时,会以此value值作为默认值返回
  • 必选:是
  • 默认值:无

「polling」

  • 描述:是否开启间隔轮询,开启后会根据pollingInterval轮询间隔时间周期性的从数据库拉取数据。开启间隔轮询还需配置参数pollingInterval,increColumn,可以选择配置参数startLocation。若不配置参数startLocation,任务启动时将会从数据库中查询增量字段最大值作为轮询的开始位置。
  • 必选:否
  • 默认值:false

「pollingInterval」

描述:轮询间隔时间,从数据库中拉取数据的间隔时间,默认为5000毫秒。
必选:否
默认值:5000

「requestAccumulatorInterval」

  • 描述:发送查询累加器请求的间隔时间。
  • 必选:否
  • 默认值:2

配置示例

1、基础配置

{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}}
}

2、多通道

{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 3,"bytes": 0},"errorLimit": {"record": 100}}}
}

3、指定customSql

{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "select id from tableTest","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}}
}

4、增量同步指定startLocation

{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","increColumn": "id","startLocation": "20","requestAccumulatorInterval": 2},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}}
}

5、间隔轮询

{"job": {"content": [{"reader": {"parameter" : {"column" : [ {"name" : "id","type" : "bigint","key" : "id"}, {"name" : "user_id","type" : "bigint","key" : "user_id"}, {"name" : "name","type" : "varchar","key" : "name"} ],"username" : "username","password" : "password","connection" : [ {"jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],"table" : [ "tableTest" ]} ],"where": "id > 1","splitPk": "id","fetchSize": 1000,"queryTimeOut": 1000,"customSql": "","requestAccumulatorInterval": 2,"polling": true,"pollingInterval": 3000},"name" : "clickhousereader"},"writer": {"name": "streamwriter","parameter": {"print": true}}}],"setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 100}}}
}

ClickHouse 写入

一、插件名称

名称:clickhousewriter

二、支持的数据源版本

ClickHouse 19.x及以上

三、参数说明

「jdbcUrl」

  • 描述:针对关系型数据库的jdbc连接字符串
  • 必选:是
  • 默认值:无

「username」

  • 描述:数据源的用户名
  • 必选:是
  • 默认值:无

「password」

  • 描述:数据源指定用户名的密码
  • 必选:是
  • 默认值:无

「column」

  • 描述:目的表需要写入数据的字段,字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。
  • 必选:是
  • 默认值:否
  • 默认值:无

「preSql」

  • 描述:写入数据到目的表前,会先执行这里的一组标准语句
  • 必选:否
  • 默认值:无

「postSql」

  • 描述:写入数据到目的表后,会执行这里的一组标准语句
  • 必选:否
  • 默认值:无

「table」

  • 描述:目的表的表名称。目前只支持配置单个表,后续会支持多表
  • 必选:是
  • 默认值:无

「writeMode」

  • 描述:控制写入数据到目标表采用 insert into 语句,只支持insert操作
  • 必选:是
  • 所有选项:insert
  • 默认值:insert

「batchSize」

  • 描述:一次性批量提交的记录数大小,该值可以极大减少FlinkX与数据库的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成FlinkX运行进程OOM情况
  • 必选:否
  • 默认值:1024

文章来源如下,感兴趣的同学可查看原文:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271

更多 Flink 技术问题可在钉钉群交流

原文链接:https://developer.aliyun.com/article/770821?

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

FlinkX 如何读取和写入 Clickhouse?相关推荐

  1. 【spark】Spark通过ClickHouse-Native-JDBC写入Clickhouse

    文章目录 1.概述 1.概述 转载:Spark通过ClickHouse-Native-JDBC写入Clickhouse 目前通过JDBC写Clickhouse有两种插件可以用 官方的JDBC:8123 ...

  2. 【用户画像】功能实现值写入ClickHouse人群包、预估和更新分群人数,NoSQL数据库介绍

    文章目录 一 写入ClickHouse人群包 1 组合查询Bitmap表SQL代码实现 (1)SQL语句分析 (2)实现思路 (3)实现过程 controller层 service层 Taginfo实 ...

  3. python 点云las、laz文件的读取、写入、压缩

    1. python las读取与写入 用laspy库: https://pypi.org/project/laspy/ 获取las.laz的规格.点数据格式,点数据的具体字段名称,具体的点数据 解析l ...

  4. Java CSV文件读取、写入及追加

    Java CSV文件读取.写入及追加 https://blog.csdn.net/liq816/article/details/81286472 追加: FileOutputStream out = ...

  5. C#尝试读取或写入受保护的内存。这通常指示其他内存已损坏。

    用VS2012调试时发现在调用数据集时提示"尝试读取或写入受保护的内存.这通常指示其他内存已损坏." 用管理员身份运行CMD,输入netsh winsock reset并回车 转载 ...

  6. xml 和android脚本之家,Android利用Document实现xml读取和写入操作

    本文实例为大家分享了利用Document实现xml读取和写入操作,供大家参考,具体内容如下 首先先来介绍一下什么xml?xml是可扩展标记语言,他可以用来标记数据,定义数据类型.是一种允许用户对自己标 ...

  7. 写入位置 0x00000004 时发生访问冲突_HDFS读取和写入数据简介

    HDFS 的文件访问机制为流式访问机制,即通过 API 打开文件的某个数据块之后,可以顺序读取或者写入某个文件.由于 HDFS 中存在多个角色,且对应的应用场景主要为一次写入.多次读取的场景,因此其读 ...

  8. java连接Excel数据库读取,写入,操纵Excel表格

    java连接Excel数据库读取,写入,操纵Excel表格 (2009-11-15 14:21:03) 转载 标签: java excel 连接 杂谈 分类:技术文档 java连接MicroSoft ...

  9. python读取、写入、移动、复制文件(夹)以及其他关于文件(夹)的操作

    文章目录 基础操作 判断文件或者目录是否存在 创建目录 连接两个路径成为一个路径 读取文件 写入文件 移动文件(夹) 复制文件(夹) 突然发现,经常需要进行文件操作,因为如果程序运行时间很长,我们需要 ...

最新文章

  1. 张亚勤:领导者的3种能力
  2. 拖链电缆 机器人电缆_选购电缆拖链需要注意哪些要点
  3. cudamemcpy运行速度很慢_只要设置好这几个选项,让你的 PS CC 2019 运行如飞
  4. NOIP模拟测试24「star way to hevaen·lost my music」
  5. xamarin和mysql_Xamarin.Android 使用 SQLiteOpenHelper 进行数据库操作
  6. php添加导航和删除导航,新增/修改/删除ECSHOP后台左侧导航菜单
  7. 【推荐】一个移动开发的网站
  8. 双鉴探测器是哪两种探测方式结合_报警的基本知识
  9. Shell设置环境变量
  10. 南阳oj入门题-兰州烧饼
  11. 2017“硅谷技划”日记之五:从组织者眼光看Google IO大会
  12. android桌面壁纸显示不全屏显示,手机壁纸怎么全屏 全屏显示手机壁纸方法
  13. 金融行业必看20部电影
  14. (信息学奥赛一本通 1299)糖果#线性动态规划#
  15. Linux下Mysql数据库
  16. Word基础(三十五)题注的插入与删除
  17. 汇编语言笔记-ARM汇编器伪操作
  18. 中国移动合肥移动面试总结(计算机类)
  19. 5g有线工业级路由器 有线无线自由切换
  20. 最简单的TAR,TPR,FAR,FPR的说明

热门文章

  1. javax.el.ELException: Provider com.sun.el.ExpressionFactoryImpl not found
  2. 马赛克,克星,真来了!
  3. 自动驾驶「无视」障碍物:百度研究人员攻陷激光雷达
  4. Github标星3w+,热榜第一,如何用Python实现所有算法
  5. python scipy模块文档_scipy模块stats文档
  6. php怎么关联默认打开程序,win10系统打开文件时提示“请在默认程序控制面板中创建关联”如何解决...
  7. 类方法的实例python_Python Class 的实例方法/类方法/静态方法
  8. cocos2d python文档_【Cocos2D-X 学习笔记】Cocos2D-x 3.0+VS开发环境搭建[使用Python]
  9. linux内核_查看Linux内核版本
  10. pdm生成mysql sql语句_如何用Powerdesigner的PDM生成数据库