Goroutine: Making concurrent API requests


Solution 1

I might do something like this:

func sendUser(user string, ch chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    resp, err := http.Get("URL/" + user)
    if err != nil {
        log.Println(err)
        return // resp is nil on error; don't dereference it
    }
    defer resp.Body.Close()
    b, err := io.ReadAll(resp.Body)
    if err != nil {
        log.Println(err)
        return
    }
    ch <- string(b)
}

func AsyncHTTP(users []string) ([]string, error) {
    ch := make(chan string)
    var responses []string
    var wg sync.WaitGroup
    for _, user := range users {
        wg.Add(1)
        go sendUser(user, ch, &wg)
    }

    // close the channel in the background
    go func() {
        wg.Wait()
        close(ch)
    }()
    // read from the channel as results come in, until it's closed
    for res := range ch {
        responses = append(responses, res)
    }

    return responses, nil
}

It lets me read from the channel as responses are sent. The WaitGroup tells me when it is safe to close the channel, and putting the Wait and close in their own goroutine means I can read from the channel in "realtime" without blocking.

Solution 2

For bounded parallelism / rate limiting, we can look at the example at https://blog.golang.org/pipelines#TOC_9.

Basically the steps are:

  1. Stream the inputs / params / args used to call the API into an input channel.
  2. Run N worker goroutines, each consuming the same (shared) input channel: take the args from the input channel, call the API, and send the result into a result channel.
  3. Consume the result channel, returning early if there is an error.

sync.WaitGroup is used to wait for all worker goroutines to complete (after the input channel is exhausted).

Below is a code example (you can run it right away; try changing NUM_PARALLEL to a different degree of parallelism). Change BASE_URL to your base URL.

package main

import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// placeholder url. Change it to your base url.
const BASE_URL = "https://jsonplaceholder.typicode.com/posts/"

// number of parallelism
const NUM_PARALLEL = 20

// Stream inputs to input channel
func streamInputs(done <-chan struct{}, inputs []string) <-chan string {
    inputCh := make(chan string)
    go func() {
        defer close(inputCh)
        for _, input := range inputs {
            select {
            case inputCh <- input:
            case <-done:
                // done was closed prematurely (an error midway): stop streaming.
                // Use return, not break: a bare break would only exit the select,
                // while the deferred close still closes the input channel.
                return
            }
        }
    }()
    return inputCh
}

// Normal function for HTTP call, no knowledge of goroutine/channels
func sendUser(user string) (string, error) {
    url := BASE_URL + user
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", err
    }

    bodyStr := string(body)
    return bodyStr, nil
}

// Wrapper for sendUser return value, used as result channel type
type result struct {
    bodyStr string
    err     error
}

func AsyncHTTP(users []string) ([]string, error) {
    done := make(chan struct{})
    defer close(done)

    inputCh := streamInputs(done, users)

    var wg sync.WaitGroup
    // bulk add goroutine counter at the start
    wg.Add(NUM_PARALLEL)

    resultCh := make(chan result)

    for i := 0; i < NUM_PARALLEL; i++ {
        // spawn N worker goroutines, each is consuming a shared input channel.
        go func() {
            for input := range inputCh {
                bodyStr, err := sendUser(input)
                resultCh <- result{bodyStr, err}
            }
            wg.Done()
        }()
    }

    // Wait for all worker goroutines to finish. This runs to completion when there's no error (no early return).
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    results := []string{}
    for result := range resultCh {
        if result.err != nil {
            // return early. done channel is closed, thus input channel is also closed.
            // all worker goroutines stop working (because input channel is closed)
            return nil, result.err
        }
        results = append(results, result.bodyStr)
    }

    return results, nil
}

func main() {
    // populate users param
    users := []string{}
    for i := 1; i <= 100; i++ {
        users = append(users, strconv.Itoa(i))
    }

    start := time.Now()

    results, err := AsyncHTTP(users)
    if err != nil {
        fmt.Println(err)
        return
    }

    for _, result := range results {
        fmt.Println(result)
    }

    fmt.Println("finished in ", time.Since(start))
}


Author: immrsteel

Updated on May 25, 2022

Comments

  • immrsteel
    immrsteel almost 2 years

    I am trying to understand channels and goroutines, and tried to write a goroutine to make concurrent API requests to the server.

    But when I run the code using a goroutine, it seems to take the same time as it does without one.

    func sendUser(user string, ch chan<- string) {
        resp, err := http.Get("URL/" + user)
        // do the processing and get resp as a string
        ch <- resp
    }
    
    
    func AsyncHTTP(users []string) ([]string, error) {
        ch := make(chan string)
        var responses []string
        var user string
    
        for _, user = range users {
            go sendUser(user, ch)
    
            for {
                select {
                case r := <-ch:
                    if r.err != nil {
                        fmt.Println(r.err)
                    }
                    responses = append(responses, r)
                // Is there a better way to show that the processing of responses is complete?
                    if len(responses) == len(users) { 
                        return responses, nil
                    }
                case <-time.After(50 * time.Millisecond):
                    fmt.Printf(".")
                }
            }
        }
        return responses, nil
    }
    

    Questions:

    1. Even though I am using a goroutine, the request completion time is the same as it is without goroutines. Am I doing something wrong with the goroutines?

    2. To tell the job not to wait anymore, here I am using:

      if len(responses) == len(users)
      

      Is there a better way to show that the processing of responses is complete and to tell ch not to wait anymore?

    3. What is sync.WaitGroup? How can I use it in my goroutine?

    • RayfenWindspear
      RayfenWindspear over 6 years
      You are using an unbuffered channel. Make your chan with make(chan string, 10) or however many you want to buffer.
    • RayfenWindspear
      RayfenWindspear over 6 years
      Actually, that isn't the problem. In your loop you are calling go sendUser, but then immediately waiting for it in the loop after it. for loop all the go sendUser calls first, then loop again for the rest of your logic there.
    • immrsteel
      immrsteel over 6 years
      @RayfenWindspear But wouldn't that cause deadlocks, because the channel has a buffer of only 1? And if I put the "select" statement outside the loop, how is one buffer slot going to hold all the responses from the requests?
    • RayfenWindspear
      RayfenWindspear over 6 years
      That's the beauty of it, the channel doesn't need to hold any. An unbuffered channel essentially holds nothing, it passes it directly to the routine waiting to receive it. You don't care if all 100 or 1000 goroutines block on ch <- resp. They have already done the io portion (http.get) that takes the most time. Now you can use the responses one at a time. Or you could use more goroutines to use the responses faster if you want.
    • immrsteel
      immrsteel over 6 years
      @RayfenWindspear "or you could use more goroutines to use the responses faster if you want". Can you explain that a little more? How can I achieve it? Is there a better way than doing if len(responses) == len(users)?
    • RayfenWindspear
      RayfenWindspear over 6 years
      and there you go, a perfect example by reticentroot
  • RayfenWindspear
    RayfenWindspear over 6 years
    Nice. Just one more comment about running concurrent io. Be aware of how much memory is involved with a single request. It's easy to get carried away with how easy it is to run thousands of goroutines, but if each request object ends up taking 1MB of memory, you are in the GB range with thousands. You can rate limit by using buffered channels.
  • reticentroot
    reticentroot over 6 years
    @rayfenwindspear I totally agree. I think in production I'd run this with a semaphore to limit requests, and even memory checking.
  • immrsteel
    immrsteel over 6 years
    @reticentroot @RayfenWindspear Thanks guys. How do buffered channels help with rate limiting? If I understand correctly, the loop for _, user = range users { wg.Add(1); go sendUser(user, ch, &wg) } spins up as many goroutines as there are users. How would buffering help in this case? How can I limit it so that only a sane number of workers spin up, return results to the channel, and then take over the rest of the jobs?
  • reticentroot
    reticentroot over 6 years
    A buffered channel acts like a bucket with a queue: sends block when the bucket becomes full. So, for example, if the limit were ten, only ten goroutines would do work at once; as one item is consumed from the bucket, another send can proceed. First in, first out. Say you have 100 URLs: at most 10 are processed at the same time, versus spinning up 100 unrestricted goroutines.