mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #11888 from jpbetz/out-of-range-fix
Fix state.commit is out of range on restart
This commit is contained in:
commit
3488e25a75
@ -22,6 +22,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -30,6 +31,7 @@ import (
|
||||
"go.etcd.io/etcd/v3/pkg/pbutil"
|
||||
"go.etcd.io/etcd/v3/raft"
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -102,21 +104,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load returns the newest snapshot.
|
||||
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
|
||||
}
|
||||
|
||||
// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
|
||||
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
|
||||
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
|
||||
m := snapshot.Metadata
|
||||
for i := len(walSnaps) - 1; i >= 0; i-- {
|
||||
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
// loadMatching returns the newest snapshot where matchFn returns true.
|
||||
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
|
||||
names, err := s.snapNames()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var snap *raftpb.Snapshot
|
||||
for _, name := range names {
|
||||
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
|
||||
break
|
||||
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
|
||||
return snap, nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
return snap, nil
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
|
||||
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
||||
@ -248,3 +266,31 @@ func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
|
||||
dir, err := os.Open(s.dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer dir.Close()
|
||||
filenames, err := dir.Readdirnames(-1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, filename := range filenames {
|
||||
if strings.HasSuffix(filename, ".snap.db") {
|
||||
hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
|
||||
index, err := strconv.ParseUint(hexIndex, 16, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse index from .snap.db filename '%s': %w", filename, err)
|
||||
}
|
||||
if index < snap.Metadata.Index {
|
||||
s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
|
||||
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
|
||||
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package snap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go.etcd.io/etcd/v3/pkg/fileutil"
|
||||
"hash/crc32"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
@ -24,6 +25,8 @@ import (
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -166,12 +169,47 @@ func TestLoadNewestSnap(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
g, err := ss.Load()
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
cases := []struct {
|
||||
name string
|
||||
availableWalSnaps []walpb.Snapshot
|
||||
expected *raftpb.Snapshot
|
||||
}{
|
||||
{
|
||||
name: "load-newest",
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-newest",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-newest-unsorted",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
|
||||
expected: &newSnap,
|
||||
},
|
||||
{
|
||||
name: "loadnewestavailable-previous",
|
||||
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
|
||||
expected: testSnap,
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(g, &newSnap) {
|
||||
t.Errorf("snap = %#v, want %#v", g, &newSnap)
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
var err error
|
||||
var g *raftpb.Snapshot
|
||||
if tc.availableWalSnaps != nil {
|
||||
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
|
||||
} else {
|
||||
g, err = ss.Load()
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
}
|
||||
if !reflect.DeepEqual(g, tc.expected) {
|
||||
t.Errorf("snap = %#v, want %#v", g, tc.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -229,3 +267,42 @@ func TestAllSnapshotBroken(t *testing.T) {
|
||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReleaseSnapDBs(t *testing.T) {
|
||||
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||
err := os.Mkdir(dir, 0700)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
snapIndices := []uint64{100, 200, 300, 400}
|
||||
for _, index := range snapIndices {
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||
if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
ss := New(zap.NewExample(), dir)
|
||||
|
||||
if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
deleted := []uint64{100, 200}
|
||||
for _, index := range deleted {
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||
if fileutil.Exist(filename) {
|
||||
t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index)
|
||||
}
|
||||
}
|
||||
|
||||
retained := []uint64{300, 400}
|
||||
for _, index := range retained {
|
||||
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||
if !fileutil.Exist(filename) {
|
||||
t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -227,6 +227,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
r.transport.Send(r.processMessages(rd.Messages))
|
||||
}
|
||||
|
||||
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
|
||||
// ensure that recovery after a snapshot restore is possible.
|
||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||
// gofail: var raftBeforeSaveSnap struct{}
|
||||
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||
}
|
||||
// gofail: var raftAfterSaveSnap struct{}
|
||||
}
|
||||
|
||||
// gofail: var raftBeforeSave struct{}
|
||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||
@ -237,17 +247,26 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
// gofail: var raftAfterSave struct{}
|
||||
|
||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||
// gofail: var raftBeforeSaveSnap struct{}
|
||||
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||
// Force WAL to fsync its hard state before Release() releases
|
||||
// old data from the WAL. Otherwise could get an error like:
|
||||
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
|
||||
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
|
||||
if err := r.storage.Sync(); err != nil {
|
||||
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
|
||||
}
|
||||
|
||||
// etcdserver now claim the snapshot has been persisted onto the disk
|
||||
notifyc <- struct{}{}
|
||||
|
||||
// gofail: var raftAfterSaveSnap struct{}
|
||||
// gofail: var raftBeforeApplySnap struct{}
|
||||
r.raftStorage.ApplySnapshot(rd.Snapshot)
|
||||
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
|
||||
// gofail: var raftAfterApplySnap struct{}
|
||||
|
||||
if err := r.storage.Release(rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
|
||||
}
|
||||
// gofail: var raftAfterWALRelease struct{}
|
||||
}
|
||||
|
||||
r.raftStorage.Append(rd.Entries)
|
||||
|
@ -29,6 +29,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/v3/auth"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api"
|
||||
"go.etcd.io/etcd/v3/etcdserver/api/membership"
|
||||
@ -59,11 +64,6 @@ import (
|
||||
"go.etcd.io/etcd/v3/raft/raftpb"
|
||||
"go.etcd.io/etcd/v3/version"
|
||||
"go.etcd.io/etcd/v3/wal"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -414,10 +414,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
zap.String("wal-dir", cfg.WALDir()),
|
||||
)
|
||||
}
|
||||
snapshot, err = ss.Load()
|
||||
|
||||
// Find a snapshot to start/restart a raft node
|
||||
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
|
||||
// wal log entries
|
||||
snapshot, err := ss.LoadNewestAvailable(walSnaps)
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if snapshot != nil {
|
||||
if err = st.Recovery(snapshot.Data); err != nil {
|
||||
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
|
||||
@ -2150,11 +2159,14 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
}
|
||||
lg.Panic("failed to create snapshot", zap.Error(err))
|
||||
}
|
||||
// SaveSnap saves the snapshot and releases the locked wal files
|
||||
// to the snapshot index.
|
||||
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
|
||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||
}
|
||||
if err = s.r.storage.Release(snap); err != nil {
|
||||
lg.Panic("failed to release wal", zap.Error(err))
|
||||
}
|
||||
|
||||
lg.Info(
|
||||
"saved snapshot",
|
||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||
|
@ -995,15 +995,19 @@ func TestSnapshot(t *testing.T) {
|
||||
ch := make(chan struct{}, 2)
|
||||
|
||||
go func() {
|
||||
gaction, _ := p.Wait(1)
|
||||
gaction, _ := p.Wait(2)
|
||||
defer func() { ch <- struct{}{} }()
|
||||
|
||||
if len(gaction) != 1 {
|
||||
t.Errorf("len(action) = %d, want 1", len(gaction))
|
||||
if len(gaction) != 2 {
|
||||
t.Fatalf("len(action) = %d, want 2", len(gaction))
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
|
||||
t.Errorf("action = %s, want SaveSnap", gaction[0])
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
|
||||
t.Errorf("action = %s, want Release", gaction[1])
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
@ -1087,20 +1091,32 @@ func TestSnapshotOrdering(t *testing.T) {
|
||||
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
|
||||
}()
|
||||
|
||||
ac := <-p.Chan()
|
||||
if ac.Name != "Save" {
|
||||
t.Fatalf("expected Save, got %+v", ac)
|
||||
}
|
||||
|
||||
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
|
||||
t.Fatalf("expected SaveSnap, got %+v", ac)
|
||||
}
|
||||
|
||||
if ac := <-p.Chan(); ac.Name != "Save" {
|
||||
t.Fatalf("expected Save, got %+v", ac)
|
||||
}
|
||||
if ac := <-p.Chan(); ac.Name != "Save" {
|
||||
t.Fatalf("expected Save, got %+v", ac)
|
||||
}
|
||||
|
||||
// confirm snapshot file still present before calling SaveSnap
|
||||
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
|
||||
if !fileutil.Exist(snapPath) {
|
||||
t.Fatalf("expected file %q, got missing", snapPath)
|
||||
}
|
||||
|
||||
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
|
||||
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
|
||||
t.Fatalf("expected SaveSnap, got %+v", ac)
|
||||
if ac := <-p.Chan(); ac.Name != "Sync" {
|
||||
t.Fatalf("expected Sync, got %+v", ac)
|
||||
}
|
||||
|
||||
if ac := <-p.Chan(); ac.Name != "Release" {
|
||||
t.Fatalf("expected Release, got %+v", ac)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1140,16 +1156,22 @@ func TestTriggerSnap(t *testing.T) {
|
||||
|
||||
donec := make(chan struct{})
|
||||
go func() {
|
||||
wcnt := 2 + snapc
|
||||
wcnt := 3 + snapc
|
||||
gaction, _ := p.Wait(wcnt)
|
||||
|
||||
// each operation is recorded as a Save
|
||||
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
|
||||
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release
|
||||
if len(gaction) != wcnt {
|
||||
t.Errorf("len(action) = %d, want %d", len(gaction), wcnt)
|
||||
t.Logf("gaction: %v", gaction)
|
||||
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
|
||||
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
|
||||
|
||||
if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
|
||||
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2])
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
|
||||
t.Errorf("action = %s, want Release", gaction[wcnt-1])
|
||||
}
|
||||
close(donec)
|
||||
}()
|
||||
|
@ -36,6 +36,10 @@ type Storage interface {
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
// Close closes the Storage and performs finalization.
|
||||
Close() error
|
||||
// Release releases the locked wal files older than the provided snapshot.
|
||||
Release(snap raftpb.Snapshot) error
|
||||
// Sync WAL
|
||||
Sync() error
|
||||
}
|
||||
|
||||
type storage struct {
|
||||
@ -47,24 +51,37 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
|
||||
return &storage{w, s}
|
||||
}
|
||||
|
||||
// SaveSnap saves the snapshot to disk and release the locked
|
||||
// wal files since they will not be used.
|
||||
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
|
||||
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
||||
walsnap := walpb.Snapshot{
|
||||
Index: snap.Metadata.Index,
|
||||
Term: snap.Metadata.Term,
|
||||
}
|
||||
err := st.WAL.SaveSnapshot(walsnap)
|
||||
// save the snapshot file before writing the snapshot to the wal.
|
||||
// This makes it possible for the snapshot file to become orphaned, but prevents
|
||||
// a WAL snapshot entry from having no corresponding snapshot file.
|
||||
err := st.Snapshotter.SaveSnap(snap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = st.Snapshotter.SaveSnap(snap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
||||
// gofail: var raftBeforeWALSaveSnaphot struct{}
|
||||
|
||||
return st.WAL.SaveSnapshot(walsnap)
|
||||
}
|
||||
|
||||
// Release releases resources older than the given snap and are no longer needed:
|
||||
// - releases the locks to the wal files that are older than the provided wal for the given snap.
|
||||
// - deletes any .snap.db files that are older than the given snap.
|
||||
func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||
if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
|
||||
return err
|
||||
}
|
||||
return st.Snapshotter.ReleaseSnapDBs(snap)
|
||||
}
|
||||
|
||||
// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||
// after the position of the given snap in the WAL.
|
||||
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var (
|
||||
err error
|
||||
|
@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) Release(st raftpb.Snapshot) error {
|
||||
if !raft.IsEmptySnap(st) {
|
||||
p.Record(testutil.Action{Name: "Release"})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) Sync() error {
|
||||
p.Record(testutil.Action{Name: "Sync"})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) Close() error { return nil }
|
||||
|
61
wal/wal.go
61
wal/wal.go
@ -531,6 +531,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||
return metadata, state, ents, err
|
||||
}
|
||||
|
||||
// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
|
||||
// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
|
||||
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
|
||||
var snaps []walpb.Snapshot
|
||||
var state raftpb.HardState
|
||||
var err error
|
||||
|
||||
rec := &walpb.Record{}
|
||||
names, err := readWALNames(lg, walDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open wal files in read mode, so that there is no conflict
|
||||
// when the same WAL is opened elsewhere in write mode
|
||||
rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if closer != nil {
|
||||
closer()
|
||||
}
|
||||
}()
|
||||
|
||||
// create a new decoder from the readers on the WAL files
|
||||
decoder := newDecoder(rs...)
|
||||
|
||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||
switch rec.Type {
|
||||
case snapshotType:
|
||||
var loadedSnap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||
snaps = append(snaps, loadedSnap)
|
||||
case stateType:
|
||||
state = mustUnmarshalState(rec.Data)
|
||||
}
|
||||
}
|
||||
// We do not have to read out all the WAL entries
|
||||
// as the decoder is opened in read mode.
|
||||
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// filter out any snaps that are newer than the committed hardstate
|
||||
n := 0
|
||||
for _, s := range snaps {
|
||||
if s.Index <= state.Commit {
|
||||
snaps[n] = s
|
||||
n++
|
||||
}
|
||||
}
|
||||
snaps = snaps[:n:n]
|
||||
|
||||
return snaps, nil
|
||||
}
|
||||
|
||||
// Verify reads through the given WAL and verifies that it is not corrupted.
|
||||
// It creates a new decoder to read through the records of the given WAL.
|
||||
// It does not conflict with any open WAL, but it is recommended not to
|
||||
@ -723,6 +780,10 @@ func (w *WAL) sync() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WAL) Sync() error {
|
||||
return w.sync()
|
||||
}
|
||||
|
||||
// ReleaseLockTo releases the locks, which has smaller index than the given index
|
||||
// except the largest one among them.
|
||||
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
|
||||
|
@ -1000,3 +1000,55 @@ func TestReadAllFail(t *testing.T) {
|
||||
t.Fatalf("err = %v, want ErrDecoderNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
|
||||
// for hardstate
|
||||
func TestValidSnapshotEntries(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
snap0 := walpb.Snapshot{Index: 0, Term: 0}
|
||||
snap1 := walpb.Snapshot{Index: 1, Term: 1}
|
||||
state1 := raftpb.HardState{Commit: 1, Term: 1}
|
||||
snap2 := walpb.Snapshot{Index: 2, Term: 1}
|
||||
snap3 := walpb.Snapshot{Index: 3, Term: 2}
|
||||
state2 := raftpb.HardState{Commit: 3, Term: 2}
|
||||
snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
|
||||
func() {
|
||||
w, err := Create(zap.NewExample(), p, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// snap0 is implicitly created at index 0, term 0
|
||||
if err = w.SaveSnapshot(snap1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Save(state1, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap3); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.Save(state2, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = w.SaveSnapshot(snap4); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
walSnaps, err := ValidSnapshotEntries(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expected := []walpb.Snapshot{snap0, snap1, snap2, snap3}
|
||||
if !reflect.DeepEqual(walSnaps, expected) {
|
||||
t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user