Reading a file concurrently

Solution 1

You're almost there; you just need a little more work on the goroutines' synchronisation. Your problem is that you're trying to feed the parser and collect the results in the same routine, and that can't work: the main routine can't receive from results while it's blocked sending on jobs.

I propose the following:

  1. Run the scanner in a separate routine, and close the input channel once everything is read.
  2. Run a separate routine that waits for the parsers to finish their job, then closes the output channel.
  3. Collect all the results in your main routine.

The relevant changes could look like this:

// Go over a file line by line and queue up a ton of work
go func() {
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        jobs <- scanner.Text()
    }
    close(jobs)
}()

// Collect all the results...
// First, make sure we close the result channel when everything was processed
go func() {
    wg.Wait()
    close(results)
}()

// Now, add up the results from the results channel until closed
counts := 0
for v := range results {
    counts += v
}

Fully working example on the playground: http://play.golang.org/p/coja1_w-fY
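In case the playground link goes stale, here is the snippet above assembled with the question's code into one runnable program (the function and variable names follow the question; `contents` replaces the misleadingly named `path` parameter):

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
	"sync"
)

func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
	defer wg.Done()
	for j := range jobs {
		if telephone.MatchString(j) {
			results <- 1
		}
	}
}

func telephoneNumbersInFile(contents string) int {
	file := strings.NewReader(contents)
	telephone := regexp.MustCompile(`\(\d+\)\s\d+-\d+`)

	jobs := make(chan string)
	results := make(chan int)
	wg := new(sync.WaitGroup)

	// Start the worker pool.
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go matchTelephoneNumbers(jobs, results, wg, telephone)
	}

	// Feed the workers from a separate goroutine so the main
	// routine is free to drain the results channel below.
	go func() {
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			jobs <- scanner.Text()
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results until the channel is closed.
	counts := 0
	for v := range results {
		counts += v
	}
	return counts
}

func main() {
	const input = "Foo\n(555) 123-3456\nBar\nBaz"
	fmt.Println(telephoneNumbersInFile(input)) // prints 1
}
```

The key point is that feeding jobs, closing results, and collecting results each live in their own goroutine, so no send is ever waiting on a receive that can't happen.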

It's worth adding that you don't necessarily need the WaitGroup to achieve this; all you really need to know is when to stop receiving results. This could be achieved, for example, by the scanner advertising (on a channel) how many lines were read and the collector reading only that number of results (you would need to send zeros for non-matching lines as well, though).
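A sketch of that WaitGroup-free variant (the `countMatches` name and the three-worker setup are illustrative, not from the original code). The collector must start draining results before the line count arrives, hence the `select`:

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

func countMatches(contents string) int {
	telephone := regexp.MustCompile(`\(\d+\)\s\d+-\d+`)
	jobs := make(chan string)
	results := make(chan int)
	lines := make(chan int, 1) // scanner advertises the total line count here

	for w := 0; w < 3; w++ {
		go func() {
			for j := range jobs {
				if telephone.MatchString(j) {
					results <- 1
				} else {
					results <- 0 // zeros keep the result count equal to the line count
				}
			}
		}()
	}

	go func() {
		n := 0
		scanner := bufio.NewScanner(strings.NewReader(contents))
		for scanner.Scan() {
			jobs <- scanner.Text()
			n++
		}
		close(jobs)
		lines <- n
	}()

	// Drain results while waiting for the total, then stop after
	// exactly that many results have been received.
	counts, done, total := 0, 0, -1
	for total < 0 || done < total {
		select {
		case n := <-lines:
			total = n
		case v := <-results:
			counts += v
			done++
		}
	}
	return counts
}

func main() {
	fmt.Println(countMatches("Foo\n(555) 123-3456\nBar\nBaz")) // prints 1
}
```

Note that simply receiving the count first and then reading results would deadlock: the workers would block on unreceived results, backing up the jobs channel before the scanner could ever send the count.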

Solution 2

Edit: The answer by @tomasz above is the correct one. Please disregard this answer.

You need to do two things:

  1. Use buffered channels so that sending doesn't block.
  2. Close the results channel so that receiving doesn't block.

The use of buffered channels matters because an unbuffered channel needs a receive for every send; it's those unmatched sends that are causing the deadlock you're hitting.

If you fix that, you'll run into a deadlock when you try to receive the results, because results hasn't been closed.

Here's the fixed playground: http://play.golang.org/p/DtS8Matgi5
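The blocking behaviour described above can be seen in a minimal, standalone sketch (nothing here is from the question's code):

```go
package main

import "fmt"

func main() {
	// Unbuffered: a send blocks until someone receives, so the send
	// and the receive must happen in different goroutines.
	unbuf := make(chan int)
	go func() { unbuf <- 1 }()
	fmt.Println(<-unbuf) // prints 1

	// Buffered: up to cap(buf) sends complete without a receiver.
	// The same send on an unbuffered channel, from this one
	// goroutine, would deadlock.
	buf := make(chan int, 1)
	buf <- 2
	fmt.Println(<-buf) // prints 2
}
```

As @tomasz points out in the comments below, buffering only postpones the deadlock here: once the matches outnumber the buffer capacity, the workers block again.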

Author: squarism
Updated on September 14, 2022
Comments

  • squarism
    squarism over 1 year

    The reading part isn't concurrent but the processing is. I phrased the title this way because I'm most likely to search for this problem again using that phrase. :)

    I'm getting a deadlock after trying to go beyond the examples so this is a learning experience for me. My goals are these:

    1. Read a file line by line (eventually use a buffer to do groups of lines).
    2. Pass off the text to a func() that does some regex work.
    3. Send the results somewhere but avoid mutexes or shared variables. I'm sending ints (always the number 1) to a channel. It's sort of silly but if it's not causing problems I'd like to leave it like this unless you folks have a neater option.
    4. Use a worker pool to do this. I'm not sure how to tell the workers to requeue themselves.

    Here is the playground link. I tried to write helpful comments, hopefully this makes sense. My design could be completely wrong so don't hesitate to refactor.

    package main
    
    import (
      "bufio"
      "fmt"
      "regexp"
      "strings"
      "sync"
    )
    
    func telephoneNumbersInFile(path string) int {
      file := strings.NewReader(path)
    
      var telephone = regexp.MustCompile(`\(\d+\)\s\d+-\d+`)
    
      // do I need buffered channels here?
      jobs := make(chan string)
      results := make(chan int)
    
      // I think we need a wait group, not sure.
      wg := new(sync.WaitGroup)
    
      // start up some workers that will block and wait?
      for w := 1; w <= 3; w++ {
        wg.Add(1)
        go matchTelephoneNumbers(jobs, results, wg, telephone)
      }
    
      // go over a file line by line and queue up a ton of work
      scanner := bufio.NewScanner(file)
      for scanner.Scan() {
        // Later I want to create a buffer of lines, not just line-by-line here ...
        jobs <- scanner.Text()
      }
    
      close(jobs)
      wg.Wait()
    
      // Add up the results from the results channel.
      // The rest of this isn't even working so ignore for now.
      counts := 0
      // for v := range results {
      //   counts += v
      // }
    
      return counts
    }
    
    func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
      // Decreasing internal counter for wait-group as soon as goroutine finishes
      defer wg.Done()
    
      // eventually I want to have a []string channel to work on a chunk of lines not just one line of text
      for j := range jobs {
        if telephone.MatchString(j) {
          results <- 1
        }
      }
    }
    
    func main() {
      // An artificial input source.  Normally this is a file passed on the command line.
      const input = "Foo\n(555) 123-3456\nBar\nBaz"
      numberOfTelephoneNumbers := telephoneNumbersInFile(input)
      fmt.Println(numberOfTelephoneNumbers)
    }
    
  • squarism
    squarism over 9 years
    Argh. I was so close. I think I tried each of these things but not both of them. Obviously, I don't grok this. Thank you! I will study this.
  • jjm
    jjm over 9 years
    Go's not easy, and concurrency's not easy either :-) It takes time.
  • tomasz
    tomasz over 9 years
    I'm afraid this is incorrect. If you put more than 10 lines with a telephone number in your example you'll deadlock. You don't need buffered channels, you need to keep collecting results while waiting for the group to be done (and then for example close the result channel).
  • jjm
    jjm over 9 years
    @tomasz that is a better solution
  • squarism
    squarism over 9 years
    @tomasz Yeah I tried a larger file and it deadlocked.
  • squarism
    squarism over 9 years
    Amazing. Yes, this worked on a 100mb text file. Thank you! So, does close(results) just happen to not fire while the main is counting results? I'm going to have to put some prints in to see how this is working at runtime.
  • tomasz
    tomasz over 9 years
    We spawn a separate routine which waits for the WaitGroup to finish and then closes the results channel. This is happening in background (we used go). Note that we're starting to collect the results immediately (while we read the file, not after reading it). Instead of spawning a separate routine for closing the result channel, you could do this after calling close(jobs) as well -- in other words: read all the lines, wait for parsers to finish, close the results channel. There're plenty of possibilities.
  • Rick-777
    Rick-777 over 9 years
    Don't use buffering to cure deadlocks; it's far too risky. In fact, my rule of thumb is to start with zero buffering so that deadlocks are most likely to surface. Then I work out how to arrange goroutines and channels so they don't deadlock. Then I can add buffering back in if I need to, as an optimisation step.