etcd/etcdserver/server.go
Jonathan Boulle 8a5ab2ec06 etcdserver: introduce Server interface
This changes etcdserver.Server to an interface, with the former Server
(now "EtcdServer") becoming the canonical/production implementation.
This will facilitate better testing of the http server et al with mock
implementations of the interface.
It also more clearly defines the boundary for users of the Server.
2014-09-15 15:11:01 -07:00

205 lines
5.6 KiB
Go

package etcdserver
import (
"errors"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
"github.com/coreos/etcd/wait"
)
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
)
type SendFunc func(m []raftpb.Message)
type SaveFunc func(st raftpb.State, ents []raftpb.Entry)
type Response struct {
Event *store.Event
Watcher store.Watcher
err error
}
type Server interface {
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
Start()
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
Stop()
// Do takes a request and attempts to fulfil it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
Process(ctx context.Context, m raftpb.Message) error
}
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
w wait.Wait
done chan struct{}
Node raft.Node
Store store.Store
// Send specifies the send function for sending msgs to peers. Send
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If Send is nil, server will
// panic.
Send SendFunc
// Save specifies the save function for saving ents to stable storage.
// Save MUST block until st and ents are on stable storage. If Send is
// nil, server will panic.
Save func(st raftpb.State, ents []raftpb.Entry)
Ticker <-chan time.Time
}
// Start prepares and starts server in a new goroutine. It is no longer safe to
// modify a server's fields after it has been sent to Start.
func (s *EtcdServer) Start() {
s.w = wait.New()
s.done = make(chan struct{})
go s.run()
}
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
return s.Node.Step(ctx, m)
}
func (s *EtcdServer) run() {
for {
select {
case <-s.Ticker:
s.Node.Tick()
case rd := <-s.Node.Ready():
s.Save(rd.State, rd.Entries)
s.Send(rd.Messages)
// TODO(bmizerany): do this in the background, but take
// care to apply entries in a single goroutine, and not
// race them.
for _, e := range rd.CommittedEntries {
var r pb.Request
if err := r.Unmarshal(e.Data); err != nil {
panic("TODO: this is bad, what do we do about it?")
}
s.w.Trigger(r.Id, s.apply(r))
}
case <-s.done:
return
}
}
}
// Stop stops the server, and shuts down the running goroutine. Stop should be
// called after a Start(s), otherwise it will block forever.
func (s *EtcdServer) Stop() {
s.Node.Stop()
close(s.done)
}
// Do interprets r and performs an operation on s.Store according to r.Method
// and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with
// Quorum == true, r will be sent through consensus before performing its
// respective operation. Do will block until an action is performed or there is
// an error.
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
if r.Id == 0 {
panic("r.Id cannot be 0")
}
if r.Method == "GET" && r.Quorum {
r.Method = "QGET"
}
switch r.Method {
case "POST", "PUT", "DELETE", "QGET":
data, err := r.Marshal()
if err != nil {
return Response{}, err
}
ch := s.w.Register(r.Id)
s.Node.Propose(ctx, data)
select {
case x := <-ch:
resp := x.(Response)
return resp, resp.err
case <-ctx.Done():
s.w.Trigger(r.Id, nil) // GC wait
return Response{}, ctx.Err()
case <-s.done:
return Response{}, ErrStopped
}
case "GET":
switch {
case r.Wait:
wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
if err != nil {
return Response{}, err
}
return Response{Watcher: wc}, nil
default:
ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
if err != nil {
return Response{}, err
}
return Response{Event: ev}, nil
}
default:
return Response{}, ErrUnknownMethod
}
}
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
func (s *EtcdServer) apply(r pb.Request) Response {
f := func(ev *store.Event, err error) Response {
return Response{Event: ev, err: err}
}
expr := time.Unix(0, r.Expiration)
switch r.Method {
case "POST":
return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
case "PUT":
exists, existsSet := getBool(r.PrevExists)
switch {
case existsSet:
if exists {
return f(s.Store.Update(r.Path, r.Val, expr))
} else {
return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
}
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
default:
return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
}
case "DELETE":
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
}
case "QGET":
return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
default:
// This should never be reached, but just in case:
return Response{err: ErrUnknownMethod}
}
}
func getBool(v *bool) (vv bool, set bool) {
if v == nil {
return false, false
}
return *v, true
}