From 459d16196a996c08291b1324a498c7a63fdda40a Mon Sep 17 00:00:00 2001 From: Mark McGranaghan Date: Sun, 21 Oct 2012 19:14:04 -0400 Subject: [PATCH] publish mutexes and stateful-goroutines --- examples.txt | 4 +- examples/atomic-counters/atomic-counters.go | 7 +- examples/atomic-counters/atomic-counters.sh | 3 + examples/mutexes/mutexes.go | 88 ++++++++++++++ examples/mutexes/mutexes.sh | 9 ++ examples/state-goroutine/state-goroutine.go | 91 --------------- examples/state-mutex/state-mutex.go | 58 --------- .../stateful-goroutines.go | 110 ++++++++++++++++++ .../stateful-goroutines.sh | 15 +++ 9 files changed, 232 insertions(+), 153 deletions(-) create mode 100644 examples/mutexes/mutexes.go create mode 100644 examples/mutexes/mutexes.sh delete mode 100644 examples/state-goroutine/state-goroutine.go delete mode 100644 examples/state-mutex/state-mutex.go create mode 100644 examples/stateful-goroutines/stateful-goroutines.go create mode 100644 examples/stateful-goroutines/stateful-goroutines.sh diff --git a/examples.txt b/examples.txt index 37d48c2..f5eba8f 100644 --- a/examples.txt +++ b/examples.txt @@ -39,8 +39,8 @@ Tickers Worker Pools Rate Limiting Atomic Counters -# Mutexs -# Stateful Goroutines +Mutexes +Stateful Goroutines Sorting Sorting by Functions # Collection Functions diff --git a/examples/atomic-counters/atomic-counters.go b/examples/atomic-counters/atomic-counters.go index ca0b63a..25fa17e 100644 --- a/examples/atomic-counters/atomic-counters.go +++ b/examples/atomic-counters/atomic-counters.go @@ -2,8 +2,8 @@ // communication over channels. We saw this for example // with [worker pools](worker-pool). There are a few other // options for managing state though. Here we'll -// look at using the `sync/atomic` package for simple -// counters accessed by multiple goroutines. +// look at using the `sync/atomic` package for _atomic +// counters_ accessed by multiple goroutines. package main @@ -46,3 +46,6 @@ func main() { opsFinal := atomic.LoadUint64(&ops) fmt.Println("ops:", opsFinal) } + +// Next we'll look at another approach to managing state: +// mutexes. diff --git a/examples/atomic-counters/atomic-counters.sh b/examples/atomic-counters/atomic-counters.sh index 55f937a..92a26a5 100644 --- a/examples/atomic-counters/atomic-counters.sh +++ b/examples/atomic-counters/atomic-counters.sh @@ -2,3 +2,6 @@ # 40,000 operations. $ go run atomic-counters.go ops: 40200 + +# Next we'll look at mutexes, another tool for managing +# state. diff --git a/examples/mutexes/mutexes.go b/examples/mutexes/mutexes.go new file mode 100644 index 0000000..6c87ed1 --- /dev/null +++ b/examples/mutexes/mutexes.go @@ -0,0 +1,88 @@ +// In the previous example we saw how to manage simple +// counter state using atomic operations. For more complex +// state we can use a _[mutex](http://en.wikipedia.org/wiki/Mutual_exclusion)_ +// to safely access data across multiple goroutines. + +package main + +import "fmt" +import "time" +import "math/rand" +import "sync" +import "sync/atomic" +import "runtime" + +func main() { + + // For our example the `state` will be a map. + var state = make(map[int]int) + + // This `mutex` will synchronize access to `state`. + var mutex = &sync.Mutex{} + + // To compare the mutex-based approach with another + // we'll see later, `ops` will count how many + // operations we perform against the state. + var ops int64 = 0 + + // Here we start 100 goroutines to execute repeated + // reads against the state. + for r := 0; r < 100; r++ { + go func() { + total := 0 + for { + + // For each read we pick a key to access, + // `Lock()` the `mutex` to ensure + // exclusive access to the `state`, read + // the value at the chosen key, + // `Unlock()` the mutex, and increment + // the `ops` count. + key := rand.Intn(5) + mutex.Lock() + total += state[key] + mutex.Unlock() + atomic.AddInt64(&ops, 1) + + // In order to ensure that this goroutine + // doesn't starve the scheduler, we explicitly + // yield after each operation with + // `runtime.Gosched()`. This yielding is + // handled automatically with e.g. every + // channel operation and for blocking + // calls like `time.Sleep`, but in this + // case we need to do it manually. + runtime.Gosched() + } + }() + } + + // We'll also start 10 goroutines to simulate writes, + // using the same pattern we did for reads. + for w := 0; w < 10; w++ { + go func() { + for { + key := rand.Intn(5) + val := rand.Intn(100) + mutex.Lock() + state[key] = val + mutex.Unlock() + atomic.AddInt64(&ops, 1) + runtime.Gosched() + } + }() + } + + // Let the 110 goroutines work on the `state` and + // `mutex` for a second. + time.Sleep(time.Second) + + // Take and report a final operations count. + opsFinal := atomic.LoadInt64(&ops) + fmt.Println("ops:", opsFinal) + + // With a final lock of `state`, show how it ended up. + mutex.Lock() + fmt.Println("state:", state) + mutex.Unlock() +} diff --git a/examples/mutexes/mutexes.sh b/examples/mutexes/mutexes.sh new file mode 100644 index 0000000..8895533 --- /dev/null +++ b/examples/mutexes/mutexes.sh @@ -0,0 +1,9 @@ +# Running the program shows that we executed about +# 3,500,000 operations against our `mutex`-synchronized +# `state`. +$ go run mutexes.go +ops: 3598302 +state: map[1:38 4:98 2:23 3:85 0:44] + +# Next we'll look at implementing this same state +# management task using only goroutines and channels. diff --git a/examples/state-goroutine/state-goroutine.go b/examples/state-goroutine/state-goroutine.go deleted file mode 100644 index 3dfc7d8..0000000 --- a/examples/state-goroutine/state-goroutine.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import "fmt" -import "time" -import "math/rand" -import "sync/atomic" - -// 1 statefull go routine -// writers issue writes -// readers issue reads -// e.g. routing table - -type readOp struct { - key int - resp chan int -} - -type writeOp struct { - key int - val int - resp chan bool -} - -func randKey() int { - return rand.Intn(10) -} - -func randVal() int { - return rand.Intn(100) -} - -func manageState(rs chan *readOp, ws chan *writeOp) { - data := make(map[int]int) - for { - select { - case read := <-rs: - read.resp <- data[read.key] - case write := <-ws: - data[write.key] = write.val - write.resp <- true - } - } -} - -// Keep track of how many ops we do. -var opCount int64 = 0 - -// Generate random reads. -func generateReads(reads chan *readOp) { - for { - key := randKey() - read := &readOp{key: key, resp: make(chan int)} - reads <- read - <-read.resp - atomic.AddInt64(&opCount, 1) - } -} - -// Generate random writes. -func generateWrites(writes chan *writeOp) { - for { - key := randKey() - val := randVal() - write := &writeOp{ - key: key, - val: val, - resp: make(chan bool)} - writes <- write - <-write.resp - atomic.AddInt64(&opCount, 1) - } -} - -func main() { - reads := make(chan *readOp) - writes := make(chan *writeOp) - - go manageState(reads, writes) - - for r := 0; r < 100; r++ { - go generateReads(reads) - } - for w := 0; w < 10; w++ { - go generateWrites(writes) - } - - atomic.StoreInt64(&opCount, 0) - time.Sleep(time.Second) - finalOpCount := atomic.LoadInt64(&opCount) - fmt.Println(finalOpCount) -} diff --git a/examples/state-mutex/state-mutex.go b/examples/state-mutex/state-mutex.go deleted file mode 100644 index c336be1..0000000 --- a/examples/state-mutex/state-mutex.go +++ /dev/null @@ -1,58 +0,0 @@ -// State - -package main - -import "fmt" -import "time" -import "math/rand" -import "sync" -import "sync/atomic" -import "runtime" - -// Globally-accessible state. -var data = make(map[int]int) - -var dataMutex = &sync.Mutex{} - -// Keep track of how many ops we do. -var opCount int64 = 0 - -// Generate random reads. -func generateReads() { - total := 0 - for { - key := rand.Intn(10) - dataMutex.Lock() - total += data[key] - dataMutex.Unlock() - atomic.AddInt64(&opCount, 1) - runtime.Gosched() - } -} - -// Generate random writes. -func generateWrites() { - for { - key := rand.Intn(10) - val := rand.Intn(100) - dataMutex.Lock() - data[key] = val - dataMutex.Unlock() - atomic.AddInt64(&opCount, 1) - runtime.Gosched() - } -} - -func main() { - for r := 0; r < 100; r++ { - go generateReads() - } - for w := 0; w < 10; w++ { - go generateWrites() - } - - atomic.StoreInt64(&opCount, 0) - time.Sleep(time.Second) - finalOpCount := atomic.LoadInt64(&opCount) - fmt.Println(finalOpCount) -} diff --git a/examples/stateful-goroutines/stateful-goroutines.go b/examples/stateful-goroutines/stateful-goroutines.go new file mode 100644 index 0000000..fe1b29b --- /dev/null +++ b/examples/stateful-goroutines/stateful-goroutines.go @@ -0,0 +1,110 @@ +// In the previous example we used explicit locking with +// mutexes to synchronize access to shared state across +// multiple goroutines. Another option is to use the +// built-in synchronization features of goroutines and +// channels to achieve the same result. This channel-based +// approach aligns with Go's ideas of sharing memory via +// communications and having each piece of data owned +// by exactly 1 goroutine. + +package main + +import "fmt" +import "time" +import "math/rand" +import "sync/atomic" + +// In this example our state will be owned by a single +// goroutine. This will guarantee that the data is never +// corrupted with concurrent access. In order to read or +// write that state, other goroutines will send messages +// to the owning goroutine and receive corresponding +// replies. These `readOp` and `writeOp` `struct`s +// encapsulate those requests and provide a way for the +// owning goroutine to respond. +type readOp struct { + key int + resp chan int +} +type writeOp struct { + key int + val int + resp chan bool +} + +func main() { + + // The `state` will be a map as in the previous + // example. + var state = make(map[int]int) + + // Also as before we'll count how many operations we + // perform. + var ops int64 = 0 + + // The `reads` and `writes` channels will be used by + // other goroutines to issue read and write requests, + // respectively. + reads := make(chan *readOp) + writes := make(chan *writeOp) + + // Here is the goroutine that owns the `state`. This + // goroutine repeatedly selects on the `reads` and + // `writes` channels, responding to requests as they + // arrive. A response is executed by first performing + // the requested operation, and then sending a value + // on the response channel `resp` to indicate success + // (and the desired value in the case of `reads`). + go func() { + for { + select { + case read := <-reads: + read.resp <- state[read.key] + case write := <-writes: + state[write.key] = write.val + write.resp <- true + } + } + }() + + // This starts 100 goroutines to issue reads to the + // state-owning goroutine via the `reads` channel. + // Each read requires constructing a `readOp`, sending + // it over the `reads` channel, and the receiving the + // result over the provided `resp` channel. + for r := 0; r < 100; r++ { + go func() { + for { + read := &readOp{ + key: rand.Intn(5), + resp: make(chan int)} + reads <- read + <-read.resp + atomic.AddInt64(&ops, 1) + } + }() + } + + // We start 10 writes as well, using a similar + // approach. + for w := 0; w < 10; w++ { + go func() { + for { + write := &writeOp{ + key: rand.Intn(5), + val: rand.Intn(100), + resp: make(chan bool)} + writes <- write + <-write.resp + atomic.AddInt64(&ops, 1) + } + }() + } + + // Let the goroutines work for a second. + time.Sleep(time.Second) + + // Finally, capture and report the `ops` count. + opsFinal := atomic.LoadInt64(&ops) + fmt.Println("ops:", opsFinal) +} diff --git a/examples/stateful-goroutines/stateful-goroutines.sh b/examples/stateful-goroutines/stateful-goroutines.sh new file mode 100644 index 0000000..3fe1b9c --- /dev/null +++ b/examples/stateful-goroutines/stateful-goroutines.sh @@ -0,0 +1,15 @@ +# Running our program shows that the goroutine-based +# state management example achieves about 800,000 +# operations per second. +$ go run stateful-goroutines.go +ops: 807434 + +# For this particular case the goroutine-based approach +# was a bit more involved than the mutex-based one. It +# might be useful in certain cases though, for example +# where you have other channels involved or when managing +# multiple such mutexes would be error-prone. The right +# approach for your program will depend on its particular +# details. You should use whatever approach feels most +# natural, especially with respect to understanding the +# correctness of your program.