Merge pull request #9869 from jpbetz/progress

*: Add progress notify request watch request
This commit is contained in:
Joe Betz 2018-06-28 10:46:01 -07:00 committed by GitHub
commit 4df18a7316
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 807 additions and 415 deletions

View File

@ -740,7 +740,7 @@ Empty field.
| ----- | ----------- | ---- |
| cluster_id | cluster_id is the ID of the cluster which sent the response. | uint64 |
| member_id | member_id is the ID of the member which sent the response. | uint64 |
| revision | revision is the key-value store revision when the request was applied. | int64 |
| revision | revision is the key-value store revision when the request was applied. For watch progress responses, the header.revision indicates progress. All future events recieved in this stream are guaranteed to have a higher revision number than the header.revision number. | int64 |
| raft_term | raft_term is the raft term when the request was applied. | uint64 |
@ -840,6 +840,14 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
##### message `WatchProgressRequest` (etcdserver/etcdserverpb/rpc.proto)
Requests the a watch stream progress status be sent in the watch response stream as soon as possible.
Empty field.
##### message `WatchRequest` (etcdserver/etcdserverpb/rpc.proto)
| Field | Description | Type |
@ -847,6 +855,7 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive
| request_union | request_union is a request to either create a new watcher or cancel an existing watcher. | oneof |
| create_request | | WatchCreateRequest |
| cancel_request | | WatchCancelRequest |
| progress_request | | WatchProgressRequest |

View File

@ -2195,7 +2195,7 @@
"format": "uint64"
},
"revision": {
"description": "revision is the key-value store revision when the request was applied.",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number.",
"type": "string",
"format": "int64"
}
@ -2396,6 +2396,10 @@
}
}
},
"etcdserverpbWatchProgressRequest": {
"description": "Requests the a watch stream progress status be sent in the watch response stream as soon as\npossible.",
"type": "object"
},
"etcdserverpbWatchRequest": {
"type": "object",
"properties": {
@ -2404,6 +2408,9 @@
},
"create_request": {
"$ref": "#/definitions/etcdserverpbWatchCreateRequest"
},
"progress_request": {
"$ref": "#/definitions/etcdserverpbWatchProgressRequest"
}
}
},

View File

@ -168,7 +168,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",

View File

@ -87,7 +87,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",

View File

@ -355,6 +355,26 @@ foo # key
bar_latest # value of foo key after modification
```
## Watch progress
Applications may want to check the progress of a watch to determine how up-to-date the watch stream is. For example, if a watch is used to update a cache, it can be useful to know if the cache is stale compared to the revision from a quorum read.
Progress requests can be issued using the "progress" command in interactive watch session to ask the etcd server to send a progress notify update in the watch stream:
```bash
$ etcdctl watch -i
$ watch a
$ progress
progress notify: 1
# in another terminal: etcdctl put x 0
# in another terminal: etcdctl put y 1
$ progress
progress notify: 3
```
Note: The revision number in the progress notify response is the revision from the local etcd server node that the watch stream is connected to. If this node is partitioned and not part of quorum, this progress notify revision might be lower than
than the revision returned by a quorum read against a non-partitioned etcd server node.
## Compacted revisions
As we mentioned, etcd keeps revisions so that applications can read past versions of keys. However, to avoid accumulating an unbounded amount of history, it is important to compact past revisions. After compacting, etcd removes historical revisions, releasing resources for future use. All superseded data with revisions before the compacted revision will be unavailable.

View File

@ -582,6 +582,78 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}
}
func TestWatchRequestProgress(t *testing.T) {
testCases := []struct {
name string
watchers []string
}{
{"0-watcher", []string{}},
{"1-watcher", []string{"/"}},
{"2-watcher", []string{"/", "/"}},
}
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
defer testutil.AfterTest(t)
watchTimeout := 3 * time.Second
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
wc := clus.RandClient()
var watchChans []clientv3.WatchChan
for _, prefix := range c.watchers {
watchChans = append(watchChans, wc.Watch(context.Background(), prefix, clientv3.WithPrefix()))
}
_, err := wc.Put(context.Background(), "/a", "1")
if err != nil {
t.Fatal(err)
}
for _, rch := range watchChans {
select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 1 {
t.Fatalf("resp.Events expected 1, got %d", len(resp.Events))
}
case <-time.After(watchTimeout):
t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
}
}
// put a value not being watched to increment revision
_, err = wc.Put(context.Background(), "x", "1")
if err != nil {
t.Fatal(err)
}
err = wc.RequestProgress(context.Background())
if err != nil {
t.Fatal(err)
}
// verify all watch channels receive a progress notify
for _, rch := range watchChans {
select {
case resp := <-rch:
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
if resp.Header.Revision != 3 {
t.Fatalf("resp.Header.Revision expected 3, got %d", resp.Header.Revision)
}
case <-time.After(watchTimeout):
t.Fatalf("progress response expected in %v, but timed out", watchTimeout)
}
}
})
}
}
func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

View File

@ -70,6 +70,9 @@ type Watcher interface {
// (see https://github.com/coreos/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// RequestProgress requests a progress notify response be sent in all watch channels.
RequestProgress(ctx context.Context) error
// Close closes the watcher and cancels all watch requests.
Close() error
}
@ -156,7 +159,7 @@ type watchGrpcStream struct {
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
@ -174,6 +177,11 @@ type watchGrpcStream struct {
closeErr error
}
// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
toPB() *pb.WatchRequest
}
// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
@ -198,6 +206,10 @@ type watchRequest struct {
retc chan chan WatchResponse
}
// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
@ -255,7 +267,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
@ -361,6 +373,42 @@ func (w *watcher) Close() (err error) {
return err
}
// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)
w.mu.Lock()
if w.streams == nil {
return fmt.Errorf("no stream found for context")
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
pr := &progressRequest{}
select {
case reqc <- pr:
return nil
case <-ctx.Done():
if err == nil {
return ctx.Err()
}
return err
case <-donec:
if wgs.closeErr != nil {
return wgs.closeErr
}
// retry; may have dropped stream from no ctxs
return w.RequestProgress(ctx)
}
}
func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
@ -468,26 +516,31 @@ func (w *watchGrpcStream) run() {
for {
select {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
wc.Send(wreq.toPB())
}
// new events from the watch client
@ -614,7 +667,31 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
ws, ok := w.substreams[pbresp.WatchId]
// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
// indicate they should be broadcast.
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
return w.broadcastResponse(wr)
}
return w.unicastResponse(wr, pbresp.WatchId)
}
// broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams {
select {
case ws.recvc <- wr:
case <-ws.donec:
}
}
return true
}
// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
@ -888,6 +965,13 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
return &pb.WatchRequest{RequestUnion: cr}
}
// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
req := &pb.WatchProgressRequest{}
cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}
func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)

View File

@ -101,27 +101,33 @@ func watchInteractiveFunc(cmd *cobra.Command, osArgs []string, envKey, envRange
l = strings.TrimSuffix(l, "\n")
args := argify(l)
if len(args) < 2 && envKey == "" {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
switch args[0] {
case "watch":
if len(args) < 2 && envKey == "" {
fmt.Fprintf(os.Stderr, "Invalid command %s (command type or key is not provided)\n", l)
continue
}
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
if perr != nil {
ExitWithError(ExitBadArgs, perr)
}
if args[0] != "watch" {
ch, err := getWatchChan(c, watchArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
go printWatchCh(c, ch, execArgs)
case "progress":
err := c.RequestProgress(clientv3.WithRequireLeader(context.Background()))
if err != nil {
ExitWithError(ExitError, err)
}
default:
fmt.Fprintf(os.Stderr, "Invalid command %s (only support watch)\n", l)
continue
}
watchArgs, execArgs, perr := parseWatchArgs(osArgs, args, envKey, envRange, true)
if perr != nil {
ExitWithError(ExitBadArgs, perr)
}
ch, err := getWatchChan(c, watchArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err)
continue
}
go printWatchCh(c, ch, execArgs)
}
}
@ -152,6 +158,9 @@ func printWatchCh(c *clientv3.Client, ch clientv3.WatchChan, execArgs []string)
if resp.Canceled {
fmt.Fprintf(os.Stderr, "watch was canceled (%v)\n", resp.Err())
}
if resp.IsProgressNotify() {
fmt.Fprintf(os.Stdout, "progress notify: %d\n", resp.Header.Revision)
}
display.Watch(resp)
if len(execArgs) > 0 {

View File

@ -317,7 +317,13 @@ func (sws *serverWatchStream) recvLoop() error {
sws.mu.Unlock()
}
}
case *pb.WatchRequest_ProgressRequest:
if uv.ProgressRequest != nil {
sws.ctrlStream <- &pb.WatchResponse{
Header: sws.newResponseHeader(sws.watchStream.Rev()),
WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
}
}
default:
// we probably should not shutdown the entire stream when
// receive an valid command.

View File

@ -39,6 +39,7 @@
WatchRequest
WatchCreateRequest
WatchCancelRequest
WatchProgressRequest
WatchResponse
LeaseGrantRequest
LeaseGrantResponse

File diff suppressed because it is too large Load Diff

View File

@ -367,6 +367,9 @@ message ResponseHeader {
// member_id is the ID of the member which sent the response.
uint64 member_id = 2;
// revision is the key-value store revision when the request was applied.
// For watch progress responses, the header.revision indicates progress. All future events
// recieved in this stream are guaranteed to have a higher revision number than the
// header.revision number.
int64 revision = 3;
// raft_term is the raft term when the request was applied.
uint64 raft_term = 4;
@ -655,6 +658,7 @@ message WatchRequest {
oneof request_union {
WatchCreateRequest create_request = 1;
WatchCancelRequest cancel_request = 2;
WatchProgressRequest progress_request = 3;
}
}
@ -708,6 +712,11 @@ message WatchCancelRequest {
int64 watch_id = 1;
}
// Requests the a watch stream progress status be sent in the watch response stream as soon as
// possible.
message WatchProgressRequest {
}
message WatchResponse {
ResponseHeader header = 1;
// watch_id is the ID of the watcher that corresponds to the response.