Merge pull request #2110 from xiang90/raft

etcdserver: separate out raft related stuff
Xiang Li committed c36aa3be6e on 2015-01-15 15:17:48 -08:00
5 changed files with 261 additions and 222 deletions

View File

@@ -1,5 +1,5 @@
 /*
-Copyright 2014 CoreOS, Inc.
+Copyright 2015 CoreOS, Inc.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -19,16 +19,107 @@ package etcdserver
 import (
 	"encoding/json"
 	"log"
+	"os"
 	"sort"
+	"time"

+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/pbutil"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
 )
+
+type RaftTimer interface {
+	Index() uint64
+	Term() uint64
+}
+
+type raftNode struct {
+	raft.Node
+
+	// config
+	snapCount uint64 // number of entries to trigger a snapshot
+
+	// utility
+	ticker      <-chan time.Time
+	raftStorage *raft.MemoryStorage
+	storage     Storage
+	// transport specifies the transport to send and receive msgs to members.
+	// Sending messages MUST NOT block. It is okay to drop messages, since
+	// clients should timeout and reissue their messages.
+	// If transport is nil, server will panic.
+	transport rafthttp.Transporter
+
+	// Cache of the latest raft index and raft term the server has seen
+	index uint64
+	term  uint64
+	lead  uint64
+}
+
+// for testing
+func (r *raftNode) pauseSending() {
+	p := r.transport.(rafthttp.Pausable)
+	p.Pause()
+}
+
+func (r *raftNode) resumeSending() {
+	p := r.transport.(rafthttp.Pausable)
+	p.Resume()
+}
+
+func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
+	var err error
+	member := cfg.Cluster.MemberByName(cfg.Name)
+	metadata := pbutil.MustMarshal(
+		&pb.Metadata{
+			NodeID:    uint64(member.ID),
+			ClusterID: uint64(cfg.Cluster.ID()),
+		},
+	)
+	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
+		log.Fatalf("etcdserver create snapshot directory error: %v", err)
+	}
+	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
+		log.Fatalf("etcdserver: create wal error: %v", err)
+	}
+	peers := make([]raft.Peer, len(ids))
+	for i, id := range ids {
+		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
+		if err != nil {
+			log.Panicf("marshal member should never fail: %v", err)
+		}
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+	id = member.ID
+	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
+	s = raft.NewMemoryStorage()
+	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
+	return
+}
+
+func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
+	var walsnap walpb.Snapshot
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
+	cfg.Cluster.SetID(cid)
+	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
+	s := raft.NewMemoryStorage()
+	if snapshot != nil {
+		s.ApplySnapshot(*snapshot)
+	}
+	s.SetHardState(st)
+	s.Append(ents)
+	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
+	return id, n, s, w
+}

 func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
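The heart of the new file is Go interface embedding: raftNode embeds raft.Node, so the entire raft.Node method set is promoted onto the wrapper, and the server can keep calling Step, Tick, and Propose on it while the struct carries the raft-adjacent state (ticker, storage, transport, cached index/term/lead). A minimal, self-contained sketch of the promotion mechanics, with stand-in types rather than etcd's:

	package main

	import "fmt"

	// Node stands in for the raft.Node interface.
	type Node interface {
		Tick()
	}

	type fakeNode struct{}

	func (fakeNode) Tick() { fmt.Println("tick") }

	// wrapper mirrors the raftNode shape: an embedded interface plus extra state.
	type wrapper struct {
		Node             // promoted: wrapper.Tick() forwards to the embedded Node
		snapCount uint64 // wrapper-owned configuration lives alongside it
	}

	func main() {
		w := wrapper{Node: fakeNode{}, snapCount: 10000}
		w.Tick()                 // dispatches to fakeNode.Tick via promotion
		fmt.Println(w.snapCount) // 10000
	}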

View File

@@ -23,7 +23,6 @@ import (
 	"log"
 	"math/rand"
 	"net/http"
-	"os"
 	"path"
 	"regexp"
 	"sort"
@@ -46,7 +45,6 @@ import (
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
-	"github.com/coreos/etcd/wal/walpb"

 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 )
@@ -115,14 +113,12 @@ type Server interface {
 	UpdateMember(ctx context.Context, updateMemb Member) error
 }

-type RaftTimer interface {
-	Index() uint64
-	Term() uint64
-}
-
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
 	cfg *ServerConfig
+	r   raftNode

 	w    wait.Wait
 	stop chan struct{}
 	done chan struct{}
@@ -132,32 +128,13 @@ type EtcdServer struct {
 	Cluster *Cluster

-	node        raft.Node
-	raftStorage *raft.MemoryStorage
-	storage     Storage
-
 	store store.Store

 	stats  *stats.ServerStats
 	lstats *stats.LeaderStats

-	// transport specifies the transport to send and receive msgs to members.
-	// Sending messages MUST NOT block. It is okay to drop messages, since
-	// clients should timeout and reissue their messages.
-	// If transport is nil, server will panic.
-	transport rafthttp.Transporter
-
-	Ticker     <-chan time.Time
 	SyncTicker <-chan time.Time

-	snapCount uint64 // number of entries to trigger a snapshot
-
-	// Cache of the latest raft index and raft term the server has seen
-	raftIndex uint64
-	raftTerm  uint64
-	raftLead  uint64

 	reqIDGen *idutil.Generator
 }
@@ -254,21 +231,23 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	lstats := stats.NewLeaderStats(id.String())
 	srv := &EtcdServer{
-		cfg:         cfg,
-		errorc:      make(chan error, 1),
-		store:       st,
-		node:        n,
-		raftStorage: s,
-		id:          id,
-		attributes:  Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		Cluster:     cfg.Cluster,
-		storage:     NewStorage(w, ss),
-		stats:       sstats,
-		lstats:      lstats,
-		Ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
-		SyncTicker:  time.Tick(500 * time.Millisecond),
-		snapCount:   cfg.SnapCount,
-		reqIDGen:    idutil.NewGenerator(uint8(id), time.Now()),
+		cfg:    cfg,
+		errorc: make(chan error, 1),
+		store:  st,
+		r: raftNode{
+			Node:        n,
+			snapCount:   cfg.SnapCount,
+			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
+			raftStorage: s,
+			storage:     NewStorage(w, ss),
+		},
+		id:         id,
+		attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		Cluster:    cfg.Cluster,
+		stats:      sstats,
+		lstats:     lstats,
+		SyncTicker: time.Tick(500 * time.Millisecond),
+		reqIDGen:   idutil.NewGenerator(uint8(id), time.Now()),
 	}

 	tr := rafthttp.NewTransporter(cfg.Transport, id, cfg.Cluster.ID(), srv, srv.errorc, sstats, lstats)
@@ -278,7 +257,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}
-	srv.transport = tr
+	srv.r.transport = tr
 	return srv, nil
 }
@@ -295,9 +274,9 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
-	if s.snapCount == 0 {
+	if s.r.snapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-		s.snapCount = DefaultSnapCount
+		s.r.snapCount = DefaultSnapCount
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
@@ -328,7 +307,7 @@ func (s *EtcdServer) purgeFile() {
 func (s *EtcdServer) ID() types.ID { return s.id }

-func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() }
+func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.Cluster.IsIDRemoved(types.ID(m.From)) {
@@ -338,7 +317,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if m.Type == raftpb.MsgApp {
 		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
 	}
-	return s.node.Step(ctx, m)
+	return s.r.Step(ctx, m)
 }

 func (s *EtcdServer) run() {
@@ -346,7 +325,7 @@ func (s *EtcdServer) run() {
 	var shouldstop bool

 	// load initial state from raft storage
-	snap, err := s.raftStorage.Snapshot()
+	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
 	}
@@ -356,20 +335,21 @@ func (s *EtcdServer) run() {
 	confState := snap.Metadata.ConfState

 	defer func() {
-		s.node.Stop()
-		s.transport.Stop()
-		if err := s.storage.Close(); err != nil {
+		s.r.Stop()
+		s.r.transport.Stop()
+		if err := s.r.storage.Close(); err != nil {
 			log.Panicf("etcdserver: close storage error: %v", err)
 		}
 		close(s.done)
 	}()

+	// TODO: make raft loop a method on raftNode
 	for {
 		select {
-		case <-s.Ticker:
-			s.node.Tick()
-		case rd := <-s.node.Ready():
+		case <-s.r.ticker:
+			s.r.Tick()
+		case rd := <-s.r.Ready():
 			if rd.SoftState != nil {
-				atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
+				atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead)
 				if rd.RaftState == raft.StateLeader {
 					syncC = s.SyncTicker
 					// TODO: remove the nil checking
@@ -384,18 +364,18 @@ func (s *EtcdServer) run() {
 			// apply snapshot to storage if it is more updated than current snapi
 			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi {
-				if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
+				if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil {
 					log.Fatalf("etcdserver: save snapshot error: %v", err)
 				}
-				s.raftStorage.ApplySnapshot(rd.Snapshot)
+				s.r.raftStorage.ApplySnapshot(rd.Snapshot)
 				snapi = rd.Snapshot.Metadata.Index
 				log.Printf("etcdserver: saved incoming snapshot at index %d", snapi)
 			}

-			if err := s.storage.Save(rd.HardState, rd.Entries); err != nil {
+			if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil {
 				log.Fatalf("etcdserver: save state and entries error: %v", err)
 			}
-			s.raftStorage.Append(rd.Entries)
+			s.r.raftStorage.Append(rd.Entries)

 			s.send(rd.Messages)
@@ -427,9 +407,9 @@ func (s *EtcdServer) run() {
 				}
 			}

-			s.node.Advance()
+			s.r.Advance()

-			if appliedi-snapi > s.snapCount {
+			if appliedi-snapi > s.r.snapCount {
 				log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
 				s.snapshot(appliedi, &confState)
 				snapi = appliedi
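The select loop above is the canonical way to drive an etcd/raft node: tick on a timer, and for each Ready batch persist first, then send, then apply, then Advance. A condensed sketch of that ordering, assuming caller-supplied persist/send/apply helpers in place of Storage.Save, the transport, and the server's apply path (the helper signatures are illustrative):

	// driveRaft condenses the loop above. It assumes the etcd raft and
	// raftpb packages and a standard-library log import.
	func driveRaft(n raft.Node, ms *raft.MemoryStorage, ticker <-chan time.Time,
		persist func(raftpb.HardState, []raftpb.Entry) error,
		send func([]raftpb.Message),
		apply func([]raftpb.Entry)) {
		for {
			select {
			case <-ticker:
				n.Tick() // advances election and heartbeat timeouts
			case rd := <-n.Ready():
				// Persist hard state and entries BEFORE sending messages;
				// otherwise a crash could lose entries peers believe exist.
				if err := persist(rd.HardState, rd.Entries); err != nil {
					log.Fatalf("persist error: %v", err)
				}
				ms.Append(rd.Entries)      // make entries visible to raft
				send(rd.Messages)          // fan out to peers; must not block
				apply(rd.CommittedEntries) // apply to the state machine
				n.Advance()                // tell raft this batch is done
			}
		}
	}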
@@ -486,7 +466,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 			return Response{}, err
 		}
 		ch := s.w.Register(r.ID)
-		s.node.Propose(ctx, data)
+		s.r.Propose(ctx, data)
 		select {
 		case x := <-ch:
 			resp := x.(Response)
@@ -526,7 +506,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

 func (s *EtcdServer) LeaderStats() []byte {
-	lead := atomic.LoadUint64(&s.raftLead)
+	lead := atomic.LoadUint64(&s.r.lead)
 	if lead != uint64(s.id) {
 		return nil
 	}
@@ -571,14 +551,14 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb Member) error {
 }

 // Implement the RaftTimer interface
-func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.raftIndex) }
+func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }

-func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.raftTerm) }
+func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }

 // Only for testing purpose
 // TODO: add Raft server interface to expose raft related info:
 // Index, Term, Lead, Committed, Applied, LastIndex, etc.
-func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.raftLead) }
+func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }

 func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
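The cached index, term, and lead fields are written by the run loop's goroutine and read from request-handling goroutines, so every access is paired through sync/atomic; mixing a plain read with an atomic write would be a data race. A tiny illustration of the pairing (not etcd code):

	// One goroutine stores, others load; both sides must use atomic ops.
	type raftStatus struct {
		term uint64
	}

	func (s *raftStatus) setTerm(t uint64) { atomic.StoreUint64(&s.term, t) }
	func (s *raftStatus) Term() uint64     { return atomic.LoadUint64(&s.term) }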
@@ -588,7 +568,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
 	cc.ID = s.reqIDGen.Next()
 	ch := s.w.Register(cc.ID)
-	if err := s.node.ProposeConfChange(ctx, cc); err != nil {
+	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
 		s.w.Trigger(cc.ID, nil)
 		return err
 	}
@@ -623,7 +603,7 @@ func (s *EtcdServer) sync(timeout time.Duration) {
 	// There is no promise that node has leader when do SYNC request,
 	// so it uses goroutine to propose.
 	go func() {
-		s.node.Propose(ctx, data)
+		s.r.Propose(ctx, data)
 		cancel()
 	}()
 }
@@ -668,7 +648,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			m.To = 0
 		}
 	}
-	s.transport.Send(ms)
+	s.r.transport.Send(ms)
 }

 // apply takes entries received from Raft (after it has been committed) and
@@ -693,8 +673,8 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 		default:
 			log.Panicf("entry type should be either EntryNormal or EntryConfChange")
 		}
-		atomic.StoreUint64(&s.raftIndex, e.Index)
-		atomic.StoreUint64(&s.raftTerm, e.Term)
+		atomic.StoreUint64(&s.r.index, e.Index)
+		atomic.StoreUint64(&s.r.term, e.Term)
 		applied = e.Index
 	}
 	return applied, shouldstop
@@ -754,10 +734,10 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
 	if err := s.Cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
-		s.node.ApplyConfChange(cc)
+		s.r.ApplyConfChange(cc)
 		return false, err
 	}
-	*confState = *s.node.ApplyConfChange(cc)
+	*confState = *s.r.ApplyConfChange(cc)
 	switch cc.Type {
 	case raftpb.ConfChangeAddNode:
 		m := new(Member)
@@ -771,7 +751,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.transport.AddPeer(m.ID, m.PeerURLs)
+			s.r.transport.AddPeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeRemoveNode:
@@ -780,7 +760,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if id == s.id {
 			return true, nil
 		} else {
-			s.transport.RemovePeer(id)
+			s.r.transport.RemovePeer(id)
 			log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID())
 		}
 	case raftpb.ConfChangeUpdateNode:
@@ -795,7 +775,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 		if m.ID == s.id {
 			log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		} else {
-			s.transport.UpdatePeer(m.ID, m.PeerURLs)
+			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
 			log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID())
 		}
 	}
@@ -810,7 +790,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 	if err != nil {
 		log.Panicf("etcdserver: store save should never fail: %v", err)
 	}
-	err = s.raftStorage.Compact(snapi, confState, d)
+	err = s.r.raftStorage.Compact(snapi, confState, d)
 	if err != nil {
 		// the snapshot was done asynchronously with the progress of raft.
 		// raft might have already got a newer snapshot and called compact.
@@ -821,78 +801,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) {
 	}
 	log.Printf("etcdserver: compacted log at index %d", snapi)

-	if err := s.storage.Cut(); err != nil {
+	if err := s.r.storage.Cut(); err != nil {
 		log.Panicf("etcdserver: rotate wal file should never fail: %v", err)
 	}

-	snap, err := s.raftStorage.Snapshot()
+	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		log.Panicf("etcdserver: snapshot error: %v", err)
 	}
-	if err := s.storage.SaveSnap(snap); err != nil {
+	if err := s.r.storage.SaveSnap(snap); err != nil {
 		log.Fatalf("etcdserver: save snapshot error: %v", err)
 	}
 	log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index)
 }

-// for testing
-func (s *EtcdServer) PauseSending() {
-	p := s.transport.(rafthttp.Pausable)
-	p.Pause()
-}
+func (s *EtcdServer) PauseSending() { s.r.pauseSending() }

-func (s *EtcdServer) ResumeSending() {
-	p := s.transport.(rafthttp.Pausable)
-	p.Resume()
-}
+func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }

-func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
-	var err error
-	member := cfg.Cluster.MemberByName(cfg.Name)
-	metadata := pbutil.MustMarshal(
-		&pb.Metadata{
-			NodeID:    uint64(member.ID),
-			ClusterID: uint64(cfg.Cluster.ID()),
-		},
-	)
-	if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
-		log.Fatalf("etcdserver create snapshot directory error: %v", err)
-	}
-	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
-		log.Fatalf("etcdserver: create wal error: %v", err)
-	}
-	peers := make([]raft.Peer, len(ids))
-	for i, id := range ids {
-		ctx, err := json.Marshal((*cfg.Cluster).Member(id))
-		if err != nil {
-			log.Panicf("marshal member should never fail: %v", err)
-		}
-		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
-	}
-	id = member.ID
-	log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
-	s = raft.NewMemoryStorage()
-	n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s)
-	return
-}
-
-func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
-	var walsnap walpb.Snapshot
-	if snapshot != nil {
-		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
-	}
-	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
-	cfg.Cluster.SetID(cid)
-	log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
-	s := raft.NewMemoryStorage()
-	if snapshot != nil {
-		s.ApplySnapshot(*snapshot)
-	}
-	s.SetHardState(st)
-	s.Append(ents)
-	n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s)
-	return id, n, s, w
-}

 // isBootstrapped tries to check if the given member has been bootstrapped
 // in the given cluster.
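PauseSending and ResumeSending are now one-line wrappers over raftNode, which type-asserts its Transporter to rafthttp.Pausable. The assertion panics if the transport lacks Pause/Resume, which is acceptable here because these helpers exist only for tests. A sketch of the pattern with stand-in types (Pausable below is illustrative, not the rafthttp interface):

	// Pausable stands in for rafthttp.Pausable.
	type Pausable interface {
		Pause()
		Resume()
	}

	// pausableTransport is an illustrative transport double.
	type pausableTransport struct{ paused bool }

	func (t *pausableTransport) Pause()  { t.paused = true }
	func (t *pausableTransport) Resume() { t.paused = false }

	// pauseSending mirrors raftNode.pauseSending: the type assertion
	// panics for a non-Pausable transport, surfacing misconfiguration.
	func pauseSending(transport interface{}) {
		transport.(Pausable).Pause()
	}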

View File

@@ -456,7 +456,7 @@ func TestApplyConfChangeError(t *testing.T) {
 	for i, tt := range tests {
 		n := &nodeRecorder{}
 		srv := &EtcdServer{
-			node:    n,
+			r:       raftNode{Node: n},
 			Cluster: cl,
 		}
 		_, err := srv.applyConfChange(tt.cc, nil)
@@ -483,10 +483,12 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
 		cl.AddMember(&Member{ID: types.ID(i)})
 	}
 	srv := &EtcdServer{
-		id:        1,
-		node:      &nodeRecorder{},
-		Cluster:   cl,
-		transport: &nopTransporter{},
+		id: 1,
+		r: raftNode{
+			Node:      &nodeRecorder{},
+			transport: &nopTransporter{},
+		},
+		Cluster: cl,
 	}
 	cc := raftpb.ConfChange{
 		Type: raftpb.ConfChangeRemoveNode,
@@ -522,12 +524,14 @@ func TestDoProposal(t *testing.T) {
 	for i, tt := range tests {
 		st := &storeRecorder{}
 		srv := &EtcdServer{
-			node:        newNodeCommitter(),
-			raftStorage: raft.NewMemoryStorage(),
-			store:       st,
-			transport:   &nopTransporter{},
-			storage:     &storageRecorder{},
-			reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+			r: raftNode{
+				Node:        newNodeCommitter(),
+				storage:     &storageRecorder{},
+				raftStorage: raft.NewMemoryStorage(),
+				transport:   &nopTransporter{},
+			},
+			store:    st,
+			reqIDGen: idutil.NewGenerator(0, time.Time{}),
 		}
 		srv.start()
 		resp, err := srv.Do(context.Background(), tt)
@@ -550,7 +554,7 @@ func TestDoProposal(t *testing.T) {
 func TestDoProposalCancelled(t *testing.T) {
 	wait := &waitRecorder{}
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        wait,
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -569,7 +573,7 @@ func TestDoProposalCancelled(t *testing.T) {
 func TestDoProposalTimeout(t *testing.T) {
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -582,7 +586,7 @@ func TestDoProposalTimeout(t *testing.T) {
 func TestDoProposalStopped(t *testing.T) {
 	srv := &EtcdServer{
-		node:     &nodeRecorder{},
+		r:        raftNode{Node: &nodeRecorder{}},
 		w:        &waitRecorder{},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
@@ -598,7 +602,7 @@ func TestDoProposalStopped(t *testing.T) {
 func TestSync(t *testing.T) {
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	// check that sync is non-blocking
@@ -631,7 +635,7 @@ func TestSync(t *testing.T) {
 func TestSyncTimeout(t *testing.T) {
 	n := &nodeProposalBlockerRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	// check that sync is non-blocking
@@ -656,13 +660,15 @@ func TestSyncTrigger(t *testing.T) {
 	n := newReadyNode()
 	st := make(chan time.Time, 1)
 	srv := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       &storeRecorder{},
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		SyncTicker:  st,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			transport:   &nopTransporter{},
+			storage:     &storageRecorder{},
+		},
+		store:      &storeRecorder{},
+		SyncTicker: st,
+		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 	}
 	srv.start()
 	defer srv.Stop()
@@ -700,10 +706,12 @@ func TestSnapshot(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
-		node:        &nodeRecorder{},
-		raftStorage: s,
-		store:       st,
-		storage:     p,
+		r: raftNode{
+			Node:        &nodeRecorder{},
+			raftStorage: s,
+			storage:     p,
+		},
+		store: st,
 	}
 	srv.snapshot(1, &raftpb.ConfState{Nodes: []uint64{1}})
 	gaction := st.Action()
@@ -731,13 +739,15 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
-		node:        newNodeCommitter(),
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     p,
-		snapCount:   uint64(snapc),
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        newNodeCommitter(),
+			snapCount:   uint64(snapc),
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     p,
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	srv.start()
 	for i := 0; i < snapc+1; i++ {
@@ -766,12 +776,14 @@ func TestRecvSnapshot(t *testing.T) {
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     p,
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			transport:   &nopTransporter{},
+			storage:     p,
+			raftStorage: raft.NewMemoryStorage(),
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -799,12 +811,14 @@ func TestRecvSlowSnapshot(t *testing.T) {
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			storage:     &storageRecorder{},
+			raftStorage: raft.NewMemoryStorage(),
+			transport:   &nopTransporter{},
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -832,12 +846,14 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 	cl.SetStore(store.New())
 	storage := raft.NewMemoryStorage()
 	s := &EtcdServer{
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		node:        n,
-		raftStorage: storage,
-		Cluster:     cl,
+		r: raftNode{
+			Node:        n,
+			storage:     &storageRecorder{},
+			raftStorage: storage,
+			transport:   &nopTransporter{},
+		},
+		store:   st,
+		Cluster: cl,
 	}

 	s.start()
@@ -874,13 +890,15 @@ func TestAddMember(t *testing.T) {
 	st := store.New()
 	cl.SetStore(st)
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
@@ -911,13 +929,15 @@ func TestRemoveMember(t *testing.T) {
 	cl.SetStore(store.New())
 	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	err := s.RemoveMember(context.TODO(), 1234)
@@ -947,13 +967,15 @@ func TestUpdateMember(t *testing.T) {
 	cl.SetStore(st)
 	cl.AddMember(&Member{ID: 1234})
 	s := &EtcdServer{
-		node:        n,
-		raftStorage: raft.NewMemoryStorage(),
-		store:       st,
-		transport:   &nopTransporter{},
-		storage:     &storageRecorder{},
-		Cluster:     cl,
-		reqIDGen:    idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:        n,
+			raftStorage: raft.NewMemoryStorage(),
+			storage:     &storageRecorder{},
+			transport:   &nopTransporter{},
+		},
+		store:    st,
+		Cluster:  cl,
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	s.start()
 	wm := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
@@ -983,9 +1005,9 @@ func TestPublish(t *testing.T) {
 	w := &waitWithResponse{ch: ch}
 	srv := &EtcdServer{
 		id:         1,
+		r:          raftNode{Node: n},
 		attributes: Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}},
 		Cluster:    &Cluster{},
-		node:       n,
 		w:          w,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 	}
@@ -1022,13 +1044,15 @@ func TestPublish(t *testing.T) {
 // TestPublishStopped tests that publish will be stopped if server is stopped.
 func TestPublishStopped(t *testing.T) {
 	srv := &EtcdServer{
-		node:      &nodeRecorder{},
-		transport: &nopTransporter{},
-		Cluster:   &Cluster{},
-		w:         &waitRecorder{},
-		done:      make(chan struct{}),
-		stop:      make(chan struct{}),
-		reqIDGen:  idutil.NewGenerator(0, time.Time{}),
+		r: raftNode{
+			Node:      &nodeRecorder{},
+			transport: &nopTransporter{},
+		},
+		Cluster:  &Cluster{},
+		w:        &waitRecorder{},
+		done:     make(chan struct{}),
+		stop:     make(chan struct{}),
+		reqIDGen: idutil.NewGenerator(0, time.Time{}),
 	}
 	close(srv.done)
 	srv.publish(time.Hour)
@@ -1040,7 +1064,7 @@ func TestPublishRetry(t *testing.T) {
 	defer log.SetOutput(os.Stderr)
 	n := &nodeRecorder{}
 	srv := &EtcdServer{
-		node:     n,
+		r:        raftNode{Node: n},
 		w:        &waitRecorder{},
 		done:     make(chan struct{}),
 		reqIDGen: idutil.NewGenerator(0, time.Time{}),
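The churn in these tests is mechanical: each fake raft node now seeds the embedded Node field of raftNode instead of a flat field on EtcdServer. Because raftNode embeds the raft.Node interface, a partial fake can itself embed raft.Node and override only the methods a test exercises; calling an un-overridden method panics on the nil embedded interface, which quickly flags an unexpected call. An illustrative stub (nodeRecorder is etcd's own double; stubNode below is hypothetical):

	// stubNode overrides only Propose; the nil embedded raft.Node
	// supplies the rest of the method set, and any un-overridden call
	// panics, making unexpected interactions obvious in tests.
	type stubNode struct {
		raft.Node
		proposed [][]byte
	}

	func (s *stubNode) Propose(ctx context.Context, data []byte) error {
		s.proposed = append(s.proposed, data)
		return nil
	}

	// Wiring it in mirrors the changes above:
	//	srv := &EtcdServer{r: raftNode{Node: &stubNode{}}}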

View File

@@ -484,6 +484,7 @@ func mustNewMember(t *testing.T, name string) *member {
 	m.NewCluster = true
 	m.Transport = mustNewTransport(t)
 	m.ElectionTicks = electionTicks
+	m.TickMs = uint(tickDuration / time.Millisecond)
 	return m
 }
@@ -524,7 +525,6 @@ func (m *member) Launch() error {
 	if m.s, err = etcdserver.NewServer(&m.ServerConfig); err != nil {
 		return fmt.Errorf("failed to initialize the etcd server: %v", err)
 	}
-	m.s.Ticker = time.Tick(tickDuration)
 	m.s.SyncTicker = time.Tick(500 * time.Millisecond)
 	m.s.Start()
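Because the tick channel is now an unexported raftNode field, the harness can no longer overwrite s.Ticker after construction; it sets TickMs on ServerConfig instead, and NewServer derives the ticker from it. The round trip is a plain unit conversion (tickDuration is the test suite's own constant; the value below is illustrative):

	const tickDuration = 10 * time.Millisecond // assumed heartbeat interval

	func tickerFromConfig() <-chan time.Time {
		tickMs := uint(tickDuration / time.Millisecond) // 10, as set on ServerConfig
		// Matches what NewServer builds: a tick every TickMs milliseconds.
		return time.Tick(time.Duration(tickMs) * time.Millisecond)
	}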