前言

上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录如下:

环境:本地测试环境 JDK1.8  、Flink 1.11.2  、Hadoop3.0.0 、Hive2.1.1

一、前置说明

本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink实时消费kafka中的数据并写入iceberg表,并且使用Hive作为客户端实时读取。

因为iceberg强大的读写分离特性,新写入的数据几乎可以实时读取。参考 数据湖技术Iceberg的探索与实践.pdf

二、使用步骤

1.创建Hadoop Catalog的Iceberg 表

代码如下(示例):

        // create hadoop catalogtenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +"  'type'='iceberg',\n" +"  'catalog-type'='hadoop',\n" +"  'warehouse'='hdfs://nameservice1/tmp',\n" +"  'property-version'='1'\n" +")");// change catalogtenv.useCatalog("hadoop_catalog");tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");tenv.useDatabase("iceberg_hadoop_db");// create iceberg result tabletenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002"); tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n" +"    user_id STRING COMMENT 'user_id',\n" +"    order_amount DOUBLE COMMENT 'order_amount',\n" +"    log_ts STRING\n" +")");

2.使用Hive Catalog创建Kafka流表

代码如下(示例):

        String HIVE_CATALOG = "myhive";String DEFAULT_DATABASE = "tmp";String HIVE_CONF_DIR = "/xx/resources";Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);tenv.registerCatalog(HIVE_CATALOG, catalog);tenv.useCatalog("myhive");// create kafka stream tabletenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");tenv.executeSql("CREATE TABLE ods_k_2_iceberg (\n" +" user_id STRING,\n" +" order_amount DOUBLE,\n" +" log_ts TIMESTAMP(3),\n" +" WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +") WITH (\n" +"  'connector'='kafka',\n" +"  'topic'='t_kafka_03',\n" +"  'scan.startup.mode'='latest-offset',\n" +"  'properties.bootstrap.servers'='xx:9092',\n" +"  'properties.group.id' = 'testGroup_01',\n" +"  'format'='json'\n" +")");

3. 使用SQL连接kafka流表和iceberg 目标表

代码如下(示例):

        System.out.println("---> 3. insert into iceberg  table from kafka stream table .... ");tenv.executeSql("INSERT INTO  hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +" SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

4.  数据验证

bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002  limit 5;
a1111   11.0    2020-06-29
a1111   11.0    2020-06-29
a1111   11.0    2020-06-29
a1111   11.0    2020-06-29
a1111   13.0    2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)

总结

本文仅仅简单介绍了使用Flink Table API 消费kafka并实时写入基于HDFS Hadoop Catalog的iceberg 结果表中,初步验证了该方案的可行性,当然鉴于该示例比较单一未经过线上验证,所以仅供参考。
不足之处烦请斧正,如对你有些许的帮助,还请不吝点赞支持Thanks♪(・ω・)ノ

Flink结合Kafka实时写入Iceberg实践笔记相关推荐

  1. demo flink写入kafka_Flink结合Kafka实时写入Iceberg实践笔记

    前言 上文提到使用Flink SQL写入hadoop catalog 的iceberg table 的简单示例,这次我就flink 消费kafka 流式写入iceberg table做一个验证,现记录 ...

  2. Flinksql读取Kafka写入Iceberg 实践亲测

    Flink sql实时读取Kafka写入Iceberg 实践亲测 前言 本文记录了使用HDFS的一个路径作为iceberg 的结果表,使用Flink sql实时消费kafka中的数据并写入iceber ...

  3. 数据湖(十六):Structured Streaming实时写入Iceberg

    文章目录 Structured Streaming实时写入Iceberg 一.创建Kafka topic 二.编写向Kafka生产数据代码

  4. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  5. 网易游戏 Flink on TiDB 实时数据业务实践

    原文来源: https://tidb.net/blog/d6b389e9 作者介绍: 林佳,网易互娱计费数据中心实时业务负责人,实时开发框架 JFlink-SDK 和实时业务平台 JFlink 的主程 ...

  6. springboot和flink 大数据实时写入hdfs

    一:flink 官网API: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastr ...

  7. flink写入iceberg(没有搞完)

    Reference: [1]Flink集成数据湖之实时数据写入iceberg [2]在 Flink 中使用 iceberg [3]基于 Flink+Iceberg 构建企业级实时数据湖 [4]Flin ...

  8. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  9. 如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

    作者 | 潘国庆编辑 | Natalie AI 前线导读:Flink 已经渐渐成为实时计算引擎的首选之一,从简单的实时 ETL 到复杂的 CEP 场景,Flink 都能够很好地驾驭.本文整理自携程实时 ...

最新文章

  1. 为什么123 and 456结果是456而123 or 456结果是123?
  2. Codeforces 454C - Little Pony and Expected Maximum
  3. Android:字节跳动必备Context原理解析及使用
  4. asp.net 程序,当发生找不到文件的错误时,如何正确定位是哪个文件?
  5. PHP开发调试环境配置(基于wampserver+Eclipse for PHP Developers )
  6. vue 自己写上传excel组件_vue结合elementui组件 el-upload 上传excel表格(二)
  7. Qt Creator开发基于小部件的应用程序
  8. 哨兵系列卫星_智利Panguipulli湖的卫星遥感水特征时空变化图
  9. 仅需一行代码,你的纯文本秒变Markdown
  10. react学习(51)--避免死循环
  11. vscode输入vue自动_使用vscode,新建.vue文件,tab自动生成vue代码模板
  12. 使用 Grafana+collectd+InfluxDB 打造现代监控系统
  13. Linxu 输入子系统分析
  14. 数学分析-1.2数列和收敛数列-例题1、2、3
  15. 猫有哪些比较常见的肢体语言?
  16. c语言语法错误标识符,error C2061: 语法错误: 标识符“std”
  17. LeetCode-717. 1比特与2比特字符
  18. IDEA 出现错误:找不到或无法加载主类
  19. Springboot无法启动:At least one base package must be specified
  20. 清北学堂 2017-10-01

热门文章

  1. node.js爬虫-校园网模拟登录
  2. 现有的几个Unity热更新方案该如何选择,各自的优缺点是什么?
  3. pfa100_PFA 的主要性能
  4. 基于pandas的粗糙集依赖度约简算法思路及实现
  5. 用IDEA创建XML文件 并用浏览器打开
  6. C-Mold 2000.1 1CD(模流分析系统)
  7. 阳光小小美女--我们都需要一个阳光积极的人生
  8. Linux学习19-gitlab配置邮箱postfix(新用户激活邮件)
  9. java 代码封装_封装 java代码
  10. 实验二+108+曾宏宇