David Wyrobnik 3152dc8174 contrib/raftexample: Save snapshot and WAL before hard state
Update raftexample to save the snapshot file and the WAL snapshot entry
before the hard state, so that the snapshot is guaranteed to exist during
recovery. Otherwise, if there is a failure after storing the hard state,
the WAL may reference a non-existent snapshot.
This PR applies the fix from #10219 to raftexample.
2022-04-11 23:44:54 +00:00


// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"time"

	"go.etcd.io/etcd/client/pkg/v3/fileutil"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
	"go.etcd.io/etcd/server/v3/storage/wal"
	"go.etcd.io/etcd/server/v3/storage/wal/walpb"

	"go.uber.org/zap"
)
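
// commit carries a batch of committed key-value entries to the consumer of
// commitC; applyDoneC is closed by the consumer (the kvstore) once the batch
// has been applied, which maybeTriggerSnapshot waits on before snapshotting.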
type commit struct {
	data       []string
	applyDoneC chan<- struct{}
}

// A key-value stream backed by raft
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v)
	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
	commitC     chan<- *commit           // entries committed to log (k,v)
	errorC      chan<- error             // errors from raft session

	id          int      // client ID for raft session
	peers       []string // raft peer URLs
	join        bool     // node is joining an existing cluster
	waldir      string   // path to WAL directory
	snapdir     string   // path to snapshot directory
	getSnapshot func() ([]byte, error)

	confState     raftpb.ConfState
	snapshotIndex uint64
	appliedIndex  uint64

	// raft backing for the commit/error channel
	node        raft.Node
	raftStorage *raft.MemoryStorage
	wal         *wal.WAL

	snapshotter      *snap.Snapshotter
	snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport
	stopc     chan struct{} // signals proposal channel closed
	httpstopc chan struct{} // signals http server to shutdown
	httpdonec chan struct{} // signals http server shutdown complete

	logger *zap.Logger
}
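
// defaultSnapshotCount is how many entries may be applied after the latest
// snapshot before maybeTriggerSnapshot takes a new one.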
var defaultSnapshotCount uint64 = 10000

// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided proposal channel. All log entries are replayed over the commit
// channel, followed by a nil message (to indicate the channel is current),
// then new log entries. To shut down, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		logger: zap.NewExample(),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}
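
// saveSnap persists a raft snapshot to disk: the snapshot file is written
// first, then the matching WAL snapshot record, and finally WAL locks up to
// the snapshot index are released.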
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
	walSnap := walpb.Snapshot{
		Index:     snap.Metadata.Index,
		Term:      snap.Metadata.Term,
		ConfState: &snap.Metadata.ConfState,
	}
	// save the snapshot file before writing the snapshot to the wal.
	// This makes it possible for the snapshot file to become orphaned, but prevents
	// a WAL snapshot entry from having no corresponding snapshot file.
	if err := rc.snapshotter.SaveSnap(snap); err != nil {
		return err
	}
	if err := rc.wal.SaveSnapshot(walSnap); err != nil {
		return err
	}
	return rc.wal.ReleaseLockTo(snap.Metadata.Index)
}
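
// entriesToApply drops entries that have already been applied and returns only
// the suffix that still needs to be published to the commit channel.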
func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
	if len(ents) == 0 {
		return ents
	}
	firstIdx := ents[0].Index
	if firstIdx > rc.appliedIndex+1 {
		log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, rc.appliedIndex)
	}
	if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) {
		nents = ents[rc.appliedIndex-firstIdx+1:]
	}
	return nents
}

// publishEntries writes committed log entries to commit channel and returns
// whether all entries could be published.
func (rc *raftNode) publishEntries(ents []raftpb.Entry) (<-chan struct{}, bool) {
	if len(ents) == 0 {
		return nil, true
	}

	data := make([]string, 0, len(ents))
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			if len(ents[i].Data) == 0 {
				// ignore empty messages
				break
			}
			s := string(ents[i].Data)
			data = append(data, s)
		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			cc.Unmarshal(ents[i].Data)
			rc.confState = *rc.node.ApplyConfChange(cc)
			switch cc.Type {
			case raftpb.ConfChangeAddNode:
				if len(cc.Context) > 0 {
					rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
				}
			case raftpb.ConfChangeRemoveNode:
				if cc.NodeID == uint64(rc.id) {
					log.Println("I've been removed from the cluster! Shutting down.")
					return nil, false
				}
				rc.transport.RemovePeer(types.ID(cc.NodeID))
			}
		}
	}

	var applyDoneC chan struct{}

	if len(data) > 0 {
		applyDoneC = make(chan struct{}, 1)
		select {
		case rc.commitC <- &commit{data, applyDoneC}:
		case <-rc.stopc:
			return nil, false
		}
	}

	// after commit, update appliedIndex
	rc.appliedIndex = ents[len(ents)-1].Index

	return applyDoneC, true
}
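
// loadSnapshot returns the newest snapshot that is covered by a valid WAL
// snapshot entry. It returns an empty snapshot if no WAL exists yet, and nil
// if no snapshot file matches the WAL's snapshot records.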
func (rc *raftNode) loadSnapshot() *raftpb.Snapshot {
	if wal.Exist(rc.waldir) {
		walSnaps, err := wal.ValidSnapshotEntries(rc.logger, rc.waldir)
		if err != nil {
			log.Fatalf("raftexample: error listing snapshots (%v)", err)
		}
		snapshot, err := rc.snapshotter.LoadNewestAvailable(walSnaps)
		if err != nil && err != snap.ErrNoSnapshot {
			log.Fatalf("raftexample: error loading snapshot (%v)", err)
		}
		return snapshot
	}
	return &raftpb.Snapshot{}
}

// openWAL returns a WAL ready for reading.
func (rc *raftNode) openWAL(snapshot *raftpb.Snapshot) *wal.WAL {
	if !wal.Exist(rc.waldir) {
		if err := os.Mkdir(rc.waldir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
		}

		w, err := wal.Create(zap.NewExample(), rc.waldir, nil)
		if err != nil {
			log.Fatalf("raftexample: create wal error (%v)", err)
		}
		w.Close()
	}

	walsnap := walpb.Snapshot{}
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	log.Printf("loading WAL at term %d and index %d", walsnap.Term, walsnap.Index)
	w, err := wal.Open(zap.NewExample(), rc.waldir, walsnap)
	if err != nil {
		log.Fatalf("raftexample: error loading wal (%v)", err)
	}

	return w
}

// replayWAL replays WAL entries into the raft instance.
func (rc *raftNode) replayWAL() *wal.WAL {
	log.Printf("replaying WAL of member %d", rc.id)
	snapshot := rc.loadSnapshot()
	w := rc.openWAL(snapshot)
	_, st, ents, err := w.ReadAll()
	if err != nil {
		log.Fatalf("raftexample: failed to read WAL (%v)", err)
	}
	rc.raftStorage = raft.NewMemoryStorage()
	if snapshot != nil {
		rc.raftStorage.ApplySnapshot(*snapshot)
	}
	rc.raftStorage.SetHardState(st)

	// append to storage so raft starts at the right place in log
	rc.raftStorage.Append(ents)

	return w
}
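
// writeError shuts the node down and reports err to the consumer on errorC
// before closing both output channels.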
func (rc *raftNode) writeError(err error) {
	rc.stopHTTP()
	close(rc.commitC)
	rc.errorC <- err
	close(rc.errorC)
	rc.node.Stop()
}
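
// startRaft recovers any previous state from the snapshot directory and WAL,
// starts (or restarts) the raft node, and launches the raft HTTP transport.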
func (rc *raftNode) startRaft() {
	if !fileutil.Exist(rc.snapdir) {
		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
		}
	}
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)

	oldwal := wal.Exist(rc.waldir)
	rc.wal = rc.replayWAL()

	// signal replay has finished
	rc.snapshotterReady <- rc.snapshotter

	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	c := &raft.Config{
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}
	if oldwal || rc.join {
		rc.node = raft.RestartNode(c)
	} else {
		rc.node = raft.StartNode(c, rpeers)
	}

	rc.transport = &rafthttp.Transport{
		Logger:      rc.logger,
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(zap.NewExample(), strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}

	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}

	go rc.serveRaft()
	go rc.serveChannels()
}

// stop closes http, closes all channels, and stops raft.
func (rc *raftNode) stop() {
	rc.stopHTTP()
	close(rc.commitC)
	close(rc.errorC)
	rc.node.Stop()
}
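
// stopHTTP stops the raft transport and waits for the raft HTTP server to exit.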
func (rc *raftNode) stopHTTP() {
	rc.transport.Stop()
	close(rc.httpstopc)
	<-rc.httpdonec
}
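
// publishSnapshot tells the kvstore (via a nil commit) to reload its state
// from the latest snapshot and fast-forwards confState, snapshotIndex and
// appliedIndex to the snapshot's metadata.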
func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
	if raft.IsEmptySnap(snapshotToSave) {
		return
	}

	log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
	defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)

	if snapshotToSave.Metadata.Index <= rc.appliedIndex {
		log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rc.appliedIndex)
	}
	rc.commitC <- nil // trigger kvstore to load snapshot

	rc.confState = snapshotToSave.Metadata.ConfState
	rc.snapshotIndex = snapshotToSave.Metadata.Index
	rc.appliedIndex = snapshotToSave.Metadata.Index
}
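
// snapshotCatchUpEntriesN is the number of applied entries retained in raft
// storage after a compaction so that slow followers can still catch up.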
var snapshotCatchUpEntriesN uint64 = 10000
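
// maybeTriggerSnapshot takes a snapshot of the application state once enough
// entries have been applied since the last snapshot, persists it via saveSnap,
// and then compacts the in-memory raft log.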
func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) {
	if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
		return
	}

	// wait until all committed entries are applied (or server is closed)
	if applyDoneC != nil {
		select {
		case <-applyDoneC:
		case <-rc.stopc:
			return
		}
	}

	log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
	data, err := rc.getSnapshot()
	if err != nil {
		log.Panic(err)
	}
	snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
	if err != nil {
		panic(err)
	}
	if err := rc.saveSnap(snap); err != nil {
		panic(err)
	}

	compactIndex := uint64(1)
	if rc.appliedIndex > snapshotCatchUpEntriesN {
		compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
	}
	if err := rc.raftStorage.Compact(compactIndex); err != nil {
		panic(err)
	}

	log.Printf("compacted log at index %d", compactIndex)
	rc.snapshotIndex = rc.appliedIndex
}
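
// serveChannels is the main raft loop: it forwards proposals and config
// changes to raft, drives the raft tick, and handles each Ready by persisting
// to the WAL, applying committed entries, and sending messages to peers.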
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC:
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop))
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			// Must save the snapshot file and WAL snapshot entry before saving any other entries
			// or hardstate to ensure that recovery after a snapshot restore is possible.
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
			}
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rc.processMessages(rd.Messages))
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot(applyDoneC)
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

// processMessages updates outgoing snapshot messages: when a
// `raftpb.EntryConfChange` was applied after the snapshot was created, the
// ConfState included in the snapshot is out of date, so it must be replaced
// with the current ConfState before the snapshot is sent to a follower.
func (rc *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	for i := 0; i < len(ms); i++ {
		if ms[i].Type == raftpb.MsgSnap {
			ms[i].Snapshot.Metadata.ConfState = rc.confState
		}
	}
	return ms
}
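
// serveRaft serves raft HTTP traffic on this member's peer URL until the
// listener is stopped via httpstopc.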
func (rc *raftNode) serveRaft() {
	url, err := url.Parse(rc.peers[rc.id-1])
	if err != nil {
		log.Fatalf("raftexample: Failed parsing URL (%v)", err)
	}

	ln, err := newStoppableListener(url.Host, rc.httpstopc)
	if err != nil {
		log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
	}

	err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
	select {
	case <-rc.httpstopc:
	default:
		log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
	}
	close(rc.httpdonec)
}
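
// The methods below implement the rafthttp.Raft interface so the transport
// can hand incoming messages and peer status reports to the local raft node.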
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
	return rc.node.Step(ctx, m)
}
func (rc *raftNode) IsIDRemoved(id uint64) bool  { return false }
func (rc *raftNode) ReportUnreachable(id uint64) { rc.node.ReportUnreachable(id) }
func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
	rc.node.ReportSnapshot(id, status)
}