From 611751aee2ae5725306ba8ce66e91401db9809ef Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 31 Jan 2016 15:56:31 -0800 Subject: [PATCH] storage: simplify watch store We decided that we will not support modifing the same key in one txn multiple times. That can simlify the current code/design a lot. --- storage/kvstore.go | 10 +++ storage/watchable_store.go | 124 +++++++++++-------------------------- 2 files changed, 45 insertions(+), 89 deletions(-) diff --git a/storage/kvstore.go b/storage/kvstore.go index 27325ea12..46f403376 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -62,6 +62,8 @@ type store struct { tx backend.BatchTx txnID int64 // tracks the current txnID to verify txn operations + changes []storagepb.KeyValue + wg sync.WaitGroup stopc chan struct{} } @@ -426,6 +428,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { s.tx.UnsafePut(keyBucketName, ibytes, d) s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) + s.changes = append(s.changes, kv) s.currentRev.sub += 1 if leaseID != lease.NoLease { @@ -482,11 +485,18 @@ func (s *store) delete(key []byte) { if err != nil { log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) } + s.changes = append(s.changes, kv) s.currentRev.sub += 1 // TODO: De-attach keys from lease if necessary } +func (s *store) getChanges() []storagepb.KeyValue { + changes := s.changes + s.changes = make([]storagepb.KeyValue, 0, 128) + return changes +} + // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(b []byte) []byte { if len(b) != revBytesLen { diff --git a/storage/watchable_store.go b/storage/watchable_store.go index dde374652..bf92584ad 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -51,7 +51,6 @@ type watchableStore struct { // contains all synced watchers that are in sync with the progress of the store. // The key of the map is the key that the watcher watches on. synced map[string]map[*watcher]struct{} - tx *ongoingTx stopc chan struct{} wg sync.WaitGroup @@ -82,14 +81,14 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) defer s.mu.Unlock() rev = s.store.Put(key, value, lease) - // TODO: avoid this range - kvs, _, err := s.store.Range(key, nil, 0, rev) - if err != nil { - log.Panicf("unexpected range error (%v)", err) + changes := s.store.getChanges() + if len(changes) != 1 { + log.Panicf("unexpected len(changes) != 1 after put") } + ev := storagepb.Event{ Type: storagepb.PUT, - Kv: &kvs[0], + Kv: &changes[0], } s.handle(rev, []storagepb.Event{ev}) return rev @@ -99,19 +98,22 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { s.mu.Lock() defer s.mu.Unlock() - // TODO: avoid this range - kvs, _, err := s.store.Range(key, end, 0, 0) - if err != nil { - log.Panicf("unexpected range error (%v)", err) - } n, rev = s.store.DeleteRange(key, end) - evs := make([]storagepb.Event, len(kvs)) - for i, kv := range kvs { + changes := s.store.getChanges() + + if len(changes) != int(n) { + log.Panicf("unexpected len(changes) != n after deleteRange") + } + + if n == 0 { + return n, rev + } + + evs := make([]storagepb.Event, n) + for i, change := range changes { evs[i] = storagepb.Event{ Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: kv.Key, - }} + Kv: &change} } s.handle(rev, evs) return n, rev @@ -119,67 +121,38 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) { func (s *watchableStore) TxnBegin() int64 { s.mu.Lock() - s.tx = newOngoingTx() return s.store.TxnBegin() } -func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) { - rev, err = s.store.TxnPut(txnID, key, value, lease) - if err == nil { - s.tx.put(string(key)) - } - return rev, err -} - -func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { - kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0) - if err != nil { - log.Panicf("unexpected range error (%v)", err) - } - n, rev, err = s.store.TxnDeleteRange(txnID, key, end) - if err == nil { - for _, kv := range kvs { - s.tx.del(string(kv.Key)) - } - } - return n, rev, err -} - func (s *watchableStore) TxnEnd(txnID int64) error { err := s.store.TxnEnd(txnID) if err != nil { return err } - _, rev, _ := s.store.Range(nil, nil, 0, 0) - - evs := []storagepb.Event{} - - for k := range s.tx.putm { - kvs, _, err := s.store.Range([]byte(k), nil, 0, 0) - if err != nil { - log.Panicf("unexpected range error (%v)", err) - } - ev := storagepb.Event{ - Type: storagepb.PUT, - Kv: &kvs[0], - } - evs = append(evs, ev) + changes := s.getChanges() + if len(changes) == 0 { + s.mu.Unlock() + return nil } - for k := range s.tx.delm { - ev := storagepb.Event{ - Type: storagepb.DELETE, - Kv: &storagepb.KeyValue{ - Key: []byte(k), - }, + evs := make([]storagepb.Event, len(changes)) + for i, change := range changes { + switch change.Value { + case nil: + evs[i] = storagepb.Event{ + Type: storagepb.DELETE, + Kv: &changes[i]} + default: + evs[i] = storagepb.Event{ + Type: storagepb.PUT, + Kv: &changes[i]} } - evs = append(evs, ev) } - s.handle(rev, evs) - + s.handle(s.store.Rev(), evs) s.mu.Unlock() + return nil } @@ -408,33 +381,6 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) { func (s *watchableStore) rev() int64 { return s.store.Rev() } -type ongoingTx struct { - // keys put/deleted in the ongoing txn - putm map[string]struct{} - delm map[string]struct{} -} - -func newOngoingTx() *ongoingTx { - return &ongoingTx{ - putm: make(map[string]struct{}), - delm: make(map[string]struct{}), - } -} - -func (tx *ongoingTx) put(k string) { - tx.putm[k] = struct{}{} - if _, ok := tx.delm[k]; ok { - delete(tx.delm, k) - } -} - -func (tx *ongoingTx) del(k string) { - tx.delm[k] = struct{}{} - if _, ok := tx.putm[k]; ok { - delete(tx.putm, k) - } -} - type watcher struct { // the watcher key key []byte