Flink sql实时读取Kafka写入Iceberg 实践亲测

前言

 本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceberg表,并且使用Hive作为客户端实时读取。话不多说,上代码!!!

测试环境:

JDK1.8
Flink 1.11.1
iceberg 0.11.0
Hadoop3.0.0
Hive2.1.1

测试前准备工作:

1.安装flink1.11.1
2.下载kafka、hive与flink的集成jar包
3.下载iceberg-flink-runtime-0.11.0.jar

测试步骤:

启动flink sql-client:

bin/sql-client.sh embedded -j lib/iceberg-flink=runtime-0.11.0.jar -j lib/flink-connector-hive.jar -j lib/flink-sql-connector-kafka.jar shell

创建hive_catalog:

CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://xxx:9083',
'warehouse'='hdfs://nameservice1/user/hive/warehouse',
'clients'='5',
'property-version'='1'
);

使用hive_catalog:

use catalog hive_catalog;

创建并使用数据库:

create database iceberg_db;
use iceberg_db;

创建iceberg目标表:

CREATE TABLE  hive_catalog.iceberg_db.iceberg01 (
user_id STRING COMMENT 'user_id',
order_amount DOUBLE COMMENT 'order_amount',
log_ts STRING
);

创建topic:source_kafka01,并写入数据:

kafka-console-producer.sh --broker-list kafka-server:9092 --topic source_kafka01
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}

使用hive catalog创建kafka流表:

CREATE TABLE myhive.mydatabase.source_kafka01 (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3)
) WITH (
'connector'='kafka',
'topic'='source_kafka01',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='kafka-server:9092',
'properties.group.id' = 'testGroup',
'format'='json'
);

使用SQL连接kafka流表和iceberg目标表:

insert into hive_catalog.iceberg_db.iceberg01 select * from myhive.mydatabase.source_kafka01;

查看数据是否写入:

select * from hive_catalog.iceberg_db.iceberg01;

查看iceberg表的数据目录:

总结:

Iceberg与flink sql的集成,目前还存在不足,比如对column schema的修改、生成文件的大小控制等。不过社区一直在完善,欢迎持续关注。

Flinksql读取Kafka写入Iceberg 实践亲测相关推荐

  1. 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError

    1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...

  2. 【Spark】Spark 2.4 Stream 读取kafka 写入kafka

    1.概述 昨天一网友写了一个spark程序 读取kafka写入kafka,结果数据就是无法写入,然后交给我看看,这个程序是spark stream ,这个东东我都没玩过,我用过spark struct ...

  3. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  4. Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  5. flinksql实时读取kafka写入mysql

    因为flink版本迭代比较迅速,在我们进行代码的编写过程中容易出现版本不兼容的问题,为此本文是在flink版本为1.12.0的基础上完成开发的. 1:配置maven依赖(重要) <?xml ve ...

  6. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  7. flink写入iceberg(没有搞完)

    Reference: [1]Flink集成数据湖之实时数据写入iceberg [2]在 Flink 中使用 iceberg [3]基于 Flink+Iceberg 构建企业级实时数据湖 [4]Flin ...

  8. 实践数据湖iceberg 第三十七课 kakfa写入iceberg的 icberg表的 enfource ,not enfource测试

    系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...

  9. Oracle 数据怎么实时同步到 Kafka | 亲测干货分享建议收藏

     摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 Oracl ...

最新文章

  1. Nagios借助yahoo.cn邮箱实现手机短信报警功能
  2. Windows Server云服务器配置深度学习环境WS
  3. Python做web开发,推荐几个能立马上手的小项目
  4. 经典异或题:汉明距离
  5. 如果用神经网络分类处于纠缠态的一对粒子?
  6. Python环境下的数据库编程
  7. 【CyberSecurityLearning 2】IP地址与DOS命令
  8. 关于mysql触发器和存储过程的理解
  9. java连接mysql 5.7数据库_javaJDBC连接mysql(5.7)数据库,一看就懂的详细例子
  10. 网络安全防护部署,升级支持IPv6
  11. 官网下载JDK需要登陆的Oracle账号
  12. java 取对象的类_Java中通过Class类获取Class对象的方法详解
  13. Redis 6.0 源码阅读笔记(7) -- ZSet 数据类型源码分析
  14. 写出python的基本语法规则_【Python基础】python基本语法规则有哪些-赵小刀的回答...
  15. 有限元编程示例matlab + C++
  16. 想要与北上争雄,深圳还有哪些课要补?| DT城数
  17. 后盾网原创实战网站建设教程【PS切片+html+div+css+织梦后台...
  18. 用20行代码读懂复仇者联盟的人物关系(无剧透)
  19. 【深入理解多线程】 Moniter的实现原理(四)
  20. 163免费企业邮箱服务地址

热门文章

  1. oracle rac war配置,Oracle RAC安装配置流程
  2. html图片上加水印,css给图片添加水印
  3. python学习机器学习_机器学习Python技巧,令人毛骨悚然的Linux命令,Thelio,Podman等
  4. 无罪的罪人_探索敏捷和无罪的文化
  5. linux 路由器_为什么我要建立自己的自制Linux路由器
  6. (15)Node.js 自定义模块
  7. Bootstrap 使用Less变量
  8. android h5选择图片上传,js-微信H5选择多张图片预览并上传(兼容ios,安卓,已测试)...
  9. 深度学习笔记(48) 内容代价函数
  10. 怎么把ps转化成html,【论文】浅谈Photoshop转化成Html的方法.pdf