From ebace2eb1b28d9618fa85397ea51f51f4ac4d016 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 20 Apr 2016 01:06:49 -0700 Subject: [PATCH] etcdserver: split out v2 Do() API from core server code --- etcdserver/server.go | 64 -------------------- etcdserver/v2_server.go | 129 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 64 deletions(-) create mode 100644 etcdserver/v2_server.go diff --git a/etcdserver/server.go b/etcdserver/server.go index c7e755998..6874405c6 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -735,70 +735,6 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) { // when the server is stopped. func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done } -// Do interprets r and performs an operation on s.store according to r.Method -// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with -// Quorum == true, r will be sent through consensus before performing its -// respective operation. Do will block until an action is performed or there is -// an error. -func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { - r.ID = s.reqIDGen.Next() - if r.Method == "GET" && r.Quorum { - r.Method = "QGET" - } - switch r.Method { - case "POST", "PUT", "DELETE", "QGET": - data, err := r.Marshal() - if err != nil { - return Response{}, err - } - ch := s.w.Register(r.ID) - - // TODO: benchmark the cost of time.Now() - // might be sampling? - start := time.Now() - s.r.Propose(ctx, data) - - proposePending.Inc() - defer proposePending.Dec() - - select { - case x := <-ch: - proposeDurations.Observe(float64(time.Since(start)) / float64(time.Second)) - resp := x.(Response) - return resp, resp.err - case <-ctx.Done(): - proposeFailed.Inc() - s.w.Trigger(r.ID, nil) // GC wait - return Response{}, s.parseProposeCtxErr(ctx.Err(), start) - case <-s.done: - return Response{}, ErrStopped - } - case "GET": - switch { - case r.Wait: - wc, err := s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) - if err != nil { - return Response{}, err - } - return Response{Watcher: wc}, nil - default: - ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted) - if err != nil { - return Response{}, err - } - return Response{Event: ev}, nil - } - case "HEAD": - ev, err := s.store.Get(r.Path, r.Recursive, r.Sorted) - if err != nil { - return Response{}, err - } - return Response{Event: ev}, nil - default: - return Response{}, ErrUnknownMethod - } -} - func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() } func (s *EtcdServer) LeaderStats() []byte { diff --git a/etcdserver/v2_server.go b/etcdserver/v2_server.go new file mode 100644 index 000000000..ed3653e2e --- /dev/null +++ b/etcdserver/v2_server.go @@ -0,0 +1,129 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "golang.org/x/net/context" +) + +type v2API interface { + Post(ctx context.Context, r *pb.Request) (Response, error) + Put(ctx context.Context, r *pb.Request) (Response, error) + Delete(ctx context.Context, r *pb.Request) (Response, error) + QGet(ctx context.Context, r *pb.Request) (Response, error) + Get(ctx context.Context, r *pb.Request) (Response, error) + Head(ctx context.Context, r *pb.Request) (Response, error) +} + +type v2apiStore struct{ s *EtcdServer } + +func (a *v2apiStore) Post(ctx context.Context, r *pb.Request) (Response, error) { + return a.processRaftRequest(ctx, r) +} + +func (a *v2apiStore) Put(ctx context.Context, r *pb.Request) (Response, error) { + return a.processRaftRequest(ctx, r) +} + +func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) { + return a.processRaftRequest(ctx, r) +} + +func (a *v2apiStore) QGet(ctx context.Context, r *pb.Request) (Response, error) { + return a.processRaftRequest(ctx, r) +} + +func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) { + data, err := r.Marshal() + if err != nil { + return Response{}, err + } + ch := a.s.w.Register(r.ID) + + // TODO: benchmark the cost of time.Now() + // might be sampling? + start := time.Now() + a.s.r.Propose(ctx, data) + + proposePending.Inc() + defer proposePending.Dec() + + select { + case x := <-ch: + proposeDurations.Observe(float64(time.Since(start)) / float64(time.Second)) + resp := x.(Response) + return resp, resp.err + case <-ctx.Done(): + proposeFailed.Inc() + a.s.w.Trigger(r.ID, nil) // GC wait + return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) + case <-a.s.done: + } + return Response{}, ErrStopped +} + +func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) { + if r.Wait { + wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) + if err != nil { + return Response{}, err + } + return Response{Watcher: wc}, nil + } + ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted) + if err != nil { + return Response{}, err + } + return Response{Event: ev}, nil +} + +func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) { + ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted) + if err != nil { + return Response{}, err + } + return Response{Event: ev}, nil +} + +// Do interprets r and performs an operation on s.store according to r.Method +// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with +// Quorum == true, r will be sent through consensus before performing its +// respective operation. Do will block until an action is performed or there is +// an error. +func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { + r.ID = s.reqIDGen.Next() + if r.Method == "GET" && r.Quorum { + r.Method = "QGET" + } + v2api := (v2API)(&v2apiStore{s}) + switch r.Method { + case "POST": + return v2api.Post(ctx, &r) + case "PUT": + return v2api.Put(ctx, &r) + case "DELETE": + return v2api.Delete(ctx, &r) + case "QGET": + return v2api.QGet(ctx, &r) + case "GET": + return v2api.Get(ctx, &r) + case "HEAD": + return v2api.Head(ctx, &r) + } + return Response{}, ErrUnknownMethod +}