diff --git a/main.go b/main.go index da3162d07..6c7647f3b 100644 --- a/main.go +++ b/main.go @@ -2,24 +2,32 @@ package main import ( "flag" + "fmt" "log" "net/http" + "os" + "path" "strconv" "time" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/etcdhttp" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" + "github.com/coreos/etcd/wal" +) + +const ( + // the owner can make/remove files inside the directory + privateDirMode = 0700 ) var ( fid = flag.String("id", "0x1", "Id of this server") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") laddr = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')") - - peers = etcdhttp.Peers{} + dir = flag.String("data-dir", "", "Path to the data directory") + peers = etcdhttp.Peers{} ) func init() { @@ -39,13 +47,21 @@ func main() { log.Fatalf("%#x= must be specified in peers", id) } - n := raft.Start(id, peers.Ids(), 10, 1) + if *dir == "" { + *dir = fmt.Sprintf("%v_etcd_data", *fid) + log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) + } + if err := os.MkdirAll(*dir, privateDirMode); err != nil { + log.Fatalf("main: cannot create data directory: %v", err) + } + + n, w := startRaft(id, peers.Ids(), path.Join(*dir, "wal")) tk := time.NewTicker(100 * time.Millisecond) s := &etcdserver.Server{ Store: store.New(), Node: n, - Save: func(st raftpb.State, ents []raftpb.Entry) {}, // TODO: use wal + Save: w.Save, Send: etcdhttp.Sender(peers), Ticker: tk.C, } @@ -57,3 +73,35 @@ func main() { http.Handle("/", h) log.Fatal(http.ListenAndServe(*laddr, nil)) } + +// startRaft starts a raft node from the given wal dir. +// If the wal dir does not exist, startRaft will start a new raft node. +// If the wal dir exists, startRaft will restart the previous raft node. +// startRaft returns the started raft node and the opened wal. +func startRaft(id int64, peerIds []int64, waldir string) (raft.Node, *wal.WAL) { + if !wal.Exist(waldir) { + w, err := wal.Create(waldir) + if err != nil { + log.Fatal(err) + } + n := raft.Start(id, peerIds, 10, 1) + return n, w + } + + // restart a node from previous wal + // TODO(xiangli): check snapshot; not open from zero + w, err := wal.OpenFromIndex(waldir, 0) + if err != nil { + log.Fatal(err) + } + wid, st, ents, err := w.ReadAll() + // TODO(xiangli): save/recovery nodeID? + if wid != 0 { + log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) + } + if err != nil { + log.Fatal(err) + } + n := raft.Restart(id, peerIds, 10, 1, st, ents) + return n, w +} diff --git a/raft/node.go b/raft/node.go index 48f089105..e4b2e1740 100644 --- a/raft/node.go +++ b/raft/node.go @@ -50,7 +50,23 @@ type Node struct { } func Start(id int64, peers []int64, election, heartbeat int) Node { - n := Node{ + n := newNode() + r := newRaft(id, peers, election, heartbeat) + go n.run(r) + return n +} + +func Restart(id int64, peers []int64, election, heartbeat int, st pb.State, ents []pb.Entry) Node { + n := newNode() + r := newRaft(id, peers, election, heartbeat) + r.loadState(st) + r.loadEnts(ents) + go n.run(r) + return n +} + +func newNode() Node { + return Node{ propc: make(chan pb.Message), recvc: make(chan pb.Message), readyc: make(chan Ready), @@ -58,9 +74,6 @@ func Start(id int64, peers []int64, election, heartbeat int) Node { alwaysreadyc: make(chan Ready), done: make(chan struct{}), } - r := newRaft(id, peers, election, heartbeat) - go n.run(r) - return n } func (n *Node) Stop() { @@ -73,7 +86,8 @@ func (n *Node) run(r *raft) { var lead int64 var prev Ready - prev.Vote = none + prev.State = r.State + for { if lead != r.lead { log.Printf("raft: leader changed from %#x to %#x", lead, r.lead) diff --git a/raft/node_test.go b/raft/node_test.go index 946efbb01..7a511d1b8 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -42,3 +42,28 @@ func TestNode(t *testing.T) { default: } } + +func TestNodeRestart(t *testing.T) { + entries := []raftpb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2, Data: []byte("foo")}, + } + st := raftpb.State{Term: 1, Vote: -1, Commit: 1, LastIndex: 2} + + want := Ready{ + State: st, + // commit upto index commit index in st + CommittedEntries: entries[:st.Commit], + } + + n := Restart(1, []int64{1}, 0, 0, st, entries) + if g := <-n.Ready(); !reflect.DeepEqual(g, want) { + t.Errorf("g = %+v,\n w %+v", g, want) + } + + select { + case rd := <-n.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + default: + } +} diff --git a/raft/raft.go b/raft/raft.go index 4058458ea..2245749f0 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -512,4 +512,6 @@ func (r *raft) loadState(state pb.State) { r.raftLog.committed = state.Commit r.Term = state.Term r.Vote = state.Vote + r.Commit = state.Commit + r.LastIndex = state.LastIndex } diff --git a/wal/wal.go b/wal/wal.go index acce3dc03..3335fb3d1 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -35,6 +35,9 @@ const ( entryType stateType crcType + + // the owner can make/remove files inside the directory + privateDirMode = 0700 ) var ( @@ -66,6 +69,11 @@ func Create(dirpath string) (*WAL, error) { if Exist(dirpath) { return nil, os.ErrExist } + + if err := os.MkdirAll(dirpath, privateDirMode); err != nil { + return nil, err + } + p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0)) f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { @@ -136,20 +144,16 @@ func OpenFromIndex(dirpath string, index int64) (*WAL, error) { // ReadAll reads out all records of the current WAL. // After ReadAll, the WAL will be ready for appending new records. -func (w *WAL) ReadAll() (int64, raftpb.State, []raftpb.Entry, error) { - var id int64 - var state raftpb.State - var entries []raftpb.Entry - +func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder - var err error + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { switch rec.Type { case entryType: e := mustUnmarshalEntry(rec.Data) if e.Index > w.ri { - entries = append(entries[:e.Index-w.ri-1], e) + ents = append(ents[:e.Index-w.ri-1], e) } case stateType: state = mustUnmarshalState(rec.Data) @@ -186,7 +190,7 @@ func (w *WAL) ReadAll() (int64, raftpb.State, []raftpb.Entry, error) { // create encoder (chain crc with the decoder), enable appending w.encoder = newEncoder(w.f, w.decoder.lastCRC()) w.decoder = nil - return id, state, entries, nil + return id, state, ents, nil } // index should be the index of last log entry. @@ -258,6 +262,15 @@ func (w *WAL) SaveState(s *raftpb.State) error { return w.encoder.encode(rec) } +func (w *WAL) Save(st raftpb.State, ents []raftpb.Entry) { + // TODO(xiangli): no more reference operator + w.SaveState(&st) + for i := range ents { + w.SaveEntry(&ents[i]) + } + w.Sync() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) }