*: tnx -> txn

This commit is contained in:
Xiang Li 2015-07-24 23:16:27 +08:00
parent b407f72766
commit 53a77fa519
9 changed files with 109 additions and 109 deletions

View File

@ -14,14 +14,14 @@
- more efficient/ low cost keep alive - more efficient/ low cost keep alive
- a logical group of TTL keys - a logical group of TTL keys
5. Replace CAS/CAD with multi-object Tnx 5. Replace CAS/CAD with multi-object Txn
- MUCH MORE powerful and flexible - MUCH MORE powerful and flexible
6. Support efficient watching with multiple ranges 6. Support efficient watching with multiple ranges
7. RPC API supports the completed set of APIs. 7. RPC API supports the completed set of APIs.
- more efficient than JSON/HTTP - more efficient than JSON/HTTP
- additional tnx/lease support - additional txn/lease support
8. HTTP API supports a subset of APIs. 8. HTTP API supports a subset of APIs.
- easy for people to try out etcd - easy for people to try out etcd
@ -97,9 +97,9 @@ RangeResponse {
} }
``` ```
#### Finish a tnx (assume we have foo0=bar0, foo1=bar1) #### Finish a txn (assume we have foo0=bar0, foo1=bar1)
``` ```
Tnx(TnxRequest { Txn(TxnRequest {
// mod_index of foo0 is equal to 1, mod_index of foo1 is greater than 1 // mod_index of foo0 is equal to 1, mod_index of foo1 is greater than 1
compare = { compare = {
{compareType = equal, key = foo0, mod_index = 1}, {compareType = equal, key = foo0, mod_index = 1},
@ -111,7 +111,7 @@ Tnx(TnxRequest {
failure = {PutRequest { key = foo2, value = failure }}, failure = {PutRequest { key = foo2, value = failure }},
) )
TnxResponse { TxnResponse {
cluster_id = 0x1000, cluster_id = 0x1000,
member_id = 0x1, member_id = 0x1,
index = 3, index = 3,

View File

@ -15,10 +15,10 @@ service etcd {
// and generates one event in the event history. // and generates one event in the event history.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {} rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
// Tnx processes all the requests in one transaction. // Txn processes all the requests in one transaction.
// A tnx request increases the index of the store, // A txn request increases the index of the store,
// and generates events with the same index in the event history. // and generates events with the same index in the event history.
rpc Tnx(TnxRequest) returns (TnxResponse) {} rpc Txn(TxnRequest) returns (TxnResponse) {}
// Watch watches the events happening or happened in etcd. Both input and output // Watch watches the events happening or happened in etcd. Both input and output
// are stream. One watch rpc can watch for multiple ranges and get a stream of // are stream. One watch rpc can watch for multiple ranges and get a stream of
@ -41,10 +41,10 @@ service etcd {
// LeaseAttach attaches keys with a lease. // LeaseAttach attaches keys with a lease.
rpc LeaseAttach(LeaseAttachRequest) returns (LeaseAttachResponse) {} rpc LeaseAttach(LeaseAttachRequest) returns (LeaseAttachResponse) {}
// LeaseTnx likes Tnx. It has two addition success and failure LeaseAttachRequest list. // LeaseTxn likes Txn. It has two addition success and failure LeaseAttachRequest list.
// If the Tnx is successful, then the success list will be executed. Or the failure list // If the Txn is successful, then the success list will be executed. Or the failure list
// will be executed. // will be executed.
rpc LeaseTnx(LeaseTnxRequest) returns (LeaseTnxResponse) {} rpc LeaseTxn(LeaseTxnRequest) returns (LeaseTxnResponse) {}
// KeepAlive keeps the lease alive. // KeepAlive keeps the lease alive.
rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {} rpc LeaseKeepAlive(stream LeaseKeepAliveRequest) returns (stream LeaseKeepAliveResponse) {}
@ -157,13 +157,13 @@ message Compare {
// if guard evaluates to // if guard evaluates to
// true. // true.
// 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false. // 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false.
message TnxRequest { message TxnRequest {
repeated Compare compare = 1; repeated Compare compare = 1;
repeated RequestUnion success = 2; repeated RequestUnion success = 2;
repeated RequestUnion failure = 3; repeated RequestUnion failure = 3;
} }
message TnxResponse { message TxnResponse {
optional ResponseHeader header = 1; optional ResponseHeader header = 1;
optional bool succeeded = 2; optional bool succeeded = 2;
repeated ResponseUnion responses = 3; repeated ResponseUnion responses = 3;
@ -240,15 +240,15 @@ message LeaseRevokeResponse {
optional ResponseHeader header = 1; optional ResponseHeader header = 1;
} }
message LeaseTnxRequest { message LeaseTxnRequest {
optional TnxRequest request = 1; optional TxnRequest request = 1;
repeated LeaseAttachRequest success = 2; repeated LeaseAttachRequest success = 2;
repeated LeaseAttachRequest failure = 3; repeated LeaseAttachRequest failure = 3;
} }
message LeaseTnxResponse { message LeaseTxnResponse {
optional ResponseHeader header = 1; optional ResponseHeader header = 1;
optional TnxResponse response = 2; optional TxnResponse response = 2;
repeated LeaseAttachResponse attach_responses = 3; repeated LeaseAttachResponse attach_responses = 3;
} }

View File

@ -247,55 +247,55 @@ func (*Compare) ProtoMessage() {}
// if guard evaluates to // if guard evaluates to
// true. // true.
// 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false. // 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false.
type TnxRequest struct { type TxnRequest struct {
Compare []*Compare `protobuf:"bytes,1,rep,name=compare" json:"compare,omitempty"` Compare []*Compare `protobuf:"bytes,1,rep,name=compare" json:"compare,omitempty"`
Success []*RequestUnion `protobuf:"bytes,2,rep,name=success" json:"success,omitempty"` Success []*RequestUnion `protobuf:"bytes,2,rep,name=success" json:"success,omitempty"`
Failure []*RequestUnion `protobuf:"bytes,3,rep,name=failure" json:"failure,omitempty"` Failure []*RequestUnion `protobuf:"bytes,3,rep,name=failure" json:"failure,omitempty"`
} }
func (m *TnxRequest) Reset() { *m = TnxRequest{} } func (m *TxnRequest) Reset() { *m = TxnRequest{} }
func (m *TnxRequest) String() string { return proto.CompactTextString(m) } func (m *TxnRequest) String() string { return proto.CompactTextString(m) }
func (*TnxRequest) ProtoMessage() {} func (*TxnRequest) ProtoMessage() {}
func (m *TnxRequest) GetCompare() []*Compare { func (m *TxnRequest) GetCompare() []*Compare {
if m != nil { if m != nil {
return m.Compare return m.Compare
} }
return nil return nil
} }
func (m *TnxRequest) GetSuccess() []*RequestUnion { func (m *TxnRequest) GetSuccess() []*RequestUnion {
if m != nil { if m != nil {
return m.Success return m.Success
} }
return nil return nil
} }
func (m *TnxRequest) GetFailure() []*RequestUnion { func (m *TxnRequest) GetFailure() []*RequestUnion {
if m != nil { if m != nil {
return m.Failure return m.Failure
} }
return nil return nil
} }
type TnxResponse struct { type TxnResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"` Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"`
Responses []*ResponseUnion `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"` Responses []*ResponseUnion `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"`
} }
func (m *TnxResponse) Reset() { *m = TnxResponse{} } func (m *TxnResponse) Reset() { *m = TxnResponse{} }
func (m *TnxResponse) String() string { return proto.CompactTextString(m) } func (m *TxnResponse) String() string { return proto.CompactTextString(m) }
func (*TnxResponse) ProtoMessage() {} func (*TxnResponse) ProtoMessage() {}
func (m *TnxResponse) GetHeader() *ResponseHeader { func (m *TxnResponse) GetHeader() *ResponseHeader {
if m != nil { if m != nil {
return m.Header return m.Header
} }
return nil return nil
} }
func (m *TnxResponse) GetResponses() []*ResponseUnion { func (m *TxnResponse) GetResponses() []*ResponseUnion {
if m != nil { if m != nil {
return m.Responses return m.Responses
} }
@ -1393,7 +1393,7 @@ func (m *Compare) Unmarshal(data []byte) error {
return nil return nil
} }
func (m *TnxRequest) Unmarshal(data []byte) error { func (m *TxnRequest) Unmarshal(data []byte) error {
l := len(data) l := len(data)
iNdEx := 0 iNdEx := 0
for iNdEx < l { for iNdEx < l {
@ -1510,7 +1510,7 @@ func (m *TnxRequest) Unmarshal(data []byte) error {
return nil return nil
} }
func (m *TnxResponse) Unmarshal(data []byte) error { func (m *TxnResponse) Unmarshal(data []byte) error {
l := len(data) l := len(data)
iNdEx := 0 iNdEx := 0
for iNdEx < l { for iNdEx < l {
@ -2024,7 +2024,7 @@ func (m *Compare) Size() (n int) {
return n return n
} }
func (m *TnxRequest) Size() (n int) { func (m *TxnRequest) Size() (n int) {
var l int var l int
_ = l _ = l
if len(m.Compare) > 0 { if len(m.Compare) > 0 {
@ -2048,7 +2048,7 @@ func (m *TnxRequest) Size() (n int) {
return n return n
} }
func (m *TnxResponse) Size() (n int) { func (m *TxnResponse) Size() (n int) {
var l int var l int
_ = l _ = l
if m.Header != nil { if m.Header != nil {
@ -2512,7 +2512,7 @@ func (m *Compare) MarshalTo(data []byte) (n int, err error) {
return i, nil return i, nil
} }
func (m *TnxRequest) Marshal() (data []byte, err error) { func (m *TxnRequest) Marshal() (data []byte, err error) {
size := m.Size() size := m.Size()
data = make([]byte, size) data = make([]byte, size)
n, err := m.MarshalTo(data) n, err := m.MarshalTo(data)
@ -2522,7 +2522,7 @@ func (m *TnxRequest) Marshal() (data []byte, err error) {
return data[:n], nil return data[:n], nil
} }
func (m *TnxRequest) MarshalTo(data []byte) (n int, err error) { func (m *TxnRequest) MarshalTo(data []byte) (n int, err error) {
var i int var i int
_ = i _ = i
var l int var l int
@ -2566,7 +2566,7 @@ func (m *TnxRequest) MarshalTo(data []byte) (n int, err error) {
return i, nil return i, nil
} }
func (m *TnxResponse) Marshal() (data []byte, err error) { func (m *TxnResponse) Marshal() (data []byte, err error) {
size := m.Size() size := m.Size()
data = make([]byte, size) data = make([]byte, size)
n, err := m.MarshalTo(data) n, err := m.MarshalTo(data)
@ -2576,7 +2576,7 @@ func (m *TnxResponse) Marshal() (data []byte, err error) {
return data[:n], nil return data[:n], nil
} }
func (m *TnxResponse) MarshalTo(data []byte) (n int, err error) { func (m *TxnResponse) MarshalTo(data []byte) (n int, err error) {
var i int var i int
_ = i _ = i
var l int var l int
@ -2708,10 +2708,10 @@ type EtcdClient interface {
// A delete request increase the index of the store, // A delete request increase the index of the store,
// and generates one event in the event history. // and generates one event in the event history.
DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error) DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
// Tnx processes all the requests in one transaction. // Txn processes all the requests in one transaction.
// A tnx request increases the index of the store, // A txn request increases the index of the store,
// and generates events with the same index in the event history. // and generates events with the same index in the event history.
Tnx(ctx context.Context, in *TnxRequest, opts ...grpc.CallOption) (*TnxResponse, error) Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error) Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
@ -2752,9 +2752,9 @@ func (c *etcdClient) DeleteRange(ctx context.Context, in *DeleteRangeRequest, op
return out, nil return out, nil
} }
func (c *etcdClient) Tnx(ctx context.Context, in *TnxRequest, opts ...grpc.CallOption) (*TnxResponse, error) { func (c *etcdClient) Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error) {
out := new(TnxResponse) out := new(TxnResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.etcd/Tnx", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/etcdserverpb.etcd/Txn", in, out, c.cc, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2783,10 +2783,10 @@ type EtcdServer interface {
// A delete request increase the index of the store, // A delete request increase the index of the store,
// and generates one event in the event history. // and generates one event in the event history.
DeleteRange(context.Context, *DeleteRangeRequest) (*DeleteRangeResponse, error) DeleteRange(context.Context, *DeleteRangeRequest) (*DeleteRangeResponse, error)
// Tnx processes all the requests in one transaction. // Txn processes all the requests in one transaction.
// A tnx request increases the index of the store, // A txn request increases the index of the store,
// and generates events with the same index in the event history. // and generates events with the same index in the event history.
Tnx(context.Context, *TnxRequest) (*TnxResponse, error) Txn(context.Context, *TxnRequest) (*TxnResponse, error)
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
Compact(context.Context, *CompactionRequest) (*CompactionResponse, error) Compact(context.Context, *CompactionRequest) (*CompactionResponse, error)
@ -2832,12 +2832,12 @@ func _Etcd_DeleteRange_Handler(srv interface{}, ctx context.Context, codec grpc.
return out, nil return out, nil
} }
func _Etcd_Tnx_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { func _Etcd_Txn_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
in := new(TnxRequest) in := new(TxnRequest)
if err := codec.Unmarshal(buf, in); err != nil { if err := codec.Unmarshal(buf, in); err != nil {
return nil, err return nil, err
} }
out, err := srv.(EtcdServer).Tnx(ctx, in) out, err := srv.(EtcdServer).Txn(ctx, in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2873,8 +2873,8 @@ var _Etcd_serviceDesc = grpc.ServiceDesc{
Handler: _Etcd_DeleteRange_Handler, Handler: _Etcd_DeleteRange_Handler,
}, },
{ {
MethodName: "Tnx", MethodName: "Txn",
Handler: _Etcd_Tnx_Handler, Handler: _Etcd_Txn_Handler,
}, },
{ {
MethodName: "Compact", MethodName: "Compact",

View File

@ -19,10 +19,10 @@ service etcd {
// and generates one event in the event history. // and generates one event in the event history.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {} rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
// Tnx processes all the requests in one transaction. // Txn processes all the requests in one transaction.
// A tnx request increases the index of the store, // A txn request increases the index of the store,
// and generates events with the same index in the event history. // and generates events with the same index in the event history.
rpc Tnx(TnxRequest) returns (TnxResponse) {} rpc Txn(TxnRequest) returns (TxnResponse) {}
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
@ -136,13 +136,13 @@ message Compare {
// if guard evaluates to // if guard evaluates to
// true. // true.
// 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false. // 3. A list of database operations called f op. Like t op, but executed if guard evaluates to false.
message TnxRequest { message TxnRequest {
repeated Compare compare = 1; repeated Compare compare = 1;
repeated RequestUnion success = 2; repeated RequestUnion success = 2;
repeated RequestUnion failure = 3; repeated RequestUnion failure = 3;
} }
message TnxResponse { message TxnResponse {
ResponseHeader header = 1; ResponseHeader header = 1;
bool succeeded = 2; bool succeeded = 2;
repeated ResponseUnion responses = 3; repeated ResponseUnion responses = 3;

View File

@ -50,7 +50,7 @@ func New(path string, d time.Duration, limit int) Backend {
return b return b
} }
// BatchTnx returns the current batch tx in coalescer. The tx can be used for read and // BatchTx returns the current batch tx in coalescer. The tx can be used for read and
// write operations. The write result can be retrieved within the same tx immediately. // write operations. The write result can be retrieved within the same tx immediately.
// The write result is isolated with other txs until the current one get committed. // The write result is isolated with other txs until the current one get committed.
func (b *backend) BatchTx() BatchTx { func (b *backend) BatchTx() BatchTx {

View File

@ -32,7 +32,7 @@ func (t *batchTx) UnsafeCreateBucket(name []byte) {
} }
} }
// before calling unsafePut, the caller MUST hold the lock on tnx. // before calling unsafePut, the caller MUST hold the lock on tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) { func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {
@ -48,7 +48,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
} }
} }
// before calling unsafeRange, the caller MUST hold the lock on tnx. // before calling unsafeRange, the caller MUST hold the lock on tx.
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) { func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {
@ -72,7 +72,7 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64
return keys, vs return keys, vs
} }
// before calling unsafeDelete, the caller MUST hold the lock on tnx. // before calling unsafeDelete, the caller MUST hold the lock on tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) { func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
bucket := t.tx.Bucket(bucketName) bucket := t.tx.Bucket(bucketName)
if bucket == nil { if bucket == nil {

View File

@ -27,16 +27,16 @@ type KV interface {
// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
DeleteRange(key, end []byte) (n, rev int64) DeleteRange(key, end []byte) (n, rev int64)
// TnxBegin begins a tnx. Only Tnx prefixed operation can be executed, others will be blocked // TxnBegin begins a txn. Only Txn prefixed operation can be executed, others will be blocked
// until tnx ends. Only one on-going tnx is allowed. // until txn ends. Only one on-going txn is allowed.
// TnxBegin returns an int64 tnx ID. // TxnBegin returns an int64 txn ID.
// All tnx prefixed operations with same tnx ID will be done with the same rev. // All txn prefixed operations with same txn ID will be done with the same rev.
TnxBegin() int64 TxnBegin() int64
// TnxEnd ends the on-going tnx with tnx ID. If the on-going tnx ID is not matched, error is returned. // TxnEnd ends the on-going txn with txn ID. If the on-going txn ID is not matched, error is returned.
TnxEnd(tnxID int64) error TxnEnd(txnID int64) error
TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error)
TnxPut(tnxID int64, key, value []byte) (rev int64, err error) TxnPut(txnID int64, key, value []byte) (rev int64, err error)
TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
Compact(rev int64) error Compact(rev int64) error

View File

@ -22,7 +22,7 @@ var (
scheduledCompactKeyName = []byte("scheduledCompactRev") scheduledCompactKeyName = []byte("scheduledCompactRev")
finishedCompactKeyName = []byte("finishedCompactRev") finishedCompactKeyName = []byte("finishedCompactRev")
ErrTnxIDMismatch = errors.New("storage: tnx id mismatch") ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
ErrCompacted = errors.New("storage: required reversion has been compacted") ErrCompacted = errors.New("storage: required reversion has been compacted")
ErrFutureRev = errors.New("storage: required reversion is a future reversion") ErrFutureRev = errors.New("storage: required reversion is a future reversion")
) )
@ -37,8 +37,8 @@ type store struct {
// the main reversion of the last compaction // the main reversion of the last compaction
compactMainRev int64 compactMainRev int64
tmu sync.Mutex // protect the tnxID field tmu sync.Mutex // protect the txnID field
tnxID int64 // tracks the current tnxID to verify tnx operations txnID int64 // tracks the current txnID to verify txn operations
wg sync.WaitGroup wg sync.WaitGroup
stopc chan struct{} stopc chan struct{}
@ -68,44 +68,44 @@ func newStore(path string) *store {
} }
func (s *store) Put(key, value []byte) int64 { func (s *store) Put(key, value []byte) int64 {
id := s.TnxBegin() id := s.TxnBegin()
s.put(key, value, s.currentRev.main+1) s.put(key, value, s.currentRev.main+1)
s.TnxEnd(id) s.TxnEnd(id)
return int64(s.currentRev.main) return int64(s.currentRev.main)
} }
func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { func (s *store) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
id := s.TnxBegin() id := s.TxnBegin()
kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev) kvs, rev, err = s.rangeKeys(key, end, limit, rangeRev)
s.TnxEnd(id) s.TxnEnd(id)
return kvs, rev, err return kvs, rev, err
} }
func (s *store) DeleteRange(key, end []byte) (n, rev int64) { func (s *store) DeleteRange(key, end []byte) (n, rev int64) {
id := s.TnxBegin() id := s.TxnBegin()
n = s.deleteRange(key, end, s.currentRev.main+1) n = s.deleteRange(key, end, s.currentRev.main+1)
s.TnxEnd(id) s.TxnEnd(id)
return n, int64(s.currentRev.main) return n, int64(s.currentRev.main)
} }
func (s *store) TnxBegin() int64 { func (s *store) TxnBegin() int64 {
s.mu.Lock() s.mu.Lock()
s.currentRev.sub = 0 s.currentRev.sub = 0
s.tmu.Lock() s.tmu.Lock()
defer s.tmu.Unlock() defer s.tmu.Unlock()
s.tnxID = rand.Int63() s.txnID = rand.Int63()
return s.tnxID return s.txnID
} }
func (s *store) TnxEnd(tnxID int64) error { func (s *store) TxnEnd(txnID int64) error {
s.tmu.Lock() s.tmu.Lock()
defer s.tmu.Unlock() defer s.tmu.Unlock()
if tnxID != s.tnxID { if txnID != s.txnID {
return ErrTnxIDMismatch return ErrTxnIDMismatch
} }
if s.currentRev.sub != 0 { if s.currentRev.sub != 0 {
@ -116,31 +116,31 @@ func (s *store) TnxEnd(tnxID int64) error {
return nil return nil
} }
func (s *store) TnxRange(tnxID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) { func (s *store) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
s.tmu.Lock() s.tmu.Lock()
defer s.tmu.Unlock() defer s.tmu.Unlock()
if tnxID != s.tnxID { if txnID != s.txnID {
return nil, 0, ErrTnxIDMismatch return nil, 0, ErrTxnIDMismatch
} }
return s.rangeKeys(key, end, limit, rangeRev) return s.rangeKeys(key, end, limit, rangeRev)
} }
func (s *store) TnxPut(tnxID int64, key, value []byte) (rev int64, err error) { func (s *store) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
s.tmu.Lock() s.tmu.Lock()
defer s.tmu.Unlock() defer s.tmu.Unlock()
if tnxID != s.tnxID { if txnID != s.txnID {
return 0, ErrTnxIDMismatch return 0, ErrTxnIDMismatch
} }
s.put(key, value, s.currentRev.main+1) s.put(key, value, s.currentRev.main+1)
return int64(s.currentRev.main + 1), nil return int64(s.currentRev.main + 1), nil
} }
func (s *store) TnxDeleteRange(tnxID int64, key, end []byte) (n, rev int64, err error) { func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
s.tmu.Lock() s.tmu.Lock()
defer s.tmu.Unlock() defer s.tmu.Unlock()
if tnxID != s.tnxID { if txnID != s.txnID {
return 0, 0, ErrTnxIDMismatch return 0, 0, ErrTxnIDMismatch
} }
n = s.deleteRange(key, end, s.currentRev.main+1) n = s.deleteRange(key, end, s.currentRev.main+1)

View File

@ -238,18 +238,18 @@ func TestRangeInSequence(t *testing.T) {
} }
} }
func TestOneTnx(t *testing.T) { func TestOneTxn(t *testing.T) {
s := newStore("test") s := newStore("test")
defer os.Remove("test") defer os.Remove("test")
id := s.TnxBegin() id := s.TxnBegin()
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
s.TnxPut(id, []byte("foo"), []byte("bar")) s.TxnPut(id, []byte("foo"), []byte("bar"))
s.TnxPut(id, []byte("foo1"), []byte("bar1")) s.TxnPut(id, []byte("foo1"), []byte("bar1"))
s.TnxPut(id, []byte("foo2"), []byte("bar2")) s.TxnPut(id, []byte("foo2"), []byte("bar2"))
// remove foo // remove foo
n, rev, err := s.TnxDeleteRange(id, []byte("foo"), nil) n, rev, err := s.TxnDeleteRange(id, []byte("foo"), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -257,7 +257,7 @@ func TestOneTnx(t *testing.T) {
t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1) t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 1)
} }
kvs, rev, err := s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) kvs, rev, err := s.TxnRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -266,7 +266,7 @@ func TestOneTnx(t *testing.T) {
} }
// remove again -> expect nothing // remove again -> expect nothing
n, rev, err = s.TnxDeleteRange(id, []byte("foo"), nil) n, rev, err = s.TxnDeleteRange(id, []byte("foo"), nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -275,7 +275,7 @@ func TestOneTnx(t *testing.T) {
} }
// remove foo1 // remove foo1
n, rev, err = s.TnxDeleteRange(id, []byte("foo"), []byte("foo2")) n, rev, err = s.TxnDeleteRange(id, []byte("foo"), []byte("foo2"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -284,7 +284,7 @@ func TestOneTnx(t *testing.T) {
} }
// after removal foo1 // after removal foo1
kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) kvs, rev, err = s.TxnRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -293,7 +293,7 @@ func TestOneTnx(t *testing.T) {
} }
// remove foo2 // remove foo2
n, rev, err = s.TnxDeleteRange(id, []byte("foo2"), []byte("foo3")) n, rev, err = s.TxnDeleteRange(id, []byte("foo2"), []byte("foo3"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -302,7 +302,7 @@ func TestOneTnx(t *testing.T) {
} }
// after removal foo2 // after removal foo2
kvs, rev, err = s.TnxRange(id, []byte("foo"), []byte("foo3"), 0, 0) kvs, rev, err = s.TxnRange(id, []byte("foo"), []byte("foo3"), 0, 0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -310,12 +310,12 @@ func TestOneTnx(t *testing.T) {
t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0) t.Fatalf("len(kvs) = %d, want %d", len(kvs), 0)
} }
} }
err := s.TnxEnd(id) err := s.TxnEnd(id)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// After tnx // After txn
kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 1) kvs, rev, err := s.Range([]byte("foo"), []byte("foo3"), 0, 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)