How would you define a pool of goroutines to be executed at once?

Solution 1

I would spawn 4 worker goroutines that read the tasks from a common channel. Goroutines that are faster than others (because they are scheduled differently or happen to get simple tasks) will receive more tasks from this channel than others. In addition to that, I would use a sync.WaitGroup to wait for all workers to finish. The remaining part is just the creation of the tasks. You can see an example implementation of that approach here:

package main

import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    tasks := make(chan *exec.Cmd, 64)

    // spawn four worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            for cmd := range tasks {
                cmd.Run()
            }
            wg.Done()
        }()
    }

    // generate some tasks
    for i := 0; i < 10; i++ {
        tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
    }
    close(tasks)

    // wait for the workers to finish
    wg.Wait()
}

There are probably other possible approaches, but I think this is a very clean solution that is easy to understand.
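
One of the comments on this answer asks how to keep each command's output. Here is a minimal sketch of one way to do that with the same worker-pool shape. The names task, result and runPool are made up for illustration, and echo is only a stand-in for the real executable (it is assumed to be on the PATH).

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"sync"
)

// task is anything that produces output; (*exec.Cmd).CombinedOutput fits.
type task func() ([]byte, error)

// result holds what a single task returned.
type result struct {
	output []byte
	err    error
}

// runPool starts `workers` goroutines that pull task indexes from a
// shared channel and store each result at the matching index.
func runPool(workers int, tasks []task) []result {
	results := make([]result, len(tasks))
	indexes := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indexes {
				out, err := tasks[i]()
				// Every index is handed out once, so each goroutine
				// writes to its own slice element.
				results[i] = result{output: out, err: err}
			}
		}()
	}

	for i := range tasks {
		indexes <- i
	}
	close(indexes) // lets the workers' range loops end
	wg.Wait()
	return results
}

func main() {
	var tasks []task
	for i := 0; i < 10; i++ {
		// Assumption: "echo" stands in for the real program.
		cmd := exec.Command("echo", "Hello from iteration n."+strconv.Itoa(i))
		tasks = append(tasks, cmd.CombinedOutput)
	}
	for i, r := range runPool(4, tasks) {
		if r.err != nil {
			fmt.Printf("task %d failed: %v\n", i, r.err)
			continue
		}
		fmt.Printf("task %d: %s", i, r.output)
	}
}
```

Results come back in task order regardless of which worker ran which task, because each worker writes to the slot of the index it received.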

Solution 2

A simple approach to throttling (execute f() N times but maximum maxConcurrency concurrently), just a scheme:

package main

import (
        "sync"
)

const maxConcurrency = 4 // for example

var throttle = make(chan int, maxConcurrency)

func main() {
        const N = 100 // for example
        var wg sync.WaitGroup
        for i := 0; i < N; i++ {
                throttle <- 1 // whatever number
                wg.Add(1)
                go f(i, &wg, throttle)
        }
        wg.Wait()
}

func f(i int, wg *sync.WaitGroup, throttle chan int) {
        defer wg.Done()
        // whatever processing
        println(i)
        <-throttle
}

I probably wouldn't call the throttle channel a "dummy". IMHO it's an elegant way to limit concurrency (it's not my invention, of course).

BTW: Please note that you're ignoring the returned error from cmd.Run().
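
To make the point about the ignored error concrete, here is a rough sketch that combines the throttle scheme with error collection. runThrottled is a made-up helper name, not part of any library. The tasks are plain func() error values, so cmd.Run can be passed directly as a method value.

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"sync"
)

// runThrottled runs every task in its own goroutine but lets at most
// maxConcurrency of them execute at the same time. It returns one error
// slot per task (nil on success).
func runThrottled(maxConcurrency int, tasks []func() error) []error {
	throttle := make(chan struct{}, maxConcurrency)
	errs := make([]error, len(tasks))

	var wg sync.WaitGroup
	for i, task := range tasks {
		throttle <- struct{}{} // blocks while maxConcurrency tasks are running
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			defer func() { <-throttle }() // free the slot when the task returns
			errs[i] = task()
		}(i, task)
	}
	wg.Wait()
	return errs
}

func main() {
	var tasks []func() error
	for i := 0; i < 8; i++ {
		// exec.Command does not go through a shell, so the argument is
		// passed as-is and needs no extra quoting.
		cmd := exec.Command("zenity", "--info", "--text=Hello from iteration n."+strconv.Itoa(i))
		tasks = append(tasks, cmd.Run)
	}
	for i, err := range runThrottled(4, tasks) {
		if err != nil {
			fmt.Printf("task %d failed: %v\n", i, err)
		}
	}
}
```

If zenity is not installed, each cmd.Run returns an error and the loop at the end reports it instead of silently dropping it.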

Solution 3

Try this: https://github.com/korovkin/limiter

 limiter := NewConcurrencyLimiter(10)
 limiter.Execute(func() {
        zenity(...) 
 })
 limiter.Wait()
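
For readers who would rather not pull in a dependency, the same Execute/Wait shape can be sketched with the standard library alone. This is not the korovkin/limiter implementation. Limiter and NewLimiter here are hypothetical names for a minimal stand-in.

```go
package main

import (
	"fmt"
	"sync"
)

// Limiter is a hypothetical stand-in built on a buffered channel.
type Limiter struct {
	slots chan struct{}
	wg    sync.WaitGroup
}

// NewLimiter returns a Limiter that allows n jobs to run at once.
func NewLimiter(n int) *Limiter {
	return &Limiter{slots: make(chan struct{}, n)}
}

// Execute blocks until a slot is free, then runs job in a new goroutine.
func (l *Limiter) Execute(job func()) {
	l.slots <- struct{}{}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // release the slot when job returns
		job()
	}()
}

// Wait blocks until every job passed to Execute has returned.
func (l *Limiter) Wait() { l.wg.Wait() }

func main() {
	limiter := NewLimiter(10)

	var mu sync.Mutex
	sum := 0
	for i := 1; i <= 100; i++ {
		i := i
		limiter.Execute(func() {
			mu.Lock()
			sum += i
			mu.Unlock()
		})
	}
	limiter.Wait()
	fmt.Println(sum) // prints 5050
}
```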

Solution 4


🧩 Modules


📃 Template

package main

import (
    "fmt"
    "github.com/zenthangplus/goccm"
    "math/rand"
    "runtime"
)

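func main() {
    // Allow at most runtime.NumCPU() goroutines to run at the same time.
    semaphore := goccm.New(runtime.NumCPU())

    // Template loop: as written it never terminates, so replace it with a
    // loop over the real work items.
    for {
        semaphore.Wait() // block until a slot is free

        go func() {
            fmt.Println(rand.Int())
            semaphore.Done() // release the slot
        }()
    }

    // Reached only once the loop above is bounded: wait for the remaining goroutines.
    semaphore.WaitAllDone()
}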

🎰 Optimal routine quantity

  • If the operation is CPU-bound: runtime.NumCPU()
  • Otherwise test with: time go run *.go
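
As an alternative to timing whole runs from the shell, the comparison can be done inside one program. The sketch below is only an illustration: runBatch is a made-up helper, and the sleeping workload is a placeholder for real I/O-bound work.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// runBatch runs n copies of work with at most limit running
// concurrently and returns how long the whole batch took.
func runBatch(limit, n int, work func()) time.Duration {
	start := time.Now()
	slots := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		slots <- struct{}{} // wait for a free slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-slots }()
			work()
		}()
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	// Placeholder workload that mostly waits, standing in for I/O-bound work.
	work := func() { time.Sleep(10 * time.Millisecond) }

	for _, limit := range []int{1, runtime.NumCPU(), 4 * runtime.NumCPU()} {
		fmt.Printf("limit=%d elapsed=%v\n", limit, runBatch(limit, 64, work))
	}
}
```

The timings depend on the machine and the workload, so treat the printed numbers as a starting point for picking a limit rather than a fixed answer.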

🔨 Configure

export GOPATH="$(pwd)/gopath"
go mod init *.go
go mod tidy

🧹 CleanUp

find "${GOPATH}" -exec chmod +w {} \;
rm --recursive --force "${GOPATH}"

Author by

Admin

Updated on July 23, 2021

Comments

  • Admin
    Admin almost 3 years

    TL;DR: Please just go to the last part and tell me how you would solve this problem.

    I started using Go this morning, coming from Python. I want to call a closed-source executable from Go several times, with a bit of concurrency, with different command line arguments. My resulting code works fine, but I'd like to get your input in order to improve it. Since I'm at an early learning stage, I'll also explain my workflow.

    For the sake of simplicity, assume here that this "external closed-source program" is zenity, a Linux command line tool that can display graphical message boxes from the command line.

    Calling an executable file from Go

    So, in Go, I would go like this:

    package main
    import "os/exec"
    func main() {
        cmd := exec.Command("zenity", "--info", "--text='Hello World'")
        cmd.Run()
    }
    

    This should work just fine. Note that .Run() is functionally equivalent to .Start() followed by .Wait(). This is great, but if I wanted to execute this program just once, the whole programming stuff would not be worth it. So let's just do that multiple times.

    Calling an executable multiple times

    Now that I have this working, I'd like to call my program multiple times, with custom command line arguments (here just i for the sake of simplicity).

    package main    
    import (
        "os/exec"
        "strconv"
    )
    
    func main() {
        NumEl := 8 // Number of times the external program is called
        for i:=0; i<NumEl; i++ {
            cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
            cmd.Run()
        }
    }
    

    Ok, we did it! But I still can't see the advantage of Go over Python … This piece of code is actually executed in a serial fashion. I have a multiple-core CPU and I'd like to take advantage of it. So let's add some concurrency with goroutines.

    Goroutines, or a way to make my program parallel

    a) First attempt: just add "go"s everywhere

    Let's rewrite our code to make things easier to call and reuse and add the famous go keyword:

    package main
    import (
        "os/exec"
        "strconv"
    )
    
    func main() {
        NumEl := 8 
        for i:=0; i<NumEl; i++ {
            go callProg(i)  // <--- There!
        }
    }
    
    func callProg(i int) {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
    

    Nothing! What is the problem? All the goroutines are executed at once. I don't really know why zenity is not executed but AFAIK, the Go program exited before the zenity external program could even be initialized. This was confirmed by the use of time.Sleep: waiting for a couple of seconds was enough to let the 8 instances of zenity launch themselves. I don't know if this can be considered a bug though.

    To make it worse, the real program I'd actually like to call takes a while to execute itself. If I execute 8 instances of this program in parallel on my 4-core CPU, it's gonna waste some time doing a lot of context switching … I don't know how plain Go goroutines behave, but exec.Command will launch zenity 8 times in 8 different threads. To make it even worse, I want to execute this program more than 100,000 times. Doing all of that at once in goroutines won't be efficient at all. Still, I'd like to leverage my 4-core CPU!

    b) Second attempt: use pools of goroutines

    The online resources tend to recommend the use of sync.WaitGroup for this kind of work. The problem with that approach is that you are basically working with batches of goroutines: if I create a WaitGroup of 4 members, the Go program will wait for all the 4 external programs to finish before calling a new batch of 4 programs. This is not efficient: CPU is wasted, once again.

    Some other resources recommended the use of a buffered channel to do the work:

    package main
    import (
        "os/exec"
        "strconv"
    )
    
    func main() {
        NumEl := 8               // Number of times the external program is called
        NumCore := 4             // Number of available cores
        c := make(chan bool, NumCore - 1) 
        for i:=0; i<NumEl; i++ {
            go callProg(i, c)
            c <- true            // At the NumCoreth iteration, c is blocking   
        }
    }
    
    func callProg(i int, c chan bool) {
        defer func () {<- c}()
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
    

    This seems ugly. Channels were not intended for this purpose: I'm exploiting a side-effect. I love the concept of defer but I hate having to declare a function (even a lambda) to pop a value out of the dummy channel that I created. Oh, and of course, using a dummy channel is, by itself, ugly.

    c) Third attempt: die when all the children are dead

    Now we are nearly finished. I just have to take into account yet another side effect: the Go program closes before all the zenity pop-ups are closed. This is because when the loop is finished (at the 8th iteration), nothing prevents the program from finishing. This time, sync.WaitGroup will be useful.

    package main
    import (
        "os/exec"
        "strconv"
        "sync"
    )
    
    func main() {
        NumEl := 8               // Number of times the external program is called
        NumCore := 4             // Number of available cores
        c := make(chan bool, NumCore - 1) 
        wg := new(sync.WaitGroup)
        wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
        for i:=0; i<NumEl; i++ {
            go callProg(i, c, wg)
            c <- true            // At the NumCoreth iteration, c is blocking   
        }
        wg.Wait() // Wait for all the children to die
        close(c)
    }
    
    func callProg(i int, c chan bool, wg *sync.WaitGroup) {
        defer func () {
            <- c
            wg.Done() // Decrease the number of alive goroutines
        }()
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
    

    Done.

    My questions

    • Do you know any other proper way to limit the number of goroutines executed at once?

    I don't mean threads; how Go manages goroutines internally is not relevant. I really mean limiting the number of goroutines launched at once: exec.Command creates a new thread each time it is called, so I should control the number of times it is called.

    • Does that code look fine to you?
    • Do you know how to avoid the use of a dummy channel in that case?

    I can't convince myself that such dummy channels are the way to go.

  • Admin
    Admin over 10 years
    I've just made a similar version. This is way better! Thanks.
  • Fabio Suarez
    Fabio Suarez over 10 years
    This example is wrong. You should fill the throttle channel with maxConcurrency items first and swap the receive and the send operation in order to synchronize data properly. Take a look at this thread for more information: groups.google.com/d/msg/golang-nuts/MDvnk1Ax7UQ/eQGkJJmOxc4J
  • zzzz
    zzzz over 10 years
    @tux21b: You're welcome to formally prove it being wrong ;-) Meanwhile, let me please attempt to prove the opposite: Before starting a new goroutine a "token" must be inserted (throttle <- 1) in the throttle channel. A "token" is removed (<-throttle) from the same channel only after f() completes its whatever-processing. As the channel has a fixed capacity of maxConcurrency, there can never be more than maxConcurrency tokens queued. Thus there cannot ever be more than maxConcurrency concurrent instances of f() processing data. BTW: No "synchronize data" happens above.
  • Fabio Suarez
    Fabio Suarez over 10 years
    It depends on what you are doing in the loop. If you just call println, which is protected by a mutex anyway, everything will be fine. But if you modify something (let's say a shared variable x with maxConcurrency set to one) you will get a data race (and the go tool will happily report it). When you remove a token (<-throttle) in goroutine A and insert a token in goroutine B (throttle <- 1), B might not see the changes from A. Better be safe and do it the other way round by swapping the send / receive operations.
  • zzzz
    zzzz over 10 years
    @tux21b: No. In the schema presented, no data races are ever relevant because no data are concurrently shared. The schema is just a mechanism for limiting the number of concurrently executing functions (aka workers if you prefer). The schema does exactly that (and nothing more). Even when claimed "wrong" w/o any proof so far...
  • Fabio Suarez
    Fabio Suarez over 10 years
    It's enough to prove something wrong by giving an example (just imagine tasks like "insert this to the db", "read that from the db" and a limit of one). There are probably more complicated examples that will not work but I'm not an expert either. In order to prove something correct, you would have to argue with just the rules of the Go memory model that uses acquire / release semantics. Giving a correct sequence and assuming sequential consistency doesn't do it. Your solution is probably fine for some cases (i.e. your println example), but you need to be very cautious.
  • zzzz
    zzzz over 10 years
    @tux21b: I'm sorry, I've formally derived why my example works, but you've so far presented no proof whatsoever why it doesn't. Hand waving by "just imagine" doesn't qualify. Have you really understood my derivation? If so, can you make a valid point about where it fails? My claim is: "The example limits the number of concurrently executing f() to maxConcurrency". Your claim is "This example is wrong". We cannot both be right, correct? ;-)
  • Fabio Suarez
    Fabio Suarez over 10 years
    Your algorithm doesn't ensure that the finishing of a task happens before the start of a later task. Therefore I claim that there might be more than maxConcurrency tasks executing at a given time. Some goroutines might still need to store the computed result in the system memory (by flushing their store buffers, etc.). That's also the reason why you could get a data race with maxConcurrency = 1. My suggested change would ensure the required happens-before relationship by swapping the send and the receive. It has the same amount of concurrency primitives and isn't more complicated.
  • zzzz
    zzzz over 10 years
    @tux21b: I'm sorry, you're completely missing the previously presented proof. Here we go again: There is a fixed upper limit of tokens in a channel. No task starts before putting a token into the channel. No token is removed from the channel before a task finishes. Therefore there is absolutely no way how there can ever be more tasks in progress than is the capacity of the channel. q.e.d. Additionally, any data races are absolutely tangential and irrelevant to the code I presented as there are no data processed in it. I'm afraid you don't understand how the simple code actually works, sorry.
  • Fabio Suarez
    Fabio Suarez almost 10 years
    No, it doesn't. The code above uses a single SPMC (single producer / multiple consumer) queue to distribute tasks to different workers. Each command can only be received once from the tasks channel.
  • Chris
    Chris over 8 years
    'range tasks' will iterate until the task channel has no more messages?
  • fubupc
    fubupc over 7 years
    This solution has two small issues: (1) wg.Done() would be better written as defer wg.Done(). (2) Even if you fix (1), you may still NOT fully utilize the maximum concurrency once some goroutines have been killed by a panic in cmd.Run().
  • AndreaM16
    AndreaM16 over 7 years
    +1 for the answer, but I have a question: what if I would like to store the i-th output in a variable and print it?
  • Gerhard Hagerer
    Gerhard Hagerer over 7 years
    Beautiful solution!
  • William King
    William King about 7 years
    For anyone else looking at this, TL;DR zzzz is correct, although for a time tux21b was correct. See the following: golang.org/doc/go1.3#memory codereview.appspot.com/75130045 golang.org/doc/effective_go.html#channels
  • perelin
    perelin almost 7 years
    Exactly what I was looking for! My long running goroutines were sucking up memory until finally OSX decided to kill the whole process. With this I now just limit goroutines to the number of cores.
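
To make the memory-model discussion in the comments above concrete: the current Go memory model states that the kth receive on a channel with capacity C is synchronized before the completion of the (k+C)th send. With a capacity of 1, the throttle channel from Solution 2 therefore behaves like a lock. The sketch below increments a plain int with no protection other than the channel. countWithThrottle is a made-up name for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithThrottle increments a plain int from n goroutines, using a
// buffered channel of capacity 1 as the only protection.
func countWithThrottle(n int) int {
	throttle := make(chan int, 1)
	counter := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		throttle <- 1 // acquire: completes only after the previous release
		wg.Add(1)
		go func() {
			defer wg.Done()
			counter++  // guarded only by the channel
			<-throttle // release
		}()
	}
	wg.Wait() // makes the final value visible to the caller
	return counter
}

func main() {
	fmt.Println(countWithThrottle(1000)) // prints 1000
}
```

Running the program with go run -race is one way to check this on your own toolchain.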