前言

当前Iceberg仅支持flink 1.11.x 使用Datastream和Table API写iceberg表,鉴于hive catalog 的测试暂时未通过 参考ISSUE,
故以下使用hadoop catalog记录下过程,后面测试完成后再行补充。

提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是Iceberg?

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

Iceberg作为一个数据湖解决方案它 支持 ACID 事务、(基于spark.2.4.5+以上)的修改和删除、独立于计算引擎、支持表结构和分区方式动态变更等特性,
同时,为了支持流式数据的写入,引入 Flink 作为流式处理框架,并将 Iceberg 作为 Flink sink的终表解决方案。
当然了,iceberg并不支持row level  update,仅支持insert into/overwrite, iceberg团队在未来将致力于解决这个问题

在当前的flink iceberg集成工作中,有一些特性当前还不支持,如下所示:

  • 不支持创建隐士分区
  • 不支持创建包含计算列的iceberg表
  • 不支持创建带水印的iceberg表
  • 不支持添加列、删除列、重命名列、更改列
  • 不支持flink使用流模式读iceberg表
Feature support Flink 1.11.0 Notes
SQL create catalog ✔️  
SQL create database ✔️  
SQL create table ✔️  
SQL alter table ✔️ Only support altering table properties(仅限于hive catalog表), Columns/PartitionKey changes are not supported now
SQL drop_table ✔️  
SQL select ✔️ Only support batch mode now.
SQL insert into ✔️ ️ Support both streaming and batch mode
SQL insert overwrite ✔️ ️  
DataStream read ✔️ ️  
DataStream append ✔️ ️  
DataStream overwrite ✔️ ️  
Metadata tables

二、Flink CLI 测试读写Iceberg表

1.启动客户端

代码如下(示例):

#flink-1.11.2
# 相关包下载地址:https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.10.0/iceberg-hive-runtime-0.10.0.jar
./bin/sql-client.sh embedded \-j /data/flink-1.11.2/lib/iceberg-flink-runtime-0.10.0.jar \-j /data/flink-1.11.2/lib/flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar \shell

2.DDL测试

代码如下(示例):

-- 1. 创建hadoop_catalog
CREATE CATALOG hadoop_catalog WITH ('type'='iceberg','catalog-type'='hadoop','warehouse'='hdfs://nameservice1/data/iceberg','property-version'='1'
);
[INFO] Catalog has been created.-- 2. 创建database
Flink SQL> CREATE DATABASE iceberg_db;
[INFO] Database has been created.Flink SQL> use iceberg_db;-- 3.创建非分区表和分区表
Flink SQL> CREATE TABLE sample_iceberg (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );
[INFO] Table has been created.Flink SQL> CREATE TABLE sample_iceberg_partition (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> ) PARTITIONED BY (data);
[INFO] Table has been created.insert into sample_iceberg values (1,'test1');
insert into sample_iceberg values (2,'test2');INSERT into sample_iceberg_partition PARTITION(data='city') SELECT 86;-- 4.
Flink cli query ...-- 5. 查看存储结构
[bigdata03:flink-1.11.2]17:16:22$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/metadata
Found 8 items
-rw-r--r--   2 app_prd supergroup          1 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/version-hint.text
-rw-r--r--   2 app_prd supergroup       2808 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v3.metadata.json
-rw-r--r--   2 app_prd supergroup       1795 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/v2.metadata.json
-rw-r--r--   2 app_prd supergroup        816 2021-01-07 16:36 /data/iceberg/iceberg_db/sample_iceberg/metadata/v1.metadata.json
-rw-r--r--   2 app_prd supergroup       3020 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-8264611366906128313-1-b4da00c0-19cc-45da-a386-e13d2a63bb92.avro
-rw-r--r--   2 app_prd supergroup       3092 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/snap-3280093718175212361-1-d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e.avro
-rw-r--r--   2 app_prd supergroup       4580 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/d6b2c7d9-08be-4f9e-bc5e-bb302aae6a4e-m0.avro
-rw-r--r--   2 app_prd supergroup       4582 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/metadata/b4da00c0-19cc-45da-a386-e13d2a63bb92-m0.avro
[bigdata03:flink-1.11.2]17:16:34$ hadoop fs -ls -r /data/iceberg/iceberg_db/sample_iceberg/data
Found 2 items
-rw-r--r--   2 app_prd supergroup        664 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-548654c2-77dd-46a5-827a-9a207df1e4f6-00001.parquet
-rw-r--r--   2 app_prd supergroup        665 2021-01-07 16:52 /data/iceberg/iceberg_db/sample_iceberg/data/00000-0-42ab18d5-5d71-4efa-9eea-2438cfc4b8f7-00001.parquet

三、使用编程SQL方式读写Iceberg表

1. 依赖添加

        <dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime</artifactId><version>0.10.0</version></dependency>

2. 部分代码实现

// 使用table api 创建 hadoop catalogTableResult tableResult = tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +"  'type'='iceberg',\n" +"  'catalog-type'='hadoop',\n" +"  'warehouse'='hdfs://nameservice1/tmp',\n" +"  'property-version'='1'\n" +")");// 使用catalogtenv.useCatalog("hadoop_catalog");// 创建库tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");tenv.useDatabase("iceberg_hadoop_db");// 创建iceberg 结果表tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_001");tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_001 (\n" +"    id BIGINT COMMENT 'unique id',\n" +"    data STRING\n" +")");// 测试写入tenv.executeSql("insert into hadoop_catalog.iceberg_hadoop_db.iceberg_001 select 100,'abc'");

3. 创建hive的外部表来实时查询iceberg表

hive> add jar /tmp/iceberg-hive-runtime-0.10.0.jar;hive> CREATE EXTERNAL TABLE tmp.iceberg_001(id bigint,data string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/iceberg_hadoop_db/iceberg_001';hive> select * from tmp.iceberg_001;
OK
100     abc
1001    abcd
Time taken: 0.535 seconds, Fetched: 2 row(s)

总结

以上再测试过程中发现以下问题:
1、无界的流式数据不支持overwrite写入hadoop catalog表

Flink SQL> INSERT OVERWRITE sample_iceberg_partition PARTITION(data='city') SELECT 86;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Unbounded data stream doesn't support overwrite operation.

2、暂不支持hadoop catalog更改表名等操作,当前仅支持更新设置属性和删除

Flink SQL>ALTER TABLE sample_iceberg SET ('write.format.default'='avro');
[INFO] Alter table succeeded!

Flink SQL> ALTER TABLE sample_iceberg RENAME TO sample_iceberg_test;
[ERROR] Could not execute SQL statement. Alter table failed! Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Flink SQL> CREATE TABLE sample (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );
[INFO] Table has been created.

Flink SQL> show tables;
sample
sample_iceberg
sample_iceberg_partition

Flink SQL> drop table  sample;
[INFO] Table has been removed.

以上就是我在测试时候遇到的问题,后面尝试使用hive catalog 并且结合kafka消费流式数据再做个补充。
>> 完 <<

Flink结合Iceberg的一种实现方式笔记相关推荐

  1. Flink、Iceberg和Hive的Catalog比较研究

    所谓Catalog即数据目录,简单讲,Catalog是企业用于管理数据资产的方式,Catalog借助元数据来管理数据,包括数据收集.组织.访问.发现和治理.可见,Catalog在数据资产管理中处于核心 ...

  2. Flink 和 Iceberg 如何解决数据入湖面临的挑战

    简介:4.17 上海站 Meetup 胡争老师分享内容:数据入湖的挑战有哪些,以及如何用 Flink + Iceberg 解决此类问题. GitHub 地址 https://github.com/ap ...

  3. Flink集成Iceberg在同程艺龙的实践

    简介:本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iceberg 的生产实践. 本文由同城艺龙大数据开发工程师张军分享,主要介绍同城艺龙 Flink 集成 Iicebe ...

  4. Apache Doris 整合 FLINK CDC + Iceberg 构建实时湖仓一体的联邦查询

    1.概览 这篇教程将展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1版本提供了Iceberg的支持,本文主要展示Doris和Ic ...

  5. Hive metastore三种配置方式

    Hive的meta数据支持以下三种存储方式,其中两种属于本地存储,一种为远端存储.远端存储比较适合生产环境.Hive官方wiki详细介绍了这三种方式,链接为:Hive Metastore. 一.本地d ...

  6. c语言程序设计分段定时器,单片机C语言编程定时器的几种表达方式

    原标题:单片机C语言编程定时器的几种表达方式 吴鉴鹰单片机开发板地址 店铺:[吴鉴鹰的小铺] 地址:[https://item.taobao.com/item.htm?_u=ukgdp5a7629&a ...

  7. C++中的两种绑定方式(静态绑定、动态绑定)

    两种绑定方式 静态绑定:在编译时刻,根据指针或引用变量的静态类型来决定成员函数属于哪一个类. 动态绑定:在运行时刻,根据指针或引用变量实际指向或引用的对象类型(动态类型)来确定成员函数属于哪一个类. ...

  8. python数据结构与算法:二叉树及三种遍历方式(先序遍历/中序遍历/后序遍历)

    树的实现采用queue的形式: 树的三种遍历方式(广度优先白能力法):先序遍历(根左右),中序遍历(左根右)以及后序遍历(左右根) ######################P6.4 数据结构### ...

  9. Java多线程的11种创建方式以及纠正网上流传很久的一个谬误

    创建线程比较传统的方式是继承Thread类和实现Runnable,也可以用内部类,Lambda表达式,线程池,FutureTask等. 经常面试会问到继承Thread类和实现Runnable的区别,然 ...

最新文章

  1. 关于spring中commons-attributes-compiler.jar的使用问题
  2. 机器学习中的数学基础(1)——向量和范数
  3. Dubbo 线上调服务方法空指针问题
  4. 电梯里为什么放镜子?90%的人都不知道
  5. C语言中执行python代码或源程序文件(高级嵌入方式)
  6. windows 下 vmware 安装 Mac X lion 10.7 终极教程!
  7. 台达A2/B2伺服电机编码器改功率软件 台达A2/B2伺服电机编码修改, 用于更换编码器写匹配电机参数
  8. Python入门学习—列表(FishC)
  9. redis下载与安装(windows版)
  10. android客户端服务器传输,【图片】【转】通过Android 客户端上传数据到服务器【aide吧】_百度贴吧...
  11. C’est lavie
  12. 关于各种网站音频mp3的外链地址,真实的外链播放地址
  13. excel打不开html超链接,mac excel打不开超链接
  14. 存储过程(无参,IN多个输入参数,OUT多个输出参数,INOUT输入输出)
  15. 真正程序员的工资是怎样的?
  16. 为什么肯德基和麦当劳总是开在一起?
  17. MT8732 / MT8735处理器特点/芯片组型号资料介绍
  18. 利用MATLAB求系统响应
  19. 【持续更新】1999-2023年英伟达历代桌面GeForce显卡列表,GeForce显卡发布日期
  20. Maven 手工上传JAR包到私有仓库

热门文章

  1. 高考数学题目:导数及其简单应用
  2. 世界上最美的72个地方
  3. [JZOJ5445]【NOIP2017提高A组冲刺11.2】失格
  4. Python 正则表达式里的单行s和多行m模式
  5. oracle查询导致 gc等待,如何诊断Oracle RAC系统中的等待事件gc cr multi block request?...
  6. 未明学院活动:领跑新年活动!寒假1个月,掌握券商/互联网/机器学习等七大热门行业实战技能!
  7. 小白最优化学习(四) 算法学习 不精确一维搜索方法
  8. SAP采购订单行项目中的免费和发票收据的同步逻辑
  9. 带你深入了解Java!十七、超市会员管理系统!
  10. Paint---FontMetrics