From fc35324ba71ad3874aabad740502e57a5ac6aa38 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 6 Jul 2014 10:19:23 -0700 Subject: [PATCH] etcd: pass v2 kv api tests --- etcd/etcd.go | 565 +++++++------------- etcd/etcd_test.go | 95 ++-- etcd/profile.go | 27 - etcd/transporter.go | 142 +++++ etcd/v2_apply.go | 63 +++ etcd/v2_http.go | 84 +++ etcd/v2_http_delete.go | 69 +++ etcd/v2_http_get.go | 111 ++++ etcd/v2_http_post.go | 32 ++ etcd/v2_http_put.go | 146 ++++++ etcd/v2_http_test.go | 1117 ++++++++++++++++++++++++++++++++++++++++ etcd/v2_raft.go | 46 ++ etcd/v2_store.go | 78 +++ etcd/v2_util.go | 78 +++ etcd/z_last_test.go | 94 ++++ main.go | 80 +-- raft/node.go | 10 + 17 files changed, 2352 insertions(+), 485 deletions(-) delete mode 100644 etcd/profile.go create mode 100644 etcd/transporter.go create mode 100644 etcd/v2_apply.go create mode 100644 etcd/v2_http.go create mode 100644 etcd/v2_http_delete.go create mode 100644 etcd/v2_http_get.go create mode 100644 etcd/v2_http_post.go create mode 100644 etcd/v2_http_put.go create mode 100644 etcd/v2_http_test.go create mode 100644 etcd/v2_raft.go create mode 100644 etcd/v2_store.go create mode 100644 etcd/v2_util.go create mode 100644 etcd/z_last_test.go diff --git a/etcd/etcd.go b/etcd/etcd.go index b23fa3db5..a9ca72159 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -1,425 +1,206 @@ -/* -Copyright 2013 CoreOS Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package etcd import ( + "encoding/json" + "fmt" + "log" "net/http" - "os" - "path/filepath" - "runtime" - "strings" - "sync" + "path" "time" - goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd" - golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log" - "github.com/coreos/etcd/third_party/github.com/goraft/raft" - httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient" - - "github.com/coreos/etcd/config" - ehttp "github.com/coreos/etcd/http" - "github.com/coreos/etcd/log" - "github.com/coreos/etcd/metrics" - "github.com/coreos/etcd/server" + "github.com/coreos/etcd/raft" "github.com/coreos/etcd/store" ) -// TODO(yichengq): constant extraTimeout is a hack. -// Current problem is that there is big lag between join command -// execution and join success. -// Fix it later. It should be removed when proper method is found and -// enough tests are provided. It is expected to be calculated from -// heartbeatInterval and electionTimeout only. -const extraTimeout = time.Duration(1000) * time.Millisecond +const ( + defaultHeartbeat = 1 + defaultElection = 5 -type Etcd struct { - Config *config.Config // etcd config + defaultTickDuration = time.Millisecond * 100 - Store store.Store // data store - Registry *server.Registry // stores URL information for nodes - Server *server.Server // http server, runs on 4001 by default - PeerServer *server.PeerServer // peer server, runs on 7001 by default - StandbyServer *server.StandbyServer + nodePrefix = "/cfg/nodes" + raftPrefix = "/raft" + v2Prefix = "/v2/keys" +) - server *http.Server - peerServer *http.Server +type Server struct { + id int + pubAddr string + nodes map[string]bool + tickDuration time.Duration - mode Mode - modeMutex sync.Mutex - closeChan chan bool - readyNotify chan bool // To signal when server is ready to accept connections - onceReady sync.Once - stopNotify chan bool // To signal when server is stopped totally + proposal chan v2Proposal + node *v2Raft + t *transporter + + store.Store + + stop chan struct{} + + http.Handler } -// New returns a new Etcd instance. -func New(c *config.Config) *Etcd { - if c == nil { - c = config.New() +func New(id int, pubAddr string, nodes []string) *Server { + s := &Server{ + id: id, + pubAddr: pubAddr, + nodes: make(map[string]bool), + tickDuration: defaultTickDuration, + + proposal: make(chan v2Proposal), + node: &v2Raft{ + Node: raft.New(id, defaultHeartbeat, defaultElection), + result: make(map[wait]chan interface{}), + }, + t: newTransporter(), + + Store: store.New(), + + stop: make(chan struct{}), } - return &Etcd{ - Config: c, - closeChan: make(chan bool), - readyNotify: make(chan bool), - stopNotify: make(chan bool), + + for _, seed := range nodes { + s.nodes[seed] = true + } + + m := http.NewServeMux() + //m.Handle("/HEAD", handlerErr(s.serveHead)) + m.Handle("/", handlerErr(s.serveValue)) + m.Handle("/raft", s.t) + s.Handler = m + return s +} + +func (s *Server) SetTick(d time.Duration) { + s.tickDuration = d +} + +func (s *Server) Stop() { + close(s.stop) + s.t.stop() +} + +func (s *Server) Bootstrap() { + s.node.Campaign() + s.node.Add(s.id, s.pubAddr) + s.apply(s.node.Next()) + s.run() +} + +func (s *Server) Join() { + d, err := json.Marshal(&raft.Config{s.id, s.pubAddr}) + if err != nil { + panic(err) + } + + b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}}) + if err != nil { + panic(err) + } + + for seed := range s.nodes { + if err := s.t.send(seed+raftPrefix, b); err != nil { + log.Println(err) + continue + } + // todo(xiangli) WAIT for join to be committed or retry... + break + } + s.run() +} + +func (s *Server) run() { + node := s.node + recv := s.t.recv + ticker := time.NewTicker(s.tickDuration) + v2SyncTicker := time.NewTicker(time.Millisecond * 500) + + var proposal chan v2Proposal + for { + if node.HasLeader() { + proposal = s.proposal + } else { + proposal = nil + } + select { + case p := <-proposal: + node.Propose(p) + case msg := <-recv: + node.Step(*msg) + case <-ticker.C: + node.Tick() + case <-v2SyncTicker.C: + node.Sync() + case <-s.stop: + log.Printf("Node: %d stopped\n", s.id) + return + } + s.apply(node.Next()) + s.send(node.Msgs()) } } -// Run the etcd instance. -func (e *Etcd) Run() { - // Sanitize all the input fields. - if err := e.Config.Sanitize(); err != nil { - log.Fatalf("failed sanitizing configuration: %v", err) - } - - // Force remove server configuration if specified. - if e.Config.Force { - e.Config.Reset() - } - - // Enable options. - if e.Config.VeryVeryVerbose { - log.Verbose = true - raft.SetLogLevel(raft.Trace) - goetcd.SetLogger( - golog.New( - "go-etcd", - false, - golog.CombinedSink( - os.Stdout, - "[%s] %s %-9s | %s\n", - []string{"prefix", "time", "priority", "message"}, - ), - ), - ) - } else if e.Config.VeryVerbose { - log.Verbose = true - raft.SetLogLevel(raft.Debug) - } else if e.Config.Verbose { - log.Verbose = true - } - - if e.Config.CPUProfileFile != "" { - profile(e.Config.CPUProfileFile) - } - - if e.Config.DataDir == "" { - log.Fatal("The data dir was not set and could not be guessed from machine name") - } - - // Create data directory if it doesn't already exist. - if err := os.MkdirAll(e.Config.DataDir, 0744); err != nil { - log.Fatalf("Unable to create path: %s", err) - } - - // Warn people if they have an info file - info := filepath.Join(e.Config.DataDir, "info") - if _, err := os.Stat(info); err == nil { - log.Warnf("All cached configuration is now ignored. The file %s can be removed.", info) - } - - var mbName string - if e.Config.Trace() { - mbName = e.Config.MetricsBucketName() - runtime.SetBlockProfileRate(1) - } - - mb := metrics.NewBucket(mbName) - - if e.Config.GraphiteHost != "" { - err := mb.Publish(e.Config.GraphiteHost) - if err != nil { - panic(err) +func (s *Server) apply(ents []raft.Entry) { + offset := s.node.Applied() - len(ents) + 1 + for i, ent := range ents { + switch ent.Type { + // expose raft entry type + case raft.Normal: + if len(ent.Data) == 0 { + continue + } + s.v2apply(offset+i, ent) + case raft.AddNode: + cfg := new(raft.Config) + if err := json.Unmarshal(ent.Data, cfg); err != nil { + log.Println(err) + break + } + if err := s.t.set(cfg.NodeId, cfg.Addr); err != nil { + log.Println(err) + break + } + s.nodes[cfg.Addr] = true + p := path.Join(nodePrefix, fmt.Sprint(cfg.NodeId)) + s.Store.Set(p, false, cfg.Addr, store.Permanent) + default: + panic("unimplemented") } } +} - // Retrieve CORS configuration - corsInfo, err := ehttp.NewCORSInfo(e.Config.CorsOrigins) - if err != nil { - log.Fatal("CORS:", err) - } - - // Create etcd key-value store and registry. - e.Store = store.New() - e.Registry = server.NewRegistry(e.Store) - - // Create stats objects - followersStats := server.NewRaftFollowersStats(e.Config.Name) - serverStats := server.NewRaftServerStats(e.Config.Name) - - // Calculate all of our timeouts - heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond - electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond - dialTimeout := (3 * heartbeatInterval) + electionTimeout - responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout - - clientTransporter := &httpclient.Transport{ - ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout, - // This is a workaround for Transport.CancelRequest doesn't work on - // HTTPS connections blocked. The patch for it is in progress, - // and would be available in Go1.3 - // More: https://codereview.appspot.com/69280043/ - ConnectTimeout: dialTimeout + extraTimeout, - RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout, - } - if e.Config.PeerTLSInfo().Scheme() == "https" { - clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() - if err != nil { - log.Fatal("client TLS error: ", err) - } - clientTransporter.TLSClientConfig = clientTLSConfig - clientTransporter.DisableCompression = true - } - client := server.NewClient(clientTransporter) - - // Create peer server - psConfig := server.PeerServerConfig{ - Name: e.Config.Name, - Scheme: e.Config.PeerTLSInfo().Scheme(), - URL: e.Config.Peer.Addr, - SnapshotCount: e.Config.SnapshotCount, - RetryTimes: e.Config.MaxRetryAttempts, - RetryInterval: e.Config.RetryInterval, - } - e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats) - - // Create raft transporter and server - raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout) - if e.Config.PeerTLSInfo().Scheme() == "https" { - raftClientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig() - if err != nil { - log.Fatal("raft client TLS error: ", err) - } - raftTransporter.SetTLSConfig(*raftClientTLSConfig) - } - raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, raftTransporter, e.Store, e.PeerServer, "") - if err != nil { - log.Fatal(err) - } - raftServer.SetElectionTimeout(electionTimeout) - raftServer.SetHeartbeatInterval(heartbeatInterval) - e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) - - // Create etcd server - e.Server = server.New(e.Config.Name, e.Config.Addr, e.PeerServer, e.Registry, e.Store, &mb) - - if e.Config.Trace() { - e.Server.EnableTracing() - } - - e.PeerServer.SetServer(e.Server) - - // Create standby server - ssConfig := server.StandbyServerConfig{ - Name: e.Config.Name, - PeerScheme: e.Config.PeerTLSInfo().Scheme(), - PeerURL: e.Config.Peer.Addr, - ClientURL: e.Config.Addr, - DataDir: e.Config.DataDir, - } - e.StandbyServer = server.NewStandbyServer(ssConfig, client) - e.StandbyServer.SetRaftServer(raftServer) - - // Generating config could be slow. - // Put it here to make listen happen immediately after peer-server starting. - peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo()) - etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo()) - - if !e.StandbyServer.IsRunning() { - startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers) +func (s *Server) send(msgs []raft.Message) { + for i := range msgs { + data, err := json.Marshal(msgs[i]) if err != nil { + // todo(xiangli): error handling log.Fatal(err) } - if startPeerServer { - e.setMode(PeerMode) - } else { - e.StandbyServer.SyncCluster(possiblePeers) - e.setMode(StandbyMode) - } - } else { - e.setMode(StandbyMode) - } - - serverHTTPHandler := &ehttp.CORSHandler{e.Server.HTTPHandler(), corsInfo} - peerServerHTTPHandler := &ehttp.CORSHandler{e.PeerServer.HTTPHandler(), corsInfo} - standbyServerHTTPHandler := &ehttp.CORSHandler{e.StandbyServer.ClientHTTPHandler(), corsInfo} - - log.Infof("etcd server [name %s, listen on %s, advertised url %s]", e.Server.Name, e.Config.BindAddr, e.Server.URL()) - listener := server.NewListener(e.Config.EtcdTLSInfo().Scheme(), e.Config.BindAddr, etcdTLSConfig) - - e.server = &http.Server{Handler: &ModeHandler{e, serverHTTPHandler, standbyServerHTTPHandler}, - ReadTimeout: time.Duration(e.Config.HTTPReadTimeout) * time.Second, - WriteTimeout: time.Duration(e.Config.HTTPWriteTimeout) * time.Second, - } - - log.Infof("peer server [name %s, listen on %s, advertised url %s]", e.PeerServer.Config.Name, e.Config.Peer.BindAddr, e.PeerServer.Config.URL) - peerListener := server.NewListener(e.Config.PeerTLSInfo().Scheme(), e.Config.Peer.BindAddr, peerTLSConfig) - - e.peerServer = &http.Server{Handler: &ModeHandler{e, peerServerHTTPHandler, http.NotFoundHandler()}, - ReadTimeout: time.Duration(server.DefaultReadTimeout) * time.Second, - WriteTimeout: time.Duration(server.DefaultWriteTimeout) * time.Second, - } - - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - <-e.readyNotify - defer wg.Done() - if err := e.server.Serve(listener); err != nil { - if !isListenerClosing(err) { - log.Fatal(err) + // todo(xiangli): reuse routines and limit the number of sending routines + // sync.Pool? + go func(i int) { + var err error + if err = s.t.sendTo(msgs[i].To, data); err == nil { + return } - } - }() - go func() { - <-e.readyNotify - defer wg.Done() - if err := e.peerServer.Serve(peerListener); err != nil { - if !isListenerClosing(err) { - log.Fatal(err) + if err == errUnknownNode { + err = s.fetchAddr(msgs[i].To) + } + if err == nil { + err = s.t.sendTo(msgs[i].To, data) } - } - }() - - e.runServer() - - listener.Close() - peerListener.Close() - wg.Wait() - log.Infof("etcd instance is stopped [name %s]", e.Config.Name) - close(e.stopNotify) -} - -func (e *Etcd) runServer() { - var removeNotify <-chan bool - for { - if e.mode == PeerMode { - log.Infof("%v starting in peer mode", e.Config.Name) - // Starting peer server should be followed close by listening on its port - // If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster. - // One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster, - // the cluster could be out of work as long as the two nodes cannot transfer messages. - e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig()) - removeNotify = e.PeerServer.RemoveNotify() - } else { - log.Infof("%v starting in standby mode", e.Config.Name) - e.StandbyServer.Start() - removeNotify = e.StandbyServer.RemoveNotify() - } - - // etcd server is ready to accept connections, notify waiters. - e.onceReady.Do(func() { close(e.readyNotify) }) - - select { - case <-e.closeChan: - e.PeerServer.Stop() - e.StandbyServer.Stop() - return - case <-removeNotify: - } - - if e.mode == PeerMode { - peerURLs := e.Registry.PeerURLs(e.PeerServer.RaftServer().Leader(), e.Config.Name) - e.StandbyServer.SyncCluster(peerURLs) - e.setMode(StandbyMode) - } else { - // Create etcd key-value store and registry. - e.Store = store.New() - e.Registry = server.NewRegistry(e.Store) - e.PeerServer.SetStore(e.Store) - e.PeerServer.SetRegistry(e.Registry) - e.Server.SetStore(e.Store) - e.Server.SetRegistry(e.Registry) - - // Generate new peer server here. - // TODO(yichengq): raft server cannot be started after stopped. - // It should be removed when raft restart is implemented. - heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond - electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond - raftServer, err := raft.NewServer(e.Config.Name, e.Config.DataDir, e.PeerServer.RaftServer().Transporter(), e.Store, e.PeerServer, "") if err != nil { - log.Fatal(err) + log.Println(err) } - raftServer.SetElectionTimeout(electionTimeout) - raftServer.SetHeartbeatInterval(heartbeatInterval) - e.PeerServer.SetRaftServer(raftServer, e.Config.Snapshot) - e.StandbyServer.SetRaftServer(raftServer) + }(i) + } +} - e.PeerServer.SetJoinIndex(e.StandbyServer.JoinIndex()) - e.setMode(PeerMode) +func (s *Server) fetchAddr(nodeId int) error { + for seed := range s.nodes { + if err := s.t.fetchAddr(seed, nodeId); err == nil { + return nil } } + return fmt.Errorf("cannot fetch the address of node %d", nodeId) } - -// Stop the etcd instance. -func (e *Etcd) Stop() { - close(e.closeChan) - <-e.stopNotify -} - -// ReadyNotify returns a channel that is going to be closed -// when the etcd instance is ready to accept connections. -func (e *Etcd) ReadyNotify() <-chan bool { - return e.readyNotify -} - -func (e *Etcd) Mode() Mode { - e.modeMutex.Lock() - defer e.modeMutex.Unlock() - return e.mode -} - -func (e *Etcd) setMode(m Mode) { - e.modeMutex.Lock() - defer e.modeMutex.Unlock() - e.mode = m -} - -func isListenerClosing(err error) bool { - // An error string equivalent to net.errClosing for using with - // http.Serve() during server shutdown. Need to re-declare - // here because it is not exported by "net" package. - const errClosing = "use of closed network connection" - - return strings.Contains(err.Error(), errClosing) -} - -type ModeGetter interface { - Mode() Mode -} - -type ModeHandler struct { - ModeGetter - PeerModeHandler http.Handler - StandbyModeHandler http.Handler -} - -func (h *ModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - switch h.Mode() { - case PeerMode: - h.PeerModeHandler.ServeHTTP(w, r) - case StandbyMode: - h.StandbyModeHandler.ServeHTTP(w, r) - } -} - -type Mode int - -const ( - PeerMode Mode = iota - StandbyMode -) diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index 4d5b92579..784a514ca 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -1,41 +1,70 @@ -/* -Copyright 2013 CoreOS Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package etcd import ( - "io/ioutil" - "os" + "fmt" + "net/http/httptest" "testing" - - "github.com/coreos/etcd/config" + "time" ) -func TestRunStop(t *testing.T) { - path, _ := ioutil.TempDir("", "etcd-") - defer os.RemoveAll(path) +func TestMultipleNodes(t *testing.T) { + tests := []int{1, 3, 5, 9, 11} - config := config.New() - config.Name = "ETCDTEST" - config.DataDir = path - config.Addr = "localhost:0" - config.Peer.Addr = "localhost:0" - - etcd := New(config) - go etcd.Run() - <-etcd.ReadyNotify() - etcd.Stop() + for _, tt := range tests { + es, hs := buildCluster(tt) + waitCluster(t, es) + for i := range es { + es[len(es)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } + } + afterTest(t) +} + +func buildCluster(number int) ([]*Server, []*httptest.Server) { + bootstrapper := 0 + es := make([]*Server, number) + hs := make([]*httptest.Server, number) + var seed string + + for i := range es { + es[i] = New(i, "", []string{seed}) + es[i].SetTick(time.Millisecond * 5) + hs[i] = httptest.NewServer(es[i]) + es[i].pubAddr = hs[i].URL + + if i == bootstrapper { + seed = hs[i].URL + go es[i].Bootstrap() + } else { + // wait for the previous configuration change to be committed + // or this configuration request might be dropped + w, err := es[0].Watch(nodePrefix, true, false, uint64(i)) + if err != nil { + panic(err) + } + <-w.EventChan + go es[i].Join() + } + } + return es, hs +} + +func waitCluster(t *testing.T, es []*Server) { + n := len(es) + for i, e := range es { + for k := 1; k < n+1; k++ { + w, err := e.Watch(nodePrefix, true, false, uint64(k)) + if err != nil { + panic(err) + } + v := <-w.EventChan + ww := fmt.Sprintf("%s/%d", nodePrefix, k-1) + if v.Node.Key != ww { + t.Errorf("#%d path = %v, want %v", i, v.Node.Key, w) + } + } + } } diff --git a/etcd/profile.go b/etcd/profile.go deleted file mode 100644 index 766325737..000000000 --- a/etcd/profile.go +++ /dev/null @@ -1,27 +0,0 @@ -package etcd - -import ( - "os" - "os/signal" - "runtime/pprof" - - "github.com/coreos/etcd/log" -) - -// profile starts CPU profiling. -func profile(path string) { - f, err := os.Create(path) - if err != nil { - log.Fatal(err) - } - pprof.StartCPUProfile(f) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - go func() { - sig := <-c - log.Infof("captured %v, stopping profiler and exiting..", sig) - pprof.StopCPUProfile() - os.Exit(1) - }() -} diff --git a/etcd/transporter.go b/etcd/transporter.go new file mode 100644 index 000000000..c21845aaf --- /dev/null +++ b/etcd/transporter.go @@ -0,0 +1,142 @@ +package etcd + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "path" + "sync" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/store" +) + +var ( + errUnknownNode = errors.New("unknown node") +) + +type transporter struct { + mu sync.RWMutex + stopped bool + urls map[int]string + + recv chan *raft.Message + client *http.Client + wg sync.WaitGroup +} + +func newTransporter() *transporter { + tr := new(http.Transport) + c := &http.Client{Transport: tr} + + return &transporter{ + urls: make(map[int]string), + recv: make(chan *raft.Message, 512), + client: c, + } +} + +func (t *transporter) stop() { + t.mu.Lock() + t.stopped = true + t.mu.Unlock() + + t.wg.Wait() + tr := t.client.Transport.(*http.Transport) + tr.CloseIdleConnections() +} + +func (t *transporter) set(nodeId int, rawurl string) error { + u, err := url.Parse(rawurl) + if err != nil { + return err + } + u.Path = raftPrefix + t.mu.Lock() + t.urls[nodeId] = u.String() + t.mu.Unlock() + return nil +} + +func (t *transporter) sendTo(nodeId int, data []byte) error { + t.mu.RLock() + url := t.urls[nodeId] + t.mu.RUnlock() + + if len(url) == 0 { + return errUnknownNode + } + return t.send(url, data) +} + +func (t *transporter) send(addr string, data []byte) error { + t.mu.RLock() + if t.stopped { + t.mu.RUnlock() + return fmt.Errorf("transporter stopped") + } + t.mu.RUnlock() + + buf := bytes.NewBuffer(data) + t.wg.Add(1) + defer t.wg.Done() + resp, err := t.client.Post(addr, "application/octet-stream", buf) + if err != nil { + return err + } + resp.Body.Close() + return nil +} + +func (t *transporter) fetchAddr(seedurl string, id int) error { + u, err := url.Parse(seedurl) + if err != nil { + return fmt.Errorf("cannot parse the url of the given seed") + } + + u.Path = path.Join(v2Prefix, nodePrefix, fmt.Sprint(id)) + resp, err := t.client.Get(u.String()) + if err != nil { + return fmt.Errorf("cannot reach %v", u) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("cannot reach %v", u) + } + + event := new(store.Event) + err = json.Unmarshal(b, event) + if err != nil { + panic(fmt.Sprintf("fetchAddr: ", err)) + } + + if err := t.set(id, *event.Node.Value); err != nil { + return fmt.Errorf("cannot parse the url of node %d: %v", id, err) + } + return nil +} + +func (t *transporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { + msg := new(raft.Message) + if err := json.NewDecoder(r.Body).Decode(msg); err != nil { + log.Println(err) + return + } + + select { + case t.recv <- msg: + default: + log.Println("drop") + // drop the incoming package at network layer if the upper layer + // cannot consume them in time. + // TODO(xiangli): not return 200. + } + return +} diff --git a/etcd/v2_apply.go b/etcd/v2_apply.go new file mode 100644 index 000000000..a1daeea96 --- /dev/null +++ b/etcd/v2_apply.go @@ -0,0 +1,63 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/store" +) + +func (s *Server) v2apply(index int, ent raft.Entry) { + var ret interface{} + var e *store.Event + var err error + + cmd := new(cmd) + if err := json.Unmarshal(ent.Data, cmd); err != nil { + log.Println("v2apply.decode:", err) + return + } + + switch cmd.Type { + case "set": + e, err = s.Store.Set(cmd.Key, cmd.Dir, cmd.Value, cmd.Time) + case "update": + e, err = s.Store.Update(cmd.Key, cmd.Value, cmd.Time) + case "create", "unique": + e, err = s.Store.Create(cmd.Key, cmd.Dir, cmd.Value, cmd.Unique, cmd.Time) + case "delete": + e, err = s.Store.Delete(cmd.Key, cmd.Dir, cmd.Recursive) + case "cad": + e, err = s.Store.CompareAndDelete(cmd.Key, cmd.PrevValue, cmd.PrevIndex) + case "cas": + e, err = s.Store.CompareAndSwap(cmd.Key, cmd.PrevValue, cmd.PrevIndex, cmd.Value, cmd.Time) + case "sync": + s.Store.DeleteExpiredKeys(cmd.Time) + return + default: + log.Println("unexpected command type:", cmd.Type) + } + + if ent.Term > s.node.term { + s.node.term = ent.Term + for k, v := range s.node.result { + if k.term < s.node.term { + v <- fmt.Errorf("proposal lost due to leader election") + delete(s.node.result, k) + } + } + } + + if s.node.result[wait{index, ent.Term}] == nil { + return + } + + if err != nil { + ret = err + } else { + ret = e + } + s.node.result[wait{index, ent.Term}] <- ret +} diff --git a/etcd/v2_http.go b/etcd/v2_http.go new file mode 100644 index 000000000..aa678618b --- /dev/null +++ b/etcd/v2_http.go @@ -0,0 +1,84 @@ +package etcd + +import ( + "fmt" + "log" + "net/http" + "net/url" + "strings" + + etcdErr "github.com/coreos/etcd/error" +) + +func (s *Server) serveValue(w http.ResponseWriter, r *http.Request) error { + switch r.Method { + case "GET": + return s.GetHandler(w, r) + case "HEAD": + w = &HEADResponseWriter{w} + return s.GetHandler(w, r) + case "PUT": + return s.PutHandler(w, r) + case "POST": + return s.PostHandler(w, r) + case "DELETE": + return s.DeleteHandler(w, r) + } + return allow(w, "GET", "PUT", "POST", "DELETE", "HEAD") +} + +type handlerErr func(w http.ResponseWriter, r *http.Request) error + +func (eh handlerErr) ServeHTTP(w http.ResponseWriter, r *http.Request) { + err := eh(w, r) + if err == nil { + return + } + + if r.Method == "HEAD" { + w = &HEADResponseWriter{w} + } + + if etcdErr, ok := err.(*etcdErr.Error); ok { + w.Header().Set("Content-Type", "application/json") + etcdErr.Write(w) + return + } + + log.Println("http error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) +} + +func allow(w http.ResponseWriter, m ...string) error { + w.Header().Set("Allow", strings.Join(m, ",")) + return nil +} + +type HEADResponseWriter struct { + http.ResponseWriter +} + +func (w *HEADResponseWriter) Write([]byte) (int, error) { + return 0, nil +} + +func (s *Server) redirect(w http.ResponseWriter, r *http.Request, id int) error { + baseURL := s.t.urls[id] + if len(baseURL) == 0 { + log.Println("redirect cannot find node", id) + return fmt.Errorf("redirect cannot find node %d", id) + } + + originalURL := r.URL + redirectURL, err := url.Parse(baseURL) + if err != nil { + log.Println("redirect cannot parse url:", err) + return fmt.Errorf("redirect cannot parse url: %v", err) + } + + redirectURL.Path = originalURL.Path + redirectURL.RawQuery = originalURL.RawQuery + redirectURL.Fragment = originalURL.Fragment + http.Redirect(w, r, redirectURL.String(), http.StatusTemporaryRedirect) + return nil +} diff --git a/etcd/v2_http_delete.go b/etcd/v2_http_delete.go new file mode 100644 index 000000000..6e7118ecf --- /dev/null +++ b/etcd/v2_http_delete.go @@ -0,0 +1,69 @@ +package etcd + +import ( + "log" + "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" +) + +func (s *Server) DeleteHandler(w http.ResponseWriter, req *http.Request) error { + if !s.node.IsLeader() { + return s.redirect(w, req, s.node.Leader()) + } + + key := req.URL.Path[len("/v2/keys"):] + + recursive := (req.FormValue("recursive") == "true") + dir := (req.FormValue("dir") == "true") + + req.ParseForm() + _, valueOk := req.Form["prevValue"] + _, indexOk := req.Form["prevIndex"] + + if !valueOk && !indexOk { + return s.serveDelete(w, req, key, dir, recursive) + } + + var err error + prevIndex := uint64(0) + prevValue := req.Form.Get("prevValue") + + if indexOk { + prevIndexStr := req.Form.Get("prevIndex") + prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndDelete", s.Store.Index()) + } + } + + if valueOk { + if prevValue == "" { + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndDelete", s.Store.Index()) + } + } + return s.serveCAD(w, req, key, prevValue, prevIndex) +} + +func (s *Server) serveDelete(w http.ResponseWriter, req *http.Request, key string, dir, recursive bool) error { + ret, err := s.Delete(key, dir, recursive) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("delete:", err) + return err +} + +func (s *Server) serveCAD(w http.ResponseWriter, req *http.Request, key string, prevValue string, prevIndex uint64) error { + ret, err := s.CAD(key, prevValue, prevIndex) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("cad:", err) + return err +} diff --git a/etcd/v2_http_get.go b/etcd/v2_http_get.go new file mode 100644 index 000000000..8b9b1670a --- /dev/null +++ b/etcd/v2_http_get.go @@ -0,0 +1,111 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + + etcdErr "github.com/coreos/etcd/error" +) + +func (s *Server) GetHandler(w http.ResponseWriter, req *http.Request) error { + key := req.URL.Path[len("/v2/keys"):] + // TODO(xiangli): handle consistent get + recursive := (req.FormValue("recursive") == "true") + sort := (req.FormValue("sorted") == "true") + waitIndex := req.FormValue("waitIndex") + stream := (req.FormValue("stream") == "true") + if req.FormValue("wait") == "true" { + return s.handleWatch(key, recursive, stream, waitIndex, w, req) + } + return s.handleGet(key, recursive, sort, w, req) +} + +func (s *Server) handleWatch(key string, recursive, stream bool, waitIndex string, w http.ResponseWriter, req *http.Request) error { + // Create a command to watch from a given index (default 0). + var sinceIndex uint64 = 0 + var err error + + if waitIndex != "" { + sinceIndex, err = strconv.ParseUint(waitIndex, 10, 64) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", s.Store.Index()) + } + } + + watcher, err := s.Store.Watch(key, recursive, stream, sinceIndex) + if err != nil { + return err + } + + cn, _ := w.(http.CloseNotifier) + closeChan := cn.CloseNotify() + + s.writeHeaders(w) + + if stream { + // watcher hub will not help to remove stream watcher + // so we need to remove here + defer watcher.Remove() + for { + select { + case <-closeChan: + return nil + case event, ok := <-watcher.EventChan: + if !ok { + // If the channel is closed this may be an indication of + // that notifications are much more than we are able to + // send to the client in time. Then we simply end streaming. + return nil + } + if req.Method == "HEAD" { + continue + } + + b, _ := json.Marshal(event) + _, err := w.Write(b) + if err != nil { + return nil + } + w.(http.Flusher).Flush() + } + } + } + + select { + case <-closeChan: + watcher.Remove() + case event := <-watcher.EventChan: + if req.Method == "HEAD" { + return nil + } + b, _ := json.Marshal(event) + w.Write(b) + } + return nil +} + +func (s *Server) handleGet(key string, recursive, sort bool, w http.ResponseWriter, req *http.Request) error { + event, err := s.Store.Get(key, recursive, sort) + if err != nil { + return err + } + s.writeHeaders(w) + if req.Method == "HEAD" { + return nil + } + b, err := json.Marshal(event) + if err != nil { + panic(fmt.Sprintf("handleGet: ", err)) + } + w.Write(b) + return nil +} + +func (s *Server) writeHeaders(w http.ResponseWriter) { + w.Header().Set("Content-Type", "application/json") + w.Header().Add("X-Etcd-Index", fmt.Sprint(s.Store.Index())) + // TODO(xiangli): raft-index and term + w.WriteHeader(http.StatusOK) +} diff --git a/etcd/v2_http_post.go b/etcd/v2_http_post.go new file mode 100644 index 000000000..02e02f84e --- /dev/null +++ b/etcd/v2_http_post.go @@ -0,0 +1,32 @@ +package etcd + +import ( + "log" + "net/http" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" +) + +func (s *Server) PostHandler(w http.ResponseWriter, req *http.Request) error { + if !s.node.IsLeader() { + return s.redirect(w, req, s.node.Leader()) + } + + key := req.URL.Path[len("/v2/keys"):] + + value := req.FormValue("value") + dir := (req.FormValue("dir") == "true") + expireTime, err := store.TTL(req.FormValue("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", s.Store.Index()) + } + + ret, err := s.Create(key, dir, value, expireTime, true) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("unique:", err) + return err +} diff --git a/etcd/v2_http_put.go b/etcd/v2_http_put.go new file mode 100644 index 000000000..7804323b2 --- /dev/null +++ b/etcd/v2_http_put.go @@ -0,0 +1,146 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "net/url" + "strconv" + "time" + + etcdErr "github.com/coreos/etcd/error" + "github.com/coreos/etcd/store" +) + +func (s *Server) PutHandler(w http.ResponseWriter, req *http.Request) error { + if !s.node.IsLeader() { + return s.redirect(w, req, s.node.Leader()) + } + + key := req.URL.Path[len("/v2/keys"):] + + req.ParseForm() + + value := req.Form.Get("value") + dir := (req.FormValue("dir") == "true") + + expireTime, err := store.TTL(req.Form.Get("ttl")) + if err != nil { + return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", s.Store.Index()) + } + + prevValue, valueOk := firstValue(req.Form, "prevValue") + prevIndexStr, indexOk := firstValue(req.Form, "prevIndex") + prevExist, existOk := firstValue(req.Form, "prevExist") + + // Set handler: create a new node or replace the old one. + if !valueOk && !indexOk && !existOk { + return s.serveSet(w, req, key, dir, value, expireTime) + } + + // update with test + if existOk { + if prevExist == "false" { + // Create command: create a new node. Fail, if a node already exists + // Ignore prevIndex and prevValue + return s.serveCreate(w, req, key, dir, value, expireTime) + } + + if prevExist == "true" && !indexOk && !valueOk { + return s.serveUpdate(w, req, key, value, expireTime) + } + } + + var prevIndex uint64 + + if indexOk { + prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64) + + // bad previous index + if err != nil { + return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", s.Store.Index()) + } + } else { + prevIndex = 0 + } + + if valueOk { + if prevValue == "" { + return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", s.Store.Index()) + } + } + + return s.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime) +} + +func (s *Server) handleRet(w http.ResponseWriter, ret *store.Event) { + b, _ := json.Marshal(ret) + + w.Header().Set("Content-Type", "application/json") + // etcd index should be the same as the event index + // which is also the last modified index of the node + w.Header().Add("X-Etcd-Index", fmt.Sprint(ret.Index())) + // w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex())) + // w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term())) + + if ret.IsCreated() { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusOK) + } + + w.Write(b) +} + +func (s *Server) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { + ret, err := s.Set(key, dir, value, expireTime) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("set:", err) + return err +} + +func (s *Server) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error { + ret, err := s.Create(key, dir, value, expireTime, false) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("create:", err) + return err +} + +func (s *Server) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error { + // Update should give at least one option + if value == "" && expireTime.Sub(store.Permanent) == 0 { + return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", s.Store.Index()) + } + ret, err := s.Update(key, value, expireTime) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("update:", err) + return err +} + +func (s *Server) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error { + ret, err := s.CAS(key, value, prevValue, prevIndex, expireTime) + if err == nil { + s.handleRet(w, ret) + return nil + } + log.Println("update:", err) + return err +} + +func firstValue(f url.Values, key string) (string, bool) { + l, ok := f[key] + if !ok { + return "", false + } + return l[0], true +} diff --git a/etcd/v2_http_test.go b/etcd/v2_http_test.go new file mode 100644 index 000000000..626b2e57a --- /dev/null +++ b/etcd/v2_http_test.go @@ -0,0 +1,1117 @@ +package etcd + +// Ensures that a value can be retrieve for a given key. + +import ( + "fmt" + "net/http" + "net/url" + "testing" + "time" + + "github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert" +) + +// Ensures that a directory is created +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar?dir=true +// +func TestV2SetDirectory(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a time-to-live is added to a key. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d ttl=20 +// +func TestV2SetKeyWithTTL(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + t0 := time.Now() + v := url.Values{} + v.Set("value", "XXX") + v.Set("ttl", "20") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBodyJSON(resp) + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["ttl"], 20, "") + + // Make sure the expiration date is correct. + expiration, _ := time.Parse(time.RFC3339Nano, node["expiration"].(string)) + assert.Equal(t, expiration.Sub(t0)/time.Second, 20, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an invalid time-to-live is returned as an error. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d ttl=bad_ttl +// +func TestV2SetKeyWithBadTTL(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + v.Set("ttl", "bad_ttl") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 202, "") + assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "") + assert.Equal(t, body["cause"], "Update", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is conditionally set if it previously did not exist. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false +// +func TestV2CreateKeySuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + v.Set("prevExist", "false") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBodyJSON(resp) + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["value"], "XXX", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not conditionally set because it previously existed. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=false -> fail +// +func TestV2CreateKeyFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + v.Set("prevExist", "false") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 105, "") + assert.Equal(t, body["message"], "Key already exists", "") + assert.Equal(t, body["cause"], "/foo/bar", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is conditionally set only if it previously did exist. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true +// +func TestV2UpdateKeySuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + + v.Set("value", "YYY") + v.Set("prevExist", "true") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "update", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not conditionally set if it previously did not exist. +// +// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevExist=true +// +func TestV2UpdateKeyFailOnValue(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), v) + resp.Body.Close() + + assert.Equal(t, resp.StatusCode, http.StatusCreated) + v.Set("value", "YYY") + v.Set("prevExist", "true") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 100, "") + assert.Equal(t, body["message"], "Key not found", "") + assert.Equal(t, body["cause"], "/foo/bar", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not conditionally set if it previously did not exist. +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=YYY -d prevExist=true -> fail +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevExist=true -> fail +// +func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "YYY") + v.Set("prevExist", "true") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 100, "") + assert.Equal(t, body["message"], "Key not found", "") + assert.Equal(t, body["cause"], "/foo", "") + + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + body = ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 100, "") + assert.Equal(t, body["message"], "Key not found", "") + assert.Equal(t, body["cause"], "/foo", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key could update TTL. +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d ttl=1000 -d prevExist=true +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX -d ttl= -d prevExist=true +// +func TestV2UpdateKeySuccessWithTTL(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + node := (ReadBodyJSON(resp)["node"]).(map[string]interface{}) + createdIndex := node["createdIndex"] + + v.Set("ttl", "1000") + v.Set("prevExist", "true") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + assert.Equal(t, resp.StatusCode, http.StatusOK) + node = (ReadBodyJSON(resp)["node"]).(map[string]interface{}) + assert.Equal(t, node["value"], "XXX", "") + assert.Equal(t, node["ttl"], 1000, "") + assert.NotEqual(t, node["expiration"], "", "") + assert.Equal(t, node["createdIndex"], createdIndex, "") + + v.Del("ttl") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + assert.Equal(t, resp.StatusCode, http.StatusOK) + node = (ReadBodyJSON(resp)["node"]).(map[string]interface{}) + assert.Equal(t, node["value"], "XXX", "") + assert.Equal(t, node["ttl"], nil, "") + assert.Equal(t, node["expiration"], nil, "") + assert.Equal(t, node["createdIndex"], createdIndex, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is set only if the previous index matches. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=1 +// +func TestV2SetKeyCASOnIndexSuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + + v.Set("value", "YYY") + v.Set("prevIndex", "2") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndSwap", "") + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["value"], "YYY", "") + assert.Equal(t, node["modifiedIndex"], 3, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not set if the previous index does not match. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=10 +// +func TestV2SetKeyCASOnIndexFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevIndex", "10") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101, "") + assert.Equal(t, body["message"], "Compare failed", "") + assert.Equal(t, body["cause"], "[10 != 2]", "") + assert.Equal(t, body["index"], 2, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an error is thrown if an invalid previous index is provided. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevIndex=bad_index +// +func TestV2SetKeyCASWithInvalidIndex(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "YYY") + v.Set("prevIndex", "bad_index") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 203, "") + assert.Equal(t, body["message"], "The given index in POST form is not a number", "") + assert.Equal(t, body["cause"], "CompareAndSwap", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is set only if the previous value matches. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX +// +func TestV2SetKeyCASOnValueSuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevValue", "XXX") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndSwap", "") + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["value"], "YYY", "") + assert.Equal(t, node["modifiedIndex"], 3, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not set if the previous value does not match. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA +// +func TestV2SetKeyCASOnValueFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevValue", "AAA") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101, "") + assert.Equal(t, body["message"], "Compare failed", "") + assert.Equal(t, body["cause"], "[AAA != XXX]", "") + assert.Equal(t, body["index"], 2, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an error is returned if a blank prevValue is set. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX -d prevValue= +// +func TestV2SetKeyCASWithMissingValueFails(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + v.Set("prevValue", "") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusBadRequest) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 201, "") + assert.Equal(t, body["message"], "PrevValue is Required in POST form", "") + assert.Equal(t, body["cause"], "CompareAndSwap", "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not set if both previous value and index do not match. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=4 +// +func TestV2SetKeyCASOnValueAndIndexFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevValue", "AAA") + v.Set("prevIndex", "4") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101, "") + assert.Equal(t, body["message"], "Compare failed", "") + assert.Equal(t, body["cause"], "[AAA != XXX] [4 != 2]", "") + assert.Equal(t, body["index"], 2, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not set if previous value match but index does not. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=XXX -d prevIndex=4 +// +func TestV2SetKeyCASOnValueMatchAndIndexFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevValue", "XXX") + v.Set("prevIndex", "4") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101, "") + assert.Equal(t, body["message"], "Compare failed", "") + assert.Equal(t, body["cause"], "[4 != 2]", "") + assert.Equal(t, body["index"], 2, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not set if previous index matches but value does not. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY -d prevValue=AAA -d prevIndex=3 +// +func TestV2SetKeyCASOnIndexMatchAndValueFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + ReadBody(resp) + v.Set("value", "YYY") + v.Set("prevValue", "AAA") + v.Set("prevIndex", "2") + resp, _ = PutForm(fullURL, v) + assert.Equal(t, resp.StatusCode, http.StatusPreconditionFailed) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101, "") + assert.Equal(t, body["message"], "Compare failed", "") + assert.Equal(t, body["cause"], "[AAA != XXX]", "") + assert.Equal(t, body["index"], 2, "") + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensure that we can set an empty value +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value= +// +func TestV2SetKeyCASWithEmptyValueSuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + v := url.Values{} + v.Set("value", "") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBody(resp) + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"","modifiedIndex":2,"createdIndex":2}}`) + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +func TestV2SetKey(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + + resp.Body.Close() + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +func TestV2SetKeyRedirect(t *testing.T) { + es, hs := buildCluster(3) + waitCluster(t, es) + u := hs[1].URL + ru := fmt.Sprintf("%s%s", hs[0].URL, "/v2/keys/foo/bar") + + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + assert.Equal(t, resp.StatusCode, http.StatusTemporaryRedirect) + location, err := resp.Location() + if err != nil { + t.Errorf("want err = %, want nil", err) + } + + if location.String() != ru { + t.Errorf("location = %v, want %v", location.String(), ru) + } + + resp.Body.Close() + for i := range es { + es[len(es)-i-1].Stop() + } + for i := range hs { + hs[len(hs)-i-1].Close() + } + afterTest(t) +} + +// Ensures that a key is deleted. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar +// +func TestV2DeleteKey(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + resp.Body.Close() + ReadBody(resp) + + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "") + resp.Body.Close() + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an empty directory is deleted when dir is set. +// +// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true +// $ curl -X DELETE localhost:4001/v2/keys/foo ->fail +// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true +// +func TestV2DeleteEmptyDirectory(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), url.Values{}) + resp.Body.Close() + + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusForbidden) + bodyJson := ReadBodyJSON(resp) + assert.Equal(t, bodyJson["errorCode"], 102, "") + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a not-empty directory is deleted when dir is set. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar?dir=true +// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true ->fail +// $ curl -X DELETE localhost:4001/v2/keys/foo?dir=true&recursive=true +// +func TestV2DeleteNonEmptyDirectory(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?dir=true"), url.Values{}) + ReadBody(resp) + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusForbidden) + bodyJson := ReadBodyJSON(resp) + assert.Equal(t, bodyJson["errorCode"], 108, "") + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true&recursive=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a directory is deleted when recursive is set. +// +// $ curl -X PUT localhost:4001/v2/keys/foo?dir=true +// $ curl -X DELETE localhost:4001/v2/keys/foo?recursive=true +// +func TestV2DeleteDirectoryRecursiveImpliesDir(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?dir=true"), url.Values{}) + ReadBody(resp) + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?recursive=true"), url.Values{}) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBody(resp) + assert.Nil(t, err, "") + assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo","dir":true,"modifiedIndex":3,"createdIndex":2},"prevNode":{"key":"/foo","dir":true,"modifiedIndex":2,"createdIndex":2}}`, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is deleted if the previous index matches +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=3 +// +func TestV2DeleteKeyCADOnIndexSuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + ReadBody(resp) + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?prevIndex=2"), url.Values{}) + assert.Nil(t, err, "") + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndDelete", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo", "") + assert.Equal(t, node["modifiedIndex"], 3, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not deleted if the previous index does not match +// +// $ curl -X PUT localhost:4001/v2/keys/foo -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo?prevIndex=100 +// +func TestV2DeleteKeyCADOnIndexFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, err := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo"), v) + ReadBody(resp) + resp, err = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo?prevIndex=100"), url.Values{}) + assert.Nil(t, err, "") + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101) + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an error is thrown if an invalid previous index is provided. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex=bad_index +// +func TestV2DeleteKeyCADWithInvalidIndex(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + resp, _ = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?prevIndex=bad_index"), v) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 203) + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is deleted only if the previous value matches. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=XXX +// +func TestV2DeleteKeyCADOnValueSuccess(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + resp, _ = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?prevValue=XXX"), v) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "compareAndDelete", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["modifiedIndex"], 3, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a key is not deleted if the previous value does not match. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevValue=YYY +// +func TestV2DeleteKeyCADOnValueFail(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + resp, _ = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?prevValue=YYY"), v) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 101) + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that an error is thrown if an invalid previous value is provided. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X DELETE localhost:4001/v2/keys/foo/bar?prevIndex= +// +func TestV2DeleteKeyCADWithInvalidValue(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + resp, _ = DeleteForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?prevValue="), v) + body := ReadBodyJSON(resp) + assert.Equal(t, body["errorCode"], 201) + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures a unique value is added to the key's children. +// +// $ curl -X POST localhost:4001/v2/keys/foo/bar +// $ curl -X POST localhost:4001/v2/keys/foo/bar +// $ curl -X POST localhost:4001/v2/keys/foo/baz +// +func TestV2CreateUnique(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + // POST should add index to list. + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := PostForm(fullURL, nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "create", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/bar/2", "") + assert.Nil(t, node["dir"], "") + assert.Equal(t, node["modifiedIndex"], 2, "") + + // Second POST should add next index to list. + resp, _ = PostForm(fullURL, nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body = ReadBodyJSON(resp) + + node = body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/bar/3", "") + + // POST to a different key should add index to that list. + resp, _ = PostForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/baz"), nil) + assert.Equal(t, resp.StatusCode, http.StatusCreated) + body = ReadBodyJSON(resp) + + node = body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/baz/4", "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// +// $ curl localhost:4001/v2/keys/foo/bar -> fail +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl localhost:4001/v2/keys/foo/bar +// +func TestV2GetKey(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := Get(fullURL) + resp.Body.Close() + + resp, _ = PutForm(fullURL, v) + resp.Body.Close() + + resp, _ = Get(fullURL) + assert.Equal(t, resp.Header.Get("Content-Type"), "application/json") + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBodyJSON(resp) + resp.Body.Close() + assert.Equal(t, body["action"], "get", "") + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/bar", "") + assert.Equal(t, node["value"], "XXX", "") + assert.Equal(t, node["modifiedIndex"], 2, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a directory of values can be recursively retrieved for a given key. +// +// $ curl -X PUT localhost:4001/v2/keys/foo/x -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/y/z -d value=YYY +// $ curl localhost:4001/v2/keys/foo -d recursive=true +// +func TestV2GetKeyRecursively(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + v.Set("ttl", "10") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/x"), v) + ReadBody(resp) + + v.Set("value", "YYY") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/y/z"), v) + ReadBody(resp) + + resp, _ = Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo?recursive=true")) + assert.Equal(t, resp.StatusCode, http.StatusOK) + body := ReadBodyJSON(resp) + assert.Equal(t, body["action"], "get", "") + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo", "") + assert.Equal(t, node["dir"], true, "") + assert.Equal(t, node["modifiedIndex"], 2, "") + assert.Equal(t, len(node["nodes"].([]interface{})), 2, "") + + node0 := node["nodes"].([]interface{})[0].(map[string]interface{}) + assert.Equal(t, node0["key"], "/foo/x", "") + assert.Equal(t, node0["value"], "XXX", "") + assert.Equal(t, node0["ttl"], 10, "") + + node1 := node["nodes"].([]interface{})[1].(map[string]interface{}) + assert.Equal(t, node1["key"], "/foo/y", "") + assert.Equal(t, node1["dir"], true, "") + + node2 := node1["nodes"].([]interface{})[0].(map[string]interface{}) + assert.Equal(t, node2["key"], "/foo/y/z", "") + assert.Equal(t, node2["value"], "YYY", "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a watcher can wait for a value to be set and return it to the client. +// +// $ curl localhost:4001/v2/keys/foo/bar?wait=true +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// +func TestV2WatchKey(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + // There exists a little gap between etcd ready to serve and + // it actually serves the first request, which means the response + // delay could be a little bigger. + // This test is time sensitive, so it does one request to ensure + // that the server is working. + resp, _ := Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar")) + resp.Body.Close() + + var watchResp *http.Response + c := make(chan bool) + go func() { + watchResp, _ = Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true")) + c <- true + }() + + // Make sure response didn't fire early. + time.Sleep(1 * time.Millisecond) + + // Set a value. + v := url.Values{} + v.Set("value", "XXX") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + + // A response should follow from the GET above. + time.Sleep(1 * time.Millisecond) + + select { + case <-c: + default: + t.Fatal("cannot get watch result") + } + + body := ReadBodyJSON(watchResp) + watchResp.Body.Close() + assert.NotNil(t, body, "") + assert.Equal(t, body["action"], "set", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/bar", "") + assert.Equal(t, node["value"], "XXX", "") + assert.Equal(t, node["modifiedIndex"], 2, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a watcher can wait for a value to be set after a given index. +// +// $ curl localhost:4001/v2/keys/foo/bar?wait=true&waitIndex=3 +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=YYY +// +func TestV2WatchKeyWithIndex(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + var body map[string]interface{} + c := make(chan bool) + go func() { + resp, _ := Get(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar?wait=true&waitIndex=3")) + body = ReadBodyJSON(resp) + c <- true + }() + + // Make sure response didn't fire early. + time.Sleep(1 * time.Millisecond) + assert.Nil(t, body, "") + + // Set a value (before given index). + v := url.Values{} + v.Set("value", "XXX") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + + // Make sure response didn't fire early. + time.Sleep(1 * time.Millisecond) + assert.Nil(t, body, "") + + // Set a value (before given index). + v.Set("value", "YYY") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar"), v) + ReadBody(resp) + + // A response should follow from the GET above. + time.Sleep(1 * time.Millisecond) + + select { + case <-c: + default: + t.Fatal("cannot get watch result") + } + + assert.NotNil(t, body, "") + assert.Equal(t, body["action"], "set", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/foo/bar", "") + assert.Equal(t, node["value"], "YYY", "") + assert.Equal(t, node["modifiedIndex"], 3, "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that a watcher can wait for a value to be set after a given index. +// +// $ curl localhost:4001/v2/keys/keyindir/bar?wait=true +// $ curl -X PUT localhost:4001/v2/keys/keyindir -d dir=true -d ttl=1 +// $ curl -X PUT localhost:4001/v2/keys/keyindir/bar -d value=YYY +// +func TestV2WatchKeyInDir(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + var body map[string]interface{} + c := make(chan bool) + + // Set a value (before given index). + v := url.Values{} + v.Set("dir", "true") + v.Set("ttl", "1") + resp, _ := PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/keyindir"), v) + ReadBody(resp) + + // Set a value (before given index). + v = url.Values{} + v.Set("value", "XXX") + resp, _ = PutForm(fmt.Sprintf("%s%s", u, "/v2/keys/keyindir/bar"), v) + ReadBody(resp) + + go func() { + resp, _ := Get(fmt.Sprintf("%s%s", u, "/v2/keys/keyindir/bar?wait=true")) + body = ReadBodyJSON(resp) + c <- true + }() + + // wait for expiration, we do have a up to 500 millisecond delay + time.Sleep(time.Second + time.Millisecond*500) + + select { + case <-c: + default: + t.Fatal("cannot get watch result") + } + + assert.NotNil(t, body, "") + assert.Equal(t, body["action"], "expire", "") + + node := body["node"].(map[string]interface{}) + assert.Equal(t, node["key"], "/keyindir", "") + + es[0].Stop() + hs[0].Close() + afterTest(t) +} + +// Ensures that HEAD could work. +// +// $ curl -I localhost:4001/v2/keys/foo/bar -> fail +// $ curl -X PUT localhost:4001/v2/keys/foo/bar -d value=XXX +// $ curl -I localhost:4001/v2/keys/foo/bar +// +func TestV2HeadKey(t *testing.T) { + es, hs := buildCluster(1) + u := hs[0].URL + + v := url.Values{} + v.Set("value", "XXX") + fullURL := fmt.Sprintf("%s%s", u, "/v2/keys/foo/bar") + resp, _ := Head(fullURL) + assert.Equal(t, resp.StatusCode, http.StatusNotFound) + assert.Equal(t, resp.ContentLength, -1) + + resp, _ = PutForm(fullURL, v) + ReadBody(resp) + + resp, _ = Head(fullURL) + assert.Equal(t, resp.StatusCode, http.StatusOK) + assert.Equal(t, resp.ContentLength, -1) + + es[0].Stop() + hs[0].Close() + afterTest(t) +} diff --git a/etcd/v2_raft.go b/etcd/v2_raft.go new file mode 100644 index 000000000..bf1a9ae05 --- /dev/null +++ b/etcd/v2_raft.go @@ -0,0 +1,46 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/coreos/etcd/raft" +) + +type v2Proposal struct { + data []byte + ret chan interface{} +} + +type wait struct { + index int + term int +} + +type v2Raft struct { + *raft.Node + result map[wait]chan interface{} + term int +} + +func (r *v2Raft) Propose(p v2Proposal) error { + if !r.Node.IsLeader() { + return fmt.Errorf("not leader") + } + r.Node.Propose(p.data) + r.result[wait{r.Index(), r.Term()}] = p.ret + return nil +} + +func (r *v2Raft) Sync() { + if !r.Node.IsLeader() { + return + } + sync := &cmd{Type: "sync", Time: time.Now()} + data, err := json.Marshal(sync) + if err != nil { + panic(err) + } + r.Node.Propose(data) +} diff --git a/etcd/v2_store.go b/etcd/v2_store.go new file mode 100644 index 000000000..31abc9280 --- /dev/null +++ b/etcd/v2_store.go @@ -0,0 +1,78 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/coreos/etcd/store" +) + +type cmd struct { + Type string + Key string + Value string + PrevValue string + PrevIndex uint64 + Dir bool + Recursive bool + Unique bool + Time time.Time +} + +func (s *Server) Set(key string, dir bool, value string, expireTime time.Time) (*store.Event, error) { + set := &cmd{Type: "set", Key: key, Dir: dir, Value: value, Time: expireTime} + return s.do(set) +} + +func (s *Server) Create(key string, dir bool, value string, expireTime time.Time, unique bool) (*store.Event, error) { + create := &cmd{Type: "create", Key: key, Dir: dir, Value: value, Time: expireTime, Unique: unique} + return s.do(create) +} + +func (s *Server) Update(key string, value string, expireTime time.Time) (*store.Event, error) { + update := &cmd{Type: "update", Key: key, Value: value, Time: expireTime} + return s.do(update) +} + +func (s *Server) CAS(key, value, prevValue string, prevIndex uint64, expireTime time.Time) (*store.Event, error) { + cas := &cmd{Type: "cas", Key: key, Value: value, PrevValue: prevValue, PrevIndex: prevIndex, Time: expireTime} + return s.do(cas) +} + +func (s *Server) Delete(key string, dir, recursive bool) (*store.Event, error) { + d := &cmd{Type: "delete", Key: key, Dir: dir, Recursive: recursive} + return s.do(d) +} + +func (s *Server) CAD(key string, prevValue string, prevIndex uint64) (*store.Event, error) { + cad := &cmd{Type: "cad", Key: key, PrevValue: prevValue, PrevIndex: prevIndex} + return s.do(cad) +} + +func (s *Server) do(c *cmd) (*store.Event, error) { + data, err := json.Marshal(c) + if err != nil { + panic(err) + } + + p := v2Proposal{ + data: data, + ret: make(chan interface{}, 1), + } + + select { + case s.proposal <- p: + default: + return nil, fmt.Errorf("unable to send out the proposal") + } + + switch t := (<-p.ret).(type) { + case *store.Event: + return t, nil + case error: + return nil, t + default: + panic("server.do: unexpected return type") + } +} diff --git a/etcd/v2_util.go b/etcd/v2_util.go new file mode 100644 index 000000000..bf2a12fd0 --- /dev/null +++ b/etcd/v2_util.go @@ -0,0 +1,78 @@ +package etcd + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strings" +) + +// Creates a new HTTP client with KeepAlive disabled. +func NewHTTPClient() *http.Client { + return &http.Client{Transport: &http.Transport{DisableKeepAlives: true}} +} + +// Reads the body from the response and closes it. +func ReadBody(resp *http.Response) []byte { + if resp == nil { + return []byte{} + } + body, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + return body +} + +// Reads the body from the response and parses it as JSON. +func ReadBodyJSON(resp *http.Response) map[string]interface{} { + m := make(map[string]interface{}) + b := ReadBody(resp) + if err := json.Unmarshal(b, &m); err != nil { + panic(fmt.Sprintf("HTTP body JSON parse error: %v: %s", err, string(b))) + } + return m +} + +func Head(url string) (*http.Response, error) { + return send("HEAD", url, "application/json", nil) +} + +func Get(url string) (*http.Response, error) { + return send("GET", url, "application/json", nil) +} + +func Post(url string, bodyType string, body io.Reader) (*http.Response, error) { + return send("POST", url, bodyType, body) +} + +func PostForm(url string, data url.Values) (*http.Response, error) { + return Post(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} + +func Put(url string, bodyType string, body io.Reader) (*http.Response, error) { + return send("PUT", url, bodyType, body) +} + +func PutForm(url string, data url.Values) (*http.Response, error) { + return Put(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} + +func Delete(url string, bodyType string, body io.Reader) (*http.Response, error) { + return send("DELETE", url, bodyType, body) +} + +func DeleteForm(url string, data url.Values) (*http.Response, error) { + return Delete(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode())) +} + +func send(method string, url string, bodyType string, body io.Reader) (*http.Response, error) { + c := NewHTTPClient() + req, err := http.NewRequest(method, url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", bodyType) + return c.Do(req) +} diff --git a/etcd/z_last_test.go b/etcd/z_last_test.go new file mode 100644 index 000000000..b3addcda8 --- /dev/null +++ b/etcd/z_last_test.go @@ -0,0 +1,94 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package etcd + +import ( + "net/http" + "runtime" + "sort" + "strings" + "testing" + "time" +) + +func interestingGoroutines() (gs []string) { + buf := make([]byte, 2<<20) + buf = buf[:runtime.Stack(buf, true)] + for _, g := range strings.Split(string(buf), "\n\n") { + sl := strings.SplitN(g, "\n", 2) + if len(sl) != 2 { + continue + } + stack := strings.TrimSpace(sl[1]) + if stack == "" || + strings.Contains(stack, "created by testing.RunTests") || + strings.Contains(stack, "testing.Main(") || + strings.Contains(stack, "runtime.goexit") || + strings.Contains(stack, "created by runtime.gc") || + strings.Contains(stack, "runtime.MHeap_Scavenger") { + continue + } + gs = append(gs, stack) + } + sort.Strings(gs) + return +} + +// Verify the other tests didn't leave any goroutines running. +// This is in a file named z_last_test.go so it sorts at the end. +func TestGoroutinesRunning(t *testing.T) { + if testing.Short() { + t.Skip("not counting goroutines for leakage in -short mode") + } + gs := interestingGoroutines() + + n := 0 + stackCount := make(map[string]int) + for _, g := range gs { + stackCount[g]++ + n++ + } + + t.Logf("num goroutines = %d", n) + if n > 0 { + t.Error("Too many goroutines.") + for stack, count := range stackCount { + t.Logf("%d instances of:\n%s", count, stack) + } + } +} + +func afterTest(t *testing.T) { + http.DefaultTransport.(*http.Transport).CloseIdleConnections() + if testing.Short() { + return + } + var bad string + badSubstring := map[string]string{ + ").readLoop(": "a Transport", + ").writeLoop(": "a Transport", + "created by net/http/httptest.(*Server).Start": "an httptest.Server", + "timeoutHandler": "a TimeoutHandler", + "net.(*netFD).connect(": "a timing out dial", + ").noteClientGone(": "a closenotifier sender", + } + var stacks string + for i := 0; i < 4; i++ { + bad = "" + stacks = strings.Join(interestingGoroutines(), "\n\n") + for substr, what := range badSubstring { + if strings.Contains(stacks, substr) { + bad = what + } + } + if bad == "" { + return + } + // Bad stuff found, but goroutines might just still be + // shutting down, so give it some time. + time.Sleep(50 * time.Millisecond) + } + t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks) +} diff --git a/main.go b/main.go index e2f370d1f..5bc00b7b3 100644 --- a/main.go +++ b/main.go @@ -1,44 +1,58 @@ -/* -Copyright 2013 CoreOS Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package main import ( - "fmt" - "os" + "flag" + "log" + "net/http" + "net/url" + "strings" - "github.com/coreos/etcd/config" "github.com/coreos/etcd/etcd" - "github.com/coreos/etcd/server" +) + +var ( + laddr = flag.String("l", ":8000", "The port to listen on") + paddr = flag.String("p", "127.0.0.1:8000", "The public address to be adversited") + cluster = flag.String("c", "", "The cluster to join") ) func main() { - var config = config.New() - if err := config.Load(os.Args[1:]); err != nil { - fmt.Println(server.Usage() + "\n") - fmt.Println(err.Error() + "\n") - os.Exit(1) - } else if config.ShowVersion { - fmt.Println("etcd version", server.ReleaseVersion) - os.Exit(0) - } else if config.ShowHelp { - fmt.Println(server.Usage() + "\n") - os.Exit(0) + flag.Parse() + + p, err := sanitizeURL(*paddr) + if err != nil { + log.Fatal(err) } - var etcd = etcd.New(config) - etcd.Run() + var e *etcd.Server + + if len(*cluster) == 0 { + e = etcd.New(1, p, nil) + go e.Bootstrap() + } else { + addrs := strings.Split(*cluster, ",") + cStr := addrs[0] + c, err := sanitizeURL(cStr) + if err != nil { + log.Fatal(err) + } + e = etcd.New(len(addrs), p, []string{c}) + go e.Join() + } + + if err := http.ListenAndServe(*laddr, e); err != nil { + log.Fatal("system", err) + } +} + +func sanitizeURL(ustr string) (string, error) { + u, err := url.Parse(ustr) + if err != nil { + return "", err + } + + if u.Scheme == "" { + u.Scheme = "http" + } + return u.String(), nil } diff --git a/raft/node.go b/raft/node.go index f063d78a1..6fb06f846 100644 --- a/raft/node.go +++ b/raft/node.go @@ -41,8 +41,18 @@ func New(id int64, heartbeat, election tick) *Node { func (n *Node) Id() int64 { return n.sm.id } +func (n *Node) Index() int { return n.sm.log.lastIndex() } + +func (n *Node) Term() int { return n.sm.term } + +func (n *Node) Applied() int { return n.sm.log.applied } + func (n *Node) HasLeader() bool { return n.sm.lead != none } +func (n *Node) IsLeader() bool { return n.sm.lead == n.Id() } + +func (n *Node) Leader() int { return n.sm.lead } + // Propose asynchronously proposes data be applied to the underlying state machine. func (n *Node) Propose(data []byte) { n.propose(Normal, data) }