etcd: pass v2 kv api tests

This commit is contained in:
Xiang Li 2014-07-06 10:19:23 -07:00 committed by Yicheng Qin
parent 2af0ad505a
commit fc35324ba7
17 changed files with 2352 additions and 485 deletions

View File

@ -1,425 +1,206 @@
/*
Copyright 2013 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"path"
"time"
goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
"github.com/coreos/etcd/config"
ehttp "github.com/coreos/etcd/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/server"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
)
// TODO(yichengq): constant extraTimeout is a hack.
// Current problem is that there is big lag between join command
// execution and join success.
// Fix it later. It should be removed when proper method is found and
// enough tests are provided. It is expected to be calculated from
// heartbeatInterval and electionTimeout only.
const extraTimeout = time.Duration(1000) * time.Millisecond
const (
defaultHeartbeat = 1
defaultElection = 5
type Etcd struct {
Config *config.Config // etcd config
defaultTickDuration = time.Millisecond * 100
Store store.Store // data store
Registry *server.Registry // stores URL information for nodes
Server *server.Server // http server, runs on 4001 by default
PeerServer *server.PeerServer // peer server, runs on 7001 by default
StandbyServer *server.StandbyServer
nodePrefix = "/cfg/nodes"
raftPrefix = "/raft"
v2Prefix = "/v2/keys"
)
server *http.Server
peerServer *http.Server
type Server struct {
id int
pubAddr string
nodes map[string]bool
tickDuration time.Duration
mode Mode
modeMutex sync.Mutex
closeChan chan bool
readyNotify chan bool // To signal when server is ready to accept connections
onceReady sync.Once
stopNotify chan bool // To signal when server is stopped totally
proposal chan v2Proposal
node *v2Raft
t *transporter
store.Store
stop chan struct{}
http.Handler
}
// New returns a new Etcd instance.
func New(c *config.Config) *Etcd {
if c == nil {
c = config.New()
func New(id int, pubAddr string, nodes []string) *Server {
s := &Server{
id: id,
pubAddr: pubAddr,
nodes: make(map[string]bool),
tickDuration: defaultTickDuration,
proposal: make(chan v2Proposal),
node: &v2Raft{
Node: raft.New(id, defaultHeartbeat, defaultElection),
result: make(map[wait]chan interface{}),
},
t: newTransporter(),
Store: store.New(),
stop: make(chan struct{}),
}
return &Etcd{
Config: c,
closeChan: make(chan bool),
readyNotify: make(chan bool),
stopNotify: make(chan bool),
for _, seed := range nodes {
s.nodes[seed] = true
}
m := http.NewServeMux()
//m.Handle("/HEAD", handlerErr(s.serveHead))
m.Handle("/", handlerErr(s.serveValue))
m.Handle("/raft", s.t)
s.Handler = m
return s
}
func (s *Server) SetTick(d time.Duration) {
s.tickDuration = d
}
func (s *Server) Stop() {
close(s.stop)
s.t.stop()
}
func (s *Server) Bootstrap() {
s.node.Campaign()
s.node.Add(s.id, s.pubAddr)
s.apply(s.node.Next())
s.run()
}
func (s *Server) Join() {
d, err := json.Marshal(&raft.Config{s.id, s.pubAddr})
if err != nil {
panic(err)
}
b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}})
if err != nil {
panic(err)
}
for seed := range s.nodes {
if err := s.t.send(seed+raftPrefix, b); err != nil {
log.Println(err)
continue
}
// todo(xiangli) WAIT for join to be committed or retry...
break
}
s.run()
}
func (s *Server) run() {
node := s.node
recv := s.t.recv
ticker := time.NewTicker(s.tickDuration)
v2SyncTicker := time.NewTicker(time.Millisecond * 500)
var proposal chan v2Proposal
for {
if node.HasLeader() {
proposal = s.proposal
} else {
proposal = nil
}
select {
case p := <-proposal:
node.Propose(p)
case msg := <-recv:
node.Step(*msg)
case <-ticker.C:
node.Tick()
case <-v2SyncTicker.C:
node.Sync()
case <-s.stop:
log.Printf("Node: %d stopped\n", s.id)
return
}
s.apply(node.Next())
s.send(node.Msgs())
}
}
// Run the etcd instance.
func (e *Etcd) Run() {
// Sanitize all the input fields.
if err := e.Config.Sanitize(); err != nil {
log.Fatalf("failed sanitizing configuration: %v", err)
}
// Force remove server configuration if specified.
if e.Config.Force {
e.Config.Reset()
}
// Enable options.
if e.Config.VeryVeryVerbose {
log.Verbose = true
raft.SetLogLevel(raft.Trace)
goetcd.SetLogger(
golog.New(
"go-etcd",
false,
golog.CombinedSink(
os.Stdout,
"[%s] %s %-9s | %s\n",
[]string{"prefix", "time", "priority", "message"},
),
),
)
} else if e.Config.VeryVerbose {
log.Verbose = true
raft.SetLogLevel(raft.Debug)
} else if e.Config.Verbose {
log.Verbose = true
}
if e.Config.CPUProfileFile != "" {
profile(e.Config.CPUProfileFile)
}
if e.Config.DataDir == "" {
log.Fatal("The data dir was not set and could not be guessed from machine name")
}
// Create data directory if it doesn't already exist.
if err := os.MkdirAll(e.Config.DataDir, 0744); err != nil {
log.Fatalf("Unable to create path: %s", err)
}
// Warn people if they have an info file
info := filepath.Join(e.Config.DataDir, "info")
if _, err := os.Stat(info); err == nil {
log.Warnf("All cached configuration is now ignored. The file %s can be removed.", info)
}
var mbName string
if e.Config.Trace() {
mbName = e.Config.MetricsBucketName()
runtime.SetBlockProfileRate(1)
}
mb := metrics.NewBucket(mbName)
if e.Config.GraphiteHost != "" {
err := mb.Publish(e.Config.GraphiteHost)
if err != nil {
panic(err)
func (s *Server) apply(ents []raft.Entry) {
offset := s.node.Applied() - len(ents) + 1
for i, ent := range ents {
switch ent.Type {
// expose raft entry type
case raft.Normal:
if len(ent.Data) == 0 {
continue
}
s.v2apply(offset+i, ent)
case raft.AddNode:
cfg := new(raft.Config)
if err := json.Unmarshal(ent.Data, cfg); err != nil {
log.Println(err)
break
}
if err := s.t.set(cfg.NodeId, cfg.Addr); err != nil {
log.Println(err)
break
}
s.nodes[cfg.Addr] = true
p := path.Join(nodePrefix, fmt.Sprint(cfg.NodeId))
s.Store.Set(p, false, cfg.Addr, store.Permanent)
default:
panic("unimplemented")
}
}
}
// Retrieve CORS configuration
corsInfo, err := ehttp.NewCORSInfo(e.Config.CorsOrigins)
if err != nil {
log.Fatal("CORS:", err)
}
// Create etcd key-value store and registry.
e.Store = store.New()
e.Registry = server.NewRegistry(e.Store)
// Create stats objects
followersStats := server.NewRaftFollowersStats(e.Config.Name)
serverStats := server.NewRaftServerStats(e.Config.Name)
// Calculate all of our timeouts
heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
dialTimeout := (3 * heartbeatInterval) + electionTimeout
responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout
clientTransporter := &httpclient.Transport{
ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout,
// This is a workaround for Transport.CancelRequest doesn't work on
// HTTPS connections blocked. The patch for it is in progress,
// and would be available in Go1.3
// More: https://codereview.appspot.com/69280043/
ConnectTimeout: dialTimeout + extraTimeout,
RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout,
}
if e.Config.PeerTLSInfo().Scheme() == "https" {
clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
if err != nil {
log.Fatal("client TLS error: ", err)
}
clientTransporter.TLSClientConfig = clientTLSConfig
clientTransporter.DisableCompression = true
}
client := server.NewClient(clientTransporter)
// Create peer server
psConfig := server.PeerServerConfig{
Name: e.Config.Name,
Scheme: e.Config.PeerTLSInfo().Scheme(),
URL: e.Config.Peer.Addr,
SnapshotCount: e.Config.SnapshotCount,
RetryTimes: e.Config.MaxRetryAttempts,
RetryInterval: e.Config.RetryInterval,
}
e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats)
// Create raft transporter and server
raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout)
if e.Config.PeerTLSInfo().Scheme() == "https" {
raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
if err != nil {
log.Fatal("raft client TLS error: ", err)
}
raftTransporter.SetTLSConfig(*raftClientTLSConfig)
}
raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, raftTransporter, e.Store, e.PeerServer, "")
if err != nil {
log.Fatal(err)
}
raftServer.SetElectionTimeout(electionTimeout)
raftServer.SetHeartbeatInterval(heartbeatInterval)
e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
// Create etcd server
e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb)
if e.Config.Trace() {
e.Server.EnableTracing()
}
e.PeerServer.SetServer(e.Server)
// Create standby server
ssConfig := server.StandbyServerConfig{
Name: e.Config.Name,
PeerScheme: e.Config.PeerTLSInfo().Scheme(),
PeerURL: e.Config.Peer.Addr,
ClientURL: e.Config.Addr,
DataDir: e.Config.DataDir,
}
e.StandbyServer = server.NewStandbyServer(ssConfig, client)
e.StandbyServer.SetRaftServer(raftServer)
// Generating config could be slow.
// Put it here to make listen happen immediately after peer-server starting.
peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo())
etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo())
if !e.StandbyServer.IsRunning() {
startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers)
func (s *Server) send(msgs []raft.Message) {
for i := range msgs {
data, err := json.Marshal(msgs[i])
if err != nil {
// todo(xiangli): error handling
log.Fatal(err)
}
if startPeerServer {
e.setMode(PeerMode)
} else {
e.StandbyServer.SyncCluster(possiblePeers)
e.setMode(StandbyMode)
}
} else {
e.setMode(StandbyMode)
}
serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo}
peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo}
standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo}
log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL())
listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig)
e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler},
ReadTimeout: time.Duration(e.Config.HTTPReadTimeout) * time.Second,
WriteTimeout: time.Duration(e.Config.HTTPWriteTimeout) * time.Second,
}
log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL)
peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig)
e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()},
ReadTimeout: time.Duration(server.DefaultReadTimeout) * time.Second,
WriteTimeout: time.Duration(server.DefaultWriteTimeout) * time.Second,
}
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
<-e.readyNotify
defer wg.Done()
if err := e.server.Serve(listener); err != nil {
if !isListenerClosing(err) {
log.Fatal(err)
// todo(xiangli): reuse routines and limit the number of sending routines
// sync.Pool?
go func(i int) {
var err error
if err = s.t.sendTo(msgs[i].To, data); err == nil {
return
}
}
}()
go func() {
<-e.readyNotify
defer wg.Done()
if err := e.peerServer.Serve(peerListener); err != nil {
if !isListenerClosing(err) {
log.Fatal(err)
if err == errUnknownNode {
err = s.fetchAddr(msgs[i].To)
}
if err == nil {
err = s.t.sendTo(msgs[i].To, data)
}
}
}()
e.runServer()
listener.Close()
peerListener.Close()
wg.Wait()
log.Infof("etcd instance is stopped [name %s]", e.Config.Name)
close(e.stopNotify)
}
func (e *Etcd) runServer() {
var removeNotify <-chan bool
for {
if e.mode == PeerMode {
log.Infof("%v starting in peer mode", e.Config.Name)
// Starting peer server should be followed close by listening on its port
// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
// the cluster could be out of work as long as the two nodes cannot transfer messages.
e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig())
removeNotify = e.PeerServer.RemoveNotify()
} else {
log.Infof("%v starting in standby mode", e.Config.Name)
e.StandbyServer.Start()
removeNotify = e.StandbyServer.RemoveNotify()
}
// etcd server is ready to accept connections, notify waiters.
e.onceReady.Do(func() { close(e.readyNotify) })
select {
case <-e.closeChan:
e.PeerServer.Stop()
e.StandbyServer.Stop()
return
case <-removeNotify:
}
if e.mode == PeerMode {
peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name)
e.StandbyServer.SyncCluster(peerURLs)
e.setMode(StandbyMode)
} else {
// Create etcd key-value store and registry.
e.Store = store.New()
e.Registry = server.NewRegistry(e.Store)
e.PeerServer.SetStore(e.Store)
e.PeerServer.SetRegistry(e.Registry)
e.Server.SetStore(e.Store)
e.Server.SetRegistry(e.Registry)
// Generate new peer server here.
// TODO(yichengq): raft server cannot be started after stopped.
// It should be removed when raft restart is implemented.
heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "")
if err != nil {
log.Fatal(err)
log.Println(err)
}
raftServer.SetElectionTimeout(electionTimeout)
raftServer.SetHeartbeatInterval(heartbeatInterval)
e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot)
e.StandbyServer.SetRaftServer(raftServer)
}(i)
}
}
e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex())
e.setMode(PeerMode)
func (s *Server) fetchAddr(nodeId int) error {
for seed := range s.nodes {
if err := s.t.fetchAddr(seed, nodeId); err == nil {
return nil
}
}
return fmt.Errorf("cannot fetch the address of node %d", nodeId)
}
// Stop the etcd instance.
func (e *Etcd) Stop() {
close(e.closeChan)
<-e.stopNotify
}
// ReadyNotify returns a channel that is going to be closed
// when the etcd instance is ready to accept connections.
func (e *Etcd) ReadyNotify() <-chan bool {
return e.readyNotify
}
func (e *Etcd) Mode() Mode {
e.modeMutex.Lock()
defer e.modeMutex.Unlock()
return e.mode
}
func (e *Etcd) setMode(m Mode) {
e.modeMutex.Lock()
defer e.modeMutex.Unlock()
e.mode = m
}
func isListenerClosing(err error) bool {
// An error string equivalent to net.errClosing for using with
// http.Serve() during server shutdown. Need to re-declare
// here because it is not exported by "net" package.
const errClosing = "use of closed network connection"
return strings.Contains(err.Error(), errClosing)
}
type ModeGetter interface {
Mode() Mode
}
type ModeHandler struct {
ModeGetter
PeerModeHandler http.Handler
StandbyModeHandler http.Handler
}
func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch h.Mode() {
case PeerMode:
h.PeerModeHandler.ServeHTTP(w, r)
case StandbyMode:
h.StandbyModeHandler.ServeHTTP(w, r)
}
}
type Mode int
const (
PeerMode Mode = iota
StandbyMode
)

View File

@ -1,41 +1,70 @@
/*
Copyright 2013 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"io/ioutil"
"os"
"fmt"
"net/http/httptest"
"testing"
"github.com/coreos/etcd/config"
"time"
)
func TestRunStop(t *testing.T) {
path, _ := ioutil.TempDir("", "etcd-")
defer os.RemoveAll(path)
func TestMultipleNodes(t *testing.T) {
tests := []int{1, 3, 5, 9, 11}
config := config.New()
config.Name = "ETCDTEST"
config.DataDir = path
config.Addr = "localhost:0"
config.Peer.Addr = "localhost:0"
etcd := New(config)
go etcd.Run()
<-etcd.ReadyNotify()
etcd.Stop()
for _, tt := range tests {
es, hs := buildCluster(tt)
waitCluster(t, es)
for i := range es {
es[len(es)-i-1].Stop()
}
for i := range hs {
hs[len(hs)-i-1].Close()
}
}
afterTest(t)
}
func buildCluster(number int) ([]*Server, []*httptest.Server) {
bootstrapper := 0
es := make([]*Server, number)
hs := make([]*httptest.Server, number)
var seed string
for i := range es {
es[i] = New(i, "", []string{seed})
es[i].SetTick(time.Millisecond * 5)
hs[i] = httptest.NewServer(es[i])
es[i].pubAddr = hs[i].URL
if i == bootstrapper {
seed = hs[i].URL
go es[i].Bootstrap()
} else {
// wait for the previous configuration change to be committed
// or this configuration request might be dropped
w, err := es[0].Watch(nodePrefix, true, false, uint64(i))
if err != nil {
panic(err)
}
<-w.EventChan
go es[i].Join()
}
}
return es, hs
}
func waitCluster(t *testing.T, es []*Server) {
n := len(es)
for i, e := range es {
for k := 1; k < n+1; k++ {
w, err := e.Watch(nodePrefix, true, false, uint64(k))
if err != nil {
panic(err)
}
v := <-w.EventChan
ww := fmt.Sprintf("%s/%d", nodePrefix, k-1)
if v.Node.Key != ww {
t.Errorf("#%d path = %v, want %v", i, v.Node.Key, w)
}
}
}
}

View File

@ -1,27 +0,0 @@
package etcd
import (
"os"
"os/signal"
"runtime/pprof"
"github.com/coreos/etcd/log"
)
// profile starts CPU profiling.
func profile(path string) {
f, err := os.Create(path)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
sig := <-c
log.Infof("captured %v, stopping profiler and exiting..", sig)
pprof.StopCPUProfile()
os.Exit(1)
}()
}

142
etcd/transporter.go Normal file
View File

@ -0,0 +1,142 @@
package etcd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"path"
"sync"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
)
var (
errUnknownNode = errors.New("unknown node")
)
type transporter struct {
mu sync.RWMutex
stopped bool
urls map[int]string
recv chan *raft.Message
client *http.Client
wg sync.WaitGroup
}
func newTransporter() *transporter {
tr := new(http.Transport)
c := &http.Client{Transport: tr}
return &transporter{
urls: make(map[int]string),
recv: make(chan *raft.Message, 512),
client: c,
}
}
func (t *transporter) stop() {
t.mu.Lock()
t.stopped = true
t.mu.Unlock()
t.wg.Wait()
tr := t.client.Transport.(*http.Transport)
tr.CloseIdleConnections()
}
func (t *transporter) set(nodeId int, rawurl string) error {
u, err := url.Parse(rawurl)
if err != nil {
return err
}
u.Path = raftPrefix
t.mu.Lock()
t.urls[nodeId] = u.String()
t.mu.Unlock()
return nil
}
func (t *transporter) sendTo(nodeId int, data []byte) error {
t.mu.RLock()
url := t.urls[nodeId]
t.mu.RUnlock()
if len(url) == 0 {
return errUnknownNode
}
return t.send(url, data)
}
func (t *transporter) send(addr string, data []byte) error {
t.mu.RLock()
if t.stopped {
t.mu.RUnlock()
return fmt.Errorf("transporter stopped")
}
t.mu.RUnlock()
buf := bytes.NewBuffer(data)
t.wg.Add(1)
defer t.wg.Done()
resp, err := t.client.Post(addr, "application/octet-stream", buf)
if err != nil {
return err
}
resp.Body.Close()
return nil
}
func (t *transporter) fetchAddr(seedurl string, id int) error {
u, err := url.Parse(seedurl)
if err != nil {
return fmt.Errorf("cannot parse the url of the given seed")
}
u.Path = path.Join(v2Prefix, nodePrefix, fmt.Sprint(id))
resp, err := t.client.Get(u.String())
if err != nil {
return fmt.Errorf("cannot reach %v", u)
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("cannot reach %v", u)
}
event := new(store.Event)
err = json.Unmarshal(b, event)
if err != nil {
panic(fmt.Sprintf("fetchAddr: ", err))
}
if err := t.set(id, *event.Node.Value); err != nil {
return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
}
return nil
}
func (t *transporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := new(raft.Message)
if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
log.Println(err)
return
}
select {
case t.recv <- msg:
default:
log.Println("drop")
// drop the incoming package at network layer if the upper layer
// cannot consume them in time.
// TODO(xiangli): not return 200.
}
return
}

63
etcd/v2_apply.go Normal file
View File

@ -0,0 +1,63 @@
package etcd
import (
"encoding/json"
"fmt"
"log"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
)
func (s *Server) v2apply(index int, ent raft.Entry) {
var ret interface{}
var e *store.Event
var err error
cmd := new(cmd)
if err := json.Unmarshal(ent.Data, cmd); err != nil {
log.Println("v2apply.decode:", err)
return
}
switch cmd.Type {
case "set":
e, err = s.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time)
case "update":
e, err = s.Store.Update(cmd.Key, cmd.Value, cmd.Time)
case "create", "unique":
e, err = s.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time)
case "delete":
e, err = s.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive)
case "cad":
e, err = s.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex)
case "cas":
e, err = s.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time)
case "sync":
s.Store.DeleteExpiredKeys(cmd.Time)
return
default:
log.Println("unexpected command type:", cmd.Type)
}
if ent.Term > s.node.term {
s.node.term = ent.Term
for k, v := range s.node.result {
if k.term < s.node.term {
v <- fmt.Errorf("proposal lost due to leader election")
delete(s.node.result, k)
}
}
}
if s.node.result[wait{index, ent.Term}] == nil {
return
}
if err != nil {
ret = err
} else {
ret = e
}
s.node.result[wait{index, ent.Term}] <- ret
}

84
etcd/v2_http.go Normal file
View File

@ -0,0 +1,84 @@
package etcd
import (
"fmt"
"log"
"net/http"
"net/url"
"strings"
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) serveValue(w http.ResponseWriter, r *http.Request) error {
switch r.Method {
case "GET":
return s.GetHandler(w, r)
case "HEAD":
w = &HEADResponseWriter{w}
return s.GetHandler(w, r)
case "PUT":
return s.PutHandler(w, r)
case "POST":
return s.PostHandler(w, r)
case "DELETE":
return s.DeleteHandler(w, r)
}
return allow(w, "GET", "PUT", "POST", "DELETE", "HEAD")
}
type handlerErr func(w http.ResponseWriter, r *http.Request) error
func (eh handlerErr) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := eh(w, r)
if err == nil {
return
}
if r.Method == "HEAD" {
w = &HEADResponseWriter{w}
}
if etcdErr, ok := err.(*etcdErr.Error); ok {
w.Header().Set("Content-Type", "application/json")
etcdErr.Write(w)
return
}
log.Println("http error", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
func allow(w http.ResponseWriter, m ...string) error {
w.Header().Set("Allow", strings.Join(m, ","))
return nil
}
type HEADResponseWriter struct {
http.ResponseWriter
}
func (w *HEADResponseWriter) Write([]byte) (int, error) {
return 0, nil
}
func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int) error {
baseURL := s.t.urls[id]
if len(baseURL) == 0 {
log.Println("redirect cannot find node", id)
return fmt.Errorf("redirect cannot find node %d", id)
}
originalURL := r.URL
redirectURL, err := url.Parse(baseURL)
if err != nil {
log.Println("redirect cannot parse url:", err)
return fmt.Errorf("redirect cannot parse url: %v", err)
}
redirectURL.Path = originalURL.Path
redirectURL.RawQuery = originalURL.RawQuery
redirectURL.Fragment = originalURL.Fragment
http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect)
return nil
}

69
etcd/v2_http_delete.go Normal file
View File

@ -0,0 +1,69 @@
package etcd
import (
"log"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
recursive := (req.FormValue("recursive") == "true")
dir := (req.FormValue("dir") == "true")
req.ParseForm()
_, valueOk := req.Form["prevValue"]
_, indexOk := req.Form["prevIndex"]
if !valueOk && !indexOk {
return s.serveDelete(w, req, key, dir, recursive)
}
var err error
prevIndex := uint64(0)
prevValue := req.Form.Get("prevValue")
if indexOk {
prevIndexStr := req.Form.Get("prevIndex")
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store.Index())
}
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store.Index())
}
}
return s.serveCAD(w, req, key, prevValue, prevIndex)
}
func (s *Server) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error {
ret, err := s.Delete(key, dir, recursive)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("delete:", err)
return err
}
func (s *Server) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error {
ret, err := s.CAD(key, prevValue, prevIndex)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("cad:", err)
return err
}

111
etcd/v2_http_get.go Normal file
View File

@ -0,0 +1,111 @@
package etcd
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
etcdErr "github.com/coreos/etcd/error"
)
func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error {
key := req.URL.Path[len("/v2/keys"):]
// TODO(xiangli): handle consistent get
recursive := (req.FormValue("recursive") == "true")
sort := (req.FormValue("sorted") == "true")
waitIndex := req.FormValue("waitIndex")
stream := (req.FormValue("stream") == "true")
if req.FormValue("wait") == "true" {
return s.handleWatch(key, recursive, stream, waitIndex, w, req)
}
return s.handleGet(key, recursive, sort, w, req)
}
func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error {
// Create a command to watch from a given index (default 0).
var sinceIndex uint64 = 0
var err error
if waitIndex != "" {
sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64)
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store.Index())
}
}
watcher, err := s.Store.Watch(key, recursive, stream, sinceIndex)
if err != nil {
return err
}
cn, _ := w.(http.CloseNotifier)
closeChan := cn.CloseNotify()
s.writeHeaders(w)
if stream {
// watcher hub will not help to remove stream watcher
// so we need to remove here
defer watcher.Remove()
for {
select {
case <-closeChan:
return nil
case event, ok := <-watcher.EventChan:
if !ok {
// If the channel is closed this may be an indication of
// that notifications are much more than we are able to
// send to the client in time. Then we simply end streaming.
return nil
}
if req.Method == "HEAD" {
continue
}
b, _ := json.Marshal(event)
_, err := w.Write(b)
if err != nil {
return nil
}
w.(http.Flusher).Flush()
}
}
}
select {
case <-closeChan:
watcher.Remove()
case event := <-watcher.EventChan:
if req.Method == "HEAD" {
return nil
}
b, _ := json.Marshal(event)
w.Write(b)
}
return nil
}
func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error {
event, err := s.Store.Get(key, recursive, sort)
if err != nil {
return err
}
s.writeHeaders(w)
if req.Method == "HEAD" {
return nil
}
b, err := json.Marshal(event)
if err != nil {
panic(fmt.Sprintf("handleGet: ", err))
}
w.Write(b)
return nil
}
func (s *Server) writeHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store.Index()))
// TODO(xiangli): raft-index and term
w.WriteHeader(http.StatusOK)
}

32
etcd/v2_http_post.go Normal file
View File

@ -0,0 +1,32 @@
package etcd
import (
"log"
"net/http"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
)
func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
value := req.FormValue("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.FormValue("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store.Index())
}
ret, err := s.Create(key, dir, value, expireTime, true)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("unique:", err)
return err
}

146
etcd/v2_http_put.go Normal file
View File

@ -0,0 +1,146 @@
package etcd
import (
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/store"
)
func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error {
if !s.node.IsLeader() {
return s.redirect(w, req, s.node.Leader())
}
key := req.URL.Path[len("/v2/keys"):]
req.ParseForm()
value := req.Form.Get("value")
dir := (req.FormValue("dir") == "true")
expireTime, err := store.TTL(req.Form.Get("ttl"))
if err != nil {
return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store.Index())
}
prevValue, valueOk := firstValue(req.Form, "prevValue")
prevIndexStr, indexOk := firstValue(req.Form, "prevIndex")
prevExist, existOk := firstValue(req.Form, "prevExist")
// Set handler: create a new node or replace the old one.
if !valueOk && !indexOk && !existOk {
return s.serveSet(w, req, key, dir, value, expireTime)
}
// update with test
if existOk {
if prevExist == "false" {
// Create command: create a new node. Fail, if a node already exists
// Ignore prevIndex and prevValue
return s.serveCreate(w, req, key, dir, value, expireTime)
}
if prevExist == "true" && !indexOk && !valueOk {
return s.serveUpdate(w, req, key, value, expireTime)
}
}
var prevIndex uint64
if indexOk {
prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
// bad previous index
if err != nil {
return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store.Index())
}
} else {
prevIndex = 0
}
if valueOk {
if prevValue == "" {
return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store.Index())
}
}
return s.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime)
}
func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) {
b, _ := json.Marshal(ret)
w.Header().Set("Content-Type", "application/json")
// etcd index should be the same as the event index
// which is also the last modified index of the node
w.Header().Add("X-Etcd-Index", fmt.Sprint(ret.Index()))
// w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
// w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if ret.IsCreated() {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusOK)
}
w.Write(b)
}
func (s *Server) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := s.Set(key, dir, value, expireTime)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("set:", err)
return err
}
func (s *Server) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
ret, err := s.Create(key, dir, value, expireTime, false)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("create:", err)
return err
}
func (s *Server) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error {
// Update should give at least one option
if value == "" && expireTime.Sub(store.Permanent) == 0 {
return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store.Index())
}
ret, err := s.Update(key, value, expireTime)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("update:", err)
return err
}
func (s *Server) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error {
ret, err := s.CAS(key, value, prevValue, prevIndex, expireTime)
if err == nil {
s.handleRet(w, ret)
return nil
}
log.Println("update:", err)
return err
}
func firstValue(f url.Values, key string) (string, bool) {
l, ok := f[key]
if !ok {
return "", false
}
return l[0], true
}

1117
etcd/v2_http_test.go Normal file

File diff suppressed because it is too large Load Diff

46
etcd/v2_raft.go Normal file
View File

@ -0,0 +1,46 @@
package etcd
import (
"encoding/json"
"fmt"
"time"
"github.com/coreos/etcd/raft"
)
type v2Proposal struct {
data []byte
ret chan interface{}
}
type wait struct {
index int
term int
}
type v2Raft struct {
*raft.Node
result map[wait]chan interface{}
term int
}
func (r *v2Raft) Propose(p v2Proposal) error {
if !r.Node.IsLeader() {
return fmt.Errorf("not leader")
}
r.Node.Propose(p.data)
r.result[wait{r.Index(), r.Term()}] = p.ret
return nil
}
func (r *v2Raft) Sync() {
if !r.Node.IsLeader() {
return
}
sync := &cmd{Type: "sync", Time: time.Now()}
data, err := json.Marshal(sync)
if err != nil {
panic(err)
}
r.Node.Propose(data)
}

78
etcd/v2_store.go Normal file
View File

@ -0,0 +1,78 @@
package etcd
import (
"encoding/json"
"fmt"
"time"
"github.com/coreos/etcd/store"
)
type cmd struct {
Type string
Key string
Value string
PrevValue string
PrevIndex uint64
Dir bool
Recursive bool
Unique bool
Time time.Time
}
func (s *Server) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) {
set := &cmd{Type: "set", Key: key, Dir: dir, Value: value, Time: expireTime}
return s.do(set)
}
func (s *Server) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) {
create := &cmd{Type: "create", Key: key, Dir: dir, Value: value, Time: expireTime, Unique: unique}
return s.do(create)
}
func (s *Server) Update(key string, value string, expireTime time.Time) (*store.Event, error) {
update := &cmd{Type: "update", Key: key, Value: value, Time: expireTime}
return s.do(update)
}
func (s *Server) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) {
cas := &cmd{Type: "cas", Key: key, Value: value, PrevValue: prevValue, PrevIndex: prevIndex, Time: expireTime}
return s.do(cas)
}
func (s *Server) Delete(key string, dir, recursive bool) (*store.Event, error) {
d := &cmd{Type: "delete", Key: key, Dir: dir, Recursive: recursive}
return s.do(d)
}
func (s *Server) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) {
cad := &cmd{Type: "cad", Key: key, PrevValue: prevValue, PrevIndex: prevIndex}
return s.do(cad)
}
func (s *Server) do(c *cmd) (*store.Event, error) {
data, err := json.Marshal(c)
if err != nil {
panic(err)
}
p := v2Proposal{
data: data,
ret: make(chan interface{}, 1),
}
select {
case s.proposal <- p:
default:
return nil, fmt.Errorf("unable to send out the proposal")
}
switch t := (<-p.ret).(type) {
case *store.Event:
return t, nil
case error:
return nil, t
default:
panic("server.do: unexpected return type")
}
}

78
etcd/v2_util.go Normal file
View File

@ -0,0 +1,78 @@
package etcd
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
)
// Creates a new HTTP client with KeepAlive disabled.
func NewHTTPClient() *http.Client {
return &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
}
// Reads the body from the response and closes it.
func ReadBody(resp *http.Response) []byte {
if resp == nil {
return []byte{}
}
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()
return body
}
// Reads the body from the response and parses it as JSON.
func ReadBodyJSON(resp *http.Response) map[string]interface{} {
m := make(map[string]interface{})
b := ReadBody(resp)
if err := json.Unmarshal(b, &m); err != nil {
panic(fmt.Sprintf("HTTP body JSON parse error: %v: %s", err, string(b)))
}
return m
}
func Head(url string) (*http.Response, error) {
return send("HEAD", url, "application/json", nil)
}
func Get(url string) (*http.Response, error) {
return send("GET", url, "application/json", nil)
}
func Post(url string, bodyType string, body io.Reader) (*http.Response, error) {
return send("POST", url, bodyType, body)
}
func PostForm(url string, data url.Values) (*http.Response, error) {
return Post(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
func Put(url string, bodyType string, body io.Reader) (*http.Response, error) {
return send("PUT", url, bodyType, body)
}
func PutForm(url string, data url.Values) (*http.Response, error) {
return Put(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
func Delete(url string, bodyType string, body io.Reader) (*http.Response, error) {
return send("DELETE", url, bodyType, body)
}
func DeleteForm(url string, data url.Values) (*http.Response, error) {
return Delete(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
func send(method string, url string, bodyType string, body io.Reader) (*http.Response, error) {
c := NewHTTPClient()
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return c.Do(req)
}

94
etcd/z_last_test.go Normal file
View File

@ -0,0 +1,94 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package etcd
import (
"net/http"
"runtime"
"sort"
"strings"
"testing"
"time"
)
func interestingGoroutines() (gs []string) {
buf := make([]byte, 2<<20)
buf = buf[:runtime.Stack(buf, true)]
for _, g := range strings.Split(string(buf), "\n\n") {
sl := strings.SplitN(g, "\n", 2)
if len(sl) != 2 {
continue
}
stack := strings.TrimSpace(sl[1])
if stack == "" ||
strings.Contains(stack, "created by testing.RunTests") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "created by runtime.gc") ||
strings.Contains(stack, "runtime.MHeap_Scavenger") {
continue
}
gs = append(gs, stack)
}
sort.Strings(gs)
return
}
// Verify the other tests didn't leave any goroutines running.
// This is in a file named z_last_test.go so it sorts at the end.
func TestGoroutinesRunning(t *testing.T) {
if testing.Short() {
t.Skip("not counting goroutines for leakage in -short mode")
}
gs := interestingGoroutines()
n := 0
stackCount := make(map[string]int)
for _, g := range gs {
stackCount[g]++
n++
}
t.Logf("num goroutines = %d", n)
if n > 0 {
t.Error("Too many goroutines.")
for stack, count := range stackCount {
t.Logf("%d instances of:\n%s", count, stack)
}
}
}
func afterTest(t *testing.T) {
http.DefaultTransport.(*http.Transport).CloseIdleConnections()
if testing.Short() {
return
}
var bad string
badSubstring := map[string]string{
").readLoop(": "a Transport",
").writeLoop(": "a Transport",
"created by net/http/httptest.(*Server).Start": "an httptest.Server",
"timeoutHandler": "a TimeoutHandler",
"net.(*netFD).connect(": "a timing out dial",
").noteClientGone(": "a closenotifier sender",
}
var stacks string
for i := 0; i < 4; i++ {
bad = ""
stacks = strings.Join(interestingGoroutines(), "\n\n")
for substr, what := range badSubstring {
if strings.Contains(stacks, substr) {
bad = what
}
}
if bad == "" {
return
}
// Bad stuff found, but goroutines might just still be
// shutting down, so give it some time.
time.Sleep(50 * time.Millisecond)
}
t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks)
}

80
main.go
View File

@ -1,44 +1,58 @@
/*
Copyright 2013 CoreOS Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"fmt"
"os"
"flag"
"log"
"net/http"
"net/url"
"strings"
"github.com/coreos/etcd/config"
"github.com/coreos/etcd/etcd"
"github.com/coreos/etcd/server"
)
var (
laddr = flag.String("l", ":8000", "The port to listen on")
paddr = flag.String("p", "127.0.0.1:8000", "The public address to be adversited")
cluster = flag.String("c", "", "The cluster to join")
)
func main() {
var config = config.New()
if err := config.Load(os.Args[1:]); err != nil {
fmt.Println(server.Usage() + "\n")
fmt.Println(err.Error() + "\n")
os.Exit(1)
} else if config.ShowVersion {
fmt.Println("etcd version", server.ReleaseVersion)
os.Exit(0)
} else if config.ShowHelp {
fmt.Println(server.Usage() + "\n")
os.Exit(0)
flag.Parse()
p, err := sanitizeURL(*paddr)
if err != nil {
log.Fatal(err)
}
var etcd = etcd.New(config)
etcd.Run()
var e *etcd.Server
if len(*cluster) == 0 {
e = etcd.New(1, p, nil)
go e.Bootstrap()
} else {
addrs := strings.Split(*cluster, ",")
cStr := addrs[0]
c, err := sanitizeURL(cStr)
if err != nil {
log.Fatal(err)
}
e = etcd.New(len(addrs), p, []string{c})
go e.Join()
}
if err := http.ListenAndServe(*laddr, e); err != nil {
log.Fatal("system", err)
}
}
func sanitizeURL(ustr string) (string, error) {
u, err := url.Parse(ustr)
if err != nil {
return "", err
}
if u.Scheme == "" {
u.Scheme = "http"
}
return u.String(), nil
}

View File

@ -41,8 +41,18 @@ func New(id int64, heartbeat, election tick) *Node {
func (n *Node) Id() int64 { return n.sm.id }
func (n *Node) Index() int { return n.sm.log.lastIndex() }
func (n *Node) Term() int { return n.sm.term }
func (n *Node) Applied() int { return n.sm.log.applied }
func (n *Node) HasLeader() bool { return n.sm.lead != none }
func (n *Node) IsLeader() bool { return n.sm.lead == n.Id() }
func (n *Node) Leader() int { return n.sm.lead }
// Propose asynchronously proposes data be applied to the underlying state machine.
func (n *Node) Propose(data []byte) { n.propose(Normal, data) }