diff --git a/Documentation/configuration.md b/Documentation/configuration.md index d785079eb..d340028ee 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -193,6 +193,12 @@ Follow the instructions when using these flags. + Force to create a new one-member cluster. It commits configuration changes in force to remove all existing members in the cluster and add itself. It needs to be set to [restore a backup][restore]. + default: false +### Experimental Flags + +##### -experimental-v3demo ++ Enable experimental [v3 demo API](rfc/v3api.proto). ++ default: false + ### Miscellaneous Flags ##### -version diff --git a/etcdctlv3/command/delete_range_command.go b/etcdctlv3/command/delete_range_command.go new file mode 100644 index 000000000..a3aee77b3 --- /dev/null +++ b/etcdctlv3/command/delete_range_command.go @@ -0,0 +1,61 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewDeleteRangeCommand returns the CLI command for "deleteRange". +func NewDeleteRangeCommand() cli.Command { + return cli.Command{ + Name: "delete-range", + Action: func(c *cli.Context) { + deleteRangeCommandFunc(c) + }, + } +} + +// deleteRangeCommandFunc executes the "delegeRange" command. +func deleteRangeCommandFunc(c *cli.Context) { + if len(c.Args()) == 0 { + panic("bad arg") + } + + var rangeEnd []byte + key := []byte(c.Args()[0]) + if len(c.Args()) > 1 { + rangeEnd = []byte(c.Args()[1]) + } + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.DeleteRangeRequest{Key: key, RangeEnd: rangeEnd} + + etcd.DeleteRange(context.Background(), req) + + if rangeEnd != nil { + fmt.Printf("range [%s, %s) is deleted\n", string(key), string(rangeEnd)) + } else { + fmt.Printf("key %s is deleted\n", string(key)) + } +} diff --git a/etcdctlv3/command/put_command.go b/etcdctlv3/command/put_command.go new file mode 100644 index 000000000..515e65e66 --- /dev/null +++ b/etcdctlv3/command/put_command.go @@ -0,0 +1,53 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewPutCommand returns the CLI command for "put". +func NewPutCommand() cli.Command { + return cli.Command{ + Name: "put", + Action: func(c *cli.Context) { + putCommandFunc(c) + }, + } +} + +// putCommandFunc executes the "put" command. +func putCommandFunc(c *cli.Context) { + if len(c.Args()) != 2 { + panic("bad arg") + } + + key := []byte(c.Args()[0]) + value := []byte(c.Args()[1]) + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.PutRequest{Key: key, Value: value} + + etcd.Put(context.Background(), req) + fmt.Printf("%s %s\n", key, value) +} diff --git a/etcdctlv3/command/range_command.go b/etcdctlv3/command/range_command.go new file mode 100644 index 000000000..0b77dd698 --- /dev/null +++ b/etcdctlv3/command/range_command.go @@ -0,0 +1,58 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +// NewRangeCommand returns the CLI command for "range". +func NewRangeCommand() cli.Command { + return cli.Command{ + Name: "range", + Action: func(c *cli.Context) { + rangeCommandFunc(c) + }, + } +} + +// rangeCommandFunc executes the "range" command. +func rangeCommandFunc(c *cli.Context) { + if len(c.Args()) == 0 { + panic("bad arg") + } + + var rangeEnd []byte + key := []byte(c.Args()[0]) + if len(c.Args()) > 1 { + rangeEnd = []byte(c.Args()[1]) + } + conn, err := grpc.Dial("127.0.0.1:12379") + if err != nil { + panic(err) + } + etcd := pb.NewEtcdClient(conn) + req := &pb.RangeRequest{Key: key, RangeEnd: rangeEnd} + + resp, err := etcd.Range(context.Background(), req) + for _, kv := range resp.Kvs { + fmt.Printf("%s %s\n", string(kv.Key), string(kv.Value)) + } +} diff --git a/etcdctlv3/main.go b/etcdctlv3/main.go new file mode 100644 index 000000000..bf7a5d07b --- /dev/null +++ b/etcdctlv3/main.go @@ -0,0 +1,37 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "os" + + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli" + "github.com/coreos/etcd/etcdctlv3/command" + "github.com/coreos/etcd/version" +) + +func main() { + app := cli.NewApp() + app.Name = "etcdctlv3" + app.Version = version.Version + app.Usage = "A simple command line client for etcd3." + app.Commands = []cli.Command{ + command.NewRangeCommand(), + command.NewPutCommand(), + command.NewDeleteRangeCommand(), + } + + app.Run(os.Args) +} diff --git a/etcdmain/config.go b/etcdmain/config.go index 117ba4a9b..55ebdb163 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -115,6 +115,8 @@ type config struct { printVersion bool + v3demo bool + ignored []string } @@ -208,6 +210,9 @@ func NewConfig() *config { // version fs.BoolVar(&cfg.printVersion, "version", false, "Print the version and exit") + // demo flag + fs.BoolVar(&cfg.v3demo, "experimental-v3demo", false, "Enable experimental v3 demo API") + // backwards-compatibility with v0.4.6 fs.Var(&flags.IPAddressPort{}, "addr", "DEPRECATED: Use -advertise-client-urls instead.") fs.Var(&flags.IPAddressPort{}, "bind-addr", "DEPRECATED: Use -listen-client-urls instead.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 42facf8fc..810e30835 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -31,9 +31,12 @@ import ( systemdutil "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-systemd/util" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" "github.com/coreos/etcd/discovery" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/cors" "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/osutil" @@ -232,6 +235,15 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { clns = append(clns, l) } + var v3l net.Listener + if cfg.v3demo { + v3l, err = net.Listen("tcp", "127.0.0.1:12379") + if err != nil { + plog.Fatal(err) + } + plog.Infof("listening for client rpc on 127.0.0.1:12379") + } + srvcfg := &etcdserver.ServerConfig{ Name: cfg.name, ClientURLs: cfg.acurls, @@ -249,6 +261,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Transport: pt, TickMs: cfg.TickMs, ElectionTicks: cfg.electionTicks(), + V3demo: cfg.v3demo, } var s *etcdserver.EtcdServer s, err = etcdserver.NewServer(srvcfg) @@ -280,6 +293,14 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Fatal(serveHTTP(l, ch, 0)) }(l) } + + if cfg.v3demo { + // set up v3 demo rpc + grpcServer := grpc.NewServer() + etcdserverpb.RegisterEtcdServer(grpcServer, v3rpc.New(s)) + go plog.Fatal(grpcServer.Serve(v3l)) + } + return s.StopNotify(), nil } diff --git a/etcdmain/help.go b/etcdmain/help.go index 404160509..987fe22fd 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -121,5 +121,11 @@ given by the consensus protocol. --force-new-cluster 'false' force to create a new one-member cluster. + + +experimental flags: + + --experimental-v3demo 'false' + enable experimental v3 demo API ` ) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go new file mode 100644 index 000000000..b2af39908 --- /dev/null +++ b/etcdserver/api/v3rpc/key.go @@ -0,0 +1,54 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type handler struct { + server etcdserver.V3DemoServer +} + +func New(s etcdserver.V3DemoServer) pb.EtcdServer { + return &handler{s} +} + +func (h *handler) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Range: r}) + return resp.(*pb.RangeResponse), nil +} + +func (h *handler) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{Put: r}) + return resp.(*pb.PutResponse), nil +} + +func (h *handler) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + resp := h.server.V3DemoDo(ctx, pb.InternalRaftRequest{DeleteRange: r}) + return resp.(*pb.DeleteRangeResponse), nil +} + +func (h *handler) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + panic("not implemented") + return nil, nil +} + +func (h *handler) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + panic("not implemented") + return nil, nil +} diff --git a/etcdserver/config.go b/etcdserver/config.go index 9b6132670..b19eae375 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -43,6 +43,8 @@ type ServerConfig struct { TickMs uint ElectionTicks int + + V3demo bool } // VerifyBootstrapConfig sanity-checks the initial config for bootstrap case diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 3fc62b854..6626e706e 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -7,12 +7,12 @@ It is generated from these files: etcdserver.proto + raft_internal.proto rpc.proto It has these top-level messages: Request Metadata - InternalRaftRequest */ package etcdserverpb @@ -62,17 +62,6 @@ func (m *Metadata) Reset() { *m = Metadata{} } func (m *Metadata) String() string { return proto.CompactTextString(m) } func (*Metadata) ProtoMessage() {} -// An InternalRaftRequest is the union of all requests which can be -// sent via raft. -type InternalRaftRequest struct { - V2 *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } -func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } -func (*InternalRaftRequest) ProtoMessage() {} - func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -474,76 +463,6 @@ func (m *Metadata) Unmarshal(data []byte) error { return nil } -func (m *InternalRaftRequest) Unmarshal(data []byte) error { - l := len(data) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.V2 == nil { - m.V2 = &Request{} - } - if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - iNdEx -= sizeOfWire - skippy, err := skipEtcdserver(data[iNdEx:]) - if err != nil { - return err - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - return nil -} func skipEtcdserver(data []byte) (n int, err error) { l := len(data) iNdEx := 0 @@ -628,22 +547,6 @@ func skipEtcdserver(data []byte) (n int, err error) { } panic("unreachable") } -func (this *InternalRaftRequest) GetValue() interface{} { - if this.V2 != nil { - return this.V2 - } - return nil -} - -func (this *InternalRaftRequest) SetValue(value interface{}) bool { - switch vt := value.(type) { - case *Request: - this.V2 = vt - default: - return false - } - return true -} func (m *Request) Size() (n int) { var l int _ = l @@ -686,19 +589,6 @@ func (m *Metadata) Size() (n int) { return n } -func (m *InternalRaftRequest) Size() (n int) { - var l int - _ = l - if m.V2 != nil { - l = m.V2.Size() - n += 1 + l + sovEtcdserver(uint64(l)) - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - func sovEtcdserver(x uint64) (n int) { for { n++ @@ -851,37 +741,6 @@ func (m *Metadata) MarshalTo(data []byte) (n int, err error) { return i, nil } -func (m *InternalRaftRequest) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - if m.V2 != nil { - data[i] = 0xa - i++ - i = encodeVarintEtcdserver(data, i, uint64(m.V2.Size())) - n1, err := m.V2.MarshalTo(data[i:]) - if err != nil { - return 0, err - } - i += n1 - } - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} - func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index ee989d9c3..bfc29625c 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -31,12 +31,3 @@ message Metadata { optional uint64 NodeID = 1 [(gogoproto.nullable) = false]; optional uint64 ClusterID = 2 [(gogoproto.nullable) = false]; } - -// An InternalRaftRequest is the union of all requests which can be -// sent via raft. -message InternalRaftRequest { - option (gogoproto.onlyone) = true; - oneof value { - Request v2 = 1; - } -} diff --git a/etcdserver/etcdserverpb/raft_internal.pb.go b/etcdserver/etcdserverpb/raft_internal.pb.go new file mode 100644 index 000000000..c3073178a --- /dev/null +++ b/etcdserver/etcdserverpb/raft_internal.pb.go @@ -0,0 +1,463 @@ +// Code generated by protoc-gen-gogo. +// source: raft_internal.proto +// DO NOT EDIT! + +package etcdserverpb + +import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + +// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb" + +import io "io" +import fmt "fmt" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal + +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +type InternalRaftRequest struct { + V2 *Request `protobuf:"bytes,1,opt,name=v2" json:"v2,omitempty"` + Range *RangeRequest `protobuf:"bytes,2,opt,name=range" json:"range,omitempty"` + Put *PutRequest `protobuf:"bytes,3,opt,name=put" json:"put,omitempty"` + DeleteRange *DeleteRangeRequest `protobuf:"bytes,4,opt,name=delete_range" json:"delete_range,omitempty"` + Txn *TxnRequest `protobuf:"bytes,5,opt,name=txn" json:"txn,omitempty"` +} + +func (m *InternalRaftRequest) Reset() { *m = InternalRaftRequest{} } +func (m *InternalRaftRequest) String() string { return proto.CompactTextString(m) } +func (*InternalRaftRequest) ProtoMessage() {} + +func init() { +} +func (m *InternalRaftRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field V2", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.V2 == nil { + m.V2 = &Request{} + } + if err := m.V2.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Range == nil { + m.Range = &RangeRequest{} + } + if err := m.Range.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Put", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Put == nil { + m.Put = &PutRequest{} + } + if err := m.Put.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DeleteRange", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DeleteRange == nil { + m.DeleteRange = &DeleteRangeRequest{} + } + if err := m.DeleteRange.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Txn", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Txn == nil { + m.Txn = &TxnRequest{} + } + if err := m.Txn.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + iNdEx -= sizeOfWire + skippy, err := skipRaftInternal(data[iNdEx:]) + if err != nil { + return err + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + return nil +} +func skipRaftInternal(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRaftInternal(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} +func (this *InternalRaftRequest) GetValue() interface{} { + if this.V2 != nil { + return this.V2 + } + if this.Range != nil { + return this.Range + } + if this.Put != nil { + return this.Put + } + if this.DeleteRange != nil { + return this.DeleteRange + } + if this.Txn != nil { + return this.Txn + } + return nil +} + +func (this *InternalRaftRequest) SetValue(value interface{}) bool { + switch vt := value.(type) { + case *Request: + this.V2 = vt + case *RangeRequest: + this.Range = vt + case *PutRequest: + this.Put = vt + case *DeleteRangeRequest: + this.DeleteRange = vt + case *TxnRequest: + this.Txn = vt + default: + return false + } + return true +} +func (m *InternalRaftRequest) Size() (n int) { + var l int + _ = l + if m.V2 != nil { + l = m.V2.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Range != nil { + l = m.Range.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Put != nil { + l = m.Put.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.DeleteRange != nil { + l = m.DeleteRange.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + if m.Txn != nil { + l = m.Txn.Size() + n += 1 + l + sovRaftInternal(uint64(l)) + } + return n +} + +func sovRaftInternal(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRaftInternal(x uint64) (n int) { + return sovRaftInternal(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *InternalRaftRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *InternalRaftRequest) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.V2 != nil { + data[i] = 0xa + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.V2.Size())) + n1, err := m.V2.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.Range != nil { + data[i] = 0x12 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Range.Size())) + n2, err := m.Range.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.Put != nil { + data[i] = 0x1a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Put.Size())) + n3, err := m.Put.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n3 + } + if m.DeleteRange != nil { + data[i] = 0x22 + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.DeleteRange.Size())) + n4, err := m.DeleteRange.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n4 + } + if m.Txn != nil { + data[i] = 0x2a + i++ + i = encodeVarintRaftInternal(data, i, uint64(m.Txn.Size())) + n5, err := m.Txn.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n5 + } + return i, nil +} + +func encodeFixed64RaftInternal(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32RaftInternal(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRaftInternal(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} diff --git a/etcdserver/etcdserverpb/raft_internal.proto b/etcdserver/etcdserverpb/raft_internal.proto new file mode 100644 index 000000000..4fb496bfd --- /dev/null +++ b/etcdserver/etcdserverpb/raft_internal.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; +package etcdserverpb; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "etcdserver.proto"; +import "rpc.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// An InternalRaftRequest is the union of all requests which can be +// sent via raft. +message InternalRaftRequest { + option (gogoproto.onlyone) = true; + oneof value { + Request v2 = 1; + RangeRequest range = 2; + PutRequest put = 3; + DeleteRangeRequest delete_range = 4; + TxnRequest txn = 5; + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index dce70c6f0..995912a10 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -20,6 +20,7 @@ import ( "fmt" "math/rand" "net/http" + "os" "path" "regexp" "sync/atomic" @@ -43,6 +44,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/snap" + dstorage "github.com/coreos/etcd/storage" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/etcd/wal" @@ -158,6 +160,7 @@ type EtcdServer struct { cluster *cluster store store.Store + kv dstorage.KV stats *stats.ServerStats lstats *stats.LeaderStats @@ -315,6 +318,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { forceVersionC: make(chan struct{}), } + if cfg.V3demo { + srv.kv = dstorage.New(path.Join(cfg.DataDir, "member", "v3demo")) + } else { + // we do not care about the error of the removal + os.RemoveAll(path.Join(cfg.DataDir, "member", "v3demo")) + } + // TODO: move transport initialization near the definition of remote tr := rafthttp.NewTransporter(cfg.Transport, id, cl.ID(), srv, srv.errorc, sstats, lstats) // add all remotes into transport diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go new file mode 100644 index 000000000..409a56c17 --- /dev/null +++ b/etcdserver/v3demo_server.go @@ -0,0 +1,62 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type V3DemoServer interface { + V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message +} + +func (s *EtcdServer) V3DemoDo(ctx context.Context, r pb.InternalRaftRequest) proto.Message { + switch { + case r.Range != nil: + rr := r.Range + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + kvs, rev, err := s.kv.Range(rr.Key, rr.RangeEnd, rr.Limit, 0) + if err != nil { + panic("not handled error") + } + + resp.Header.Index = rev + for i := range kvs { + resp.Kvs = append(resp.Kvs, &kvs[i]) + } + return resp + case r.Put != nil: + rp := r.Put + resp := &pb.PutResponse{} + resp.Header = &pb.ResponseHeader{} + rev := s.kv.Put(rp.Key, rp.Value) + resp.Header.Index = rev + return resp + case r.DeleteRange != nil: + rd := r.DeleteRange + resp := &pb.DeleteRangeResponse{} + resp.Header = &pb.ResponseHeader{} + _, rev := s.kv.DeleteRange(rd.Key, rd.RangeEnd) + resp.Header.Index = rev + return resp + case r.Txn != nil: + panic("not implemented") + default: + panic("not implemented") + } +}