有不少读者反馈,参考上篇文章《Hive 终于等来了 Flink》部署 Flink 并集成 Hive 时,出现一些 bug 以及兼容性等问题。虽已等来,却未可用。所以笔者增加了这一篇文章,作为姊妹篇。 回顾 在上篇文章中,笔者使用的 CDH 版本为 5.16.2,其中 Hive 版本为 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代码本身对 Hive 1.1.0 版本兼容性不好,存在不少问题。为了兼容目前版本,笔者基于 CDH 5.16.2 环境,对 Flink 代码进行了修改,重新打包并部署。 其实经过很多开源项目的实战,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情况下,替换一些 Jar 包,是可以解决兼容性的问题。对于笔者的环境来说,可以使用 Hive 1.2.1 版本的一些 Jar 包来代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的开始部分,笔者会解决这个问题,然后再补充上篇文章缺少的实战内容。 剪不断理还乱的问题 根据读者的反馈,笔者将所有的问题总结为三类:

  1. Flink 如何连接 Hive 除了 API 外,有没有类似 spark-sql 命令

  2. 识别不到 Hadoop 环境或配置文件找不到

  3. 依赖包、类或方法找不到

1. Flink 如何连接 Hive 有的读者不太清楚,如何配置 Flink 连接 Hive 的 Catalog,这里补充一个完整的 conf/sql-client-hive.yaml 示例: catalogs: - name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048 sql-client-hive.yaml 配置文件里面包含:

  1. Hive 配置文件 catalogs 中配置了 Hive 的配置文件路径。

  2. Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。

  3. 执行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比较稳定,适合传统的批处理作业,而且可以容错,另外中间数据落盘,建议开启压缩功能。除了 batch,Flink 也支持 streaming 模式。

■ Flink SQL CLI 工具 类似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 脚本。在 Flink 1.10 版本中,Flink SQL CLI 改进了很多功能,笔者后面讲解。 sql-client.sh 使用方式如下: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 2. 识别不到 Hadoop 环境或配置文件找不到 笔者在上篇文章中提到过,在部署 Flink 的环境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外还需要配置一些环境变量,如下: export HADOOP_CONF_DIR=/etc/hadoop/conf export YARN_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive export HIVE_CONF_DIR=/etc/hive/conf 3. 依赖包、类或方法找不到 先查看一下 Flink 家目录下的 lib 目录: $ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.1.0-cdh5.16.2.jar ├── hive-metastore-1.1.0-cdh5.16.2.jar ├── libfb303-0.9.3.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 如果上面前两个问题都解决后,执行如下命令: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 报错,报错,还是报错: Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory 其实在运行 sql-client.sh 脚本前,需要指定 Hadoop 环境的依赖包的路径,建议不要报错一个添加一个,除非有的读者喜欢。这里笔者提示一个方便的方式,即设置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)环境变量: export HADOOP_CLASSPATH=`hadoop classpath` 再次执行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 很抱歉,继续报错: Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client 这里就是 Hive 1.1.0 版本的 Jar 包与 Flink 出现版本不兼容性的问题了,解决方法是:

  1. 下载 apache-hive-1.2.1 版本

  2. 替换 Flink lib 目录下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目录:

$ tree lib lib ├── flink-connector-hive_2.11-1.10.0.jar ├── flink-dist_2.11-1.10.0.jar ├── flink-hadoop-compatibility_2.11-1.10.0.jar ├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar ├── flink-table_2.11-1.10.0.jar ├── flink-table-blink_2.11-1.10.0.jar ├── hive-exec-1.2.1.jar ├── hive-metastore-1.2.1.jar ├── libfb303-0.9.2.jar ├── log4j-1.2.17.jar └── slf4j-log4j12-1.7.15.jar 最后再执行: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml 这时,读者就可以看到手握栗子的可爱小松鼠了。

Flink SQL CLI 实践 在 Flink 1.10 版本(目前为 RC1 阶段) 中,Flink 社区对 SQL CLI 做了大量的改动,比如支持 View、支持更多的数据类型和 DDL 语句、支持分区读写、支持 INSERT OVERWRITE 等,实现了更多的 TableEnvironment API 的功能,更加方便用户使用。 接下来,笔者详细讲解 Flink SQL CLI。 0. Help 执行下面命令,登录 Flink SQL 客户端: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> 执行 HELP,查看 Flink SQL 支持的命令,如下为大部分常用的:

  • CREATE TABLE

  • DROP TABLE

  • CREATE VIEW

  • DESCRIBE

  • DROP VIEW

  • EXPLAIN

  • INSERT INTO

  • INSERT OVERWRITE

  • SELECT

  • SHOW FUNCTIONS

  • USE CATALOG

  • SHOW TABLES

  • SHOW DATABASES

  • SOURCE

  • USE

  • SHOW CATALOGS

1. Hive 操作 ■ 1.1 创建表和导入数据 为了方便读者进行实验,笔者使用 ssb-dbgen 生成测试数据,读者也可以使用测试环境已有的数据来进行实验。 具体如何在 Hive 中一键式创建表并插入数据,可以参考笔者早期的项目 https://github.com/MLikeWater/ssb-kylin。 ■ 1.2 Hive 表 查看上个步骤中创建的 Hive 表: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables; +--------------+--+ | tab_name | +--------------+--+ | customer | | dates | | lineorder | | p_lineorder | | part | | supplier | +--------------+--+ 读者可以对 Hive 进行各种查询,对比后面 Flink SQL 查询的结果。 2. Flink 操作 ■ 2.1 通过 HiveCatalog 访问 Hive 数据库 登录 Flink SQL CLI,并查询 catalogs: $ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml Flink SQL> show catalogs; default_catalog staginghive Flink SQL> use catalog staginghive; 通过 show catalogs 获取配置的所有 catalog。由于笔者在 sql-client-hive.yaml 文件中设置了默认的 catalog,即为 staginghive。如果需要切换到其他 catalog,可以使用 usecatalog xxx。 ■ 2.2 查询 Hive 元数据 通过 Flink SQL 查询 Hive 数据库和表: # 查询数据库 Flink SQL> show databases; ... ssb tmp ... Flink SQL> use ssb; # 查询表 Flink SQL> show tables; customer dates lineorder p_lineorder part supplier # 查询表结构 Flink SQL> DESCRIBE customer; root |-- c_custkey: INT |-- c_name: STRING |-- c_address: STRING |-- c_city: STRING |-- c_nation: STRING |-- c_region: STRING |-- c_phone: STRING |-- c_mktsegment: STRING 这里需要注意,Hive 的元数据在 Flink catalog 中都以小写字母使用。 ■ 2.3 查询 接下来,在 Flink SQL CLI 中查询一些 SQL 语句,完整 SQL 参考 https://github.com/MLikeWater/ssb-kylin 的 README。 目前 Flink SQL 解析 Hive 视图元数据时,会遇到一些 Bug,比如执行 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'? Flink SQL 找不到视图中的实体表。 p_lineorder 表是 Hive 中的一张视图,创建表的语句如下: CREATE VIEW P_LINEORDER AS SELECT LO_ORDERKEY, LO_LINENUMBER, LO_CUSTKEY, LO_PARTKEY, LO_SUPPKEY, LO_ORDERDATE, LO_ORDERPRIOTITY, LO_SHIPPRIOTITY, LO_QUANTITY, LO_EXTENDEDPRICE, LO_ORDTOTALPRICE, LO_DISCOUNT, LO_REVENUE, LO_SUPPLYCOST, LO_TAX, LO_COMMITDATE, LO_SHIPMODE, LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE FROM ssb.LINEORDER; 但是对于 Hive 中视图的定义,Flink SQL 并没有很好地处理元数据。为了后面 SQL 的顺利执行,这里笔者在 Hive 中删除并重建该视图: 0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as select lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey, lo_orderdate, lo_orderpriotity, lo_shippriotity, lo_quantity, lo_extendedprice, lo_ordtotalprice, lo_discount, lo_revenue, lo_supplycost, lo_tax, lo_commitdate, lo_shipmode, lo_extendedprice*lo_discount as v_revenue from ssb.lineorder; 然后继续在 Flink SQL CLI 中查询 Q1.1 SQL: Flink SQL> select sum(v_revenue) as revenue > from p_lineorder > left join dates on lo_orderdate = d_datekey > where d_year = 1993 > and lo_discount between 1 and 3 > and lo_quantity < 25; revenue 894280292647 继续查询 Q2.1 SQL: Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join part on lo_partkey = p_partkey > left join supplier on lo_suppkey = s_suppkey > where p_category = 'MFGR#12' and s_region = 'AMERICA' > group by d_year, p_brand > order by d_year, p_brand; lo_revenue d_year p_brand 819634128 1998 MFGR#1206 877651232 1998 MFGR#1207 754489428 1998 MFGR#1208 816369488 1998 MFGR#1209 668482306 1998 MFGR#1210 660366608 1998 MFGR#1211 862902570 1998 MFGR#1212 ... 最后再查询一个 Q4.3 SQL: Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit > from p_lineorder > left join dates on lo_orderdate = d_datekey > left join customer on lo_custkey = c_custkey > left join supplier on lo_suppkey = s_suppkey > left join part on lo_partkey = p_partkey > where c_region = 'AMERICA'and s_nation = 'UNITED STATES' > and (d_year = 1997 or d_year = 1998) > and p_category = 'MFGR#14' > group by d_year, s_city, p_brand > order by d_year, s_city, p_brand; d_year s_city p_brand profit 1998 UNITED ST9 MFGR#1440 6665681 如果读者感兴趣的话,可以查询剩余的 SQL,当然也可以和 Spark SQL 进行比较。另外 Flink SQL 也支持 EXPLAIN,查询 SQL 的执行计划。 ■ 2.4 创建视图 同样,可以在 Flink SQL CLI 中创建和删除视图,如下: Flink SQL> create view p_lineorder2 as > select lo_orderkey, > lo_linenumber, > lo_custkey, > lo_partkey, > lo_suppkey, > lo_orderdate, > lo_orderpriotity, > lo_shippriotity, > lo_quantity, > lo_extendedprice, > lo_ordtotalprice, > lo_discount, > lo_revenue, > lo_supplycost, > lo_tax, > lo_commitdate, > lo_shipmode, > lo_extendedprice * lo_discount as v_revenue > from ssb.lineorder; [INFO] View has been created. 这里笔者需要特别强调的是,目前 Flink 无法删除 Hive 中的视图: Flink SQL> drop view p_lineorder; [ERROR] Could not execute SQL statement. Reason: The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed. ■ 2.5 分区操作 Hive 数据库中创建一张分区表: CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string ) PARTITIONED BY (day string, type string) stored as textfile; 接着,通过 Flink SQL 插入和查询数据: # 插入静态分区的数据 Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001'; # 查询 Flink SQL> select * from flink_partition_test; id name day type 100001 Flink001 2020-02-01 Flink # 插入动态分区 Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL'; # 查询 Flink SQL> select * from flink_partition_test; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink # 动态和静态分区结合使用类似,不再演示 # 覆盖插入数据 Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4'; id name day type 100002 Spark 2020-02-02 SparkSQL 100001 FlinkSQL 2020-02-01 Flink 字段 day 在 Flink 属于关键字,要特殊处理。 ■ 2.6 其他功能

  • 2.6.1 函数

Flink SQL 支持内置的函数和自定义函数。对于内置的函数,可以执行 show functions 进行查看,这一块笔者以后会单独介绍如何创建自定义函数。

  • 2.6.2 设置参数

Flink SQL 支持设置环境参数,可以使用 set 命令查看和设置参数: Flink SQL> set; deployment.gateway-address= deployment.gateway-port=0 deployment.m=yarn-cluster deployment.response-timeout=5000 deployment.yjm=1024 deployment.yn=2 deployment.ys=5 deployment.ytm=2048 execution.current-catalog=staginghive execution.current-database=ssb execution.max-idle-state-retention=0 execution.max-parallelism=128 execution.max-table-result-rows=1000000 execution.min-idle-state-retention=0 execution.parallelism=1 execution.periodic-watermarks-interval=200 execution.planner=blink execution.restart-strategy.type=fallback execution.result-mode=table execution.time-characteristic=event-time execution.type=batch Flink SQL> set deployment.yjm = 2048; 总结 在本文中,笔者通过 Flink SQL 比较详细地去操作 Hive 数据库,以及 Flink SQL 提供的一些功能。 当然,目前 Flink SQL 操作 Hive 数据库还是存在一些问题:

  • 目前只支持 TextFile 存储格式,还无法指定其他存储格式 只支持 Hive 数据库中 TextFile 存储格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。虽然实现了 RCFile、ORC、Parquet、Sequence 等存储格式,但是无法自动识别 Hive 表的存储格式。如果要使用其他存储格式,需要修改源码,重新编译。不过社区已经对这些存储格式进行了测试,相信不久以后就可以在 Flink SQL 中使用。

  • OpenCSVSerde 支持不完善 如果读者使用 TextFile 的 row format serde 为 org.apache.hadoop.hive.serde2.OpenCSVSerde 时,无法正确识别字段类型,会把 Hive 表的字段全部映射为 String 类型。

  • 暂时不支持 Bucket 表

  • 暂时不支持 ACID 表

  • Flink SQL 优化方面功能较少

  • 权限控制方面 这方面和 Spark SQL 类似,目前基于 HDFS ACL 控制,暂时还没有实现 Sentry 或 Ranger 控制权限,不过目前 Cloudera 正在开发基于 Ranger 设置 Spark SQL 和 Hive 共享访问权限的策略,实现行/列级控制以及审计信息。

Flink 社区发展很快,所有这些问题只是暂时的,随着新版本的发布会被逐个解决。 如果 Flink SQL 目前不满足的需求,建议使用 API 方式来解决问题。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

Flink 与 Hive 的磨合期相关推荐

  1. SmartNews:基于 Flink 加速 Hive 日表生产的实践

    简介: 将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的技术挑战和应对方案. 本文介绍了 SmartNews 利用 Flink 加速 Hive 日表的生产,将 Flin ...

  2. cdh hive on spark_Flink 与 Hive 的磨合期

    有不少读者反馈,参考上篇文章<Hive 终于等来了 Flink>部署 Flink 并集成 Hive 时,出现一些 bug 以及兼容性等问题.虽已等来,却未可用.所以笔者增加了这一篇文章,作 ...

  3. 【Flink】未解决 FLink 写 hive MemoryManager New Memory allocation smaller than the minimum allocation size

    1.概述 flink 写 hive 分区表,parquet 格式,然后报错如下 Caused by: org.apache.parquet.hadoop.MemoryManager$1: New Me ...

  4. Flink on Hive构建流批一体数仓

    Flink使用HiveCatalog可以通过批或者流的方式来处理Hive中的表.这就意味着Flink既可以作为Hive的一个批处理引擎,也可以通过流处理的方式来读写Hive中的表,从而为实时数仓的应用 ...

  5. flink写入hive的时区问题

    概述 本文主要对[3]进行复现和阐述 环境版本 组件 版本 Hadoop 3.1.2 Hive 2.3.6 Flink 1.12.0 Zookeeper 3.6.0 ################# ...

  6. Flink实战(八十五):flink-sql使用(十二)Flink 与 hive 结合使用(四)Hive Read Write

    0 简介 Using the HiveCatalog and Flink's connector to Hive, Flink can read and write from Hive data as ...

  7. Flink学习4-流式SQL

    Flink学习4-流式SQL Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 介绍Flink Table Sql API相关概念,还会提供 ...

  8. 2021年大数据Flink(三十八):​​​​​​​Table与SQL ​​​​​​案例五 FlinkSQL整合Hive

    目录 案例五 FlinkSQL整合Hive 介绍 集成Hive的基本方式 准备工作 1.添加hadoop_classpath 2.下载jar并上传至flink/lib目录 3.修改hive配置 4.启 ...

  9. 性能提升约 7 倍!Apache Flink 与 Apache Hive 的集成

    导读:随着 Flink 在流式计算的应用场景逐渐成熟和流行,如果 Flink 能同时把批量计算的应用场景处理好,就能减少用户在使用 Flink 时开发和维护的成本,并且能够丰富 Flink 的生态.S ...

最新文章

  1. c++ lambda函数_C++ Lambda表达式
  2. numpy.argwhere 返回的为索引值的array
  3. bn层初始化参数_神经网络参数初始化方式
  4. Django 笔记5 -- 数据库
  5. python多线程并行编程,Python并行编程(二):基于线程的并行
  6. JVM内存管理机制和垃圾回收机制
  7. LoadRunner常用函数(转)
  8. observable java_Observable基本用法(RxJava)
  9. 中国移动将于11月4日首发上会 拟募资560亿元
  10. html中input描述,input的type值类型和描述-HTML
  11. vue router name命名规范_vue-router使用
  12. 21. Django进阶:内建用户系统
  13. 千兆网线和百兆网线可以通用吗?
  14. Linux的LCD驱动
  15. 2008新版眼保健操图解
  16. 分类问题中正负样本分布不均衡问题的解决方法
  17. Windows server 2008 安装Hyper-V
  18. 在控制面板找不到程序的情况下,卸载流氓软件
  19. Android版本9华为,华为应用市场旧版本下载-华为应用市场老版v9.0.0.303 安卓版 - 极光下载站...
  20. 婴儿脸上起湿疹吃什么好

热门文章

  1. python数组排序sort_详解python中sort排序使用
  2. python把浮点数转换成16进制_Python将colorsys RGB坐标转换为十六进制
  3. python写选择排序_如何快速掌握python选择排序算法?
  4. Ubuntu下的Linux内核的编译及安装
  5. android fragment 管理器,Android Fragment 與 Fragment管理器
  6. linux curl 编译命令,linux 编译 curl 出错
  7. mysql b 树原因_复习系列之数据库(四):MySQL为什么采用B+树作为索引结构?
  8. python enumerate函数_Python中enumerate函数用法详解
  9. 打开fiddler后打不开网页_如何通过fiddler的导入导出功能,保存一份分类管理的请求报文...
  10. mysql utf8mb4 造成慢_mysql使用utf8mb4经验吐血总结