Priority in Go select statement workaround

Solution 1

package main

import "fmt"

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }
    exit <- true
}

func main() {
    out := make(chan int, 10)
    exit := make(chan bool)

    go sender(out, exit)

    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        default:
        }
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
            continue
        case <-exit:
            fmt.Println("Exiting")
        }
        break
    }
    fmt.Println("Did we get all 10? I think so!")
}

The default case of the first select makes it non-blocking. That select drains the out channel without looking at the exit channel, and never waits. If the out channel is empty, control immediately drops to the second select. The second select is blocking: it waits for data on either channel. If an exit arrives, it handles it and allows the loop to exit. If data arrives, it goes back up to the top of the loop and back into drain mode.
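
One caveat raised in the comments: a receive from a closed channel never blocks, so if out were ever closed the first select would spin forever on zero values. Below is a minimal sketch of one way to guard against that. It assumes the producer may close out (the original sender does not), and the helper name drain is made up for this illustration. It uses the two-value receive form and sets the local channel variable to nil once the channel is closed, which disables that case.

```go
package main

import "fmt"

// drain is a hypothetical helper: it receives from out with priority over
// exit and returns every value it saw. It tolerates out being closed.
func drain(out <-chan int, exit <-chan bool) []int {
	var got []int
	for {
		// Non-blocking pass: prefer out whenever it has data.
		select {
		case i, ok := <-out:
			if !ok {
				out = nil // a nil channel is never ready, so this case is now disabled
				continue
			}
			got = append(got, i)
			continue
		default:
		}
		// Blocking pass: wait for whichever channel becomes ready.
		select {
		case i, ok := <-out:
			if !ok {
				out = nil
				continue
			}
			got = append(got, i)
		case <-exit:
			return got
		}
	}
}

func main() {
	// Both channels are filled up front so the run is deterministic.
	out := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		out <- i
	}
	close(out) // assumption of this sketch: the producer closes out

	exit := make(chan bool, 1)
	exit <- true

	fmt.Println(drain(out, exit)) // [1 2 3]
}
```

Setting the variable to nil only affects the local copy inside drain; the producer's channel is untouched.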

Solution 2

The language supports this natively and no workaround is required. It's very simple: the quit channel should only be visible to the producer. On quit, the producer closes the channel. Only when the channel is empty and closed does the consumer quit. This is made possible by ranging over the channel.

Here is an example to illustrate:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    produced  = 0
    processed = 0
)

func produceEndlessly(out chan int, quit chan bool) {
    defer close(out)
    for {
        select {
        case <-quit:
            fmt.Println("RECV QUIT")
            return
        default:
            out <- rand.Int()
            time.Sleep(time.Duration(rand.Int63n(5e6)))
            produced++
        }
    }
}

func quitRandomly(quit chan bool) {
    d := time.Duration(rand.Int63n(5e9))
    fmt.Println("SLEEP", d)
    time.Sleep(d)
    fmt.Println("SEND QUIT")
    quit <- true
}

func main() {
    vals, quit := make(chan int, 10), make(chan bool)
    go produceEndlessly(vals, quit)
    go quitRandomly(quit)
    for x := range vals {
        fmt.Println(x)
        processed++
        time.Sleep(time.Duration(rand.Int63n(5e8)))
    }
    fmt.Println("Produced:", produced)
    fmt.Println("Processed:", processed)
}
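
As a comment on this answer points out, closing out from the producer only works as-is with a single producer. A common way to extend the idea to several producers is to let a separate goroutine close the channel once a sync.WaitGroup reports that every producer has returned. The sketch below is one possible arrangement, not part of the original answer: the function name run and its parameters n and limit are hypothetical, and quit is closed (a broadcast) rather than sent on, so that all producers see it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// run starts n producers sharing one out channel, asks them to quit after
// the consumer has processed limit values, and returns how many values were
// produced and how many were processed.
func run(n, limit int) (produced, processed int64) {
	out := make(chan int, 10)
	quit := make(chan struct{})
	var wg sync.WaitGroup
	var sent int64

	for p := 0; p < n; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-quit:
					return
				default:
					out <- id
					atomic.AddInt64(&sent, 1)
				}
			}
		}(p)
	}

	// Only this goroutine closes out, and only after all producers are done,
	// so no producer can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()

	// The consumer keeps ranging until out is closed and empty.
	for range out {
		processed++
		if processed == int64(limit) {
			close(quit) // tell every producer to stop
		}
	}
	return atomic.LoadInt64(&sent), processed
}

func main() {
	produced, processed := run(3, 100)
	fmt.Println(produced == processed) // true
}
```

The consumer never needs to see quit: it stops only when out is closed and drained, so every value that was sent is also processed.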

Solution 3

Another approach:

package main

import "fmt"

func sender(c chan int) chan int {
        go func() {
                for i := 1; i <= 15; i++ {
                        c <- i
                }
                close(c)
        }()
        return c
}

func main() {
        for i := range sender(make(chan int, 10)) {
                fmt.Printf("Value: %d\n", i)
        }
        fmt.Println("Did we get all 15? Surely yes")
}

$ go run main.go
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Did we get all 15? Surely yes
$ 

Solution 4

Here's a general idiom that solves the select's priority problem.

Yes, it's not pretty, to say the least, but it does 100% of what is needed, with no pitfalls and no hidden limitations.

Here's a short code example; an explanation follows.

package main

import (
    "fmt"
    "time"
)

func sender(out chan int, exit chan bool) {
    for i := 1; i <= 10; i++ {
        out <- i
    }

    time.Sleep(2000 * time.Millisecond)
    out <- 11
    exit <- true
}

func main() {
    out := make(chan int, 20)
    exit := make(chan bool)

    go sender(out, exit)

    time.Sleep(500 * time.Millisecond)

    L:
    for {
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            select {
            case i := <-out:
                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }
    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

And here's how it works: the main() from above, annotated.

func main(){
    out := make(chan int, 20)
    exit := make(chan bool)
    go sender(out, exit)
    time.Sleep(500 * time.Millisecond)
    L:
    for {
        select {

            // We arrive here at the start of each loop iteration and check
            // whether out has something to read.

            // This select handles buffered data without blocking.

        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            // Otherwise we fall back to here.

            select {

                // This select blocks while there is no data in either channel.

            case i := <-out:
                // If out has something to read, we unblock and go around the loop again.

                fmt.Printf("Value: %d\n", i)
            case <-exit:
                select {

                    // This select explicitly prioritizes one channel over the other
                    // in case we woke up (unblocked) on the low-priority case.

                    // NOTE:
                    // This prioritizes the high-priority channel even if its value
                    // arrived _second_, in quick succession after the first one.

                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                default:
                    fmt.Println("Exiting")
                    break L
                }
            }
        }
    }

    fmt.Println("Did we get all 10? Yes.")
    fmt.Println("Did we get 11? DEFINITELY YES")
}

NOTE: Before playing tricks with prioritizations, MAKE SURE YOU ARE SOLVING THE RIGHT PROBLEM.

Chances are, it can be solved differently.

Still, a prioritized select in Go would have been a great thing to have. Just a dream...

NOTE: There is a quite similar answer on this thread, https://stackoverflow.com/a/45854345/11729048, but it nests only two selects, not three as I did. What's the difference? My approach is more efficient, and it explicitly expects to handle random choices on each loop iteration.

However, if the high-priority channel isn't buffered, and/or you don't expect bulk data on it, only sporadic single events, then the simpler two-stage idiom (as in that answer) will suffice:

L:
for {
    select {
    case i := <-out:
        fmt.Printf("Value: %d\n", i)
    case <-exit:
        select {
        case i := <-out:
            fmt.Printf("Value: %d\n", i)
        default:
            fmt.Println("Exiting")
            break L
        }
    }
}

It is basically stages 2 and 3, with stage 1 removed.

And once again: in about 90% of the cases where you think you need to prioritize channel select cases, you really don't.

And here's a one-liner that can be wrapped in a macro:

for {
    select { case a1 := <-ch_p1: p1_action(a1); default: select { case a1 := <-ch_p1: p1_action(a1); case a2 := <-ch_p2: select { case a1 := <-ch_p1: p1_action(a1); default: p2_action(a2); }}}
}
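
Go itself has no macro facility, so one way to reuse the idiom is to wrap it in a function. The sketch below is an illustration rather than the answer's code: the names prioritized, onHigh and onLow are made up. It also deviates from the one-liner in one respect: when a low-priority value has already been received and a high-priority value turns out to be ready, it handles the high-priority value first and then still handles the low-priority one instead of discarding it.

```go
package main

import "fmt"

// prioritized performs one prioritized receive step on two channels,
// calling onHigh or onLow for the values it receives.
func prioritized(high, low <-chan int, onHigh, onLow func(int)) {
	// Stage 1: non-blocking check of the high-priority channel.
	select {
	case v := <-high:
		onHigh(v)
		return
	default:
	}
	// Stage 2: block until either channel has a value.
	select {
	case v := <-high:
		onHigh(v)
	case w := <-low:
		// Stage 3: we woke up on the low-priority case; let any
		// high-priority values that are ready right now go first.
		for more := true; more; {
			select {
			case v := <-high:
				onHigh(v)
			default:
				more = false
			}
		}
		onLow(w) // the low-priority value is kept, not dropped
	}
}

func main() {
	high := make(chan int, 3)
	low := make(chan int, 1)
	low <- 100 // the low-priority value arrives first
	for i := 1; i <= 3; i++ {
		high <- i
	}

	var order []int
	record := func(v int) { order = append(order, v) }
	for len(order) < 4 {
		prioritized(high, low, record, record)
	}
	fmt.Println(order) // [1 2 3 100]
}
```

Passing callbacks keeps the function independent of what the actions do; returning the value together with a flag for its source would be another reasonable design.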

And what if you want to prioritize more than two cases?

Then you have two options. The first is to build a tree, using intermediate goroutines, so that each fork is exactly binary (the idiom above).

The second option is to make the priority fork more than two-way.

Here's an example of three priorities:

for {
    select {
    case a1 := <-ch_p1:
        p1_action(a1)
    default:
        select {
        case a2 := <-ch_p2:
            p2_action(a2)
        default:
            select {    // block here, on this select
            case a1 := <-ch_p1:
                p1_action(a1)
            case a2 := <-ch_p2:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                default:
                    p2_action(a2)
                }
            case a3 := <-ch_p3:
                select {
                case a1 := <-ch_p1:
                    p1_action(a1)
                case a2 := <-ch_p2:
                    p2_action(a2)
                default:
                    p3_action(a3)
                }
            }
        }
    }
}

That is, the whole structure is conceptually split into three parts, just like the original (binary) one.

Once again: chances are, you can design your system so that you can avoid this mess.

P.S. Why doesn't Go have this built into the language? The question is a rhetorical one.

Solution 5

Here's another option.

Consumer Code:

  go func() {
    stop := false
    for {
      select {
      case item, _ := <-r.queue:
        doWork(item)
      case <-r.stopping:
        stop = true
      }
      if stop && len(r.queue) == 0 {
        break
      }
    }
  }()

Author by

ANisus

Starting with C64 basic in my childhood, going through Atari's STOS, Amiga's AMOS, Motorola 680x0 assembly, C and C++, I now work as a System Developer using a wide range of languages and technologies. However, my passion lies in creating excellent software using Go.

Updated on July 09, 2022

Comments

  • ANisus
    ANisus almost 2 years

    I wish to have a goroutine listening on two channels, blocked when both channels are drained. However, if both channels contain data, I want one to be drained before the other is handled.

    In the working example below I wish all out to be drained before exit is handled. I use a select-statement which doesn't have any priority order. How might I get around the problem, making all 10 out-values be handled before the exit?

    package main
    
    import "fmt"
    
    func sender(out chan int, exit chan bool){
        for i := 1; i <= 10; i++ {
            out <- i
        } 
        exit <- true
    }
    
    func main(){
        out := make(chan int, 10)
        exit := make(chan bool)
    
        go sender(out, exit)
    
        L:
        for {
            select {
                case i := <-out:
                    fmt.Printf("Value: %d\n", i)
                case <-exit:
                    fmt.Println("Exiting")
                    break L
            }
        }
        fmt.Println("Did we get all 10? Most likely not")
    }
    
    • zach
      zach almost 6 years
      For the example you gave, you just need the out channel and close it after sending is complete.
  • ANisus
    ANisus almost 12 years
    Thanks for the suggestion! If I understand you correctly, you suggest using only one channel, calling an exit by closing the channel, thus breaking the for range-statement. True, maybe that is a better way to do it, but in my case I am working with two channels.
  • Sonia
    Sonia almost 12 years
    This works and is nice and compact, but uses some tricks you should try to avoid in general. Flags get confusing as programs get bigger. They are kind of like gotos. More seriously, len(chan) can often introduce races. It looks okay in this situation, but in many cases it's invalid to make a decision based on len(chan) because it can change before you take action. Imagine the case where you get len==0, then a value arrives, then an exit arrives, and select picks the exit. You might shrug and say they arrived at about the same time, but in some time critical programs, it could matter.
  • Sonia
    Sonia almost 12 years
    Umm, maybe it still works in the case I described. Sorry if it's a bad example. But anyway, I try to avoid using len in synchronization code.
  • ANisus
    ANisus almost 12 years
    The idea is very similar to my own. But true, with the continue-statement, you get rid of the need of a flag. Smart. Well, this is probably as good an answer as I can assume to get. Thanks!
  • ANisus
    ANisus almost 12 years
    Hi again Sonia :) . Good input. Yes, in my case it doesn't matter much. I just wanted to flush what was going out before exiting. However, I actually redid the code using for range and close(out) instead (as suggested by jmnl). Then only the out-events placed in the channel pipe preceding the close would be "flushed". I will avoid decision making based on len(chan) if Nasdaq ever asks me to do some Go program for them ;)
  • jorelli
    jorelli almost 12 years
    this will loop infinitely in the first select statement if the out channel is closed.
  • Sonia
    Sonia almost 12 years
    jorelli, quite true. If you wanted to allow for hostile or buggy goroutines closing the channel unexpectedly, you would check the ok status on the receive.
  • bug
    bug over 11 years
    This is actually not an entirely correct solution, since it is possible for both queues to receive data in a single context switch. The behavior of select when multiple queues are ready is indeterminate (pseudo-random).
  • BrandonAGr
    BrandonAGr over 11 years
    Thanks this is exactly the solution I was looking for, and it doesn't have the potential race condition bug that is in Sonia's answer
  • chmike
    chmike over 7 years
    This doesn't seem correct. When blocking on the second select, if data arrives on both the out and exit channels, there is no guarantee that the data in out will be processed before exit. I actually believe that there is no solution with channels.
  • zach
    zach almost 6 years
    just range over the vals channel in the main routine will work
  • Amin
    Amin over 3 years
    This doesn't work! I tested it and it is totally wrong!
  • agronskiy
    agronskiy over 3 years
    Worth noting that while entirely correct under the question’s premises, this won’t work for the “N-producers-1-consumer” case, because closing the out channel without synchronization between producers can trigger a panic. It's a chicken-and-egg problem, because such synchronization requires a priority select between quit and out :)
  • Mohsenasm
    Mohsenasm over 2 years
    It only increases the success probability :)
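
Several of the comments above hinge on how select behaves when more than one case is ready: per the Go specification, one of the ready cases is chosen by uniform pseudo-random selection, regardless of source order. A small sketch to observe this follows; the name countChoices is invented here, and the exact counts will differ from run to run.

```go
package main

import "fmt"

// countChoices makes both channels ready before every select and counts
// how often each case is picked.
func countChoices(trials int) (a, b int) {
	chA := make(chan int, 1)
	chB := make(chan int, 1)
	for t := 0; t < trials; t++ {
		chA <- 1
		chB <- 1
		select {
		case <-chA:
			a++
			<-chB // empty the other channel for the next trial
		case <-chB:
			b++
			<-chA
		}
	}
	return a, b
}

func main() {
	a, b := countChoices(1000)
	fmt.Println("first case chosen:", a, "second case chosen:", b)
}
```

Both counts should come out close to half of the trials, which is why a single select cannot express priority on its own.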