mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

Progress notifications requested using ProgressRequest were sent directly using the ctrlStream, which means that they could race against watch responses in the watchStream. This would especially happen when the stream was not synced — e.g. if you requested a progress notification on a freshly created unsynced watcher, the notification would typically arrive indicating a revision for which not all watch responses had been sent. This changes the behaviour so that v3rpc always goes through the watch stream, using a new RequestProgressAll function that closely matches the behaviour of the v3rpc code, i.e. it: (1) generates a message with WatchId -1, indicating the revision for *all* watchers in the stream; and (2) guarantees that a response is (eventually) sent. The latter might require us to defer the response until all watchers are synced, which is likely as it should be. Note that we do *not* guarantee that the number of progress notifications matches the number of requests, only that eventually at least one gets sent. Signed-off-by: Benjamin Wang <wachao@vmware.com>
204 lines
5.8 KiB
Go
204 lines
5.8 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package mvcc
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"sync"
|
|
|
|
"go.etcd.io/etcd/clientv3"
|
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
|
)
|
|
|
|
// Sentinel errors returned by watchStream operations.
var (
	// ErrWatcherNotExist is returned by Cancel when no watcher is
	// registered under the given ID, or when the stream is already closed.
	ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")

	// ErrWatcherDuplicateID is returned by Watch when the caller supplies
	// an explicit ID that another watcher on the stream already holds.
	ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")

	// ErrEmptyWatcherRange is returned by Watch when a non-empty range end
	// does not sort strictly after the key. Watch currently also returns it
	// when called on a closed stream.
	ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
)
|
|
|
|
// WatchID identifies a watcher within a single WatchStream. Watch returns
// -1 alongside an error, and stream-wide progress notifications use -1 as
// well (see RequestProgressAll).
type WatchID int64
|
|
|
|
// FilterFunc returns true if the given event should be filtered out.
|
|
type FilterFunc func(e mvccpb.Event) bool
|
|
|
|
type WatchStream interface {
|
|
// Watch creates a watcher. The watcher watches the events happening or
|
|
// happened on the given key or range [key, end) from the given startRev.
|
|
//
|
|
// The whole event history can be watched unless compacted.
|
|
// If "startRev" <=0, watch observes events after currentRev.
|
|
//
|
|
// The returned "id" is the ID of this watcher. It appears as WatchID
|
|
// in events that are sent to the created watcher through stream channel.
|
|
// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
|
|
// an auto-generated watch ID is returned.
|
|
Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
|
|
|
|
// Chan returns a chan. All watch response will be sent to the returned chan.
|
|
Chan() <-chan WatchResponse
|
|
|
|
// RequestProgress requests the progress of the watcher with given ID. The response
|
|
// will only be sent if the watcher is currently synced.
|
|
// The responses will be sent through the WatchRespone Chan attached
|
|
// with this stream to ensure correct ordering.
|
|
// The responses contains no events. The revision in the response is the progress
|
|
// of the watchers since the watcher is currently synced.
|
|
RequestProgress(id WatchID)
|
|
|
|
// RequestProgressAll requests a progress notification for all
|
|
// watchers sharing the stream. If all watchers are synced, a
|
|
// progress notification with watch ID -1 will be sent to an
|
|
// arbitrary watcher of this stream, and the function returns
|
|
// true.
|
|
RequestProgressAll() bool
|
|
|
|
// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
|
|
// returned.
|
|
Cancel(id WatchID) error
|
|
|
|
// Close closes Chan and release all related resources.
|
|
Close()
|
|
|
|
// Rev returns the current revision of the KV the stream watches on.
|
|
Rev() int64
|
|
}
|
|
|
|
// WatchResponse is a single message delivered on a WatchStream's channel:
// a batch of events, a progress notification, or a cancellation notice.
type WatchResponse struct {
	// WatchID is the WatchID of the watcher this response is sent to.
	// Stream-wide progress notifications use -1 (see RequestProgressAll).
	WatchID WatchID

	// Events contains all the events to be sent. Progress notifications
	// carry no events.
	Events []mvccpb.Event

	// Revision is the revision of the KV when the WatchResponse is created.
	// For a normal response, the revision should be the same as the last
	// modified revision inside Events. For a delayed response to an unsynced
	// watcher, the revision is greater than the last modified revision
	// inside Events.
	Revision int64

	// CompactRevision is set when the watcher is cancelled due to compaction.
	CompactRevision int64
}
|
|
|
|
// watchStream contains a collection of watchers that share one streaming
// chan to send out watched events and other control events. It is the
// implementation of WatchStream in this file.
type watchStream struct {
	// watchable is the backing store that watchers are registered with and
	// that answers revision/progress queries.
	watchable watchable

	// ch is the single response channel shared by every watcher; Watch
	// hands it to watchable.watch, Chan exposes it, and Close closes it.
	ch chan WatchResponse

	mu sync.Mutex // guards fields below it

	// nextID is the ID pre-allocated for next new watcher in this stream
	nextID WatchID

	// closed is set by Close; afterwards Watch and Cancel refuse to operate.
	closed bool

	// cancels holds the cancel func returned by watchable.watch for each
	// registered watcher, keyed by the same IDs as watchers.
	cancels map[WatchID]cancelFunc

	// watchers holds every watcher registered on this stream, keyed by ID.
	watchers map[WatchID]*watcher
}
|
|
|
|
// Watch creates a new watcher in the stream and returns its WatchID.
|
|
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
|
|
// prevent wrong range where key >= end lexicographically
|
|
// watch request with 'WithFromKey' has empty-byte range end
|
|
if len(end) != 0 && bytes.Compare(key, end) != -1 {
|
|
return -1, ErrEmptyWatcherRange
|
|
}
|
|
|
|
ws.mu.Lock()
|
|
defer ws.mu.Unlock()
|
|
if ws.closed {
|
|
return -1, ErrEmptyWatcherRange
|
|
}
|
|
|
|
if id == clientv3.AutoWatchID {
|
|
for ws.watchers[ws.nextID] != nil {
|
|
ws.nextID++
|
|
}
|
|
id = ws.nextID
|
|
ws.nextID++
|
|
} else if _, ok := ws.watchers[id]; ok {
|
|
return -1, ErrWatcherDuplicateID
|
|
}
|
|
|
|
w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
|
|
|
|
ws.cancels[id] = c
|
|
ws.watchers[id] = w
|
|
return id, nil
|
|
}
|
|
|
|
func (ws *watchStream) Chan() <-chan WatchResponse {
|
|
return ws.ch
|
|
}
|
|
|
|
func (ws *watchStream) Cancel(id WatchID) error {
|
|
ws.mu.Lock()
|
|
cancel, ok := ws.cancels[id]
|
|
w := ws.watchers[id]
|
|
ok = ok && !ws.closed
|
|
ws.mu.Unlock()
|
|
|
|
if !ok {
|
|
return ErrWatcherNotExist
|
|
}
|
|
cancel()
|
|
|
|
ws.mu.Lock()
|
|
// The watch isn't removed until cancel so that if Close() is called,
|
|
// it will wait for the cancel. Otherwise, Close() could close the
|
|
// watch channel while the store is still posting events.
|
|
if ww := ws.watchers[id]; ww == w {
|
|
delete(ws.cancels, id)
|
|
delete(ws.watchers, id)
|
|
}
|
|
ws.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ws *watchStream) Close() {
|
|
ws.mu.Lock()
|
|
defer ws.mu.Unlock()
|
|
|
|
for _, cancel := range ws.cancels {
|
|
cancel()
|
|
}
|
|
ws.closed = true
|
|
close(ws.ch)
|
|
watchStreamGauge.Dec()
|
|
}
|
|
|
|
func (ws *watchStream) Rev() int64 {
|
|
ws.mu.Lock()
|
|
defer ws.mu.Unlock()
|
|
return ws.watchable.rev()
|
|
}
|
|
|
|
func (ws *watchStream) RequestProgress(id WatchID) {
|
|
ws.mu.Lock()
|
|
w, ok := ws.watchers[id]
|
|
ws.mu.Unlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
ws.watchable.progress(w)
|
|
}
|
|
|
|
func (ws *watchStream) RequestProgressAll() bool {
|
|
ws.mu.Lock()
|
|
defer ws.mu.Unlock()
|
|
return ws.watchable.progressAll(ws.watchers)
|
|
}
|