Why does join fail with "java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]"?

99,483

Solution 1

This happens because Spark tries to do Broadcast Hash Join and one of the DataFrames is very large, so sending it consumes much time.

You can:

  1. Set higher spark.sql.broadcastTimeout to increase timeout - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() both DataFrames, then Spark will use Shuffle Join - reference from here

PySpark

In PySpark, you can set the config when you build the spark context in the following manner:

spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

Solution 2

Just to add some code context to the very concise answer from @T. Gawęda.


In your Spark application, Spark SQL did choose a broadcast hash join for the join because "libriFirstTable50Plus3DF has 766,151 records" which happened to be less than the so-called broadcast threshold (defaults to 10MB).

You can control the broadcast threshold using spark.sql.autoBroadcastJoinThreshold configuration property.

spark.sql.autoBroadcastJoinThreshold Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE COMPUTE STATISTICS noscan has been run.

You can find that particular type of join in the stack trace:

org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

BroadcastHashJoin physical operator in Spark SQL uses a broadcast variable to distribute the smaller dataset to Spark executors (rather than shipping a copy of it with every task).

If you used explain to review the physical query plan you'd notice the query uses BroadcastExchangeExec physical operator. This is where you can see the underlying machinery for broadcasting the smaller table (and the timeout).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast is part of SparkPlan contract that every physical operator in Spark SQL follows that allows for broadcasting if needed. BroadcastExchangeExec happens to need it.

The timeout parameter is what you are looking for.

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

As you can see you can disable it completely (using a negative value) that would imply to wait for the broadcast variable to be shipped to executors indefinitely or use sqlContext.conf.broadcastTimeout which is exactly spark.sql.broadcastTimeout configuration property. The default value is 5 * 60 seconds which you can see in the stacktrace:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]

Solution 3

In addition to increasing spark.sql.broadcastTimeout or persist() both DataFrames,

You may try:

1.disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1

2.increase the spark driver memory by setting spark.driver.memory to a higher value.

Share:
99,483
Christos Hadjinikolis
Author by

Christos Hadjinikolis

An ML Engineer with a passion for problem-solving, and with a proven academic record in the field of Artificial Intelligence. A competent programmer exposed to both dynamically (Python, Matlab) and statically typed (Java) programming languages, with commercial experience in the development of big-data pipelines using open-source cluster computing frameworks (Apache Spark, Flink, DataFlow). A certified Apache Spark and Neo4j developer and Google Cloud Engineer. A Graph-Analytics expert with hands-on experience in handling and statistically analysing large data sets and with a speciality in algorithm design for Monte-Carlo simulations applied on graphs. Excellent presentation and communication skills acquired through four years of teaching in Higher Education and presenting in high-calibre conferences (both in academia as well as in the industry). Ability to quickly adjust to new environments and productively interact with people from different (scientific) backgrounds, gained from extensive exposure to interdisciplinary work through my PhD and my role as a consultant over the last 3 years. I also lead the Apache Flink Meetup community in London.

Updated on July 05, 2022

Comments

  • Christos Hadjinikolis
    Christos Hadjinikolis almost 2 years

    I am using Spark 1.5.

    I have two dataframes of the form:

    scala> libriFirstTable50Plus3DF
    res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]
    
    scala> linkPersonItemLessThan500DF
    res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
    

    libriFirstTable50Plus3DF has 766,151 records while linkPersonItemLessThan500DF has 26,694,353 records. Note that I am using repartition(number) on linkPersonItemLessThan500DF since I intend to join these two later on. I am following up the above code with:

    val userTripletRankDF = linkPersonItemLessThan500DF
         .join(libriFirstTable50Plus3DF, Seq("family_id"))
         .take(20)
         .foreach(println(_))
    

    for which I am getting this output:

    16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
    java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
     at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
     at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
     at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
     at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
     at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
     at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
     at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
     at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
     at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
     at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
     at $iwC$$iwC$$iwC.<init>(<console>:93)
     at $iwC$$iwC.<init>(<console>:95)
     at $iwC.<init>(<console>:97)
     at <init>(<console>:99)
     at .<init>(<console>:103)
     at .<clinit>(<console>)
     at .<init>(<console>:7)
     at .<clinit>(<console>)
     at $print(<console>)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
     at org.apache.spark.repl.Main$.main(Main.scala:31)
     at org.apache.spark.repl.Main.main(Main.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:606)
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    

    and I don't understand what is the issue. Is it as simple as increasing the waiting time? Is the join too intensive? Do I need more memory? Is the shufffling intensive? Can anyone help?