storage: remove the endRev of watcher
This commit is contained in:
parent fd07e02604
commit 6556bf1643
@@ -733,7 +733,7 @@ func TestWatchableKVWatch(t *testing.T) {
 	s := newWatchableStore(tmpPath)
 	defer cleanup(s, tmpPath)
 
-	wa, cancel := s.Watcher([]byte("foo"), true, 0, 0)
+	wa, cancel := s.Watcher([]byte("foo"), true, 0)
 	defer cancel()
 
 	s.Put([]byte("foo"), []byte("bar"))
@@ -776,7 +776,7 @@ func TestWatchableKVWatch(t *testing.T) {
 		t.Fatalf("failed to watch the event")
 	}
 
-	wa, cancel = s.Watcher([]byte("foo1"), false, 1, 4)
+	wa, cancel = s.Watcher([]byte("foo1"), false, 1)
 	defer cancel()
 
 	select {
@@ -817,19 +817,6 @@ func TestWatchableKVWatch(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Fatalf("failed to watch the event")
 	}
-
-	select {
-	case ev := <-wa.Event():
-		if !reflect.DeepEqual(ev, storagepb.Event{}) {
-			t.Errorf("watched event = %+v, want %+v", ev, storagepb.Event{})
-		}
-		if g := wa.Err(); g != ExceedEnd {
-			t.Errorf("err = %+v, want %+v", g, ExceedEnd)
-		}
-	case <-time.After(time.Second):
-		t.Fatalf("failed to watch the event")
-	}
 }
 
 func cleanup(s KV, path string) {
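For orientation, here is a minimal sketch of the post-change watch flow in package storage, reusing the helpers visible in the test above (newWatchableStore, cleanup, tmpPath); the function name is made up:

// Hypothetical illustration, not part of the commit. Watcher now takes only
// (key, prefix, startRev): startRev == 0 subscribes from "now", and there is
// no longer an end revision at which the store force-stops the watcher.
func TestWatchFlowSketch(t *testing.T) {
	s := newWatchableStore(tmpPath)
	defer cleanup(s, tmpPath)

	wa, cancel := s.Watcher([]byte("foo"), true, 0) // prefix watch from now
	defer cancel()

	s.Put([]byte("foo"), []byte("bar"))

	select {
	case ev := <-wa.Event(): // events arrive on the watcher channel
		t.Logf("got event: %+v", ev)
	case <-time.After(time.Second):
		t.Fatalf("failed to watch the event")
	}
}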
@@ -197,12 +197,11 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
 	return n, rev, nil
 }
 
-// RangeEvents gets the events from key to end in [startRev, endRev).
+// RangeEvents gets the events from key to end starting from startRev.
 // If `end` is nil, the request only observes the events on key.
 // If `end` is not nil, it observes the events on key range [key, range_end).
 // Limit limits the number of events returned.
 // If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
-// If endRev <=0, it indicates there is no end revision.
 //
 // If the required start rev is compacted, ErrCompacted will be returned.
 // If the required start rev has not happened, ErrFutureRev will be returned.
@@ -215,7 +214,7 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
 // has not progressed so far or it hits the event limit.
 //
 // TODO: return byte slices instead of events to avoid meaningless encode and decode.
-func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
+func (s *store) RangeEvents(key, end []byte, limit, startRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -236,9 +235,6 @@ func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
 	defer tx.Unlock()
 	// fetch events from the backend using revisions
 	for _, rev := range revs {
-		if endRev > 0 && rev.main >= endRev {
-			return evs, rev.main, nil
-		}
 		revbytes := newRevBytes()
 		revToBytes(rev, revbytes)
 
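Read together with the doc comment, a call under the new signature handles the two documented error cases explicitly. A sketch, assuming package storage context; the key range, batch size, and wrapper function are made up:

// Hypothetical illustration, not part of the commit: one bounded read of
// events on [key, end) starting at startRev; endRev no longer exists.
func rangeOnce(s *store) ([]storagepb.Event, int64, error) {
	evs, nextRev, err := s.RangeEvents([]byte("foo"), []byte("goo"), 100, 1)
	switch err {
	case ErrCompacted:
		// startRev was compacted away; startRev <= 0 rereads from the
		// beginning of the uncompacted history (per the doc comment).
		return s.RangeEvents([]byte("foo"), []byte("goo"), 100, 0)
	case ErrFutureRev:
		// startRev has not happened yet; nothing to return.
		return nil, 0, err
	}
	return evs, nextRev, err
}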
@@ -295,7 +295,7 @@ func TestStoreRangeEvents(t *testing.T) {
 		index.indexRangeEventsRespc <- tt.idxr
 		b.tx.rangeRespc <- tt.r
 
-		evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1, 4)
+		evs, _, err := s.RangeEvents([]byte("foo"), []byte("goo"), 1, 1)
 		if err != nil {
 			t.Errorf("#%d: err = %v, want nil", i, err)
 		}
@@ -469,7 +469,7 @@ func TestStoreRangeEventsEnd(t *testing.T) {
 	}
 
 	for i, tt := range tests {
-		evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1, 100)
+		evs, rev, err := s.RangeEvents(tt.key, tt.end, 0, 1)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -507,24 +507,17 @@ func TestStoreRangeEventsRev(t *testing.T) {
 
 	tests := []struct {
 		start int64
-		end   int64
-
 		wevs  []storagepb.Event
 		wnext int64
 	}{
-		{1, 1, nil, 1},
-		{1, 2, evs[:1], 2},
-		{1, 3, evs[:2], 3},
-		{1, 4, evs, 5},
-		{1, 5, evs, 5},
-		{1, 10, evs, 5},
-		{3, 4, evs[2:], 5},
-		{0, 10, evs, 5},
-		{1, 0, evs, 5},
-		{0, 0, evs, 5},
+		{0, evs, 5},
+		{1, evs, 5},
+		{3, evs[2:], 5},
 	}
 
 	for i, tt := range tests {
-		evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start, tt.end)
+		evs, next, err := s.RangeEvents([]byte("foo"), nil, 0, tt.start)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -559,7 +552,7 @@ func TestStoreRangeEventsBad(t *testing.T) {
 		{10, ErrFutureRev},
 	}
 	for i, tt := range tests {
-		_, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev, 100)
+		_, _, err := s.RangeEvents([]byte("foo"), nil, 0, tt.rev)
 		if err != tt.werr {
 			t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
 		}
@@ -602,7 +595,7 @@ func TestStoreRangeEventsLimit(t *testing.T) {
 		{100, evs},
 	}
 	for i, tt := range tests {
-		evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1)
+		evs, _, err := s.RangeEvents([]byte("foo"), nil, tt.limit, 1)
 		if err != nil {
 			t.Fatalf("#%d: range error (%v)", i, err)
 		}
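The limit test above implies the natural consumption pattern: page through history in bounded batches and resume from the returned nextRev. A sketch under that assumption (the resume semantics of nextRev are implied by the doc comment but not shown in full on this page; drainHistory and the batch size are made up):

// Hypothetical illustration, not part of the commit: drain the uncompacted
// history of "foo" in batches of 10 events.
func drainHistory(s *store) ([]storagepb.Event, error) {
	var all []storagepb.Event
	rev := int64(1)
	for {
		evs, nextRev, err := s.RangeEvents([]byte("foo"), nil, 10, rev)
		if err != nil {
			return nil, err
		}
		all = append(all, evs...)
		if len(evs) == 0 || nextRev <= rev { // no progress: caught up (assumption)
			return all, nil
		}
		rev = nextRev
	}
}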
@@ -15,7 +15,6 @@
 package storage
 
 import (
-	"errors"
 	"log"
 	"sync"
 	"time"
@@ -23,10 +22,6 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
-// ReachEnd is the error returned by Watcher.Err when watcher reaches its end revision and
-// no more event is available.
-var ExceedEnd = errors.New("storage: watcher reaches end revision")
-
 type watchableStore struct {
 	mu sync.Mutex
 
@@ -38,10 +33,7 @@ type watchableStore struct {
 	// contains all synced watchers that are tracking the events that will happen
 	// The key of the map is the key that the watcher is watching on.
 	synced map[string][]*watcher
-	// contains all synced watchers that have an end revision
-	// The key of the map is the end revision of the watcher.
-	endm map[int64][]*watcher
-	tx   *ongoingTx
+	tx     *ongoingTx
 
 	stopc chan struct{}
 	wg    sync.WaitGroup
@@ -51,7 +43,6 @@ func newWatchableStore(path string) *watchableStore {
 	s := &watchableStore{
 		KV:     newStore(path),
 		synced: make(map[string][]*watcher),
-		endm:   make(map[int64][]*watcher),
 		stopc:  make(chan struct{}),
 	}
 	s.wg.Add(1)
@@ -160,17 +151,14 @@ func (s *watchableStore) Close() error {
 	return s.KV.Close()
 }
 
-func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) {
+func (s *watchableStore) Watcher(key []byte, prefix bool, startRev int64) (Watcher, CancelFunc) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	wa := newWatcher(key, prefix, startRev, endRev)
+	wa := newWatcher(key, prefix, startRev)
 	k := string(key)
 	if startRev == 0 {
 		s.synced[k] = append(s.synced[k], wa)
-		if endRev != 0 {
-			s.endm[endRev] = append(s.endm[endRev], wa)
-		}
 	} else {
 		slowWatchersGauge.Inc()
 		s.unsynced = append(s.unsynced, wa)
@@ -198,13 +186,6 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) {
 				watchersGauge.Dec()
 			}
 		}
-		if wa.end != 0 {
-			for i, w := range s.endm[wa.end] {
-				if w == wa {
-					s.endm[wa.end] = append(s.endm[wa.end][:i], s.endm[wa.end][i+1:]...)
-				}
-			}
-		}
 		// If we cannot find it, it should have finished watch.
 	})
 
@@ -248,7 +229,7 @@ func (s *watchableStore) syncWatchers() {
 			nws = append(nws, w)
 			continue
 		}
-		evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur, w.end)
+		evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur)
 		if err != nil {
 			w.stopWithError(err)
 			continue
@@ -259,17 +240,9 @@
 			w.ch <- ev
 			pendingEventsGauge.Inc()
 		}
-		// stop watcher if it reaches the end
-		if w.end > 0 && nextRev >= w.end {
-			w.stopWithError(ExceedEnd)
-			continue
-		}
 		// switch to tracking future events if needed
 		if nextRev > curRev {
 			s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
-			if w.end != 0 {
-				s.endm[w.end] = append(s.endm[w.end], w)
-			}
 			continue
 		}
 		// put it back to try it in the next round
@@ -283,7 +256,6 @@ func (s *watchableStore) syncWatchers() {
 // handle handles the change of the happening event on all watchers.
 func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
 	s.notify(rev, ev)
-	s.stopWatchers(rev)
 }
 
 // notify notifies the fact that given event at the given rev just happened to
@@ -304,14 +276,6 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
 			pendingEventsGauge.Inc()
 			nws = append(nws, w)
 		default:
 			// put it back to unsynced place
-			if w.end != 0 {
-				for i, ew := range s.endm[w.end] {
-					if ew == w {
-						s.endm[w.end] = append(s.endm[w.end][:i], s.endm[w.end][i+1:]...)
-					}
-				}
-			}
 			w.cur = rev
 			s.unsynced = append(s.unsynced, w)
 			slowWatchersGauge.Inc()
@@ -321,21 +285,6 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
 	}
 }
 
-// stopWatchers stops watchers with limit equal to rev.
-func (s *watchableStore) stopWatchers(rev int64) {
-	for i, wa := range s.endm[rev+1] {
-		k := string(wa.key)
-		for _, w := range s.synced[k] {
-			if w == wa {
-				s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
-				watchersGauge.Dec()
-			}
-		}
-		wa.stopWithError(ExceedEnd)
-	}
-	delete(s.endm, rev+1)
-}
-
 type ongoingTx struct {
 	// keys put/deleted in the ongoing txn
 	putm map[string]bool
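With endm, stopWatchers, and ExceedEnd gone, the store never force-stops a watcher at an end revision; a caller wanting bounded watching cancels from its own loop instead. A hedged sketch using only the API that remains (watchNEvents, the event-count bound, and the timeout are made up):

// Hypothetical illustration, not part of the commit: emulate "watch until
// done" on the caller side now that the store has no end-revision support.
func watchNEvents(s *watchableStore, key []byte, n int) []storagepb.Event {
	wa, cancel := s.Watcher(key, false, 0)
	defer cancel() // cancel replaces the old server-side stop at endRev

	evs := make([]storagepb.Event, 0, n)
	for len(evs) < n {
		select {
		case ev := <-wa.Event():
			evs = append(evs, ev)
		case <-time.After(time.Second):
			return evs // give up after an arbitrary quiet second
		}
	}
	return evs
}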
@@ -24,19 +24,17 @@ type watcher struct {
 	key    []byte
 	prefix bool
 	cur    int64
-	end    int64
 
 	ch  chan storagepb.Event
 	mu  sync.Mutex
 	err error
 }
 
-func newWatcher(key []byte, prefix bool, start, end int64) *watcher {
+func newWatcher(key []byte, prefix bool, start int64) *watcher {
 	return &watcher{
 		key:    key,
 		prefix: prefix,
 		cur:    start,
-		end:    end,
 		ch:     make(chan storagepb.Event, 10),
 	}
 }
@@ -26,6 +26,6 @@ func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
 	b.ReportAllocs()
 	b.StartTimer()
 	for i := 0; i < b.N; i++ {
-		s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0, 0)
+		s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0)
 	}
 }