How to use a goroutine pool


Solution 1

The simplest way, I suppose, is to create 250 goroutines and give them all the same channel. The main goroutine sends links into that channel, and each child goroutine keeps receiving from it.

Once all links have been sent, you close the channel. Each goroutine then finishes the job it is working on and its loop ends.

To keep the main goroutine from exiting before the children have processed the data, you can use sync.WaitGroup.

Here is some code to illustrate the idea (not a final version, but it shows the point):

package main

import (
    "fmt"
    "sync"
)

func worker(linkChan chan string, wg *sync.WaitGroup) {
    // Decrease the WaitGroup counter as soon as this goroutine returns
    defer wg.Done()

    // The loop ends once linkChan is closed and no values are left in it
    for url := range linkChan {
        // Analyze the value and do the job here (e.g. download the URL)
        fmt.Println("processing", url)
    }
}

func main() {
    // Placeholder links; replace with your own list of URLs
    yourLinksSlice := []string{"http://example.com/a", "http://example.com/b"}

    lCh := make(chan string)
    wg := new(sync.WaitGroup)

    // Add the workers to the WaitGroup and start them
    for i := 0; i < 250; i++ {
        wg.Add(1)
        go worker(lCh, wg)
    }

    // Send every link; each send blocks until a free worker receives it
    for _, link := range yourLinksSlice {
        lCh <- link
    }

    // Close the channel so the workers' range loops terminate
    close(lCh)

    // Wait for all workers to finish (otherwise they are killed when main returns)
    wg.Wait()
}
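Because the question asks for at most 250 requests at a time, here is a minimal sketch that wraps the pattern above in a helper and also measures how many calls ran at once, to check that the cap holds. The names runPool and process are hypothetical, and time.Sleep stands in for the HTTP request.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runPool is a hypothetical helper: it starts `workers` goroutines that all
// receive from one unbuffered channel (as in Solution 1), calls process for
// every link, and reports how many links were handled and the peak number
// of process calls that were running at the same time.
func runPool(links []string, workers int, process func(string)) (done, peak int64) {
	linkChan := make(chan string)
	var wg sync.WaitGroup
	var inFlight int64

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for link := range linkChan {
				n := atomic.AddInt64(&inFlight, 1)
				// Raise peak to n if n is larger (the CAS loop avoids lost updates).
				for {
					old := atomic.LoadInt64(&peak)
					if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
						break
					}
				}
				process(link)
				atomic.AddInt64(&inFlight, -1)
				atomic.AddInt64(&done, 1)
			}
		}()
	}

	for _, link := range links {
		linkChan <- link
	}
	close(linkChan)
	wg.Wait() // every worker's updates happen before Wait returns
	return done, peak
}

func main() {
	// 2500 placeholder symbols, matching the size of the list in the question.
	links := make([]string, 2500)
	for i := range links {
		links[i] = fmt.Sprintf("SYM%d", i)
	}
	// time.Sleep stands in for the HTTP request.
	done, peak := runPool(links, 250, func(string) { time.Sleep(time.Millisecond) })
	fmt.Println("processed:", done, "peak within limit:", peak <= 250)
}
```

Since every worker blocks on the same unbuffered channel, the number of workers alone bounds how many requests run concurrently; no separate semaphore is needed.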

Solution 2

You can use a thread pool implementation library for Go from this git repo.

Here is a nice blog post about how to use channels as a thread pool.

Snippet from the blog post:

var (
    MaxWorker = os.Getenv("MAX_WORKERS")
    MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
    Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
    WorkerPool  chan chan Job
    JobChannel  chan Job
    quit        chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("Error uploading to S3: %s", err.Error())
                }

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
} 
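The snippet leaves out the dispatcher that creates the workers and feeds them jobs, and it calls log.Errorf, which is not part of Go's standard log package (it comes from a third-party logger). Below is a self-contained sketch of the same chan chan Job pattern. The runDispatcher function, the handle callback (standing in for UploadToS3) and the string Payload are hypothetical, and Stop closes quit instead of sending on it so it never blocks.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Job carries a hypothetical string payload instead of the blog's Payload type.
type Job struct {
	Payload string
}

// Worker follows the blog snippet: it owns a JobChannel and re-registers it
// in the shared WorkerPool every time it becomes idle.
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)}
}

// Start runs the worker loop; handle stands in for UploadToS3.
func (w Worker) Start(handle func(Job)) {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel // announce that this worker is idle
			select {
			case job := <-w.JobChannel:
				handle(job)
			case <-w.quit:
				return
			}
		}
	}()
}

// Stop closes quit rather than sending on it, so it never blocks.
func (w Worker) Stop() { close(w.quit) }

// runDispatcher is a hypothetical dispatcher: it starts maxWorkers workers
// and hands each queued job to whichever worker is idle.
func runDispatcher(payloads []string, maxWorkers int) []string {
	workerPool := make(chan chan Job, maxWorkers)
	jobQueue := make(chan Job, len(payloads)) // buffered, like the blog's JobQueue

	var mu sync.Mutex
	var wg sync.WaitGroup
	var results []string
	handle := func(job Job) {
		defer wg.Done()
		mu.Lock()
		results = append(results, job.Payload+" processed")
		mu.Unlock()
	}

	workers := make([]Worker, maxWorkers)
	for i := range workers {
		workers[i] = NewWorker(workerPool)
		workers[i].Start(handle)
	}

	// Dispatch loop: wait for an idle worker's channel, then send it the job.
	go func() {
		for job := range jobQueue {
			jobChannel := <-workerPool
			jobChannel <- job
		}
	}()

	for _, p := range payloads {
		wg.Add(1) // count the job before it can possibly be handled
		jobQueue <- Job{Payload: p}
	}
	close(jobQueue)
	wg.Wait()

	for _, w := range workers {
		w.Stop()
	}
	sort.Strings(results) // completion order is not deterministic
	return results
}

func main() {
	fmt.Println(runDispatcher([]string{"c", "a", "b"}, 2))
}
```

The extra level of channels lets the dispatcher pick an idle worker explicitly; for a plain fixed-size pool, a single shared channel as in Solution 1 achieves the same bound with less code.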

Solution 3

This example uses two channels, one for the inputs and another for the outputs. The number of workers can be scaled to whatever size you need; each goroutine reads from the input channel and sends its results to the output channel. Feedback on simpler approaches is very welcome.

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func worker(input chan string, output chan string) {
    defer wg.Done()
    // Consumer: Process items from the input channel and send results to output channel
    for value := range input {
        output <- value + " processed"
    }
}

func main() {
    var jobs = []string{"one", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two", "three", "four", "two"}
    input := make(chan string, len(jobs))
    output := make(chan string, len(jobs))
    workers := 250

    // Increment waitgroup counter and create go routines
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go worker(input, output)
    }

    // Producer: load up input channel with jobs
    for _, job := range jobs {
        input <- job
    }

    // Close input channel since no more jobs are being sent to input channel
    close(input)
    // Wait for all goroutines to finish processing
    wg.Wait()
    // Close output channel since all workers have finished processing
    close(output)

    // Read from output channel
    for result := range output {
        fmt.Println(result)
    }

}
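One limitation of this version is that output is buffered to len(jobs), so every result has to fit in the channel before main starts reading. A common alternative reads results while the workers are still running and closes output from a separate goroutine once wg.Wait returns. A minimal sketch, assuming a hypothetical processAll helper:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// processAll is a hypothetical helper: workers read from input and write to
// an unbuffered output channel while the caller reads results concurrently.
func processAll(jobs []string, workers int) []string {
	input := make(chan string)
	output := make(chan string)
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for value := range input {
				output <- value + " processed"
			}
		}()
	}

	// Producer: feed jobs, then close input so the workers' range loops end.
	go func() {
		for _, job := range jobs {
			input <- job
		}
		close(input)
	}()

	// Close output only after every worker has returned.
	go func() {
		wg.Wait()
		close(output)
	}()

	var results []string
	for r := range output {
		results = append(results, r)
	}
	sort.Strings(results) // worker order is not deterministic
	return results
}

func main() {
	for _, r := range processAll([]string{"one", "two", "three"}, 2) {
		fmt.Println(r)
	}
}
```

Results are collected into a slice here only so they can be sorted; a real program could handle each one as it arrives, keeping memory use independent of the number of jobs.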

Solution 4

You can take a look at this

We have created a thread pool in Go and have been using it in our production systems.

I used the approach described here as a reference.

It's pretty simple to use, and it also includes a Prometheus client that reports how many workers are in use.

To initialize it, just create an instance of the dispatcher:

dispatcher = workerpool.NewDispatcher(
    "DispatcherName",
    workerpool.SetMaxWorkers(10),
)

Create an object (let's say job) that implements this interface, i.e. it must have a Process method:

// IJob : Interface for the Job to be processed
type IJob interface {
    Process() error
}

Then just send the job to the dispatcher:

dispatcher.JobQueue <- job // job is a value that implements IJob

That's it.
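For illustration, here is a hypothetical job type that satisfies the IJob interface shown above. The quoteJob name and its behaviour are assumptions, and the sketch calls Process directly rather than going through the library's dispatcher, whose internals are not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// IJob is copied from the answer above.
type IJob interface {
	Process() error
}

// quoteJob is a hypothetical job type; a real one would download the
// spreadsheet for Symbol instead of just validating it.
type quoteJob struct {
	Symbol string
}

// Process satisfies IJob (pointer receiver, so *quoteJob is the IJob).
func (j *quoteJob) Process() error {
	if j.Symbol == "" {
		return errors.New("empty symbol")
	}
	fmt.Println("fetching quotes for", j.Symbol)
	return nil
}

// Compile-time check that *quoteJob implements IJob.
var _ IJob = (*quoteJob)(nil)

func main() {
	var job IJob = &quoteJob{Symbol: "GOOG"}
	if err := job.Process(); err != nil {
		fmt.Println("error:", err)
	}
}
```

With the library, a value like &quoteJob{Symbol: "GOOG"} is what you would send on dispatcher.JobQueue.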

Author: tldr

Updated on June 04, 2020

Comments

  • tldr
    tldr almost 4 years

    I want to use Go to download stock price spreadsheets from Yahoo Finance. I'll be making an HTTP request for every stock in its own goroutine. I have a list of around 2500 symbols, but instead of making 2500 requests in parallel, I'd prefer to make 250 at a time. In Java I'd create a thread pool and reuse threads as they become free. I was trying to find something similar, a goroutine pool, if you will, but was unable to find any resources. I'd appreciate it if someone could tell me how to accomplish this or point me to resources for it. Thanks!

  • Druska
    Druska over 10 years
    Here's a small-scale test of this code in action: play.golang.org/p/fruJiGBWjn
  • Wael Ben Taleb
    Wael Ben Taleb about 2 years
    This really helps me, thank you for sharing. My question is how to figure out the MAX_WORKERS and MAX_QUEUE values. I know it depends on the machine's resources, but is there a formula or something else to help us make the decision?
  • Shettyh
    Shettyh about 2 years
    MAX_WORKERS doesn't necessarily need to be tied to system resources; it can be based purely on your use case, since goroutines are lightweight. Set MAX_QUEUE to a sane value so that it won't result in OOM errors.
  • Wael Ben Taleb
    Wael Ben Taleb about 2 years
    Thank you for your fast response! Any recommendation for deciding on the MAX_WORKERS value?
  • Shettyh
    Shettyh about 2 years
    I would say start with a small value and expose metrics around queue size. Based on how the queue is used, you can tune the number of workers to clear the queue backlog faster.