etcdserver: split out v2 raft apply interface

Anthony Romano
2016-04-20 00:31:58 -07:00
parent 1fe4c34398
commit 41382bc3f0
3 changed files with 197 additions and 103 deletions

etcdserver/apply_v2.go (new file)

@@ -0,0 +1,129 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package etcdserver

import (
	"encoding/json"
	"path"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/store"

	"github.com/coreos/go-semver/semver"
)
// applierV2 is the interface for processing V2 raft messages
type applierV2 interface {
Delete(r *pb.Request) Response
Post(r *pb.Request) Response
Put(r *pb.Request) Response
QGet(r *pb.Request) Response
Sync(r *pb.Request) Response
}
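// applierV2store implements applierV2 by applying requests directly to the server's v2 store.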
type applierV2store struct{ s *EtcdServer }
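// Delete applies a v2 DELETE: a compare-and-delete when PrevIndex or PrevValue is set, a plain delete otherwise.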
func (a *applierV2store) Delete(r *pb.Request) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return toResponse(a.s.store.Delete(r.Path, r.Dir, r.Recursive))
}
}
func (a *applierV2store) Post(r *pb.Request) Response {
return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
}
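// Put applies a v2 PUT. PrevExist selects between update/compare-and-swap and
// create; writes to member-attribute or cluster-version keys update cluster
// metadata instead of the store.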
func (a *applierV2store) Put(r *pb.Request) Response {
ttlOptions := toTTLOptions(r)
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return toResponse(a.s.store.Update(r.Path, r.Val, ttlOptions))
} else {
return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
}
return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
a.s.cluster.UpdateAttributes(id, attr)
// return an empty response since there is no consumer.
return Response{}
}
if r.Path == membership.StoreClusterVersionKey() {
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
// return an empty response since there is no consumer.
return Response{}
}
return toResponse(a.s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
}
func (a *applierV2store) QGet(r *pb.Request) Response {
return toResponse(a.s.store.Get(r.Path, r.Recursive, r.Sorted))
}
func (a *applierV2store) Sync(r *pb.Request) Response {
a.s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
}
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
switch r.Method {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
return s.applyV2.QGet(r)
case "SYNC":
return s.applyV2.Sync(r)
default:
// This should never be reached, but just in case:
return Response{err: ErrUnknownMethod}
}
}
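// toTTLOptions converts the Expiration and Refresh fields of a v2 Request into a store.TTLOptionSet.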
func toTTLOptions(r *pb.Request) store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
}
return ttlOptions
}
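// toResponse wraps a store event and error into a v2 Response.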
func toResponse(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
}
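
Putting the v2 apply path behind the applierV2 interface lets alternate implementations wrap the store-backed one. A minimal sketch of such a decorator, assuming a hypothetical Quota type with an Available check (neither is part of this commit):

// quotaApplierV2 is a hypothetical wrapper: it embeds an applierV2,
// rejects puts when the quota check fails, and delegates the rest.
type quotaApplierV2 struct {
	applierV2
	q Quota // hypothetical quota checker
}

func (a *quotaApplierV2) Put(r *pb.Request) Response {
	if !a.q.Available(r) {
		// refuse the write without touching the store
		return Response{err: ErrNoSpace}
	}
	return a.applierV2.Put(r)
}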

etcdserver/server.go

@@ -178,6 +178,8 @@ type EtcdServer struct {
store store.Store
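// appliers for committed v2 and v3 raft requests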
applyV2 applierV2
applyV3 applierV3
kv dstorage.ConsistentWatchableKV
lessor lease.Lessor
@@ -382,6 +384,8 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
srv.applyV2 = &applierV2store{srv}
srv.be = be
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
@@ -1025,48 +1029,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
e := es[i]
switch e.Type {
case raftpb.EntryNormal:
// raft state machine may generate a noop entry when a leader is confirmed.
// skip it in advance to avoid any potential future bugs.
if len(e.Data) == 0 {
select {
case s.forceVersionC <- struct{}{}:
default:
}
break
}
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
var r pb.Request
pbutil.MustUnmarshal(&r, e.Data)
s.w.Trigger(r.ID, s.applyRequest(r))
} else if raftReq.V2 != nil {
req := raftReq.V2
s.w.Trigger(req.ID, s.applyRequest(*req))
} else {
// do not re-apply applied entries.
if e.Index <= s.consistIndex.ConsistentIndex() {
break
}
// set the consistent index of current executing entry
s.consistIndex.setConsistentIndex(e.Index)
ar := s.applyV3Request(&raftReq)
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
s.w.Trigger(raftReq.ID, ar)
break
}
plog.Errorf("applying raft message exceeded backend quota")
go func() {
a := &pb.AlarmRequest{
MemberID: uint64(s.ID()),
Action: pb.AlarmRequest_ACTIVATE,
Alarm: pb.AlarmType_NOSPACE,
}
r := pb.InternalRaftRequest{Alarm: a}
s.processInternalRaftRequest(context.TODO(), r)
s.w.Trigger(raftReq.ID, ar)
}()
}
s.applyEntryNormal(&e)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, e.Data)
@@ -1083,70 +1046,54 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
return applied, shouldstop
}
// applyRequest interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) applyRequest(r pb.Request) Response {
f := func(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
// applyEntryNormal applies an EntryNormal raft entry to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// raft state machine may generate a noop entry when a leader is confirmed.
// skip it in advance to avoid any potential future bugs.
if len(e.Data) == 0 {
select {
case s.forceVersionC <- struct{}{}:
default:
}
return
}
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
var r pb.Request
pbutil.MustUnmarshal(&r, e.Data)
s.w.Trigger(r.ID, s.applyV2Request(&r))
return
}
if raftReq.V2 != nil {
req := raftReq.V2
s.w.Trigger(req.ID, s.applyV2Request(req))
return
}
switch r.Method {
case "POST":
return f(s.store.Create(r.Path, r.Dir, r.Val, true, ttlOptions))
case "PUT":
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return f(s.store.Update(r.Path, r.Val, ttlOptions))
} else {
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
}
}
return f(s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
}
s.cluster.UpdateAttributes(id, attr)
// return an empty response since there is no consumer.
return Response{}
}
if r.Path == membership.StoreClusterVersionKey() {
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
// return an empty response since there is no consumer.
return Response{}
}
return f(s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
}
case "DELETE":
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return f(s.store.Delete(r.Path, r.Dir, r.Recursive))
}
case "QGET":
return f(s.store.Get(r.Path, r.Recursive, r.Sorted))
case "SYNC":
s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
default:
// This should never be reached, but just in case:
return Response{err: ErrUnknownMethod}
// do not re-apply applied entries.
if e.Index <= s.consistIndex.ConsistentIndex() {
return
}
// set the consistent index of current executing entry
s.consistIndex.setConsistentIndex(e.Index)
ar := s.applyV3Request(&raftReq)
if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
s.w.Trigger(raftReq.ID, ar)
return
}
plog.Errorf("applying raft message exceeded backend quota")
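// quota exceeded: activate the NOSPACE alarm through raft consensus,
// then deliver the response to the waiting caller.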
go func() {
a := &pb.AlarmRequest{
MemberID: uint64(s.ID()),
Action: pb.AlarmRequest_ACTIVATE,
Alarm: pb.AlarmType_NOSPACE,
}
r := pb.InternalRaftRequest{Alarm: a}
s.processInternalRaftRequest(context.TODO(), r)
s.w.Trigger(raftReq.ID, ar)
}()
}
// applyConfChange applies a ConfChange to the server. It is only

etcdserver/server_test.go

@@ -179,6 +179,7 @@ func TestApplyRepeat(t *testing.T) {
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
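// wire up the store-backed v2 applier; apply now dispatches v2 requests through it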
s.applyV2 = &applierV2store{s}
s.start()
req := &pb.Request{Method: "QGET", ID: uint64(1)}
ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
@@ -444,7 +445,8 @@ func TestApplyRequest(t *testing.T) {
for i, tt := range tests {
st := mockstore.NewRecorder()
srv := &EtcdServer{store: st}
resp := srv.applyRequest(tt.req)
srv.applyV2 = &applierV2store{srv}
resp := srv.applyV2Request(&tt.req)
if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
@@ -462,13 +464,15 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
store: mockstore.NewRecorder(),
cluster: cl,
}
srv.applyV2 = &applierV2store{srv}
req := pb.Request{
Method: "PUT",
ID: 1,
Path: membership.MemberAttributesStorePath(1),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyRequest(req)
srv.applyV2Request(&req)
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)
@@ -635,6 +639,7 @@ func TestDoProposal(t *testing.T) {
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
srv.start()
resp, err := srv.Do(context.Background(), tt)
srv.Stop()
@@ -661,6 +666,8 @@ func TestDoProposalCancelled(t *testing.T) {
w: wt,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
@@ -681,6 +688,8 @@ func TestDoProposalTimeout(t *testing.T) {
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
ctx, _ := context.WithTimeout(context.Background(), 0)
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
if err != ErrTimeout {
@@ -695,6 +704,8 @@ func TestDoProposalStopped(t *testing.T) {
w: mockwait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
srv.done = make(chan struct{})
close(srv.done)
_, err := srv.Do(context.Background(), pb.Request{Method: "PUT", ID: 1})
@@ -710,6 +721,8 @@ func TestSync(t *testing.T) {
r: raftNode{Node: n},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
// check that sync is non-blocking
done := make(chan struct{})
go func() {
@@ -748,6 +761,8 @@ func TestSyncTimeout(t *testing.T) {
r: raftNode{Node: n},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
// check that sync is non-blocking
done := make(chan struct{})
go func() {
@@ -885,6 +900,8 @@ func TestTriggerSnap(t *testing.T) {
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.applyV2 = &applierV2store{srv}
srv.kv = dstorage.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.be = be
@@ -951,6 +968,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
cluster: cl,
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
s.applyV2 = &applierV2store{s}
be, tmpPath := backend.NewDefaultTmpBackend()
defer func() {