*: support put with lease

This commit is contained in:
Xiang Li 2016-01-04 15:12:59 -08:00
parent cfe23b886d
commit 4336278b44
13 changed files with 120 additions and 105 deletions

View File

@ -143,12 +143,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e
err error
)
if txnID != noTxn {
rev, err = kv.TxnPut(txnID, p.Key, p.Value)
rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease))
if err != nil {
return nil, err
}
} else {
rev = kv.Put(p.Key, p.Value)
rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease))
}
resp.Header.Revision = rev
return resp, nil

View File

@ -59,9 +59,9 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist
}
}
func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) {
func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
id := s.TxnBegin()
rev, err := s.TxnPut(id, key, value)
rev, err := s.TxnPut(id, key, value, lease)
if err != nil {
log.Panicf("unexpected TxnPut error (%v)", err)
}
@ -109,11 +109,11 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit,
return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev)
}
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
if s.skip {
return 0, nil
}
return s.watchableStore.TxnPut(txnID, key, value)
return s.watchableStore.TxnPut(txnID, key, value, lease)
}
func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {

View File

@ -28,7 +28,7 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) {
tests := []uint64{1, 2, 3, 5, 10}
for i, tt := range tests {
idx = indexVal(tt)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
id := s.TxnBegin()
g := s.consistentIndex()
@ -44,10 +44,10 @@ func TestConsistentWatchableStoreSkip(t *testing.T) {
s := newConsistentWatchableStore(tmpPath, &idx)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
// put is skipped
rev := s.Put([]byte("foo"), []byte("bar"))
rev := s.Put([]byte("foo"), []byte("bar"), NoLease)
if rev != 0 {
t.Errorf("rev = %d, want 0", rev)
}

View File

@ -25,6 +25,8 @@ type CancelFunc func()
type Snapshot backend.Snapshot
type LeaseID int64
type KV interface {
// Rev returns the current revision of the KV.
Rev() int64
@ -37,9 +39,11 @@ type KV interface {
// If the required rev is compacted, ErrCompacted will be returned.
Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
// Put puts the given key,value into the store.
// Put puts the given key, value into the store. Put also takes additional argument lease to
// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
// id.
// A put also increases the rev of the store, and generates one event in the event history.
Put(key, value []byte) (rev int64)
Put(key, value []byte, lease LeaseID) (rev int64)
// DeleteRange deletes the given range from the store.
// A deleteRange increases the rev of the store if any key in the range exists.
@ -57,7 +61,7 @@ type KV interface {
// TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
TxnEnd(txnID int64) error
TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
TxnPut(txnID int64, key, value []byte) (rev int64, err error)
TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error)
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
Compact(rev int64) error

View File

@ -34,7 +34,7 @@ import (
type (
rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error)
putFunc func(kv KV, key, value []byte) int64
putFunc func(kv KV, key, value []byte, lease LeaseID) int64
deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
)
@ -48,13 +48,13 @@ var (
return kv.TxnRange(id, key, end, limit, rangeRev)
}
normalPutFunc = func(kv KV, key, value []byte) int64 {
return kv.Put(key, value)
normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
return kv.Put(key, value, lease)
}
txnPutFunc = func(kv KV, key, value []byte) int64 {
txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 {
id := kv.TxnBegin()
defer kv.TxnEnd(id)
rev, err := kv.TxnPut(id, key, value)
rev, err := kv.TxnPut(id, key, value, lease)
if err != nil {
panic("txn put error")
}
@ -92,13 +92,13 @@ func testKVRange(t *testing.T, f rangeFunc) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
wrev := int64(3)
@ -159,13 +159,13 @@ func testKVRangeRev(t *testing.T, f rangeFunc) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
tests := []struct {
@ -201,9 +201,9 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
s.Put([]byte("foo1"), []byte("bar1"), NoLease)
s.Put([]byte("foo2"), []byte("bar2"), NoLease)
if err := s.Compact(3); err != nil {
t.Fatalf("compact error (%v)", err)
}
@ -233,13 +233,13 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
wrev := int64(3)
@ -280,7 +280,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
for i := 0; i < 10; i++ {
base := int64(i + 1)
rev := f(s, []byte("foo"), []byte("bar"))
rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base))
if rev != base {
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
}
@ -290,7 +290,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
t.Fatal(err)
}
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base, Lease: base},
}
if !reflect.DeepEqual(kvs, wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@ -337,9 +337,9 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
for i, tt := range tests {
s := newDefaultStore(tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
s.Put([]byte("foo1"), []byte("bar1"), NoLease)
s.Put([]byte("foo2"), []byte("bar2"), NoLease)
n, rev := f(s, tt.key, tt.end)
if n != tt.wN || rev != tt.wrev {
@ -357,7 +357,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
n, rev := f(s, []byte("foo"), nil)
if n != 1 || rev != 2 {
@ -381,7 +381,7 @@ func TestKVOperationInSequence(t *testing.T) {
base := int64(i * 2)
// put foo
rev := s.Put([]byte("foo"), []byte("bar"))
rev := s.Put([]byte("foo"), []byte("bar"), NoLease)
if rev != base+1 {
t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
}
@ -391,7 +391,7 @@ func TestKVOperationInSequence(t *testing.T) {
t.Fatal(err)
}
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)},
}
if !reflect.DeepEqual(kvs, wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@ -425,7 +425,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) {
tests := []func(){
func() { s.Range([]byte("foo"), nil, 0, 0) },
func() { s.Put([]byte("foo"), nil) },
func() { s.Put([]byte("foo"), nil, NoLease) },
func() { s.DeleteRange([]byte("foo"), nil) },
}
for i, tt := range tests {
@ -463,7 +463,7 @@ func TestKVTxnWrongID(t *testing.T) {
return err
},
func() error {
_, err := s.TxnPut(wrongid, []byte("foo"), nil)
_, err := s.TxnPut(wrongid, []byte("foo"), nil, NoLease)
return err
},
func() error {
@ -495,7 +495,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
base := int64(i)
// put foo
rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"))
rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), NoLease)
if err != nil {
t.Fatal(err)
}
@ -508,7 +508,7 @@ func TestKVTnxOperationInSequence(t *testing.T) {
t.Fatal(err)
}
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)},
}
if !reflect.DeepEqual(kvs, wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
@ -545,10 +545,10 @@ func TestKVCompactReserveLastValue(t *testing.T) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar0"))
s.Put([]byte("foo"), []byte("bar1"))
s.Put([]byte("foo"), []byte("bar0"), 1)
s.Put([]byte("foo"), []byte("bar1"), 2)
s.DeleteRange([]byte("foo"), nil)
s.Put([]byte("foo"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar2"), 3)
// rev in tests will be called in Compact() one by one on the same store
tests := []struct {
@ -559,13 +559,13 @@ func TestKVCompactReserveLastValue(t *testing.T) {
{
0,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
},
},
{
1,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2},
{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2, Lease: 2},
},
},
{
@ -575,7 +575,7 @@ func TestKVCompactReserveLastValue(t *testing.T) {
{
3,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1},
{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
},
},
}
@ -598,9 +598,9 @@ func TestKVCompactBad(t *testing.T) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar0"))
s.Put([]byte("foo"), []byte("bar1"))
s.Put([]byte("foo"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar0"), NoLease)
s.Put([]byte("foo"), []byte("bar1"), NoLease)
s.Put([]byte("foo"), []byte("bar2"), NoLease)
// rev in tests will be called in Compact() one by one on the same store
tests := []struct {
@ -628,8 +628,8 @@ func TestKVHash(t *testing.T) {
for i := 0; i < len(hashes); i++ {
var err error
kv := newDefaultStore(tmpPath)
kv.Put([]byte("foo0"), []byte("bar0"))
kv.Put([]byte("foo1"), []byte("bar0"))
kv.Put([]byte("foo0"), []byte("bar0"), NoLease)
kv.Put([]byte("foo1"), []byte("bar0"), NoLease)
hashes[i], err = kv.Hash()
if err != nil {
t.Fatalf("failed to get hash: %v", err)
@ -647,18 +647,18 @@ func TestKVHash(t *testing.T) {
func TestKVRestore(t *testing.T) {
tests := []func(kv KV){
func(kv KV) {
kv.Put([]byte("foo"), []byte("bar0"))
kv.Put([]byte("foo"), []byte("bar1"))
kv.Put([]byte("foo"), []byte("bar2"))
kv.Put([]byte("foo"), []byte("bar0"), 1)
kv.Put([]byte("foo"), []byte("bar1"), 2)
kv.Put([]byte("foo"), []byte("bar2"), 3)
},
func(kv KV) {
kv.Put([]byte("foo"), []byte("bar0"))
kv.Put([]byte("foo"), []byte("bar0"), 1)
kv.DeleteRange([]byte("foo"), nil)
kv.Put([]byte("foo"), []byte("bar1"))
kv.Put([]byte("foo"), []byte("bar1"), 2)
},
func(kv KV) {
kv.Put([]byte("foo"), []byte("bar0"))
kv.Put([]byte("foo"), []byte("bar1"))
kv.Put([]byte("foo"), []byte("bar0"), 1)
kv.Put([]byte("foo"), []byte("bar1"), 2)
kv.Compact(1)
},
}
@ -693,13 +693,13 @@ func TestKVSnapshot(t *testing.T) {
s := newDefaultStore(tmpPath)
defer cleanup(s, tmpPath)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
f, err := os.Create("new_test")
@ -738,7 +738,7 @@ func TestWatchableKVWatch(t *testing.T) {
wid, cancel := w.Watch([]byte("foo"), true, 0)
defer cancel()
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), 1)
select {
case resp := <-w.Chan():
wev := storagepb.Event{
@ -749,6 +749,7 @@ func TestWatchableKVWatch(t *testing.T) {
CreateRevision: 1,
ModRevision: 1,
Version: 1,
Lease: 1,
},
}
if resp.WatchID != wid {
@ -762,7 +763,7 @@ func TestWatchableKVWatch(t *testing.T) {
t.Fatalf("failed to watch the event")
}
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo1"), []byte("bar1"), 2)
select {
case resp := <-w.Chan():
wev := storagepb.Event{
@ -773,6 +774,7 @@ func TestWatchableKVWatch(t *testing.T) {
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 2,
},
}
if resp.WatchID != wid {
@ -802,6 +804,7 @@ func TestWatchableKVWatch(t *testing.T) {
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 2,
},
}
if resp.WatchID != wid {
@ -815,7 +818,7 @@ func TestWatchableKVWatch(t *testing.T) {
t.Fatalf("failed to watch the event")
}
s.Put([]byte("foo1"), []byte("bar11"))
s.Put([]byte("foo1"), []byte("bar11"), 3)
select {
case resp := <-w.Chan():
wev := storagepb.Event{
@ -826,6 +829,7 @@ func TestWatchableKVWatch(t *testing.T) {
CreateRevision: 2,
ModRevision: 3,
Version: 2,
Lease: 3,
},
}
if resp.WatchID != wid {

View File

@ -39,6 +39,8 @@ var (
markBytePosition = markedRevBytesLen - 1
markTombstone byte = 't'
NoLease = LeaseID(0)
scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev")
@ -95,9 +97,9 @@ func (s *store) Rev() int64 {
return s.currentRev.main
}
func (s *store) Put(key, value []byte) int64 {
func (s *store) Put(key, value []byte, lease LeaseID) int64 {
id := s.TxnBegin()
s.put(key, value)
s.put(key, value, lease)
s.txnEnd(id)
putCounter.Inc()
@ -170,12 +172,12 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k
return s.rangeKeys(key, end, limit, rangeRev)
}
func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
if txnID != s.txnID {
return 0, ErrTxnIDMismatch
}
s.put(key, value)
s.put(key, value, lease)
return int64(s.currentRev.main + 1), nil
}
@ -351,7 +353,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
return kvs, rev, nil
}
func (s *store) put(key, value []byte) {
func (s *store) put(key, value []byte, lease LeaseID) {
rev := s.currentRev.main + 1
c := rev
@ -371,6 +373,7 @@ func (s *store) put(key, value []byte) {
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(lease),
}
d, err := kv.Marshal()

View File

@ -30,7 +30,7 @@ func BenchmarkStorePut(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.Put(keys[i], vals[i])
s.Put(keys[i], vals[i], NoLease)
}
}
@ -49,7 +49,7 @@ func BenchmarkStoreTxnPut(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
id := s.TxnBegin()
if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
if _, err := s.TxnPut(id, keys[i], vals[i], NoLease); err != nil {
log.Fatalf("txn put error: %v", err)
}
s.TxnEnd(id)

View File

@ -33,7 +33,7 @@ func TestStoreRev(t *testing.T) {
defer os.Remove(tmpPath)
for i := 0; i < 3; i++ {
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
if r := s.Rev(); r != int64(i+1) {
t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
}
@ -61,6 +61,7 @@ func TestStorePut(t *testing.T) {
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
},
revision{2, 0},
},
@ -75,6 +76,7 @@ func TestStorePut(t *testing.T) {
CreateRevision: 2,
ModRevision: 2,
Version: 2,
Lease: 2,
},
revision{2, 1},
},
@ -89,6 +91,7 @@ func TestStorePut(t *testing.T) {
CreateRevision: 2,
ModRevision: 3,
Version: 3,
Lease: 3,
},
revision{3, 0},
},
@ -102,7 +105,7 @@ func TestStorePut(t *testing.T) {
s.tx = b.BatchTx()
fi.indexGetRespc <- tt.r
s.put([]byte("foo"), []byte("bar"))
s.put([]byte("foo"), []byte("bar"), LeaseID(i+1))
data, err := tt.wkv.Marshal()
if err != nil {
@ -357,9 +360,9 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
s0 := newDefaultStore(tmpPath)
defer os.Remove(tmpPath)
s0.Put([]byte("foo"), []byte("bar"))
s0.Put([]byte("foo"), []byte("bar1"))
s0.Put([]byte("foo"), []byte("bar2"))
s0.Put([]byte("foo"), []byte("bar"), NoLease)
s0.Put([]byte("foo"), []byte("bar1"), NoLease)
s0.Put([]byte("foo"), []byte("bar2"), NoLease)
// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
@ -416,7 +419,7 @@ func TestTxnPut(t *testing.T) {
id := s.TxnBegin()
base := int64(i + 1)
rev, err := s.TxnPut(id, keys[i], vals[i])
rev, err := s.TxnPut(id, keys[i], vals[i], NoLease)
if err != nil {
t.Error("txn put error")
}

View File

@ -65,11 +65,11 @@ func newWatchableStore(path string) *watchableStore {
return s
}
func (s *watchableStore) Put(key, value []byte) (rev int64) {
func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) {
s.mu.Lock()
defer s.mu.Unlock()
rev = s.store.Put(key, value)
rev = s.store.Put(key, value, lease)
// TODO: avoid this range
kvs, _, err := s.store.Range(key, nil, 0, rev)
if err != nil {
@ -111,8 +111,8 @@ func (s *watchableStore) TxnBegin() int64 {
return s.store.TxnBegin()
}
func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
rev, err = s.store.TxnPut(txnID, key, value)
func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) {
rev, err = s.store.TxnPut(txnID, key, value, lease)
if err == nil {
s.tx.put(string(key))
}

View File

@ -52,7 +52,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
// and force watchers to be in unsynced.
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()
@ -90,7 +90,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
// Put a key so that we can spawn watchers on that key
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()

View File

@ -31,7 +31,7 @@ func TestWatch(t *testing.T) {
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()
w.Watch(testKey, true, 0)
@ -50,7 +50,7 @@ func TestNewWatcherCancel(t *testing.T) {
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()
_, cancel := w.Watch(testKey, true, 0)
@ -89,7 +89,7 @@ func TestCancelUnsynced(t *testing.T) {
// and force watchers to be in unsynced.
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()
@ -135,7 +135,7 @@ func TestSyncWatchers(t *testing.T) {
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
w := s.NewWatchStream()
@ -210,7 +210,7 @@ func TestUnsafeAddWatcher(t *testing.T) {
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)
s.Put(testKey, testValue, NoLease)
size := 10
ws := make([]*watcher, size)

View File

@ -34,7 +34,7 @@ func TestWatcherWatchID(t *testing.T) {
}
idm[id] = struct{}{}
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar"), NoLease)
resp := <-w.Chan()
if resp.WatchID != id {
@ -44,7 +44,7 @@ func TestWatcherWatchID(t *testing.T) {
cancel()
}
s.Put([]byte("foo2"), []byte("bar"))
s.Put([]byte("foo2"), []byte("bar"), NoLease)
// unsynced watchers
for i := 10; i < 20; i++ {

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra"
"github.com/coreos/etcd/storage"
)
// storagePutCmd represents a storage put performance benchmarking tool
@ -72,13 +73,13 @@ func storagePutFunc(cmd *cobra.Command, args []string) {
if txn {
id := s.TxnBegin()
if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil {
if _, err := s.TxnPut(id, keys[i], vals[i], storage.NoLease); err != nil {
fmt.Errorf("txn put error: %v", err)
os.Exit(1)
}
s.TxnEnd(id)
} else {
s.Put(keys[i], vals[i])
s.Put(keys[i], vals[i], storage.NoLease)
}
end := time.Now()