Merge pull request #3299 from xiang90/txn

initial support for txn
This commit is contained in:
Xiang Li
2015-08-14 16:05:16 -07:00
6 changed files with 464 additions and 79 deletions

View File

@@ -0,0 +1,221 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
// NewTxnCommand returns the CLI command for "txn".
func NewTxnCommand() cli.Command {
return cli.Command{
Name: "txn",
Action: func(c *cli.Context) {
txnCommandFunc(c)
},
}
}
// txnCommandFunc executes the "txn" command.
func txnCommandFunc(c *cli.Context) {
if len(c.Args()) != 0 {
panic("unexpected args")
}
reader := bufio.NewReader(os.Stdin)
next := compareState
txn := &pb.TxnRequest{}
for next != nil {
next = next(txn, reader)
}
conn, err := grpc.Dial("127.0.0.1:12379")
if err != nil {
panic(err)
}
etcd := pb.NewEtcdClient(conn)
resp, err := etcd.Txn(context.Background(), txn)
if err != nil {
fmt.Println(err)
}
if resp.Succeeded {
fmt.Println("executed success request list")
} else {
fmt.Println("executed failure request list")
}
}
type stateFunc func(txn *pb.TxnRequest, r *bufio.Reader) stateFunc
func compareState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
fmt.Println("entry comparison[key target expected_result compare_value] (end with empty line):")
line, err := r.ReadString('\n')
if err != nil {
os.Exit(1)
}
if len(line) == 1 {
return successState
}
// remove trialling \n
line = line[:len(line)-1]
c, err := parseCompare(line)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
txn.Compare = append(txn.Compare, c)
return compareState
}
func successState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
fmt.Println("entry success request[method key value(end_range)] (end with empty line):")
line, err := r.ReadString('\n')
if err != nil {
os.Exit(1)
}
if len(line) == 1 {
return failureState
}
// remove trialling \n
line = line[:len(line)-1]
ru, err := parseRequestUnion(line)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
txn.Success = append(txn.Success, ru)
return successState
}
func failureState(txn *pb.TxnRequest, r *bufio.Reader) stateFunc {
fmt.Println("entry failure request[method key value(end_range)] (end with empty line):")
line, err := r.ReadString('\n')
if err != nil {
os.Exit(1)
}
if len(line) == 1 {
return nil
}
// remove trialling \n
line = line[:len(line)-1]
ru, err := parseRequestUnion(line)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
txn.Failure = append(txn.Failure, ru)
return failureState
}
func parseRequestUnion(line string) (*pb.RequestUnion, error) {
parts := strings.Split(line, " ")
if len(parts) < 2 {
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
ru := &pb.RequestUnion{}
key := []byte(parts[1])
switch parts[0] {
case "r", "range":
ru.RequestRange = &pb.RangeRequest{Key: key}
if len(parts) == 3 {
ru.RequestRange.RangeEnd = []byte(parts[2])
}
case "p", "put":
ru.RequestPut = &pb.PutRequest{Key: key, Value: []byte(parts[2])}
case "d", "deleteRange":
ru.RequestDeleteRange = &pb.DeleteRangeRequest{Key: key}
if len(parts) == 3 {
ru.RequestRange.RangeEnd = []byte(parts[2])
}
default:
return nil, fmt.Errorf("invalid txn request: %s", line)
}
return ru, nil
}
func parseCompare(line string) (*pb.Compare, error) {
parts := strings.Split(line, " ")
if len(parts) != 4 {
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
var err error
c := &pb.Compare{}
c.Key = []byte(parts[0])
switch parts[1] {
case "ver", "version":
c.Target = pb.Compare_VERSION
c.Version, err = strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
case "c", "create":
c.Target = pb.Compare_CREATE
c.CreateIndex, err = strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
case "m", "mod":
c.Target = pb.Compare_MOD
c.ModIndex, err = strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
case "val", "value":
c.Target = pb.Compare_VALUE
c.Value = []byte(parts[3])
default:
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
switch parts[2] {
case "g", "greater":
c.Result = pb.Compare_GREATER
case "e", "equal":
c.Result = pb.Compare_EQUAL
case "l", "less":
c.Result = pb.Compare_LESS
default:
return nil, fmt.Errorf("invalid txn compare request: %s", line)
}
return c, nil
}

View File

@@ -31,6 +31,7 @@ func main() {
command.NewRangeCommand(),
command.NewPutCommand(),
command.NewDeleteRangeCommand(),
command.NewTxnCommand(),
}
app.Run(os.Args)

View File

@@ -44,8 +44,8 @@ func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*p
}
func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
panic("not implemented")
return nil, nil
resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Txn: r})
return resp.(*pb.TxnResponse), nil
}
func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {

View File

@@ -24,27 +24,53 @@ var _ grpc.ClientConn
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
type Compare_CompareType int32
type Compare_CompareResult int32
const (
Compare_EQUAL Compare_CompareType = 0
Compare_GREATER Compare_CompareType = 1
Compare_LESS Compare_CompareType = 2
Compare_EQUAL Compare_CompareResult = 0
Compare_GREATER Compare_CompareResult = 1
Compare_LESS Compare_CompareResult = 2
)
var Compare_CompareType_name = map[int32]string{
var Compare_CompareResult_name = map[int32]string{
0: "EQUAL",
1: "GREATER",
2: "LESS",
}
var Compare_CompareType_value = map[string]int32{
var Compare_CompareResult_value = map[string]int32{
"EQUAL": 0,
"GREATER": 1,
"LESS": 2,
}
func (x Compare_CompareType) String() string {
return proto.EnumName(Compare_CompareType_name, int32(x))
func (x Compare_CompareResult) String() string {
return proto.EnumName(Compare_CompareResult_name, int32(x))
}
type Compare_CompareTarget int32
const (
Compare_VERSION Compare_CompareTarget = 0
Compare_CREATE Compare_CompareTarget = 1
Compare_MOD Compare_CompareTarget = 2
Compare_VALUE Compare_CompareTarget = 3
)
var Compare_CompareTarget_name = map[int32]string{
0: "VERSION",
1: "CREATE",
2: "MOD",
3: "VALUE",
}
var Compare_CompareTarget_value = map[string]int32{
"VERSION": 0,
"CREATE": 1,
"MOD": 2,
"VALUE": 3,
}
func (x Compare_CompareTarget) String() string {
return proto.EnumName(Compare_CompareTarget_name, int32(x))
}
type ResponseHeader struct {
@@ -184,7 +210,7 @@ func (m *RequestUnion) GetRequestDeleteRange() *DeleteRangeRequest {
}
type ResponseUnion struct {
ReponseRange *RangeResponse `protobuf:"bytes,1,opt,name=reponse_range" json:"reponse_range,omitempty"`
ResponseRange *RangeResponse `protobuf:"bytes,1,opt,name=response_range" json:"response_range,omitempty"`
ResponsePut *PutResponse `protobuf:"bytes,2,opt,name=response_put" json:"response_put,omitempty"`
ResponseDeleteRange *DeleteRangeResponse `protobuf:"bytes,3,opt,name=response_delete_range" json:"response_delete_range,omitempty"`
}
@@ -193,9 +219,9 @@ func (m *ResponseUnion) Reset() { *m = ResponseUnion{} }
func (m *ResponseUnion) String() string { return proto.CompactTextString(m) }
func (*ResponseUnion) ProtoMessage() {}
func (m *ResponseUnion) GetReponseRange() *RangeResponse {
func (m *ResponseUnion) GetResponseRange() *RangeResponse {
if m != nil {
return m.ReponseRange
return m.ResponseRange
}
return nil
}
@@ -215,17 +241,18 @@ func (m *ResponseUnion) GetResponseDeleteRange() *DeleteRangeResponse {
}
type Compare struct {
Type Compare_CompareType `protobuf:"varint,1,opt,name=type,proto3,enum=etcdserverpb.Compare_CompareType" json:"type,omitempty"`
Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult" json:"result,omitempty"`
Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget" json:"target,omitempty"`
// key path
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
// version of the given key
Version int64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
// create index of the given key
CreateIndex int64 `protobuf:"varint,4,opt,name=create_index,proto3" json:"create_index,omitempty"`
CreateIndex int64 `protobuf:"varint,5,opt,name=create_index,proto3" json:"create_index,omitempty"`
// last modified index of the given key
ModIndex int64 `protobuf:"varint,5,opt,name=mod_index,proto3" json:"mod_index,omitempty"`
ModIndex int64 `protobuf:"varint,6,opt,name=mod_index,proto3" json:"mod_index,omitempty"`
// value of the given key
Value []byte `protobuf:"bytes,6,opt,name=value,proto3" json:"value,omitempty"`
Value []byte `protobuf:"bytes,7,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *Compare) Reset() { *m = Compare{} }
@@ -326,7 +353,8 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader {
}
func init() {
proto.RegisterEnum("etcdserverpb.Compare_CompareType", Compare_CompareType_name, Compare_CompareType_value)
proto.RegisterEnum("etcdserverpb.Compare_CompareResult", Compare_CompareResult_name, Compare_CompareResult_value)
proto.RegisterEnum("etcdserverpb.Compare_CompareTarget", Compare_CompareTarget_name, Compare_CompareTarget_value)
}
func (m *ResponseHeader) Unmarshal(data []byte) error {
l := len(data)
@@ -1145,7 +1173,7 @@ func (m *ResponseUnion) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ReponseRange", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ResponseRange", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
@@ -1163,10 +1191,10 @@ func (m *ResponseUnion) Unmarshal(data []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.ReponseRange == nil {
m.ReponseRange = &RangeResponse{}
if m.ResponseRange == nil {
m.ResponseRange = &RangeResponse{}
}
if err := m.ReponseRange.Unmarshal(data[iNdEx:postIndex]); err != nil {
if err := m.ResponseRange.Unmarshal(data[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
@@ -1268,7 +1296,7 @@ func (m *Compare) Unmarshal(data []byte) error {
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Result", wireType)
}
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
@@ -1276,12 +1304,27 @@ func (m *Compare) Unmarshal(data []byte) error {
}
b := data[iNdEx]
iNdEx++
m.Type |= (Compare_CompareType(b) & 0x7F) << shift
m.Result |= (Compare_CompareResult(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Target", wireType)
}
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Target |= (Compare_CompareTarget(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
@@ -1303,7 +1346,7 @@ func (m *Compare) Unmarshal(data []byte) error {
}
m.Key = append([]byte{}, data[iNdEx:postIndex]...)
iNdEx = postIndex
case 3:
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
@@ -1318,7 +1361,7 @@ func (m *Compare) Unmarshal(data []byte) error {
break
}
}
case 4:
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field CreateIndex", wireType)
}
@@ -1333,7 +1376,7 @@ func (m *Compare) Unmarshal(data []byte) error {
break
}
}
case 5:
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ModIndex", wireType)
}
@@ -1348,7 +1391,7 @@ func (m *Compare) Unmarshal(data []byte) error {
break
}
}
case 6:
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
@@ -1979,8 +2022,8 @@ func (m *RequestUnion) Size() (n int) {
func (m *ResponseUnion) Size() (n int) {
var l int
_ = l
if m.ReponseRange != nil {
l = m.ReponseRange.Size()
if m.ResponseRange != nil {
l = m.ResponseRange.Size()
n += 1 + l + sovRpc(uint64(l))
}
if m.ResponsePut != nil {
@@ -1997,8 +2040,11 @@ func (m *ResponseUnion) Size() (n int) {
func (m *Compare) Size() (n int) {
var l int
_ = l
if m.Type != 0 {
n += 1 + sovRpc(uint64(m.Type))
if m.Result != 0 {
n += 1 + sovRpc(uint64(m.Result))
}
if m.Target != 0 {
n += 1 + sovRpc(uint64(m.Target))
}
if m.Key != nil {
l = len(m.Key)
@@ -2425,11 +2471,11 @@ func (m *ResponseUnion) MarshalTo(data []byte) (n int, err error) {
_ = i
var l int
_ = l
if m.ReponseRange != nil {
if m.ResponseRange != nil {
data[i] = 0xa
i++
i = encodeVarintRpc(data, i, uint64(m.ReponseRange.Size()))
n7, err := m.ReponseRange.MarshalTo(data[i:])
i = encodeVarintRpc(data, i, uint64(m.ResponseRange.Size()))
n7, err := m.ResponseRange.MarshalTo(data[i:])
if err != nil {
return 0, err
}
@@ -2473,37 +2519,42 @@ func (m *Compare) MarshalTo(data []byte) (n int, err error) {
_ = i
var l int
_ = l
if m.Type != 0 {
if m.Result != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.Type))
i = encodeVarintRpc(data, i, uint64(m.Result))
}
if m.Target != 0 {
data[i] = 0x10
i++
i = encodeVarintRpc(data, i, uint64(m.Target))
}
if m.Key != nil {
if len(m.Key) > 0 {
data[i] = 0x12
data[i] = 0x1a
i++
i = encodeVarintRpc(data, i, uint64(len(m.Key)))
i += copy(data[i:], m.Key)
}
}
if m.Version != 0 {
data[i] = 0x18
data[i] = 0x20
i++
i = encodeVarintRpc(data, i, uint64(m.Version))
}
if m.CreateIndex != 0 {
data[i] = 0x20
data[i] = 0x28
i++
i = encodeVarintRpc(data, i, uint64(m.CreateIndex))
}
if m.ModIndex != 0 {
data[i] = 0x28
data[i] = 0x30
i++
i = encodeVarintRpc(data, i, uint64(m.ModIndex))
}
if m.Value != nil {
if len(m.Value) > 0 {
data[i] = 0x32
data[i] = 0x3a
i++
i = encodeVarintRpc(data, i, uint64(len(m.Value)))
i += copy(data[i:], m.Value)

View File

@@ -4,6 +4,9 @@ package etcdserverpb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "github.com/coreos/etcd/storage/storagepb/kv.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
// Interface exported by the server.
service etcd {
// Range gets the keys in the range from the store.
@@ -88,30 +91,37 @@ message RequestUnion {
message ResponseUnion {
oneof response {
RangeResponse reponse_range = 1;
RangeResponse response_range = 1;
PutResponse response_put = 2;
DeleteRangeResponse response_delete_range = 3;
}
}
message Compare {
enum CompareType {
enum CompareResult {
EQUAL = 0;
GREATER = 1;
LESS = 2;
}
CompareType type = 1;
enum CompareTarget {
VERSION = 0;
CREATE = 1;
MOD = 2;
VALUE= 3;
}
CompareResult result = 1;
CompareTarget target = 2;
// key path
bytes key = 2;
oneof target {
bytes key = 3;
oneof target_union {
// version of the given key
int64 version = 3;
int64 version = 4;
// create index of the given key
int64 create_index = 4;
int64 create_index = 5;
// last modified index of the given key
int64 mod_index = 5;
int64 mod_index = 6;
// value of the given key
bytes value = 6;
bytes value = 7;
}
}

View File

@@ -15,9 +15,12 @@
package etcdserver
import (
"bytes"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
dstorage "github.com/coreos/etcd/storage"
)
type V3DemoServer interface {
@@ -27,36 +30,135 @@ type V3DemoServer interface {
func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message {
switch {
case r.Range != nil:
rr := r.Range
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0)
if err != nil {
panic("not handled error")
return doRange(s.kv, r.Range)
case r.Put != nil:
return doPut(s.kv, r.Put)
case r.DeleteRange != nil:
return doDeleteRange(s.kv, r.DeleteRange)
case r.Txn != nil:
var index int64
rt := r.Txn
ok := true
for _, c := range rt.Compare {
kvs, rev, err := s.kv.Range(c.Key, nil, 1, 0)
if err != nil {
ok = false
break
}
index = rev
kv := kvs[0]
// -1 is less, 0 is equal, 1 is greater
var result int
switch c.Target {
case pb.Compare_VALUE:
result = bytes.Compare(kv.Value, c.Value)
case pb.Compare_CREATE:
result = compareInt64(kv.CreateIndex, c.CreateIndex)
case pb.Compare_MOD:
result = compareInt64(kv.ModIndex, c.ModIndex)
case pb.Compare_VERSION:
result = compareInt64(kv.Version, c.Version)
}
switch c.Result {
case pb.Compare_EQUAL:
if result != 0 {
ok = false
}
case pb.Compare_GREATER:
if result != 1 {
ok = false
}
case pb.Compare_LESS:
if result != -1 {
ok = false
}
}
if !ok {
break
}
}
resp.Header.Index = rev
for i := range kvs {
resp.Kvs = append(resp.Kvs, &kvs[i])
var reqs []*pb.RequestUnion
if ok {
reqs = rt.Success
} else {
reqs = rt.Failure
}
return resp
case r.Put != nil:
rp := r.Put
resp := &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
rev := s.kv.Put(rp.Key, rp.Value)
resp.Header.Index = rev
return resp
case r.DeleteRange != nil:
rd := r.DeleteRange
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
_, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd)
resp.Header.Index = rev
return resp
case r.Txn != nil:
panic("not implemented")
resps := make([]*pb.ResponseUnion, len(reqs))
for i := range reqs {
resps[i] = doUnion(s.kv, reqs[i])
}
if len(resps) != 0 {
index += 1
}
txnResp := &pb.TxnResponse{}
txnResp.Header = &pb.ResponseHeader{}
txnResp.Header.Index = index
txnResp.Responses = resps
txnResp.Succeeded = ok
return txnResp
default:
panic("not implemented")
}
}
func compareInt64(a, b int64) int {
switch {
case a < b:
return -1
case a > b:
return 1
default:
return 0
}
}
func doPut(kv dstorage.KV, p *pb.PutRequest) *pb.PutResponse {
resp := &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
rev := kv.Put(p.Key, p.Value)
resp.Header.Index = rev
return resp
}
func doRange(kv dstorage.KV, r *pb.RangeRequest) *pb.RangeResponse {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
kvs, rev, err := kv.Range(r.Key, r.RangeEnd, r.Limit, 0)
if err != nil {
panic("not handled error")
}
resp.Header.Index = rev
for i := range kvs {
resp.Kvs = append(resp.Kvs, &kvs[i])
}
return resp
}
func doDeleteRange(kv dstorage.KV, dr *pb.DeleteRangeRequest) *pb.DeleteRangeResponse {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
_, rev := kv.DeleteRange(dr.Key, dr.RangeEnd)
resp.Header.Index = rev
return resp
}
func doUnion(kv dstorage.KV, union *pb.RequestUnion) *pb.ResponseUnion {
switch {
case union.RequestRange != nil:
return &pb.ResponseUnion{ResponseRange: doRange(kv, union.RequestRange)}
case union.RequestPut != nil:
return &pb.ResponseUnion{ResponsePut: doPut(kv, union.RequestPut)}
case union.RequestDeleteRange != nil:
return &pb.ResponseUnion{ResponseDeleteRange: doDeleteRange(kv, union.RequestDeleteRange)}
default:
// empty union
return nil
}
}