From bf0de16c9a6d2240316ddfd09e3b4101cc68d6b7 Mon Sep 17 00:00:00 2001 From: Mark McGranaghan Date: Thu, 18 Oct 2012 19:57:28 -0400 Subject: [PATCH] publish worker-pools --- examples.txt | 3 +- examples/scatter-gather/scatter-gather.go | 24 --------- examples/worker-pools/worker-pools.go | 61 +++++++++++++++-------- examples/worker-pools/worker-pools.sh | 16 ++++++ 4 files changed, 56 insertions(+), 48 deletions(-) delete mode 100644 examples/scatter-gather/scatter-gather.go create mode 100644 examples/worker-pools/worker-pools.sh diff --git a/examples.txt b/examples.txt index 42be2ef..1163fe2 100644 --- a/examples.txt +++ b/examples.txt @@ -34,11 +34,10 @@ Timeouts Non-Blocking Channel Operations Closing Channels Range over Channels -# Scatter Gather # Rate Limiting -# Worker Pools Timers Tickers +Worker Pools # State Goroutine # State Mutex Sorting diff --git a/examples/scatter-gather/scatter-gather.go b/examples/scatter-gather/scatter-gather.go deleted file mode 100644 index a486e43..0000000 --- a/examples/scatter-gather/scatter-gather.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import "sync" -import "time" -import "math/rand" -import "fmt" - -func main() { - times := new([20]int) - wait := new(sync.WaitGroup) - for i := 0; i < 20; i++ { - n := i - wait.Add(1) - go func() { - opTime := time.Duration(rand.Intn(2000)) - time.Sleep(opTime * time.Millisecond) - fmt.Println(n) - times[n] = opTime - wait.Done() - }() - } - wait.Wait() - fmt.Println(*times) -} diff --git a/examples/worker-pools/worker-pools.go b/examples/worker-pools/worker-pools.go index 396224e..71efafb 100644 --- a/examples/worker-pools/worker-pools.go +++ b/examples/worker-pools/worker-pools.go @@ -1,29 +1,46 @@ +// In this example we'll look at how to implement +// a _worker pool_ using goroutines and channels. + package main import "time" -func main() { - jobs := make(chan int, 100) - acks := make(chan bool, 100) - - for w := 0; w < 10; w++ { - go func() { - for j := range jobs { - println("worker", w, "processing job", j) - time.Sleep(time.Millisecond * 150) - acks <- true - } - }() +// Here's the worker, of which we'll run several +// concurrent instances. These workers will receive +// work on the `jobs` channel and send the corresponding +// results on `results`. We'll sleep a second per job to +// simulate an expensive task. +func worker(id int, jobs <-chan int, results chan<- int) { + for j := range jobs { + println("worker", id, "processing job", j) + time.Sleep(time.Second) + results <- j * 2 } - - for j := 0; j < 100; j++ { - jobs <- j - } - - for a := 0; a < 100; a++ { - <-acks - } - println("all done") } -// todo: broken +func main() { + + // In order to use our pool of workers we need to send + // them work and collect their results. We make 2 + // channels for this. + jobs := make(chan int, 100) + results := make(chan int, 100) + + // This starts up 3 workers, initially blocked + // because there are no jobs yet. + for w := 1; w <= 3; w++ { + go worker(w, jobs, results) + } + + // Here we send 9 `jobs` and then `close` that + // channel to indicate that's all the work we have. + for j := 1; j <= 9; j++ { + jobs <- j + } + close(jobs) + + // Finally we collect all the results of the work. + for a := 1; a <= 9; a++ { + <-results + } +} diff --git a/examples/worker-pools/worker-pools.sh b/examples/worker-pools/worker-pools.sh new file mode 100644 index 0000000..90937e1 --- /dev/null +++ b/examples/worker-pools/worker-pools.sh @@ -0,0 +1,16 @@ +# Our running program shows the 9 jobs being executed by +# various workers. The program only takes about 3 seconds +# despite doing about 9 seconds of total work because +# there are 3 workers operating concurrently. +$ time go run worker-pools.go +worker 1 processing job 1 +worker 2 processing job 2 +worker 3 processing job 3 +worker 1 processing job 4 +worker 2 processing job 5 +worker 3 processing job 6 +worker 1 processing job 7 +worker 2 processing job 8 +worker 3 processing job 9 + +real 0m3.149s