Why is my Golang Channel Write Blocking Forever?
The goroutines block on sending to the unbuffered channel. A minimal change unblocks the goroutines is to create a buffered channel with capacity for all issues:
channel := make(chan Issue, len(allIssues))
and close the channel after the call to wg.Wait().
Comments
-
s_dolan almost 2 years
I've been attempting to take a swing at concurrency in Golang by refactoring one of my command-line utilities over the past few days, but I'm stuck.
Here's the original code (master branch).
Here's the branch with concurrency (x_concurrent branch).
When I execute the concurrent code with
go run jira_open_comment_emailer.go
, thedefer wg.Done()
never executes if the JIRA issue is added to the channel here, which causes mywg.Wait()
to hang forever.The idea is that I have a large amount of JIRA issues, and I want to spin off a goroutine for each one to see if it has a comment I need to respond to. If it does, I want to add it to some structure (I chose a channel after some research) that I can read from like a queue later to build up an email reminder.
Here's the relevant section of the code:
// Given an issue, determine if it has an open comment // Returns true if there is an open comment on the issue, otherwise false func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) { // Decrement the wait counter when the function returns defer wg.Done() needsReply := false // Loop over the comments in the issue for _, comment := range issue.Fields.Comment.Comments { commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body) checkError("Failed to regex match against comment body", err) if commentMatched { needsReply = true } if comment.Author.Name == config.JIRAUsername { needsReply = false } } // Only add the issue to the channel if it needs a reply if needsReply == true { // This never allows the defered wg.Done() to execute? channel <- issue } } func main() { start := time.Now() // This retrieves all issues in a search from JIRA allIssues := getFullIssueList() // Initialize a wait group var wg sync.WaitGroup // Set the number of waits to the number of issues to process wg.Add(len(allIssues)) // Create a channel to store issues that need a reply channel := make(chan Issue) for _, issue := range allIssues { go getAndProcessComments(issue, channel, &wg) } // Block until all of my goroutines have processed their issues. wg.Wait() // Only send an email if the channel has one or more issues if len(channel) > 0 { sendEmail(channel) } fmt.Printf("Script ran in %s", time.Since(start)) }
-
JimB almost 8 yearsYou have
len(channel)
all over the place, but that channel has no length because it's un-buffered. You need to receive from the channel for any sends to complete (and in general, making decisions based on the length of a buffered channel is a mistake, since concurrent operations can race to change that value) -
s_dolan almost 8 yearsSo, if I'm doing all of my writes to the channel, waiting for them to complete, and then reading from the channel... that can never happen because the sends will never actually complete and trigger the
defer wg.Done()
? How would you tackle implementing this concurrency, in general? Also, I'm not sure that you're correct on thelen(channel)
, since the godocs state that it returns the current number of elements in the channel, not the capacity likecap(channel)
would. golang.org/pkg/builtin/#len -
JimB almost 8 years
len(channel)
returns the current number of items in a "buffered" channel, but since channels are usually used concurrently, the result oflen
is "stale" as soon as you read it. One would generally have concurrent goroutines sending and receiving from the channel. I would advise going through the Concurrency section in the Tour Of Go again to get a better grasp of how channels work. -
hobbs almost 8 years@s_dolan yes, the first channel send will block until someone reads it, which never happens. The simplest thing you can do is to start a goroutine that reads from the other end of the channel before writing to it. As for len and cap, consider that
len(c)
is always <=cap(c)
.
-
-
RickyA almost 8 yearsBut it kind of defeats the purpose of a channel as a pipe between concurrent blocks....
-
Cerise Limón almost 8 years@RickyA There's nothing wrong with using a channel is a queue of items.
-
RickyA almost 8 yearstrue, it saves you the overhead of passing a mutexed slice around.
-
Cerise Limón almost 8 yearsYup, a slice with mutex requires more code and is unlikely to perform significantly better in this scenario.
-
JimB almost 8 yearsyes, buffered channels make great fifo buffers/queues, but in this case there's no reason to wait for the buffer to fill when each issue is already "queued" up in its own goroutine. The goal was to improve concurrency here, not add another queue to the pipeline.
-
s_dolan almost 8 yearsExcellent, thank you. The only additional change I had to make was to
close(channel)
after I was done with all of my writes.