storage: remove check for DELETE type KeyValue

kvindex always returns kvs that exist at given revision, so there is no
need to check for whether the KeyValue range from backend is DELETE type.
This commit is contained in:
Yicheng Qin 2015-09-03 15:13:20 -07:00
parent 00e31f13a6
commit 797a4796d9

View File

@ -319,9 +319,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
if err := e.Unmarshal(vs[0]); err != nil { if err := e.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err) log.Fatalf("storage: cannot unmarshal event: %v", err)
} }
if e.Type == storagepb.PUT { kvs = append(kvs, *e.Kv)
kvs = append(kvs, *e.Kv)
}
if limit > 0 && len(kvs) >= int(limit) { if limit > 0 && len(kvs) >= int(limit) {
break break
} }
@ -369,7 +367,6 @@ func (s *store) put(key, value []byte) {
func (s *store) deleteRange(key, end []byte) int64 { func (s *store) deleteRange(key, end []byte) int64 {
rev := s.currentRev.main + 1 rev := s.currentRev.main + 1
var n int64
rrev := rev rrev := rev
if s.currentRev.sub > 0 { if s.currentRev.sub > 0 {
rrev += 1 rrev += 1
@ -381,46 +378,18 @@ func (s *store) deleteRange(key, end []byte) int64 {
} }
for _, key := range keys { for _, key := range keys {
ok := s.delete(key) s.delete(key)
if ok {
n++
}
} }
return n return int64(len(keys))
} }
func (s *store) delete(key []byte) bool { func (s *store) delete(key []byte) {
mainrev := s.currentRev.main + 1 mainrev := s.currentRev.main + 1
grev := mainrev
if s.currentRev.sub > 0 {
grev += 1
}
rev, _, _, err := s.kvindex.Get(key, grev)
if err != nil {
// key not exist
return false
}
tx := s.b.BatchTx() tx := s.b.BatchTx()
tx.Lock() tx.Lock()
defer tx.Unlock() defer tx.Unlock()
revbytes := newRevBytes()
revToBytes(rev, revbytes)
_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
if len(vs) != 1 {
log.Fatalf("storage: delete cannot find rev (%d,%d)", rev.main, rev.sub)
}
e := &storagepb.Event{}
if err := e.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
if e.Type == storagepb.DELETE {
return false
}
ibytes := newRevBytes() ibytes := newRevBytes()
revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes) revToBytes(revision{main: mainrev, sub: s.currentRev.sub}, ibytes)
@ -442,5 +411,4 @@ func (s *store) delete(key []byte) bool {
log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err) log.Fatalf("storage: cannot tombstone an existing key (%s): %v", string(key), err)
} }
s.currentRev.sub += 1 s.currentRev.sub += 1
return true
} }