diff --git a/etcdserver/apply_v2.go b/etcdserver/apply_v2.go new file mode 100644 index 000000000..b1e2c686f --- /dev/null +++ b/etcdserver/apply_v2.go @@ -0,0 +1,129 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "encoding/json" + "path" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/store" + "github.com/coreos/go-semver/semver" +) + +// applierV2 is the interface for processing V2 raft messages +type applierV2 interface { + Delete(r *pb.Request) Response + Post(r *pb.Request) Response + Put(r *pb.Request) Response + QGet(r *pb.Request) Response + Sync(r *pb.Request) Response +} + +type applierV2store struct{ s *EtcdServer } + +func (a *applierV2store) Delete(r *pb.Request) Response { + switch { + case r.PrevIndex > 0 || r.PrevValue != "": + return toResponse(a.s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) + default: + return toResponse(a.s.store.Delete(r.Path, r.Dir, r.Recursive)) + } +} + +func (a *applierV2store) Post(r *pb.Request) Response { + return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r))) +} + +func (a *applierV2store) Put(r *pb.Request) Response { + ttlOptions := toTTLOptions(r) + exists, existsSet := pbutil.GetBool(r.PrevExist) + switch { + case existsSet: + if exists { + if r.PrevIndex == 0 && r.PrevValue == "" { + return toResponse(a.s.store.Update(r.Path, r.Val, ttlOptions)) + } else { + return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) + } + } + return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) + case r.PrevIndex > 0 || r.PrevValue != "": + return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) + default: + if storeMemberAttributeRegexp.MatchString(r.Path) { + id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) + var attr membership.Attributes + if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { + plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) + } + a.s.cluster.UpdateAttributes(id, attr) + // return an empty response since there is no consumer. + return Response{} + } + if r.Path == membership.StoreClusterVersionKey() { + a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) + // return an empty response since there is no consumer. + return Response{} + } + return toResponse(a.s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) + } +} + +func (a *applierV2store) QGet(r *pb.Request) Response { + return toResponse(a.s.store.Get(r.Path, r.Recursive, r.Sorted)) +} + +func (a *applierV2store) Sync(r *pb.Request) Response { + a.s.store.DeleteExpiredKeys(time.Unix(0, r.Time)) + return Response{} +} + +// applyV2Request interprets r as a call to store.X and returns a Response interpreted +// from store.Event +func (s *EtcdServer) applyV2Request(r *pb.Request) Response { + toTTLOptions(r) + switch r.Method { + case "POST": + return s.applyV2.Post(r) + case "PUT": + return s.applyV2.Put(r) + case "DELETE": + return s.applyV2.Delete(r) + case "QGET": + return s.applyV2.QGet(r) + case "SYNC": + return s.applyV2.Sync(r) + default: + // This should never be reached, but just in case: + return Response{err: ErrUnknownMethod} + } +} + +func toTTLOptions(r *pb.Request) store.TTLOptionSet { + refresh, _ := pbutil.GetBool(r.Refresh) + ttlOptions := store.TTLOptionSet{Refresh: refresh} + if r.Expiration != 0 { + ttlOptions.ExpireTime = time.Unix(0, r.Expiration) + } + return ttlOptions +} + +func toResponse(ev *store.Event, err error) Response { + return Response{Event: ev, err: err} +} diff --git a/etcdserver/server.go b/etcdserver/server.go index cd254eae5..c7e755998 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -178,6 +178,8 @@ type EtcdServer struct { store store.Store + applyV2 applierV2 + applyV3 applierV3 kv dstorage.ConsistentWatchableKV lessor lease.Lessor @@ -382,6 +384,8 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } + srv.applyV2 = &applierV2store{srv} + srv.be = be srv.lessor = lease.NewLessor(srv.be) srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) @@ -1025,48 +1029,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint e := es[i] switch e.Type { case raftpb.EntryNormal: - // raft state machine may generate noop entry when leader confirmation. - // skip it in advance to avoid some potential bug in the future - if len(e.Data) == 0 { - select { - case s.forceVersionC <- struct{}{}: - default: - } - break - } - - var raftReq pb.InternalRaftRequest - if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible - var r pb.Request - pbutil.MustUnmarshal(&r, e.Data) - s.w.Trigger(r.ID, s.applyRequest(r)) - } else if raftReq.V2 != nil { - req := raftReq.V2 - s.w.Trigger(req.ID, s.applyRequest(*req)) - } else { - // do not re-apply applied entries. - if e.Index <= s.consistIndex.ConsistentIndex() { - break - } - // set the consistent index of current executing entry - s.consistIndex.setConsistentIndex(e.Index) - ar := s.applyV3Request(&raftReq) - if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { - s.w.Trigger(raftReq.ID, ar) - break - } - plog.Errorf("applying raft message exceeded backend quota") - go func() { - a := &pb.AlarmRequest{ - MemberID: uint64(s.ID()), - Action: pb.AlarmRequest_ACTIVATE, - Alarm: pb.AlarmType_NOSPACE, - } - r := pb.InternalRaftRequest{Alarm: a} - s.processInternalRaftRequest(context.TODO(), r) - s.w.Trigger(raftReq.ID, ar) - }() - } + s.applyEntryNormal(&e) case raftpb.EntryConfChange: var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) @@ -1083,70 +1046,54 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint return applied, shouldstop } -// applyRequest interprets r as a call to store.X and returns a Response interpreted -// from store.Event -func (s *EtcdServer) applyRequest(r pb.Request) Response { - f := func(ev *store.Event, err error) Response { - return Response{Event: ev, err: err} +// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer +func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { + // raft state machine may generate noop entry when leader confirmation. + // skip it in advance to avoid some potential bug in the future + if len(e.Data) == 0 { + select { + case s.forceVersionC <- struct{}{}: + default: + } + return } - refresh, _ := pbutil.GetBool(r.Refresh) - ttlOptions := store.TTLOptionSet{Refresh: refresh} - if r.Expiration != 0 { - ttlOptions.ExpireTime = time.Unix(0, r.Expiration) + var raftReq pb.InternalRaftRequest + if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible + var r pb.Request + pbutil.MustUnmarshal(&r, e.Data) + s.w.Trigger(r.ID, s.applyV2Request(&r)) + return + } + if raftReq.V2 != nil { + req := raftReq.V2 + s.w.Trigger(req.ID, s.applyV2Request(req)) + return } - switch r.Method { - case "POST": - return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions)) - case "PUT": - exists, existsSet := pbutil.GetBool(r.PrevExist) - switch { - case existsSet: - if exists { - if r.PrevIndex == 0 && r.PrevValue == "" { - return f(s.store.Update(r.Path, r.Val, ttlOptions)) - } else { - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) - } - } - return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions)) - case r.PrevIndex > 0 || r.PrevValue != "": - return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions)) - default: - if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(path.Dir(r.Path)) - var attr membership.Attributes - if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - plog.Panicf("unmarshal %s should never fail: %v", r.Val, err) - } - s.cluster.UpdateAttributes(id, attr) - // return an empty response since there is no consumer. - return Response{} - } - if r.Path == membership.StoreClusterVersionKey() { - s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val))) - // return an empty response since there is no consumer. - return Response{} - } - return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions)) - } - case "DELETE": - switch { - case r.PrevIndex > 0 || r.PrevValue != "": - return f(s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex)) - default: - return f(s.store.Delete(r.Path, r.Dir, r.Recursive)) - } - case "QGET": - return f(s.store.Get(r.Path, r.Recursive, r.Sorted)) - case "SYNC": - s.store.DeleteExpiredKeys(time.Unix(0, r.Time)) - return Response{} - default: - // This should never be reached, but just in case: - return Response{err: ErrUnknownMethod} + // do not re-apply applied entries. + if e.Index <= s.consistIndex.ConsistentIndex() { + return } + // set the consistent index of current executing entry + s.consistIndex.setConsistentIndex(e.Index) + ar := s.applyV3Request(&raftReq) + if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { + s.w.Trigger(raftReq.ID, ar) + return + } + + plog.Errorf("applying raft message exceeded backend quota") + go func() { + a := &pb.AlarmRequest{ + MemberID: uint64(s.ID()), + Action: pb.AlarmRequest_ACTIVATE, + Alarm: pb.AlarmType_NOSPACE, + } + r := pb.InternalRaftRequest{Alarm: a} + s.processInternalRaftRequest(context.TODO(), r) + s.w.Trigger(raftReq.ID, ar) + }() } // applyConfChange applies a ConfChange to the server. It is only diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 513833d0c..cbd6843f4 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -179,6 +179,7 @@ func TestApplyRepeat(t *testing.T) { cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + s.applyV2 = &applierV2store{s} s.start() req := &pb.Request{Method: "QGET", ID: uint64(1)} ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} @@ -444,7 +445,8 @@ func TestApplyRequest(t *testing.T) { for i, tt := range tests { st := mockstore.NewRecorder() srv := &EtcdServer{store: st} - resp := srv.applyRequest(tt.req) + srv.applyV2 = &applierV2store{srv} + resp := srv.applyV2Request(&tt.req) if !reflect.DeepEqual(resp, tt.wresp) { t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp) @@ -462,13 +464,15 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) { store: mockstore.NewRecorder(), cluster: cl, } + srv.applyV2 = &applierV2store{srv} + req := pb.Request{ Method: "PUT", ID: 1, Path: membership.MemberAttributesStorePath(1), Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, } - srv.applyRequest(req) + srv.applyV2Request(&req) w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { t.Errorf("attributes = %v, want %v", g, w) @@ -635,6 +639,7 @@ func TestDoProposal(t *testing.T) { store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} srv.start() resp, err := srv.Do(context.Background(), tt) srv.Stop() @@ -661,6 +666,8 @@ func TestDoProposalCancelled(t *testing.T) { w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) @@ -681,6 +688,8 @@ func TestDoProposalTimeout(t *testing.T) { w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + ctx, _ := context.WithTimeout(context.Background(), 0) _, err := srv.Do(ctx, pb.Request{Method: "PUT"}) if err != ErrTimeout { @@ -695,6 +704,8 @@ func TestDoProposalStopped(t *testing.T) { w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + srv.done = make(chan struct{}) close(srv.done) _, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1}) @@ -710,6 +721,8 @@ func TestSync(t *testing.T) { r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + // check that sync is non-blocking done := make(chan struct{}) go func() { @@ -748,6 +761,8 @@ func TestSyncTimeout(t *testing.T) { r: raftNode{Node: n}, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + // check that sync is non-blocking done := make(chan struct{}) go func() { @@ -885,6 +900,8 @@ func TestTriggerSnap(t *testing.T) { store: st, reqIDGen: idutil.NewGenerator(0, time.Time{}), } + srv.applyV2 = &applierV2store{srv} + srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex) srv.be = be @@ -951,6 +968,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { cluster: cl, msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } + s.applyV2 = &applierV2store{s} be, tmpPath := backend.NewDefaultTmpBackend() defer func() {