Checkpoint介绍

checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。

每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。

2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理

3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。

每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败

如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示

两个输入源 checkpoint 过程

假设算子C有A和B两个输入源 在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。 当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。

当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。

持久化存储

目前,Checkpoint持久化存储可以使用如下三种:

MemStateBackend

该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及

快照的数据量非常小时使用,并不推荐用作大规模商业部署。

FsStateBackend

该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。

RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。

语法

​ val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

​ 例子

需求

假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理

数据规划

使用自定义算子每秒钟产生大约10000条数据。 
 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 
 数据经统计后,统计结果打印到终端输出。 
 打印输出的结果为Long类型的数据。 
 开发思路

source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 window算子每隔1秒钟统计一次最近4秒钟内数据数量。 每隔1秒钟将统计结果打印到终端 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

//发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int)

class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i

/** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }

//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }

//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L

// window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event

// 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } }

flink-SQL

Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

org.apache.flink

flink-table_2.11

1.5.0

另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

org.apache.flink

flink-scala_2.11

1.5.0

Table API和SQL程序的结构

Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;

所以我们只需要使用一种来演示即可

要想执行flink的SQL语句,首先需要获取SQL的执行环境:

两种方式(batch和streaming):

// *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。

输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统

输入表可以从各种来源注册:

现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。

注册一个表

// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)

// register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable)

// Table is the result of a simple projection query val projTable: Table = tableEnv.scan("projectedTable ").select(...) 注册一个tableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)

// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource) 注册一个tableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)

// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 例子

//创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path("data1.csv")//文件路径 .field("id" , Types.INT)//第一列数据 .field("name" , Types.STRING)//第二列数据 .field("age" , Types.INT)//第三列数据 .fieldDelimiter(",")//列分隔符,默认是"," .lineDelimiter("\n")//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource("tableA" , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'") //将数据写出去 table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2") //将数据写出去 // sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成

1:将DataStream或DataSet转换为表格

在上面的例子讲解中,直接使用的是:registerTableSource注册表

对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。

然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据

语法:

// get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) 例子

object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = {

// set up execution environment

val env = StreamExecutionEnvironment.getExecutionEnvironment

val tEnv = TableEnvironment.getTableEnvironment(env)

//构造数据

val orderA: DataStream[Order] = env.fromCollection(Seq(

Order(1L, "beer", 3),

Order(1L, "diaper", 4),

Order(3L, "rubber", 2)))

val orderB: DataStream[Order] = env.fromCollection(Seq(

Order(2L, "pen", 3),

Order(2L, "rubber", 3),

Order(4L, "beer", 1)))

// 根据数据注册表

tEnv.registerDataStream("OrderA", orderA)

tEnv.registerDataStream("OrderB", orderB)

// union the two tables

val result = tEnv.sqlQuery(

"SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +

"SELECT * FROM OrderB WHERE amount < 2")

result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE))

env.execute()

} } case class Order(user: Long, product: String, amount: Int)

将表转换为DataStream或DataSet

A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序

1:将表转换为DataStream

有两种模式可以将 Table转换为DataStream:

1:Append Mode

将一个表附加到流上

2:Retract Mode

将表转换为流

语法格式:

// get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age) val table: Table = ...

// convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow

// convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int)

// convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子:

object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World"))

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

val tEnv = TableEnvironment.getTableEnvironment(env)

val stream = env.fromCollection(data)

val table: Table = tEnv.fromDataStream(stream)

//TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode

val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)

//TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息

val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table)

retractStream.print()

env.execute()

} }

case class Peoject(user: Long, index: Int, content: String)

将表转换为DataSet

语法格式

// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age) val table: Table = ...

// convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow

// convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子:

case class Peoject(user: Long, index: Int, content: String)

object TableTODataSet{ def main(args: Array[String]): Unit = {

//构造数据,转换为table

val data = List(

Peoject(1L, 1, "Hello"),

Peoject(2L, 2, "Hello"),

Peoject(3L, 3, "Hello"),

Peoject(4L, 4, "Hello"),

Peoject(5L, 5, "Hello"),

Peoject(6L, 6, "Hello"),

Peoject(7L, 7, "Hello World"),

Peoject(8L, 8, "Hello World"),

Peoject(8L, 8, "Hello World"),

Peoject(20L, 20, "Hello World"))

//初始化环境,加载table数据

val env = ExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

val tableEnvironment = TableEnvironment.getTableEnvironment(env)

val collection: DataSet[Peoject] = env.fromCollection(data)

val table: Table = tableEnvironment.fromDataSet(collection)

//TODO 将table转换为dataSet

val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table)

toDataSet.print()

// env.execute() } }

dataset存入mysql_dataset保存到数据库相关推荐

  1. 文件路径存入mysql_网站的文件的上传,并将相对路径保存到数据库的代码实现。...

    如果网站使用的是struts2框架,那么上传功能可以这样做. 网站的建设过程中免不了要上传文件,比如管理员上传图片到服务器上,然后将文件的相对路径保存在数据库. 这里为什么说是相对路径保存在数据库,因 ...

  2. android 保存数据到setting中_文章如何保存在数据库中

    当我们打开央视新闻网站时,能看到很多丰富多彩的文章,那么这些文章是怎么保存在数据库中的呢?或者说要怎样能实现类似的效果? 如果你了解过网页的话,应该知道通过html的p.ul.img.div等元素的合 ...

  3. oracle数据库数据消失,,保存在数据库里的数据莫名其妙的消失

    求助,保存在数据库里的数据莫名其妙的消失 我做了一个批量修改的功能,数据是肯定存到数据库里了,提交给测试部测试也没什么BUG,可是当有别的classes文件替换之后 重启服务器,我之前修改的数据就会莫 ...

  4. Scrapy爬取网页并保存到数据库中

    Scrapy爬取网页并保存到数据库中一.新建一个Scrapy工程.进入一个你想用来保存代码的文件夹,然后执行: T:\>scrapy startproject fjsen 会生成一堆文件夹和文件 ...

  5. python-scapy爬取mooc网保存在数据库中并下载图片

    爬取的步骤 - 确定url地址; - 获取页面信息;(urllib, requests); - 解析页面提取需要的数据: (正则表达式, bs4, xpath) - 保存到本地(csv, json, ...

  6. php存数组到数据库,PHP将数组保存到数据库

    在 PHP开发中,最常用的数据类型算是字符串和数组了,且数组类型的数据通常需要和数据库进行交互,尤其是对于结构化的数据. ​ 在很多时候,我们需要把数字保存到数据库,实现对于结构化数据的直接存储以及读 ...

  7. ASP.NET将Session保存到数据库中

    因为ASP.NET中Session的存取机制与ASP相同,都是保存在进行中, 一旦进程崩溃,所有Session信息将会丢失,所以我采取了将Session信息保存到SQL Server中,尽管还有其它的 ...

  8. php图片保存在mysql_php实现上传图片保存到数据库的方法

    php实现上传图片保存到数据库的方法.分享给大家供大家参考.具体分析如下: php 上传图片,一般都使用move_uploaded_file方法保存在服务器上.但如果一个网站有多台服务器,就需要把图片 ...

  9. python读取数据库导出文件_Python 获取 datax 执行结果保存到数据库的方法

    执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行: 其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表. #!/bi ...

  10. C# 文件保存到数据库中或者从数据库中读取文件

    首先,介绍一下保存文件到数据库中. 将文件保存到数据库中,实际上是将文件转换成二进制流后,将二进制流保存到数据库相应的字段中.在SQL Server中该字段的数据类型是Image,在Access中该字 ...

最新文章

  1. 第一篇:时间和全局状态
  2. UVa122-Trees on the level
  3. SpringMVC之Http标准的头部信息
  4. boost::mpi模块对 all_gather() 集体的测试
  5. mycat 资料汇总
  6. java中的static类_再议Java中的static关键字
  7. python将文字转换为语音_python实现将文本转换成语音
  8. excel 第六次人口普查_第六次全国人口普查表短表
  9. VBV缓冲区大小-MPEG2规范
  10. RestTemplate的No instances available for xxx
  11. ubuntu14上nvidia 1080和 titan xp 驱动安装踩的坑
  12. 概率统计Python计算:全概率公式
  13. 解决SELECT list is not in GROUP BY clause and contains nonaggregated column..
  14. sas 导入csv文件_sas导入txt、csv文件方法
  15. Mac Mojave10.14安装vmvare Fusion 11.0.0 win8 镜像
  16. Java 在线反编译
  17. Unity插件介绍:Flux的上手教程
  18. 重采样、下采样、上采样三者之间的关系
  19. element ui 排课_新高考走班排课软件哪家最好?
  20. go每日新闻(2021-09-23)——Go 微服务框架对比

热门文章

  1. JS前台页面获取值的技巧
  2. 怎样在 Ubuntu Unity Dash 添加关机、重启选项
  3. 挨踢部落故事汇(2):机缘所致转型之路
  4. JSTL 核心标签库 使用(C标签)
  5. 解决Flex4 发布后访问 初始化极其缓慢的问题
  6. 企业园区全面安防面临的问题及解决之道
  7. 《Java程序设计语言(第4版)》阅读笔记(1)
  8. dispatch js实现_js实现对象自定义事件,触发,on监听事件的方式
  9. 中国石油大学c语言程序设计答案,中国石油大学《C语言程序设计》期末复习题和答案.doc...
  10. ios mysql 创建不同的用户表_移动端iOS系统数据库之Realm(二)表的创建增删改查(多表)...