checkpoint是什么?

spark, flink, tensorflow都有对应的checkpoint机制

Jupyter Notebook也有自动的checkpoint机制

SQL Server,Oracle等数据库其实也有Checkpoint

Checkpoint和快照都是一种应对容错和可靠的方法

checkpoint与持久化有什么关系?

checkpoint是什么?

(1)、Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据持久化保存;

(2)、Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;

(3)、如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘。

(4)、Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;

假如进行一个1万个算子操作,在9000个算子的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

11.checkpoint原理机制

(1)当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,使用checkpoint首先需要调用sparkContext的setCheckpoint方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后在RDD所处的job运行结束后,会启动一个单独的job来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。

(2)persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。

本文主要简述spark checkpoint机制,快速把握checkpoint机制的来龙去脉,关于源码方面可以看参考文章。

1、Spark core的checkpoint

1)为什么checkpoint?

分布式计算中难免因为网络,存储等原因出现计算失败的情况,RDD中的lineage信息常用来在task失败后重计算使用,为了防止计算失败后从头开始计算造成的大量开销,RDD会checkpoint计算过程的信息,这样作业失败后从checkpoing点重新计算即可,提高效率。

2)什么时候写checkpoint数据?

当RDD的action算子触发计算结束后会执行checkpoint。

在spark streaming中每generate一个batch的RDD也会触发checkpoint操作。

3)什么时候读checkpoint数据?

task计算失败的时候会从checkpoint读取数据进行计算。

4)checkpoint具体实现有哪些?

其实现分两种

LocalRDDCheckpointData:临时存储在本地executor的磁盘和内存上(不能仅使用内存,因为内存的eviction机制可能造成data loss)。该实现的特点是比较快,适合lineage信息需要经常被删除的场景(如GraphX),可容忍executor挂掉。

ReliableRDDCheckpointData:存储在外部可靠存储(如hdfs),可以达到容忍driver 挂掉情况。虽然效率没有存储本地高,但是容错级别最好。

如果代码中没有设置checkpoint,则使用local的checkpoint模式,如果设置路径,则使用reliable的checkpoint模式

2、spark streaming的checkpoint

spark streaming有一个单独的线程CheckpointWriteHandler,每generate一个batch interval的RDD数据都会触发checkpoint操作。

对于kafka的DirectKafkaInputDStreamCheckpointData,实质是重写DStreamCheckpointData的update和restore方法,这样checkpoint的数据就是topic,partition,fromOffset和untilOffset。

参考:

spark源码分析之Checkpoint的过程

1.检查点

保存模型并不限于在训练模型后,在训练模型之中也需要保存,因为TensorFlow训练模型时难免会出现中断的情况,我们自然希望能够将训练得到的参数保存下来,否则下次又要重新训练。

这种在训练中保存模型,习惯上称之为保存检查点。

2.添加保存点

通过添加检查点,可以生成载入检查点文件,并能够指定生成检查文件的个数,例如使用saver的另一个参数——max_to_keep=1,表明最多只保存一个检查点文件,在保存时使用如下的代码传入迭代次数。

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import ostrain_x = np.linspace(-5, 3, 50)
train_y = train_x * 5 + 10 + np.random.random(50) * 10 - 5plt.plot(train_x, train_y, 'r.')
plt.grid(True)
plt.show()tf.reset_default_graph()X = tf.placeholder(dtype=tf.float32)
Y = tf.placeholder(dtype=tf.float32)w = tf.Variable(tf.random.truncated_normal([1]), name='Weight')
b = tf.Variable(tf.random.truncated_normal([1]), name='bias')z = tf.multiply(X, w) + bcost = tf.reduce_mean(tf.square(Y - z))
learning_rate = 0.01
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)init = tf.global_variables_initializer()training_epochs = 20
display_step = 2saver = tf.train.Saver(max_to_keep=15)
savedir = "model/"if __name__ == '__main__':with tf.Session() as sess:sess.run(init)loss_list = []for epoch in range(training_epochs):for (x, y) in zip(train_x, train_y):sess.run(optimizer, feed_dict={X: x, Y: y})if epoch % display_step == 0:loss = sess.run(cost, feed_dict={X: x, Y: y})loss_list.append(loss)print('Iter: ', epoch, ' Loss: ', loss)w_, b_ = sess.run([w, b], feed_dict={X: x, Y: y})saver.save(sess, savedir + "linear.cpkt", global_step=epoch)print(" Finished ")print("W: ", w_, " b: ", b_, " loss: ", loss)plt.plot(train_x, train_x * w_ + b_, 'g-', train_x, train_y, 'r.')plt.grid(True)plt.show()load_epoch = 10with tf.Session() as sess2:sess2.run(tf.global_variables_initializer())saver.restore(sess2, savedir + "linear.cpkt-" + str(load_epoch))print(sess2.run([w, b], feed_dict={X: train_x, Y: train_y}))

在上述的代码中,我们使用saver.save(sess, savedir + "linear.cpkt", global_step=epoch)将训练的参数传入检查点进行保存,saver = tf.train.Saver(max_to_keep=1)表示只保存一个文件,这样在训练过程中得到的新的模型就会覆盖以前的模型。

cpkt = tf.train.get_checkpoint_state(savedir)
if cpkt and cpkt.model_checkpoint_path:saver.restore(sess2, cpkt.model_checkpoint_path)kpt = tf.train.latest_checkpoint(savedir)
saver.restore(sess2, kpt)

上述的两种方法也可以对checkpoint文件进行加载,tf.train.latest_checkpoint(savedir)为加载最后的检查点文件。这种方式,我们可以通过保存指定训练次数的检查点,比如保存5的倍数次保存一下检查点。

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import ostrain_x = np.linspace(-5, 3, 50)
train_y = train_x * 5 + 10 + np.random.random(50) * 10 - 5# plt.plot(train_x, train_y, 'r.')
# plt.grid(True)
# plt.show()tf.reset_default_graph()X = tf.placeholder(dtype=tf.float32)
Y = tf.placeholder(dtype=tf.float32)w = tf.Variable(tf.random.truncated_normal([1]), name='Weight')
b = tf.Variable(tf.random.truncated_normal([1]), name='bias')z = tf.multiply(X, w) + bcost = tf.reduce_mean(tf.square(Y - z))
learning_rate = 0.01
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)init = tf.global_variables_initializer()training_epochs = 30
display_step = 2global_step = tf.train.get_or_create_global_step()step = tf.assign_add(global_step, 1)saver = tf.train.Saver()savedir = "check-point/"if __name__ == '__main__':with tf.train.MonitoredTrainingSession(checkpoint_dir=savedir + 'linear.cpkt', save_checkpoint_secs=5) as sess:sess.run(init)loss_list = []for epoch in range(training_epochs):sess.run(global_step)for (x, y) in zip(train_x, train_y):sess.run(optimizer, feed_dict={X: x, Y: y})if epoch % display_step == 0:loss = sess.run(cost, feed_dict={X: x, Y: y})loss_list.append(loss)print('Iter: ', epoch, ' Loss: ', loss)w_, b_ = sess.run([w, b], feed_dict={X: x, Y: y})sess.run(step)print(" Finished ")print("W: ", w_, " b: ", b_, " loss: ", loss)plt.plot(train_x, train_x * w_ + b_, 'g-', train_x, train_y, 'r.')plt.grid(True)plt.show()load_epoch = 10with tf.Session() as sess2:sess2.run(tf.global_variables_initializer())# saver.restore(sess2, savedir + 'linear.cpkt-' + str(load_epoch))# cpkt = tf.train.get_checkpoint_state(savedir)# if cpkt and cpkt.model_checkpoint_path:#     saver.restore(sess2, cpkt.model_checkpoint_path)#kpt = tf.train.latest_checkpoint(savedir + 'linear.cpkt')saver.restore(sess2, kpt)print(sess2.run([w, b], feed_dict={X: train_x, Y: train_y}))

3.简便保存检查点

我们还可以用更加简单的方法进行检查点的保存,tf.train.MonitoredTrainingSession()函数,该函数可以直接实现保存载入检查点模型的文件,与前面的方法不同的是,它是按照训练时间来保存检查点的,可以通过指定save_checkpoint_secs参数的具体秒数,设置多久保存一次检查点。

上述的代码中,我们设置了没训练了5秒中之后,就保存一次检查点,它默认的保存时间间隔是10分钟,这种按照时间的保存模式更适合使用大型数据集训练复杂模型的情况,注意在使用上述的方法时,要定义global_step变量,在训练完一个批次或者一个样本之后,要将其进行加1的操作,否则将会报错。

参考:spark checkpoint机制简述

参考:TensorFlow——Checkpoint为模型添加检查点

参考:spark笔记之RDD容错机制之checkpoint

参考:

checkpoint是什么?相关推荐

  1. 2021年大数据Flink(二十七):Flink 容错机制 Checkpoint

    目录 Flink 容错机制 Checkpoint State Vs Checkpoint Checkpoint执行流程 简单流程 复杂流程 State状态后端/State存储介质 MemStateBa ...

  2. 2021年大数据Spark(十八):Spark Core的RDD Checkpoint

    目录 RDD Checkpoint 引入 API 代码演示 总结:持久化和Checkpoint的区别 问题: 答案: 区别: RDD Checkpoint 引入 RDD 数据可以持久化,但是持久化/缓 ...

  3. oracle rodo 查看大小,Checkpoint not complete故障

    Checkpoint not complete故障 一.故障现象: ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ...

  4. 如何为Keras中的深度学习模型建立Checkpoint

    深度学习模式可能需要几个小时,几天甚至几周的时间来训练. 如果运行意外停止,你可能就白干了. 在这篇文章中,你将会发现在使用Keras库的Python训练过程中,如何检查你的深度学习模型 Checkp ...

  5. tensorflow checkpoint文件

    tensorflow中的检查点checkpoint详解(二)--以tensorflow1.x 的模型保存与恢复为主_MIss-Y的博客-CSDN博客

  6. The number of Oracle redo threads (2) is not the same as the number of checkpoint threads (1)

    单实例数据库多日志线程,搭建ogg,启动抽取进程时提示一下错误: ERROR   OGG-00446  Oracle GoldenGate Capture for Oracle, extjms.prm ...

  7. oracle checkpoint

    Oracle checkpoint是什么? Oracle检查点:检查点是一个事件,是一些动作 而SCN则是oracle数据库里的时钟,是吗? SCN,CHECKPOINT,REDO ENTRY,COM ...

  8. SparkStreaming使用checkpoint存在的问题及解决方案

    SparkStreaming使用checkpoint存在的问题及解决方案 参考文章: (1)SparkStreaming使用checkpoint存在的问题及解决方案 (2)https://www.cn ...

  9. save_path is not a valid checkpoint

    save_path is not a valid checkpoint 这句代码还是获取模型的字符串路径: aaa = tf.train.latest_checkpoint(model_path) 自 ...

最新文章

  1. 剑指offer:整数中1出现的次数(从1到n整数中1出现的次数)
  2. Konstrukt PHP REST框架 教程二
  3. 听说你想去大厂看妹子,带你看看字节跳动实习算法岗面试长啥样?
  4. spring调用存储过程
  5. stream 提取某字段_java8从list集合中取出某一属性的值的集合案例
  6. [密码学基础][每个信息安全博士生应该知道的52件事][Bristol52]51.基于ID的加密安全模型,描述IBE方案
  7. 仅展示近三天的动态设置_抱歉,朋友仅展示最近三天的朋友圈
  8. Use __doPostBack() As A Bridge
  9. [转载]仿射变换(Affine Transformation)
  10. LoadRunner 11简单使用
  11. 2019“我爱北京——市民新春联欢会”将现300人大合唱
  12. 关于MySql使用索引的分析
  13. win7显示500服务器错误,搞定win10系统提示http500内部服务器错误的解决步骤
  14. 谈谈一只菜鸟转行Erlang游戏服务端的经历(希望大佬指导,也希望我的经历能给一些还未毕业的同学或者正在迷茫自己工作内容的同学一些感触)
  15. 【我Linux服务器被ddos了】记一次ddos防御+溯源+反击
  16. 微信小程序:页面路由
  17. [工具使用]SqlMap
  18. Prince and Princess UVA - 10635
  19. 区块链项目如何开发?区块链APP和DAPP怎么做?
  20. 硬盘无法格式化及RAW格式的另一种处理方法

热门文章

  1. 前端基础入门(html+css+详)
  2. 01初识JavaScript
  3. 第二节 线程启动、结束、创建线程多个方法、join()、detach()
  4. python3 collections模块_Python3之内建模块collections
  5. 夜间奇异规范:夜间高效自监督单目深度估计(ICCV 2021)
  6. 无标定物体环境下,高分辨率雷达与相机的像素级外参标定
  7. Deepfit: 通过神经网络加权最小二乘法进行3D表面拟合
  8. React history.push 传递参数
  9. Nat. Commun. | 多层生物分子网络的鲁棒性研究
  10. Failure [DELETE_FAILED_INTERNAL_ERROR]的解决办法