系列文章目录

实践数据湖iceberg 第一课 入门
实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式
实践数据湖iceberg 第三课 在sqlclient中,以sql方式从kafka读数据到iceberg
实践数据湖iceberg 第四课 在sqlclient中,以sql方式从kafka读数据到iceberg(升级版本到flink1.12.7)
实践数据湖iceberg 第五课 hive catalog特点
实践数据湖iceberg 第六课 从kafka写入到iceberg失败问题 解决
实践数据湖iceberg 第七课 实时写入到iceberg
实践数据湖iceberg 第八课 hive与iceberg集成
实践数据湖iceberg 第九课 合并小文件
实践数据湖iceberg 第十课 快照删除
实践数据湖iceberg 第十一课 测试分区表完整流程(造数、建表、合并、删快照)
实践数据湖iceberg 第十二课 catalog是什么
实践数据湖iceberg 第十三课 metadata比数据文件大很多倍的问题
实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题)
实践数据湖iceberg 第十五课 spark安装与集成iceberg(jersey包冲突)
实践数据湖iceberg 第十六课 通过spark3打开iceberg的认知之门
实践数据湖iceberg 第十七课 hadoop2.7,spark3 on yarn运行iceberg配置
实践数据湖iceberg 第十八课 多种客户端与iceberg交互启动命令(常用命令)
实践数据湖iceberg 第十九课 flink count iceberg,无结果问题
实践数据湖iceberg 第二十课 flink + iceberg CDC场景(版本问题,测试失败)
实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败)
实践数据湖iceberg 第二十二课 flink1.13.5 + iceberg0.131 CDC(CRUD测试成功)
实践数据湖iceberg 第二十三课 flink-sql从checkpoint重启
实践数据湖iceberg 第二十四课 iceberg元数据详细解析
实践数据湖iceberg 第二十五课 后台运行flink sql 增删改的效果
实践数据湖iceberg 第二十六课 checkpoint设置方法
实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作
实践数据湖iceberg 第二十八课 把公有仓库上不存在的包部署到本地仓库
实践数据湖iceberg 第二十九课 如何优雅高效获取flink的jobId
实践数据湖iceberg 第三十课 mysql->iceberg,不同客户端有时区问题
实践数据湖iceberg 第三十一课 使用github的flink-streaming-platform-web工具,管理flink任务流,测试cdc重启场景
实践数据湖iceberg 第三十二课 DDL语句通过hive catalog持久化方法
实践数据湖iceberg 第三十三课 升级flink到1.14,自带functioin支持json函数
实践数据湖iceberg 第三十四课 基于数据湖icerberg的流批一体架构-流架构测试
实践数据湖iceberg 第三十五课 基于数据湖icerberg的流批一体架构–测试增量读是读全量还是仅读增量
实践数据湖iceberg 第三十六课 基于数据湖icerberg的流批一体架构–update mysql select from icberg语法是增量更新测试
实践数据湖iceberg 第三十七课 kakfa写入iceberg的 icberg表的 enfource ,not enfource测试
实践数据湖iceberg 更多的内容目录


文章目录

  • 系列文章目录
  • 前言
  • 一、测试思路
  • 二、测试not enforced代码
    • 2.1 测试代码
    • 2.2 制造数据
    • 2.3 运行结果
    • 2.4 运行结论
  • 三、 改为enforce,报错
    • 3.1 测试代码
  • 四、 'write.upsert.enabled' = 'true', 设置这个参数实现upsert功能
  • 总结

前言

测试 iceberg读取kafka的数据,能否根据kafka上的id,入湖时,自动更新iceberg的数据,对这个场景进行测试
测试结果:1.iceberg对从kafka流入的数据,默认是追加写的 2.通过 给iceberg表设置 ‘write.upsert.enabled’ = 'true 参数,可以实现upsert模式


一、测试思路

从kafka制造数据写入iceberg,iceberg设置pk时,观察是追加写入还是更新。

二、测试not enforced代码

2.1 测试代码

测试思路: 1. select from kafka
2. insert to iceberg
代码如下:

CREATE TABLE IF NOT EXISTS KafkaTableTest2_XXZH (`id` bigint,`data` STRING
) WITH ('connector' = 'kafka','topic' = 'test2_xxzh','properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'latest-offset','csv.ignore-parse-errors'='true','format' = 'csv'
);CREATE CATALOG hive_iceberg_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://hadoop101:9083','clients'='5','property-version'='1','warehouse'='hdfs:///user/hive/warehouse/hive_iceberg_catalog'
);
use catalog hive_iceberg_catalog;
CREATE TABLE IF NOT EXISTS ods_base.IcebergTest2_XXZH (`id` bigint,`data` STRING,primary key (id) not enforced
)with('write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='5','format-version'='2');insert into  hive_iceberg_catalog.ods_base.IcebergTest2_XXZH select * from default_catalog.default_database.KafkaTableTest2_XXZH;

2.2 制造数据

[root@hadoop101 conf]#  kafka-console-producer.sh --broker-list  hadoop101:9092,hadoop102:9092,hadoop103:9092  --topic test2_xxzh
>1,abc
[2022-07-22 14:55:51,643] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test2_xxzh=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>2,bb
>3,cc
>4,dd
>5,ee
>3,cccc
>6,666
>4,ddddd
>

2.3 运行结果

spark-sql (default)> select *  from ods_base.IcebergTest2_XXZH;
22/07/22 15:12:28 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
id      data
3       cc
4       ddddd
5       ee
3       cccc
6       666
4       dd
Time taken: 0.405 seconds, Fetched 6 row(s)

flink-sql的运行结果:

2.4 运行结论

无法根据kafka声明的pk对iceberg进行更新。 iceberg是追加的模式写入的。


三、 改为enforce,报错

3.1 测试代码

iceberg表的pk 改为enforced,重跑


Flink SQL> CREATE TABLE IF NOT EXISTS KafkaTableTest3_XXZH (
>     `id` bigint,
>     `data` STRING
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'test2_xxzh',
>     'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
>     'properties.group.id' = 'testGroup',
>     'scan.startup.mode' = 'latest-offset',
>     'csv.ignore-parse-errors'='true',
>     'format' = 'csv'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE CATALOG hive_iceberg_catalog WITH (
>     'type'='iceberg',
>     'catalog-type'='hive',
>     'uri'='thrift://hadoop101:9083',
>     'clients'='5',
>     'property-version'='1',
>     'warehouse'='hdfs:///user/hive/warehouse/hive_iceberg_catalog'
> );
[INFO] Execute statement succeed.Flink SQL> use catalog hive_iceberg_catalog;
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE IF NOT EXISTS ods_base.IcebergTest3_XXZH (
>     `id` bigint,
>     `data` STRING,
>     primary key (id) enforced
> )with(
>     'write.metadata.delete-after-commit.enabled'='true',
>     'write.metadata.previous-versions-max'='5',
>     'format-version'='2'
>  );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED  controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode

报错信息:
org.apache.flink.table.api.ValidationException: Flink doesn’t support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode

flink没有自己拥有这些数据,所以只支持的模式是非强值的。

结论: iceberg是没有进行根据pk对数据进行update

四、 ‘write.upsert.enabled’ = ‘true’, 设置这个参数实现upsert功能

CREATE TABLE IF NOT EXISTS ods_base.IcebergTest4_XXZH (`id` bigint,`data` STRING,primary key (id) not enforced
)with('format-version' = '2','write.upsert.enabled' = 'true','write.distribution-mode'='hash','write.metadata.delete-after-commit.enabled'='true','write.metadata.previous-versions-max'='3');
[root@hadoop102 module]#  kafka-console-producer.sh --topic test4_xxzh --broker-list hadoop101:9092,hadoop102:9092,hadoop103:9092
>2,222
>3,333  (这里暂停,去spark观察)
>2,bbbb
>3,cccc
>4,444
>5,555

初始化数据

spark-sql (default)> select * from  ods_base.IcebergTest4_XXZH ;
id      data
2       222
3       333

更新数据,id=2,3的内容都更新了

spark-sql (default)> select * from  ods_base.IcebergTest4_XXZH ;
22/07/26 19:24:58 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
id      data
2       bbbb
4       444
5       555
3       cccc

总结

1.iceberg对从kafka流入的数据,默认是追加写的
2.通过 给iceberg表设置 ‘write.upsert.enabled’ = 'true 参数,可以实现upsert模式

实践数据湖iceberg 第三十七课 kakfa写入iceberg的 icberg表的 enfource ,not enfource测试相关推荐

  1. 实践数据湖iceberg 第二十四课 iceberg元数据详细解析

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  2. 实践数据湖iceberg 第三十四课 基于数据湖icerberg的流批一体架构-流架构测试

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  3. 实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败)

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  4. 实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题)

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  5. 实践数据湖iceberg 第九课 合并小文件

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  6. 实践数据湖iceberg 第十二课 catalog是什么

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  7. NeHe OpenGL教程 第三十七课:卡通映射

    转自[翻译]NeHe OpenGL 教程 前言 声明,此 NeHe OpenGL教程系列文章由51博客yarin翻译(2010-08-19),本博客为转载并稍加整理与修改.对NeHe的OpenGL管线 ...

  8. 开源数据湖方案选型:Hudi、Delta、Iceberg深度对比

    文章目录 前言: 共同点 一.Databricks 和 Delta 1.1.Delta的意图,解决的疼点 1.没有 Delta 数据湖之前存在的问题 : 二.Uber和Apache Hudi 三.Ne ...

  9. activeform表单中的旧数据怎么显示_三分钟为你细数 Vue el-form 表单校验的坑点

    背景 Vue 的 el-form 提供了表单校验功能,通过 :rules 属性设置校验规则,并通过 el-form-item 的 prop 属性绑定校验规则.通过封装,让前端校验更方便.具体使用过程中 ...

最新文章

  1. 【神经网络】(1) 简单网络,实例:气温预测,附python完整代码和数据集
  2. 如何处理win10系统内置Linux系统闪退问题
  3. sharepoint 2010 显示和隐藏Ribbon区域条
  4. python 功能键ord_ord()函数以及Python中的示例
  5. RabbitMQ+PHP 教程六(RPC)
  6. BZOJ3514 Codechef MARCH14 GERALD07加强版 LCT维护最大生成树 主席树
  7. AcWing(状态机模型) 1049. 大盗阿福
  8. python数值运算代码_Python数值
  9. tomcat web服务器优化
  10. 快进来,详解MySQL游标
  11. 命令行启动MySQL数据库
  12. 词云图,词频图,专门统计某些关键词的词云词频
  13. 一个简单移动页面ionic打包成app
  14. 热备份冗余技术HSRP
  15. C++源文件到可执行程序
  16. 微星MS16j9鼠标面板可以移动指针,无法通过面板点击
  17. PTA_Pra 1012 数字分类 (20分)
  18. Nowcoder java-二维数组中的查找
  19. Win7开启远程桌面——图文详解
  20. Vue中使用高德地图,简单明了

热门文章

  1. 别让我思考,和广告牌设计
  2. PCLINT(1) LINT 代码规范
  3. 积分体系设计必须了解的五个基础问题
  4. 移动 2G 3G 4G 5G 释义
  5. 计算机办公操作系统,计算机操作系统及办公软件的使用.ppt
  6. 半小时复习java全内容
  7. SEO人员,如何创建新闻源级别的文章?
  8. MobaXterm常用设置修改复制黏贴快捷键加开启保持ssh连接设置
  9. go语言封装http请求工具类(访问外部接口)
  10. 十年同舟十年情 达梦数据库献礼湖北省软件行业协会十周年