Idiomatic goroutine termination and error handling

44,778

Solution 1

Using errgroup (golang.org/x/sync/errgroup) makes this even simpler. Wait blocks until all the supplied goroutines have returned and reports the first non-nil error, if any, which is the one that bubbles back up to the caller. With errgroup.WithContext, the group's context is canceled as soon as any goroutine returns an error, so the remaining goroutines can stop early if they watch that context.

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"

        "golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
        errs, ctx := errgroup.WithContext(ctx)

        // run all the http requests in parallel
        for i := 0; i < 4; i++ {
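                errs.Go(func() error {
                        // pretend this does an http request and returns an error
                        select {
                        case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
                                return fmt.Errorf("error in go routine, bailing")
                        case <-ctx.Done():
                                // another request already failed, so stop early
                                return ctx.Err()
                        }
                })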
        }

        // Wait for completion and return the first error (if any)
        return errs.Wait()
}

func main() {
        fmt.Println(fetchAll(context.Background()))
}

Solution 2

All but one of your goroutines are leaked, because they're still blocked trying to send to the errs channel: you return from the for-range loop before it has emptied the channel. You're also leaking the goroutine whose job is to close the errs channel, because the WaitGroup counter never reaches zero.

(Also, as Andy pointed out, deleting from a map is not thread-safe, so that would need to be protected by a mutex.)

However, I don't think maps, mutexes, waitgroups, contexts etc. are even necessary here. I'd rewrite the whole thing to just use basic channel operations, something like the following:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func fetchAll() error {
    var N = 4
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)
    for i := 0; i < N; i++ {
        go func(i int) {
            // dummy fetch
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            var err error
            if rand.Intn(2) == 0 {
                err = fmt.Errorf("goroutine %d's error returned", i)
            }
            ch := done // we'll send to done if nil error and to errc otherwise
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }(i)
    }
    count := 0
    for {
        select {
        case err := <-errc:
            close(quit)
            return err
        case <-done:
            count++
            if count == N {
                return nil // got all N signals, so there was no error
            }
        }
    }
}

func main() {
    rand.Seed(time.Now().UnixNano())
    fmt.Println(fetchAll())
}

Playground link: https://play.golang.org/p/mxGhSYYkOb

EDIT: There indeed was a silly mistake, thanks for pointing it out. I fixed the code above (I think...). I also added some randomness for added Realism™.

Also, I'd like to stress that there really are multiple ways to approach this problem, and my solution is but one way. Ultimately it comes down to personal taste, but in general, you want to strive towards "idiomatic" code - and towards a style that feels natural and easy to understand for you.

Solution 3

Here's a more complete example using errgroup, as suggested by joth. It shows how to process successful results, and it exits on the first error.

https://play.golang.org/p/rU1v-Mp2ijo

package main

import (
    "context"
    "fmt"
    "golang.org/x/sync/errgroup"
    "math/rand"
    "time"
)

func fetchAll() error {
    g, ctx := errgroup.WithContext(context.Background())
    results := make(chan int)
    for i := 0; i < 4; i++ {
        current := i
        g.Go(func() error {
            // Simulate delay with random errors.
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            if rand.Intn(2) == 0 {
                return fmt.Errorf("goroutine %d's error returned", current)
            }
            // Pass processed data to channel, or receive a context completion.
            select {
            case results <- current:
                return nil
            // Close out if another error occurs.
            case <-ctx.Done():
                return ctx.Err()
            }
        })
    }

    // Elegant way to close out the channel when the first error occurs or
    // when processing is successful.
    go func() {
        g.Wait()
        close(results)
    }()

    for result := range results {
        fmt.Println("processed", result)
    }

    // Wait for all fetches to complete.
    return g.Wait()
}

func main() {
    fmt.Println(fetchAll())
}

Solution 4

As long as each goroutine completes, you won't leak anything. You should create the error channel as buffered with the buffer size equal to the number of goroutines so that the send operations on the channel won't block. Each goroutine should always send something on the channel when it finishes, whether it succeeds or fails. The loop at the bottom can then just iterate for the number of goroutines and return if it gets a non-nil error. You don't need the WaitGroup or the other goroutine that closes the channel.

I think the reason it appears that goroutines are leaking is that you return when you get the first error, so some of them are still running.

By the way, maps are not safe for concurrent use. If you share a map among goroutines and any of them modifies it, you need to protect it with a mutex.
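
As a minimal sketch of that last point, the map can be wrapped together with its mutex. The tracker type and its methods are hypothetical names chosen for illustration. They do not appear in the question's code.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a hypothetical helper: a set of goroutine IDs guarded by a mutex.
type tracker struct {
	mu      sync.Mutex
	running map[int]struct{}
}

func newTracker() *tracker {
	return &tracker{running: make(map[int]struct{})}
}

// add records id as running.
func (t *tracker) add(id int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.running[id] = struct{}{}
}

// remove deletes id; safe to call from several goroutines at once.
func (t *tracker) remove(id int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.running, id)
}

// size reports how many IDs are still recorded.
func (t *tracker) size() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.running)
}

func main() {
	t := newTracker()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		t.add(i)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer t.remove(i) // concurrent deletes are serialized by the mutex
		}(i)
	}
	wg.Wait()
	fmt.Println("still running:", t.size()) // prints "still running: 0"
}
```

Running the program with the race detector (go run -race) is a quick way to check that no unguarded map access remains.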

Author by

Admin

Updated on July 22, 2022

Comments

  • Admin
    Admin almost 2 years

    I have a simple concurrency use case in Go, and I cannot figure out an elegant solution to my problem.

    I want to write a method fetchAll that queries an unspecified number of resources from remote servers in parallel. If any of the fetches fails, I want to return that first error immediately.

    My initial implementation leaks goroutines:

        package main
    
        import (
          "fmt"
          "math/rand"
          "sync"
          "time"
        )
    
        func fetchAll() error {
          wg := sync.WaitGroup{}
          errs := make(chan error)
          leaks := make(map[int]struct{})
          defer fmt.Println("these goroutines leaked:", leaks)
    
          // run all the http requests in parallel
          for i := 0; i < 4; i++ {
            leaks[i] = struct{}{}
            wg.Add(1)
            go func(i int) {
              defer wg.Done()
              defer delete(leaks, i)
    
              // pretend this does an http request and returns an error
              time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
              errs <- fmt.Errorf("goroutine %d's error returned", i)
            }(i)
          }
    
          // wait until all the fetches are done and close the error
          // channel so the loop below terminates
          go func() {
            wg.Wait()
            close(errs)
          }()
    
          // return the first error
          for err := range errs {
            if err != nil {
              return err
            }
          }
    
          return nil
        }
    
        func main() {
          fmt.Println(fetchAll())
        }
    

    Playground: https://play.golang.org/p/Be93J514R5

    I know from reading https://blog.golang.org/pipelines that I can create a signal channel to clean up the other goroutines. Alternatively, I could probably use context to accomplish it. But it seems like such a simple use case should have a simpler solution that I'm missing.