From 1a6e90897111a15b6dae85e9388927a8c79b8047 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Sep 2014 21:15:39 -0700 Subject: [PATCH 01/14] *: add wal --- main.go | 40 +++++++++++++++++++++++++++++++++++++--- raft/node.go | 24 +++++++++++++++++++----- raft/node_test.go | 25 +++++++++++++++++++++++++ raft/raft.go | 2 ++ wal/wal.go | 14 ++++++++++++++ 5 files changed, 97 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index bc0854509..341c0d0da 100644 --- a/main.go +++ b/main.go @@ -2,22 +2,26 @@ package main import ( "flag" + "fmt" "log" "net/http" + "os" + "path" "strconv" "time" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" + "github.com/coreos/etcd/wal" ) var ( fid = flag.String("id", "0xBEEF", "Id of this server") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')") + dir = flag.String("d", "", "Directory to store wal files and snapshot files") peers = etcdhttp.Peers{} ) @@ -38,13 +42,43 @@ func main() { log.Fatalf("%#x= must be specified in peers", id) } - n := raft.Start(id, peers.Ids(), 10, 1) + if *dir == "" { + *dir = fmt.Sprintf("%v", *fid) + log.Printf("main: no data dir is given, use default data dir ./%s", *dir) + } + if err := os.MkdirAll(*dir, 0700); err != nil { + log.Fatal(err) + } + + waldir := path.Join(*dir, "wal") + + var w *wal.WAL + var n raft.Node + if wal.Exist(waldir) { + // TODO(xiangli): check snapshot; not open from zero + w, err = wal.OpenFromIndex(waldir, 0) + if err != nil { + log.Fatal(err) + } + // TODO(xiangli): save/recovery nodeID? + _, st, ents, err := w.ReadAll() + if err != nil { + log.Fatal(err) + } + n = raft.Restart(id, peers.Ids(), 10, 1, st, ents) + } else { + w, err = wal.Create(waldir) + if err != nil { + log.Fatal(err) + } + n = raft.Start(id, peers.Ids(), 10, 1) + } tk := time.NewTicker(100 * time.Millisecond) s := &etcdserver.Server{ Store: store.New(), Node: n, - Save: func(st raftpb.State, ents []raftpb.Entry) {}, // TODO: use wal + Save: w.Save, Send: etcdhttp.Sender(peers), Ticker: tk.C, } diff --git a/raft/node.go b/raft/node.go index 870e8746f..e89276caf 100644 --- a/raft/node.go +++ b/raft/node.go @@ -48,7 +48,23 @@ type Node struct { } func Start(id int64, peers []int64, election, heartbeat int) Node { - n := Node{ + n := newNode() + r := newRaft(id, peers, election, heartbeat) + go n.run(r) + return n +} + +func Restart(id int64, peers []int64, election, heartbeat int, st pb.State, ents []pb.Entry) Node { + n := newNode() + r := newRaft(id, peers, election, heartbeat) + r.loadState(st) + r.loadEnts(ents) + go n.run(r) + return n +} + +func newNode() Node { + return Node{ propc: make(chan pb.Message), recvc: make(chan pb.Message), readyc: make(chan Ready), @@ -56,9 +72,6 @@ func Start(id int64, peers []int64, election, heartbeat int) Node { alwaysreadyc: make(chan Ready), done: make(chan struct{}), } - r := newRaft(id, peers, election, heartbeat) - go n.run(r) - return n } func (n *Node) Stop() { @@ -71,7 +84,8 @@ func (n *Node) run(r *raft) { var lead int64 var prev Ready - prev.Vote = none + prev.State = r.State + for { if lead != r.lead { log.Printf("raft: leader changed from %#x to %#x", lead, r.lead) diff --git a/raft/node_test.go b/raft/node_test.go index 946efbb01..675015837 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -42,3 +42,28 @@ func TestNode(t *testing.T) { default: } } + +func TestNodeRestart(t *testing.T) { + entries := []raftpb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2, Data: []byte("foo")}, + } + st := raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2} + + want := Ready{ + State: raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2}, + // commit upto index 1 + CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, + } + + n := Restart(1, []int64{1}, 0, 0, st, entries) + if g := <-n.Ready(); !reflect.DeepEqual(g, want) { + t.Errorf("g = %+v,\n w %+v", g, want) + } + + select { + case rd := <-n.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + default: + } +} diff --git a/raft/raft.go b/raft/raft.go index 4058458ea..2245749f0 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -512,4 +512,6 @@ func (r *raft) loadState(state pb.State) { r.raftLog.committed = state.Commit r.Term = state.Term r.Vote = state.Vote + r.Commit = state.Commit + r.LastIndex = state.LastIndex } diff --git a/wal/wal.go b/wal/wal.go index acce3dc03..b3628f1d4 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -66,6 +66,11 @@ func Create(dirpath string) (*WAL, error) { if Exist(dirpath) { return nil, os.ErrExist } + + if err := os.MkdirAll(dirpath, 0700); err != nil { + return nil, err + } + p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0)) f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { @@ -258,6 +263,15 @@ func (w *WAL) SaveState(s *raftpb.State) error { return w.encoder.encode(rec) } +func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) { + // TODO(xiangli): no addresses fly around + w.SaveState(&st) + for i := range ents { + w.SaveEntry(&ents[i]) + } + w.Sync() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } From ee78890f227760c68098325ae68eee01ec6eff49 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 5 Sep 2014 09:56:25 -0700 Subject: [PATCH 02/14] main: use data-dir as the path to data directory flag --- main.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 341c0d0da..bb9a2043c 100644 --- a/main.go +++ b/main.go @@ -21,9 +21,8 @@ var ( fid = flag.String("id", "0xBEEF", "Id of this server") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')") - dir = flag.String("d", "", "Directory to store wal files and snapshot files") - - peers = etcdhttp.Peers{} + dir = flag.String("data-dir", "", "Path to the data directory") + peers = etcdhttp.Peers{} ) func init() { @@ -44,7 +43,7 @@ func main() { if *dir == "" { *dir = fmt.Sprintf("%v", *fid) - log.Printf("main: no data dir is given, use default data dir ./%s", *dir) + log.Printf("main: no data-dir is given, use default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, 0700); err != nil { log.Fatal(err) From 0851a1fe7f6ca98be19bc06fcb5ccbbf8978b311 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 5 Sep 2014 10:02:46 -0700 Subject: [PATCH 03/14] main: better error msg --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index bb9a2043c..f0a1fe6a8 100644 --- a/main.go +++ b/main.go @@ -46,7 +46,7 @@ func main() { log.Printf("main: no data-dir is given, use default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, 0700); err != nil { - log.Fatal(err) + log.Fatalf("main: cannot create data directory: %v", err) } waldir := path.Join(*dir, "wal") From 8c9d7e3e9374e9d117d6f03b80c46f6fede57190 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 5 Sep 2014 10:16:55 -0700 Subject: [PATCH 04/14] main: add a helper function for starting raft --- main.go | 53 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index f0a1fe6a8..60af748ba 100644 --- a/main.go +++ b/main.go @@ -49,29 +49,7 @@ func main() { log.Fatalf("main: cannot create data directory: %v", err) } - waldir := path.Join(*dir, "wal") - - var w *wal.WAL - var n raft.Node - if wal.Exist(waldir) { - // TODO(xiangli): check snapshot; not open from zero - w, err = wal.OpenFromIndex(waldir, 0) - if err != nil { - log.Fatal(err) - } - // TODO(xiangli): save/recovery nodeID? - _, st, ents, err := w.ReadAll() - if err != nil { - log.Fatal(err) - } - n = raft.Restart(id, peers.Ids(), 10, 1, st, ents) - } else { - w, err = wal.Create(waldir) - if err != nil { - log.Fatal(err) - } - n = raft.Start(id, peers.Ids(), 10, 1) - } + n, w := startRaft(id, peers.Ids(), path.Join(*dir, "wal")) tk := time.NewTicker(100 * time.Millisecond) s := &etcdserver.Server{ @@ -89,3 +67,32 @@ func main() { http.Handle("/", h) log.Fatal(http.ListenAndServe(*laddr, nil)) } + +// startRaft starts a raft node from the given wal dir. +// If the wal dir does not exist, startRaft will start a new raft node. +// If the wal dir exists, startRaft will restart the previous raft node. +// startRaft returns the started raft node and the opened wal. +func startRaft(id int64, peerIds []int64, waldir string) (raft.Node, *wal.WAL) { + if !wal.Exist(waldir) { + w, err := wal.Create(waldir) + if err != nil { + log.Fatal(err) + } + n := raft.Start(id, peerIds, 10, 1) + return n, w + } + + // restart a node from previous wal + // TODO(xiangli): check snapshot; not open from zero + w, err := wal.OpenFromIndex(waldir, 0) + if err != nil { + log.Fatal(err) + } + // TODO(xiangli): save/recovery nodeID? + _, st, ents, err := w.ReadAll() + if err != nil { + log.Fatal(err) + } + n := raft.Restart(id, peerIds, 10, 1, st, ents) + return n, w +} From 36730ca613277225b5ef149ee69b3f14ea8d4798 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 10:32:49 -0700 Subject: [PATCH 05/14] main: use -> using --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 60af748ba..2eafc5762 100644 --- a/main.go +++ b/main.go @@ -43,7 +43,7 @@ func main() { if *dir == "" { *dir = fmt.Sprintf("%v", *fid) - log.Printf("main: no data-dir is given, use default data-dir ./%s", *dir) + log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, 0700); err != nil { log.Fatalf("main: cannot create data directory: %v", err) From 0461c517e4af059adba8dc9a73fade30559afb48 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 13:57:35 -0700 Subject: [PATCH 06/14] wal: clarify TODO --- wal/wal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wal/wal.go b/wal/wal.go index b3628f1d4..b8d5540a5 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -264,7 +264,7 @@ func (w *WAL) SaveState(s *raftpb.State) error { } func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) { - // TODO(xiangli): no addresses fly around + // TODO(xiangli): no more reference operator w.SaveState(&st) for i := range ents { w.SaveEntry(&ents[i]) From a3b6a646eb2c3cf90c46ce32257ffbb3b26cc828 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 15:31:11 -0700 Subject: [PATCH 07/14] main: check the id read by from wal. --- main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 2eafc5762..69579b324 100644 --- a/main.go +++ b/main.go @@ -88,8 +88,11 @@ func startRaft(id int64, peerIds []int64, waldir string) (raft.Node, *wal.WAL) { if err != nil { log.Fatal(err) } + wid, st, ents, err := w.ReadAll() // TODO(xiangli): save/recovery nodeID? - _, st, ents, err := w.ReadAll() + if wid != 0 { + log.Fatal("unimplemented: nodeid") + } if err != nil { log.Fatal(err) } From adff0f3813b93bf5c744997eeb7c7c3fc91e5fed Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 15:36:25 -0700 Subject: [PATCH 08/14] wal: named return values for ReadAll. --- wal/wal.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index b8d5540a5..1d90de007 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -141,14 +141,10 @@ func OpenFromIndex(dirpath string, index int64) (*WAL, error) { // ReadAll reads out all records of the current WAL. // After ReadAll, the WAL will be ready for appending new records. -func (w *WAL) ReadAll() (int64, raftpb.State, []raftpb.Entry, error) { - var id int64 - var state raftpb.State - var entries []raftpb.Entry - +func (w *WAL) ReadAll() (id int64, state raftpb.State, entries []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder - var err error + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { switch rec.Type { case entryType: From b094410066c34ec7c135a04efcaddcbf996c9351 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 15:40:12 -0700 Subject: [PATCH 09/14] wal: change entries->ents for consistency --- wal/wal.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index 1d90de007..ba5737d98 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -141,7 +141,7 @@ func OpenFromIndex(dirpath string, index int64) (*WAL, error) { // ReadAll reads out all records of the current WAL. // After ReadAll, the WAL will be ready for appending new records. -func (w *WAL) ReadAll() (id int64, state raftpb.State, entries []raftpb.Entry, err error) { +func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder @@ -150,7 +150,7 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, entries []raftpb.Entry, e case entryType: e := mustUnmarshalEntry(rec.Data) if e.Index > w.ri { - entries = append(entries[:e.Index-w.ri-1], e) + ents = append(ents[:e.Index-w.ri-1], e) } case stateType: state = mustUnmarshalState(rec.Data) @@ -187,7 +187,7 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, entries []raftpb.Entry, e // create encoder (chain crc with the decoder), enable appending w.encoder = newEncoder(w.f, w.decoder.lastCRC()) w.decoder = nil - return id, state, entries, nil + return id, state, ents, nil } // index should be the index of last log entry. From 54734b0903ac613209757160b3d25e3ef42a2bd4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 15:45:58 -0700 Subject: [PATCH 10/14] main/wal: add a const for 0700 magic number --- main.go | 7 ++++++- wal/wal.go | 5 ++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 69579b324..7f79287e1 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,11 @@ import ( "github.com/coreos/etcd/wal" ) +const ( + // the owner can make/remove files inside the directory + privateDirMode = 0700 +) + var ( fid = flag.String("id", "0xBEEF", "Id of this server") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") @@ -45,7 +50,7 @@ func main() { *dir = fmt.Sprintf("%v", *fid) log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) } - if err := os.MkdirAll(*dir, 0700); err != nil { + if err := os.MkdirAll(*dir, privateDirMode); err != nil { log.Fatalf("main: cannot create data directory: %v", err) } diff --git a/wal/wal.go b/wal/wal.go index ba5737d98..3335fb3d1 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -35,6 +35,9 @@ const ( entryType stateType crcType + + // the owner can make/remove files inside the directory + privateDirMode = 0700 ) var ( @@ -67,7 +70,7 @@ func Create(dirpath string) (*WAL, error) { return nil, os.ErrExist } - if err := os.MkdirAll(dirpath, 0700); err != nil { + if err := os.MkdirAll(dirpath, privateDirMode); err != nil { return nil, err } From 19235c81043157290f35c0bb45cb8a78ac8db0ee Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 16:10:13 -0700 Subject: [PATCH 11/14] raft: refactor restart test --- raft/node_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index 675015837..95bd4304e 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -51,9 +51,9 @@ func TestNodeRestart(t *testing.T) { st := raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2} want := Ready{ - State: raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2}, + State: st, // commit upto index 1 - CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, + CommittedEntries: entries[:st.Commit], } n := Restart(1, []int64{1}, 0, 0, st, entries) From 9a57d1067d8613cd5aa8bd6e00b7522553784770 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 16:15:18 -0700 Subject: [PATCH 12/14] main: make default data to have _data_etcd suffix --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 7f79287e1..096b6e88c 100644 --- a/main.go +++ b/main.go @@ -47,7 +47,7 @@ func main() { } if *dir == "" { - *dir = fmt.Sprintf("%v", *fid) + *dir = fmt.Sprintf("%v_etcd_data", *fid) log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, privateDirMode); err != nil { From f3348d6e13922f0812c4c0d714e1c8db9ff03ad7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 16:16:58 -0700 Subject: [PATCH 13/14] raft: fix the comment in test --- raft/node_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/node_test.go b/raft/node_test.go index 95bd4304e..7a511d1b8 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -52,7 +52,7 @@ func TestNodeRestart(t *testing.T) { want := Ready{ State: st, - // commit upto index 1 + // commit upto index commit index in st CommittedEntries: entries[:st.Commit], } From 4a02a1a60cd423a261055b13b1d4823ea4921282 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 8 Sep 2014 16:21:27 -0700 Subject: [PATCH 14/14] main: clarify fatal message for unexpect nodeid --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 096b6e88c..67c188071 100644 --- a/main.go +++ b/main.go @@ -96,7 +96,7 @@ func startRaft(id int64, peerIds []int64, waldir string) (raft.Node, *wal.WAL) { wid, st, ents, err := w.ReadAll() // TODO(xiangli): save/recovery nodeID? if wid != 0 { - log.Fatal("unimplemented: nodeid") + log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) } if err != nil { log.Fatal(err)