etcd/mvcc/watcher.go
Benjamin Wang 90e4d04c8e etcdserver: guarantee order of requested progress notification
Progress notifications requested using ProgressRequest were sent
directly using the ctrlStream, which means that they could race
against watch responses in the watchStream.

This would especially happen when the stream was not synced - e.g. if
you requested a progress notification on a freshly created unsynced
watcher, the notification would typically arrive indicating a revision
for which not all watch responses had been sent.

This changes the behaviour so that v3rpc always goes through the watch
stream, using a new RequestProgressAll function that closely matches
the behaviour of the v3rpc code - i.e.

1. Generate a message with WatchId -1, indicating the revision for
   *all* watchers in the stream

2. Guarantee that a response is (eventually) sent

The latter might require us to defer the response until all watchers
are synced, which is likely as it should be. Note that we do *not*
guarantee that the number of progress notifications matches the number
of requests, only that eventually at least one gets sent.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2023-04-11 12:47:09 +08:00

204 lines
5.8 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvcc
import (
"bytes"
"errors"
"sync"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
var (
ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)
type WatchID int64
// FilterFunc returns true if the given event should be filtered out.
type FilterFunc func(e mvccpb.Event) bool
type WatchStream interface {
// Watch creates a watcher. The watcher watches the events happening or
// happened on the given key or range [key, end) from the given startRev.
//
// The whole event history can be watched unless compacted.
// If "startRev" <=0, watch observes events after currentRev.
//
// The returned "id" is the ID of this watcher. It appears as WatchID
// in events that are sent to the created watcher through stream channel.
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
// an auto-generated watch ID is returned.
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
// Chan returns a chan. All watch response will be sent to the returned chan.
Chan() <-chan WatchResponse
// RequestProgress requests the progress of the watcher with given ID. The response
// will only be sent if the watcher is currently synced.
// The responses will be sent through the WatchRespone Chan attached
// with this stream to ensure correct ordering.
// The responses contains no events. The revision in the response is the progress
// of the watchers since the watcher is currently synced.
RequestProgress(id WatchID)
// RequestProgressAll requests a progress notification for all
// watchers sharing the stream. If all watchers are synced, a
// progress notification with watch ID -1 will be sent to an
// arbitrary watcher of this stream, and the function returns
// true.
RequestProgressAll() bool
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
// returned.
Cancel(id WatchID) error
// Close closes Chan and release all related resources.
Close()
// Rev returns the current revision of the KV the stream watches on.
Rev() int64
}
type WatchResponse struct {
// WatchID is the WatchID of the watcher this response sent to.
WatchID WatchID
// Events contains all the events that needs to send.
Events []mvccpb.Event
// Revision is the revision of the KV when the watchResponse is created.
// For a normal response, the revision should be the same as the last
// modified revision inside Events. For a delayed response to a unsynced
// watcher, the revision is greater than the last modified revision
// inside Events.
Revision int64
// CompactRevision is set when the watcher is cancelled due to compaction.
CompactRevision int64
}
// watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events.
type watchStream struct {
watchable watchable
ch chan WatchResponse
mu sync.Mutex // guards fields below it
// nextID is the ID pre-allocated for next new watcher in this stream
nextID WatchID
closed bool
cancels map[WatchID]cancelFunc
watchers map[WatchID]*watcher
}
// Watch creates a new watcher in the stream and returns its WatchID.
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
// prevent wrong range where key >= end lexicographically
// watch request with 'WithFromKey' has empty-byte range end
if len(end) != 0 && bytes.Compare(key, end) != -1 {
return -1, ErrEmptyWatcherRange
}
ws.mu.Lock()
defer ws.mu.Unlock()
if ws.closed {
return -1, ErrEmptyWatcherRange
}
if id == clientv3.AutoWatchID {
for ws.watchers[ws.nextID] != nil {
ws.nextID++
}
id = ws.nextID
ws.nextID++
} else if _, ok := ws.watchers[id]; ok {
return -1, ErrWatcherDuplicateID
}
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
ws.cancels[id] = c
ws.watchers[id] = w
return id, nil
}
func (ws *watchStream) Chan() <-chan WatchResponse {
return ws.ch
}
func (ws *watchStream) Cancel(id WatchID) error {
ws.mu.Lock()
cancel, ok := ws.cancels[id]
w := ws.watchers[id]
ok = ok && !ws.closed
ws.mu.Unlock()
if !ok {
return ErrWatcherNotExist
}
cancel()
ws.mu.Lock()
// The watch isn't removed until cancel so that if Close() is called,
// it will wait for the cancel. Otherwise, Close() could close the
// watch channel while the store is still posting events.
if ww := ws.watchers[id]; ww == w {
delete(ws.cancels, id)
delete(ws.watchers, id)
}
ws.mu.Unlock()
return nil
}
func (ws *watchStream) Close() {
ws.mu.Lock()
defer ws.mu.Unlock()
for _, cancel := range ws.cancels {
cancel()
}
ws.closed = true
close(ws.ch)
watchStreamGauge.Dec()
}
func (ws *watchStream) Rev() int64 {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.rev()
}
func (ws *watchStream) RequestProgress(id WatchID) {
ws.mu.Lock()
w, ok := ws.watchers[id]
ws.mu.Unlock()
if !ok {
return
}
ws.watchable.progress(w)
}
func (ws *watchStream) RequestProgressAll() bool {
ws.mu.Lock()
defer ws.mu.Unlock()
return ws.watchable.progressAll(ws.watchers)
}