Run multiple futures in parallel, return default value on timeout

13,090

Solution 1

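import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

def toFallback[T](f: Future[T], to: Int, default: T): Future[T] =
  Future {
    try {
      Await.result(f, to.seconds) // blocks one pool thread for at most `to` seconds
    } catch {
      case _: TimeoutException => default
    }
  }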

Because the blocking wait is itself wrapped in a future, every request waits concurrently, each for at most its own timeout. The cost is one blocked thread per future. If that means too many threads, consider a single thread that completes timed-out futures, for example via Akka's system scheduler. @Senia's answer below covers that approach.

Solution 2

You could create a future that returns the results of all three futures using flatMap or a for-comprehension:

val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max)
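One caveat with a single Await on the combined future is that the timeout applies to the whole tuple, so one late future discards the results that were already available. The following is a minimal sketch of that behavior. The `slow` helper and the object name are hypothetical stand-ins for real queries, and the durations are arbitrary.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CombinedAwaitDemo {
  // Hypothetical stand-in for a real query: sleeps, then returns a value.
  def slow[T](millis: Long, value: T): Future[T] =
    Future { Thread.sleep(millis); value }

  def run(): (Try[(Int, Int)], Try[(Int, Int)]) = {
    // Start both futures BEFORE the for-comprehension so they run concurrently.
    val fast = slow(50, 1)
    val late = slow(2000, 2)
    val combined = for { a <- fast; b <- late } yield (a, b)

    // The timeout covers the whole tuple: while `late` is still running,
    // the already-completed result of `fast` is not returned either.
    val tooShort = Try(Await.result(combined, 300.millis))
    // With enough time, both results arrive together.
    val longEnough = Try(Await.result(combined, 5.seconds))
    (tooShort, longEnough)
  }
}
```

Here `tooShort` is a failure carrying a TimeoutException, while `longEnough` holds both values. Per-future defaults need a different mechanism.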

If you are using Akka, you could complete each future with a default value after its timeout:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{
  import akka.pattern.after
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    Future firstCompletedOf Seq(f, delayed)
  }
}

val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max)
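For readers without Akka on the classpath, the same idea can be approximated with the JDK's ScheduledExecutorService. This is only a sketch under that assumption, not the Akka API: `OrDefaultSketch`, `delayed`, and `demo` are hypothetical names, and a single daemon scheduler thread is assumed to be enough for all timeouts.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object OrDefaultSketch {
  // One shared daemon thread fires all timeouts; it does not keep the JVM alive.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timeout-scheduler")
        t.setDaemon(true)
        t
      }
    })

  // A future that completes with `value` after `d`, without blocking while it waits.
  def delayed[T](d: FiniteDuration)(value: => T): Future[T] = {
    val p = Promise[T]()
    scheduler.schedule(
      new Runnable { def run(): Unit = p.complete(Try(value)) },
      d.toMillis, TimeUnit.MILLISECONDS)
    p.future
  }

  // Whichever completes first wins: the real future or the delayed default.
  def orDefault[T](f: Future[T], d: FiniteDuration)(default: => T)(
      implicit ec: ExecutionContext): Future[T] =
    Future.firstCompletedOf(Seq(f, delayed(d)(default)))

  def demo(): (Int, Int) = {
    import ExecutionContext.Implicits.global
    val quick = orDefault(Future(1), 2.seconds)(-1)                           // finishes in time
    val stuck = orDefault(Future { Thread.sleep(3000); 2 }, 200.millis)(-1)   // times out
    Await.result(quick.zip(stuck), 5.seconds)
  }
}
```

As with the Akka version, a future that fails before its timeout still propagates the failure. Add a `recover` if the default should cover that case too.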

Solution 3

I would avoid using Await.result since that uses a thread just for blocking. One option to implement timeout for futures would be this:

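import java.util.{Timer, TimerTask}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Success

val timer = new Timer()

// timeout is in milliseconds, as expected by Timer.schedule
def toFallback[T](f: Future[T], timeout: Int, default: T): Future[T] = {
  val p = Promise[T]()
  // whichever side calls tryComplete first wins; the later call has no effect
  f.onComplete(result => p.tryComplete(result))
  timer.schedule(new TimerTask {
    def run(): Unit = {
      p.tryComplete(Success(default))
    }
  }, timeout.toLong)
  p.future
}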

This creates a promise that is completed either by the future or by the default result after the specified timeout, whichever comes first.
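The race is safe because tryComplete only succeeds for the first caller. A small sketch of that behavior (the object name is hypothetical):

```scala
import scala.concurrent.Promise
import scala.util.Success

object TryCompleteSketch {
  def run(): (Boolean, Boolean, Option[Int]) = {
    val p = Promise[Int]()
    val first = p.tryComplete(Success(1))    // promise was empty, so this call wins
    val second = p.tryComplete(Success(99))  // already completed: ignored, returns false
    (first, second, p.future.value.flatMap(_.toOption))
  }
}
```

Calling `TryCompleteSketch.run()` yields `(true, false, Some(1))`: the second completion attempt neither throws nor overwrites the first value.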

To run the queries concurrently, you could do this:

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

val res1 = toFallback(future1, timeout1, Map[String, Int]())
val res2 = toFallback(future2, timeout2, List[Int]())
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())

val resultF = for {
  r1 <- res1
  r2 <- res2
  r3 <- res3
} yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(resultF, Duration.Inf)
println(s"$r1, $r2, $r3")

// or, without blocking (foreach runs only on success, like the older onSuccess):
resultF.foreach {
  case (r1, r2, r3) => println(s"$r1, $r2, $r3")
}

Solution 4

Here's a longer (non-Akka) answer that addresses what might be the use case: if one of the values "times out", you want to use the default value for that result and also do something about it (such as cancel the long-running calculation or I/O or whatever).

Needless to say, the other story is to minimize blocking.

The basic idea is to sit in a loop awaiting the firstCompletedOf the items which haven't yet completed. The timeout passed to Await.ready is the minimum time remaining among those items.

This code uses deadlines instead of durations, but using a duration as "time remaining" is easy.
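As a quick orientation before the full listing, here is a small sketch of the Deadline operations the loop relies on: ordering, `timeLeft`, and `isOverdue`. The object name and the durations are made up for illustration.

```scala
import scala.concurrent.duration._

object DeadlineSketch {
  def run(): (Boolean, Boolean, Boolean) = {
    val soon: Deadline = 50.millis.fromNow
    val later: Deadline = 5.seconds.fromNow

    // Deadlines have an Ordering, so the next one to expire is simply the minimum.
    val nextIsSoon = Seq(later, soon).min == soon

    Thread.sleep(100) // let the first deadline pass

    // isOverdue and timeLeft are evaluated against the clock on every call,
    // so a Deadline never needs to be decremented by hand.
    (nextIsSoon, soon.isOverdue(), later.timeLeft > 4.seconds)
  }
}
```

All three flags come out true. With plain durations, the equivalent bookkeeping would mean subtracting the elapsed time after each wait.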

import scala.language.postfixOps
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits._
import scala.reflect._
import scala.util._
import java.lang.System.{ nanoTime => now }

import Test.time

class Test {

  type WorkUnit[A] = (Promise[A], Future[A], Deadline, A)
  type WorkQ[A] = Seq[WorkUnit[A]]

  def await[A: ClassTag](work: Seq[(Future[A], Deadline, A)]): Seq[A] = {
    // check for timeout; if using Duration instead of Deadline, decrement here
    def ticktock(w: WorkUnit[A]): WorkUnit[A] = w match {
      case (p, f, t, v) if !p.isCompleted && t.isOverdue => p trySuccess v ; w
      case _ => w
    }
    def await0(work: WorkQ[A]): WorkQ[A] = {
      val live = work filterNot (_._1.isCompleted)
      val t0 = (live map (_._3)).min
      Console println s"Next deadline in ${t0.timeLeft.toMillis}"
      val f0 = Future firstCompletedOf (live map (_._2))
      Try(Await ready (f0, t0.timeLeft))
      val next = work map (w => ticktock(w))
      if (next exists (!_._1.isCompleted)) {
        await0(next)
      } else {
        next
      }
    }
    val wq = work map (_ match {
      case (f, t, v) =>
        val p = Promise[A]
        p.future onComplete (x => Console println s"Value available: $x: $time")
        f onSuccess {
          case a: A => p trySuccess a  // doesn't match on primitive A
          case x => p trySuccess x.asInstanceOf[A]
        }
        f onFailure { case _ => p trySuccess v }
        (p, f, t, v)
    })
    await0(wq) map (_ match {
      case (p, f, t, v) => p.future.value.get.get
    })
  }
}

object Test {
  val start = now
  def time = s"The time is ${ Duration fromNanos (now - start) toMillis }"

  def main(args: Array[String]): Unit = {
    // #2 times out
    def calc(i: Int) = {
      val t = if (args.nonEmpty && i == 2) 6 else i
      Thread sleep t * 1000L
      Console println s"Calculate $i: $time"
      i
    }
    // futures to be completed by a timeout deadline
    // or else use default and let other work happen
    val work = List(
      (future(calc(1)), 3 seconds fromNow, 10),
      (future(calc(2)), 5 seconds fromNow, 20),
      (future(calc(3)), 7 seconds fromNow, 30)
    )
    Console println new Test().await(work)
  }
}

Sample run:

apm@mara:~/tmp$ skalac nextcompleted.scala ; skala nextcompleted.Test 
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 2: The time is 2019
Value available: Success(2): The time is 2020
Next deadline in 4999
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
List(1, 2, 3)
apm@mara:~/tmp$ skala nextcompleted.Test arg
Next deadline in 2992
Calculate 1: The time is 1009
Value available: Success(1): The time is 1012
Next deadline in 4005
Calculate 3: The time is 3020
Value available: Success(3): The time is 3020
Next deadline in 1998
Value available: Success(20): The time is 5020
List(1, 20, 3)
Author by

zbstof

Backend, BigData

Updated on June 17, 2022

Comments

  • zbstof
    zbstof almost 2 years

    I have to run multiple futures in parallel and the program shouldn't crash or hang.

    For now I wait on the futures one by one and use a fallback value if there is a TimeoutException.

    val future1 = // start future1
    val future2 = // start future2
    val future3 = // start future3
    
    // <- at this point all 3 futures are running
    
    // waits for maximum of timeout1 seconds
    val res1 = toFallback(future1, timeout1, Map[String, Int]())
    // .. timeout2 seconds 
    val res2 = toFallback(future2, timeout2, List[Int]())
    // ... timeout3 seconds
    val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 
    
    def toFallback[T](f: Future[T], to: Int, default: T) = {
      Try(Await.result(f, to seconds))
        .recover { case to: TimeoutException => default }
    }
    

    As far as I can see, the maximum wait time of this snippet is timeout1 + timeout2 + timeout3.

    My question is: how can I wait on all of those futures at once, so I can reduce wait time to max(timeout1, timeout2, timeout3)?

    EDIT: In the end I used a modification of @Jatin's and @senia's answers:

    private def composeWaitingFuture[T](fut: Future[T], 
                                        timeout: Int, default: T) =
      future { Await.result(fut, timeout seconds) } recover {
        case e: Exception => default
      }
    

    and later it's used as follows:

    // starts futures immediately and waits for maximum of timeoutX seconds
    val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
    val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
    val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 
    
    // takes at most max(timeout1, timeout2, timeout3) seconds to complete
    val combinedFuture =
      for {
        r1 <- res1
        r2 <- res2
        r3 <- res3
      } yield (r1, r2, r3)
    

    and later I use combinedFuture as I see fit.