diff --git a/storage/kv_test.go b/storage/kv_test.go index 4b3f8e980..66de472bb 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -735,7 +735,7 @@ func TestWatchableKVWatch(t *testing.T) { w := s.NewWatcher() - cancel := w.Watch([]byte("foo"), true, 0) + wid, cancel := w.Watch([]byte("foo"), true, 0) defer cancel() s.Put([]byte("foo"), []byte("bar")) @@ -750,6 +750,7 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 1, Version: 1, }, + WatchID: wid, } if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) @@ -770,6 +771,7 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 2, Version: 1, }, + WatchID: wid, } if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) @@ -778,7 +780,10 @@ func TestWatchableKVWatch(t *testing.T) { t.Fatalf("failed to watch the event") } - cancel = w.Watch([]byte("foo1"), false, 1) + w.Close() + + w = s.NewWatcher() + wid, cancel = w.Watch([]byte("foo1"), false, 1) defer cancel() select { @@ -792,6 +797,7 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 2, Version: 1, }, + WatchID: wid, } if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) @@ -812,6 +818,7 @@ func TestWatchableKVWatch(t *testing.T) { ModRevision: 3, Version: 2, }, + WatchID: wid, } if !reflect.DeepEqual(ev, wev) { t.Errorf("watched event = %+v, want %+v", ev, wev) diff --git a/storage/storagepb/kv.pb.go b/storage/storagepb/kv.pb.go index d7bfc9bba..3684363fe 100644 --- a/storage/storagepb/kv.pb.go +++ b/storage/storagepb/kv.pb.go @@ -68,7 +68,8 @@ type Event struct { // a put event contains the current key-value // a delete/expire event contains the previous // key-value - Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` + Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` + WatchID int64 `protobuf:"varint,3,opt,name=watchID,proto3" json:"watchID,omitempty"` } func (m *Event) Reset() { *m = Event{} } @@ -157,6 +158,11 @@ func (m *Event) MarshalTo(data []byte) (int, error) { } i += n1 } + if m.WatchID != 0 { + data[i] = 0x18 + i++ + i = encodeVarintKv(data, i, uint64(m.WatchID)) + } return i, nil } @@ -224,6 +230,9 @@ func (m *Event) Size() (n int) { l = m.Kv.Size() n += 1 + l + sovKv(uint64(l)) } + if m.WatchID != 0 { + n += 1 + sovKv(uint64(m.WatchID)) + } return n } @@ -448,6 +457,22 @@ func (m *Event) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field WatchID", wireType) + } + m.WatchID = 0 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.WatchID |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { diff --git a/storage/storagepb/kv.proto b/storage/storagepb/kv.proto index e1ac0b94a..e08c9ac73 100644 --- a/storage/storagepb/kv.proto +++ b/storage/storagepb/kv.proto @@ -32,5 +32,6 @@ message Event { // a delete/expire event contains the previous // key-value KeyValue kv = 2; + // watchID is the ID of watching this event is sent to. + int64 watchID = 3; } - diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 96f0688d1..09b5b5ff2 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -32,7 +32,7 @@ const ( ) type watchable interface { - watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) + watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) } type watchableStore struct { @@ -173,7 +173,7 @@ func (s *watchableStore) NewWatcher() Watcher { } } -func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan<- storagepb.Event) (*watching, CancelFunc) { +func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) { s.mu.Lock() defer s.mu.Unlock() @@ -181,6 +181,7 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, ch chan< key: key, prefix: prefix, cur: startRev, + id: id, ch: ch, } @@ -273,8 +274,9 @@ func (s *watchableStore) syncWatchings() { } w.ch <- storagepb.Event{ - Type: evt, - Kv: &kv, + Type: evt, + Kv: &kv, + WatchID: w.id, } pendingEventsGauge.Inc() } @@ -311,6 +313,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { if !w.prefix && i != len(ev.Kv.Key) { continue } + ev.WatchID = w.id select { case w.ch <- ev: pendingEventsGauge.Inc() @@ -362,6 +365,7 @@ type watching struct { // If cur is behind the current revision of the KV, // watching is unsynced and needs to catch up. cur int64 + id int64 // a chan to send out the watched events. // The chan might be shared with other watchings. diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index a4d6ad861..92b8273a4 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -65,7 +65,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { cancels := make([]CancelFunc, watcherSize) for i := 0; i < watcherSize; i++ { // non-0 value to keep watchers in unsynced - cancel := w.Watch(testKey, true, 1) + _, cancel := w.Watch(testKey, true, 1) cancels[i] = cancel } @@ -102,7 +102,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { cancels := make([]CancelFunc, watcherSize) for i := 0; i < watcherSize; i++ { // 0 for startRev to keep watchers in synced - cancel := w.Watch(testKey, true, 0) + _, cancel := w.Watch(testKey, true, 0) cancels[i] = cancel } diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index eefae15a3..70572f852 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -49,7 +49,7 @@ func TestNewWatcherCancel(t *testing.T) { s.Put(testKey, testValue) w := s.NewWatcher() - cancel := w.Watch(testKey, true, 0) + _, cancel := w.Watch(testKey, true, 0) cancel() diff --git a/storage/watcher.go b/storage/watcher.go index b9c46e7d0..298971291 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -26,7 +26,9 @@ type Watcher interface { // The whole event history can be watched unless compacted. // If `prefix` is true, watch observes all events whose key prefix could be the given `key`. // If `startRev` <=0, watch observes events after currentRev. - Watch(key []byte, prefix bool, startRev int64) CancelFunc + // The returned `id` is the ID of this watching. It appears as WatchID + // in events that are sent to this watching. + Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) // Chan returns a chan. All watched events will be sent to the returned chan. Chan() <-chan storagepb.Event @@ -42,21 +44,27 @@ type watcher struct { ch chan storagepb.Event mu sync.Mutex // guards fields below it + nextID int64 // nextID is the ID allocated for next new watching closed bool cancels []CancelFunc } // TODO: return error if ws is closed? -func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) CancelFunc { - _, c := ws.watchable.watch(key, prefix, startRev, ws.ch) +func (ws *watcher) Watch(key []byte, prefix bool, startRev int64) (id int64, cancel CancelFunc) { ws.mu.Lock() defer ws.mu.Unlock() if ws.closed { - return nil + return -1, nil } + + id = ws.nextID + ws.nextID++ + + _, c := ws.watchable.watch(key, prefix, startRev, id, ws.ch) + // TODO: cancelFunc needs to be removed from the cancels when it is called. ws.cancels = append(ws.cancels, c) - return c + return id, c } func (ws *watcher) Chan() <-chan storagepb.Event { diff --git a/storage/watcher_test.go b/storage/watcher_test.go new file mode 100644 index 000000000..c3adabf69 --- /dev/null +++ b/storage/watcher_test.go @@ -0,0 +1,64 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import "testing" + +// TestWatcherWatchID tests that each watcher provides unique watch ID, +// and the watched event attaches the correct watch ID. +func TestWatcherWatchID(t *testing.T) { + s := WatchableKV(newWatchableStore(tmpPath)) + defer cleanup(s, tmpPath) + + w := s.NewWatcher() + defer w.Close() + + idm := make(map[int64]struct{}) + + // synced watchings + for i := 0; i < 10; i++ { + id, cancel := w.Watch([]byte("foo"), false, 0) + if _, ok := idm[id]; ok { + t.Errorf("#%d: id %d exists", i, id) + } + idm[id] = struct{}{} + + s.Put([]byte("foo"), []byte("bar")) + + ev := <-w.Chan() + if ev.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id) + } + + cancel() + } + + s.Put([]byte("foo2"), []byte("bar")) + // unsynced watchings + for i := 10; i < 20; i++ { + id, cancel := w.Watch([]byte("foo2"), false, 1) + if _, ok := idm[id]; ok { + t.Errorf("#%d: id %d exists", i, id) + } + idm[id] = struct{}{} + + ev := <-w.Chan() + if ev.WatchID != id { + t.Errorf("#%d: watch id in event = %d, want %d", i, ev.WatchID, id) + } + + cancel() + } +}