diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 1579815e8..55e5dbfe4 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -16,6 +16,7 @@ package v3rpc import ( "io" + "time" "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" @@ -39,6 +40,12 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { } } +var ( + // expose for testing purpose. External test can change this to a + // small value to finish fast. + ProgressReportInterval = 10 * time.Minute +) + const ( // We send ctrl response inside the read loop. We do not want // send to block read, but we still want ctrl response we sent to @@ -61,6 +68,10 @@ type serverWatchStream struct { watchStream storage.WatchStream ctrlStream chan *pb.WatchResponse + // progress tracks the watchID that stream might need to send + // progress to. + progress map[storage.WatchID]bool + // closec indicates the stream is closed. closec chan struct{} } @@ -74,6 +85,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { watchStream: ws.watchable.NewWatchStream(), // chan for sending control response like watcher created and canceled. ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen), + progress: make(map[storage.WatchID]bool), closec: make(chan struct{}), } defer sws.close() @@ -115,6 +127,9 @@ func (sws *serverWatchStream) recvLoop() error { id := storage.WatchID(-1) if !futureRev { id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev) + if creq.ProgressNotify { + sws.progress[id] = true + } } sws.ctrlStream <- &pb.WatchResponse{ Header: sws.newResponseHeader(wsrev), @@ -132,6 +147,7 @@ func (sws *serverWatchStream) recvLoop() error { WatchId: id, Canceled: true, } + delete(sws.progress, storage.WatchID(id)) } } // TODO: do we need to return error back to client? @@ -147,6 +163,9 @@ func (sws *serverWatchStream) sendLoop() { // watch responses pending on a watch id creation message pending := make(map[storage.WatchID][]*pb.WatchResponse) + progressTicker := time.NewTicker(ProgressReportInterval) + defer progressTicker.Stop() + for { select { case wresp, ok := <-sws.watchStream.Chan(): @@ -182,6 +201,10 @@ func (sws *serverWatchStream) sendLoop() { return } + if _, ok := sws.progress[wresp.WatchID]; ok { + sws.progress[wresp.WatchID] = false + } + case c, ok := <-sws.ctrlStream: if !ok { return @@ -208,6 +231,13 @@ func (sws *serverWatchStream) sendLoop() { } delete(pending, wid) } + case <-progressTicker.C: + for id, ok := range sws.progress { + if ok { + sws.watchStream.RequestProgress(id) + } + sws.progress[id] = true + } case <-sws.closec: // drain the chan to clean up pending events for range sws.watchStream.Chan() { diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index cac0e19f4..b461a8755 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -875,6 +875,11 @@ type WatchCreateRequest struct { RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"` // start_revision is an optional revision (including) to watch from. No start_revision is "now". StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"` + // if progress_notify is set, etcd server sends WatchResponse with empty events to the + // created watcher when there are no recent events. It is useful when clients want always to be + // able to recover a disconnected watcher from a recent known revision. + // etcdsever can decide how long it should send a notification based on current load. + ProgressNotify bool `protobuf:"varint,4,opt,name=progress_notify,proto3" json:"progress_notify,omitempty"` } func (m *WatchCreateRequest) Reset() { *m = WatchCreateRequest{} } @@ -3374,6 +3379,16 @@ func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) { i++ i = encodeVarintRpc(data, i, uint64(m.StartRevision)) } + if m.ProgressNotify { + data[i] = 0x20 + i++ + if m.ProgressNotify { + data[i] = 1 + } else { + data[i] = 0 + } + i++ + } return i, nil } @@ -5018,6 +5033,9 @@ func (m *WatchCreateRequest) Size() (n int) { if m.StartRevision != 0 { n += 1 + sovRpc(uint64(m.StartRevision)) } + if m.ProgressNotify { + n += 2 + } return n } @@ -7693,6 +7711,26 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ProgressNotify", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.ProgressNotify = bool(v != 0) default: iNdEx = preIndex skippy, err := skipRpc(data[iNdEx:]) diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 08be24259..5e22218ba 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -311,6 +311,11 @@ message WatchCreateRequest { bytes range_end = 2; // start_revision is an optional revision (including) to watch from. No start_revision is "now". int64 start_revision = 3; + // if progress_notify is set, etcd server sends WatchResponse with empty events to the + // created watcher when there are no recent events. It is useful when clients want always to be + // able to recover a disconnected watcher from a recent known revision. + // etcdsever can decide how long it should send a notification based on current load. + bool progress_notify = 4; } message WatchCancelRequest { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index e479cc84f..16ae7dcc9 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver/api/v3rpc" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/storage/storagepb" @@ -820,3 +821,57 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) { resp.WatchId, resp.Created, resp.Canceled, len(resp.Events)) } } + +func TestWatchWithProgressNotify(t *testing.T) { + testInterval := 3 * time.Second + pi := v3rpc.ProgressReportInterval + v3rpc.ProgressReportInterval = testInterval + defer func() { v3rpc.ProgressReportInterval = pi }() + + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) + if wErr != nil { + t.Fatalf("wAPI.Watch error: %v", wErr) + } + + // create two watchers, one with progressNotify set. + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1, ProgressNotify: true}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("watch request failed (%v)", err) + } + wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("watch request failed (%v)", err) + } + + // two creation + one notification + for i := 0; i < 3; i++ { + rok, resp := waitResponse(wStream, testInterval+time.Second) + if resp.Created { + continue + } + + if rok { + t.Errorf("failed to receive response from watch stream") + } + if resp.Header.Revision != 1 { + t.Errorf("revision = %d, want 1", resp.Header.Revision) + } + if len(resp.Events) != 0 { + t.Errorf("len(resp.Events) = %d, want 0", len(resp.Events)) + } + } + + // no more notification + rok, resp := waitResponse(wStream, testInterval+time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", resp) + } +}