diff --git a/contrib/README.md b/contrib/README.md index bd9e661cc..bdef8e4f1 100644 --- a/contrib/README.md +++ b/contrib/README.md @@ -3,3 +3,4 @@ Scripts and files which may be useful but aren't part of the core etcd project. - [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions +- [raftexample](raftexample) - an example distributed key-value store using raft diff --git a/contrib/raftexample/Procfile b/contrib/raftexample/Procfile new file mode 100644 index 000000000..6b2f7ccf0 --- /dev/null +++ b/contrib/raftexample/Procfile @@ -0,0 +1,4 @@ +# Use goreman to run `go get github.com/mattn/goreman` +raftexample1: ./raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380 +raftexample2: ./raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380 +raftexample3: ./raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380 diff --git a/contrib/raftexample/README.md b/contrib/raftexample/README.md new file mode 100644 index 000000000..d0ec1593d --- /dev/null +++ b/contrib/raftexample/README.md @@ -0,0 +1,90 @@ +# raftexample + +raftexample is an example usage of etcd's [raft library](../../raft). It provides a simple REST API for a key-value store cluster backed by the [Raft][raft] consensus algorithm. + +[raft]: http://raftconsensus.github.io/ + +## Getting Started + +### Running single node raftexample + +First start a single-member cluster of raftexample: + +```sh +raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380 +``` + +Each raftexample process maintains a single raft instance and a key-value server. +The process's list of comma separated peers (--cluster), its raft ID index into the peer list (--id), and http key-value server port (--port) are passed through the command line. + +Next, store a value ("hello") to a key ("my-key"): + +``` +curl -L http://127.0.0.1:12380/my-key -XPUT -d hello +``` + +Finally, retrieve the stored key: + +``` +curl -L http://127.0.0.1:12380/my-key +``` + +### Running a local cluster + +First install [goreman](https://github.com/mattn/goreman), which manages Procfile-based applications. + +The [Procfile script](./Procfile) will set up a local example cluster. You can start it with: + +```sh +goreman start +``` + +This will bring up three raftexample instances. + +You can write a key-value pair to any member of the cluster and likewise retrieve it from any member. + +### Fault Tolerance + +To test cluster recovery, first start a cluster and write a value "foo": +```sh +goreman start +curl -L http://127.0.0.1:12380/my-key -XPUT -d foo +``` + +Next, remove a node and replace the value with "bar" to check cluster availability: + +```sh +goreman run stop raftexample2 +curl -L http://127.0.0.1:12380/my-key -XPUT -d bar +curl -L http://127.0.0.1:32380/my-key +``` + +Finally, bring the node back up and verify it recovers with the updated value "bar": +```sh +goreman run start raftexample2 +curl -L http://127.0.0.1:22380/my-key +``` + +## Design + +The raftexample consists of three components: a raft-backed key-value store, a REST API server, and a raft consensus server based on etcd's raft implementation. + +The raft-backed key-value store is a key-value map that holds all committed key-values. +The store bridges communication between the raft server and the REST server. +Key-value updates are issued through the store to the raft server. +The store updates its map once raft reports the updates are committed. + +The REST server exposes the current raft consensus by accessing the raft-backed key-value store. +A GET command looks up a key in the store and returns the value, if any. +A key-value PUT command issues an update proposal to the store. + +The raft server participates in consensus with its cluster peers. +When the REST server submits a proposal, the raft server transmits the proposal to its peers. +When raft reaches a consensus, the server publishes all committed updates over a commit channel. +For raftexample, this commit channel is consumed by the key-value store. + +## Project Details + +### TODO +- Snapshot support +- Dynamic reconfiguration diff --git a/contrib/raftexample/httpapi.go b/contrib/raftexample/httpapi.go new file mode 100644 index 000000000..ff6fd1086 --- /dev/null +++ b/contrib/raftexample/httpapi.go @@ -0,0 +1,67 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "io/ioutil" + "log" + "net/http" + "strconv" +) + +// Handler for a http based key-value store backed by raft +type httpKVAPI struct { + store *kvstore +} + +func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { + key := r.RequestURI + switch { + case r.Method == "PUT": + v, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("Failed to read on PUT (%v)\n", err) + http.Error(w, "Failed on PUT", http.StatusBadRequest) + return + } + + h.store.Propose(key, string(v)) + + // Optimistic-- no waiting for ack from raft. Value is not yet + // committed so a subsequent GET on the key may return old value + w.WriteHeader(http.StatusNoContent) + case r.Method == "GET": + if v, ok := h.store.Lookup(key); ok { + w.Write([]byte(v)) + } else { + http.Error(w, "Failed to GET", http.StatusNotFound) + } + default: + w.Header().Set("Allow", "PUT") + w.Header().Add("Allow", "GET") + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + } +} + +// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens. +func serveHttpKVAPI(port int, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) { + srv := http.Server{ + Addr: ":" + strconv.Itoa(port), + Handler: &httpKVAPI{newKVStore(proposeC, commitC, errorC)}, + } + if err := srv.ListenAndServe(); err != nil { + log.Fatal(err) + } +} diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go new file mode 100644 index 000000000..1f1760dbd --- /dev/null +++ b/contrib/raftexample/kvstore.go @@ -0,0 +1,82 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "encoding/gob" + "log" + "sync" +) + +// a key-value store backed by raft +type kvstore struct { + proposeC chan<- string // channel for proposing updates + mu sync.RWMutex + kvStore map[string]string // current committed key-value pairs +} + +type kv struct { + Key string + Val string +} + +func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { + s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)} + // replay log into key-value map + s.readCommits(commitC, errorC) + // read commits from raft into kvStore map until error + go s.readCommits(commitC, errorC) + return s +} + +func (s *kvstore) Lookup(key string) (string, bool) { + s.mu.RLock() + v, ok := s.kvStore[key] + s.mu.RUnlock() + return v, ok +} + +func (s *kvstore) Propose(k string, v string) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil { + log.Fatal(err) + } + s.proposeC <- string(buf.Bytes()) +} + +func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { + for { + select { + case data := <-commitC: + if data == nil { + // done replaying log; new data incoming + return + } + + var data_kv kv + dec := gob.NewDecoder(bytes.NewBufferString(*data)) + if err := dec.Decode(&data_kv); err != nil { + log.Fatalf("raftexample: could not decode message (%v)", err) + } + s.mu.Lock() + s.kvStore[data_kv.Key] = data_kv.Val + s.mu.Unlock() + case err := <-errorC: + log.Println(err) + return + } + } +} diff --git a/contrib/raftexample/main.go b/contrib/raftexample/main.go new file mode 100644 index 000000000..c3ed76af7 --- /dev/null +++ b/contrib/raftexample/main.go @@ -0,0 +1,36 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "strings" +) + +func main() { + cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") + id := flag.Int("id", 1, "node ID") + kvport := flag.Int("port", 9121, "key-value server port") + flag.Parse() + + proposeC := make(chan string) + defer close(proposeC) + + // raft provides a commit stream for the proposals from the http api + commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC) + + // the key-value http handler will propose updates to raft + serveHttpKVAPI(*kvport, proposeC, commitC, errorC) +} diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go new file mode 100644 index 000000000..555f2f558 --- /dev/null +++ b/contrib/raftexample/raft.go @@ -0,0 +1,234 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "os" + "strconv" + "time" + + "net/http" + "net/url" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/rafthttp" + "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" +) + +// A key-value stream backed by raft +type raftNode struct { + proposeC <-chan string // proposed messages (k,v) + commitC chan *string // entries committed to log (k,v) + errorC chan error // errors from raft session + + id int // client ID for raft session + peers []string // raft peer URLs + waldir string // path to WAL directory + + // raft backing for the commit/error channel + node raft.Node + raftStorage *raft.MemoryStorage + wal *wal.WAL + transport *rafthttp.Transport +} + +// newRaftNode initiates a raft instance and returns a committed log entry +// channel and error channel. Proposals for log updates are sent over the +// provided the proposal channel. All log entries are replayed over the +// commit channel, followed by a nil message (to indicate the channel is +// current), then new log entries. +func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) { + rc := &raftNode{ + proposeC: proposeC, + commitC: make(chan *string), + errorC: make(chan error), + id: id, + peers: peers, + waldir: fmt.Sprintf("raftexample-%d", id), + raftStorage: raft.NewMemoryStorage(), + // rest of structure populated after WAL replay + } + go rc.startRaft() + return rc.commitC, rc.errorC +} + +// publishEntries writes committed log entries to commit channel. +func (rc *raftNode) publishEntries(ents []raftpb.Entry) { + for i := range ents { + if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 { + // ignore conf changes and empty messages + continue + } + s := string(ents[i].Data) + rc.commitC <- &s + } +} + +// openWAL returns a WAL ready for reading. +func (rc *raftNode) openWAL() *wal.WAL { + if wal.Exist(rc.waldir) == false { + if err := os.Mkdir(rc.waldir, 0750); err != nil { + log.Fatalf("raftexample: cannot create dir for wal (%v)", err) + } + + w, err := wal.Create(rc.waldir, nil) + if err != nil { + log.Fatalf("raftexample: create wal error (%v)", err) + } + w.Close() + } + + w, err := wal.Open(rc.waldir, walpb.Snapshot{}) + if err != nil { + log.Fatalf("raftexample: error loading wal (%v)", err) + } + + return w +} + +// replayWAL replays WAL entries into the raft instance and the commit +// channel and returns an appendable WAL. +func (rc *raftNode) replayWAL() *wal.WAL { + w := rc.openWAL() + _, _, ents, err := w.ReadAll() + if err != nil { + log.Fatalf("raftexample: failed to read WAL (%v)", err) + } + // append to storage so raft starts at the right place in log + rc.raftStorage.Append(ents) + rc.publishEntries(ents) + // send nil value so client knows commit channel is current + rc.commitC <- nil + return w +} + +func (rc *raftNode) writeError(err error) { + rc.errorC <- err + rc.stop() +} + +func (rc *raftNode) stop() { + close(rc.commitC) + close(rc.errorC) + rc.node.Stop() +} + +func (rc *raftNode) startRaft() { + oldwal := wal.Exist(rc.waldir) + rc.wal = rc.replayWAL() + + rpeers := make([]raft.Peer, len(rc.peers)) + for i := range rpeers { + rpeers[i] = raft.Peer{ID: uint64(i + 1)} + } + c := &raft.Config{ + ID: uint64(rc.id), + ElectionTick: 10, + HeartbeatTick: 1, + Storage: rc.raftStorage, + MaxSizePerMsg: 1024 * 1024, + MaxInflightMsgs: 256, + } + + if oldwal { + rc.node = raft.RestartNode(c) + } else { + rc.node = raft.StartNode(c, rpeers) + } + + ss := &stats.ServerStats{} + ss.Initialize() + + rc.transport = &rafthttp.Transport{ + ID: types.ID(rc.id), + ClusterID: 0x1000, + Raft: rc, + ServerStats: ss, + LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)), + ErrorC: make(chan error), + } + + rc.transport.Start() + for i := range rc.peers { + if i+1 != rc.id { + rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) + } + } + + go rc.serveRaft() + go rc.serveChannels() +} + +func (rc *raftNode) serveChannels() { + defer rc.wal.Close() + + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // event loop on client proposals and raft updates + for { + select { + case <-ticker.C: + rc.node.Tick() + + // send proposals over raft + case prop, ok := <-rc.proposeC: + if !ok { + // client closed channel; shut down + rc.stop() + return + } + rc.node.Propose(context.TODO(), []byte(prop)) + + // store raft entries to wal, then publish over commit channel + case rd := <-rc.node.Ready(): + rc.wal.Save(rd.HardState, rd.Entries) + rc.raftStorage.Append(rd.Entries) + rc.transport.Send(rd.Messages) + rc.publishEntries(rd.Entries) + rc.node.Advance() + + case err := <-rc.transport.ErrorC: + rc.writeError(err) + return + } + } +} + +func (rc *raftNode) serveRaft() { + url, err := url.Parse(rc.peers[rc.id-1]) + if err != nil { + log.Fatalf("raftexample: Failed parsing URL (%v)", err) + } + + srv := http.Server{Addr: url.Host, Handler: rc.transport.Handler()} + if err := srv.ListenAndServe(); err != nil { + log.Fatalf("raftexample: Failed serving rafthttp (%v)", err) + } +} + +func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error { + return rc.node.Step(ctx, m) +} +func (rc *raftNode) IsIDRemoved(id uint64) bool { return false } +func (rc *raftNode) ReportUnreachable(id uint64) {} +func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}