2、两种 planner(old & blink)的区别

  1. 批流统一:Blink 将批处理作业,视为流式处理的特殊情况。所以,blink 不支持表和DataSet 之间的转换,批处理作业将不转换为 DataSet 应用程序,而是跟流处理一样,转换为 DataStream 程序来处理。
  2. 因 为 批 流 统 一 , Blink planner 也 不 支 持 BatchTableSource , 而 使 用 有 界 的
  3. Blink planner 只支持全新的目录,不支持已弃用的 ExternalCatalog。
  4. 旧 planner 和 Blink planner 的 FilterableTableSource 实现不兼容。旧的 planner 会把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 则会把 Expressions 下推。
  5. 基于字符串的键值配置选项仅适用于 Blink planner。
  6. PlannerConfig 在两个 planner 中的实现不同。
  7. Blink planner 会将多个 sink 优化在一个 DAG 中(仅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而旧 planner 的优化总是将每一个 sink 放在一个新的 DAG 中,其中所有 DAG 彼此独立。
  8. 旧的 planner 不支持目录统计,而 Blink planner 支持。

老版本创建流处理批处理

7、 老版本创建流处理批处理

7.1 老版本流处理

val settings = EnvironmentSettings.newInstance().useOldPlanner()    // 使用老版本 planner.inStreamingMode()  // 流处理模式.build()
val tableEnv = StreamTableEnvironment.create(env, settings)

7.2 老版本批处理

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = BatchTableEnvironment.create(batchEnv)

7.3 blink 版本的流处理环境

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

7.4 blink 版本的批处理环境

val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings) 

3、连接文件系统,创建hive catalog,对表进行操作,类似于Spark on Hive,flink可以直接获取Hive的元数据,并使用flink进行计算。

        // 连接外部文件bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt")).withFormat(new Csv().fieldDelimiter(',')).withSchema(new Schema().field("id", DataTypes.STRING())).createTemporaryTable("output");// 设置 hive 方言bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// 获取hive-site.xml目录String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);bbTableEnv.registerCatalog("hive", hive);bbTableEnv.useCatalog("hive");bbTableEnv.useDatabase("warningplatform");bbTableEnv.executeSql("insert into  test select id from    default_catalog.default_database.output");
  • 通过bbTableEnv.connect()去创建临时表的方式已经过时了,建议使用bbTableEnv.executeSql()的方式,通过DDL去创建临时表,临时表到底是属于哪一个catalog目前还不太确定,到底是什么规则目前还不清楚。 查资料得知,临时表与单个Flink会话的生命周期相关,临时表始终存储在内存中。 永久表需要一个catalog来管理表对应的元数据,比如hive metastore,该表将一直存在,直到明确删除该表为止。 因此猜测:default_catalog是存储在内存中,如果在切换成hive catalog之前创建临时表,那我们就可以使用default_catalog.default_database.tableName来获取这个临时表。 如果切换了catalog再去创建临时表,那我们就无法获取到临时表了,因为它不在default_catalog中,而且保存在内存里面,直接查询临时表会去当前的catalog里面去查找临时表,因此一定要在default_catalog 里面创建临时表。 而临时视图好像是存储在当前的catalog里面

  • 通过bbTableEnv.createTemporaryView()创建的视图则是属于当前的database的

bbTableEnv.createTemporaryView("output",bbTableEnv.sqlQuery("select * from default_catalog.default_database.output"));

  • 注意1.11版本的执行sql的方法发生了改变,通过执行环境的executeSql(),executeInsert()等来进行插入或者执行sql语句

public static void main(String[] args) throws Exception {ParameterTool params = ParameterTool.fromArgs(args);EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner() // 使用BlinkPlanner.inBatchMode() // Batch模式,默认为StreamingMode.build();TableEnvironment tableEnv = TableEnvironment.create(settings);   String name = "myhive";                                 // Catalog名称,定义一个唯一的名称表示String defaultDatabase = params.get("defaultDatabase"); // 默认数据库名称String hiveConfDir = params.get("hiveConf");            // hive-site.xml路径String version = "2.1.1";                               // Hive版本号HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);tableEnv.registerCatalog("myhive", hive);tableEnv.useCatalog("myhive");TableResult result;String SelectTables_sql ="select * from test.testdata";result = tableEnv.executeSql(SelectTables_sql);result.print();
}

四、启动集群,提交任务

1、进入flink的目录下执行 bin/start-cluster.sh

2、登录rest.address : rest.port查看网页是否正常运行

3、执行命令提交任务

flink/flink-1.13.1/bin/flink run -c org.example.FlinkHiveIntegration flink/job/flinkcommonjob-1.1.jar -hiveConf /etc/hive/conf.cloudera.hive/ -defaultDatabase test

五、报错处理

Caused by: java.lang.NoClassDefFoundError: Lorg/apache/hadoop/mapred/JobConf;

解决方法:补充hadoop-mapreduce-client-core-3.0.0.jar包

6、案例 (新)

需求: 将一个txt文本文件作为输入流读取数据过滤id不等于sensor_1的数据
实现思路: 首先我们先构建一个table的env环境通过connect提供的方法来读取数据然后设置表结构将数据注册为一张表就可进行我们的数据过滤了(使用sql或者流处理方式进行解析)

准备数据

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
object FlinkSqlTable {def main(args: Array[String]): Unit = {// 构建运行流处理的运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 构建table环境val tableEnv = StreamTableEnvironment.create(env)//通过 connect 读取数据tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")).withFormat(new Csv()) //设置类型.withSchema(new Schema() // 给数据添加元数信息.field("id", DataTypes.STRING()).field("time", DataTypes.BIGINT()).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("inputTable")  // 创建一个临时表val resTable = tableEnv.from("inputTable").select("*").filter('id === "sensor_1")// 使用sql的方式查询数据var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")// 将数据转为流进行输出resTable.toAppendStream[(String, Long, Double)].print("resTable")resSql.toAppendStream[(String, Long, Double)].print("resSql")env.execute("FlinkSqlWrodCount")}
}

6、TableEnvironment 的作用
注册 catalog
在内部 catalog 中注册表
执行 SQL 查询
注册用户自定义函数
注册用户自定义函数
保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
         在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置 TableEnvironment 的一些特性。

Flink SQL 连接Hive并写入/读取数据相关推荐

  1. python链接mysql 判断是否成功_python连接mysql数据库并读取数据的实现

    1.安装pymysql包 pip install pymysql 注: MySQLdb只支持python2,pymysql支持python3 2.连接数据 import pymysql import ...

  2. STM32F103C8T6通过内部Flash写入读取数据,模拟EEPROM(附代码)

    STM32F103C8T6通过内部Flash写入读取数据,模拟EEPROM(附代码) 优点: 1. 模块化编程,方便移植,集成度高: 2. 拿来直接用 Flash空间定定义 主函数初始化已经Flash ...

  3. Flink SQL 以catalog方式写入HIVE

    Flink 可以通过连接hive catalog的形式向hive写入数据. 重点!! hive 必须有以下属性: 'transactional' = 'false' 'sink.partition-c ...

  4. Spark连接MySQL数据库并读取数据

    (作者:陈玓玏) 打开pyspark,带驱动的那种 用命令行启动pyspark时需要加上jdbc的驱动路径: pyspark --driver-class-path D:/Users/chendile ...

  5. 【Flink 实战系列】Flink SQL 使用 filesystem connector 同步 Kafka 数据到 HDFS(parquet 格式 + snappy 压缩)

    Flink SQL 同步 Kafka 数据到 HDFS(parquet + snappy) 在上一篇文章中,我们用 datastream API 实现了从 Kafka 读取数据写到 HDFS 并且用 ...

  6. Flink Sql双流join-回撤机制导致数据重复

    数据源:kafka topic数据类型为json 业务场景:解析json,由于数据层级比较深,而且包含数组,此处键值可能为空 所以数据就变成了双流join,甚至是多流join. 问题原因:采用left ...

  7. 【错误记录】Android 应用连接 BLE 设备无法读取数据 ( 可以写出数据 | 无法读取数据 )

    文章目录 一.问题描述 二.问题分析 三.完整设置代码 一.问题描述 Android 应用连接 BLE 硬件设备后 , 出现如下情况 : 发送数据成功 : Android 应用 向 BLE 硬件设备发 ...

  8. Spark SQL连接Hive的一些常见报错

    错误一:Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars. 解决 ...

  9. 【自用】MySQL for Mac创建表、写入读取数据及备份与数据恢复(2)

    实验环境:macOS 11.6.4 一.在新创建的数据库下增删表.写入并显示数据 首先登陆数据库,在mysql数据库提示符下输入以下命令: 1.创建数据库temp create database te ...

  10. flink sql连接mysql

    [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Cou ...

最新文章

  1. [bzoj1042][HAOI2008]硬币购物
  2. Spring Cloud Finchley版中Consul多实例注册的问题处理
  3. java学习笔记(七)----异常
  4. MySQL CHAR、VARCHAR、TEXT、ENUM、SET(字符串类型)
  5. WordPress后台的文章、分类,媒体,页面,评论,链接等所有信息中显示ID并将ID设置为第一列...
  6. |洛谷|动态规划|P1164 小A点菜
  7. 信奥中的数学:计算几何
  8. js实现网页跳转脚本
  9. mysql查询表变量_MySQL 之SQL语言、表库操作、查询及变量精讲
  10. 心得2-类图、关联关系介绍.doc
  11. c++万能头文件包含的头文件
  12. 美国 ZIP Code 一览表
  13. python程序写诗_用Python光速为你写诗
  14. 89c51汇编语言波形发生器,51单片机汇编语言实现波形发生器.docx
  15. discuz怎么自定义php,Discuz添加自定义数据调用模块
  16. 数据结构与算法之美 | 别怕,有我!KMP 算法详解
  17. html+php内联执行JavaScript是报错
  18. 移动互联网下一章(转载)
  19. mysql 失败 捕获 net core_从NetCore报错到MySql安全
  20. 《第31天:JQuery - 轮播图》

热门文章

  1. 數據驅動的到底是什麼
  2. vue项目中使用ttf字体
  3. 87键键盘insert键使用方法
  4. 初秋进补 粥汤大对决
  5. 即时通讯IM技术领域提高篇
  6. H-大时钟(扩展欧几里得)
  7. 3、低功耗蓝牙(BLE)配对和解绑
  8. 蓝牙学习笔记之SMP协议(十四)
  9. 计算机改变世界英语作文,2013年3月3日托福独立写作范文:年轻人改变世界(英文版)...
  10. 一元三次方程求根公式推导