目录

  • Flinksql入门前的概念
    • 物化视图
    • 动态表和连续查询
    • 在流上定义表
    • 连续查询
    • 更新和追加查询
    • 查询限制
    • 表到流的转换
  • 初步实现Flinksql
    • 导入依赖
    • 实现Flinksql
  • 进一步实现Flinksql,全程sql
    • flink中的print表
    • 将读取的数据放入MySQL
      • 不按规范数据格式输入的处理办法
    • 集群中提交代码

Flinksql入门前的概念

想要搞清楚flink如何使用sql进行处理,我们首先要搞清楚动态表的概念,我们可以先进入flink官网查看,hive中的表是静态的,而这里处理的数据是流式,处理的表是动态的

物化视图

尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,就像常规的虚拟视图一样。与虚拟视图相反,物化视图缓存查询的结果,因此在访问视图时不需要对查询进行计算。缓存的一个常见难题是防止缓存为过期的结果提供服务。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。
原表只要有更新,物化视图中的数据也会有更新

动态表和连续查询

动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。本质上,动态表上的连续查询非常类似于定义物化视图的查询。
动态表首先是一个逻辑概念。在查询执行期间不一定(完全)物化动态表
需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

在动态表上做的查询叫做连续查询,这个查询不会结束,会一直出结果,输出的也是一张动态表;所以flinksql在流和流之间做了sql的api,先在流上做一个动态表,在动态表上做连续查询,再转化成动态表,再转换成流;中间的过程通过SQL实现

在流上定义表

为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。本质上我们正在从一个 INSERT-only 的 changelog 流构建表。
每来一个数据,都相当于做一个insert操作
流上的表并没有被存起来,只是一个概念

连续查询

在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。
也就是说流处理可以理解为批处理的一个快照,在某一时刻,将流处理的数据,使用批处理的方式处理,也是一样的结果

flink中的数据在左侧是一个动态表,是在实时更新的,右侧的数据是根据左侧的数据做出的结果,当有相同的数据需要进行处理的时候,flink的做法是将相同的数据撤回去(在前面加一个-号),也就是相当于先删除,再添加,之前的计算结果,放进状态

更新和追加查询

虽然这两个示例查询看起来非常相似(都计算分组计数聚合),但它们在一个重要方面不同:第一个查询更新先前输出的结果,即定义结果表的 changelog 流包含 INSERT 和 UPDATE 操作。
第二个查询只附加到结果表,即结果表的 changelog 流只包含 INSERT 操作。
一个查询是产生一个只追加的表还是一个更新的表有一些含义:
产生更新更改的查询通常必须维护更多的状态(请参阅以下部分)。
将 append-only 的表转换为流与将已更新的表转换为流是不同的
只进行更新操作和进行追加操作,是两种不同的流

查询限制

许多(但不是全部)语义上有效的查询可以作为流上的连续查询进行评估。有些查询代价太高而无法计算,这可能是由于它们需要维护的状态大小,也可能是由于计算更新代价太高。

状态大小: 连续查询在无界流上计算,通常应该运行数周或数月。因此,连续查询处理的数据总量可能非常大。必须更新先前输出的结果的查询需要维护所有输出的行,以便能够更新它们。例如,第一个查询示例需要存储每个用户的 URL 计数,以便能够增加该计数并在输入表接收新行时发送新结果。如果只跟踪注册用户,则要维护的计数数量可能不会太高。但是,如果未注册的用户分配了一个惟一的用户名,那么要维护的计数数量将随着时间增长,并可能最终导致查询失败。

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;

计算更新: 有些查询需要重新计算和更新大量已输出的结果行,即使只添加或更新一条输入记录。显然,这样的查询不适合作为连续查询执行。下面的查询就是一个例子,它根据最后一次单击的时间为每个用户计算一个 RANK。一旦 click 表接收到一个新行,用户的 lastAction 就会更新,并必须计算一个新的排名。然而,由于两行不能具有相同的排名,所以所有较低排名的行也需要更新。

SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

也就是说,有些SQL需要遍历整张很大的表,性能就很低,这些数据是存在状态中的,维护成本很大

表到流的转换

动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

  • Append-only 流: 仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。(这种状态指的是只进行了插入操作的表转换成的流,比如将学生信息插入表中
  • Retract 流: retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。(指的是更新的流,包含insert和delete
  • Upsert 流: upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

初步实现Flinksql

导入依赖

这里出现了阿里贡献的blink,是作为Planner(SQL解析器)出现的

从官网中可以查询到我们需要的依赖
Scala和Java的编译器

<!-- Either... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.11.2</version><scope>provided</scope>
</dependency>
<!-- or... -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>1.11.2</version><scope>provided</scope>
</dependency>

blink计划器

<!-- or.. (for the new Blink planner) -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.2</version><scope>provided</scope>
</dependency>

table的依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.11.2</version><scope>provided</scope>
</dependency>

自定义函数、自定义格式解析Kafka数据的依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.11.2</version><scope>provided</scope>
</dependency>

实现Flinksql

可以进入官网直接先拿去环境创建的模板


在这里的flink的流处理、flink的批处理,blink的流处理,blink的批处理,选择blink的流处理

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentval fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironmentval fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironmentval bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

代码:

package com.shujia.SQLimport org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Rowobject Demo1TableApi {def main(args: Array[String]): Unit = {val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner() //使用blink的计划器.inStreamingMode() //使用流处理模型.build()//创建table的环境val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)//构建一个流val linesDS: DataStream[String] = bsEnv.socketTextStream("master",8888)//将流转换成动态表 可以指定字段//这个table类似于spark中的dataframe,可以使用dsl的api,flink中的dsl不是很友好,不使用val table: Table = bsTableEnv.fromDataStream(linesDS,$"word")//注册一张表bsTableEnv.createTemporaryView("words",table)//在动态表上进行连续查询val countTable: Table = bsTableEnv.sqlQuery("""|select word,count(1) from words group by word""".stripMargin)//更新流//将结果表转换成流//多了一个boolean,为true就是插入,为false就是删除val resultDS: DataStream[(Boolean, Row)] = countTable.toRetractStream[Row]resultDS.print()bsEnv.execute()}
}

true表示输入了数据,false表示删掉了数据

进一步实现Flinksql,全程sql

可以首先进入官网查看,这里通过table的方式直接连接Kafka

比如我们这里可以通过table的方式直接连接kafka

这里也可以修改读取的不同数据来源的格式(这里举例的是csv格式,官网可查看,直接加参数可修改

package com.shujia.SQLimport org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Rowobject Demo2FlinksqlOnKafka {def main(args: Array[String]): Unit = {//flinksql的环境val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner() //使用blink的计划器.inStreamingMode() //使用流处理模型.build()//创建table的环境val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)//两种格式,一种是executesql直接写sql返回一个结果,一种是sqlquery,返回一个表/*** 基于Kafka的topic创建动态表* 这里读取的格式是csv格式,默认就是逗号分割,当然可以修改,官网可查看*/bsTableEnv.executeSql("""||CREATE TABLE student (| id STRING,| name STRING,| age BIGINT,| gender STRING,| clazz STRING|) WITH (| 'connector' = 'kafka',| 'topic' = 'student2',| 'properties.bootstrap.servers' = 'master:9092',| 'properties.group.id' = 'abc',| 'format' = 'csv',| 'scan.startup.mode' = 'earliest-offset'|)|""".stripMargin)val countTable: Table = bsTableEnv.sqlQuery("""select clazz,count(1) from student group by clazz""".stripMargin)//更新流countTable.toRetractStream[Row].print()bsEnv.execute()}
}

但是这里在将代码写完之后会出现一个问题

这里的意思是,我们试着读取csv格式的文件,但是我们这里没有csv的依赖,需要导入csv的依赖jar包

结果出来:

flink中的print表

在之前我们在写完sql想要输出结果的时候,都需要将表转换成流,再进行输出,在这里,flink为我们提供了一种print表,可以供我们方便的调试

package com.shujia.SQLimport org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._object Demo2FlinksqlOnKafka {def main(args: Array[String]): Unit = {//flinksql的环境val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner() //使用blink的计划器.inStreamingMode() //使用流处理模型.build()//创建table的环境val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)//两种格式,一种是executesql直接写sql返回一个结果,一种是sqlquery,返回一个表/*** 基于Kafka的topic创建动态表* 这里读取的格式是csv格式,默认就是逗号分割,当然可以修改,官网可查看*/bsTableEnv.executeSql("""||CREATE TABLE student (| id STRING,| name STRING,| age BIGINT,| gender STRING,| clazz STRING|) WITH (| 'connector' = 'kafka',| 'topic' = 'student2',| 'properties.bootstrap.servers' = 'master:9092',| 'properties.group.id' = 'abc',| 'format' = 'csv',| 'scan.startup.mode' = 'earliest-offset'|)|""".stripMargin)/*** 用于测试打印的print表*/bsTableEnv.executeSql("""|CREATE TABLE print_table (| clazz STRING,| c BIGINT|) WITH (| 'connector' = 'print'|)""".stripMargin)/*** 执行sql查询,将结果保存到另一张表中*/bsTableEnv.executeSql("""|insert into print_table|select clazz,count(1) as c from student group by clazz""".stripMargin)}
}

将读取的数据放入MySQL

(使用flinksql读取Kafka的数据,统计班级人数,放入MySQL)

我们也可以从官网找到如何使用jdbc连接MySQL(全程sql)

先导入依赖,再使用下面的范例直接连接

package com.shujia.SQLimport org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._object Demo2FlinksqlOnKafka {def main(args: Array[String]): Unit = {//flinksql的环境val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner() //使用blink的计划器.inStreamingMode() //使用流处理模型.build()//创建table的环境val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)//两种格式,一种是executesql直接写sql返回一个结果,一种是sqlquery,返回一个表/*** 基于Kafka的topic创建动态表* 这里读取的格式是csv格式,默认就是逗号分割,当然可以修改,官网可查看*/bsTableEnv.executeSql("""||CREATE TABLE student (| id STRING,| name STRING,| age BIGINT,| gender STRING,| clazz STRING|) WITH (| 'connector' = 'kafka',| 'topic' = 'student2',| 'properties.bootstrap.servers' = 'master:9092',| 'properties.group.id' = 'abc',| 'format' = 'csv',| 'scan.startup.mode' = 'earliest-offset'|)|""".stripMargin)/*** 用于测试打印的print表*/
//    bsTableEnv.executeSql(
//      """
//        |CREATE TABLE print_table (
//        | clazz STRING,
//        | c BIGINT
//        |) WITH (
//        | 'connector' = 'print'
//        |)
//      """.stripMargin)/*** 执行sql查询,将结果保存到另一张表中*/
//      bsTableEnv.executeSql(
//      """
//         |insert into print_table
//         |select clazz,count(1) as c from student group by clazz
//      """.stripMargin)/*** 用于写入MySQL数据库的表*   PRIMARY KEY (id) NOT ENFORCED*/bsTableEnv.executeSql("""|CREATE TABLE students (|  clazz STRING,|  c BIGINT,|  PRIMARY KEY (clazz) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://192.168.5.201:3306/student?useUnicode=true&characterEncoding=utf-8',|   'table-name' = 'student1',|   'username'='root',|   'password'='123456'|)""".stripMargin)bsTableEnv.executeSql("""|insert into students|select clazz,count(1) as c from student group by clazz""".stripMargin)}
}

这里记得先在MySQL中将clazz设置为主键

结果:

这里我们还可以看到,我们在Kafka生产端生产数据,消费端会出来数据,而且MySQL数据库中的结果也会实时进行更新
生产端生产了一个数据:

消费端出来了数据:

MySQL端也进行了更新:106到了107

不按规范数据格式输入的处理办法

如果我们输入的数据格式不符合MySQL中数据格式的要求,程序就会直接终止,显示无法解析我们输入的数据的格式

这里我们继续看官网,这里给了我们一个参数,解析异常时我们可以跳过该条数据,将参数设置为true即可

但我们修改完成之后,这里还是会出现一个错误,因为它遇到异常的时候会将我们的数据解析为null,而我们的主键要为非空,所以这里还需做一些修改

这里说的是我们需要设置configuration,那我们new一个出来

val configuration: Configuration = new Configuration()configuration.setString("table.exec.sink.not-null-enforcer","drop")

这里有两个地方可以添加configuration
1、把环境从get变成create


2、

我们选择第二种

修改过后,这里随便输入也不会有问题了

整体代码:

package com.shujia.SQLimport org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._object Demo2FlinksqlOnKafka {def main(args: Array[String]): Unit = {val configuration: Configuration = new Configuration()configuration.setString("table.exec.sink.not-null-enforcer","drop")//flinksql的环境val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner() //使用blink的计划器.inStreamingMode() //使用流处理模型.build()//创建table的环境val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)bsTableEnv.getConfig.addConfiguration(configuration)//两种格式,一种是executesql直接写sql返回一个结果,一种是sqlquery,返回一个表/*** 基于Kafka的topic创建动态表* 这里读取的格式是csv格式,默认就是逗号分割,当然可以修改,官网可查看*/bsTableEnv.executeSql("""||CREATE TABLE student (| id STRING,| name STRING,| age BIGINT,| gender STRING,| clazz STRING|) WITH (| 'connector' = 'kafka',| 'topic' = 'student2',| 'properties.bootstrap.servers' = 'master:9092',| 'properties.group.id' = 'abc',| 'format' = 'csv',| 'csv.ignore-parse-errors' = 'true',| 'scan.startup.mode' = 'latest-offset'|)|""".stripMargin)/*** 用于测试打印的print表*/
//    bsTableEnv.executeSql(
//      """
//        |CREATE TABLE print_table (
//        | clazz STRING,
//        | c BIGINT
//        |) WITH (
//        | 'connector' = 'print'
//        |)
//      """.stripMargin)/*** 执行sql查询,将结果保存到另一张表中*/
//      bsTableEnv.executeSql(
//      """
//         |insert into print_table
//         |select clazz,count(1) as c from student group by clazz
//      """.stripMargin)/*** 用于写入MySQL数据库的表*   PRIMARY KEY (id) NOT ENFORCED*/bsTableEnv.executeSql("""|CREATE TABLE students (|  clazz STRING,|  c BIGINT,|  PRIMARY KEY (clazz) NOT ENFORCED|) WITH (|   'connector' = 'jdbc',|   'url' = 'jdbc:mysql://192.168.5.201:3306/student?useUnicode=true&characterEncoding=utf-8',|   'table-name' = 'student1',|   'username'='root',|   'password'='123456'|)""".stripMargin)bsTableEnv.executeSql("""|insert into students|select clazz,count(1) as c from student group by clazz""".stripMargin)}
}

集群中提交代码

打包到目录,直接运行

先导入依赖

提交集群运行flink
flink run -c com.shujia.SQL.Demo2FlinksqlOnKafka flink-1.0-SNAPSHOT.jar

正常运行

flinksql也可以实现从各种数据源读取数据,写入别的数据源,在其中做一些变换,在这边的都可以用,要多看官网

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

Flink sql入门篇-概念、初步实现相关推荐

  1. Flink 最锋利的武器:Flink SQL 入门和实战

    学习路径:<2021年最新从零到大数据专家学习路径指南> 面      试:<2021年最新版大数据面试题全面开启更新> [注意]:Flink1.9版本后的Flink SQL使 ...

  2. 力扣sql入门篇(五)

    力扣sql入门篇(五) 1 组合两个表 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 1.2 示例sql语句 SELECT firstname,lastname,IFNULL( ...

  3. Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    作者 | 机智的王知无 转载自大数据技术与架构(ID: import_bigdata) 一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门 ...

  4. Flink 最锋利的武器:Flink SQL 入门和实战带你了解NBA球星数据

    一.Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言. 自 2015 年开始,阿里巴巴开始调研 ...

  5. Flink SQL篇,SQL实操、Flink Hive、CEP、CDC、GateWay

    Flink源码篇,作业提交流程.作业调度流程.作业内部转换流程图 Flink核心篇,四大基石.容错机制.广播.反压.序列化.内存管理.资源管理 Flink基础篇,基本概念.设计理念.架构模型.编程模型 ...

  6. 实时计算 Flink SQL 核心功能解密

    2019独角兽企业重金招聘Python工程师标准>>> 实时计算 Flink SQL 核心功能解密 Flink SQL 是于2017年7月开始面向集团开放流计算服务的.虽然是一个非常 ...

  7. 伍翀:大数据实时计算Flink SQL解密

    [IT168 专稿]本文根据伍翀老师在2018年5月12日[第九届中国数据库技术大会]现场演讲内容整理而成. 讲师简介: 伍翀,阿里巴巴高级研发工程师,花名"云邪",阿里巴巴计算平 ...

  8. 伍翀 :大数据实时计算Flink SQL解密

    [IT168 专稿]本文根据伍翀老师在2018年5月12日[第九届中国数据库技术大会]现场演讲内容整理而成. 讲师简介:  伍翀,阿里巴巴高级研发工程师,花名"云邪",阿里巴巴计算 ...

  9. Flink从入门到精通100篇(十五)-Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 ​

    前言 本文先通过源码简单过一下分区提交机制的两个要素--即触发(trigger)和策略(policy)的实现,然后用合并小文件的实例说一下自定义分区提交策略的方法. PartitionCommitTr ...

最新文章

  1. exe编辑器_windows下的EXE文件大揭密
  2. 智慧城市新探索:摩拜京东联合利用智能单车数据检测违章停车
  3. 解决ssh远程连接错误问题
  4. CVPR2020十个顶级开源数据集
  5. VSS 数据库地址批量更改器 - VSS Database Changer
  6. CleanCodeHandbook Chapter 4: Binary Tree(25-32)
  7. python计算数组元素个数_python简单获取数组元素个数的方法
  8. 最全面的Fiddler 4教程讲解(界面)
  9. 6,EWF写保护功能介绍与使用
  10. 网站漏洞修复之CSRF跨站攻击
  11. match函数的用法
  12. Wireshark捕获过滤器
  13. 《三体3:死神永生》读后感
  14. RSS概念 以及 POTO周博通 资讯阅读器 使用入门
  15. Android ROM开发(一)——Windows下Cygwin和Android_Kitchen厨房的安装
  16. ABAP 针式打印机横向打印问题
  17. LTE IPV6地址配置
  18. C++的双缓冲队列机制
  19. 范祖红金融消费者权益保护及投诉处理专家
  20. vue2的指令和自定义指令

热门文章

  1. 盘古开源:“大内存”时代即将到来,颠覆性的存储模式
  2. 安装VM虚拟机需要多大内存?
  3. 为了更好地推广威尔士语,政府定制了一套计算机字体
  4. Win10共享文件夹的最简单最管用方法
  5. 运维之思科篇 -----5. NAT及静态转换 、 动态转换及PAT
  6. ctfshow-web829
  7. Team Foundation Server Workgroup Edition的5用户限制其实是防君子不防小人的,可以轻松破解...
  8. 所谓的“人口红利”是什么?白话理解
  9. Android渐进式加载图片,渐进式加载 - 基础讲解
  10. 主题之美,排名靠前的 10 个 VSCode 主题