Golang HTTP request worker pool


One thing up front: if you are running an HTTP server (Go's standard server, anyway), you cannot control the number of goroutines without stopping and restarting the server. Each request starts at least one goroutine, and there is nothing you can do about that. The good news is that this is usually not a problem, since goroutines are so lightweight. However, it is perfectly reasonable to want to keep the number of goroutines that are doing hard work under control.

You can put any value into a channel, including functions. So if the goal is to only have to write code in HTTP handlers, let the jobs be closures -- the workers don't know (or care) what they are working on.

package main

import (
    "encoding/json"
    "io"
    "log"
    "net/http"
)

var largePool chan func()
var smallPool chan func()

func main() {
    // Start two different sized worker pools (e.g., for different workloads).
    // Cancellation and graceful shutdown are omitted for brevity.

    largePool = make(chan func(), 100)
    smallPool = make(chan func(), 10)

    for i := 0; i < 100; i++ {
            go func() {
                    for f := range largePool {
                            f()
                    }
            }()
    }

    for i := 0; i < 10; i++ {
            go func() {
                    for f := range smallPool {
                            f()
                    }
            }()
    }

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?

    log.Fatal(http.ListenAndServe(":8080", nil))
}

func handler1(w http.ResponseWriter, r *http.Request) {
    // Imagine a JSON body containing a URL that we are expected to fetch.
    // Light work that doesn't consume many of *our* resources and can be done
    // in bulk, so we put it in the large pool.
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            // Hand the job to the pool without blocking the handler.
            largePool <- func() {
                    resp, err := http.Get(job.URL)
                    if err != nil {
                            return
                    }
                    defer resp.Body.Close() // Don't leak the connection.
                    // Do something with the response
            }
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    // The request body is an image that we want to do some fancy processing
    // on. That's hard work; we don't want to do too many of them at once,
    // so we put those jobs in the small pool.

    b, err := io.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            // Hand the job to the pool without blocking the handler.
            smallPool <- func() {
                    processImage(b)
            }
    }()
    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}
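
By the way, the comment in main glosses over cancellation and graceful shutdown. As a minimal sketch of one way to drain a pool cleanly -- assuming the HTTP server has already been shut down, so nothing is still sending on the channel (the startPool helper here is mine, not part of the example above) -- you could wrap the worker loop like this:

// startPool launches n workers draining jobs, and returns a stop function
// that closes the channel and blocks until all queued jobs have finished.
// Closing jobs while handlers are still sending on it would panic, so stop
// the HTTP server first. Requires the sync package.
func startPool(jobs chan func(), n int) (stop func()) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
            wg.Add(1)
            go func() {
                    defer wg.Done()
                    for f := range jobs {
                            f()
                    }
            }()
    }
    return func() {
            close(jobs)
            wg.Wait()
    }
}

In main you would then start a pool with largePool = make(chan func(), 100); stopLarge := startPool(largePool, 100) and call stopLarge() once the server has stopped.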

This is a very simple example to get the point across. It doesn't matter much how you set up your worker pools; you just need a clever job definition. In the example above it is a closure, but you could also define a Job interface, for instance.

type Job interface {
    Do()
}

var largePool chan Job
var smallPool chan Job
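
For instance, a quick sketch of what the worker loop and handler1's job could look like under that interface (the fetchJob type is made up for illustration, not something from the example above):

// fetchJob is one concrete job: fetch a URL and discard the result.
type fetchJob struct{ url string }

func (j fetchJob) Do() {
    resp, err := http.Get(j.url)
    if err != nil {
            return
    }
    resp.Body.Close()
}

The workers simply call Do without knowing what is behind it:

for i := 0; i < 100; i++ {
    go func() {
            for j := range largePool {
                    j.Do()
            }
    }()
}

and handler1 enqueues largePool <- fetchJob{url: job.URL} instead of a closure.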

Now, I wouldn't call the whole worker pool approach "simple". You said your goal is to limit the number of goroutines (that are doing work). That doesn't require workers at all; it only needs a limiter. Here is the same example as above, but using channels as semaphores to limit concurrency.

package main

import (
    "encoding/json"
    "io"
    "log"
    "net/http"
)

var largePool chan struct{}
var smallPool chan struct{}

func main() {
    largePool = make(chan struct{}, 100)
    smallPool = make(chan struct{}, 10)

    http.HandleFunc("/endpoint-1", handler1)
    http.HandleFunc("/endpoint-2", handler2)

    log.Fatal(http.ListenAndServe(":8080", nil))
}

func handler1(w http.ResponseWriter, r *http.Request) {
    var job struct{ URL string }

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
    }

    go func() {
            // Block until there are fewer than cap(largePool) light-work
            // goroutines running.
            largePool <- struct{}{}
            defer func() { <-largePool }() // Let everyone know that we are done.

            resp, err := http.Get(job.URL)
            if err != nil {
                    return
            }
            resp.Body.Close() // Don't leak the connection.
    }()

    w.WriteHeader(http.StatusAccepted)
}

func handler2(w http.ResponseWriter, r *http.Request) {
    b, err := io.ReadAll(r.Body)
    if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
    }

    go func() {
            // Block until there are fewer than cap(smallPool) hard-work
            // goroutines running.
            smallPool <- struct{}{}
            defer func() { <-smallPool }() // Let everyone know that we are done.

            processImage(b)
    }()

    w.WriteHeader(http.StatusAccepted)
}

func processImage(b []byte) {}
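
As an aside, if you would rather not hand-roll the channel semaphore, the golang.org/x/sync/semaphore package provides the same primitive, and its TryAcquire makes it easy to shed load instead of queuing. A sketch, assuming a hard-work limit of 10 (the handler3 name and the 503 response are my choices, not part of the example above):

var hardWork = semaphore.NewWeighted(10) // import "golang.org/x/sync/semaphore"

func handler3(w http.ResponseWriter, r *http.Request) {
    // Reject immediately instead of queuing when all slots are taken.
    if !hardWork.TryAcquire(1) {
            http.Error(w, "too busy, try again later", http.StatusServiceUnavailable)
            return
    }

    go func() {
            defer hardWork.Release(1)
            // ... do the hard work ...
    }()

    w.WriteHeader(http.StatusAccepted)
}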

Comments

  • Rien, almost 2 years ago:

    I'm trying to build a system (worker pool / job queue) to handle as many HTTP requests as possible on each API endpoint. I looked into this example and got it working just fine, except that I don't understand how to expand the pool / job queue to different endpoints.

    For the sake of the scenario, let's sketch a Go HTTP server that handles a million requests per minute across different endpoints and request types (GET, POST, etc.).

    How can I expand on this concept? Should I create different worker pools and jobs for each endpoint? Or can I create different jobs, enter them in the same queue, and have the same pool handle them?

    I want to maintain simplicity: if I create a new API endpoint, I shouldn't have to create a new worker pool, so I can focus just on the API. But performance is also very much on my mind.

    The code I'm trying to build on is taken from the example linked earlier; here is a GitHub gist from somebody else with this code.