1. Problem description

When submitting a job to the cluster, the code needs to read a resource file from HDFS. While reading that resource file, it fails with the following errors:

2021-01-29 09:48:29,023 ERROR scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:625)
    at org.apache.hadoop.hdfs.DFSOutputStream.hsync(DFSOutputStream.java:607)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:148)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:147)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:147)
    at org.apache.spark.scheduler.EventLoggingListener.onApplicationEnd(EventLoggingListener.scala:196)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:57)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:113)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:105)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:100)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:96)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:95)
2021-01-29 09:48:29,037 ERROR util.Utils: Uncaught exception in thread pool-1-thread-1
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:490)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1734)
    at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1684)
    at org.apache.hadoop.hdfs.DistributedFileSystem$31.doCall(DistributedFileSystem.java:1681)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1696)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1758)
    at org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:245)
    at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$stop$8$$anonfun$apply$mcV$sp$5.apply(SparkContext.scala:1944)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1944)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1943)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:587)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:238)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:210)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:210)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1997)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:210)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:210)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:210)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:181)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2. The code that copies the HDFS file to the local machine

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

object EastDataReport {

  private val LOG: org.slf4j.Logger = LoggerFactory.getLogger(EastDataReport.getClass)

  // Copy a file from HDFS to the local filesystem; returns 0 on success, 1 on failure
  def copyHdfsDataToLocal(inHdfsPath: String, inLocalPath: String): Int = {
    var status = 0
    try {
      val conf = new Configuration()
      // conf.setBoolean("fs.hdfs.impl.disable.cache", true)
      conf.set("fs.default.name", "hdfs://xxx.hadoop:8020")
      val hdfsPath = new Path(inHdfsPath)
      val fs = hdfsPath.getFileSystem(conf)
      val localPath = new Path(inLocalPath)
      fs.copyToLocalFile(hdfsPath, localPath)
      fs.close() // closing the cached FileSystem here is what later breaks Spark's event logging
      status = 0
    } catch {
      case e: Exception =>
        e.printStackTrace()
        LOG.error(s"Failed, error msg: ${e.getMessage}")
        status = 1
    }
    // Return the status AFTER the try/catch: in Scala the value of a `finally`
    // block is discarded, so the original `finally { status }` never reached the caller
    status
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    Logger.getLogger(EastDataReport.getClass.getName).setLevel(Level.INFO)

    // `dt` and `file_suffix` were defined elsewhere in the original code;
    // placeholder values are used here so the snippet compiles
    val dt = "20210129"
    val file_suffix = "suffix"

    val outputHdfsPath = s"hdfs://xxx/report/data_detail/${dt}_${file_suffix}"
    println("outputHdfsPath: " + outputHdfsPath)
    println("Copying the csv written to HDFS down to the local filesystem")
    val localpath = "file:///home/work/xxx/project/report/data_detail/tmp/"
    val copyStatus = copyHdfsDataToLocal(outputHdfsPath, localpath)
    if (copyStatus == 0) {
      println("Copy csv from HDFS to local: succeeded")
    } else {
      println("Copy csv from HDFS to local: failed")
    }
  }
}

3. Root cause analysis

Code that reads a file on HDFS typically obtains the FileSystem like this:

public void initPackageNameTypeMap() {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("filePath");
    // ...
}
  • When an insert statement is executed on a Spark cluster, the job fails with a "FileSystem closed" stack trace; this comes up frequently inside ThriftServer.
  • Once the job has been submitted to the cluster, all of its threads that go through getFileSystem with an identical Configuration are handed the same FileSystem instance.
  • Because FileSystem.get loads instances from a cache, the moment one thread closes that instance (i.e. one caller closes the connection when it finishes), every other caller that touches it afterwards hits the exception above, as the sketch after this list demonstrates.
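
A minimal sketch of that shared-cache behavior, assuming a reachable cluster (the URI is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://xxx.hadoop:8020") // placeholder cluster URI

    // Both calls consult FileSystem's internal cache and return the SAME object
    val fs1 = FileSystem.get(conf)
    val fs2 = FileSystem.get(conf)
    println(fs1 eq fs2) // prints true: one shared instance per scheme/authority/user

    fs1.close() // closes the shared instance for every holder of it

    // Any later use of fs2 now fails with java.io.IOException: Filesystem closed
    fs2.exists(new Path("/tmp"))
  }
}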

4. Solutions

  • 1) Modify the configuration file:

HDFS offers a way to skip the cache entirely: set fs.hdfs.impl.disable.cache=true in hdfs-site.xml; a sketch of the entry follows.
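
For reference, the entry would look like the sketch below; the key is Hadoop's per-scheme cache switch, and whether it belongs in hdfs-site.xml or core-site.xml depends on how your client configuration is distributed:

<property>
  <name>fs.hdfs.impl.disable.cache</name>
  <value>true</value>
</property>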

  • 2) Or set it in code:

  conf.setBoolean("fs.hdfs.impl.disable.cache", true) 

The FileSystem class keeps an internal static CACHE holding the instance for each file system. Whether instances are cached is controlled by the property "fs.%s.impl.disable.cache" (with %s replaced by the scheme, e.g. hdfs, local, s3, s3n): by default, once an instance has been created it is stored in the cache, and every later get returns that same instance. Setting the property to true turns the cache off for that scheme, so each caller gets its own instance and closing one no longer invalidates the others, which resolves the exception above.
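
Beyond the two fixes above, a third workaround worth knowing (a sketch, not from the original post; the URI and paths are placeholders) is FileSystem.newInstance(conf), which always builds a fresh, uncached instance: a private connection that is safe to close while Spark keeps using the shared cached one.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CopyWithPrivateFs {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://xxx.hadoop:8020") // placeholder cluster URI

    // newInstance bypasses the cache and returns a brand-new FileSystem
    val fs = FileSystem.newInstance(conf)
    try {
      // placeholder paths, for illustration only
      fs.copyToLocalFile(new Path("/report/data.csv"), new Path("file:///tmp/data.csv"))
    } finally {
      fs.close() // safe: this private instance is not shared with Spark's internals
    }
  }
}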

References:
http://stackoverflow.com/questions/23779186/ioexception-filesystem-closed-exception-when-running-oozie-workflow
http://stackoverflow.com/questions/20057881/hadoop-filesystem-closed-exception-when-doing-bufferedreader-close
http://shift-alt-ctrl.iteye.com/blog/2108760
