Why is my Golang Channel Write Blocking Forever?


The goroutines block on sending to the unbuffered channel because nothing receives from it until after wg.Wait() returns. A minimal change that unblocks the goroutines is to create a buffered channel with capacity for all issues:

channel := make(chan Issue, len(allIssues))

and close the channel after the call to wg.Wait().
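As a rough, self-contained sketch of that change: the Issue type, the needsReply check, and the collectBuffered helper below are hypothetical stand-ins for the code in the question, not the asker's actual implementation. The buffer has room for every issue, so no send can block, and the channel is closed only after wg.Wait() returns so that the final range loop terminates.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Issue is a hypothetical, simplified stand-in for the question's JIRA issue type.
type Issue struct {
	Key      string
	Comments []string
}

// needsReply is a simplified, hypothetical check: it reports whether any
// comment mentions the given user.
func needsReply(issue Issue, username string) bool {
	for _, c := range issue.Comments {
		if strings.Contains(c, "~"+username) {
			return true
		}
	}
	return false
}

// collectBuffered checks every issue in its own goroutine and returns the
// issues that need a reply (in no particular order).
func collectBuffered(allIssues []Issue, username string) []Issue {
	var wg sync.WaitGroup
	wg.Add(len(allIssues))

	// Capacity for every issue, so a send never has to wait for a receiver.
	channel := make(chan Issue, len(allIssues))

	for _, issue := range allIssues {
		go func(issue Issue) {
			defer wg.Done()
			if needsReply(issue, username) {
				channel <- issue
			}
		}(issue)
	}

	// All senders have finished once Wait returns, so closing is safe here.
	wg.Wait()
	close(channel)

	// Ranging over a closed channel drains the buffer and then stops.
	var result []Issue
	for issue := range channel {
		result = append(result, issue)
	}
	return result
}

func main() {
	issues := []Issue{
		{Key: "A-1", Comments: []string{"ping ~s_dolan"}},
		{Key: "A-2", Comments: []string{"no mention here"}},
	}
	for _, issue := range collectBuffered(issues, "s_dolan") {
		fmt.Println(issue.Key)
	}
}
```

Without the close, a range over the channel would block forever once the buffer is empty, which is the same symptom in a different place.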

Author: s_dolan

Chief of Staff at Tuple. OmniFocus and GTD enthusiast.

Updated on July 09, 2022

Comments

  • s_dolan
    s_dolan almost 2 years

    I've been attempting to take a swing at concurrency in Golang by refactoring one of my command-line utilities over the past few days, but I'm stuck.

    Here's the original code (master branch).

    Here's the branch with concurrency (x_concurrent branch).

    When I execute the concurrent code with go run jira_open_comment_emailer.go, the defer wg.Done() never executes if the JIRA issue is added to the channel here, which causes my wg.Wait() to hang forever.

    The idea is that I have a large number of JIRA issues, and I want to spin off a goroutine for each one to see if it has a comment I need to respond to. If it does, I want to add it to some structure (I chose a channel after some research) that I can read from like a queue later to build up an email reminder.

    Here's the relevant section of the code:

    // Given an issue, determine if it has an open comment
    // Sends the issue on the channel if it has an open comment
    func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) {
        // Decrement the wait counter when the function returns
        defer wg.Done()
    
        needsReply := false
    
        // Loop over the comments in the issue
        for _, comment := range issue.Fields.Comment.Comments {
            commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body)
            checkError("Failed to regex match against comment body", err)
    
            if commentMatched {
                needsReply = true
            }
    
            if comment.Author.Name == config.JIRAUsername {
                needsReply = false
            }
        }
    
        // Only add the issue to the channel if it needs a reply
        if needsReply == true {
            // This never allows the deferred wg.Done() to execute?
            channel <- issue
        }
    }
    
    func main() {
        start := time.Now()
    
        // This retrieves all issues in a search from JIRA
        allIssues := getFullIssueList()
    
        // Initialize a wait group
        var wg sync.WaitGroup
    
        // Set the number of waits to the number of issues to process
        wg.Add(len(allIssues))
    
        // Create a channel to store issues that need a reply
        channel := make(chan Issue)
    
        for _, issue := range allIssues {
            go getAndProcessComments(issue, channel, &wg)
        }
    
        // Block until all of my goroutines have processed their issues.
        wg.Wait()
    
        // Only send an email if the channel has one or more issues
        if len(channel) > 0 {
            sendEmail(channel)
        }
    
        fmt.Printf("Script ran in %s", time.Since(start))
    }
    
    • JimB
      JimB almost 8 years
      You have len(channel) all over the place, but that channel has no length because it's unbuffered. You need to receive from the channel for any sends to complete (and in general, making decisions based on the length of a buffered channel is a mistake, since concurrent operations can race to change that value).
    • s_dolan
      s_dolan almost 8 years
      So, if I'm doing all of my writes to the channel, waiting for them to complete, and then reading from the channel... that can never happen because the sends will never actually complete and trigger the defer wg.Done()? How would you tackle implementing this concurrency, in general? Also, I'm not sure that you're correct on the len(channel), since the godocs state that it returns the current number of elements in the channel, not the capacity like cap(channel) would. golang.org/pkg/builtin/#len
    • JimB
      JimB almost 8 years
      len(channel) returns the current number of items in a "buffered" channel, but since channels are usually used concurrently, the result of len is "stale" as soon as you read it. One would generally have concurrent goroutines sending and receiving from the channel. I would advise going through the Concurrency section in the Tour Of Go again to get a better grasp of how channels work.
    • hobbs
      hobbs almost 8 years
      @s_dolan yes, the first channel send will block until someone reads it, which never happens. The simplest thing you can do is to start a goroutine that reads from the other end of the channel before writing to it. As for len and cap, consider that len(c) is always <= cap(c).
  • RickyA
    RickyA almost 8 years
    But it kind of defeats the purpose of a channel as a pipe between concurrent blocks....
  • Cerise Limón
    Cerise Limón almost 8 years
    @RickyA There's nothing wrong with using a channel as a queue of items.
  • RickyA
    RickyA almost 8 years
    true, it saves you the overhead of passing a mutexed slice around.
  • Cerise Limón
    Cerise Limón almost 8 years
    Yup, a slice with mutex requires more code and is unlikely to perform significantly better in this scenario.
  • JimB
    JimB almost 8 years
    yes, buffered channels make great fifo buffers/queues, but in this case there's no reason to wait for the buffer to fill when each issue is already "queued" up in its own goroutine. The goal was to improve concurrency here, not add another queue to the pipeline.
  • s_dolan
    s_dolan almost 8 years
    Excellent, thank you. The only additional change I had to make was to close(channel) after I was done with all of my writes.
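For comparison, the alternative raised in the comments (receiving while the goroutines are still sending, rather than buffering everything first) could look roughly like the sketch below. The filterConcurrently helper and its string items are hypothetical and only illustrate the pattern. The channel stays unbuffered, the receive loop runs in the calling goroutine, and a separate goroutine closes the channel once wg.Wait() returns.

```go
package main

import (
	"fmt"
	"sync"
)

// filterConcurrently is a hypothetical helper: it runs keep on every item in
// its own goroutine and returns the items that passed (in no particular order).
func filterConcurrently(items []string, keep func(string) bool) []string {
	var wg sync.WaitGroup
	wg.Add(len(items))

	channel := make(chan string) // unbuffered: each send waits for a receive

	for _, item := range items {
		go func(item string) {
			defer wg.Done()
			if keep(item) {
				channel <- item // completes once the loop below receives it
			}
		}(item)
	}

	// Close the channel after all senders finish. This runs in its own
	// goroutine so the receive loop below can start immediately.
	go func() {
		wg.Wait()
		close(channel)
	}()

	var kept []string
	for item := range channel {
		kept = append(kept, item)
	}
	return kept
}

func main() {
	// An unbuffered channel has no buffer, so len and cap are both zero.
	ch := make(chan int)
	fmt.Println(len(ch), cap(ch)) // prints "0 0"

	long := func(s string) bool { return len(s) > 3 }
	fmt.Println(len(filterConcurrently([]string{"go", "chan", "sync", "wg"}, long)))
}
```

Here wg.Wait() must not be called before the receive loop in the same goroutine, since that would recreate the original deadlock.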