[NOD-249] Change WaitGroup to use channels (#350)

* [NOD-248] Implement waitgroup to enable waiting while adding

* [NOD-248] fix waitGroup.done() error message

* [NOD-248] atomically read wg.counter

* [NOD-248] return lowPriorityMutex

* [NOD-249] Add tests to waitgroup

* [NOD-249] Change waitgroup to use channels

* [NOD-249] Format project

* [NOD-249] Add comments and logs to waitGroup, and remove timeouts from
prioritymutex_test.go

* [NOD-249] Fix comments
This commit is contained in:
Ori Newman 2019-07-28 18:23:26 +03:00 committed by Svarog
parent 6acfa18d7c
commit 7c1cb47bd0
5 changed files with 260 additions and 26 deletions

View File

@ -52,6 +52,7 @@ var (
bcdbLog = backendLog.Logger("BCDB") bcdbLog = backendLog.Logger("BCDB")
btcdLog = backendLog.Logger("BTCD") btcdLog = backendLog.Logger("BTCD")
chanLog = backendLog.Logger("CHAN") chanLog = backendLog.Logger("CHAN")
cnfgLog = backendLog.Logger("CNFG")
discLog = backendLog.Logger("DISC") discLog = backendLog.Logger("DISC")
indxLog = backendLog.Logger("INDX") indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR") minrLog = backendLog.Logger("MINR")
@ -61,7 +62,7 @@ var (
srvrLog = backendLog.Logger("SRVR") srvrLog = backendLog.Logger("SRVR")
syncLog = backendLog.Logger("SYNC") syncLog = backendLog.Logger("SYNC")
txmpLog = backendLog.Logger("TXMP") txmpLog = backendLog.Logger("TXMP")
cnfgLog = backendLog.Logger("CNFG") utilLog = backendLog.Logger("UTIL")
initiated = false initiated = false
) )
@ -74,6 +75,7 @@ var SubsystemTags = struct {
BCDB, BCDB,
BTCD, BTCD,
CHAN, CHAN,
CNFG,
DISC, DISC,
INDX, INDX,
MINR, MINR,
@ -83,7 +85,7 @@ var SubsystemTags = struct {
SRVR, SRVR,
SYNC, SYNC,
TXMP, TXMP,
CNFG string UTIL string
}{ }{
ADXR: "ADXR", ADXR: "ADXR",
AMGR: "AMGR", AMGR: "AMGR",
@ -91,6 +93,7 @@ var SubsystemTags = struct {
BCDB: "BCDB", BCDB: "BCDB",
BTCD: "BTCD", BTCD: "BTCD",
CHAN: "CHAN", CHAN: "CHAN",
CNFG: "CNFG",
DISC: "DISC", DISC: "DISC",
INDX: "INDX", INDX: "INDX",
MINR: "MINR", MINR: "MINR",
@ -100,7 +103,7 @@ var SubsystemTags = struct {
SRVR: "SRVR", SRVR: "SRVR",
SYNC: "SYNC", SYNC: "SYNC",
TXMP: "TXMP", TXMP: "TXMP",
CNFG: "CNFG", UTIL: "UTIL",
} }
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.
@ -111,6 +114,7 @@ var subsystemLoggers = map[string]btclog.Logger{
SubsystemTags.BCDB: bcdbLog, SubsystemTags.BCDB: bcdbLog,
SubsystemTags.BTCD: btcdLog, SubsystemTags.BTCD: btcdLog,
SubsystemTags.CHAN: chanLog, SubsystemTags.CHAN: chanLog,
SubsystemTags.CNFG: cnfgLog,
SubsystemTags.DISC: discLog, SubsystemTags.DISC: discLog,
SubsystemTags.INDX: indxLog, SubsystemTags.INDX: indxLog,
SubsystemTags.MINR: minrLog, SubsystemTags.MINR: minrLog,
@ -120,7 +124,7 @@ var subsystemLoggers = map[string]btclog.Logger{
SubsystemTags.SRVR: srvrLog, SubsystemTags.SRVR: srvrLog,
SubsystemTags.SYNC: syncLog, SubsystemTags.SYNC: syncLog,
SubsystemTags.TXMP: txmpLog, SubsystemTags.TXMP: txmpLog,
SubsystemTags.CNFG: cnfgLog, SubsystemTags.UTIL: utilLog,
} }
// InitLogRotator initializes the logging rotater to write logs to logFile and // InitLogRotator initializes the logging rotater to write logs to logFile and

22
util/locks/log.go Normal file
View File

@ -0,0 +1,22 @@
// Copyright (c) 2013-2014 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package locks
import (
"github.com/btcsuite/btclog"
"github.com/daglabs/btcd/logger"
"github.com/daglabs/btcd/util/panics"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
var spawn func(func())
func init() {
log, _ = logger.Get(logger.SubsystemTags.UTIL)
spawn = panics.GoroutineWrapperFunc(log)
}

View File

@ -41,7 +41,7 @@ func TestPriorityMutex(t *testing.T) {
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
mtx.HighPriorityWriteUnlock() mtx.HighPriorityWriteUnlock()
waitForWaitGroup(t, &wg, 2*time.Second) wg.Wait()
expectedSlice := []int{1, 2} expectedSlice := []int{1, 2}
if !reflect.DeepEqual(sharedSlice, expectedSlice) { if !reflect.DeepEqual(sharedSlice, expectedSlice) {
t.Errorf("Expected the shared slice to be %d but got %d", expectedSlice, sharedSlice) t.Errorf("Expected the shared slice to be %d but got %d", expectedSlice, sharedSlice)
@ -76,18 +76,5 @@ func TestHighPriorityReadLock(t *testing.T) {
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
mtx.LowPriorityWriteUnlock() mtx.LowPriorityWriteUnlock()
waitForWaitGroup(t, &wg, time.Second) wg.Wait()
}
func waitForWaitGroup(t *testing.T, wg *sync.WaitGroup, timeout time.Duration) {
doneWaiting := make(chan struct{})
go func() {
wg.Wait()
doneWaiting <- struct{}{}
}()
select {
case <-time.Tick(timeout):
t.Fatalf("Unexpected timeout")
case <-doneWaiting:
}
} }

View File

@ -6,13 +6,15 @@ import (
) )
type waitGroup struct { type waitGroup struct {
counter int64 counter int64
waitCond *sync.Cond waitLock sync.Mutex
syncChannel chan struct{}
} }
func newWaitGroup() *waitGroup { func newWaitGroup() *waitGroup {
return &waitGroup{ return &waitGroup{
waitCond: sync.NewCond(&sync.Mutex{}), waitLock: sync.Mutex{},
syncChannel: make(chan struct{}),
} }
} }
@ -26,14 +28,16 @@ func (wg *waitGroup) done() {
panic("negative values for wg.counter are not allowed. This was likely caused by calling done() before add()") panic("negative values for wg.counter are not allowed. This was likely caused by calling done() before add()")
} }
if atomic.LoadInt64(&wg.counter) == 0 { if atomic.LoadInt64(&wg.counter) == 0 {
wg.waitCond.Signal() spawn(func() {
wg.syncChannel <- struct{}{}
})
} }
} }
func (wg *waitGroup) wait() { func (wg *waitGroup) wait() {
wg.waitCond.L.Lock() wg.waitLock.Lock()
defer wg.waitCond.L.Unlock() defer wg.waitLock.Unlock()
for atomic.LoadInt64(&wg.counter) != 0 { for atomic.LoadInt64(&wg.counter) != 0 {
wg.waitCond.Wait() <-wg.syncChannel
} }
} }

View File

@ -0,0 +1,217 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package locks
import (
"sync/atomic"
"testing"
)
// All of the tests, except TestAddAfterWait, are copied
// from the native sync/waitgroup_test.go (with some
// minor changes), to check that the new waitGroup
// behaves the same, except enabling the use of add()
// concurrently with wait()
// syncWaitGroupCompatible is a type that was created in order to
// make the use of waitGroup similar to the native one, so it'll
// be more convenient to use the same tests from sync/waitgroup_test.go
type syncWaitGroupCompatible struct {
*waitGroup
}
func (swg *syncWaitGroupCompatible) add(delta int) {
for i := 0; i < delta; i++ {
swg.waitGroup.add()
}
}
func newSyncWgCompatible() *syncWaitGroupCompatible {
return &syncWaitGroupCompatible{
waitGroup: newWaitGroup(),
}
}
func testWaitGroup(t *testing.T, wg1 *syncWaitGroupCompatible, wg2 *syncWaitGroupCompatible) {
n := 16
wg1.add(n)
wg2.add(n)
exited := make(chan struct{}, n)
for i := 0; i != n; i++ {
go func(i int) {
wg1.done()
wg2.wait()
exited <- struct{}{}
}(i)
}
wg1.wait()
for i := 0; i != n; i++ {
select {
case <-exited:
t.Fatal("waitGroup released group too soon")
default:
}
wg2.done()
}
for i := 0; i != n; i++ {
<-exited // Will block if barrier fails to unlock someone.
}
}
func TestWaitGroup(t *testing.T) {
wg1 := newSyncWgCompatible()
wg2 := newSyncWgCompatible()
// Run the same test a few times to ensure barrier is in a proper state.
for i := 0; i != 8; i++ {
testWaitGroup(t, wg1, wg2)
}
}
func TestWaitGroupMisuse(t *testing.T) {
defer func() {
err := recover()
if err != "negative values for wg.counter are not allowed. This was likely caused by calling done() before add()" {
t.Fatalf("Unexpected panic: %#v", err)
}
}()
wg := newSyncWgCompatible()
wg.add(1)
wg.done()
wg.done()
t.Fatal("Should panic, because wg.counter should be negative (-1), which is not allowed")
}
func TestAddAfterWait(t *testing.T) {
wg := newSyncWgCompatible()
wg.add(1)
syncChan := make(chan struct{})
go func() {
syncChan <- struct{}{}
wg.wait()
syncChan <- struct{}{}
}()
<-syncChan
wg.add(1)
wg.done()
wg.done()
<-syncChan
}
func TestWaitGroupRace(t *testing.T) {
// Run this test for about 1ms.
for i := 0; i < 1000; i++ {
wg := newSyncWgCompatible()
n := new(int32)
// spawn goroutine 1
wg.add(1)
go func() {
atomic.AddInt32(n, 1)
wg.done()
}()
// spawn goroutine 2
wg.add(1)
go func() {
atomic.AddInt32(n, 1)
wg.done()
}()
// Wait for goroutine 1 and 2
wg.wait()
if atomic.LoadInt32(n) != 2 {
t.Fatal("Spurious wakeup from Wait")
}
}
}
func TestWaitGroupAlign(t *testing.T) {
type X struct {
x byte
wg *syncWaitGroupCompatible
}
x := X{wg: newSyncWgCompatible()}
x.wg.add(1)
go func(x *X) {
x.wg.done()
}(&x)
x.wg.wait()
}
func BenchmarkWaitGroupUncontended(b *testing.B) {
type PaddedWaitGroup struct {
*syncWaitGroupCompatible
pad [128]uint8
}
b.RunParallel(func(pb *testing.PB) {
wg := PaddedWaitGroup{
syncWaitGroupCompatible: newSyncWgCompatible(),
}
for pb.Next() {
wg.add(1)
wg.done()
wg.wait()
}
})
}
func benchmarkWaitGroupAdddone(b *testing.B, localWork int) {
wg := newSyncWgCompatible()
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
wg.add(1)
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
wg.done()
}
_ = foo
})
}
func BenchmarkWaitGroupAdddone(b *testing.B) {
benchmarkWaitGroupAdddone(b, 0)
}
func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
benchmarkWaitGroupAdddone(b, 100)
}
func benchmarkWaitGroupwait(b *testing.B, localWork int) {
wg := newSyncWgCompatible()
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
wg.wait()
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
}
_ = foo
})
}
func BenchmarkWaitGroupwait(b *testing.B) {
benchmarkWaitGroupwait(b, 0)
}
func BenchmarkWaitGroupWaitWork(b *testing.B) {
benchmarkWaitGroupwait(b, 100)
}
func BenchmarkWaitGroupActuallywait(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg := newSyncWgCompatible()
wg.add(1)
go func() {
wg.done()
}()
wg.wait()
}
})
}