publish mutexes and stateful-goroutines
This commit is contained in:
parent
e7cb1d1ac8
commit
459d16196a
@ -39,8 +39,8 @@ Tickers
|
|||||||
Worker Pools
|
Worker Pools
|
||||||
Rate Limiting
|
Rate Limiting
|
||||||
Atomic Counters
|
Atomic Counters
|
||||||
# Mutexs
|
Mutexes
|
||||||
# Stateful Goroutines
|
Stateful Goroutines
|
||||||
Sorting
|
Sorting
|
||||||
Sorting by Functions
|
Sorting by Functions
|
||||||
# Collection Functions
|
# Collection Functions
|
||||||
|
@ -2,8 +2,8 @@
|
|||||||
// communication over channels. We saw this for example
|
// communication over channels. We saw this for example
|
||||||
// with [worker pools](worker-pool). There are a few other
|
// with [worker pools](worker-pool). There are a few other
|
||||||
// options for managing state though. Here we'll
|
// options for managing state though. Here we'll
|
||||||
// look at using the `sync/atomic` package for simple
|
// look at using the `sync/atomic` package for _atomic
|
||||||
// counters accessed by multiple goroutines.
|
// counters_ accessed by multiple goroutines.
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
@ -46,3 +46,6 @@ func main() {
|
|||||||
opsFinal := atomic.LoadUint64(&ops)
|
opsFinal := atomic.LoadUint64(&ops)
|
||||||
fmt.Println("ops:", opsFinal)
|
fmt.Println("ops:", opsFinal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Next we'll look at another approach to managing state:
|
||||||
|
// mutexes.
|
||||||
|
@ -2,3 +2,6 @@
|
|||||||
# 40,000 operations.
|
# 40,000 operations.
|
||||||
$ go run atomic-counters.go
|
$ go run atomic-counters.go
|
||||||
ops: 40200
|
ops: 40200
|
||||||
|
|
||||||
|
# Next we'll look at mutexes, another tool for managing
|
||||||
|
# state.
|
||||||
|
88
examples/mutexes/mutexes.go
Normal file
88
examples/mutexes/mutexes.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
// In the previous example we saw how to manage simple
|
||||||
|
// counter state using atomic operations. For more complex
|
||||||
|
// state we can use a _[mutex](http://en.wikipedia.org/wiki/Mutual_exclusion)_
|
||||||
|
// to safely access data across multiple goroutines.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
import "time"
|
||||||
|
import "math/rand"
|
||||||
|
import "sync"
|
||||||
|
import "sync/atomic"
|
||||||
|
import "runtime"
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
// For our example the `state` will be a map.
|
||||||
|
var state = make(map[int]int)
|
||||||
|
|
||||||
|
// This `mutex` will synchronize access to `state`.
|
||||||
|
var mutex = &sync.Mutex{}
|
||||||
|
|
||||||
|
// To compare the mutex-based approach with another
|
||||||
|
// we'll see later, `ops` will count how many
|
||||||
|
// operations we perform against the state.
|
||||||
|
var ops int64 = 0
|
||||||
|
|
||||||
|
// Here we start 100 goroutines to execute repeated
|
||||||
|
// reads against the state.
|
||||||
|
for r := 0; r < 100; r++ {
|
||||||
|
go func() {
|
||||||
|
total := 0
|
||||||
|
for {
|
||||||
|
|
||||||
|
// For each read we pick a key to access,
|
||||||
|
// `Lock()` the `mutex` to ensure
|
||||||
|
// exclusive access to the `state`, read
|
||||||
|
// the value at the chosen key,
|
||||||
|
// `Unlock()` the mutex, and increment
|
||||||
|
// the `ops` count.
|
||||||
|
key := rand.Intn(5)
|
||||||
|
mutex.Lock()
|
||||||
|
total += state[key]
|
||||||
|
mutex.Unlock()
|
||||||
|
atomic.AddInt64(&ops, 1)
|
||||||
|
|
||||||
|
// In order to ensure that this goroutine
|
||||||
|
// doesn't starve the scheduler, we explicitly
|
||||||
|
// yield after each operation with
|
||||||
|
// `runtime.Gosched()`. This yielding is
|
||||||
|
// handled automatically with e.g. every
|
||||||
|
// channel operation and for blocking
|
||||||
|
// calls like `time.Sleep`, but in this
|
||||||
|
// case we need to do it manually.
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll also start 10 goroutines to simulate writes,
|
||||||
|
// using the same pattern we did for reads.
|
||||||
|
for w := 0; w < 10; w++ {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
key := rand.Intn(5)
|
||||||
|
val := rand.Intn(100)
|
||||||
|
mutex.Lock()
|
||||||
|
state[key] = val
|
||||||
|
mutex.Unlock()
|
||||||
|
atomic.AddInt64(&ops, 1)
|
||||||
|
runtime.Gosched()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let the 110 goroutines work on the `state` and
|
||||||
|
// `mutex` for a second.
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// Take and report a final operations count.
|
||||||
|
opsFinal := atomic.LoadInt64(&ops)
|
||||||
|
fmt.Println("ops:", opsFinal)
|
||||||
|
|
||||||
|
// With a final lock of `state`, show how it ended up.
|
||||||
|
mutex.Lock()
|
||||||
|
fmt.Println("state:", state)
|
||||||
|
mutex.Unlock()
|
||||||
|
}
|
9
examples/mutexes/mutexes.sh
Normal file
9
examples/mutexes/mutexes.sh
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
# Running the program shows that we executed about
|
||||||
|
# 3,500,000 operations against our `mutex`-synchronized
|
||||||
|
# `state`.
|
||||||
|
$ go run mutexes.go
|
||||||
|
ops: 3598302
|
||||||
|
state: map[1:38 4:98 2:23 3:85 0:44]
|
||||||
|
|
||||||
|
# Next we'll look at implementing this same state
|
||||||
|
# management task using only goroutines and channels.
|
@ -1,91 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
import "time"
|
|
||||||
import "math/rand"
|
|
||||||
import "sync/atomic"
|
|
||||||
|
|
||||||
// 1 statefull go routine
|
|
||||||
// writers issue writes
|
|
||||||
// readers issue reads
|
|
||||||
// e.g. routing table
|
|
||||||
|
|
||||||
type readOp struct {
|
|
||||||
key int
|
|
||||||
resp chan int
|
|
||||||
}
|
|
||||||
|
|
||||||
type writeOp struct {
|
|
||||||
key int
|
|
||||||
val int
|
|
||||||
resp chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func randKey() int {
|
|
||||||
return rand.Intn(10)
|
|
||||||
}
|
|
||||||
|
|
||||||
func randVal() int {
|
|
||||||
return rand.Intn(100)
|
|
||||||
}
|
|
||||||
|
|
||||||
func manageState(rs chan *readOp, ws chan *writeOp) {
|
|
||||||
data := make(map[int]int)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case read := <-rs:
|
|
||||||
read.resp <- data[read.key]
|
|
||||||
case write := <-ws:
|
|
||||||
data[write.key] = write.val
|
|
||||||
write.resp <- true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep track of how many ops we do.
|
|
||||||
var opCount int64 = 0
|
|
||||||
|
|
||||||
// Generate random reads.
|
|
||||||
func generateReads(reads chan *readOp) {
|
|
||||||
for {
|
|
||||||
key := randKey()
|
|
||||||
read := &readOp{key: key, resp: make(chan int)}
|
|
||||||
reads <- read
|
|
||||||
<-read.resp
|
|
||||||
atomic.AddInt64(&opCount, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate random writes.
|
|
||||||
func generateWrites(writes chan *writeOp) {
|
|
||||||
for {
|
|
||||||
key := randKey()
|
|
||||||
val := randVal()
|
|
||||||
write := &writeOp{
|
|
||||||
key: key,
|
|
||||||
val: val,
|
|
||||||
resp: make(chan bool)}
|
|
||||||
writes <- write
|
|
||||||
<-write.resp
|
|
||||||
atomic.AddInt64(&opCount, 1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
reads := make(chan *readOp)
|
|
||||||
writes := make(chan *writeOp)
|
|
||||||
|
|
||||||
go manageState(reads, writes)
|
|
||||||
|
|
||||||
for r := 0; r < 100; r++ {
|
|
||||||
go generateReads(reads)
|
|
||||||
}
|
|
||||||
for w := 0; w < 10; w++ {
|
|
||||||
go generateWrites(writes)
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.StoreInt64(&opCount, 0)
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
finalOpCount := atomic.LoadInt64(&opCount)
|
|
||||||
fmt.Println(finalOpCount)
|
|
||||||
}
|
|
@ -1,58 +0,0 @@
|
|||||||
// State
|
|
||||||
|
|
||||||
package main
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
import "time"
|
|
||||||
import "math/rand"
|
|
||||||
import "sync"
|
|
||||||
import "sync/atomic"
|
|
||||||
import "runtime"
|
|
||||||
|
|
||||||
// Globally-accessible state.
|
|
||||||
var data = make(map[int]int)
|
|
||||||
|
|
||||||
var dataMutex = &sync.Mutex{}
|
|
||||||
|
|
||||||
// Keep track of how many ops we do.
|
|
||||||
var opCount int64 = 0
|
|
||||||
|
|
||||||
// Generate random reads.
|
|
||||||
func generateReads() {
|
|
||||||
total := 0
|
|
||||||
for {
|
|
||||||
key := rand.Intn(10)
|
|
||||||
dataMutex.Lock()
|
|
||||||
total += data[key]
|
|
||||||
dataMutex.Unlock()
|
|
||||||
atomic.AddInt64(&opCount, 1)
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate random writes.
|
|
||||||
func generateWrites() {
|
|
||||||
for {
|
|
||||||
key := rand.Intn(10)
|
|
||||||
val := rand.Intn(100)
|
|
||||||
dataMutex.Lock()
|
|
||||||
data[key] = val
|
|
||||||
dataMutex.Unlock()
|
|
||||||
atomic.AddInt64(&opCount, 1)
|
|
||||||
runtime.Gosched()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
for r := 0; r < 100; r++ {
|
|
||||||
go generateReads()
|
|
||||||
}
|
|
||||||
for w := 0; w < 10; w++ {
|
|
||||||
go generateWrites()
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.StoreInt64(&opCount, 0)
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
finalOpCount := atomic.LoadInt64(&opCount)
|
|
||||||
fmt.Println(finalOpCount)
|
|
||||||
}
|
|
110
examples/stateful-goroutines/stateful-goroutines.go
Normal file
110
examples/stateful-goroutines/stateful-goroutines.go
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
// In the previous example we used explicit locking with
|
||||||
|
// mutexes to synchronize access to shared state across
|
||||||
|
// multiple goroutines. Another option is to use the
|
||||||
|
// built-in synchronization features of goroutines and
|
||||||
|
// channels to achieve the same result. This channel-based
|
||||||
|
// approach aligns with Go's ideas of sharing memory via
|
||||||
|
// communications and having each piece of data owned
|
||||||
|
// by exactly 1 goroutine.
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
import "time"
|
||||||
|
import "math/rand"
|
||||||
|
import "sync/atomic"
|
||||||
|
|
||||||
|
// In this example our state will be owned by a single
|
||||||
|
// goroutine. This will guarantee that the data is never
|
||||||
|
// corrupted with concurrent access. In order to read or
|
||||||
|
// write that state, other goroutines will send messages
|
||||||
|
// to the owning goroutine and receive corresponding
|
||||||
|
// replies. These `readOp` and `writeOp` `struct`s
|
||||||
|
// encapsulate those requests and provide a way for the
|
||||||
|
// owning goroutine to respond.
|
||||||
|
type readOp struct {
|
||||||
|
key int
|
||||||
|
resp chan int
|
||||||
|
}
|
||||||
|
type writeOp struct {
|
||||||
|
key int
|
||||||
|
val int
|
||||||
|
resp chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
|
||||||
|
// The `state` will be a map as in the previous
|
||||||
|
// example.
|
||||||
|
var state = make(map[int]int)
|
||||||
|
|
||||||
|
// Also as before we'll count how many operations we
|
||||||
|
// perform.
|
||||||
|
var ops int64 = 0
|
||||||
|
|
||||||
|
// The `reads` and `writes` channels will be used by
|
||||||
|
// other goroutines to issue read and write requests,
|
||||||
|
// respectively.
|
||||||
|
reads := make(chan *readOp)
|
||||||
|
writes := make(chan *writeOp)
|
||||||
|
|
||||||
|
// Here is the goroutine that owns the `state`. This
|
||||||
|
// goroutine repeatedly selects on the `reads` and
|
||||||
|
// `writes` channels, responding to requests as they
|
||||||
|
// arrive. A response is executed by first performing
|
||||||
|
// the requested operation, and then sending a value
|
||||||
|
// on the response channel `resp` to indicate success
|
||||||
|
// (and the desired value in the case of `reads`).
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case read := <-reads:
|
||||||
|
read.resp <- state[read.key]
|
||||||
|
case write := <-writes:
|
||||||
|
state[write.key] = write.val
|
||||||
|
write.resp <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// This starts 100 goroutines to issue reads to the
|
||||||
|
// state-owning goroutine via the `reads` channel.
|
||||||
|
// Each read requires constructing a `readOp`, sending
|
||||||
|
// it over the `reads` channel, and the receiving the
|
||||||
|
// result over the provided `resp` channel.
|
||||||
|
for r := 0; r < 100; r++ {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
read := &readOp{
|
||||||
|
key: rand.Intn(5),
|
||||||
|
resp: make(chan int)}
|
||||||
|
reads <- read
|
||||||
|
<-read.resp
|
||||||
|
atomic.AddInt64(&ops, 1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// We start 10 writes as well, using a similar
|
||||||
|
// approach.
|
||||||
|
for w := 0; w < 10; w++ {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
write := &writeOp{
|
||||||
|
key: rand.Intn(5),
|
||||||
|
val: rand.Intn(100),
|
||||||
|
resp: make(chan bool)}
|
||||||
|
writes <- write
|
||||||
|
<-write.resp
|
||||||
|
atomic.AddInt64(&ops, 1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let the goroutines work for a second.
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
// Finally, capture and report the `ops` count.
|
||||||
|
opsFinal := atomic.LoadInt64(&ops)
|
||||||
|
fmt.Println("ops:", opsFinal)
|
||||||
|
}
|
15
examples/stateful-goroutines/stateful-goroutines.sh
Normal file
15
examples/stateful-goroutines/stateful-goroutines.sh
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# Running our program shows that the goroutine-based
|
||||||
|
# state management example achieves about 800,000
|
||||||
|
# operations per second.
|
||||||
|
$ go run stateful-goroutines.go
|
||||||
|
ops: 807434
|
||||||
|
|
||||||
|
# For this particular case the goroutine-based approach
|
||||||
|
# was a bit more involved than the mutex-based one. It
|
||||||
|
# might be useful in certain cases though, for example
|
||||||
|
# where you have other channels involved or when managing
|
||||||
|
# multiple such mutexes would be error-prone. The right
|
||||||
|
# approach for your program will depend on its particular
|
||||||
|
# details. You should use whatever approach feels most
|
||||||
|
# natural, especially with respect to understanding the
|
||||||
|
# correctness of your program.
|
Loading…
x
Reference in New Issue
Block a user