From 9143329c85cb5e5ce35752acf0c164ead0db5276 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 2 Mar 2016 22:34:54 -0800 Subject: [PATCH] storage: implement requestProgress --- storage/watchable_store.go | 29 +++++++++++++++++- storage/watcher.go | 29 +++++++++++++++--- storage/watcher_group.go | 5 ++++ storage/watcher_test.go | 60 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 5 deletions(-) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index e9d3b8b8e..36521cfc1 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -34,6 +34,7 @@ const ( type watchable interface { watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse) (*watcher, cancelFunc) + progress(w *watcher) rev() int64 } @@ -168,6 +169,7 @@ func (s *watchableStore) NewWatchStream() WatchStream { watchable: s, ch: make(chan WatchResponse, chanBufLen), cancels: make(map[WatchID]cancelFunc), + watchers: make(map[WatchID]*watcher), } } @@ -267,7 +269,9 @@ func (s *watchableStore) syncWatchers() { evs := kvsToEvents(&s.unsynced, revs, vs) tx.Unlock() - for w, eb := range newWatcherBatch(&s.unsynced, evs) { + wb := newWatcherBatch(&s.unsynced, evs) + + for w, eb := range wb { select { // s.store.Rev also uses Lock, so just return directly case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.store.currentRev.main}: @@ -287,6 +291,15 @@ func (s *watchableStore) syncWatchers() { s.unsynced.delete(w) } + // bring all un-notified watchers to synced. + for w := range s.unsynced.watchers { + if !wb.contains(w) { + w.cur = curRev + s.synced.add(w) + s.unsynced.delete(w) + } + } + slowWatcherGauge.Set(float64(s.unsynced.size())) } @@ -335,6 +348,20 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { func (s *watchableStore) rev() int64 { return s.store.Rev() } +func (s *watchableStore) progress(w *watcher) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.synced.watchers[w]; ok { + select { + case w.ch <- WatchResponse{WatchID: w.id, Revision: s.rev()}: + default: + // If the ch is full, this watcher is receiving events. + // We do not need to send progress at all. + } + } +} + type watcher struct { // the watcher key key []byte diff --git a/storage/watcher.go b/storage/watcher.go index f609db6e9..6449c453a 100644 --- a/storage/watcher.go +++ b/storage/watcher.go @@ -42,6 +42,14 @@ type WatchStream interface { // Chan returns a chan. All watch response will be sent to the returned chan. Chan() <-chan WatchResponse + // RequestProgress requests the progress of the watcher with given ID. The response + // will only be sent if the watcher is currently synced. + // The responses will be sent through the WatchRespone Chan attached + // with this stream to ensure correct ordering. + // The responses contains no events. The revision in the response is the progress + // of the watchers since the watcher is currently synced. + RequestProgress(id WatchID) + // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // returned. Cancel(id WatchID) error @@ -79,9 +87,10 @@ type watchStream struct { mu sync.Mutex // guards fields below it // nextID is the ID pre-allocated for next new watcher in this stream - nextID WatchID - closed bool - cancels map[WatchID]cancelFunc + nextID WatchID + closed bool + cancels map[WatchID]cancelFunc + watchers map[WatchID]*watcher } // Watch creates a new watcher in the stream and returns its WatchID. @@ -96,9 +105,10 @@ func (ws *watchStream) Watch(key, end []byte, startRev int64) WatchID { id := ws.nextID ws.nextID++ - _, c := ws.watchable.watch(key, end, startRev, id, ws.ch) + w, c := ws.watchable.watch(key, end, startRev, id, ws.ch) ws.cancels[id] = c + ws.watchers[id] = w return id } @@ -113,6 +123,7 @@ func (ws *watchStream) Cancel(id WatchID) error { } cancel() delete(ws.cancels, id) + delete(ws.watchers, id) return nil } @@ -133,3 +144,13 @@ func (ws *watchStream) Rev() int64 { defer ws.mu.Unlock() return ws.watchable.rev() } + +func (ws *watchStream) RequestProgress(id WatchID) { + ws.mu.Lock() + w, ok := ws.watchers[id] + ws.mu.Unlock() + if !ok { + return + } + ws.watchable.progress(w) +} diff --git a/storage/watcher_group.go b/storage/watcher_group.go index 4cbf54913..345ac61dd 100644 --- a/storage/watcher_group.go +++ b/storage/watcher_group.go @@ -75,6 +75,11 @@ func (wb watcherBatch) add(w *watcher, ev storagepb.Event) { eb.add(ev) } +func (wb watcherBatch) contains(w *watcher) bool { + _, ok := wb[w] + return ok +} + // newWatcherBatch maps watchers to their matched events. It enables quick // events look up by watcher. func newWatcherBatch(wg *watcherGroup, evs []storagepb.Event) watcherBatch { diff --git a/storage/watcher_test.go b/storage/watcher_test.go index 1f75eba00..a59b6f555 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -16,7 +16,10 @@ package storage import ( "bytes" + "os" + "reflect" "testing" + "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/storage/backend" @@ -184,3 +187,60 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { t.Errorf("cancels = %d, want 0", l) } } + +// TestWatcherRequestProgress ensures synced watcher can correctly +// report its correct progress. +func TestWatcherRequestProgress(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + + // manually create watchableStore instead of newWatchableStore + // because newWatchableStore automatically calls syncWatchers + // method to sync watchers in unsynced map. We want to keep watchers + // in unsynced to test if syncWatchers works as expected. + s := &watchableStore{ + store: NewStore(b, &lease.FakeLessor{}), + unsynced: newWatcherGroup(), + synced: newWatcherGroup(), + } + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + testKey := []byte("foo") + notTestKey := []byte("bad") + testValue := []byte("bar") + s.Put(testKey, testValue, lease.NoLease) + + w := s.NewWatchStream() + + badID := WatchID(1000) + w.RequestProgress(badID) + select { + case resp := <-w.Chan(): + t.Fatalf("unexpected %+v", resp) + default: + } + + id := w.Watch(notTestKey, nil, 1) + w.RequestProgress(id) + select { + case resp := <-w.Chan(): + t.Fatalf("unexpected %+v", resp) + default: + } + + s.syncWatchers() + + w.RequestProgress(id) + wrs := WatchResponse{WatchID: 0, Revision: 2} + select { + case resp := <-w.Chan(): + if !reflect.DeepEqual(resp, wrs) { + t.Fatalf("got %+v, expect %+v", resp, wrs) + } + case <-time.After(time.Second): + t.Fatal("failed to receive progress") + } +}