Spark fails on big shuffle jobs with java.io.IOException: Filesystem closed

Solution 1

As of September 1st 2014, this is an "open improvement" in Spark; see https://issues.apache.org/jira/browse/SPARK-3052. As syrza pointed out in that link, the shutdown hooks are likely run in the wrong order when an executor fails, which produces this message. You will have to do a little more investigation to figure out the main cause of the problem (i.e. why your executor failed). If it is a large shuffle, it might be an out-of-memory error that caused the executor failure, which in turn caused the Hadoop FileSystem to be closed in its shutdown hook. As a result, the RecordReaders in the running tasks of that executor throw the "java.io.IOException: Filesystem closed" exception. I expect it will be fixed in a subsequent release, and then you will get a more helpful error message :)
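
To see why one executor's shutdown hook can hurt still-running tasks, note that Hadoop's FileSystem.get caches instances, so every caller in the JVM shares one client. Here is a minimal sketch of the failure mode, assuming fs.defaultFS points at a real HDFS cluster; the path is hypothetical:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FilesystemClosedDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // FileSystem.get caches instances per (scheme, authority, user),
        // so fsA and fsB below refer to the SAME object.
        val fsA = FileSystem.get(conf)
        val fsB = FileSystem.get(conf)

        // A shutdown hook (or a failing executor's cleanup) closing "its"
        // handle closes the shared client for everyone.
        fsA.close()

        // Any reader still using the shared client now fails with
        // "java.io.IOException: Filesystem closed".
        val in = fsB.open(new Path("/some/file")) // hypothetical path
        in.read()
      }
    }

This is why DFSClient.checkOpen sits at the top of the stack trace in the question: the task's RecordReader is the victim, not the culprit.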

Solution 2

Something calls DFSClient.close() or DFSClient.abort(), closing the client. The next file operation then results in the above exception.

I would try to figure out what calls close()/abort(). You could set a breakpoint in your debugger, or modify the Hadoop source code to throw an exception in these methods so that you get a stack trace.
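
A lighter-weight alternative to patching DFSClient is to wrap the FileSystem in a FilterFileSystem that dumps the calling stack whenever close() is invoked. This is a hypothetical sketch of the technique; you would still have to plumb the wrapper into whatever code obtains the FileSystem:

    import org.apache.hadoop.fs.{FileSystem, FilterFileSystem}

    // Prints the stack of whoever calls close(), so the culprit shows
    // up in the logs without a debugger attached.
    class TracingFileSystem(inner: FileSystem) extends FilterFileSystem(inner) {
      override def close(): Unit = {
        new Exception("FileSystem.close() called from:").printStackTrace()
        super.close()
      }
    }

Dropping the same two lines into DFSClient.close()/abort() in the Hadoop source achieves the same thing, at the cost of rebuilding Hadoop.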

Comments

  • samthebest (almost 2 years ago)

    I often find Spark fails on large jobs with a rather unhelpful, meaningless exception. The worker logs look normal, with no errors, yet they end up with state "KILLED". This is extremely common for large shuffles, e.g. operations like .distinct.

    The question is, how do I diagnose what's going wrong, and ideally, how do I fix it?

    Given that a lot of these operations are monoidal, I've been working around the problem by splitting the data into, say, 10 chunks, running the app on each chunk, then running the app on all of the resulting outputs. In other words: meta-map-reduce.
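
    For concreteness, here is a minimal sketch of that chunked workaround using .distinct, written against the spark-shell's sc; the paths and chunk count are hypothetical:

        // First pass: run the job independently on each chunk.
        val numChunks = 10
        val chunkOutputs = (0 until numChunks).map { i =>
          val out = s"hdfs:///tmp/distinct-chunk-$i"  // hypothetical path
          sc.textFile(s"hdfs:///input/chunk-$i/*")    // hypothetical split
            .distinct()
            .saveAsTextFile(out)
          out
        }

        // Second pass: the same (monoidal) operation over the chunk outputs;
        // textFile accepts a comma-separated list of paths.
        sc.textFile(chunkOutputs.mkString(","))
          .distinct()
          .saveAsTextFile("hdfs:///output/distinct-final")

    The failure itself looks like this: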

    14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
    14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
    14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
    java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
        at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
        at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
        at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
        at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
        at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)