Merge pull request #3507 from yichengq/watch
storage: support basic watch
commit c082488e23
@@ -2,6 +2,7 @@ package storage
import (
    "log"
    "sort"
    "sync"

    "github.com/coreos/etcd/Godeps/_workspace/src/github.com/google/btree"
@@ -13,6 +14,7 @@ type index interface {
    Put(key []byte, rev revision)
    Restore(key []byte, created, modified revision, ver int64)
    Tombstone(key []byte, rev revision) error
    RangeEvents(key, end []byte, rev int64) []revision
    Compact(rev int64) map[revision]struct{}
    Equal(b index) bool
}
@@ -118,6 +120,38 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
    return ki.tombstone(rev.main, rev.sub)
}

// RangeEvents returns all revisions from key (inclusive) to end (exclusive)
// at or after the given rev. The returned slice is sorted in revision order.
func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision {
    ti.RLock()
    defer ti.RUnlock()

    keyi := &keyIndex{key: key}
    if end == nil {
        item := ti.tree.Get(keyi)
        if item == nil {
            return nil
        }
        keyi = item.(*keyIndex)
        return keyi.since(rev)
    }

    endi := &keyIndex{key: end}
    var revs []revision
    ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
        if !item.Less(endi) {
            return false
        }
        curKeyi := item.(*keyIndex)
        revs = append(revs, curKeyi.since(rev)...)
        return true
    })
    sort.Sort(revisions(revs))

    return revs
}

func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
    available := make(map[revision]struct{})
    emptyki := make([]*keyIndex, 0)
@@ -132,6 +132,41 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
    return revision{}, revision{}, 0, ErrRevisionNotFound
}

// since returns revisions since the given rev. Only the revision with the
// largest sub revision will be returned if multiple revisions have the same
// main revision.
func (ki *keyIndex) since(rev int64) []revision {
    if ki.isEmpty() {
        log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
    }
    since := revision{rev, 0}
    var gi int
    // find the generation to start checking from
    for gi = len(ki.generations) - 1; gi > 0; gi-- {
        if since.GreaterThan(ki.generations[gi].created) {
            break
        }
    }

    var revs []revision
    var last int64
    for ; gi < len(ki.generations); gi++ {
        for _, r := range ki.generations[gi].revs {
            if since.GreaterThan(r) {
                continue
            }
            if r.main == last {
                // replace the revision with a new one that has a higher sub value,
                // because the original one should not be seen externally
                revs[len(revs)-1] = r
                continue
            }
            revs = append(revs, r)
            last = r.main
        }
    }
    return revs
}

// compact compacts a keyIndex by removing the versions with smaller or equal
// revision than the given atRev except the largest one (if the largest one is
// a tombstone, it will not be kept).
@@ -6,6 +6,10 @@ import (
    "github.com/coreos/etcd/storage/storagepb"
)

// CancelFunc tells an operation to abandon its work. A CancelFunc does not
// wait for the work to stop.
type CancelFunc func()

type KV interface {
    // Range gets the keys in the range at rangeRev.
    // If rangeRev <= 0, range gets the keys at currentRev.
@@ -50,3 +54,34 @@ type KV interface {
    Restore() error
    Close() error
}

// Watcher watches on the KV. It is notified when an event happens on the
// watched key or prefix.
type Watcher interface {
    // Event returns a channel that receives observed events that match the
    // context of the watcher. When the watch finishes, is canceled, or is
    // aborted, the channel is closed and returns an empty event.
    // Successive calls to Event return the same value.
    Event() <-chan storagepb.Event

    // Err returns a non-nil error value after Event is closed. Err returns
    // ErrCompacted if the history was compacted, ErrCanceled if the watch is
    // canceled, or ExceedEnd if the watch reaches its end revision. No other
    // values for Err are defined.
    // After Event is closed, successive calls to Err return the same value.
    Err() error
}

// WatchableKV is a KV that can be watched.
type WatchableKV interface {
    KV

    // Watcher watches the events that happen or have happened in etcd. The
    // whole event history can be watched unless compacted.
    // If `prefix` is true, watch observes all events on keys prefixed with the given `key`.
    // If `startRev` <= 0, watch observes events after currentRev.
    // If `endRev` <= 0, watch observes events until the watch is canceled.
    //
    // Canceling the watcher releases resources associated with it, so code
    // should always call cancel as soon as watch is done.
    Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc)
}
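The Watcher and WatchableKV contracts above are exercised by the test in the next hunk. As a quick orientation, here is a minimal usage sketch written against those interfaces; it is not part of the patch, the function name consumeEvents is hypothetical, and it assumes "fmt" is imported.

// consumeEvents is an illustrative sketch of the Watcher contract, not code from this commit.
func consumeEvents(s WatchableKV) error {
    // watch every key prefixed with "foo", starting after the current revision,
    // with no end revision
    w, cancel := s.Watcher([]byte("foo"), true, 0, 0)
    // canceling releases the watcher's resources, so always call it when done
    defer cancel()

    // the Event channel is closed when the watch finishes, is canceled, or is aborted
    for ev := range w.Event() {
        fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
    // after the channel is closed, Err reports why the watch ended
    return w.Err()
}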
@@ -713,6 +713,109 @@ func TestKVSnapshot(t *testing.T) {
    }
}

func TestWatchableKVWatch(t *testing.T) {
    s := newWatchableStore(tmpPath)
    defer cleanup(s, tmpPath)

    wa, cancel := s.Watcher([]byte("foo"), true, 0, 0)
    defer cancel()

    s.Put([]byte("foo"), []byte("bar"))
    select {
    case ev := <-wa.Event():
        wev := storagepb.Event{
            Type: storagepb.PUT,
            Kv: &storagepb.KeyValue{
                Key:            []byte("foo"),
                Value:          []byte("bar"),
                CreateRevision: 1,
                ModRevision:    1,
                Version:        1,
            },
        }
        if !reflect.DeepEqual(ev, wev) {
            t.Errorf("watched event = %+v, want %+v", ev, wev)
        }
    case <-time.After(time.Second):
        t.Fatalf("failed to watch the event")
    }

    s.Put([]byte("foo1"), []byte("bar1"))
    select {
    case ev := <-wa.Event():
        wev := storagepb.Event{
            Type: storagepb.PUT,
            Kv: &storagepb.KeyValue{
                Key:            []byte("foo1"),
                Value:          []byte("bar1"),
                CreateRevision: 2,
                ModRevision:    2,
                Version:        1,
            },
        }
        if !reflect.DeepEqual(ev, wev) {
            t.Errorf("watched event = %+v, want %+v", ev, wev)
        }
    case <-time.After(time.Second):
        t.Fatalf("failed to watch the event")
    }

    wa, cancel = s.Watcher([]byte("foo1"), false, 1, 4)
    defer cancel()

    select {
    case ev := <-wa.Event():
        wev := storagepb.Event{
            Type: storagepb.PUT,
            Kv: &storagepb.KeyValue{
                Key:            []byte("foo1"),
                Value:          []byte("bar1"),
                CreateRevision: 2,
                ModRevision:    2,
                Version:        1,
            },
        }
        if !reflect.DeepEqual(ev, wev) {
            t.Errorf("watched event = %+v, want %+v", ev, wev)
        }
    case <-time.After(time.Second):
        t.Fatalf("failed to watch the event")
    }

    s.Put([]byte("foo1"), []byte("bar11"))
    select {
    case ev := <-wa.Event():
        wev := storagepb.Event{
            Type: storagepb.PUT,
            Kv: &storagepb.KeyValue{
                Key:            []byte("foo1"),
                Value:          []byte("bar11"),
                CreateRevision: 2,
                ModRevision:    3,
                Version:        2,
            },
        }
        if !reflect.DeepEqual(ev, wev) {
            t.Errorf("watched event = %+v, want %+v", ev, wev)
        }
    case <-time.After(time.Second):
        t.Fatalf("failed to watch the event")
    }

    select {
    case ev := <-wa.Event():
        if !reflect.DeepEqual(ev, storagepb.Event{}) {
            t.Errorf("watched event = %+v, want %+v", ev, storagepb.Event{})
        }
        if g := wa.Err(); g != ExceedEnd {
            t.Errorf("err = %+v, want %+v", g, ExceedEnd)
        }
    case <-time.After(time.Second):
        t.Fatalf("failed to watch the event")
    }
}

func cleanup(s KV, path string) {
    s.Close()
    os.Remove(path)
@@ -26,6 +26,7 @@ var (
    ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
    ErrCompacted     = errors.New("storage: required revision has been compacted")
    ErrFutureRev     = errors.New("storage: required revision is a future revision")
    ErrCanceled      = errors.New("storage: watcher is canceled")
)

type store struct {
@@ -171,6 +172,54 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
    return n, rev, nil
}

// RangeEvents gets the events from key to end at or after startRev.
// If startRev <= 0, RangeEvents returns events from the beginning of the history.
// If `end` is nil, the request only observes the events on key.
// If `end` is not nil, it observes the events on the key range [key, end).
// Limit limits the number of events returned.
// If the required rev is compacted, ErrCompacted will be returned.
// TODO: return byte slices instead of events to avoid meaningless encode and decode.
func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if startRev <= s.compactMainRev {
        return nil, 0, ErrCompacted
    }

    revs := s.kvindex.RangeEvents(key, end, startRev)
    if len(revs) == 0 {
        return nil, s.currentRev.main + 1, nil
    }

    tx := s.b.BatchTx()
    tx.Lock()
    defer tx.Unlock()
    // fetch events from the backend using revisions
    for _, rev := range revs {
        if rev.main >= endRev {
            return evs, rev.main, nil
        }
        revbytes := newRevBytes()
        revToBytes(rev, revbytes)

        _, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
        if len(vs) != 1 {
            log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
        }

        e := storagepb.Event{}
        if err := e.Unmarshal(vs[0]); err != nil {
            log.Fatalf("storage: cannot unmarshal event: %v", err)
        }
        evs = append(evs, e)
        if limit > 0 && len(evs) >= int(limit) {
            return evs, rev.main + 1, nil
        }
    }
    return evs, s.currentRev.main + 1, nil
}

func (s *store) Compact(rev int64) error {
    s.mu.Lock()
    defer s.mu.Unlock()
@@ -473,6 +473,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
    i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
    return nil
}
func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
    return nil
}
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
    i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
    return <-i.indexCompactRespc
@@ -33,3 +33,9 @@ func bytesToRev(bytes []byte) revision {
        sub: int64(binary.BigEndian.Uint64(bytes[9:])),
    }
}

type revisions []revision

func (a revisions) Len() int           { return len(a) }
func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) }
func (a revisions) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
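As a side note that is not part of the diff: revisions is a sort.Interface whose ordering comes from GreaterThan, and treeIndex.RangeEvents relies on it via sort.Sort(revisions(revs)). A tiny sketch of the effect, assuming GreaterThan orders by main revision first and sub revision second:

    revs := []revision{{main: 5, sub: 0}, {main: 2, sub: 1}, {main: 2, sub: 0}}
    sort.Sort(revisions(revs))
    // revs is now [{2 0} {2 1} {5 0}], i.e. ascending revision order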
storage/watchable_store.go (new file, 336 lines)
@@ -0,0 +1,336 @@
package storage

import (
    "errors"
    "log"
    "sync"
    "time"

    "github.com/coreos/etcd/storage/storagepb"
)

// ExceedEnd is the error returned by Watcher.Err when the watcher reaches its
// end revision and no more events are available.
var ExceedEnd = errors.New("storage: watcher reaches end revision")

type watchableStore struct {
    mu sync.Mutex

    KV

    // contains all unsynced watchers that need to sync events that have happened
    // TODO: use map to reduce cancel cost
    unsynced []*watcher
    // contains all synced watchers that are tracking the events that will happen
    // The key of the map is the key that the watcher is watching on.
    synced map[string][]*watcher
    // contains all synced watchers that have an end revision
    // The key of the map is the end revision of the watcher.
    endm map[int64][]*watcher
    tx   *ongoingTx

    stopc chan struct{}
    wg    sync.WaitGroup
}

func newWatchableStore(path string) *watchableStore {
    s := &watchableStore{
        KV:     newStore(path),
        synced: make(map[string][]*watcher),
        endm:   make(map[int64][]*watcher),
        stopc:  make(chan struct{}),
    }
    s.wg.Add(1)
    go s.syncWatchersLoop()
    return s
}

func (s *watchableStore) Put(key, value []byte) (rev int64) {
    s.mu.Lock()
    defer s.mu.Unlock()

    rev = s.KV.Put(key, value)
    // TODO: avoid this range
    kvs, _, err := s.KV.Range(key, nil, 0, rev)
    if err != nil {
        log.Panicf("unexpected range error (%v)", err)
    }
    s.handle(rev, storagepb.Event{
        Type: storagepb.PUT,
        Kv:   &kvs[0],
    })
    return rev
}

func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // TODO: avoid this range
    kvs, _, err := s.KV.Range(key, end, 0, 0)
    if err != nil {
        log.Panicf("unexpected range error (%v)", err)
    }
    n, rev = s.KV.DeleteRange(key, end)
    for _, kv := range kvs {
        s.handle(rev, storagepb.Event{
            Type: storagepb.DELETE,
            Kv: &storagepb.KeyValue{
                Key: kv.Key,
            },
        })
    }
    return n, rev
}

func (s *watchableStore) TxnBegin() int64 {
    s.mu.Lock()
    s.tx = newOngoingTx()
    return s.KV.TxnBegin()
}

func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
    rev, err = s.KV.TxnPut(txnID, key, value)
    if err == nil {
        s.tx.put(string(key))
    }
    return rev, err
}

func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
    kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0)
    if err != nil {
        log.Panicf("unexpected range error (%v)", err)
    }
    n, rev, err = s.KV.TxnDeleteRange(txnID, key, end)
    if err == nil {
        for _, kv := range kvs {
            s.tx.del(string(kv.Key))
        }
    }
    return n, rev, err
}

func (s *watchableStore) TxnEnd(txnID int64) error {
    err := s.KV.TxnEnd(txnID)
    if err != nil {
        return err
    }

    _, rev, _ := s.KV.Range(nil, nil, 0, 0)
    for k := range s.tx.putm {
        kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0)
        if err != nil {
            log.Panicf("unexpected range error (%v)", err)
        }
        s.handle(rev, storagepb.Event{
            Type: storagepb.PUT,
            Kv:   &kvs[0],
        })
    }
    for k := range s.tx.delm {
        s.handle(rev, storagepb.Event{
            Type: storagepb.DELETE,
            Kv: &storagepb.KeyValue{
                Key: []byte(k),
            },
        })
    }
    s.mu.Unlock()
    return nil
}

func (s *watchableStore) Close() error {
    close(s.stopc)
    s.wg.Wait()
    return s.KV.Close()
}

func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) {
    s.mu.Lock()
    defer s.mu.Unlock()

    wa := newWatcher(key, prefix, startRev, endRev)
    k := string(key)
    if startRev == 0 {
        s.synced[k] = append(s.synced[k], wa)
        if endRev != 0 {
            s.endm[endRev] = append(s.endm[endRev], wa)
        }
    } else {
        s.unsynced = append(s.unsynced, wa)
    }

    cancel := CancelFunc(func() {
        s.mu.Lock()
        defer s.mu.Unlock()
        wa.stopWithError(ErrCanceled)

        // remove global references of the watcher
        for i, w := range s.unsynced {
            if w == wa {
                s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
                return
            }
        }

        for i, w := range s.synced[k] {
            if w == wa {
                s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
            }
        }
        if wa.end != 0 {
            for i, w := range s.endm[wa.end] {
                if w == wa {
                    s.endm[wa.end] = append(s.endm[wa.end][:i], s.endm[wa.end][i+1:]...)
                }
            }
        }
        // If the watcher cannot be found, it has already finished its watch.
    })

    return wa, cancel
}

// syncWatchersLoop syncs the watchers in the unsynced list every 100ms.
func (s *watchableStore) syncWatchersLoop() {
    defer s.wg.Done()

    for {
        s.mu.Lock()
        s.syncWatchers()
        s.mu.Unlock()

        select {
        case <-time.After(100 * time.Millisecond):
        case <-s.stopc:
            return
        }
    }
}

// syncWatchers syncs the watchers in the unsynced list.
func (s *watchableStore) syncWatchers() {
    _, curRev, _ := s.KV.Range(nil, nil, 0, 0)

    // filtering without allocating
    // https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
    nws := s.unsynced[:0]
    for _, w := range s.unsynced {
        var end []byte
        if w.prefix {
            end = make([]byte, len(w.key))
            copy(end, w.key)
            end[len(w.key)-1]++
        }
        limit := cap(w.ch) - len(w.ch)
        // the channel is full, try it in the next round
        if limit == 0 {
            nws = append(nws, w)
            continue
        }
        evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur, w.end)
        if err != nil {
            w.stopWithError(err)
            continue
        }

        // push events to the channel
        for _, ev := range evs {
            w.ch <- ev
        }
        // stop the watcher if it reaches the end
        if w.end > 0 && nextRev >= w.end {
            w.stopWithError(ExceedEnd)
            continue
        }
        // switch to tracking future events if needed
        if nextRev > curRev {
            s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
            if w.end != 0 {
                s.endm[w.end] = append(s.endm[w.end], w)
            }
            continue
        }
        // put it back to try it in the next round
        w.cur = nextRev
        nws = append(nws, w)
    }
    s.unsynced = nws
}
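Not part of the patch: the prefix branch in syncWatchers builds the exclusive range end by copying the watched key and incrementing its last byte, so a prefix watcher on "foo" scans the half-open key range ["foo", "fop"). A small sketch of that computation with a hypothetical helper name (like the code above, it does not handle a key ending in a 0xff byte):

// prefixRangeEnd is an illustrative helper, not part of this commit.
func prefixRangeEnd(key []byte) []byte {
    end := make([]byte, len(key))
    copy(end, key)
    end[len(key)-1]++ // "foo" becomes "fop"
    return end
}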

// handle delivers the event that happened at the given rev to the watchers
// and stops the watchers whose end revision has been reached.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
    s.notify(rev, ev)
    s.stopWatchers(rev)
}

// notify notifies the watchers that watch on the key of the event that the
// given event just happened at the given rev.
func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
    // check all prefixes of the key to notify all corresponding watchers
    for i := 0; i <= len(ev.Kv.Key); i++ {
        ws := s.synced[string(ev.Kv.Key[:i])]
        nws := ws[:0]
        for _, w := range ws {
            // the watcher needs to be notified when it either watches a prefix
            // or the key matches exactly.
            if !w.prefix && i != len(ev.Kv.Key) {
                continue
            }
            select {
            case w.ch <- ev:
                nws = append(nws, w)
            default:
                // the channel is full; move the watcher back to unsynced
                if w.end != 0 {
                    for i, ew := range s.endm[w.end] {
                        if ew == w {
                            s.endm[w.end] = append(s.endm[w.end][:i], s.endm[w.end][i+1:]...)
                        }
                    }
                }
                w.cur = rev
                s.unsynced = append(s.unsynced, w)
            }
        }
        s.synced[string(ev.Kv.Key[:i])] = nws
    }
}
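To make the prefix scan in notify concrete (this helper is illustrative and not part of the commit): for an event on key "foo", every prefix of the key, from the empty string up to the full key, is looked up in the synced map, and a watcher found there is notified if it is a prefix watcher or if its map key equals the event key exactly.

// prefixesOf lists the synced-map keys that notify inspects for a given event key.
// It is a hypothetical helper for illustration only.
func prefixesOf(key []byte) []string {
    ps := make([]string, 0, len(key)+1)
    for i := 0; i <= len(key); i++ {
        ps = append(ps, string(key[:i]))
    }
    return ps // for "foo": "", "f", "fo", "foo"
}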

// stopWatchers stops the watchers whose end revision has been reached by the
// event at the given rev.
func (s *watchableStore) stopWatchers(rev int64) {
    for i, wa := range s.endm[rev+1] {
        k := string(wa.key)
        for _, w := range s.synced[k] {
            if w == wa {
                s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
            }
        }
        wa.stopWithError(ExceedEnd)
    }
    delete(s.endm, rev+1)
}

type ongoingTx struct {
    // keys put/deleted in the ongoing txn
    putm map[string]bool
    delm map[string]bool
}

func newOngoingTx() *ongoingTx {
    return &ongoingTx{
        putm: make(map[string]bool),
        delm: make(map[string]bool),
    }
}

func (tx *ongoingTx) put(k string) {
    tx.putm[k] = true
    tx.delm[k] = false
}

func (tx *ongoingTx) del(k string) {
    tx.delm[k] = true
    tx.putm[k] = false
}
storage/watcher.go (new file, 46 lines)
@@ -0,0 +1,46 @@
package storage

import (
    "sync"

    "github.com/coreos/etcd/storage/storagepb"
)

type watcher struct {
    key    []byte
    prefix bool
    cur    int64
    end    int64

    ch  chan storagepb.Event
    mu  sync.Mutex
    err error
}

func newWatcher(key []byte, prefix bool, start, end int64) *watcher {
    return &watcher{
        key:    key,
        prefix: prefix,
        cur:    start,
        end:    end,
        ch:     make(chan storagepb.Event, 10),
    }
}

func (w *watcher) Event() <-chan storagepb.Event { return w.ch }

func (w *watcher) Err() error {
    w.mu.Lock()
    defer w.mu.Unlock()
    return w.err
}

func (w *watcher) stopWithError(err error) {
    if w.err != nil {
        return
    }
    close(w.ch)
    w.mu.Lock()
    w.err = err
    w.mu.Unlock()
}
storage/watcher_bench_test.go (new file, 17 lines)
@@ -0,0 +1,17 @@
package storage

import (
    "fmt"
    "testing"
)

func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
    s := newWatchableStore(tmpPath)
    defer cleanup(s, tmpPath)

    b.ReportAllocs()
    b.StartTimer()
    for i := 0; i < b.N; i++ {
        s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0, 0)
    }
}