From 41757e7f78114c44f22d300863524fdd347b875b Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Fri, 7 Nov 2014 11:57:04 -0800 Subject: [PATCH] etcdserver: collapse shared readWAL logic --- etcdserver/force_cluster.go | 15 ++++----------- etcdserver/server.go | 35 ++++++++++++++--------------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/etcdserver/force_cluster.go b/etcdserver/force_cluster.go index d6c40a487..b0d1c8085 100644 --- a/etcdserver/force_cluster.go +++ b/etcdserver/force_cluster.go @@ -28,15 +28,8 @@ import ( "github.com/coreos/etcd/wal" ) -func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) { - var err error - if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) - } - id, cid, st, ents, err := readWAL(w, index) - if err != nil { - log.Fatalf("etcdserver: read wal error: %v", err) - } +func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) { + w, id, cid, st, ents := readWAL(cfg.WALDir(), index) cfg.Cluster.SetID(cid) // discard the previously uncommitted entries @@ -60,8 +53,8 @@ func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.S } log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) - n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) - return + n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) + return id, n, w } // getIDs returns an ordered set of IDs included in the given snapshot and diff --git a/etcdserver/server.go b/etcdserver/server.go index 97b125fcf..a33f3c227 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -724,8 +724,6 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) { func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *wal.WAL) { var err error - // TODO: remove the discoveryURL when it becomes part of the source for - // generating nodeID. member := cfg.Cluster.MemberByName(cfg.Name) metadata := pbutil.MustMarshal( &pb.Metadata{ @@ -764,29 +762,24 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string { return us } -func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) { - var err error - // restart a node from previous wal - if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) - } - id, clusterID, st, ents, err := readWAL(w, index) - if err != nil { - log.Fatalf("etcdserver: read wal error: %v", err) - } - cfg.Cluster.SetID(clusterID) +func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) { + w, id, cid, st, ents := readWAL(cfg.WALDir(), index) + cfg.Cluster.SetID(cid) + log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) - n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) - return + n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents) + return id, n, w } -func readWAL(w *wal.WAL, index uint64) (id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry, err error) { - var wmetadata []byte - wmetadata, st, ents, err = w.ReadAll() - if err != nil { - return +func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { + var err error + if w, err = wal.OpenAtIndex(waldir, index); err != nil { + log.Fatalf("etcdserver: open wal error: %v", err) + } + var wmetadata []byte + if wmetadata, st, ents, err = w.ReadAll(); err != nil { + log.Fatalf("etcdserver: read wal error: %v", err) } - var metadata pb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) id = types.ID(metadata.NodeID)