mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Progress notifications requested using ProgressRequest were sent directly using the ctrlStream, which means that they could race against watch responses in the watchStream. This would especially happen when the stream was not synced - e.g. if you requested a progress notification on a freshly created unsynced watcher, the notification would typically arrive indicating a revision for which not all watch responses had been sent. This changes the behaviour so that v3rpc always goes through the watch stream, using a new RequestProgressAll function that closely matches the behaviour of the v3rpc code - i.e. it will: (1) generate a message with WatchId -1, indicating the revision for *all* watchers in the stream; (2) guarantee that a response is (eventually) sent. The latter might require us to defer the response until all watchers are synced, which is likely as it should be. Note that we do *not* guarantee that the number of progress notifications matches the number of requests, only that eventually at least one gets sent. Signed-off-by: Peter Wortmann <peter.wortmann@skao.int>
572 lines
14 KiB
Go
572 lines
14 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package mvcc
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/pkg/v3/traceutil"
|
|
"go.etcd.io/etcd/server/v3/lease"
|
|
"go.etcd.io/etcd/server/v3/storage/backend"
|
|
"go.etcd.io/etcd/server/v3/storage/schema"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Tunables of the watchable store. They are variables rather than
// constants so that tests can modify them.
var (
	// chanBufLen is the capacity of the buffered channel that a watch
	// stream uses for sending out watch responses.
	// See https://github.com/etcd-io/etcd/issues/11906 for more detail.
	chanBufLen = 128

	// maxWatchersPerSync is the upper bound on the number of unsynced
	// watchers that are picked for a single sync batch.
	maxWatchersPerSync = 512
)
|
|
|
|
type watchable interface {
|
|
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
|
|
progress(w *watcher)
|
|
progressAll(watchers map[WatchID]*watcher) bool
|
|
rev() int64
|
|
}
|
|
|
|
type watchableStore struct {
|
|
*store
|
|
|
|
// mu protects watcher groups and batches. It should never be locked
|
|
// before locking store.mu to avoid deadlock.
|
|
mu sync.RWMutex
|
|
|
|
// victims are watcher batches that were blocked on the watch channel
|
|
victims []watcherBatch
|
|
victimc chan struct{}
|
|
|
|
// contains all unsynced watchers that needs to sync with events that have happened
|
|
unsynced watcherGroup
|
|
|
|
// contains all synced watchers that are in sync with the progress of the store.
|
|
// The key of the map is the key that the watcher watches on.
|
|
synced watcherGroup
|
|
|
|
stopc chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// cancelFunc cancels a watcher: running it removes the watcher from the
// bookkeeping of the watchable store (unsynced group, synced group or
// victim batches).
type cancelFunc func()
|
|
|
|
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
|
|
return newWatchableStore(lg, b, le, cfg)
|
|
}
|
|
|
|
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
|
|
if lg == nil {
|
|
lg = zap.NewNop()
|
|
}
|
|
s := &watchableStore{
|
|
store: NewStore(lg, b, le, cfg),
|
|
victimc: make(chan struct{}, 1),
|
|
unsynced: newWatcherGroup(),
|
|
synced: newWatcherGroup(),
|
|
stopc: make(chan struct{}),
|
|
}
|
|
s.store.ReadView = &readView{s}
|
|
s.store.WriteView = &writeView{s}
|
|
if s.le != nil {
|
|
// use this store as the deleter so revokes trigger watch events
|
|
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
|
|
}
|
|
s.wg.Add(2)
|
|
go s.syncWatchersLoop()
|
|
go s.syncVictimsLoop()
|
|
return s
|
|
}
|
|
|
|
func (s *watchableStore) Close() error {
|
|
close(s.stopc)
|
|
s.wg.Wait()
|
|
return s.store.Close()
|
|
}
|
|
|
|
func (s *watchableStore) NewWatchStream() WatchStream {
|
|
watchStreamGauge.Inc()
|
|
return &watchStream{
|
|
watchable: s,
|
|
ch: make(chan WatchResponse, chanBufLen),
|
|
cancels: make(map[WatchID]cancelFunc),
|
|
watchers: make(map[WatchID]*watcher),
|
|
}
|
|
}
|
|
|
|
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
|
|
wa := &watcher{
|
|
key: key,
|
|
end: end,
|
|
minRev: startRev,
|
|
id: id,
|
|
ch: ch,
|
|
fcs: fcs,
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.revMu.RLock()
|
|
synced := startRev > s.store.currentRev || startRev == 0
|
|
if synced {
|
|
wa.minRev = s.store.currentRev + 1
|
|
if startRev > wa.minRev {
|
|
wa.minRev = startRev
|
|
}
|
|
s.synced.add(wa)
|
|
} else {
|
|
slowWatcherGauge.Inc()
|
|
s.unsynced.add(wa)
|
|
}
|
|
s.revMu.RUnlock()
|
|
s.mu.Unlock()
|
|
|
|
watcherGauge.Inc()
|
|
|
|
return wa, func() { s.cancelWatcher(wa) }
|
|
}
|
|
|
|
// cancelWatcher removes references of the watcher from the watchableStore
|
|
func (s *watchableStore) cancelWatcher(wa *watcher) {
|
|
for {
|
|
s.mu.Lock()
|
|
if s.unsynced.delete(wa) {
|
|
slowWatcherGauge.Dec()
|
|
watcherGauge.Dec()
|
|
break
|
|
} else if s.synced.delete(wa) {
|
|
watcherGauge.Dec()
|
|
break
|
|
} else if wa.compacted {
|
|
watcherGauge.Dec()
|
|
break
|
|
} else if wa.ch == nil {
|
|
// already canceled (e.g., cancel/close race)
|
|
break
|
|
}
|
|
|
|
if !wa.victim {
|
|
s.mu.Unlock()
|
|
panic("watcher not victim but not in watch groups")
|
|
}
|
|
|
|
var victimBatch watcherBatch
|
|
for _, wb := range s.victims {
|
|
if wb[wa] != nil {
|
|
victimBatch = wb
|
|
break
|
|
}
|
|
}
|
|
if victimBatch != nil {
|
|
slowWatcherGauge.Dec()
|
|
watcherGauge.Dec()
|
|
delete(victimBatch, wa)
|
|
break
|
|
}
|
|
|
|
// victim being processed so not accessible; retry
|
|
s.mu.Unlock()
|
|
time.Sleep(time.Millisecond)
|
|
}
|
|
|
|
wa.ch = nil
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *watchableStore) Restore(b backend.Backend) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
err := s.store.Restore(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for wa := range s.synced.watchers {
|
|
wa.restore = true
|
|
s.unsynced.add(wa)
|
|
}
|
|
s.synced = newWatcherGroup()
|
|
return nil
|
|
}
|
|
|
|
// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
|
|
func (s *watchableStore) syncWatchersLoop() {
|
|
defer s.wg.Done()
|
|
|
|
waitDuration := 100 * time.Millisecond
|
|
delayTicker := time.NewTicker(waitDuration)
|
|
defer delayTicker.Stop()
|
|
|
|
for {
|
|
s.mu.RLock()
|
|
st := time.Now()
|
|
lastUnsyncedWatchers := s.unsynced.size()
|
|
s.mu.RUnlock()
|
|
|
|
unsyncedWatchers := 0
|
|
if lastUnsyncedWatchers > 0 {
|
|
unsyncedWatchers = s.syncWatchers()
|
|
}
|
|
syncDuration := time.Since(st)
|
|
|
|
delayTicker.Reset(waitDuration)
|
|
// more work pending?
|
|
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
|
|
// be fair to other store operations by yielding time taken
|
|
delayTicker.Reset(syncDuration)
|
|
}
|
|
|
|
select {
|
|
case <-delayTicker.C:
|
|
case <-s.stopc:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// syncVictimsLoop tries to write precomputed watcher responses to
|
|
// watchers that had a blocked watcher channel
|
|
func (s *watchableStore) syncVictimsLoop() {
|
|
defer s.wg.Done()
|
|
|
|
for {
|
|
for s.moveVictims() != 0 {
|
|
// try to update all victim watchers
|
|
}
|
|
s.mu.RLock()
|
|
isEmpty := len(s.victims) == 0
|
|
s.mu.RUnlock()
|
|
|
|
var tickc <-chan time.Time
|
|
if !isEmpty {
|
|
tickc = time.After(10 * time.Millisecond)
|
|
}
|
|
|
|
select {
|
|
case <-tickc:
|
|
case <-s.victimc:
|
|
case <-s.stopc:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// moveVictims tries to update watches with already pending event data
|
|
func (s *watchableStore) moveVictims() (moved int) {
|
|
s.mu.Lock()
|
|
victims := s.victims
|
|
s.victims = nil
|
|
s.mu.Unlock()
|
|
|
|
var newVictim watcherBatch
|
|
for _, wb := range victims {
|
|
// try to send responses again
|
|
for w, eb := range wb {
|
|
// watcher has observed the store up to, but not including, w.minRev
|
|
rev := w.minRev - 1
|
|
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
|
|
pendingEventsGauge.Add(float64(len(eb.evs)))
|
|
} else {
|
|
if newVictim == nil {
|
|
newVictim = make(watcherBatch)
|
|
}
|
|
newVictim[w] = eb
|
|
continue
|
|
}
|
|
moved++
|
|
}
|
|
|
|
// assign completed victim watchers to unsync/sync
|
|
s.mu.Lock()
|
|
s.store.revMu.RLock()
|
|
curRev := s.store.currentRev
|
|
for w, eb := range wb {
|
|
if newVictim != nil && newVictim[w] != nil {
|
|
// couldn't send watch response; stays victim
|
|
continue
|
|
}
|
|
w.victim = false
|
|
if eb.moreRev != 0 {
|
|
w.minRev = eb.moreRev
|
|
}
|
|
if w.minRev <= curRev {
|
|
s.unsynced.add(w)
|
|
} else {
|
|
slowWatcherGauge.Dec()
|
|
s.synced.add(w)
|
|
}
|
|
}
|
|
s.store.revMu.RUnlock()
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
if len(newVictim) > 0 {
|
|
s.mu.Lock()
|
|
s.victims = append(s.victims, newVictim)
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
return moved
|
|
}
|
|
|
|
// syncWatchers syncs unsynced watchers by:
|
|
// 1. choose a set of watchers from the unsynced watcher group
|
|
// 2. iterate over the set to get the minimum revision and remove compacted watchers
|
|
// 3. use minimum revision to get all key-value pairs and send those events to watchers
|
|
// 4. remove synced watchers in set from unsynced group and move to synced group
|
|
func (s *watchableStore) syncWatchers() int {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.unsynced.size() == 0 {
|
|
return 0
|
|
}
|
|
|
|
s.store.revMu.RLock()
|
|
defer s.store.revMu.RUnlock()
|
|
|
|
// in order to find key-value pairs from unsynced watchers, we need to
|
|
// find min revision index, and these revisions can be used to
|
|
// query the backend store of key-value pairs
|
|
curRev := s.store.currentRev
|
|
compactionRev := s.store.compactMainRev
|
|
|
|
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
|
|
minBytes, maxBytes := newRevBytes(), newRevBytes()
|
|
revToBytes(revision{main: minRev}, minBytes)
|
|
revToBytes(revision{main: curRev + 1}, maxBytes)
|
|
|
|
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
|
|
// values are actual key-value pairs in backend.
|
|
tx := s.store.b.ReadTx()
|
|
tx.RLock()
|
|
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
|
|
evs := kvsToEvents(s.store.lg, wg, revs, vs)
|
|
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
|
|
// We can only unlock after Unmarshal, which will do deep copy.
|
|
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
|
|
tx.RUnlock()
|
|
|
|
victims := make(watcherBatch)
|
|
wb := newWatcherBatch(wg, evs)
|
|
for w := range wg.watchers {
|
|
w.minRev = curRev + 1
|
|
|
|
eb, ok := wb[w]
|
|
if !ok {
|
|
// bring un-notified watcher to synced
|
|
s.synced.add(w)
|
|
s.unsynced.delete(w)
|
|
continue
|
|
}
|
|
|
|
if eb.moreRev != 0 {
|
|
w.minRev = eb.moreRev
|
|
}
|
|
|
|
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
|
|
pendingEventsGauge.Add(float64(len(eb.evs)))
|
|
} else {
|
|
w.victim = true
|
|
}
|
|
|
|
if w.victim {
|
|
victims[w] = eb
|
|
} else {
|
|
if eb.moreRev != 0 {
|
|
// stay unsynced; more to read
|
|
continue
|
|
}
|
|
s.synced.add(w)
|
|
}
|
|
s.unsynced.delete(w)
|
|
}
|
|
s.addVictim(victims)
|
|
|
|
vsz := 0
|
|
for _, v := range s.victims {
|
|
vsz += len(v)
|
|
}
|
|
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
|
|
|
|
return s.unsynced.size()
|
|
}
|
|
|
|
// kvsToEvents gets all events for the watchers from all key-value pairs
|
|
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
|
|
for i, v := range vals {
|
|
var kv mvccpb.KeyValue
|
|
if err := kv.Unmarshal(v); err != nil {
|
|
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
|
|
}
|
|
|
|
if !wg.contains(string(kv.Key)) {
|
|
continue
|
|
}
|
|
|
|
ty := mvccpb.PUT
|
|
if isTombstone(revs[i]) {
|
|
ty = mvccpb.DELETE
|
|
// patch in mod revision so watchers won't skip
|
|
kv.ModRevision = bytesToRev(revs[i]).main
|
|
}
|
|
evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
|
|
}
|
|
return evs
|
|
}
|
|
|
|
// notify notifies the fact that given event at the given rev just happened to
|
|
// watchers that watch on the key of the event.
|
|
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
|
|
victim := make(watcherBatch)
|
|
for w, eb := range newWatcherBatch(&s.synced, evs) {
|
|
if eb.revs != 1 {
|
|
s.store.lg.Panic(
|
|
"unexpected multiple revisions in watch notification",
|
|
zap.Int("number-of-revisions", eb.revs),
|
|
)
|
|
}
|
|
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
|
|
pendingEventsGauge.Add(float64(len(eb.evs)))
|
|
} else {
|
|
// move slow watcher to victims
|
|
w.victim = true
|
|
victim[w] = eb
|
|
s.synced.delete(w)
|
|
slowWatcherGauge.Inc()
|
|
}
|
|
// always update minRev
|
|
// in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced
|
|
// in case 'send' returns false, this is needed for syncWatchers
|
|
w.minRev = rev + 1
|
|
}
|
|
s.addVictim(victim)
|
|
}
|
|
|
|
func (s *watchableStore) addVictim(victim watcherBatch) {
|
|
if len(victim) == 0 {
|
|
return
|
|
}
|
|
s.victims = append(s.victims, victim)
|
|
select {
|
|
case s.victimc <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// rev returns the current revision of the underlying store.
func (s *watchableStore) rev() int64 {
	return s.store.Rev()
}
|
|
|
|
func (s *watchableStore) progress(w *watcher) {
|
|
s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
|
|
}
|
|
|
|
func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
|
|
return s.progressIfSync(watchers, clientv3.InvalidWatchID)
|
|
}
|
|
|
|
func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
// Any watcher unsynced?
|
|
for _, w := range watchers {
|
|
if _, ok := s.synced.watchers[w]; !ok {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// If all watchers are synchronised, send out progress
|
|
// notification on first watcher. Note that all watchers
|
|
// should have the same underlying stream, and the progress
|
|
// notification will be broadcasted client-side if required
|
|
// (see dispatchEvent in client/v3/watch.go)
|
|
for _, w := range watchers {
|
|
w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
|
|
return true
|
|
}
|
|
return true
|
|
}
|
|
|
|
type watcher struct {
|
|
// the watcher key
|
|
key []byte
|
|
// end indicates the end of the range to watch.
|
|
// If end is set, the watcher is on a range.
|
|
end []byte
|
|
|
|
// victim is set when ch is blocked and undergoing victim processing
|
|
victim bool
|
|
|
|
// compacted is set when the watcher is removed because of compaction
|
|
compacted bool
|
|
|
|
// restore is true when the watcher is being restored from leader snapshot
|
|
// which means that this watcher has just been moved from "synced" to "unsynced"
|
|
// watcher group, possibly with a future revision when it was first added
|
|
// to the synced watcher
|
|
// "unsynced" watcher revision must always be <= current revision,
|
|
// except when the watcher were to be moved from "synced" watcher group
|
|
restore bool
|
|
|
|
// minRev is the minimum revision update the watcher will accept
|
|
minRev int64
|
|
id WatchID
|
|
|
|
fcs []FilterFunc
|
|
// a chan to send out the watch response.
|
|
// The chan might be shared with other watchers.
|
|
ch chan<- WatchResponse
|
|
}
|
|
|
|
func (w *watcher) send(wr WatchResponse) bool {
|
|
progressEvent := len(wr.Events) == 0
|
|
|
|
if len(w.fcs) != 0 {
|
|
ne := make([]mvccpb.Event, 0, len(wr.Events))
|
|
for i := range wr.Events {
|
|
filtered := false
|
|
for _, filter := range w.fcs {
|
|
if filter(wr.Events[i]) {
|
|
filtered = true
|
|
break
|
|
}
|
|
}
|
|
if !filtered {
|
|
ne = append(ne, wr.Events[i])
|
|
}
|
|
}
|
|
wr.Events = ne
|
|
}
|
|
|
|
// if all events are filtered out, we should send nothing.
|
|
if !progressEvent && len(wr.Events) == 0 {
|
|
return true
|
|
}
|
|
select {
|
|
case w.ch <- wr:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|