From 4336278b44caf8ac7400cd48df5d34d88c7711a4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 4 Jan 2016 15:12:59 -0800 Subject: [PATCH] *: support put with lease --- etcdserver/v3demo_server.go | 4 +- storage/consistent_watchable_store.go | 8 +- storage/consistent_watchable_store_test.go | 6 +- storage/kv.go | 10 +- storage/kv_test.go | 134 +++++++++++---------- storage/kvstore.go | 13 +- storage/kvstore_bench_test.go | 4 +- storage/kvstore_test.go | 15 ++- storage/watchable_store.go | 8 +- storage/watchable_store_bench_test.go | 4 +- storage/watchable_store_test.go | 10 +- storage/watcher_test.go | 4 +- tools/benchmark/cmd/storage-put.go | 5 +- 13 files changed, 120 insertions(+), 105 deletions(-) diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 6746041ff..a1632ccef 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -143,12 +143,12 @@ func applyPut(txnID int64, kv dstorage.KV, p *pb.PutRequest) (*pb.PutResponse, e err error ) if txnID != noTxn { - rev, err = kv.TxnPut(txnID, p.Key, p.Value) + rev, err = kv.TxnPut(txnID, p.Key, p.Value, dstorage.LeaseID(p.Lease)) if err != nil { return nil, err } } else { - rev = kv.Put(p.Key, p.Value) + rev = kv.Put(p.Key, p.Value, dstorage.LeaseID(p.Lease)) } resp.Header.Revision = rev return resp, nil diff --git a/storage/consistent_watchable_store.go b/storage/consistent_watchable_store.go index 245230336..78002eef2 100644 --- a/storage/consistent_watchable_store.go +++ b/storage/consistent_watchable_store.go @@ -59,9 +59,9 @@ func newConsistentWatchableStore(path string, ig ConsistentIndexGetter) *consist } } -func (s *consistentWatchableStore) Put(key, value []byte) (rev int64) { +func (s *consistentWatchableStore) Put(key, value []byte, lease LeaseID) (rev int64) { id := s.TxnBegin() - rev, err := s.TxnPut(id, key, value) + rev, err := s.TxnPut(id, key, value, lease) if err != nil { log.Panicf("unexpected TxnPut error (%v)", err) } @@ -109,11 +109,11 @@ func (s *consistentWatchableStore) TxnRange(txnID int64, key, end []byte, limit, return s.watchableStore.TxnRange(txnID, key, end, limit, rangeRev) } -func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { +func (s *consistentWatchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { if s.skip { return 0, nil } - return s.watchableStore.TxnPut(txnID, key, value) + return s.watchableStore.TxnPut(txnID, key, value, lease) } func (s *consistentWatchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) { diff --git a/storage/consistent_watchable_store_test.go b/storage/consistent_watchable_store_test.go index 95b6c33be..d4a01b940 100644 --- a/storage/consistent_watchable_store_test.go +++ b/storage/consistent_watchable_store_test.go @@ -28,7 +28,7 @@ func TestConsistentWatchableStoreConsistentIndex(t *testing.T) { tests := []uint64{1, 2, 3, 5, 10} for i, tt := range tests { idx = indexVal(tt) - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), NoLease) id := s.TxnBegin() g := s.consistentIndex() @@ -44,10 +44,10 @@ func TestConsistentWatchableStoreSkip(t *testing.T) { s := newConsistentWatchableStore(tmpPath, &idx) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), NoLease) // put is skipped - rev := s.Put([]byte("foo"), []byte("bar")) + rev := s.Put([]byte("foo"), []byte("bar"), NoLease) if rev != 0 { t.Errorf("rev = %d, want 0", rev) } diff --git a/storage/kv.go b/storage/kv.go index fe3bf6f04..8069e1a56 100644 --- a/storage/kv.go +++ b/storage/kv.go @@ -25,6 +25,8 @@ type CancelFunc func() type Snapshot backend.Snapshot +type LeaseID int64 + type KV interface { // Rev returns the current revision of the KV. Rev() int64 @@ -37,9 +39,11 @@ type KV interface { // If the required rev is compacted, ErrCompacted will be returned. Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) - // Put puts the given key,value into the store. + // Put puts the given key, value into the store. Put also takes additional argument lease to + // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease + // id. // A put also increases the rev of the store, and generates one event in the event history. - Put(key, value []byte) (rev int64) + Put(key, value []byte, lease LeaseID) (rev int64) // DeleteRange deletes the given range from the store. // A deleteRange increases the rev of the store if any key in the range exists. @@ -57,7 +61,7 @@ type KV interface { // TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned. TxnEnd(txnID int64) error TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) - TxnPut(txnID int64, key, value []byte) (rev int64, err error) + TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) Compact(rev int64) error diff --git a/storage/kv_test.go b/storage/kv_test.go index 5e43dedff..b6e29d38b 100644 --- a/storage/kv_test.go +++ b/storage/kv_test.go @@ -34,7 +34,7 @@ import ( type ( rangeFunc func(kv KV, key, end []byte, limit, rangeRev int64) ([]storagepb.KeyValue, int64, error) - putFunc func(kv KV, key, value []byte) int64 + putFunc func(kv KV, key, value []byte, lease LeaseID) int64 deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64) ) @@ -48,13 +48,13 @@ var ( return kv.TxnRange(id, key, end, limit, rangeRev) } - normalPutFunc = func(kv KV, key, value []byte) int64 { - return kv.Put(key, value) + normalPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 { + return kv.Put(key, value, lease) } - txnPutFunc = func(kv KV, key, value []byte) int64 { + txnPutFunc = func(kv KV, key, value []byte, lease LeaseID) int64 { id := kv.TxnBegin() defer kv.TxnEnd(id) - rev, err := kv.TxnPut(id, key, value) + rev, err := kv.TxnPut(id, key, value, lease) if err != nil { panic("txn put error") } @@ -92,13 +92,13 @@ func testKVRange(t *testing.T, f rangeFunc) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), 1) + s.Put([]byte("foo1"), []byte("bar1"), 2) + s.Put([]byte("foo2"), []byte("bar2"), 3) kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3}, } wrev := int64(3) @@ -159,13 +159,13 @@ func testKVRangeRev(t *testing.T, f rangeFunc) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), 1) + s.Put([]byte("foo1"), []byte("bar1"), 2) + s.Put([]byte("foo2"), []byte("bar2"), 3) kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3}, } tests := []struct { @@ -201,9 +201,9 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), NoLease) + s.Put([]byte("foo1"), []byte("bar1"), NoLease) + s.Put([]byte("foo2"), []byte("bar2"), NoLease) if err := s.Compact(3); err != nil { t.Fatalf("compact error (%v)", err) } @@ -233,13 +233,13 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), 1) + s.Put([]byte("foo1"), []byte("bar1"), 2) + s.Put([]byte("foo2"), []byte("bar2"), 3) kvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3}, } wrev := int64(3) @@ -280,7 +280,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) { for i := 0; i < 10; i++ { base := int64(i + 1) - rev := f(s, []byte("foo"), []byte("bar")) + rev := f(s, []byte("foo"), []byte("bar"), LeaseID(base)) if rev != base { t.Errorf("#%d: rev = %d, want %d", i, rev, base) } @@ -290,7 +290,7 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) { t.Fatal(err) } wkvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base, Lease: base}, } if !reflect.DeepEqual(kvs, wkvs) { t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs) @@ -337,9 +337,9 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { for i, tt := range tests { s := newDefaultStore(tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), NoLease) + s.Put([]byte("foo1"), []byte("bar1"), NoLease) + s.Put([]byte("foo2"), []byte("bar2"), NoLease) n, rev := f(s, tt.key, tt.end) if n != tt.wN || rev != tt.wrev { @@ -357,7 +357,7 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), NoLease) n, rev := f(s, []byte("foo"), nil) if n != 1 || rev != 2 { @@ -381,7 +381,7 @@ func TestKVOperationInSequence(t *testing.T) { base := int64(i * 2) // put foo - rev := s.Put([]byte("foo"), []byte("bar")) + rev := s.Put([]byte("foo"), []byte("bar"), NoLease) if rev != base+1 { t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1) } @@ -391,7 +391,7 @@ func TestKVOperationInSequence(t *testing.T) { t.Fatal(err) } wkvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)}, } if !reflect.DeepEqual(kvs, wkvs) { t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs) @@ -425,7 +425,7 @@ func TestKVTxnBlockNonTnxOperations(t *testing.T) { tests := []func(){ func() { s.Range([]byte("foo"), nil, 0, 0) }, - func() { s.Put([]byte("foo"), nil) }, + func() { s.Put([]byte("foo"), nil, NoLease) }, func() { s.DeleteRange([]byte("foo"), nil) }, } for i, tt := range tests { @@ -463,7 +463,7 @@ func TestKVTxnWrongID(t *testing.T) { return err }, func() error { - _, err := s.TxnPut(wrongid, []byte("foo"), nil) + _, err := s.TxnPut(wrongid, []byte("foo"), nil, NoLease) return err }, func() error { @@ -495,7 +495,7 @@ func TestKVTnxOperationInSequence(t *testing.T) { base := int64(i) // put foo - rev, err := s.TxnPut(id, []byte("foo"), []byte("bar")) + rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), NoLease) if err != nil { t.Fatal(err) } @@ -508,7 +508,7 @@ func TestKVTnxOperationInSequence(t *testing.T) { t.Fatal(err) } wkvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(NoLease)}, } if !reflect.DeepEqual(kvs, wkvs) { t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs) @@ -545,10 +545,10 @@ func TestKVCompactReserveLastValue(t *testing.T) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar0")) - s.Put([]byte("foo"), []byte("bar1")) + s.Put([]byte("foo"), []byte("bar0"), 1) + s.Put([]byte("foo"), []byte("bar1"), 2) s.DeleteRange([]byte("foo"), nil) - s.Put([]byte("foo"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar2"), 3) // rev in tests will be called in Compact() one by one on the same store tests := []struct { @@ -559,13 +559,13 @@ func TestKVCompactReserveLastValue(t *testing.T) { { 0, []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1}, }, }, { 1, []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2}, + {Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2, Lease: 2}, }, }, { @@ -575,7 +575,7 @@ func TestKVCompactReserveLastValue(t *testing.T) { { 3, []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3}, }, }, } @@ -598,9 +598,9 @@ func TestKVCompactBad(t *testing.T) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar0")) - s.Put([]byte("foo"), []byte("bar1")) - s.Put([]byte("foo"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar0"), NoLease) + s.Put([]byte("foo"), []byte("bar1"), NoLease) + s.Put([]byte("foo"), []byte("bar2"), NoLease) // rev in tests will be called in Compact() one by one on the same store tests := []struct { @@ -628,8 +628,8 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error kv := newDefaultStore(tmpPath) - kv.Put([]byte("foo0"), []byte("bar0")) - kv.Put([]byte("foo1"), []byte("bar0")) + kv.Put([]byte("foo0"), []byte("bar0"), NoLease) + kv.Put([]byte("foo1"), []byte("bar0"), NoLease) hashes[i], err = kv.Hash() if err != nil { t.Fatalf("failed to get hash: %v", err) @@ -647,18 +647,18 @@ func TestKVHash(t *testing.T) { func TestKVRestore(t *testing.T) { tests := []func(kv KV){ func(kv KV) { - kv.Put([]byte("foo"), []byte("bar0")) - kv.Put([]byte("foo"), []byte("bar1")) - kv.Put([]byte("foo"), []byte("bar2")) + kv.Put([]byte("foo"), []byte("bar0"), 1) + kv.Put([]byte("foo"), []byte("bar1"), 2) + kv.Put([]byte("foo"), []byte("bar2"), 3) }, func(kv KV) { - kv.Put([]byte("foo"), []byte("bar0")) + kv.Put([]byte("foo"), []byte("bar0"), 1) kv.DeleteRange([]byte("foo"), nil) - kv.Put([]byte("foo"), []byte("bar1")) + kv.Put([]byte("foo"), []byte("bar1"), 2) }, func(kv KV) { - kv.Put([]byte("foo"), []byte("bar0")) - kv.Put([]byte("foo"), []byte("bar1")) + kv.Put([]byte("foo"), []byte("bar0"), 1) + kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(1) }, } @@ -693,13 +693,13 @@ func TestKVSnapshot(t *testing.T) { s := newDefaultStore(tmpPath) defer cleanup(s, tmpPath) - s.Put([]byte("foo"), []byte("bar")) - s.Put([]byte("foo1"), []byte("bar1")) - s.Put([]byte("foo2"), []byte("bar2")) + s.Put([]byte("foo"), []byte("bar"), 1) + s.Put([]byte("foo1"), []byte("bar1"), 2) + s.Put([]byte("foo2"), []byte("bar2"), 3) wkvs := []storagepb.KeyValue{ - {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1}, - {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1}, - {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1}, + {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1}, + {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2}, + {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3}, } f, err := os.Create("new_test") @@ -738,7 +738,7 @@ func TestWatchableKVWatch(t *testing.T) { wid, cancel := w.Watch([]byte("foo"), true, 0) defer cancel() - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), 1) select { case resp := <-w.Chan(): wev := storagepb.Event{ @@ -749,6 +749,7 @@ func TestWatchableKVWatch(t *testing.T) { CreateRevision: 1, ModRevision: 1, Version: 1, + Lease: 1, }, } if resp.WatchID != wid { @@ -762,7 +763,7 @@ func TestWatchableKVWatch(t *testing.T) { t.Fatalf("failed to watch the event") } - s.Put([]byte("foo1"), []byte("bar1")) + s.Put([]byte("foo1"), []byte("bar1"), 2) select { case resp := <-w.Chan(): wev := storagepb.Event{ @@ -773,6 +774,7 @@ func TestWatchableKVWatch(t *testing.T) { CreateRevision: 2, ModRevision: 2, Version: 1, + Lease: 2, }, } if resp.WatchID != wid { @@ -802,6 +804,7 @@ func TestWatchableKVWatch(t *testing.T) { CreateRevision: 2, ModRevision: 2, Version: 1, + Lease: 2, }, } if resp.WatchID != wid { @@ -815,7 +818,7 @@ func TestWatchableKVWatch(t *testing.T) { t.Fatalf("failed to watch the event") } - s.Put([]byte("foo1"), []byte("bar11")) + s.Put([]byte("foo1"), []byte("bar11"), 3) select { case resp := <-w.Chan(): wev := storagepb.Event{ @@ -826,6 +829,7 @@ func TestWatchableKVWatch(t *testing.T) { CreateRevision: 2, ModRevision: 3, Version: 2, + Lease: 3, }, } if resp.WatchID != wid { diff --git a/storage/kvstore.go b/storage/kvstore.go index bbc466508..1da2df82e 100644 --- a/storage/kvstore.go +++ b/storage/kvstore.go @@ -39,6 +39,8 @@ var ( markBytePosition = markedRevBytesLen - 1 markTombstone byte = 't' + NoLease = LeaseID(0) + scheduledCompactKeyName = []byte("scheduledCompactRev") finishedCompactKeyName = []byte("finishedCompactRev") @@ -95,9 +97,9 @@ func (s *store) Rev() int64 { return s.currentRev.main } -func (s *store) Put(key, value []byte) int64 { +func (s *store) Put(key, value []byte, lease LeaseID) int64 { id := s.TxnBegin() - s.put(key, value) + s.put(key, value, lease) s.txnEnd(id) putCounter.Inc() @@ -170,12 +172,12 @@ func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (k return s.rangeKeys(key, end, limit, rangeRev) } -func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { +func (s *store) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { if txnID != s.txnID { return 0, ErrTxnIDMismatch } - s.put(key, value) + s.put(key, value, lease) return int64(s.currentRev.main + 1), nil } @@ -351,7 +353,7 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage return kvs, rev, nil } -func (s *store) put(key, value []byte) { +func (s *store) put(key, value []byte, lease LeaseID) { rev := s.currentRev.main + 1 c := rev @@ -371,6 +373,7 @@ func (s *store) put(key, value []byte) { CreateRevision: c, ModRevision: rev, Version: ver, + Lease: int64(lease), } d, err := kv.Marshal() diff --git a/storage/kvstore_bench_test.go b/storage/kvstore_bench_test.go index 6d53dc5f7..a4e58cfeb 100644 --- a/storage/kvstore_bench_test.go +++ b/storage/kvstore_bench_test.go @@ -30,7 +30,7 @@ func BenchmarkStorePut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - s.Put(keys[i], vals[i]) + s.Put(keys[i], vals[i], NoLease) } } @@ -49,7 +49,7 @@ func BenchmarkStoreTxnPut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil { + if _, err := s.TxnPut(id, keys[i], vals[i], NoLease); err != nil { log.Fatalf("txn put error: %v", err) } s.TxnEnd(id) diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go index 64fa2156e..4700d2707 100644 --- a/storage/kvstore_test.go +++ b/storage/kvstore_test.go @@ -33,7 +33,7 @@ func TestStoreRev(t *testing.T) { defer os.Remove(tmpPath) for i := 0; i < 3; i++ { - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), NoLease) if r := s.Rev(); r != int64(i+1) { t.Errorf("#%d: rev = %d, want %d", i, r, i+1) } @@ -61,6 +61,7 @@ func TestStorePut(t *testing.T) { CreateRevision: 2, ModRevision: 2, Version: 1, + Lease: 1, }, revision{2, 0}, }, @@ -75,6 +76,7 @@ func TestStorePut(t *testing.T) { CreateRevision: 2, ModRevision: 2, Version: 2, + Lease: 2, }, revision{2, 1}, }, @@ -89,6 +91,7 @@ func TestStorePut(t *testing.T) { CreateRevision: 2, ModRevision: 3, Version: 3, + Lease: 3, }, revision{3, 0}, }, @@ -102,7 +105,7 @@ func TestStorePut(t *testing.T) { s.tx = b.BatchTx() fi.indexGetRespc <- tt.r - s.put([]byte("foo"), []byte("bar")) + s.put([]byte("foo"), []byte("bar"), LeaseID(i+1)) data, err := tt.wkv.Marshal() if err != nil { @@ -357,9 +360,9 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0 := newDefaultStore(tmpPath) defer os.Remove(tmpPath) - s0.Put([]byte("foo"), []byte("bar")) - s0.Put([]byte("foo"), []byte("bar1")) - s0.Put([]byte("foo"), []byte("bar2")) + s0.Put([]byte("foo"), []byte("bar"), NoLease) + s0.Put([]byte("foo"), []byte("bar1"), NoLease) + s0.Put([]byte("foo"), []byte("bar2"), NoLease) // write scheduled compaction, but not do compaction rbytes := newRevBytes() @@ -416,7 +419,7 @@ func TestTxnPut(t *testing.T) { id := s.TxnBegin() base := int64(i + 1) - rev, err := s.TxnPut(id, keys[i], vals[i]) + rev, err := s.TxnPut(id, keys[i], vals[i], NoLease) if err != nil { t.Error("txn put error") } diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 4ef68321e..0a9d523d1 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -65,11 +65,11 @@ func newWatchableStore(path string) *watchableStore { return s } -func (s *watchableStore) Put(key, value []byte) (rev int64) { +func (s *watchableStore) Put(key, value []byte, lease LeaseID) (rev int64) { s.mu.Lock() defer s.mu.Unlock() - rev = s.store.Put(key, value) + rev = s.store.Put(key, value, lease) // TODO: avoid this range kvs, _, err := s.store.Range(key, nil, 0, rev) if err != nil { @@ -111,8 +111,8 @@ func (s *watchableStore) TxnBegin() int64 { return s.store.TxnBegin() } -func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { - rev, err = s.store.TxnPut(txnID, key, value) +func (s *watchableStore) TxnPut(txnID int64, key, value []byte, lease LeaseID) (rev int64, err error) { + rev, err = s.store.TxnPut(txnID, key, value, lease) if err == nil { s.tx.put(string(key)) } diff --git a/storage/watchable_store_bench_test.go b/storage/watchable_store_bench_test.go index 350b7063e..2d8b7b6eb 100644 --- a/storage/watchable_store_bench_test.go +++ b/storage/watchable_store_bench_test.go @@ -52,7 +52,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // and force watchers to be in unsynced. testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() @@ -90,7 +90,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { // Put a key so that we can spawn watchers on that key testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 749ef15ea..fe1dc4298 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -31,7 +31,7 @@ func TestWatch(t *testing.T) { }() testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() w.Watch(testKey, true, 0) @@ -50,7 +50,7 @@ func TestNewWatcherCancel(t *testing.T) { }() testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() _, cancel := w.Watch(testKey, true, 0) @@ -89,7 +89,7 @@ func TestCancelUnsynced(t *testing.T) { // and force watchers to be in unsynced. testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() @@ -135,7 +135,7 @@ func TestSyncWatchers(t *testing.T) { testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) w := s.NewWatchStream() @@ -210,7 +210,7 @@ func TestUnsafeAddWatcher(t *testing.T) { }() testKey := []byte("foo") testValue := []byte("bar") - s.Put(testKey, testValue) + s.Put(testKey, testValue, NoLease) size := 10 ws := make([]*watcher, size) diff --git a/storage/watcher_test.go b/storage/watcher_test.go index acf4fd783..c6aa74f6c 100644 --- a/storage/watcher_test.go +++ b/storage/watcher_test.go @@ -34,7 +34,7 @@ func TestWatcherWatchID(t *testing.T) { } idm[id] = struct{}{} - s.Put([]byte("foo"), []byte("bar")) + s.Put([]byte("foo"), []byte("bar"), NoLease) resp := <-w.Chan() if resp.WatchID != id { @@ -44,7 +44,7 @@ func TestWatcherWatchID(t *testing.T) { cancel() } - s.Put([]byte("foo2"), []byte("bar")) + s.Put([]byte("foo2"), []byte("bar"), NoLease) // unsynced watchers for i := 10; i < 20; i++ { diff --git a/tools/benchmark/cmd/storage-put.go b/tools/benchmark/cmd/storage-put.go index 625bba151..1f0ca03bc 100644 --- a/tools/benchmark/cmd/storage-put.go +++ b/tools/benchmark/cmd/storage-put.go @@ -21,6 +21,7 @@ import ( "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/spf13/cobra" + "github.com/coreos/etcd/storage" ) // storagePutCmd represents a storage put performance benchmarking tool @@ -72,13 +73,13 @@ func storagePutFunc(cmd *cobra.Command, args []string) { if txn { id := s.TxnBegin() - if _, err := s.TxnPut(id, keys[i], vals[i]); err != nil { + if _, err := s.TxnPut(id, keys[i], vals[i], storage.NoLease); err != nil { fmt.Errorf("txn put error: %v", err) os.Exit(1) } s.TxnEnd(id) } else { - s.Put(keys[i], vals[i]) + s.Put(keys[i], vals[i], storage.NoLease) } end := time.Now()