diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index df8ca437d..07ca30c1a 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -111,7 +111,7 @@ func (sws *serverWatchStream) recvLoop() error { func (sws *serverWatchStream) sendLoop() { for { select { - case evs, ok := <-sws.watchStream.Chan(): + case wresp, ok := <-sws.watchStream.Chan(): if !ok { return } @@ -119,12 +119,13 @@ func (sws *serverWatchStream) sendLoop() { // TODO: evs is []storagepb.Event type // either return []*storagepb.Event from storage package // or define protocol buffer with []storagepb.Event. + evs := wresp.Events events := make([]*storagepb.Event, len(evs)) for i := range evs { events[i] = &evs[i] } - err := sws.gRPCStream.Send(&pb.WatchResponse{Events: events}) + err := sws.gRPCStream.Send(&pb.WatchResponse{WatchId: wresp.WatchID, Events: events}) storage.ReportEventReceived() if err != nil { return diff --git a/storage/kv_test.go b/storage/kv_test.go index 784e18877..5e43dedff 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -740,7 +740,7 @@ func TestWatchableKVWatch(t *testing.T) { s.Put([]byte("foo"), []byte("bar")) select { - case evs := <-w.Chan(): + case resp := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -750,9 +750,11 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 1, Version: 1, }, - WatchID: wid, } - ev := evs[0] + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) } @@ -762,7 +764,7 @@ func TestWatchableKVWatch(t *testing.T) { s.Put([]byte("foo1"), []byte("bar1")) select { - case evs := <-w.Chan(): + case resp := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -772,9 +774,11 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 2, Version: 1, }, - WatchID: wid, } - ev := evs[0] + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) } @@ -789,7 +793,7 @@ func TestWatchableKVWatch(t *testing.T) { defer cancel() select { - case evs := <-w.Chan(): + case resp := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -799,9 +803,11 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 2, Version: 1, }, - WatchID: wid, } - ev := evs[0] + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) } @@ -811,7 +817,7 @@ func TestWatchableKVWatch(t *testing.T) { s.Put([]byte("foo1"), []byte("bar11")) select { - case evs := <-w.Chan(): + case resp := <-w.Chan(): wev := storagepb.Event{ Type: storagepb.PUT, Kv: &storagepb.KeyValue{ @@ -821,9 +827,11 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 3, Version: 2, }, - WatchID: wid, } - ev := evs[0] + if resp.WatchID != wid { + t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid) + } + ev := resp.Events[0] if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) } diff --git a/storage/storagepb/kv.pb.go b/storage/storagepb/kv.pb.go index bdcb565db..971c312bc 100644 --- a/storage/storagepb/kv.pb.go +++ b/storage/storagepb/kv.pb.go @@ -72,8 +72,6 @@ type Event struct { // a delete/expire event contains the previous // key-value Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` - // watchID is the ID of watching this event is sent to. - WatchID int64 `protobuf:"varint,3,opt,name=watchID,proto3" json:"watchID,omitempty"` } func (m *Event) Reset() { *m = Event{} } @@ -167,11 +165,6 @@ func (m *Event) MarshalTo(data []byte) (int, error) { } i += n1 } - if m.WatchID != 0 { - data[i] = 0x18 - i++ - i = encodeVarintKv(data, i, uint64(m.WatchID)) - } return i, nil } @@ -242,9 +235,6 @@ func (m *Event) Size() (n int) { l = m.Kv.Size() n += 1 + l + sovKv(uint64(l)) } - if m.WatchID != 0 { - n += 1 + sovKv(uint64(m.WatchID)) - } return n } @@ -485,22 +475,6 @@ func (m *Event) Unmarshal(data []byte) error { return err } iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field WatchID", wireType) - } - m.WatchID = 0 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - m.WatchID |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } default: var sizeOfWire int for { diff --git a/storage/storagepb/kv.proto b/storage/storagepb/kv.proto index 719da8895..123b5a174 100644 --- a/storage/storagepb/kv.proto +++ b/storage/storagepb/kv.proto @@ -35,6 +35,4 @@ message Event { // a delete/expire event contains the previous // key-value KeyValue kv = 2; - // watchID is the ID of watching this event is sent to. - int64 watchID = 3; } diff --git a/storage/watchable_store.go b/storage/watchable_store.go index f1ac98770..4ef68321e 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -33,7 +33,7 @@ const ( ) type watchable interface { - watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) + watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc) } type watchableStore struct { @@ -181,12 +181,12 @@ func (s *watchableStore) NewWatchStream() WatchStream { watchStreamGauge.Inc() return &watchStream{ watchable: s, - ch: make(chan []storagepb.Event, chanBufLen), + ch: make(chan WatchResponse, chanBufLen), cancels: make(map[int64]CancelFunc), } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- []storagepb.Event) (*watcher, CancelFunc) { +func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- WatchResponse) (*watcher, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() @@ -340,8 +340,9 @@ func (s *watchableStore) syncWatchers() { } for w, es := range newWatcherToEventMap(keyToUnsynced, evs) { + wr := WatchResponse{WatchID: w.id, Events: es} select { - case w.ch <- es: + case w.ch <- wr: pendingEventsGauge.Add(float64(len(es))) default: // TODO: handle the full unsynced watchers. @@ -374,8 +375,9 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { continue } es := we[w] + wr := WatchResponse{WatchID: w.id, Events: es} select { - case w.ch <- es: + case w.ch <- wr: pendingEventsGauge.Add(float64(len(es))) default: // move slow watcher to unsynced @@ -427,9 +429,9 @@ type watcher struct { cur int64 id int64 - // a chan to send out the watched events. + // a chan to send out the watch response. // The chan might be shared with other watchers. - ch chan<- []storagepb.Event + ch chan<- WatchResponse } // unsafeAddWatcher puts watcher with key k into watchableStore's synced. @@ -475,7 +477,6 @@ func newWatcherToEventMap(sm map[string]map[*watcher]struct{}, evs []storagepb.E if !w.prefix && i != len(ev.Kv.Key) { continue } - ev.WatchID = w.id if _, ok := watcherToEvents[w]; !ok { watcherToEvents[w] = []storagepb.Event{} diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 2895f637e..749ef15ea 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -186,7 +186,8 @@ func TestSyncWatchers(t *testing.T) { if len(w.(*watchStream).ch) != watcherN { t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN) } - evs := <-w.(*watchStream).ch + wr := <-w.(*watchStream).ch + evs := wr.Events if len(evs) != 1 { t.Errorf("len(evs) got = %d, want = 1", len(evs)) } diff --git a/storage/watcher.go b/storage/watcher.go index d2d493292..2d8a22711 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -39,8 +39,8 @@ type WatchStream interface { // TODO: remove the returned CancelFunc. Always use Cancel. Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) - // Chan returns a chan. All watched events will be sent to the returned chan. - Chan() <-chan []storagepb.Event + // Chan returns a chan. All watch response will be sent to the returned chan. + Chan() <-chan WatchResponse // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // returned. @@ -50,11 +50,18 @@ type WatchStream interface { Close() } +type WatchResponse struct { + // WatchID is the ID of the watcher this response sent to. + WatchID int64 + // Events contains all the events that needs to send. + Events []storagepb.Event +} + // watchStream contains a collection of watchers that share // one streaming chan to send out watched events and other control events. type watchStream struct { watchable watchable - ch chan []storagepb.Event + ch chan WatchResponse mu sync.Mutex // guards fields below it // nextID is the ID pre-allocated for next new watcher in this stream @@ -80,7 +87,7 @@ func (ws *watchStream) Watch(key []byte, prefix bool, startRev int64) (id int64, return id, c } -func (ws *watchStream) Chan() <-chan []storagepb.Event { +func (ws *watchStream) Chan() <-chan WatchResponse { return ws.ch } diff --git a/storage/watcher_test.go b/storage/watcher_test.go index cbd5593a5..acf4fd783 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -36,12 +36,11 @@ func TestWatcherWatchID(t *testing.T) { s.Put([]byte("foo"), []byte("bar")) - evs := <-w.Chan() - for j, ev := range evs { - if ev.WatchID != id { - t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id) - } + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } + cancel() } @@ -55,11 +54,9 @@ func TestWatcherWatchID(t *testing.T) { } idm[id] = struct{}{} - evs := <-w.Chan() - for j, ev := range evs { - if ev.WatchID != id { - t.Errorf("#%d.%d: watch id in event = %d, want %d", i, j, ev.WatchID, id) - } + resp := <-w.Chan() + if resp.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) } cancel()