storage: simplify watch store

We decided that we will not support modifying the same
key in one txn multiple times. That simplifies the current
code/design a lot.
Xiang Li 2016-01-31 15:56:31 -08:00
parent 6577df17d6
commit 611751aee2
2 changed files with 45 additions and 89 deletions
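
The core idea: if a txn writes each key at most once, the store can record every mutation in one flat, ordered change list, and the watch layer can map that list one-to-one onto events. The per-key put/delete bookkeeping that ongoingTx used to do (removed in the last hunk below) becomes unnecessary. A minimal, self-contained sketch of that derivation; the types are simplified stand-ins, not the real storagepb definitions, and eventsFromChanges is a hypothetical helper, not code from this commit:

package main

import "fmt"

// KeyValue stands in for storagepb.KeyValue with only the fields the
// sketch needs. A nil Value marks a tombstone written by delete.
type KeyValue struct {
	Key   []byte
	Value []byte
}

type EventType int

const (
	PUT EventType = iota
	DELETE
)

type Event struct {
	Type EventType
	Kv   *KeyValue
}

// eventsFromChanges maps a txn's change list one-to-one onto events.
// This is only sound because each key is written at most once per txn;
// with repeated writes to one key, a watcher would see one event per
// write instead of the key's final state.
func eventsFromChanges(changes []KeyValue) []Event {
	evs := make([]Event, len(changes))
	for i := range changes {
		t := PUT
		if changes[i].Value == nil {
			t = DELETE
		}
		evs[i] = Event{Type: t, Kv: &changes[i]}
	}
	return evs
}

func main() {
	changes := []KeyValue{
		{Key: []byte("a"), Value: []byte("1")}, // put
		{Key: []byte("b")},                     // delete: nil Value
	}
	for _, ev := range eventsFromChanges(changes) {
		fmt.Println(ev.Type, string(ev.Kv.Key)) // prints 0 a (PUT), then 1 b (DELETE)
	}
}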

storage/kvstore.go

@@ -62,6 +62,8 @@ type store struct {
 	tx backend.BatchTx
 	txnID int64 // tracks the current txnID to verify txn operations
 
+	changes []storagepb.KeyValue
+
 	wg sync.WaitGroup
 	stopc chan struct{}
 }
@@ -426,6 +428,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 	s.tx.UnsafePut(keyBucketName, ibytes, d)
 	s.kvindex.Put(key, revision{main: rev, sub: s.currentRev.sub})
+	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
 
 	if leaseID != lease.NoLease {
@@ -482,11 +485,18 @@ func (s *store) delete(key []byte) {
 	if err != nil {
 		log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
 	}
+	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
 
 	// TODO: De-attach keys from lease if necessary
 }
 
+func (s *store) getChanges() []storagepb.KeyValue {
+	changes := s.changes
+	s.changes = make([]storagepb.KeyValue, 0, 128)
+	return changes
+}
+
 // appendMarkTombstone appends tombstone mark to normal revision bytes.
 func appendMarkTombstone(b []byte) []byte {
 	if len(b) != revBytesLen {
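
Note that getChanges above is consume-and-reset: it hands the buffered changes to the caller and replaces the buffer, so each mutation is observed exactly once and the watch layer never needs to re-range over the keyspace. A standalone sketch of that pattern, with a simplified stand-in store rather than the real one:

package main

import "fmt"

type keyValue struct {
	Key   []byte
	Value []byte // nil marks a delete
}

// store buffers every mutation so a watch layer can turn the buffer
// into events without re-reading the keyspace.
type store struct {
	changes []keyValue
}

func (s *store) put(key, value []byte) {
	s.changes = append(s.changes, keyValue{Key: key, Value: value})
}

func (s *store) delete(key []byte) {
	s.changes = append(s.changes, keyValue{Key: key}) // tombstone: no Value
}

// getChanges hands the buffer to the caller and resets it, so every
// change is consumed exactly once.
func (s *store) getChanges() []keyValue {
	changes := s.changes
	s.changes = make([]keyValue, 0, 128)
	return changes
}

func main() {
	s := &store{}
	s.put([]byte("foo"), []byte("bar"))
	s.delete([]byte("foo"))
	fmt.Println(len(s.getChanges())) // 2
	fmt.Println(len(s.getChanges())) // 0: the first call reset the buffer
}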

storage/watchable_store.go

@@ -51,7 +51,6 @@ type watchableStore struct {
 	// contains all synced watchers that are in sync with the progress of the store.
 	// The key of the map is the key that the watcher watches on.
 	synced map[string]map[*watcher]struct{}
-	tx *ongoingTx
 
 	stopc chan struct{}
 	wg sync.WaitGroup
@@ -82,14 +81,14 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64)
 	defer s.mu.Unlock()
 
 	rev = s.store.Put(key, value, lease)
-	// TODO: avoid this range
-	kvs, _, err := s.store.Range(key, nil, 0, rev)
-	if err != nil {
-		log.Panicf("unexpected range error (%v)", err)
+	changes := s.store.getChanges()
+	if len(changes) != 1 {
+		log.Panicf("unexpected len(changes) != 1 after put")
 	}
+
 	ev := storagepb.Event{
 		Type: storagepb.PUT,
-		Kv:   &kvs[0],
+		Kv:   &changes[0],
 	}
 	s.handle(rev, []storagepb.Event{ev})
 	return rev
@@ -99,19 +98,22 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// TODO: avoid this range
-	kvs, _, err := s.store.Range(key, end, 0, 0)
-	if err != nil {
-		log.Panicf("unexpected range error (%v)", err)
-	}
 	n, rev = s.store.DeleteRange(key, end)
-	evs := make([]storagepb.Event, len(kvs))
-	for i, kv := range kvs {
+	changes := s.store.getChanges()
+	if len(changes) != int(n) {
+		log.Panicf("unexpected len(changes) != n after deleteRange")
+	}
+
+	if n == 0 {
+		return n, rev
+	}
+
+	evs := make([]storagepb.Event, n)
+	for i := range changes {
 		evs[i] = storagepb.Event{
 			Type: storagepb.DELETE,
-			Kv: &storagepb.KeyValue{
-				Key: kv.Key,
-			}}
+			Kv:   &changes[i]}
 	}
 	s.handle(rev, evs)
 	return n, rev
@@ -119,67 +121,38 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 func (s *watchableStore) TxnBegin() int64 {
 	s.mu.Lock()
-	s.tx = newOngoingTx()
 	return s.store.TxnBegin()
 }
 
-func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease lease.LeaseID) (rev int64, err error) {
-	rev, err = s.store.TxnPut(txnID, key, value, lease)
-	if err == nil {
-		s.tx.put(string(key))
-	}
-	return rev, err
-}
-
-func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
-	kvs, _, err := s.store.TxnRange(txnID, key, end, 0, 0)
-	if err != nil {
-		log.Panicf("unexpected range error (%v)", err)
-	}
-
-	n, rev, err = s.store.TxnDeleteRange(txnID, key, end)
-	if err == nil {
-		for _, kv := range kvs {
-			s.tx.del(string(kv.Key))
-		}
-	}
-
-	return n, rev, err
-}
-
 func (s *watchableStore) TxnEnd(txnID int64) error {
 	err := s.store.TxnEnd(txnID)
 	if err != nil {
 		return err
 	}
 
-	_, rev, _ := s.store.Range(nil, nil, 0, 0)
-
-	evs := []storagepb.Event{}
-
-	for k := range s.tx.putm {
-		kvs, _, err := s.store.Range([]byte(k), nil, 0, 0)
-		if err != nil {
-			log.Panicf("unexpected range error (%v)", err)
-		}
-		ev := storagepb.Event{
-			Type: storagepb.PUT,
-			Kv:   &kvs[0],
-		}
-		evs = append(evs, ev)
+	changes := s.getChanges()
+	if len(changes) == 0 {
+		s.mu.Unlock()
+		return nil
 	}
 
-	for k := range s.tx.delm {
-		ev := storagepb.Event{
-			Type: storagepb.DELETE,
-			Kv: &storagepb.KeyValue{
-				Key: []byte(k),
-			},
+	evs := make([]storagepb.Event, len(changes))
+	for i, change := range changes {
+		switch change.Value {
+		case nil:
+			evs[i] = storagepb.Event{
+				Type: storagepb.DELETE,
+				Kv:   &changes[i]}
+		default:
+			evs[i] = storagepb.Event{
+				Type: storagepb.PUT,
+				Kv:   &changes[i]}
 		}
-		evs = append(evs, ev)
 	}
 
-	s.handle(rev, evs)
+	s.handle(s.store.Rev(), evs)
 	s.mu.Unlock()
 	return nil
 }
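
One Go detail carries the new TxnEnd: a slice such as change.Value may be compared only against nil, and an expression switch may therefore use a nil case, which is how a tombstone (a KeyValue written by delete with no Value) selects the DELETE arm. A tiny standalone check of that pattern; kind is a hypothetical helper, not part of this commit:

package main

import "fmt"

// kind reports the event type implied by a change's value. A []byte can
// be compared against nil, so the switch can use a nil case directly.
func kind(value []byte) string {
	switch value {
	case nil:
		return "DELETE" // tombstone: delete records a KeyValue with no Value
	default:
		return "PUT"
	}
}

func main() {
	fmt.Println(kind(nil))         // DELETE
	fmt.Println(kind([]byte("v"))) // PUT
	fmt.Println(kind([]byte{}))    // PUT: empty but non-nil
}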
@@ -408,33 +381,6 @@ func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 func (s *watchableStore) rev() int64 { return s.store.Rev() }
 
-type ongoingTx struct {
-	// keys put/deleted in the ongoing txn
-	putm map[string]struct{}
-	delm map[string]struct{}
-}
-
-func newOngoingTx() *ongoingTx {
-	return &ongoingTx{
-		putm: make(map[string]struct{}),
-		delm: make(map[string]struct{}),
-	}
-}
-
-func (tx *ongoingTx) put(k string) {
-	tx.putm[k] = struct{}{}
-	if _, ok := tx.delm[k]; ok {
-		delete(tx.delm, k)
-	}
-}
-
-func (tx *ongoingTx) del(k string) {
-	tx.delm[k] = struct{}{}
-	if _, ok := tx.putm[k]; ok {
-		delete(tx.putm, k)
-	}
-}
-
 type watcher struct {
 	// the watcher key
 	key []byte