diff --git a/etcd/etcd.go b/etcd/etcd.go index ee331ad4c..d9b190ef4 100644 --- a/etcd/etcd.go +++ b/etcd/etcd.go @@ -22,6 +22,7 @@ import ( "log" "net/http" "net/url" + "os" "sync" "time" @@ -55,7 +56,7 @@ type Server struct { http.Handler } -func New(c *config.Config) *Server { +func New(c *config.Config) (*Server, error) { if err := c.Sanitize(); err != nil { log.Fatalf("server.new sanitizeErr=\"%v\"\n", err) } @@ -95,8 +96,12 @@ func New(c *config.Config) *Server { s.Handler = m log.Printf("id=%x server.new raftPubAddr=%s\n", s.id, s.raftPubAddr) - - return s + if err = os.MkdirAll(s.config.DataDir, 0700); err != nil { + if !os.IsExist(err) { + return nil, err + } + } + return s, nil } func (s *Server) SetTick(tick time.Duration) { @@ -176,7 +181,12 @@ func (s *Server) Run() error { } switch next { case participantMode: - s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub, s.tickDuration) + p, err := newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.config.DataDir, s.client, s.peerHub, s.tickDuration) + if err != nil { + log.Printf("id=%x server.run newParicipanteErr=\"%v\"\n", s.id, err) + return err + } + s.p = p dStopc := make(chan struct{}) if d != nil { go d.heartbeat(dStopc) diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index c09cb9a81..06fb2f495 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "testing" "time" @@ -371,7 +372,13 @@ func buildCluster(number int, tls bool) ([]*Server, []*httptest.Server) { } func initTestServer(c *config.Config, id int64, tls bool) (e *Server, h *httptest.Server) { - e = New(c) + c.DataDir = fmt.Sprintf("tests/etcd_%d", id) + os.RemoveAll(c.DataDir) + + e, err := New(c) + if err != nil { + panic(err) + } e.setId(id) e.SetTick(time.Millisecond * 5) m := http.NewServeMux() diff --git a/etcd/participant.go b/etcd/participant.go index eb062e348..61e9395a5 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -22,6 
+22,7 @@ import ( "log" "math/rand" "net/http" + "os" "path" "sync" "time" @@ -29,6 +30,7 @@ import ( etcdErr "github.com/coreos/etcd/error" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/store" + "github.com/coreos/etcd/wal" ) const ( @@ -74,6 +76,7 @@ type participant struct { node *v2Raft store.Store rh *raftHandler + w *wal.WAL stopped bool mu sync.Mutex @@ -82,12 +85,9 @@ type participant struct { *http.ServeMux } -func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant { +func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) { p := &participant{ - id: id, clusterId: -1, - pubAddr: pubAddr, - raftPubAddr: raftPubAddr, tickDuration: tickDuration, client: client, @@ -97,7 +97,6 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie addNodeC: make(chan raft.Config, 1), removeNodeC: make(chan raft.Config, 1), node: &v2Raft{ - Node: raft.New(id, defaultHeartbeat, defaultElection), result: make(map[wait]chan interface{}), }, Store: store.New(), @@ -107,6 +106,31 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie ServeMux: http.NewServeMux(), } p.rh = newRaftHandler(peerHub, p.Store.Version()) + + walPath := path.Join(dir, "wal") + w, err := wal.Open(walPath) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } + if w, err = wal.New(walPath); err != nil { + return nil, err + } + w.SaveInfo(id) + p.id = id + p.pubAddr = pubAddr + p.raftPubAddr = raftPubAddr + p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection) + } else { + n, err := w.LoadNode() + if err != nil { + return nil, err + } + p.id = n.Id + p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection) + } + p.w = w + p.Handle(v2Prefix+"/", handlerErr(p.serveValue)) p.Handle(v2machinePrefix, 
handlerErr(p.serveMachines)) p.Handle(v2peersPrefix, handlerErr(p.serveMachines)) @@ -114,20 +138,23 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats)) p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig)) p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines)) - return p + + return p, nil } func (p *participant) run() int64 { - seeds := p.peerHub.getSeeds() - if len(seeds) == 0 { - log.Printf("id=%x participant.run action=bootstrap\n", p.id) - p.node.Campaign() - p.node.InitCluster(genId()) - p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr)) - p.apply(p.node.Next()) - } else { - log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds) - p.join() + if p.node.IsEmpty() { + seeds := p.peerHub.getSeeds() + if len(seeds) == 0 { + log.Printf("id=%x participant.run action=bootstrap\n", p.id) + p.node.Campaign() + p.node.InitCluster(genId()) + p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr)) + p.apply(p.node.Next()) + } else { + log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds) + p.join() + } } p.rh.start() @@ -170,6 +197,8 @@ func (p *participant) run() int64 { return stopMode } p.apply(node.Next()) + _, ents := node.UnstableEnts() + p.save(ents, node.UnstableState()) p.send(node.Msgs()) if node.IsRemoved() { p.stop() @@ -187,6 +216,7 @@ func (p *participant) stop() { } p.stopped = true close(p.stopc) + p.w.Close() } func (p *participant) raftHandler() http.Handler { @@ -303,6 +333,10 @@ func (p *participant) apply(ents []raft.Entry) { peer.participate() pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId)) p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent) + if p.id == cfg.NodeId { + p.raftPubAddr = cfg.Addr + p.pubAddr = string(cfg.Context) + } log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, 
cfg.Context) case raft.RemoveNode: cfg := new(raft.Config) @@ -324,6 +358,17 @@ func (p *participant) apply(ents []raft.Entry) { } } +func (p *participant) save(ents []raft.Entry, state raft.State) { + for _, ent := range ents { + p.w.SaveEntry(&ent) + } + if state != raft.EmptyState { + p.w.SaveState(&state) + } + p.w.Flush() + +} + func (p *participant) send(msgs []raft.Message) { for i := range msgs { if err := p.peerHub.send(msgs[i]); err != nil { diff --git a/raft/node.go b/raft/node.go index 4fb827800..abc53270c 100644 --- a/raft/node.go +++ b/raft/node.go @@ -204,6 +204,11 @@ func (n *Node) Tick() { } } +// IsEmpty returns true if the log of the node is empty. +func (n *Node) IsEmpty() bool { + return n.sm.raftLog.isEmpty() +} + func (n *Node) UpdateConf(t int64, c *Config) { data, err := json.Marshal(c) if err != nil { @@ -219,8 +224,8 @@ func (n *Node) UnstableEnts() (int64, []Entry) { } func (n *Node) UnstableState() State { - if n.sm.unstableState == emptyState { - return emptyState + if n.sm.unstableState == EmptyState { + return EmptyState } s := n.sm.unstableState n.sm.clearState() diff --git a/raft/raft.go b/raft/raft.go index 89e315668..08ecb1bc3 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -72,7 +72,7 @@ type State struct { Commit int64 } -var emptyState = State{} +var EmptyState = State{} type Message struct { Type messageType