mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/*, wal/*: find valid snapshots by cross checking snap files and wal snap entries
This commit is contained in:
parent
50517039ae
commit
bd16071846
@ -30,6 +30,7 @@ import (
|
||||
"go.etcd.io/etcd/v3/pkg/pbutil"
|
||||
"go.etcd.io/etcd/v3/raft"
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -38,7 +39,6 @@ const snapSuffix = ".snap"
|
||||
|
||||
var (
|
||||
ErrNoSnapshot = errors.New("snap: no available snapshot")
|
||||
ErrSnapshotIndex = errors.New("snap: no available snapshot index")
|
||||
ErrEmptySnapshot = errors.New("snap: empty snapshot")
|
||||
ErrCRCMismatch = errors.New("snap: crc mismatch")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
@ -103,38 +103,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load returns the newest snapshot.
|
||||
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
|
||||
}
|
||||
|
||||
// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
|
||||
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
|
||||
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
|
||||
m := snapshot.Metadata
|
||||
for i := len(walSnaps) - 1; i >= 0; i-- {
|
||||
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
// loadMatching returns the newest snapshot where matchFn returns true.
|
||||
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
|
||||
names, err := s.snapNames()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var snap *raftpb.Snapshot
|
||||
for _, name := range names {
|
||||
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
|
||||
break
|
||||
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
|
||||
return snap, nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) {
|
||||
names, err := s.snapNames()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(names) == 0 {
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
|
||||
if i >= uint64(len(names)) {
|
||||
return nil, ErrSnapshotIndex
|
||||
}
|
||||
|
||||
return loadSnap(s.lg, s.dir, names[i])
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
|
||||
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
||||
|
@ -24,6 +24,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -166,12 +168,47 @@ func TestLoadNewestSnap(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
g, err := ss.Load()
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
cases := []struct {
|
||||
name string
|
||||
availableWalSnaps []walpb.Snapshot
|
||||
expected *raftpb.Snapshot
|
||||
}{
|
||||
{
|
||||
name: "load-newest",
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-newest",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-newest-unsorted",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-previous",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
|
||||
expected: testSnap,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(g, &newSnap) {
|
||||
t.Errorf("snap = %#v, want %#v", g, &newSnap)
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var err error
|
||||
var g *raftpb.Snapshot
|
||||
if tc.availableWalSnaps != nil {
|
||||
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
|
||||
} else {
|
||||
g, err = ss.Load()
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
}
|
||||
if !reflect.DeepEqual(g, tc.expected) {
|
||||
t.Errorf("snap = %#v, want %#v", g, tc.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -227,6 +227,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
r.transport.Send(r.processMessages(rd.Messages))
|
||||
}
|
||||
|
||||
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
|
||||
// ensure that recovery after a snapshot restore is possible.
|
||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||
// gofail: var raftBeforeSaveSnap struct{}
|
||||
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||
|
@ -29,6 +29,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/v3/auth"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api/membership"
|
||||
@ -59,11 +64,6 @@ import (
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/version"
|
||||
"go.etcd.io/etcd/v3/wal"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -416,24 +416,15 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
|
||||
// Find a snapshot to start/restart a raft node
|
||||
for i := uint64(0); ; i++ {
|
||||
snapshot, err = ss.LoadIndex(i)
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == snap.ErrNoSnapshot {
|
||||
break
|
||||
}
|
||||
|
||||
if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) {
|
||||
break
|
||||
}
|
||||
|
||||
cfg.Logger.Info(
|
||||
"skip snapshot",
|
||||
zap.Uint64("index", i),
|
||||
)
|
||||
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
|
||||
// wal log entries
|
||||
snapshot, err := ss.LoadNewestAvailable(walSnaps)
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if snapshot != nil {
|
||||
@ -2168,12 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
}
|
||||
lg.Panic("failed to create snapshot", zap.Error(err))
|
||||
}
|
||||
// SaveSnap saves the snapshot and releases the locked wal files
|
||||
// to the snapshot index.
|
||||
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
|
||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||
}
|
||||
|
||||
if err = s.r.storage.Release(snap); err != nil {
|
||||
lg.Panic("failed to release wal", zap.Error(err))
|
||||
}
|
||||
|
@ -36,7 +36,7 @@ type Storage interface {
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
// Close closes the Storage and performs finalization.
|
||||
Close() error
|
||||
// Release release release the locked wal files since they will not be used.
|
||||
// Release releases the locked wal files older than the provided snapshot.
|
||||
Release(snap raftpb.Snapshot) error
|
||||
// Sync WAL
|
||||
Sync() error
|
||||
@ -51,51 +51,32 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
|
||||
return &storage{w, s}
|
||||
}
|
||||
|
||||
// SaveSnap saves the snapshot to disk and release the locked
|
||||
// wal files since they will not be used.
|
||||
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
|
||||
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
||||
walsnap := walpb.Snapshot{
|
||||
Index: snap.Metadata.Index,
|
||||
Term: snap.Metadata.Term,
|
||||
}
|
||||
err := st.WAL.SaveSnapshot(walsnap)
|
||||
// save the snapshot file before writing the snapshot to the wal.
|
||||
// This makes it possible for the snapshot file to become orphaned, but prevents
|
||||
// a WAL snapshot entry from having no corresponding snapshot file.
|
||||
err := st.Snapshotter.SaveSnap(snap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// gofail: var raftBeforeWALSaveSnaphot struct{}
|
||||
|
||||
return st.Snapshotter.SaveSnap(snap)
|
||||
return st.WAL.SaveSnapshot(walsnap)
|
||||
}
|
||||
|
||||
// Release release the locks to the wal files that are older than the provided wal for the given snap.
|
||||
func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
||||
}
|
||||
|
||||
func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool {
|
||||
if snapshot == nil {
|
||||
lg.Fatal("checkWALSnap: snapshot is empty")
|
||||
}
|
||||
|
||||
walsnap := walpb.Snapshot{
|
||||
Index: snapshot.Metadata.Index,
|
||||
Term: snapshot.Metadata.Term,
|
||||
}
|
||||
|
||||
w, _, _, st, _ := readWAL(lg, waldir, walsnap)
|
||||
defer w.Close()
|
||||
|
||||
lg.Info(
|
||||
"checkWALSnap: snapshot and hardstate data",
|
||||
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
|
||||
zap.Uint64("st-commit", st.Commit),
|
||||
)
|
||||
|
||||
if snapshot.Metadata.Index > st.Commit {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||
// after the position of the given snap in the WAL.
|
||||
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var (
|
||||
err error
|
||||
|
57
wal/wal.go
57
wal/wal.go
@ -531,6 +531,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
return metadata, state, ents, err
|
||||
}
|
||||
|
||||
// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
|
||||
// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
|
||||
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
|
||||
var snaps []walpb.Snapshot
|
||||
var state raftpb.HardState
|
||||
var err error
|
||||
|
||||
rec := &walpb.Record{}
|
||||
names, err := readWALNames(lg, walDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open wal files in read mode, so that there is no conflict
|
||||
// when the same WAL is opened elsewhere in write mode
|
||||
rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if closer != nil {
|
||||
closer()
|
||||
}
|
||||
}()
|
||||
|
||||
// create a new decoder from the readers on the WAL files
|
||||
decoder := newDecoder(rs...)
|
||||
|
||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||
switch rec.Type {
|
||||
case snapshotType:
|
||||
var loadedSnap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||
snaps = append(snaps, loadedSnap)
|
||||
case stateType:
|
||||
state = mustUnmarshalState(rec.Data)
|
||||
}
|
||||
}
|
||||
// We do not have to read out all the WAL entries
|
||||
// as the decoder is opened in read mode.
|
||||
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// filter out any snaps that are newer than the committed hardstate
|
||||
n := 0
|
||||
for _, s := range snaps {
|
||||
if s.Index <= state.Commit {
|
||||
snaps[n] = s
|
||||
n++
|
||||
}
|
||||
}
|
||||
snaps = snaps[:n]
|
||||
|
||||
return snaps, nil
|
||||
}
|
||||
|
||||
// Verify reads through the given WAL and verifies that it is not corrupted.
|
||||
// It creates a new decoder to read through the records of the given WAL.
|
||||
// It does not conflict with any open WAL, but it is recommended not to
|
||||
|
@ -1000,3 +1000,60 @@ func TestReadAllFail(t *testing.T) {
|
||||
t.Fatalf("err = %v, want ErrDecoderNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
|
||||
// for hardstate
|
||||
func TestValidSnapshotEntries(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
snap0 := walpb.Snapshot{Index: 0, Term: 0}
|
||||
snap1 := walpb.Snapshot{Index: 1, Term: 1}
|
||||
snap2 := walpb.Snapshot{Index: 2, Term: 1}
|
||||
snap3 := walpb.Snapshot{Index: 3, Term: 2}
|
||||
snap4 := walpb.Snapshot{Index: 4, Term: 2}
|
||||
func() {
|
||||
w, err := Create(zap.NewExample(), p, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// snap0 is implicitly created at index 0, term 0
|
||||
if err = w.SaveSnapshot(snap1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
state := raftpb.HardState{Commit: 1, Term: 1}
|
||||
if err = w.Save(state, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
state2 := raftpb.HardState{Commit: 3, Term: 2}
|
||||
if err = w.Save(state2, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap4); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expected := []walpb.Snapshot{snap0, snap1, snap2, snap3}
|
||||
if len(walSnaps) != len(expected) {
|
||||
t.Fatalf("expected 4 walSnaps, got %d", len(expected))
|
||||
}
|
||||
for i := 0; i < len(expected); i++ {
|
||||
if walSnaps[i].Index != expected[i].Index || walSnaps[i].Term != expected[i].Term {
|
||||
t.Errorf("expected walSnaps %+v at index %d, got %+v", expected[i], i, walSnaps[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user