Merge pull request #4363 from xiang90/watch

storage: simplify watch store
This commit is contained in:
Xiang Li 2016-01-31 16:49:44 -08:00
commit 4ba1ec6a4d
2 changed files with 45 additions and 89 deletions

View File

@ -62,6 +62,8 @@ type store struct {
tx backend.BatchTx tx backend.BatchTx
txnID int64 // tracks the current txnID to verify txn operations txnID int64 // tracks the current txnID to verify txn operations
changes []storagepb.KeyValue
wg sync.WaitGroup wg sync.WaitGroup
stopc chan struct{} stopc chan struct{}
} }
@ -427,6 +429,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
s.tx.UnsafePut(keyBucketName, ibytes, d) s.tx.UnsafePut(keyBucketName, ibytes, d)
s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub}) s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
s.changes = append(s.changes, kv)
s.currentRev.sub += 1 s.currentRev.sub += 1
if leaseID != lease.NoLease { if leaseID != lease.NoLease {
@ -483,11 +486,18 @@ func (s *store) delete(key []byte) {
if err != nil { if err != nil {
log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
} }
s.changes = append(s.changes, kv)
s.currentRev.sub += 1 s.currentRev.sub += 1
// TODO: De-attach keys from lease if necessary // TODO: De-attach keys from lease if necessary
} }
func (s *store) getChanges() []storagepb.KeyValue {
changes := s.changes
s.changes = make([]storagepb.KeyValue, 0, 128)
return changes
}
// appendMarkTombstone appends tombstone mark to normal revision bytes. // appendMarkTombstone appends tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte { func appendMarkTombstone(b []byte) []byte {
if len(b) != revBytesLen { if len(b) != revBytesLen {

View File

@ -51,7 +51,6 @@ type watchableStore struct {
// contains all synced watchers that are in sync with the progress of the store. // contains all synced watchers that are in sync with the progress of the store.
// The key of the map is the key that the watcher watches on. // The key of the map is the key that the watcher watches on.
synced map[string]map[*watcher]struct{} synced map[string]map[*watcher]struct{}
tx *ongoingTx
stopc chan struct{} stopc chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -82,14 +81,14 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64)
defer s.mu.Unlock() defer s.mu.Unlock()
rev = s.store.Put(key, value, lease) rev = s.store.Put(key, value, lease)
// TODO: avoid this range changes := s.store.getChanges()
kvs, _, err := s.store.Range(key, nil, 0, rev) if len(changes) != 1 {
if err != nil { log.Panicf("unexpected len(changes) != 1 after put")
log.Panicf("unexpected range error (%v)", err)
} }
ev := storagepb.Event{ ev := storagepb.Event{
Type: storagepb.PUT, Type: storagepb.PUT,
Kv: &kvs[0], Kv: &changes[0],
} }
s.handle(rev, []storagepb.Event{ev}) s.handle(rev, []storagepb.Event{ev})
return rev return rev
@ -99,19 +98,22 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// TODO: avoid this range
kvs, _, err := s.store.Range(key, end, 0, 0)
if err != nil {
log.Panicf("unexpected range error (%v)", err)
}
n, rev = s.store.DeleteRange(key, end) n, rev = s.store.DeleteRange(key, end)
evs := make([]storagepb.Event, len(kvs)) changes := s.store.getChanges()
for i, kv := range kvs {
if len(changes) != int(n) {
log.Panicf("unexpected len(changes) != n after deleteRange")
}
if n == 0 {
return n, rev
}
evs := make([]storagepb.Event, n)
for i, change := range changes {
evs[i] = storagepb.Event{ evs[i] = storagepb.Event{
Type: storagepb.DELETE, Type: storagepb.DELETE,
Kv: &storagepb.KeyValue{ Kv: &change}
Key: kv.Key,
}}
} }
s.handle(rev, evs) s.handle(rev, evs)
return n, rev return n, rev
@ -119,67 +121,38 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
func (s *watchableStore) TxnBegin() int64 { func (s *watchableStore) TxnBegin() int64 {
s.mu.Lock() s.mu.Lock()
s.tx = newOngoingTx()
return s.store.TxnBegin() return s.store.TxnBegin()
} }
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
rev, err = s.store.TxnPut(txnID, key, value, lease)
if err == nil {
s.tx.put(string(key))
}
return rev, err
}
func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0)
if err != nil {
log.Panicf("unexpected range error (%v)", err)
}
n, rev, err = s.store.TxnDeleteRange(txnID, key, end)
if err == nil {
for _, kv := range kvs {
s.tx.del(string(kv.Key))
}
}
return n, rev, err
}
func (s *watchableStore) TxnEnd(txnID int64) error { func (s *watchableStore) TxnEnd(txnID int64) error {
err := s.store.TxnEnd(txnID) err := s.store.TxnEnd(txnID)
if err != nil { if err != nil {
return err return err
} }
_, rev, _ := s.store.Range(nil, nil, 0, 0) changes := s.getChanges()
if len(changes) == 0 {
evs := []storagepb.Event{} s.mu.Unlock()
return nil
for k := range s.tx.putm {
kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
if err != nil {
log.Panicf("unexpected range error (%v)", err)
}
ev := storagepb.Event{
Type: storagepb.PUT,
Kv: &kvs[0],
}
evs = append(evs, ev)
} }
for k := range s.tx.delm { evs := make([]storagepb.Event, len(changes))
ev := storagepb.Event{ for i, change := range changes {
Type: storagepb.DELETE, switch change.Value {
Kv: &storagepb.KeyValue{ case nil:
Key: []byte(k), evs[i] = storagepb.Event{
}, Type: storagepb.DELETE,
Kv: &changes[i]}
default:
evs[i] = storagepb.Event{
Type: storagepb.PUT,
Kv: &changes[i]}
} }
evs = append(evs, ev)
} }
s.handle(rev, evs) s.handle(s.store.Rev(), evs)
s.mu.Unlock() s.mu.Unlock()
return nil return nil
} }
@ -408,33 +381,6 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
func (s *watchableStore) rev() int64 { return s.store.Rev() } func (s *watchableStore) rev() int64 { return s.store.Rev() }
type ongoingTx struct {
// keys put/deleted in the ongoing txn
putm map[string]struct{}
delm map[string]struct{}
}
func newOngoingTx() *ongoingTx {
return &ongoingTx{
putm: make(map[string]struct{}),
delm: make(map[string]struct{}),
}
}
func (tx *ongoingTx) put(k string) {
tx.putm[k] = struct{}{}
if _, ok := tx.delm[k]; ok {
delete(tx.delm, k)
}
}
func (tx *ongoingTx) del(k string) {
tx.delm[k] = struct{}{}
if _, ok := tx.putm[k]; ok {
delete(tx.putm, k)
}
}
type watcher struct { type watcher struct {
// the watcher key // the watcher key
key []byte key []byte