etcdserver: refactor v2 request processing

Makes interfaces more reusable.
This commit is contained in:
Anthony Romano
2017-08-07 03:55:18 -07:00
parent 1d3afd4bb5
commit 758c3c09fd
4 changed files with 102 additions and 74 deletions

View File

@@ -20,7 +20,6 @@ import (
"time"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/store"
@@ -29,11 +28,11 @@ import (
// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *pb.Request) Response
Post(r *pb.Request) Response
Put(r *pb.Request) Response
QGet(r *pb.Request) Response
Sync(r *pb.Request) Response
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}
func NewApplierV2(s store.Store, c *membership.RaftCluster) ApplierV2 {
@@ -45,7 +44,7 @@ type applierV2store struct {
cluster *membership.RaftCluster
}
func (a *applierV2store) Delete(r *pb.Request) Response {
func (a *applierV2store) Delete(r *RequestV2) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
@@ -54,12 +53,12 @@ func (a *applierV2store) Delete(r *pb.Request) Response {
}
}
func (a *applierV2store) Post(r *pb.Request) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}
func (a *applierV2store) Put(r *pb.Request) Response {
ttlOptions := toTTLOptions(r)
func (a *applierV2store) Put(r *RequestV2) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
@@ -96,19 +95,18 @@ func (a *applierV2store) Put(r *pb.Request) Response {
}
}
func (a *applierV2store) QGet(r *pb.Request) Response {
func (a *applierV2store) QGet(r *RequestV2) Response {
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
}
func (a *applierV2store) Sync(r *pb.Request) Response {
func (a *applierV2store) Sync(r *RequestV2) Response {
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
return Response{}
}
// applyV2Request interprets r as a call to store.X and returns a Response interpreted
// from store.Event
func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
toTTLOptions(r)
func (s *EtcdServer) applyV2Request(r *RequestV2) Response {
switch r.Method {
case "POST":
return s.applyV2.Post(r)
@@ -122,11 +120,11 @@ func (s *EtcdServer) applyV2Request(r *pb.Request) Response {
return s.applyV2.Sync(r)
default:
// This should never be reached, but just in case:
return Response{err: ErrUnknownMethod}
return Response{Err: ErrUnknownMethod}
}
}
func toTTLOptions(r *pb.Request) store.TTLOptionSet {
func (r *RequestV2) TTLOptions() store.TTLOptionSet {
refresh, _ := pbutil.GetBool(r.Refresh)
ttlOptions := store.TTLOptionSet{Refresh: refresh}
if r.Expiration != 0 {
@@ -136,5 +134,5 @@ func toTTLOptions(r *pb.Request) store.TTLOptionSet {
}
func toResponse(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
return Response{Event: ev, Err: err}
}

View File

@@ -113,7 +113,7 @@ type Response struct {
Index uint64
Event *store.Event
Watcher store.Watcher
err error
Err error
}
type ServerV2 interface {
@@ -1344,12 +1344,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
var r pb.Request
pbutil.MustUnmarshal(&r, e.Data)
s.w.Trigger(r.ID, s.applyV2Request(&r))
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
return
}
if raftReq.V2 != nil {
req := raftReq.V2
req := (*RequestV2)(raftReq.V2)
s.w.Trigger(req.ID, s.applyV2Request(req))
return
}

View File

@@ -441,7 +441,7 @@ func TestApplyRequest(t *testing.T) {
// Unknown method - error
{
pb.Request{Method: "BADMETHOD", ID: 1},
Response{err: ErrUnknownMethod},
Response{Err: ErrUnknownMethod},
[]testutil.Action{},
},
}
@@ -450,7 +450,7 @@ func TestApplyRequest(t *testing.T) {
st := mockstore.NewRecorder()
srv := &EtcdServer{store: st}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
resp := srv.applyV2Request(&tt.req)
resp := srv.applyV2Request((*RequestV2)(&tt.req))
if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
@@ -476,7 +476,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
Path: membership.MemberAttributesStorePath(1),
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
}
srv.applyV2Request(&req)
srv.applyV2Request((*RequestV2)(&req))
w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}}
if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) {
t.Errorf("attributes = %v, want %v", g, w)

View File

@@ -18,38 +18,83 @@ import (
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/store"
"golang.org/x/net/context"
)
type v2API interface {
Post(ctx context.Context, r *pb.Request) (Response, error)
Put(ctx context.Context, r *pb.Request) (Response, error)
Delete(ctx context.Context, r *pb.Request) (Response, error)
QGet(ctx context.Context, r *pb.Request) (Response, error)
Get(ctx context.Context, r *pb.Request) (Response, error)
Head(ctx context.Context, r *pb.Request) (Response, error)
type RequestV2 pb.Request
type RequestV2Handler interface {
Post(ctx context.Context, r *RequestV2) (Response, error)
Put(ctx context.Context, r *RequestV2) (Response, error)
Delete(ctx context.Context, r *RequestV2) (Response, error)
QGet(ctx context.Context, r *RequestV2) (Response, error)
Get(ctx context.Context, r *RequestV2) (Response, error)
Head(ctx context.Context, r *RequestV2) (Response, error)
}
type v2apiStore struct{ s *EtcdServer }
type reqV2HandlerEtcdServer struct {
reqV2HandlerStore
s *EtcdServer
}
func (a *v2apiStore) Post(ctx context.Context, r *pb.Request) (Response, error) {
type reqV2HandlerStore struct {
store store.Store
applier ApplierV2
}
func NewStoreRequestV2Handler(s store.Store, applier ApplierV2) RequestV2Handler {
return &reqV2HandlerStore{s, applier}
}
func (a *reqV2HandlerStore) Post(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Post(r), nil
}
func (a *reqV2HandlerStore) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Put(r), nil
}
func (a *reqV2HandlerStore) Delete(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.Delete(r), nil
}
func (a *reqV2HandlerStore) QGet(ctx context.Context, r *RequestV2) (Response, error) {
return a.applier.QGet(r), nil
}
func (a *reqV2HandlerStore) Get(ctx context.Context, r *RequestV2) (Response, error) {
if r.Wait {
wc, err := a.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
return Response{Watcher: wc}, err
}
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}
func (a *reqV2HandlerStore) Head(ctx context.Context, r *RequestV2) (Response, error) {
ev, err := a.store.Get(r.Path, r.Recursive, r.Sorted)
return Response{Event: ev}, err
}
func (a *reqV2HandlerEtcdServer) Post(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *v2apiStore) Put(ctx context.Context, r *pb.Request) (Response, error) {
func (a *reqV2HandlerEtcdServer) Put(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) {
func (a *reqV2HandlerEtcdServer) Delete(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *v2apiStore) QGet(ctx context.Context, r *pb.Request) (Response, error) {
func (a *reqV2HandlerEtcdServer) QGet(ctx context.Context, r *RequestV2) (Response, error) {
return a.processRaftRequest(ctx, r)
}
func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) {
data, err := r.Marshal()
func (a *reqV2HandlerEtcdServer) processRaftRequest(ctx context.Context, r *RequestV2) (Response, error) {
data, err := ((*pb.Request)(r)).Marshal()
if err != nil {
return Response{}, err
}
@@ -63,7 +108,7 @@ func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Res
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.err
return resp, resp.Err
case <-ctx.Done():
proposalsFailed.Inc()
a.s.w.Trigger(r.ID, nil) // GC wait
@@ -73,59 +118,43 @@ func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Res
return Response{}, ErrStopped
}
func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) {
if r.Wait {
wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
if err != nil {
return Response{}, err
}
return Response{Watcher: wc}, nil
}
ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
return Response{Event: ev}, nil
}
func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) {
ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
return Response{Event: ev}, nil
}
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
resp, err := s.do(ctx, r)
r.ID = s.reqIDGen.Next()
h := &reqV2HandlerEtcdServer{
reqV2HandlerStore: reqV2HandlerStore{
store: s.store,
applier: s.applyV2,
},
s: s,
}
rp := &r
resp, err := ((*RequestV2)(rp)).Handle(ctx, h)
resp.Term, resp.Index = s.Term(), s.Index()
return resp, err
}
// do interprets r and performs an operation on s.store according to r.Method
// Handle interprets r and performs an operation on s.store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (s *EtcdServer) do(ctx context.Context, r pb.Request) (Response, error) {
r.ID = s.reqIDGen.Next()
func (r *RequestV2) Handle(ctx context.Context, v2api RequestV2Handler) (Response, error) {
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"
}
v2api := (v2API)(&v2apiStore{s})
switch r.Method {
case "POST":
return v2api.Post(ctx, &r)
return v2api.Post(ctx, r)
case "PUT":
return v2api.Put(ctx, &r)
return v2api.Put(ctx, r)
case "DELETE":
return v2api.Delete(ctx, &r)
return v2api.Delete(ctx, r)
case "QGET":
return v2api.QGet(ctx, &r)
return v2api.QGet(ctx, r)
case "GET":
return v2api.Get(ctx, &r)
return v2api.Get(ctx, r)
case "HEAD":
return v2api.Head(ctx, &r)
return v2api.Head(ctx, r)
}
return Response{}, ErrUnknownMethod
}