Iceberg: Flink Operations
1. Configure parameters and jars
1) Starting with Flink 1.11, the flink-shaded-hadoop-2-uber jar is no longer provided, so for Flink to work with Hadoop you must set the HADOOP_CLASSPATH environment variable:
[root@hadoop103 flink-1.11.0]# vim bin/config.sh
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH
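2) At the time of writing, Iceberg (0.11.1, the version built here) supports only Flink 1.11.x, so Flink 1.11.0 is used. Copy the Iceberg jars you built, found in the directory below, into Flink's lib directory: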
/opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop103 libs]# cp *.jar /opt/module/flink-1.11.0/lib/
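Flink only sees Hadoop if the Hadoop jars end up on the JVM classpath, which is what HADOOP_CLASSPATH in step 1) is for. A minimal sketch for checking this from Java, assuming `org.apache.hadoop.conf.Configuration` as a representative Hadoop class; `HadoopClasspathCheck` is a hypothetical helper, not part of Flink or Hadoop. It only tries to resolve the class, without initializing it.

```java
// Hypothetical helper: reports whether a class can be resolved by this JVM.
class HadoopClasspathCheck {

    /** Returns true if the named class can be loaded (without running its static initializers). */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: we only care whether the class is resolvable
            Class.forName(className, false, HadoopClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Assumption: the core Hadoop configuration class stands in for "Hadoop is available". */
    static boolean hadoopAvailable() {
        return isOnClasspath("org.apache.hadoop.conf.Configuration");
    }
}
```

For example, calling `HadoopClasspathCheck.hadoopAvailable()` at the start of a job's `main` and getting `false` suggests that HADOOP_CLASSPATH was not exported in the environment that launched that JVM.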
2. Flink SQL Client
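1) In the Hadoop environment, start a standalone Flink cluster:
[root@hadoop103 flink-1.11.0]# bin/start-cluster.sh
2) Start the SQL client with the Iceberg runtime jar and the Hive connector jar:
./sql-client.sh embedded \
  -j /home/softs/flink-1.11.1/lib/iceberg-flink-runtime-0.11.1.jar \
  -j /home/softs/flink-1.11.1/lib/flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar \
  shell
Note: the Hive connector jar must match the Flink version and Scala binary version of the cluster. The 1.15.0 build above targets Flink 1.15 with Scala 2.12; for Flink 1.11 with Scala 2.11 the matching artifact is flink-sql-connector-hive-2.3.6_2.11-1.11.x.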
3. Create catalogs
-- Create hadoop_catalog:
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://master:9000/user/hive/warehouse'
);
-- Create hive_catalog:
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://master:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://master:9000/user/hive/warehouse'
);
-- Create a custom catalog:
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
);
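Hive catalog properties:
- uri: the Thrift URI of the Hive metastore. (Required)
- clients: the Hive metastore client pool size; default 2. (Optional)
- warehouse: the Hive warehouse location. Set this path if you neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor put a correct hive-site.xml on the classpath.
- hive-conf-dir: path to a directory containing a hive-site.xml configuration file that supplies custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating the Iceberg catalog, the value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath) is overridden by warehouse.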
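If you register catalogs from a program rather than the SQL client, the statements above are just a name plus an ordered property map. A minimal sketch, assuming you build the DDL yourself; `CatalogDdl` is a hypothetical helper (not part of Flink or Iceberg), and its only validation rule is the "uri is required for a Hive catalog" rule from the list above.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders a Flink SQL CREATE CATALOG statement from a property map.
class CatalogDdl {

    static String createCatalog(String name, Map<String, String> props) {
        // Following the property list above: a Hive catalog needs the metastore URI.
        if ("hive".equals(props.get("catalog-type")) && !props.containsKey("uri")) {
            throw new IllegalArgumentException("catalog-type 'hive' requires a 'uri' property");
        }
        // Produces "(\n  'k1'='v1',\n  'k2'='v2'\n)" in the map's iteration order.
        StringJoiner with = new StringJoiner(",\n  ", "(\n  ", "\n)");
        for (Map.Entry<String, String> e : props.entrySet()) {
            with.add(quote(e.getKey()) + "=" + quote(e.getValue()));
        }
        // No trailing ';' here; add one when pasting the result into the SQL client.
        return "CREATE CATALOG " + name + " WITH " + with;
    }

    // SQL string literal: wrap in single quotes and double any embedded single quote.
    private static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }
}
```

Use a `LinkedHashMap` so the properties come out in insertion order. The returned string could then be passed to `TableEnvironment.executeSql(...)` (available since Flink 1.11) to register the catalog from Java code.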
Use the created catalog:
Flink SQL> use catalog hadoop_catalog;
Create a database:
Flink SQL> create database iceberg_sjc;
Create a table:
Flink SQL> create table testA (
  id bigint,
  name string,
  age int,
  dt string
) partitioned by (dt);
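Insert data:
insert into testA values(1001,'张三',18,'2021-07-01'),(1001,'李四',19,'2021-07-02');
Overwrite data (INSERT OVERWRITE is supported only in a batch job, not a streaming one; on a partitioned table it replaces only the partitions that receive new rows, here dt='2021-07-01'):
insert overwrite testA values(1010,'zz',18,'2021-07-01'),(201,'马六',192,'2021-07-01');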
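The overwrite rule can be modeled in a few lines. This is a toy in-memory sketch, assuming dynamic partition overwrite as the Iceberg Flink documentation describes it (no PARTITION clause in the statement); `PartitionOverwriteModel` and its `Row` type are hypothetical names, and nothing here calls Iceberg.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (not Iceberg code) of dynamic partition overwrite on a table partitioned
// by dt: partitions that receive new rows are replaced, all other partitions are kept.
class PartitionOverwriteModel {

    // Mirrors the columns of testA(id, name, age, dt).
    static final class Row {
        final long id;
        final String name;
        final int age;
        final String dt;

        Row(long id, String name, int age, String dt) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.dt = dt;
        }
    }

    static List<Row> overwrite(List<Row> existing, List<Row> incoming) {
        // 1. Find the partitions (dt values) that the new rows write to.
        Set<String> touched = new HashSet<>();
        for (Row r : incoming) {
            touched.add(r.dt);
        }
        // 2. Keep existing rows only from untouched partitions, then add the new rows.
        List<Row> result = new ArrayList<>();
        for (Row r : existing) {
            if (!touched.contains(r.dt)) {
                result.add(r);
            }
        }
        result.addAll(incoming);
        return result;
    }
}
```

Fed the rows of the two statements above, the model keeps the dt='2021-07-02' row and replaces everything in dt='2021-07-01' with the two new rows, leaving three rows in total.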
4. Flink API operations
1) Configure pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.22</slf4j.version>
        <iceberg.version>0.11.1</iceberg.version>
        <hadoop.version>2.6.5</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>flink_demo</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <!-- <manifest><mainClass>com.sxd.util.QR_Code</mainClass></manifest> -->
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2) Read table data
1. Batch read
package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 *
 * Run with:
 * /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Load the Iceberg table directly from its location in the Hadoop catalog warehouse
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        batchRead(env, tableLoader);
        env.execute();
    }

    /**
     * Batch read: scan the table's current snapshot once, then finish.
     */
    public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        // Fields are accessed by position: 0 = id (bigint), 1 = name, 2 = age (int), 3 = dt
        batch.map(item -> item.getLong(0) + "\t" + item.getString(1) + "\t" + item.getInt(2) + "\t" + item.getString(3))
                .print();
    }
}
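The location passed to `TableLoader.fromHadoopTable` is the `hadoop_catalog` warehouse followed by the database and table name (`default`, `testA`). A small sketch of that layout, assuming the table uses Iceberg's default data location (`<table>/data`) and identity partitioning on `dt`; `HadoopTablePaths` is a hypothetical helper, not an Iceberg class.

```java
// Hypothetical helper sketching the Hadoop catalog path layout (not an Iceberg API):
// table location = <warehouse>/<database>/<table>, and data files of an
// identity-partitioned table go under <table location>/data/<column>=<value>/.
class HadoopTablePaths {

    static String tableLocation(String warehouse, String database, String table) {
        // Drop a trailing slash on the warehouse so the result contains no "//".
        String base = warehouse.endsWith("/")
                ? warehouse.substring(0, warehouse.length() - 1)
                : warehouse;
        return base + "/" + database + "/" + table;
    }

    static String partitionDataDir(String tableLocation, String column, String value) {
        // Simplification: Iceberg URL-encodes partition values; plain dates need no encoding.
        return tableLocation + "/data/" + column + "=" + value;
    }
}
```

With the warehouse from the CREATE CATALOG statement, `tableLocation(warehouse, "default", "testA")` reproduces the path used in `TableRead`, and rows inserted with dt='2021-07-01' would sit under `.../testA/data/dt=2021-07-01`.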
2. Streaming read
package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 *
 * Run with:
 * /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        streamingRead(env, tableLoader);
        env.execute();
    }

    /**
     * Streaming read: an unbounded source that keeps polling the table for newly committed snapshots.
     * The job runs until it is cancelled.
     */
    public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> stream = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .build();
        stream.print();
    }
}
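The batch and streaming readers differ only in `streaming(false)` versus `streaming(true)`. As the Iceberg documentation describes it, a streaming read first reads the current snapshot and then picks up data appended by later snapshots. The toy model below illustrates that behavior for append-only commits; `SnapshotReadModel` is a hypothetical name and contains no Iceberg code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Iceberg's implementation): each commit creates a snapshot that appends
// some rows. A batch read returns everything visible once; a streaming reader remembers
// how many snapshots it has consumed and returns only rows from newer snapshots.
class SnapshotReadModel {

    private final List<List<String>> appendsPerSnapshot = new ArrayList<>();

    /** Commit one append-only snapshot. */
    void commitAppend(List<String> rows) {
        appendsPerSnapshot.add(new ArrayList<>(rows));
    }

    /** Batch read: all rows visible in the latest snapshot, read once. */
    List<String> batchRead() {
        List<String> all = new ArrayList<>();
        for (List<String> appended : appendsPerSnapshot) {
            all.addAll(appended);
        }
        return all;
    }

    StreamingReader streamingReader() {
        return new StreamingReader();
    }

    /** Streaming read: the first poll returns the current data, later polls only new appends. */
    final class StreamingReader {
        private int consumedSnapshots = 0;

        List<String> poll() {
            List<String> fresh = new ArrayList<>();
            while (consumedSnapshots < appendsPerSnapshot.size()) {
                fresh.addAll(appendsPerSnapshot.get(consumedSnapshots));
                consumedSnapshots++;
            }
            return fresh;
        }
    }
}
```

A real streaming source also has to handle non-append snapshots and checkpointed positions; the model deliberately ignores both.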
3) Write data
1. Append data (Appending Data)
The target table testB must already exist with the same schema as testA (for example, created in the SQL client); FlinkSink does not create it.
package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 *
 * Run with:
 * /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        appendingData(env, tableLoader);
        env.execute();
    }

    /**
     * Append write: copy all rows of testA into testB, keeping testB's existing data.
     */
    public static void appendingData(StreamExecutionEnvironment env, TableLoader tableLoader) {
        // Batch-read the source table testA
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        // Append the rows to the existing table testB
        TableLoader tableB = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testB");
        FlinkSink.forRowData(batch)
                .tableLoader(tableB)
                .build();
    }
}
2. Overwrite data (Overwrite Data)
package cn.dp.icberg.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.source.FlinkSource;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 *
 * Run with:
 * /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        overwriteData(env, tableLoader);
        env.execute();
    }

    /**
     * Overwrite write: with overwrite(true), every partition of testB that receives rows
     * is replaced (the whole table if testB is unpartitioned).
     */
    public static void overwriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batch = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        TableLoader tableB = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testB");
        FlinkSink.forRowData(batch)
                .tableLoader(tableB)
                .overwrite(true)
                .build();
    }
}
4) Insert 10,000 rows of test data
package cn.dp.icberg.flink;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

import java.util.ArrayList;

/**
 * @author: 商俊超
 * @date: 2022/5/26
 *
 * Run with:
 * /home/softs/flink-1.11.1/bin/flink run -c cn.dp.icberg.flink.TableRead flink_demo-jar-with-dependencies.jar
 */
public class TableRead {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://master:9000/user/hive/warehouse/default/testA");
        dataTest(env, tableLoader);
        env.execute();
    }

    /**
     * Write 10,000 test rows into testA.
     */
    public static void dataTest(StreamExecutionEnvironment env, TableLoader tableLoader) {
        // Build 10,000 JSON records; (i + 1) is parenthesized so it is added, not concatenated
        ArrayList<String> list = new ArrayList<>();
        for (int i = 1; i <= 10000; i++) {
            list.add("{\"id\":\"" + i + "\",\"name\":\"" + "liusi" + i + "\",\"age\":\"" + (i + 1) + "\",\"dt\":\"" + "2022-05-26" + "\"}");
        }
        DataStream<String> data = env.fromCollection(list);
        // Convert each JSON record into RowData matching testA(id, name, age, dt)
        DataStream<RowData> input = data.map(item -> {
            JSONObject jsonData = JSONObject.parseObject(item);
            GenericRowData rowData = new GenericRowData(4); // number of columns
            rowData.setField(0, jsonData.getLongValue("id"));
            rowData.setField(1, StringData.fromString(jsonData.getString("name")));
            rowData.setField(2, jsonData.getIntValue("age"));
            rowData.setField(3, StringData.fromString(jsonData.getString("dt")));
            return rowData;
        });
        // overwrite(true) replaces the dt=2022-05-26 partition of testA
        FlinkSink.forRowData(input).tableLoader(tableLoader).overwrite(true).build();
    }
}
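In Java, `"\"age\":\"" + i + 1` is evaluated left to right as string concatenation, so for i = 1 it appends "1" and then another "1", producing "11" rather than 2; the age has to be written as `(i + 1)`. The sketch below isolates the pitfall and shows a record builder in the same JSON shape as the test data; `TestRecords` and its methods are hypothetical names.

```java
// Hypothetical helper isolating the string-concatenation pitfall in the test-data builder.
class TestRecords {

    static String buggyAge(int i) {
        return "" + i + 1;      // string concatenation: i = 1 -> "11"
    }

    static String fixedAge(int i) {
        return "" + (i + 1);    // parentheses force integer addition first: i = 1 -> "2"
    }

    /** Builds one JSON line shaped like the test data: id, name, age, dt. */
    static String record(int i, String dt) {
        return "{\"id\":\"" + i + "\",\"name\":\"liusi" + i + "\",\"age\":\"" + (i + 1)
                + "\",\"dt\":\"" + dt + "\"}";
    }
}
```

Generated this way, record 1 carries age 2 and record 10000 carries age 10001, which still fits `jsonData.getIntValue("age")`.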