pkg/wait: change list from single element to an array.

We found wait lock contention when a large amount of write operations. Converting wait from single element to an array helps to improve the performance.

Fixes #12731

Signed-off-by: Wilson Wang <wilsonny371@gmail.com>
This commit is contained in:
Wilson Wang 2021-03-04 11:02:00 -08:00
parent aefbd226b8
commit 432fde88a9

View File

@ -21,6 +21,13 @@ import (
"sync"
)
const (
// To avoid lock contention we use an array of list struct (rw mutex & map)
// for the id argument, we apply mod operation and uses its remainder to
// index into the array and find the corresponding element.
defaultListElementLength = 64
)
// Wait is an interface that provides the ability to wait and trigger events that
// are associated with IDs.
type Wait interface {
@ -34,33 +41,44 @@ type Wait interface {
}
type list struct {
e []listElement
}
type listElement struct {
l sync.RWMutex
m map[uint64]chan interface{}
}
// New creates a Wait.
func New() Wait {
return &list{m: make(map[uint64]chan interface{})}
res := list{
e: make([]listElement, defaultListElementLength),
}
for i := 0; i < len(res.e); i++ {
res.e[i].m = make(map[uint64]chan interface{})
}
return &res
}
func (w *list) Register(id uint64) <-chan interface{} {
w.l.Lock()
defer w.l.Unlock()
ch := w.m[id]
if ch == nil {
ch = make(chan interface{}, 1)
w.m[id] = ch
idx := id % defaultListElementLength
newCh := make(chan interface{}, 1)
w.e[idx].l.Lock()
defer w.e[idx].l.Unlock()
if _, ok := w.e[idx].m[id]; !ok {
w.e[idx].m[id] = newCh
} else {
log.Panicf("dup id %x", id)
}
return ch
return newCh
}
func (w *list) Trigger(id uint64, x interface{}) {
w.l.Lock()
ch := w.m[id]
delete(w.m, id)
w.l.Unlock()
idx := id % defaultListElementLength
w.e[idx].l.Lock()
ch := w.e[idx].m[id]
delete(w.e[idx].m, id)
w.e[idx].l.Unlock()
if ch != nil {
ch <- x
close(ch)
@ -68,9 +86,10 @@ func (w *list) Trigger(id uint64, x interface{}) {
}
func (w *list) IsRegistered(id uint64) bool {
w.l.RLock()
defer w.l.RUnlock()
_, ok := w.m[id]
idx := id % defaultListElementLength
w.e[idx].l.RLock()
defer w.e[idx].l.RUnlock()
_, ok := w.e[idx].m[id]
return ok
}