Flinksql读取Kafka写入Iceberg 实践亲测
Flink sql实时读取Kafka写入Iceberg 实践亲测
前言
本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceberg表,并且使用Hive作为客户端实时读取。话不多说,上代码!!!
测试环境:
JDK1.8
Flink 1.11.1
iceberg 0.11.0
Hadoop3.0.0
Hive2.1.1
测试前准备工作:
1.安装flink1.11.1
2.下载kafka、hive与flink的集成jar包
3.下载iceberg-flink-runtime-0.11.0.jar
测试步骤:
启动flink sql-client:
bin/sql-client.sh embedded -j lib/iceberg-flink=runtime-0.11.0.jar -j lib/flink-connector-hive.jar -j lib/flink-sql-connector-kafka.jar shell
创建hive_catalog:
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://xxx:9083',
'warehouse'='hdfs://nameservice1/user/hive/warehouse',
'clients'='5',
'property-version'='1'
);
使用hive_catalog:
use catalog hive_catalog;
创建并使用数据库:
create database iceberg_db;
use iceberg_db;
创建iceberg目标表:
CREATE TABLE hive_catalog.iceberg_db.iceberg01 (
user_id STRING COMMENT 'user_id',
order_amount DOUBLE COMMENT 'order_amount',
log_ts STRING
);
创建topic:source_kafka01,并写入数据:
kafka-console-producer.sh --broker-list kafka-server:9092 --topic source_kafka01
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
使用hive catalog创建kafka流表:
CREATE TABLE myhive.mydatabase.source_kafka01 (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3)
) WITH (
'connector'='kafka',
'topic'='source_kafka01',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='kafka-server:9092',
'properties.group.id' = 'testGroup',
'format'='json'
);
使用SQL连接kafka流表和iceberg目标表:
insert into hive_catalog.iceberg_db.iceberg01 select * from myhive.mydatabase.source_kafka01;
查看数据是否写入:
select * from hive_catalog.iceberg_db.iceberg01;
查看iceberg表的数据目录:
总结:
Iceberg与flink sql的集成,目前还存在不足,比如对column schema的修改、生成文件的大小控制等。不过社区一直在完善,欢迎持续关注。
Flinksql读取Kafka写入Iceberg 实践亲测相关推荐
- 【Spark】Spark Stream读取kafka写入kafka报错 AbstractMethodError
1.概述 根据这个博客 [Spark]Spark 2.4 Stream 读取kafka 写入kafka 报错如下 Exception in thread "main" java.l ...
- 【Spark】Spark 2.4 Stream 读取kafka 写入kafka
1.概述 昨天一网友写了一个spark程序 读取kafka写入kafka,结果数据就是无法写入,然后交给我看看,这个程序是spark stream ,这个东东我都没玩过,我用过spark struct ...
- demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记
前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...
- Flink结合Kafka实时写入Iceberg实践笔记
前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...
- flinksql实时读取kafka写入mysql
因为flink版本迭代比较迅速,在我们进行代码的编写过程中容易出现版本不兼容的问题,为此本文是在flink版本为1.12.0的基础上完成开发的. 1:配置maven依赖(重要) <?xml ve ...
- java读写德卡数据_Spark Streaming 读取Kafka数据写入ES
简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...
- flink写入iceberg(没有搞完)
Reference: [1]Flink集成数据湖之实时数据写入iceberg [2]在 Flink 中使用 iceberg [3]基于 Flink+Iceberg 构建企业级实时数据湖 [4]Flin ...
- 实践数据湖iceberg 第三十七课 kakfa写入iceberg的 icberg表的 enfource ,not enfource测试
系列文章目录 实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql ...
- Oracle 数据怎么实时同步到 Kafka | 亲测干货分享建议收藏
摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 Oracl ...
最新文章
- Nagios借助yahoo.cn邮箱实现手机短信报警功能
- Windows Server云服务器配置深度学习环境WS
- Python做web开发,推荐几个能立马上手的小项目
- 经典异或题:汉明距离
- 如果用神经网络分类处于纠缠态的一对粒子?
- Python环境下的数据库编程
- 【CyberSecurityLearning 2】IP地址与DOS命令
- 关于mysql触发器和存储过程的理解
- java连接mysql 5.7数据库_javaJDBC连接mysql(5.7)数据库,一看就懂的详细例子
- 网络安全防护部署,升级支持IPv6
- 官网下载JDK需要登陆的Oracle账号
- java 取对象的类_Java中通过Class类获取Class对象的方法详解
- Redis 6.0 源码阅读笔记(7) -- ZSet 数据类型源码分析
- 写出python的基本语法规则_【Python基础】python基本语法规则有哪些-赵小刀的回答...
- 有限元编程示例matlab + C++
- 想要与北上争雄,深圳还有哪些课要补?| DT城数
- 后盾网原创实战网站建设教程【PS切片+html+div+css+织梦后台...
- 用20行代码读懂复仇者联盟的人物关系(无剧透)
- 【深入理解多线程】 Moniter的实现原理(四)
- 163免费企业邮箱服务地址
热门文章
- oracle rac war配置,Oracle RAC安装配置流程
- html图片上加水印,css给图片添加水印
- python学习机器学习_机器学习Python技巧,令人毛骨悚然的Linux命令,Thelio,Podman等
- 无罪的罪人_探索敏捷和无罪的文化
- linux 路由器_为什么我要建立自己的自制Linux路由器
- (15)Node.js 自定义模块
- Bootstrap 使用Less变量
- android h5选择图片上传,js-微信H5选择多张图片预览并上传(兼容ios,安卓,已测试)...
- 深度学习笔记(48) 内容代价函数
- 怎么把ps转化成html,【论文】浅谈Photoshop转化成Html的方法.pdf