• 背景
  • iceberg简介
  • flink实时写入
    • 准备sql client环境
    • 创建catalog
    • 创建db
    • 创建table
    • 插入数据
    • 查询
    • 代码版本
  • 总结

背景

随着大数据处理结果的实时性要求越来越高,越来越多的大数据处理从离线转到了实时,其中以flink为主的实时计算在大数据处理中占有重要地位。

Flink消费kafka等实时数据流。然后实时写入hive,在大数据处理方面有着广泛的应用。此外由于列式存储格式如parquet或者orc在查询性能方面有着显著的提高,所以大家都会优先选择列式存储作为我们的存储格式。

传统的这种架构看似不错,但是还是有很多没有解决的问题:

  • 实时写入造成大量小文件,需要单独的程序来进行合并
  • 实时的写入,读取,还有合并小文件在同时进行,那么如何保证事务,读取数据的时候不会出现脏读。
  • Hdfs的数据一般是一次写入。多次读写,但是如果因为程序出错导致数据错了,确实要修改某一条数据改怎么办
  • 消费kafka的数据落地到hive,有一天kafka的数据多了几个字段,如何同步到hive?必须删了重建吗?
  • 订单等业务数据一般存储在传统数据库,如mysql等。如何实时同步这些cdc数据到hive仓库呢,包括ddl和dml

如果你有上面的需求,那么你可以考虑一下数据湖了,目前开源的数据湖技术主要有以下几个:delta、hudi、iceberg,但是侧重点有所不同,我上面说的问题也不完全都能实现,但是这些都是数据湖要做的东西,随着社区的不断发展,这些功能都会有的。

一些介绍可以参考下这个ppt 【基于Flink+Iceberg构建企业级实时数据湖.pdf】

但是目前世面上这些数据湖技术都与spark紧密绑定。而我们目前实时计算主要以flink为主,而且我个人觉得未来实时计算也将以flink为主,所以我选择了iceberg为我们的数据湖,虽然他有一些功能不是很完善,但是有着良好的抽象,并且不强制绑定spark,所以对于iceberg没有的功能,我们可以自己给补全,再回馈给社区,一起成长。

iceberg简介

其实对于iceberg,官方的定义是一种表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我们可以简单理解为他是基于计算层(flink , spark)和存储层(orc,parqurt)的一个中间层,我们在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark,flink,presto等。

当然数据湖的概念远不止这点,我们今天就先简单的这么理解,后续写一篇文章专门介绍一下iceberg。

flink实时写入

目前官网的flink集成iceberg写入的时候有一个小bug,我改了改,自己重新编译打了一个包。接下来我们使用flink sql client来测试一下如何使用flink将实时的流数据写入iceberg,然后使用presto查询结果。

准备sql client环境

目前官方的测试版本是基于scala 2.12版本的flink。所以我们也用和官方同步的版本来测试下,下载下面的两个jar放到flink的lib下面,然后启动一下flink集群,standalone模式。

  • 下载flink :flink-1.11.2-bin-scala_2.12.tgz
  • 下载 iceberg-flink-runtime.jar 这个包目前版本(0.9.1)没有提供,需要的话需要自己编译一下,我编译好了一个,并且该了创建catalog的bug,可以来这里获取。

https://github.com/zhangjun0x01/bigdata-examples/tree/master/iceberg/libs/iceberg-flink-runtime-0.9.1.jar

  • 下载flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
  • 目前官方的hive测试版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因为目前flink提交iceberg的信息是在每次checkpoint的时候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

execution.checkpointing.interval: 10s   # checkpoint间隔时间
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失败容忍次数

创建catalog

iceberg在创建catalog的时候有一个小bug,他需要一个warehouse,但是系统没有提供,根据issue的讨论,借鉴flink集成hive,大家更倾向于提供一个hive-site.xml配置,但是如果是配置一个本地路径的话,对于flink application mode提交任务是有问题的,因为这种模式用户程序的加载是在flink的jobmanager端的,可能那个机器是没hive-site.xml配置文件的。所以我自己写了一个方案,提供一个hive-site.xml的配置路径,可以是本地或者hdfs路径,如果是hdfs,则先下载到本地,然后再加载。

相关的issue[1]和pr[2]

官方提供的创建catalog的版本ddl如下:

CREATE CATALOG iceberg WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083'
);

我修改后的DDL如下:

CREATE CATALOG iceberg WITH ('type'='iceberg','catalog-type'='hive','hive-site-path'='/Users/user/work/hive/conf/hive-site.xml1'
)

执行完之后,显示如下:

Flink SQL> show catalogs;
default_catalog
iceberg

我自己测试了一下,在flink的多种提供模式下都是没有问题的(sql client、standalnoe、yarn sesson、yarn per job、yarn application)。

创建db

use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

创建table

CREATE TABLE iceberg.iceberg_db.iceberg_001 (id BIGINT COMMENT 'unique id',data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入数据

我们依然创建一个datagen的connector。

CREATE TABLE sourceTable (userid int,f_random_str STRING
) WITH ('connector' = 'datagen','rows-per-second'='100','fields.userid.kind'='random','fields.userid.min'='1','fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

这时候我们看到有两个表了

Flink SQL> show tables;
iceberg_001
sourcetable

然后执行insert into插入数据:

insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查询

我们这里使用presto来查询

presto的配置iceberg.properties 如下:

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083

代码版本

public class Flink2Iceberg{public static void main(String[] args) throws Exception{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(10000);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);tenv.executeSql("CREATE CATALOG iceberg WITH (n" +"  'type'='iceberg',n" +"  'catalog-type'='hive'," +//"  'hive-site-path'='hdfs://localhost/data/flink/conf/hive-site.xml'" +"  'hive-site-path'='/Users/user/work/hive/conf/hive-site.xml'" +")");tenv.useCatalog("iceberg");tenv.executeSql("CREATE DATABASE iceberg_db");tenv.useDatabase("iceberg_db");tenv.executeSql("CREATE TABLE sourceTable (n" +" userid int,n" +" f_random_str STRINGn" +") WITH (n" +" 'connector' = 'datagen',n" +" 'rows-per-second'='100',n" +" 'fields.userid.kind'='random',n" +" 'fields.userid.min'='1',n" +" 'fields.userid.max'='100',n" +"'fields.f_random_str.length'='10'n" +")");tenv.executeSql("insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");}
}

具体见:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

总结

总结一下,iceberg的资料比较少,很多设计或者讨论等需要关注issues,然后再去撸源码,可能对于刚入门的小伙伴来说有点困难。后续我也会多分享一些关于iceberg的文章,欢迎大家关注我公众号【大数据技术与应用实战】。

参考:
[1].https://github.com/apache/iceberg/issues/1437
[2].https://github.com/apache/iceberg/pull/1527

phython在file同时写入两个_Flink集成数据湖之实时数据写入iceberg相关推荐

  1. Flink集成数据湖之实时数据写入iceberg

    文章目录 背景 iceberg简介 flink实时写入 准备sql client环境 创建catalog 创建db 创建table 插入数据 查询 代码版本 总结 背景 随着大数据处理结果的实时性要求 ...

  2. 两条华子也换不来的数据湖讲解

    前言 数据湖一词进入我的视野是两年前,我记得当时是我们老板给做了DataBricks的技术分享报告,那个时候其实是介绍Spark的一些新特性,然后顺便介绍了数据湖.在此期间,数据湖技术也由一开始的想法 ...

  3. 倒计时1天 | 三位 Apache PMC or Committer,两位名企负责人,纵论畅游数据湖体系之密...

    点击蓝字 关注我们 数字化转型的政策,吹响了深化大数据的号角.而前沿的大数据应用,离不开前沿数据基础设施.说到这一点,当前最令人兴奋的非数据湖建设莫属,这不,2021 年挺近 Apache 孵化器的 ...

  4. phython在file同时写入两个_轻松支撑百万级数据点写入 京东智联云时序数据库HoraeDB架构解密...

    本文将通过对时序数据的基本概念.应用场景以及京东智联云时序数据库HoraeDB的介绍,为大家揭秘HoraeDB的核心技术架构和解决方案. 首先我们来了解下时序数据库的基本概念.时序数据库全称时间序列数 ...

  5. 执行一次怎么会写入两次数据_浅谈 Redis 数据持久化之 AOF 模式

    我们知道 Redis 之所以读写快.性能高,得益于它是一种基于内存的数据库,毫无疑问它的操作都几乎都是基于内存.但是内存型数据库也有一个很大的弊端:如果进程崩溃或者服务重启的时候内存数据得不到保存,就 ...

  6. 执行一次怎么会写入两次数据_Java进阶知识:一文详解缓存Redis的持久化机制,新手看完也会用

    Redis 的数据全部在内存里,如果突然宕机,数据就会全部丢失,因此必须有一种机制来保证 Redis 的数据不会因为故障而丢失,这种机制就是 Redis 的持久化机制. Redis有两种持久化的方式: ...

  7. winfrom里面使用类似于table的合并的控件_Flink集成iceberg数据湖之合并小文件

    背景 使用 流式数据入湖 开启压缩程序 快照过期 删除无用文件 数据查询 遇到的坑 最大并发度问题 文件被重复压缩 扫描任务读取文件问题 不读取大文件 优化生成CombinedScanTask 后续问 ...

  8. 移动硬盘 Windows-延缓写入失败:无法为某文件保存所有数据,数据已经丢失

    Windows延缓写入失败:Windows无法为某盘某文件保存所有数据,数据已经丢失.这个错误可能是由于您的计算机硬件或网络连接的失败导致的. 提示"Windows-延缓写入失败" ...

  9. linux qt写入excel文件内容,Qt 读取Excel表格数据 生成Excel表格并写入数据

    Qt 读取Excel表格数据 生成Excel表格并写入数据 Qt 读取Excel表格数据 生成Excel表格并写入数据 修改.pro文件,增加 axcontainer QT += axcontaine ...

最新文章

  1. 在项目中使用JMail发送邮件
  2. boost::callable_traits添加const成员的测试程序
  3. 离婚率逐年上升,数据分析告诉你背后的主因竟然是它!
  4. 实现Trie(前缀树)
  5. Flask初级(十)flash与前台交互post详解
  6. 不可多得的干货!BAT大厂Java面试真题锦集干货整理
  7. IntelliJ IDEA 14 license key gen
  8. html5 PHP 分片上传,H5分片上传含前端JS和后端处理(thinkphp)
  9. MySQL error(2014) Commands out of sync; you can't run this command now(情形2)
  10. 无线局域网和蜂窝移动网络_手机连上无线网络后,应不应该关掉移动数据?
  11. 2050年这些职业将逐渐被AI(人工智能)取代
  12. 直到输到-1停止 c语言,python新人求助raw_input()问题,不断提示输入字元或数字直到输入空值停止提示。...
  13. OCSP 在SSL证书中起什么作用
  14. 利用CASS使用三种方法计算两期土方
  15. CSDN客服联系方式(有QQ联系方式)
  16. ML—广义线性模型导论
  17. Istio官方文档翻译
  18. DEV C++编写程序出现 [errror]Id returned 1 exit status报错可能出现的原因及解决办法
  19. Android CPU使用率:top和dump cpuinfo的不同
  20. STM32端口重映射

热门文章

  1. 程序员需知的 58 个网站,墙裂推荐!
  2. 如何查已经欠费的联通手机号码
  3. 【Java并发编程 四】Java的进程与线程
  4. 拼多多618手机品牌官旗销量同比增长124%,4000+高价位手机同比增长156%
  5. 【上课课件整理复习】第六章 网页数据的采集(1)
  6. 千兆12光12电工业级环网交换机24口全千兆二层网管型机架式工业以太网交换机
  7. 安装wine及相关软件
  8. Tga图片格式分析以及程序实现
  9. 乐学习知选择--我的J2EE技术历程
  10. 技术问答站点与论坛为什么半死不活