mirror of
https://github.com/etcd-io/etcd.git

If a follower receives a snapshot from the leader and crashes before renaming xxx.snap.db to db, but after the snapshot has been persisted to the .wal and .snap files, restarting the follower loads the old db together with the new .wal and .snap. This causes an index mismatch between the snap metadata index and the consistent index from the db. This PR forces an ordering where saving/renaming the db must happen after the snapshot is persisted to the wal and snap files; this guarantees the wal and snap files are never older than the db. On server restart, the etcd server checks whether snap index > db consistent index; if so, it attempts to load xxx.snap.db where xxx = snap index, and panics otherwise. FIXES #7628
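
A minimal sketch of that restart-time check. The names recoverBackend, snapDir, and consistentIndex are hypothetical, chosen for illustration rather than taken from etcdserver; only the "<snap index>.snap.db" naming and the index comparison come from the change described above.

package etcdrecovery

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/coreos/etcd/raft/raftpb"
)

// recoverBackend (hypothetical helper) decides which database file to open at
// startup. If the snapshot metadata index is ahead of the backend's consistent
// index, the renamed <snap index>.snap.db must exist; the caller panics on error.
func recoverBackend(snapDir string, snap raftpb.Snapshot, consistentIndex uint64) (string, error) {
	if snap.Metadata.Index <= consistentIndex {
		// the current db already reflects the snapshot; keep using it
		return filepath.Join(snapDir, "db"), nil
	}
	// wal/.snap are newer than db: the snapshot's database must have been
	// received and saved before the crash
	snapDB := filepath.Join(snapDir, fmt.Sprintf("%016x.snap.db", snap.Metadata.Index))
	if _, err := os.Stat(snapDB); err != nil {
		return "", fmt.Errorf("missing database file for snapshot index %d: %v", snap.Metadata.Index, err)
	}
	return snapDB, nil
}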
595 lines
16 KiB
Go
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
	"encoding/json"
	"expvar"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/pkg/contention"
	"github.com/coreos/etcd/pkg/pbutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/rafthttp"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
	"github.com/coreos/pkg/capnslog"
)
const (
	// Number of entries kept for a slow follower to catch up after compacting
	// the raft storage entries.
	// We expect the follower to have millisecond-level latency with the leader.
	// The max throughput is around 10K. Keeping 5K entries is enough to help
	// the follower catch up.
	numberOfCatchUpEntries = 5000

	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, a 1MB max size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)
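
// Taken together, these bounds allow roughly maxInflightMsgs (512) append
// messages of up to maxSizePerMsg (1 MiB) each to be unacknowledged at once,
// i.e. at most ~512 MiB of in-flight append traffic per follower.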

var (
	// protects raftStatus
	raftStatusMu sync.Mutex
	// indirection for the expvar func interface;
	// expvar panics when publishing a duplicate name and does not support
	// removing a registered name, so only register a func that calls
	// raftStatus, and change raftStatus as needed.
	raftStatus func() raft.Status
)

func init() {
	raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
	expvar.Publish("raft.status", expvar.Func(func() interface{} {
		raftStatusMu.Lock()
		defer raftStatusMu.Unlock()
		return raftStatus()
	}))
}

type RaftTimer interface {
	Index() uint64
	Term() uint64
}

// apply contains entries and a snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type apply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
}
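
// The consuming side (the etcd server's apply loop) roughly does, in order
// (illustrative pseudocode, not the exact etcdserver code):
//
//	ap := <-r.apply()      // receive committed entries and/or a snapshot
//	... apply ap.entries to the state machine ...
//	<-ap.notifyc           // wait until raft has persisted this batch
//	... now safe to trigger a local snapshot ...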

type raftNode struct {
	// Cache of the latest raft index and raft term the server has seen.
	// These three uint64 fields must be the first elements to keep 64-bit
	// alignment for atomic access to the fields.
	index uint64
	term  uint64
	lead  uint64

	raftNodeConfig

	// a chan to send/receive snapshot
	msgSnapC chan raftpb.Message

	// a chan to send out apply
	applyc chan apply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detector for raft heartbeat message
	td *contention.TimeoutDetector

	stopped chan struct{}
	done    chan struct{}
}

type raftNodeConfig struct {
	// to check if msg receiver is removed from cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     Storage
	heartbeat   time.Duration // for logging
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}

func newRaftNode(cfg raftNodeConfig) *raftNode {
	r := &raftNode{
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
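		// (the zero-value Ticker above has a nil C: <-r.ticker.C blocks
		// forever, so the node is only ticked explicitly, e.g. by tests)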
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.Tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						plog.Warningf("timed out sending read state")
					case <-r.stopped:
						return
					}
				}

				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the followers and them
				// writing to their disks.
				// For more details, check raft thesis 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// gofail: var raftBeforeSave struct{}
				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
					plog.Fatalf("raft save state and entries error: %v", err)
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						plog.Fatalf("raft save snapshot error: %v", err)
					}
					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftAfterSaveSnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
					// gofail: var raftAfterApplySnap struct{}
				}
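
				// Note the ordering above: SaveSnap persists the snapshot to
				// the WAL and .snap file before notifyc is signaled, so the
				// server may save/rename the matching .snap.db only after the
				// WAL/snap files are durable. This keeps the WAL and snap
				// files at least as new as the db, which the restart check
				// described in the commit message relies on.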

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing incoming messages before we signal notifyc chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also a slow machine's follower raft-layer could proceed to become the leader
					// on its own single-node cluster, before the apply-layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessarily long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
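	// iterate from the tail so that only the most recent MsgAppResp survives;
	// an append response reports the follower's latest log state, making any
	// earlier responses in the same batch redundant.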
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
		}

		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data stores: the store for v2, and the KV for v3.
			// The msgSnap only contains the most recent snapshot of store without KV.
			// So we need to redirect the msgSnap to the etcd server main loop for merging in the
			// current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
				plog.Warningf("server is likely overloaded")
			}
		}
	}
	return ms
}

func (r *raftNode) apply() chan apply {
	return r.applyc
}

func (r *raftNode) stop() {
	r.stopped <- struct{}{}
	<-r.done
}

func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		plog.Panicf("raft close storage error: %v", err)
	}
	close(r.done)
}

// for testing
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicksForElection advances ticks to the node for fast election.
// This reduces the time to wait for the first leader election when bootstrapping
// the whole cluster, while leaving at least 1 heartbeat for a possible existing
// leader to contact it.
func advanceTicksForElection(n raft.Node, electionTicks int) {
	for i := 0; i < electionTicks-1; i++ {
		n.Tick()
	}
}
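
// For example, with electionTicks = 10 the node is advanced 9 ticks at start,
// leaving one tick — at least one heartbeat interval — for an existing leader
// to contact it before the node can campaign.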

func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
	var err error
	member := cl.MemberByName(cfg.Name)
	metadata := pbutil.MustMarshal(
		&pb.Metadata{
			NodeID:    uint64(member.ID),
			ClusterID: uint64(cl.ID()),
		},
	)
	if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
		plog.Fatalf("create wal error: %v", err)
	}
	peers := make([]raft.Peer, len(ids))
	for i, id := range ids {
		ctx, err := json.Marshal((*cl).Member(id))
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}
	id = member.ID
	plog.Infof("starting member %s in cluster %s", id, cl.ID())
	s = raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n = raft.StartNode(c, peers)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)
	return
}

func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
	}

	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	advanceTicksForElection(n, c.ElectionTick)
	return id, cl, n, s, w
}

func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)

	// discard the previously uncommitted entries
	for i, ent := range ents {
		if ent.Index > st.Commit {
			plog.Infof("discarding %d uncommitted WAL entries", len(ents)-i)
			ents = ents[:i]
			break
		}
	}

	// force append the configuration change entries
	toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
	ents = append(ents, toAppEnts...)

	// force commit newly appended entries
	err := w.Save(raftpb.HardState{}, toAppEnts)
	if err != nil {
		plog.Fatalf("%v", err)
	}
	if len(ents) != 0 {
		st.Commit = ents[len(ents)-1].Index
	}

	plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
	cl := membership.NewCluster("")
	cl.SetID(cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
	}
	n := raft.RestartNode(c)
	raftStatus = n.Status
	return id, cl, n, s, w
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain two kinds of
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Nodes {
			ids[id] = true
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		var cc raftpb.ConfChange
		pbutil.MustUnmarshal(&cc, e.Data)
		switch cc.Type {
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
		}
	}
	sids := make(types.Uint64Slice, 0, len(ids))
	for id := range ids {
		sids = append(sids, id)
	}
	sort.Sort(sids)
	return []uint64(sids)
}
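
// For example, a snapshot whose ConfState holds nodes {1, 2, 3} plus entries
// that remove node 2 and add node 4 yields [1, 3, 4].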

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	ents := make([]raftpb.Entry, 0)
	next := index + 1
	found := false
	for _, id := range ids {
		if id == self {
			found = true
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}
	if !found {
		m := membership.Member{
			ID:             types.ID(self),
			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
		}
		ctx, err := json.Marshal(m)
		if err != nil {
			plog.Panicf("marshal member should never fail: %v", err)
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: ctx,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
	}
	return ents
}
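
// For example, createConfigChangeEnts([]uint64{1, 2, 3}, 1, term, index)
// emits ConfChangeRemoveNode entries for nodes 2 and 3 at index+1 and
// index+2, leaving node 1 as the only member; restartAsStandaloneNode uses
// this to force a multi-member cluster down to a single node.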