Merge pull request #1655 from jonboulle/wal_logic

etcdserver: collapse shared readWAL logic
This commit is contained in:
Jonathan Boulle 2014-11-08 17:07:43 -08:00
commit aca58ec605
2 changed files with 18 additions and 32 deletions

View File

@ -28,15 +28,8 @@ import (
"github.com/coreos/etcd/wal"
)
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
var err error
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
id, cid, st, ents, err := readWAL(w, index)
if err != nil {
log.Fatalf("etcdserver: read wal error: %v", err)
}
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) {
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
cfg.Cluster.SetID(cid)
// discard the previously uncommitted entries
@ -60,8 +53,8 @@ func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.S
}
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return id, n, w
}
// getIDs returns an ordered set of IDs included in the given snapshot and

View File

@ -724,8 +724,6 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *wal.WAL) {
var err error
// TODO: remove the discoveryURL when it becomes part of the source for
// generating nodeID.
member := cfg.Cluster.MemberByName(cfg.Name)
metadata := pbutil.MustMarshal(
&pb.Metadata{
@ -764,29 +762,24 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string {
return us
}
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (id types.ID, n raft.Node, w *wal.WAL) {
var err error
// restart a node from previous wal
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
id, clusterID, st, ents, err := readWAL(w, index)
if err != nil {
log.Fatalf("etcdserver: read wal error: %v", err)
}
cfg.Cluster.SetID(clusterID)
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) {
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
cfg.Cluster.SetID(cid)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
n = raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
return id, n, w
}
func readWAL(w *wal.WAL, index uint64) (id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry, err error) {
var wmetadata []byte
wmetadata, st, ents, err = w.ReadAll()
if err != nil {
return
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var err error
if w, err = wal.OpenAtIndex(waldir, index); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
var wmetadata []byte
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
log.Fatalf("etcdserver: read wal error: %v", err)
}
var metadata pb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
id = types.ID(metadata.NodeID)