Cannot overwrite a path that is also being read from.

这个错看起来很简单。代码简化为

Dataset<Row> selectBefore = session.sql("select * from table1")

Dataset<Row> dataset2 = session.createDataset(list,xx.class)

大概就是获取表里的原始数据,然后从别的地方搞来的新数据两个合起来继续存到表里去

selectBefore
.union(dataset)
.write()
.mode(SaveMode.Overwrite)
.format("hive")
.insertInto(Constant.ORDERONLINE_TABLE);

为啥不用append 因为有时候会重复调用。。 反正就是这么个情况。就是要先查再插入。

为什么报错?

Spark SQL在雪球的实践 - 腾讯云开发者社区-腾讯云

 解决办法

第一个解决办法真不行。我查了下这两个参数大多是解决spark读取hive表数据量不对的情况用的。而且我设置之后还有报错就不贴出来了

spark sql读取不到orc格式hive表数据问题_Java小田的博客-CSDN博客_spark读取不到hive表

第二个 确实可以。

session.sparkContext().setCheckpointDir("/tmp/spark/job/OrderOnlineSparkJob");

Dataset<Row> selectBefore = session.sql("select * from table1").checkpoint();

第三个这种lowb方法就不说了

找个时间好好学习下spark的checkpoint知识。

备注下:这种方式有个弊端,checkpoint 在hdfs的目录不是一定会删除的,经过百度,说到GC的时候才会删除,还有什么弱引用。给两个解决办法

1..config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") 这个就是checkpoint的清理线程去清理,但是不一定完全有用,建议大家可以试下System.gc(); 手动触发下。

2.搞个脚本跑完spark后 hdfs dfs -rm -r /checkpoint/*

___________________________________________________________________________

2022-11-15更新。为什么checkpoint一下就可以了?

我们稍微百度下,spark.checkpoint的作用?

spark checkpoint详解 - 超级核弹头 - 博客园

简单来说,

1.截断血缘关系,避免rdd从头开始继续计算,解决链路过长的问题

2.上面说的云里雾里,那就看这里,就是把数据存到了hdfs的目录。可以从磁盘文件去恢复数据

那为啥没有checkpoint就不能插入读取的表?

之前看到的一篇文章说了,spark的逻辑是没有临时目录,hive是有临时目录的。

所以这里就很清楚了

没有checkpoint= 读自己目录,写自己目录  报错

有了checkpoint=读hdfs的checkpoint dir,写自己目录,所以是可以的

——————————————————————————————————————————

稍微分析下源码

上图中internalRdd是我们查出来的dataset然后toRdd 在copy,简单来说就是复制了一份,

reliableCheckpoint默认是true。 这个一直是true,应该就是你的checkpoint是否可靠,比如存到hdfs就可靠,你存到磁盘肯定不可靠。。。

接着看

internalRdd.checkpoint()和 internalRdd.count()

上图看你的checkpointDir 设置没有,没有就报错

最后new ReliableRDDCheckpointData(this)->new ReliableRDDCheckpointData(internalRdd)

我们再看ReliableRDDCheckpointData 类

这里只是new了这个对象。并没有存储rdd

实际是在后面的action后调用该doCheckpoint方法

Materialize this RDD and write its content to a reliable DFS. This is called immediately after the first action invoked on this RDD has completed.

将rdd物化也就是存储,将它的内容写到可靠的dfs上,这个在第一个action执行后会被立马调用!

val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

这里可以看到说明是啥,不是把原来的rdd返回了,是返回了一个新的rdd!!!!所以我们这个时候使用的rdd不是我们spark读取的rdd了。

这个时候我们再看下我之前提到的问题,spark.cleaner.referenceTracking.cleanCheckpoints这个参数有什么用?

这里设置了true,看代码不就是把rdd的cleaner遍历,然后去清理checkpoint目录吗?为什么有时候不起作用?(我跑了十次大概清理了1-2次,最后hdfs checkpoint目录大概有1G的大小,如果数据特别大,这个空间不就浪费了?)

继续研究下,这个cleaner是啥?

  可以看到 spark.cleaner.referenceTracking 这个设置为true了会new一个 默认为true。然后foreach start。

这个代码就有意思了。。。。

cleaningThread.setDaemon(true)//设置为守护线程,主线程没了它也没了
cleaningThread.setName("Spark Context Cleaner")//加个名字不重要。
cleaningThread.start() //start 方法 肯定要看
periodicGCService.scheduleAtFixedRate(new Runnable { override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)

//这个搞了一个定时调度的一个东西,仔细看这个是啥

periodicGCInterval=30min periodicGCInterval=30min

意思是啥?就是每隔30min来次System.GC来清除弱引用。这就是为什么有时候checkpoint会自己删 有时候不删了。

这段代码主要两个是

1.开启线程清理

2.开启定时GC

cleaningThread是什么呢?

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

cleaningThread.start()就是keepCleaning对吧。

//开启清理线程,清理没有引用的checkpoint boradcast rdd
//怎么清理的?referenceQueue这个引用队列里获取的。
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {while (!stopped) {//差不多就是while(true)了try {val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)).map(_.asInstanceOf[CleanupTaskWeakReference])//原来是这里!!!// Synchronize here to avoid being interrupted on stop()synchronized {reference.foreach { ref =>logDebug("Got cleaning task " + ref.task)referenceBuffer.remove(ref)ref.task match {case CleanRDD(rddId) =>doCleanupRDD(rddId, blocking = blockOnCleanupTasks)case CleanShuffle(shuffleId) =>doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)case CleanBroadcast(broadcastId) =>doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)case CleanAccum(accId) =>doCleanupAccum(accId, blocking = blockOnCleanupTasks)case CleanCheckpoint(rddId) => //!!!!清理checkpoint目录doCleanCheckpoint(rddId)}}}} catch {case ie: InterruptedException if stopped => // ignorecase e: Exception => logError("Error in cleaning thread", e)}}
}

——————————————————————————————————

/*** Clean up checkpoint files written to a reliable storage.* Locally checkpointed files are cleaned up separately through RDD cleanups.*/
def doCleanCheckpoint(rddId: Int): Unit = {try {logDebug("Cleaning rdd checkpoint data " + rddId)ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)listeners.asScala.foreach(_.checkpointCleaned(rddId))logInfo("Cleaned rdd checkpoint data " + rddId)}catch {case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)}
}

———————————————清理hdfs目录———————————————

/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {checkpointPath(sc, rddId).foreach { path =>path.getFileSystem(sc.hadoopConfiguration).delete(path, true)}
}

看着好像没问题为啥没有清理呢? 能力有限查不出来。。。但是可以借鉴别人的,别人比我写的好多了。

Spark ContextCleaner及checkpoint的clean机制分析 - 知乎

———————————————继续学习弱引用————————————————————

不清楚弱引用的可以看这个文章

Java中弱引用的概念和作用是什么 - 编程语言 - 亿速云

之前的文章分析到 checkpoint目录没有被删除是因为弱引用的问题。弱引用假装已经很熟练了。那么继续源码分析。

ContextCleaner类

//搞了一个引用队列,存放引用对象

private val referenceQueue = new ReferenceQueue[AnyRef]
/** Register an RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {registerForCleanup(rdd, CleanRDD(rdd.id))
}def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {registerForCleanup(a, CleanAccum(a.id))
}/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {registerForCleanup(rdd, CleanCheckpoint(parentId))
}/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

这里多复制点。可以看到acc rdd RDDCheckpointData 等都是调用的registerForCleanup这个方法。最后都是变成了 new CleanupTaskWeakReference(task,acc/rdd/checkpoint,引用队列)

CleanupTaskWeakReference

/*** A WeakReference associated with a CleanupTask.** When the referent object becomes only weakly reachable, the corresponding* CleanupTaskWeakReference is automatically added to the given reference queue.*/
private class CleanupTaskWeakReference(val task: CleanupTask,referent: AnyRef,referenceQueue: ReferenceQueue[AnyRef])extends WeakReference(referent, referenceQueue)

至此我们分析下checkpoint的流程。

rdd.checkpoint()->

new ReliableRDDCheckpointData(this)

ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)//确定checkpoint路径

spark.cleaner.referenceTracking.cleanCheckpoints=TURE会将当前rddCheckpoint放到引用队列里,

最开始我们spark代码里new SparkContext的时候

会根据spark.cleaner.referenceTracking=true去 new ContextCleaner()然后调用这个的start方法

而这个ContextCleaner的start方法开启了两个线程

一个不停的看队列里是否有需要清除弱引用对象

一个定时去GC

经过我测试 弱引用对象什么时候会被清理?

参考文章

referenceQueue用法_gmHappy的博客-CSDN博客_referencequeue

    private void test2() throws InterruptedException {ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();Object value = new Object();Map<Object, Object> map = new HashMap<>();for (int i = 0; i < 10; i++) {WeakReference<String> weakReference = new WeakReference<String>(String.valueOf(i), referenceQueue);System.out.println("创造了:"+weakReference+",value="+weakReference.get());map.put(weakReference, value);}System.gc();Thread.sleep(100);WeakReference<String> result=null;int cnt = 0;while ((result=(WeakReference)referenceQueue.poll())!=null){System.out.println((cnt++) + "回收了:" + result+",value="+result.get());}}
-----------
创造了:java.lang.ref.WeakReference@383534aa,value=0
创造了:java.lang.ref.WeakReference@6bc168e5,value=1
创造了:java.lang.ref.WeakReference@7b3300e5,value=2
创造了:java.lang.ref.WeakReference@2e5c649,value=3
创造了:java.lang.ref.WeakReference@136432db,value=4
0回收了:java.lang.ref.WeakReference@6bc168e5,value=null
1回收了:java.lang.ref.WeakReference@2e5c649,value=null
2回收了:java.lang.ref.WeakReference@7b3300e5,value=null
3回收了:java.lang.ref.WeakReference@136432db,value=null
4回收了:java.lang.ref.WeakReference@383534aa,value=null
    private void testBigObjectWithoutGC() throws InterruptedException {int _1M = 1024 * 1024;ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();Thread thread = new Thread(() -> {try {int cnt = 0;WeakReference<byte[]> k;while ((k = (WeakReference) referenceQueue.remove()) != null) {System.out.println((cnt++) + "回收了:" + k);}} catch (InterruptedException e) {// 结束循环}});thread.setDaemon(true);thread.start();Object value = new Object();Map<Object, Object> map = new HashMap<>();for (int i = 0; i < 10000; i++) {byte[] bytes = new byte[_1M];WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);map.put(weakReference, value);}System.out.println("map.size->" + map.size());}
--------
8342回收了:java.lang.ref.WeakReference@67a3bd51
8343回收了:java.lang.ref.WeakReference@2cec704c
8344回收了:java.lang.ref.WeakReference@bd1111a
8345回收了:java.lang.ref.WeakReference@5918c260
8346回收了:java.lang.ref.WeakReference@7fc7c4a
8347回收了:java.lang.ref.WeakReference@9d3c67
注意这里没有了。没有回收10000个对象 只回收了8000多个。。有时候9000多。

经过测试发现

需要使用

System.gc(); //触发GC 但是GC没有真正开始

Thread.sleep(1000);//给个时间让GC去执行。

或者

你队列里放的对象都很大,并且数量也不少 这个时候会主动触发GC

这样弱引用对象就会被清除了。。。所以我们之前如何清理checkpoint目录呢?

spark.cleaner.referenceTracking.cleanCheckpoints true

System.gc();+ Thread.sleep(1000); 也不会特别影响spark的任务。我就懒得测试了。。。各位成功了留个言让我知道下。

spark报错:Cannot overwrite a path that is also being read from.相关推荐

  1. Spark 中 JVM 内存使用及配置详情、spark报错与调优、Spark内存溢出OOM异常

    一般在我们开发spark程序的时候,从代码开发到上线以及后期的维护中,在整个过程中都需要涉及到调优的问题,即一开始需要考虑如何把代码写的更简洁高效调优(即代码优化),待开发测试完成后,提交任务时综合考 ...

  2. 解决IntelliJ IDEA报错Error:Cannot determine path to ‘tools.jar‘ library for 17 (C:\Program Files\Java\jd

    解决IntelliJ IDEA报错Error:Cannot determine path to 'tools.jar' library for 17 (C:\Program Files\Java\jd ...

  3. spark报错:invalid token

    启动spark报错,启动container失败,去看yarn的日志,显示invalid token, 经过排查是hadoop子节点的配置和主节点的配置不一致导致的,同步之后,问题解决. 转载于:htt ...

  4. android studio报错Error:Project with path 'XXXX' could not be found解决办法

    刚从服务器上剪下来的工程编译死活报 project with path '... ' could not be found in project ' app' 这个错, 找不到依赖库,可是检查工程里面 ...

  5. Spark报错:Error:scalac: Scala compiler JARs not found (module ‘Spark_two‘): C:\Users\沫小新\.m2\repo

    Spark报错: 找不到scala-compiler-2.11.8.jar的包 Error:scalac: Scala compiler JARs not found (module 'Spark_t ...

  6. Spark报错It appears that you are attempting to broadcast an RDD or reference an RDD from an action

    Spark报错: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from ...

  7. pyspark及Spark报错问题汇总及某些函数用法。

    此贴,主要记录本人在工作中遇到的某些报错问题,并提出自己的解决办法. 1. spark = SparkSession.builder()  TypeError: 'Builder' object is ...

  8. Spark 报错 Failed to delete: C:\Users\lvacz\AppData\Local\Temp\spark-*

    一.问题 在win10,local模式执行完spark任务后不论是否可以执行出结果,都会报错: Failed to delete: C:\Users\lvacz\AppData\Local\Temp\ ...

  9. kerberos 下运行spark 报错 Requested user hdfs is banned

    启动运行报错 main : run as user is hdfs main : requested yarn user is hdfs Requested user hdfs is bannedFa ...

  10. Spark 报错 DROP TABLE IF EXISTS should not show AnalysisException

    spark-sql 执行执行 drop table if exists xxxx 的时候,报错信息如下: DROP TABLE IF EXISTS should not show AnalysisEx ...

最新文章

  1. linux中硬链接文件,科学网—Linux:文件的符号链接和硬链接 - 刘洋洋的博文
  2. 【python小游戏】据说这是一款还原度超高的小游戏,你感受下......
  3. 华为使用网线通过浏览器登录AC6005的Web网管
  4. TypeSprict -- 基础类型
  5. pip安装python模块不成功时,你可以尝试这样做
  6. oracle添加伪列,Oracle伪列 - jifengtang的个人空间 - OSCHINA - 中文开源技术交流社区...
  7. 互联网日报 | 理想汽车交付量突破30000辆;美团王慧文正式退休;寺库打造首个奢侈品直播基地...
  8. 大数据的Java/Hbase+C云平台开发技术 课程
  9. hmcl手机版_hmcl启动器正版
  10. win10系统怎么查看电脑配置?
  11. 《黑客秘笈——渗透测试实用指南》读书笔记(1)
  12. 计算机无线键盘没反应,电脑无线键盘没反应怎么回事
  13. 继电器电路原理图,PNP和NPN三极管介绍
  14. 人工智能调度如何改变现场服务行业
  15. 银行接口数据包(银行名称获取)
  16. 【蓝桥杯省赛】冲刺练习题【数学公式】倒计时【05】天(准考证组委会已下发,请查询)
  17. Ubuntu 10.04 LTS 下 Android 4.1.2_r1 源代码的下载
  18. 面试真题:经典智力题最详汇总
  19. 蓝翔技术学院计算机,蓝翔计算机学子马安然:我眼中的蓝翔
  20. 客服中心话务预测模型应用实践

热门文章

  1. Windows 免密码登录
  2. 田忌赛马(贪心算法)
  3. 未来IT人才市场最热门的12项技能
  4. repo报错:SyntaxError: invalid syntax
  5. 面试经历(纯属个人经历,仅供观看参考)
  6. 微信公众号自定义服务器,微信公众号自定义服务器的第一次验证
  7. 【DB宝36】使用Docker分分钟搭建漂亮的prometheus+grafana监控
  8. 2021“智荟杯”浦发百度高校极客挑战赛——比赛总结
  9. c# 软件单元测试,单元测试(C#版)
  10. 前端UI设计稿对比工具