mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
commit
dc7f9a89b0
@ -16,6 +16,7 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
@ -39,6 +40,12 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer {
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
// expose for testing purpose. External test can change this to a
|
||||
// small value to finish fast.
|
||||
ProgressReportInterval = 10 * time.Minute
|
||||
)
|
||||
|
||||
const (
|
||||
// We send ctrl response inside the read loop. We do not want
|
||||
// send to block read, but we still want ctrl response we sent to
|
||||
@ -61,6 +68,10 @@ type serverWatchStream struct {
|
||||
watchStream storage.WatchStream
|
||||
ctrlStream chan *pb.WatchResponse
|
||||
|
||||
// progress tracks the watchID that stream might need to send
|
||||
// progress to.
|
||||
progress map[storage.WatchID]bool
|
||||
|
||||
// closec indicates the stream is closed.
|
||||
closec chan struct{}
|
||||
}
|
||||
@ -74,6 +85,7 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
|
||||
watchStream: ws.watchable.NewWatchStream(),
|
||||
// chan for sending control response like watcher created and canceled.
|
||||
ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),
|
||||
progress: make(map[storage.WatchID]bool),
|
||||
closec: make(chan struct{}),
|
||||
}
|
||||
defer sws.close()
|
||||
@ -115,6 +127,9 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
id := storage.WatchID(-1)
|
||||
if !futureRev {
|
||||
id = sws.watchStream.Watch(creq.Key, creq.RangeEnd, rev)
|
||||
if creq.ProgressNotify {
|
||||
sws.progress[id] = true
|
||||
}
|
||||
}
|
||||
sws.ctrlStream <- &pb.WatchResponse{
|
||||
Header: sws.newResponseHeader(wsrev),
|
||||
@ -132,6 +147,7 @@ func (sws *serverWatchStream) recvLoop() error {
|
||||
WatchId: id,
|
||||
Canceled: true,
|
||||
}
|
||||
delete(sws.progress, storage.WatchID(id))
|
||||
}
|
||||
}
|
||||
// TODO: do we need to return error back to client?
|
||||
@ -147,6 +163,9 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
// watch responses pending on a watch id creation message
|
||||
pending := make(map[storage.WatchID][]*pb.WatchResponse)
|
||||
|
||||
progressTicker := time.NewTicker(ProgressReportInterval)
|
||||
defer progressTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case wresp, ok := <-sws.watchStream.Chan():
|
||||
@ -182,6 +201,10 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
return
|
||||
}
|
||||
|
||||
if _, ok := sws.progress[wresp.WatchID]; ok {
|
||||
sws.progress[wresp.WatchID] = false
|
||||
}
|
||||
|
||||
case c, ok := <-sws.ctrlStream:
|
||||
if !ok {
|
||||
return
|
||||
@ -208,6 +231,13 @@ func (sws *serverWatchStream) sendLoop() {
|
||||
}
|
||||
delete(pending, wid)
|
||||
}
|
||||
case <-progressTicker.C:
|
||||
for id, ok := range sws.progress {
|
||||
if ok {
|
||||
sws.watchStream.RequestProgress(id)
|
||||
}
|
||||
sws.progress[id] = true
|
||||
}
|
||||
case <-sws.closec:
|
||||
// drain the chan to clean up pending events
|
||||
for range sws.watchStream.Chan() {
|
||||
|
@ -875,6 +875,11 @@ type WatchCreateRequest struct {
|
||||
RangeEnd []byte `protobuf:"bytes,2,opt,name=range_end,proto3" json:"range_end,omitempty"`
|
||||
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
|
||||
StartRevision int64 `protobuf:"varint,3,opt,name=start_revision,proto3" json:"start_revision,omitempty"`
|
||||
// if progress_notify is set, etcd server sends WatchResponse with empty events to the
|
||||
// created watcher when there are no recent events. It is useful when clients want always to be
|
||||
// able to recover a disconnected watcher from a recent known revision.
|
||||
// etcdsever can decide how long it should send a notification based on current load.
|
||||
ProgressNotify bool `protobuf:"varint,4,opt,name=progress_notify,proto3" json:"progress_notify,omitempty"`
|
||||
}
|
||||
|
||||
func (m *WatchCreateRequest) Reset() { *m = WatchCreateRequest{} }
|
||||
@ -3374,6 +3379,16 @@ func (m *WatchCreateRequest) MarshalTo(data []byte) (int, error) {
|
||||
i++
|
||||
i = encodeVarintRpc(data, i, uint64(m.StartRevision))
|
||||
}
|
||||
if m.ProgressNotify {
|
||||
data[i] = 0x20
|
||||
i++
|
||||
if m.ProgressNotify {
|
||||
data[i] = 1
|
||||
} else {
|
||||
data[i] = 0
|
||||
}
|
||||
i++
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
@ -5018,6 +5033,9 @@ func (m *WatchCreateRequest) Size() (n int) {
|
||||
if m.StartRevision != 0 {
|
||||
n += 1 + sovRpc(uint64(m.StartRevision))
|
||||
}
|
||||
if m.ProgressNotify {
|
||||
n += 2
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
@ -7693,6 +7711,26 @@ func (m *WatchCreateRequest) Unmarshal(data []byte) error {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 4:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ProgressNotify", wireType)
|
||||
}
|
||||
var v int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRpc
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := data[iNdEx]
|
||||
iNdEx++
|
||||
v |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.ProgressNotify = bool(v != 0)
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipRpc(data[iNdEx:])
|
||||
|
@ -311,6 +311,11 @@ message WatchCreateRequest {
|
||||
bytes range_end = 2;
|
||||
// start_revision is an optional revision (including) to watch from. No start_revision is "now".
|
||||
int64 start_revision = 3;
|
||||
// if progress_notify is set, etcd server sends WatchResponse with empty events to the
|
||||
// created watcher when there are no recent events. It is useful when clients want always to be
|
||||
// able to recover a disconnected watcher from a recent known revision.
|
||||
// etcdsever can decide how long it should send a notification based on current load.
|
||||
bool progress_notify = 4;
|
||||
}
|
||||
|
||||
message WatchCancelRequest {
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
@ -820,3 +821,57 @@ func TestV3WatchInvalidFutureRevision(t *testing.T) {
|
||||
resp.WatchId, resp.Created, resp.Canceled, len(resp.Events))
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchWithProgressNotify(t *testing.T) {
|
||||
testInterval := 3 * time.Second
|
||||
pi := v3rpc.ProgressReportInterval
|
||||
v3rpc.ProgressReportInterval = testInterval
|
||||
defer func() { v3rpc.ProgressReportInterval = pi }()
|
||||
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx)
|
||||
if wErr != nil {
|
||||
t.Fatalf("wAPI.Watch error: %v", wErr)
|
||||
}
|
||||
|
||||
// create two watchers, one with progressNotify set.
|
||||
wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
||||
CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1, ProgressNotify: true}}}
|
||||
if err := wStream.Send(wreq); err != nil {
|
||||
t.Fatalf("watch request failed (%v)", err)
|
||||
}
|
||||
wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
|
||||
CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}}}
|
||||
if err := wStream.Send(wreq); err != nil {
|
||||
t.Fatalf("watch request failed (%v)", err)
|
||||
}
|
||||
|
||||
// two creation + one notification
|
||||
for i := 0; i < 3; i++ {
|
||||
rok, resp := waitResponse(wStream, testInterval+time.Second)
|
||||
if resp.Created {
|
||||
continue
|
||||
}
|
||||
|
||||
if rok {
|
||||
t.Errorf("failed to receive response from watch stream")
|
||||
}
|
||||
if resp.Header.Revision != 1 {
|
||||
t.Errorf("revision = %d, want 1", resp.Header.Revision)
|
||||
}
|
||||
if len(resp.Events) != 0 {
|
||||
t.Errorf("len(resp.Events) = %d, want 0", len(resp.Events))
|
||||
}
|
||||
}
|
||||
|
||||
// no more notification
|
||||
rok, resp := waitResponse(wStream, testInterval+time.Second)
|
||||
if !rok {
|
||||
t.Errorf("unexpected pb.WatchResponse is received %+v", resp)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user