How to wait for several Futures?

72,203

Solution 1

You could use a for-comprehension as follows instead:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFut val will hold a tuple with 3 slots, corresponding to the results of the 3 futures.

Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

Now this works correctly, but the issue comes from knowing which Future to remove from the Map when one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOf on the remaining Futures until there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.

Solution 2

You can use a promise, and send to it either the first failure, or the final completed aggregated success:

def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
  val p = Promise[M[A]]()

  // the first Future to fail completes the promise
  in.foreach(_.onFailure{case i => p.tryFailure(i)})

  // if the whole sequence succeeds (i.e. no failures)
  // then the promise is completed with the aggregated success
  Future.sequence(in).foreach(p trySuccess _)

  p.future
}

Then you can Await on that resulting Future if you want to block, or just map it into something else.

The difference with for comprehension is that here you get the error of the first to fail, whereas with for comprehension you get the first error in traversal order of the input collection (even if another one failed first). For example:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order

And:

val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }

sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)

Solution 3

Here is a solution without using actors.

import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger

// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
  val remaining = new AtomicInteger(fs.length)

  val p = promise[T]

  fs foreach {
    _ onComplete {
      case s @ Success(_) => {
        if (remaining.decrementAndGet() == 0) {
          // Arbitrarily return the final success
          p tryComplete s
        }
      }
      case f @ Failure(_) => {
        p tryComplete f
      }
    }
  }

  p.future
}

Solution 4

For this purpose I would use an Akka actor. Unlike the for-comprehension, it fails as soon as any of the futures fail, so it's a bit more efficient in that sense.

class ResultCombiner(futs: Future[_]*) extends Actor {

  var origSender: ActorRef = null
  var futsRemaining: Set[Future[_]] = futs.toSet

  override def receive = {
    case () =>
      origSender = sender
      for(f <- futs)
        f.onComplete(result => self ! if(result.isSuccess) f else false)
    case false =>
      origSender ! SomethingFailed
    case f: Future[_] =>
      futsRemaining -= f
      if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
  }

}

sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result

Then, create the actor, send a message to it (so that it will know where to send its reply to) and wait for a reply.

val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
  val f4: Future[Result] = actor ? ()
  implicit val timeout = new Timeout(30 seconds) // or whatever
  Await.result(f4, timeout.duration).asInstanceOf[Result] match {
    case SomethingFailed => println("Oh noes!")
    case EverythingSucceeded => println("It all worked!")
  }
} finally {
  // Avoid memory leaks: destroy the actor
  actor ! PoisonPill
}

Solution 5

You can do this with futures alone. Here's one implementation. Note that it won't terminate execution early! In that case you need to do something more sophisticated (and probably implement the interruption yourself). But if you just don't want to keep waiting for something that isn't going to work, the key is to keep waiting for the first thing to finish, and stop when either nothing is left or you hit an exception:

import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global

@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()): 
Either[Throwable, Seq[A]] = {
  val first = Future.firstCompletedOf(fs)
  Await.ready(first, Duration.Inf).value match {
    case None => awaitSuccess(fs, done)  // Shouldn't happen!
    case Some(Failure(e)) => Left(e)
    case Some(Success(_)) =>
      val (complete, running) = fs.partition(_.isCompleted)
      val answers = complete.flatMap(_.value)
      answers.find(_.isFailure) match {
        case Some(Failure(e)) => Left(e)
        case _ =>
          if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
          else Right( answers.map(_.get) ++: done )
      }
  }
}

Here's an example of it in action when everything works okay:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))

But when something goes wrong:

scala> awaitSuccess(Seq(Future{ println("Hi!") }, 
  Future{ Thread.sleep(1000); throw new Exception("boo"); () }, 
  Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)

scala> Bye!
Share:
72,203
Michael
Author by

Michael

Scala (moslty) programmer

Updated on May 13, 2020

Comments

  • Michael
    Michael almost 4 years

    Suppose I have several futures and need to wait until either any of them fails or all of them succeed.

    For example: Let there are 3 futures: f1, f2, f3.

    • If f1 succeeds and f2 fails I do not wait for f3 (and return failure to the client).

    • If f2 fails while f1 and f3 are still running I do not wait for them (and return failure)

    • If f1 succeeds and then f2 succeeds I continue waiting for f3.

    How would you implement it?

  • Michael
    Michael about 11 years
    Looks a bit too complex for such a simple task. Do I really need an actor to just wait for futures ? Thanks anyway.
  • Robin Green
    Robin Green about 11 years
    I couldn't find any suitable method in the API which can do exactly what you want, but maybe I missed something.
  • Michael
    Michael about 11 years
    Thank you. What happens if fut2 fails before fut1 ? Will we still wait for fut1 in that case ? If we will it's not exactly what I want.
  • Alexander Aleksandrovič Klimov
    Alexander Aleksandrovič Klimov about 11 years
    But if 3 fails first, we still wait for 1 and 2 when we could return early. Any way of doing this without requiring sequencing the futures?
  • pagoda_5b
    pagoda_5b about 11 years
    You can install an onFailure handler for fut2 to fail fast, and a onSuccess on aggFut to handle success. A success on aggFut implies fut2 did complete successfully, so you only have one of the handlers called.
  • cmbaxter
    cmbaxter about 11 years
    I added a little more to my answer to show a possible solution for fast failing if any of the futures fails.
  • bwawok
    bwawok almost 9 years
    In your first example, 1 2 and 3 do not run in parallel, then run in serial. Try it with printlines and see
  • cmbaxter
    cmbaxter almost 9 years
    @bwawok, they certainly do. Put a Thread.sleep in the first one before the println and I bet the other two print first.
  • bwawok
    bwawok almost 9 years
    @cmbaxter They certainly do not. See gist.github.com/brianwawok/98a231396fc87675fba3 . Now if instead of for compregension you use a Future.sequence.. they WILL compute in parallel, because sequence is using fold under the hood.
  • cmbaxter
    cmbaxter almost 9 years
    @bwawok, there is a distinct difference in your code example and mine. In my example, the Futures are kicked off outside of a for comprehension and then referenced in the for comprehension. In your example, you are instantiating the Futures inside of the for comprehension and that will definitely run them sequentially. Change your code to create the futures outside of the for comprehension (assigning them to vals) and then you can extract their values in the for comprehension. If you do it this way they will indeed run in parallel.
  • bwawok
    bwawok almost 9 years
    @cmbaxter that is pretty terrible, seems like the compiler should compile them down to the same code, weather you declare a future in 1 line or two
  • cmbaxter
    cmbaxter almost 9 years
    @bwawok, remember that the for comp is basically the same as fut1 flatMap fut2 (roughly at least). If you think of it that way, because the flatMap propagates the value forward to the next step then it needs fut1 to be complete first before continuing on. If the next fut is created in the flatMap itself then it won't be created or started until the preceding step completes.
  • bwawok
    bwawok almost 9 years
    @cmbaxter except at compile time you can tell if the result of fut1 is used in fut2... for example, if you ignore the result of fut1 (map as _ ), it is pretty clear it is not used
  • srirachapills
    srirachapills over 8 years
    Awesome! Any lib that provide that kind of utility function?
  • Lance Gatlin
    Lance Gatlin over 8 years
    Yes I've since created an extensive Future utility lib: github.com/S-Mach/s_mach.concurrent See async.par in the example code.
  • jm0
    jm0 over 8 years
    @bwawok I considered that same thing as well but the for comprehension is what it is since its not specific to futures (it is a Monad operator). The case of non-sequential future optimization would be a bit of a special case (since I'm not sure it would be safe to assume it could optimize all Monad types this same way). Still, it's kind of like optimizing for the side-effects of a specific type (in this case, knowing that initialization of a future begins its computation). One could make a variation of the for construct that is specific to futures, it's just not in Scala core.
  • Michael Rueegg
    Michael Rueegg about 8 years
    Nice implementation. But note that if you pass an empty sequence of futures to awaitSuccess, it waits forever...