From 7c1cb47bd0fa393cfde87befe351a1b74b9c03be Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Sun, 28 Jul 2019 18:23:26 +0300 Subject: [PATCH] [NOD-249] Change WaitGroup to use channels (#350) * [NOD-248] Implement waitgroup to enable waiting while adding * [NOD-248] fix waitGroup.done() error message * [NOD-248] atomically read wg.counter * [NOD-248] return lowPriorityMutex * [NOD-249] Add tests to waitgroup * [NOD-249] Change waitgroup to use channels * [NOD-249] Format project * [NOD-249] Add comments and logs to waitGroup, and remove timeouts from prioritymutex_test.go * [NOD-249] Fix comments --- logger/logger.go | 12 +- util/locks/log.go | 22 ++++ util/locks/prioritymutex_test.go | 17 +-- util/locks/waitgroup.go | 18 ++- util/locks/waitgroup_test.go | 217 +++++++++++++++++++++++++++++++ 5 files changed, 260 insertions(+), 26 deletions(-) create mode 100644 util/locks/log.go create mode 100644 util/locks/waitgroup_test.go diff --git a/logger/logger.go b/logger/logger.go index a93b5411a..53cffa74d 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -52,6 +52,7 @@ var ( bcdbLog = backendLog.Logger("BCDB") btcdLog = backendLog.Logger("BTCD") chanLog = backendLog.Logger("CHAN") + cnfgLog = backendLog.Logger("CNFG") discLog = backendLog.Logger("DISC") indxLog = backendLog.Logger("INDX") minrLog = backendLog.Logger("MINR") @@ -61,7 +62,7 @@ var ( srvrLog = backendLog.Logger("SRVR") syncLog = backendLog.Logger("SYNC") txmpLog = backendLog.Logger("TXMP") - cnfgLog = backendLog.Logger("CNFG") + utilLog = backendLog.Logger("UTIL") initiated = false ) @@ -74,6 +75,7 @@ var SubsystemTags = struct { BCDB, BTCD, CHAN, + CNFG, DISC, INDX, MINR, @@ -83,7 +85,7 @@ var SubsystemTags = struct { SRVR, SYNC, TXMP, - CNFG string + UTIL string }{ ADXR: "ADXR", AMGR: "AMGR", @@ -91,6 +93,7 @@ var SubsystemTags = struct { BCDB: "BCDB", BTCD: "BTCD", CHAN: "CHAN", + CNFG: "CNFG", DISC: "DISC", INDX: "INDX", MINR: "MINR", @@ -100,7 +103,7 @@ var SubsystemTags = struct { SRVR: "SRVR", SYNC: "SYNC", TXMP: "TXMP", - CNFG: "CNFG", + UTIL: "UTIL", } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -111,6 +114,7 @@ var subsystemLoggers = map[string]btclog.Logger{ SubsystemTags.BCDB: bcdbLog, SubsystemTags.BTCD: btcdLog, SubsystemTags.CHAN: chanLog, + SubsystemTags.CNFG: cnfgLog, SubsystemTags.DISC: discLog, SubsystemTags.INDX: indxLog, SubsystemTags.MINR: minrLog, @@ -120,7 +124,7 @@ var subsystemLoggers = map[string]btclog.Logger{ SubsystemTags.SRVR: srvrLog, SubsystemTags.SYNC: syncLog, SubsystemTags.TXMP: txmpLog, - SubsystemTags.CNFG: cnfgLog, + SubsystemTags.UTIL: utilLog, } // InitLogRotator initializes the logging rotater to write logs to logFile and diff --git a/util/locks/log.go b/util/locks/log.go new file mode 100644 index 000000000..24df7ab19 --- /dev/null +++ b/util/locks/log.go @@ -0,0 +1,22 @@ +// Copyright (c) 2013-2014 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package locks + +import ( + "github.com/btcsuite/btclog" + "github.com/daglabs/btcd/logger" + "github.com/daglabs/btcd/util/panics" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger +var spawn func(func()) + +func init() { + log, _ = logger.Get(logger.SubsystemTags.UTIL) + spawn = panics.GoroutineWrapperFunc(log) +} diff --git a/util/locks/prioritymutex_test.go b/util/locks/prioritymutex_test.go index f2fbceab9..2f161a14f 100644 --- a/util/locks/prioritymutex_test.go +++ b/util/locks/prioritymutex_test.go @@ -41,7 +41,7 @@ func TestPriorityMutex(t *testing.T) { }() time.Sleep(time.Second) mtx.HighPriorityWriteUnlock() - waitForWaitGroup(t, &wg, 2*time.Second) + wg.Wait() expectedSlice := []int{1, 2} if !reflect.DeepEqual(sharedSlice, expectedSlice) { t.Errorf("Expected the shared slice to be %d but got %d", expectedSlice, sharedSlice) @@ -76,18 +76,5 @@ func TestHighPriorityReadLock(t *testing.T) { }() time.Sleep(time.Second) mtx.LowPriorityWriteUnlock() - waitForWaitGroup(t, &wg, time.Second) -} - -func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) { - doneWaiting := make(chan struct{}) - go func() { - wg.Wait() - doneWaiting <- struct{}{} - }() - select { - case <-time.Tick(timeout): - t.Fatalf("Unexpected timeout") - case <-doneWaiting: - } + wg.Wait() } diff --git a/util/locks/waitgroup.go b/util/locks/waitgroup.go index 3356ac666..32406a06e 100644 --- a/util/locks/waitgroup.go +++ b/util/locks/waitgroup.go @@ -6,13 +6,15 @@ import ( ) type waitGroup struct { - counter int64 - waitCond *sync.Cond + counter int64 + waitLock sync.Mutex + syncChannel chan struct{} } func newWaitGroup() *waitGroup { return &waitGroup{ - waitCond: sync.NewCond(&sync.Mutex{}), + waitLock: sync.Mutex{}, + syncChannel: make(chan struct{}), } } @@ -26,14 +28,16 @@ func (wg *waitGroup) done() { panic("negative values for wg.counter are not allowed. This was likely caused by calling done() before add()") } if atomic.LoadInt64(&wg.counter) == 0 { - wg.waitCond.Signal() + spawn(func() { + wg.syncChannel <- struct{}{} + }) } } func (wg *waitGroup) wait() { - wg.waitCond.L.Lock() - defer wg.waitCond.L.Unlock() + wg.waitLock.Lock() + defer wg.waitLock.Unlock() for atomic.LoadInt64(&wg.counter) != 0 { - wg.waitCond.Wait() + <-wg.syncChannel } } diff --git a/util/locks/waitgroup_test.go b/util/locks/waitgroup_test.go new file mode 100644 index 000000000..84cc9eaa6 --- /dev/null +++ b/util/locks/waitgroup_test.go @@ -0,0 +1,217 @@ +// Copyright 2011 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package locks + +import ( + "sync/atomic" + "testing" +) + +// All of the tests, except TestAddAfterWait, are copied +// from the native sync/waitgroup_test.go (with some +// minor changes), to check that the new waitGroup +// behaves the same, except enabling the use of add() +// concurrently with wait() + +// syncWaitGroupCompatible is a type that was created in order to +// make the use of waitGroup similar to the native one, so it'll +// be more convenient to use the same tests from sync/waitgroup_test.go +type syncWaitGroupCompatible struct { + *waitGroup +} + +func (swg *syncWaitGroupCompatible) add(delta int) { + for i := 0; i < delta; i++ { + swg.waitGroup.add() + } +} + +func newSyncWgCompatible() *syncWaitGroupCompatible { + return &syncWaitGroupCompatible{ + waitGroup: newWaitGroup(), + } +} + +func testWaitGroup(t *testing.T, wg1 *syncWaitGroupCompatible, wg2 *syncWaitGroupCompatible) { + n := 16 + wg1.add(n) + wg2.add(n) + exited := make(chan struct{}, n) + for i := 0; i != n; i++ { + go func(i int) { + wg1.done() + wg2.wait() + exited <- struct{}{} + }(i) + } + wg1.wait() + for i := 0; i != n; i++ { + select { + case <-exited: + t.Fatal("waitGroup released group too soon") + default: + } + wg2.done() + } + for i := 0; i != n; i++ { + <-exited // Will block if barrier fails to unlock someone. + } +} + +func TestWaitGroup(t *testing.T) { + wg1 := newSyncWgCompatible() + wg2 := newSyncWgCompatible() + + // Run the same test a few times to ensure barrier is in a proper state. + for i := 0; i != 8; i++ { + testWaitGroup(t, wg1, wg2) + } +} + +func TestWaitGroupMisuse(t *testing.T) { + defer func() { + err := recover() + if err != "negative values for wg.counter are not allowed. This was likely caused by calling done() before add()" { + t.Fatalf("Unexpected panic: %#v", err) + } + }() + wg := newSyncWgCompatible() + wg.add(1) + wg.done() + wg.done() + t.Fatal("Should panic, because wg.counter should be negative (-1), which is not allowed") +} + +func TestAddAfterWait(t *testing.T) { + wg := newSyncWgCompatible() + wg.add(1) + syncChan := make(chan struct{}) + go func() { + syncChan <- struct{}{} + wg.wait() + syncChan <- struct{}{} + }() + <-syncChan + wg.add(1) + wg.done() + wg.done() + <-syncChan +} + +func TestWaitGroupRace(t *testing.T) { + // Run this test for about 1ms. + for i := 0; i < 1000; i++ { + wg := newSyncWgCompatible() + n := new(int32) + // spawn goroutine 1 + wg.add(1) + go func() { + atomic.AddInt32(n, 1) + wg.done() + }() + // spawn goroutine 2 + wg.add(1) + go func() { + atomic.AddInt32(n, 1) + wg.done() + }() + // Wait for goroutine 1 and 2 + wg.wait() + if atomic.LoadInt32(n) != 2 { + t.Fatal("Spurious wakeup from Wait") + } + } +} + +func TestWaitGroupAlign(t *testing.T) { + type X struct { + x byte + wg *syncWaitGroupCompatible + } + x := X{wg: newSyncWgCompatible()} + x.wg.add(1) + go func(x *X) { + x.wg.done() + }(&x) + x.wg.wait() +} + +func BenchmarkWaitGroupUncontended(b *testing.B) { + type PaddedWaitGroup struct { + *syncWaitGroupCompatible + pad [128]uint8 + } + b.RunParallel(func(pb *testing.PB) { + wg := PaddedWaitGroup{ + syncWaitGroupCompatible: newSyncWgCompatible(), + } + for pb.Next() { + wg.add(1) + wg.done() + wg.wait() + } + }) +} + +func benchmarkWaitGroupAdddone(b *testing.B, localWork int) { + wg := newSyncWgCompatible() + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + wg.add(1) + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + wg.done() + } + _ = foo + }) +} + +func BenchmarkWaitGroupAdddone(b *testing.B) { + benchmarkWaitGroupAdddone(b, 0) +} + +func BenchmarkWaitGroupAddDoneWork(b *testing.B) { + benchmarkWaitGroupAdddone(b, 100) +} + +func benchmarkWaitGroupwait(b *testing.B, localWork int) { + wg := newSyncWgCompatible() + b.RunParallel(func(pb *testing.PB) { + foo := 0 + for pb.Next() { + wg.wait() + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + _ = foo + }) +} + +func BenchmarkWaitGroupwait(b *testing.B) { + benchmarkWaitGroupwait(b, 0) +} + +func BenchmarkWaitGroupWaitWork(b *testing.B) { + benchmarkWaitGroupwait(b, 100) +} + +func BenchmarkWaitGroupActuallywait(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + wg := newSyncWgCompatible() + wg.add(1) + go func() { + wg.done() + }() + wg.wait() + } + }) +}