前言

上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录如下:

环境:本地测试环境 JDK1.8  、Flink 1.11.2  、Hadoop3.0.0 、Hive2.1.1

一、前置说明

本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink实时消费kafka中的数据并写入iceberg表,并且使用Hive作为客户端实时读取。

因为iceberg强大的读写分离特性,新写入的数据几乎可以实时读取。参考 数据湖技术Iceberg的探索与实践.pdf

二、使用步骤

1.创建Hadoop Catalog的Iceberg 表

代码如下(示例):

System.out.println("---> 1. create iceberg hadoop catalog table .... ");

// create hadoop catalog

tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n"

" 'type'='iceberg',\n"

" 'catalog-type'='hadoop',\n"

" 'warehouse'='hdfs://nameservice1/tmp',\n"

" 'property-version'='1'\n"

")");

// change catalog

tenv.useCatalog("hadoop_catalog");

tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");

tenv.useDatabase("iceberg_hadoop_db");

// create iceberg result table

tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002");

tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n"

" user_id STRING COMMENT 'user_id',\n"

" order_amount DOUBLE COMMENT 'order_amount',\n"

" log_ts STRING\n"

")");

2.使用Hive Catalog创建Kafka流表

代码如下(示例):

System.out.println("---> 2. create kafka Stream table .... ");

String HIVE_CATALOG = "myhive";

String DEFAULT_DATABASE = "tmp";

String HIVE_CONF_DIR = "/xx/resources";

Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);

tenv.registerCatalog(HIVE_CATALOG, catalog);

tenv.useCatalog("myhive");

// create kafka stream table

tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");

tenv.executeSql(

"CREATE TABLE ods_k_2_iceberg (\n"

" user_id STRING,\n"

" order_amount DOUBLE,\n"

" log_ts TIMESTAMP(3),\n"

" WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"

") WITH (\n"

" 'connector'='kafka',\n"

" 'topic'='t_kafka_03',\n"

" 'scan.startup.mode'='latest-offset',\n"

" 'properties.bootstrap.servers'='xx:9092',\n"

" 'properties.group.id' = 'testGroup_01',\n"

" 'format'='json'\n"

")");

3. 使用SQL连接kafka流表和iceberg 目标表

代码如下(示例):

System.out.println("---> 3. insert into iceberg table from kafka stream table .... ");

tenv.executeSql(

"INSERT INTO hadoop_catalog.iceberg_hadoop_db.iceberg_002 "

" SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

4.  数据验证

bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}

{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}

{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}

{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}

hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;

hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)

STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'

LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';

hive> select * from tmp.iceberg_002 limit 5;

a111111.02020-06-29

a111111.02020-06-29

a111111.02020-06-29

a111111.02020-06-29

a111113.02020-06-29

Time taken: 0.108 seconds, Fetched: 5 row(s)

总结

本文仅仅简单介绍了使用Flink Table API 消费kafka并实时写入基于HDFS Hadoop Catalog的iceberg 结果表中,初步验证了该方案的可行性,当然鉴于该示例比较单一未经过线上验证,所以仅供参考。

不足之处烦请斧正,如对你有些许的帮助,还请不吝点赞支持Thanks♪(・ω・)ノ

来源:https://www.icode9.com/content-4-827451.html

demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记相关推荐

  1. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

  2. Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  3. flink 写kafka_flink消费kafka的offset与checkpoint

    生产环境有个作业,逻辑很简单,读取kafka的数据,然后使用hive catalog,实时写入hbase,hive,redis.使用的flink版本为1.11.1. 为了防止写入hive的文件数量过多 ...

  4. 【Flink系列】开启jdbc批量写入

    背景 开发Flink应用要求计算结果实时写入数据库的,一般业务写入TPS在600-800,如果生产同时跑十几个任务,数据库写入TPS接近一万,对数据库造成了较大压力,使用窗口的优化方向不可行: 1. ...

  5. Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

    上周六在深圳分享了<Flink SQL 1.9.0 技术内幕和最佳实践>,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码.希望 ...

  6. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  7. flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  8. centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka

    前言 之前文章 <从0到1学习Flink>-- Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用 ...

  9. Flink集成数据湖之实时数据写入iceberg

    文章目录 背景 iceberg简介 flink实时写入 准备sql client环境 创建catalog 创建db 创建table 插入数据 查询 代码版本 总结 背景 随着大数据处理结果的实时性要求 ...

最新文章

  1. 存储引擎——概述|| 各种存储引擎的特性(InnoDB||MyISAM||MEMORY||MERGE) ||存储引擎的选择
  2. python高斯分布训练_Python画图高斯分布的示例
  3. 使用Hexo搭建博客,备份至GitHub过程(基于网上资料的实践操作)
  4. 关于Heap Dump
  5. Hive2.1.1的安装教程(元数据放在本地Mysql)
  6. 判断new出来的对象里面值为null
  7. html加了文档声明之后页面错乱,为登陆页面扩展和配置设计导入程序
  8. OCJP认证该不该考?
  9. dwg格式的计算机图,看图纸(DWG文件浏览器)
  10. 复杂性应对之道 - 领域建模
  11. CocosCreator之绳索摆动效果
  12. Java面试题(二)JMM,volatile,CAS
  13. openwrt安装ipk报错“incompatible with the architectures configured”
  14. 我的机器学习支线「模型复杂度」
  15. 魔方复原(BFS+剪枝)
  16. 西门子S7-1200PLC堆栈程序 在使用西门子1200PLC时候发现,系统没有自带的堆栈功能块,不能实现数据的先进先出后进后出功能
  17. MFC CFileDialog 相对路径
  18. redis 指定端口 启动
  19. 南方科技大学深港微电子学院夏令营
  20. django使用Q进行复杂查询

热门文章

  1. 在maven项目中使用Junit进行单元测试
  2. 几个简单java基础的例子
  3. 内核函数输出怎么看到_谈谈如何学习Linux内核
  4. 零基础入门学习Python22-递归2 斐波那契数列和汉诺塔
  5. NBT:牛瘤胃微生物组的参考基因组集
  6. 微生物绝对定量or相对定量,你选对了吗
  7. Nature Genetics:每年造成7亿人感染的酿脓链球菌的致病机制
  8. 宏基因组扩增子最新分析流程QIIME2:官方中文帮助文档
  9. R语言使用caretEnsemble包的caretList函数一次性构建多个机器学习模型、使用lattice包的bwplot函数使用箱图对比多个模型在多个指标上的性能差异
  10. Python使用matplotlib可视化华夫饼图(Waffle Chart) 、华夫饼图可以直观地显示完成度(百分比)或者部分占整体的比例、华夫饼图适合于同类型指标的比较(Waffle Chart)