mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver,wal: fix inconsistencies in WAL and snapshot
etcdserver/*, wal/*: changes to snapshots and wal logic; etcdserver/*: changes to snapshots and wal logic to fix #10219; etcdserver/*, wal/*: add Sync method; etcdserver/*, wal/*: find valid snapshots by cross-checking snap files and wal snap entries; etcdserver/*, wal/*: add comments, clean up error messages and tests; etcdserver/*, wal/*: remove orphaned .snap.db files during Release. Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
parent
e048e166ab
commit
87fc3c9e57
@ -22,16 +22,17 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/pkg/capnslog"
|
||||||
"go.etcd.io/etcd/etcdserver/api/snap/snappb"
|
"go.etcd.io/etcd/etcdserver/api/snap/snappb"
|
||||||
pioutil "go.etcd.io/etcd/pkg/ioutil"
|
pioutil "go.etcd.io/etcd/pkg/ioutil"
|
||||||
"go.etcd.io/etcd/pkg/pbutil"
|
"go.etcd.io/etcd/pkg/pbutil"
|
||||||
"go.etcd.io/etcd/raft"
|
"go.etcd.io/etcd/raft"
|
||||||
"go.etcd.io/etcd/raft/raftpb"
|
"go.etcd.io/etcd/raft/raftpb"
|
||||||
|
"go.etcd.io/etcd/wal/walpb"
|
||||||
"github.com/coreos/pkg/capnslog"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -108,21 +109,37 @@ func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load returns the newest snapshot.
|
||||||
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||||
|
return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
|
||||||
|
func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
|
||||||
|
return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
|
||||||
|
m := snapshot.Metadata
|
||||||
|
for i := len(walSnaps) - 1; i >= 0; i-- {
|
||||||
|
if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadMatching returns the newest snapshot where matchFn returns true.
|
||||||
|
func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
|
||||||
names, err := s.snapNames()
|
names, err := s.snapNames()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var snap *raftpb.Snapshot
|
var snap *raftpb.Snapshot
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if snap, err = loadSnap(s.lg, s.dir, name); err == nil {
|
if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
|
||||||
break
|
return snap, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
return nil, ErrNoSnapshot
|
||||||
return nil, ErrNoSnapshot
|
|
||||||
}
|
|
||||||
return snap, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
||||||
@ -274,3 +291,31 @@ func (s *Snapshotter) cleanupSnapdir(filenames []string) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
|
||||||
|
dir, err := os.Open(s.dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer dir.Close()
|
||||||
|
filenames, err := dir.Readdirnames(-1)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, filename := range filenames {
|
||||||
|
if strings.HasSuffix(filename, ".snap.db") {
|
||||||
|
hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
|
||||||
|
index, err := strconv.ParseUint(hexIndex, 16, 64)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to parse index from .snap.db filename '%s': %v", filename, err)
|
||||||
|
}
|
||||||
|
if index < snap.Metadata.Index {
|
||||||
|
s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
|
||||||
|
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
|
||||||
|
return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -23,7 +23,10 @@ import (
|
|||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/pkg/fileutil"
|
||||||
|
|
||||||
"go.etcd.io/etcd/raft/raftpb"
|
"go.etcd.io/etcd/raft/raftpb"
|
||||||
|
"go.etcd.io/etcd/wal/walpb"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -166,12 +169,47 @@ func TestLoadNewestSnap(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
g, err := ss.Load()
|
cases := []struct {
|
||||||
if err != nil {
|
name string
|
||||||
t.Errorf("err = %v, want nil", err)
|
availableWalSnaps []walpb.Snapshot
|
||||||
|
expected *raftpb.Snapshot
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "load-newest",
|
||||||
|
expected: &newSnap,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "loadnewestavailable-newest",
|
||||||
|
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}, {Index: 5, Term: 1}},
|
||||||
|
expected: &newSnap,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "loadnewestavailable-newest-unsorted",
|
||||||
|
availableWalSnaps: []walpb.Snapshot{{Index: 5, Term: 1}, {Index: 1, Term: 1}, {Index: 0, Term: 0}},
|
||||||
|
expected: &newSnap,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "loadnewestavailable-previous",
|
||||||
|
availableWalSnaps: []walpb.Snapshot{{Index: 0, Term: 0}, {Index: 1, Term: 1}},
|
||||||
|
expected: testSnap,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(g, &newSnap) {
|
for _, tc := range cases {
|
||||||
t.Errorf("snap = %#v, want %#v", g, &newSnap)
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
var err error
|
||||||
|
var g *raftpb.Snapshot
|
||||||
|
if tc.availableWalSnaps != nil {
|
||||||
|
g, err = ss.LoadNewestAvailable(tc.availableWalSnaps)
|
||||||
|
} else {
|
||||||
|
g, err = ss.Load()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("err = %v, want nil", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(g, tc.expected) {
|
||||||
|
t.Errorf("snap = %#v, want %#v", g, tc.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,3 +267,42 @@ func TestAllSnapshotBroken(t *testing.T) {
|
|||||||
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
t.Errorf("err = %v, want %v", err, ErrNoSnapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReleaseSnapDBs(t *testing.T) {
|
||||||
|
dir := filepath.Join(os.TempDir(), "snapshot")
|
||||||
|
err := os.Mkdir(dir, 0700)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
snapIndices := []uint64{100, 200, 300, 400}
|
||||||
|
for _, index := range snapIndices {
|
||||||
|
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||||
|
if err := ioutil.WriteFile(filename, []byte("snap file\n"), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ss := New(zap.NewExample(), dir)
|
||||||
|
|
||||||
|
if err := ss.ReleaseSnapDBs(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 300}}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
deleted := []uint64{100, 200}
|
||||||
|
for _, index := range deleted {
|
||||||
|
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||||
|
if fileutil.Exist(filename) {
|
||||||
|
t.Errorf("expected %s (index: %d) to be deleted, but it still exists", filename, index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
retained := []uint64{300, 400}
|
||||||
|
for _, index := range retained {
|
||||||
|
filename := filepath.Join(dir, fmt.Sprintf("%016x.snap.db", index))
|
||||||
|
if !fileutil.Exist(filename) {
|
||||||
|
t.Errorf("expected %s (index: %d) to be retained, but it no longer exists", filename, index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -231,12 +231,26 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
r.transport.Send(r.processMessages(rd.Messages))
|
r.transport.Send(r.processMessages(rd.Messages))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
|
||||||
|
// ensure that recovery after a snapshot restore is possible.
|
||||||
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||||
|
// gofail: var raftBeforeSaveSnap struct{}
|
||||||
|
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||||
|
if r.lg != nil {
|
||||||
|
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
plog.Fatalf("failed to save Raft snapshot %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// gofail: var raftAfterSaveSnap struct{}
|
||||||
|
}
|
||||||
|
|
||||||
// gofail: var raftBeforeSave struct{}
|
// gofail: var raftBeforeSave struct{}
|
||||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||||
if r.lg != nil {
|
if r.lg != nil {
|
||||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
plog.Fatalf("raft save state and entries error: %v", err)
|
plog.Fatalf("failed to save state and entries error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !raft.IsEmptyHardState(rd.HardState) {
|
if !raft.IsEmptyHardState(rd.HardState) {
|
||||||
@ -245,18 +259,22 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
// gofail: var raftAfterSave struct{}
|
// gofail: var raftAfterSave struct{}
|
||||||
|
|
||||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||||
// gofail: var raftBeforeSaveSnap struct{}
|
// Force WAL to fsync its hard state before Release() releases
|
||||||
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
// old data from the WAL. Otherwise could get an error like:
|
||||||
|
// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
|
||||||
|
// See https://github.com/etcd-io/etcd/issues/10219 for more details.
|
||||||
|
if err := r.storage.Sync(); err != nil {
|
||||||
if r.lg != nil {
|
if r.lg != nil {
|
||||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
plog.Fatalf("raft save snapshot error: %v", err)
|
plog.Fatalf("failed to sync Raft snapshot %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// etcdserver now claim the snapshot has been persisted onto the disk
|
// etcdserver now claim the snapshot has been persisted onto the disk
|
||||||
notifyc <- struct{}{}
|
notifyc <- struct{}{}
|
||||||
|
|
||||||
// gofail: var raftAfterSaveSnap struct{}
|
// gofail: var raftBeforeApplySnap struct{}
|
||||||
r.raftStorage.ApplySnapshot(rd.Snapshot)
|
r.raftStorage.ApplySnapshot(rd.Snapshot)
|
||||||
if r.lg != nil {
|
if r.lg != nil {
|
||||||
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
|
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
|
||||||
@ -264,6 +282,15 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
|
plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
|
||||||
}
|
}
|
||||||
// gofail: var raftAfterApplySnap struct{}
|
// gofail: var raftAfterApplySnap struct{}
|
||||||
|
|
||||||
|
if err := r.storage.Release(rd.Snapshot); err != nil {
|
||||||
|
if r.lg != nil {
|
||||||
|
r.lg.Fatal("failed to release Raft wal", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
plog.Fatalf("failed to release Raft wal %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// gofail: var raftAfterWALRelease struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.raftStorage.Append(rd.Entries)
|
r.raftStorage.Append(rd.Entries)
|
||||||
|
@ -29,6 +29,10 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/go-semver/semver"
|
||||||
|
"github.com/coreos/pkg/capnslog"
|
||||||
|
humanize "github.com/dustin/go-humanize"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"go.etcd.io/etcd/auth"
|
"go.etcd.io/etcd/auth"
|
||||||
"go.etcd.io/etcd/etcdserver/api"
|
"go.etcd.io/etcd/etcdserver/api"
|
||||||
"go.etcd.io/etcd/etcdserver/api/membership"
|
"go.etcd.io/etcd/etcdserver/api/membership"
|
||||||
@ -57,11 +61,6 @@ import (
|
|||||||
"go.etcd.io/etcd/raft/raftpb"
|
"go.etcd.io/etcd/raft/raftpb"
|
||||||
"go.etcd.io/etcd/version"
|
"go.etcd.io/etcd/version"
|
||||||
"go.etcd.io/etcd/wal"
|
"go.etcd.io/etcd/wal"
|
||||||
|
|
||||||
"github.com/coreos/go-semver/semver"
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
|
||||||
humanize "github.com/dustin/go-humanize"
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -426,10 +425,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
|||||||
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshot, err = ss.Load()
|
|
||||||
|
// Find a snapshot to start/restart a raft node
|
||||||
|
walSnaps, serr := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
|
||||||
|
if serr != nil {
|
||||||
|
return nil, serr
|
||||||
|
}
|
||||||
|
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
|
||||||
|
// wal log entries
|
||||||
|
snapshot, err = ss.LoadNewestAvailable(walSnaps)
|
||||||
if err != nil && err != snap.ErrNoSnapshot {
|
if err != nil && err != snap.ErrNoSnapshot {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if snapshot != nil {
|
if snapshot != nil {
|
||||||
if err = st.Recovery(snapshot.Data); err != nil {
|
if err = st.Recovery(snapshot.Data); err != nil {
|
||||||
if cfg.Logger != nil {
|
if cfg.Logger != nil {
|
||||||
@ -2370,8 +2378,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
plog.Panicf("unexpected create snapshot error %v", err)
|
plog.Panicf("unexpected create snapshot error %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// SaveSnap saves the snapshot and releases the locked wal files
|
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
|
||||||
// to the snapshot index.
|
|
||||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||||
if lg != nil {
|
if lg != nil {
|
||||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||||
@ -2387,6 +2394,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
} else {
|
} else {
|
||||||
plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
|
plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
|
||||||
}
|
}
|
||||||
|
if err = s.r.storage.Release(snap); err != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Panic("failed to release wal", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
plog.Panicf("failed to release wal %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// When sending a snapshot, etcd will pause compaction.
|
// When sending a snapshot, etcd will pause compaction.
|
||||||
// After receives a snapshot, the slow follower needs to get all the entries right after
|
// After receives a snapshot, the slow follower needs to get all the entries right after
|
||||||
|
@ -990,15 +990,19 @@ func TestSnapshot(t *testing.T) {
|
|||||||
ch := make(chan struct{}, 2)
|
ch := make(chan struct{}, 2)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
gaction, _ := p.Wait(1)
|
gaction, _ := p.Wait(2)
|
||||||
defer func() { ch <- struct{}{} }()
|
defer func() { ch <- struct{}{} }()
|
||||||
|
|
||||||
if len(gaction) != 1 {
|
if len(gaction) != 2 {
|
||||||
t.Errorf("len(action) = %d, want 1", len(gaction))
|
t.Fatalf("len(action) = %d, want 2", len(gaction))
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
|
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) {
|
||||||
t.Errorf("action = %s, want SaveSnap", gaction[0])
|
t.Errorf("action = %s, want SaveSnap", gaction[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Release"}) {
|
||||||
|
t.Errorf("action = %s, want Release", gaction[1])
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -1081,20 +1085,32 @@ func TestSnapshotOrdering(t *testing.T) {
|
|||||||
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
|
n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
ac := <-p.Chan()
|
||||||
|
if ac.Name != "Save" {
|
||||||
|
t.Fatalf("expected Save, got %+v", ac)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
|
||||||
|
t.Fatalf("expected SaveSnap, got %+v", ac)
|
||||||
|
}
|
||||||
|
|
||||||
if ac := <-p.Chan(); ac.Name != "Save" {
|
if ac := <-p.Chan(); ac.Name != "Save" {
|
||||||
t.Fatalf("expected Save, got %+v", ac)
|
t.Fatalf("expected Save, got %+v", ac)
|
||||||
}
|
}
|
||||||
if ac := <-p.Chan(); ac.Name != "Save" {
|
|
||||||
t.Fatalf("expected Save, got %+v", ac)
|
|
||||||
}
|
|
||||||
// confirm snapshot file still present before calling SaveSnap
|
// confirm snapshot file still present before calling SaveSnap
|
||||||
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
|
snapPath := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", 1))
|
||||||
if !fileutil.Exist(snapPath) {
|
if !fileutil.Exist(snapPath) {
|
||||||
t.Fatalf("expected file %q, got missing", snapPath)
|
t.Fatalf("expected file %q, got missing", snapPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
|
// unblock SaveSnapshot, etcdserver now permitted to move snapshot file
|
||||||
if ac := <-p.Chan(); ac.Name != "SaveSnap" {
|
if ac := <-p.Chan(); ac.Name != "Sync" {
|
||||||
t.Fatalf("expected SaveSnap, got %+v", ac)
|
t.Fatalf("expected Sync, got %+v", ac)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ac := <-p.Chan(); ac.Name != "Release" {
|
||||||
|
t.Fatalf("expected Release, got %+v", ac)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1133,16 +1149,22 @@ func TestTriggerSnap(t *testing.T) {
|
|||||||
|
|
||||||
donec := make(chan struct{})
|
donec := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
wcnt := 2 + snapc
|
wcnt := 3 + snapc
|
||||||
gaction, _ := p.Wait(wcnt)
|
gaction, _ := p.Wait(wcnt)
|
||||||
|
|
||||||
// each operation is recorded as a Save
|
// each operation is recorded as a Save
|
||||||
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap
|
// (SnapshotCount+1) * Puts + SaveSnap = (SnapshotCount+1) * Save + SaveSnap + Release
|
||||||
if len(gaction) != wcnt {
|
if len(gaction) != wcnt {
|
||||||
t.Errorf("len(action) = %d, want %d", len(gaction), wcnt)
|
t.Logf("gaction: %v", gaction)
|
||||||
|
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) {
|
|
||||||
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1])
|
if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) {
|
||||||
|
t.Errorf("action = %s, want SaveSnap", gaction[wcnt-2])
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "Release"}) {
|
||||||
|
t.Errorf("action = %s, want Release", gaction[wcnt-1])
|
||||||
}
|
}
|
||||||
close(donec)
|
close(donec)
|
||||||
}()
|
}()
|
||||||
|
@ -36,6 +36,10 @@ type Storage interface {
|
|||||||
SaveSnap(snap raftpb.Snapshot) error
|
SaveSnap(snap raftpb.Snapshot) error
|
||||||
// Close closes the Storage and performs finalization.
|
// Close closes the Storage and performs finalization.
|
||||||
Close() error
|
Close() error
|
||||||
|
// Release releases the locked wal files older than the provided snapshot.
|
||||||
|
Release(snap raftpb.Snapshot) error
|
||||||
|
// Sync WAL
|
||||||
|
Sync() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type storage struct {
|
type storage struct {
|
||||||
@ -47,24 +51,37 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
|
|||||||
return &storage{w, s}
|
return &storage{w, s}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveSnap saves the snapshot to disk and release the locked
|
// SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
|
||||||
// wal files since they will not be used.
|
|
||||||
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
||||||
walsnap := walpb.Snapshot{
|
walsnap := walpb.Snapshot{
|
||||||
Index: snap.Metadata.Index,
|
Index: snap.Metadata.Index,
|
||||||
Term: snap.Metadata.Term,
|
Term: snap.Metadata.Term,
|
||||||
}
|
}
|
||||||
err := st.WAL.SaveSnapshot(walsnap)
|
// save the snapshot file before writing the snapshot to the wal.
|
||||||
|
// This makes it possible for the snapshot file to become orphaned, but prevents
|
||||||
|
// a WAL snapshot entry from having no corresponding snapshot file.
|
||||||
|
err := st.Snapshotter.SaveSnap(snap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = st.Snapshotter.SaveSnap(snap)
|
// gofail: var raftBeforeWALSaveSnaphot struct{}
|
||||||
if err != nil {
|
|
||||||
return err
|
return st.WAL.SaveSnapshot(walsnap)
|
||||||
}
|
|
||||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Release releases resources older than the given snap and are no longer needed:
|
||||||
|
// - releases the locks to the wal files that are older than the provided wal for the given snap.
|
||||||
|
// - deletes any .snap.db files that are older than the given snap.
|
||||||
|
func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||||
|
if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return st.Snapshotter.ReleaseSnapDBs(snap)
|
||||||
|
}
|
||||||
|
|
||||||
|
// readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
|
||||||
|
// after the position of the given snap in the WAL.
|
||||||
|
// The snap must have been previously saved to the WAL, or this call will panic.
|
||||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
@ -45,4 +45,16 @@ func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *storageRecorder) Release(st raftpb.Snapshot) error {
|
||||||
|
if !raft.IsEmptySnap(st) {
|
||||||
|
p.Record(testutil.Action{Name: "Release"})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *storageRecorder) Sync() error {
|
||||||
|
p.Record(testutil.Action{Name: "Sync"})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *storageRecorder) Close() error { return nil }
|
func (p *storageRecorder) Close() error { return nil }
|
||||||
|
61
wal/wal.go
61
wal/wal.go
@ -532,6 +532,63 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
return metadata, state, ents, err
|
return metadata, state, ents, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
|
||||||
|
// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
|
||||||
|
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
|
||||||
|
var snaps []walpb.Snapshot
|
||||||
|
var state raftpb.HardState
|
||||||
|
var err error
|
||||||
|
|
||||||
|
rec := &walpb.Record{}
|
||||||
|
names, err := readWALNames(lg, walDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// open wal files in read mode, so that there is no conflict
|
||||||
|
// when the same WAL is opened elsewhere in write mode
|
||||||
|
rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if closer != nil {
|
||||||
|
closer()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// create a new decoder from the readers on the WAL files
|
||||||
|
decoder := newDecoder(rs...)
|
||||||
|
|
||||||
|
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||||
|
switch rec.Type {
|
||||||
|
case snapshotType:
|
||||||
|
var loadedSnap walpb.Snapshot
|
||||||
|
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||||
|
snaps = append(snaps, loadedSnap)
|
||||||
|
case stateType:
|
||||||
|
state = mustUnmarshalState(rec.Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We do not have to read out all the WAL entries
|
||||||
|
// as the decoder is opened in read mode.
|
||||||
|
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// filter out any snaps that are newer than the committed hardstate
|
||||||
|
n := 0
|
||||||
|
for _, s := range snaps {
|
||||||
|
if s.Index <= state.Commit {
|
||||||
|
snaps[n] = s
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
snaps = snaps[:n:n]
|
||||||
|
|
||||||
|
return snaps, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Verify reads through the given WAL and verifies that it is not corrupted.
|
// Verify reads through the given WAL and verifies that it is not corrupted.
|
||||||
// It creates a new decoder to read through the records of the given WAL.
|
// It creates a new decoder to read through the records of the given WAL.
|
||||||
// It does not conflict with any open WAL, but it is recommended not to
|
// It does not conflict with any open WAL, but it is recommended not to
|
||||||
@ -728,6 +785,10 @@ func (w *WAL) sync() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync is the exported entry point to the WAL's unexported sync method; it
// calls w.sync and returns its error unchanged.
// NOTE(review): the raft caller in this change describes this as forcing the
// WAL to fsync before old WAL data is released — confirm against sync().
func (w *WAL) Sync() error {
|
||||||
|
return w.sync()
|
||||||
|
}
|
||||||
|
|
||||||
// ReleaseLockTo releases the locks, which has smaller index than the given index
|
// ReleaseLockTo releases the locks, which has smaller index than the given index
|
||||||
// except the largest one among them.
|
// except the largest one among them.
|
||||||
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
|
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
|
||||||
|
@ -950,3 +950,56 @@ func TestRenameFail(t *testing.T) {
|
|||||||
t.Fatalf("expected error, got %v", werr)
|
t.Fatalf("expected error, got %v", werr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestValidSnapshotEntries ensures ValidSnapshotEntries returns all valid wal snapshot entries, accounting
|
||||||
|
// for hardstate
|
||||||
|
func TestValidSnapshotEntries(t *testing.T) {
|
||||||
|
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(p)
|
||||||
|
snap0 := walpb.Snapshot{Index: 0, Term: 0}
|
||||||
|
snap1 := walpb.Snapshot{Index: 1, Term: 1}
|
||||||
|
state1 := raftpb.HardState{Commit: 1, Term: 1}
|
||||||
|
snap2 := walpb.Snapshot{Index: 2, Term: 1}
|
||||||
|
snap3 := walpb.Snapshot{Index: 3, Term: 2}
|
||||||
|
state2 := raftpb.HardState{Commit: 3, Term: 2}
|
||||||
|
snap4 := walpb.Snapshot{Index: 4, Term: 2} // will be orphaned since the last committed entry will be snap3
|
||||||
|
func() {
|
||||||
|
var w *WAL
|
||||||
|
w, err = Create(zap.NewExample(), p, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
// snap0 is implicitly created at index 0, term 0
|
||||||
|
if err = w.SaveSnapshot(snap1); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = w.Save(state1, nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = w.SaveSnapshot(snap2); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = w.SaveSnapshot(snap3); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = w.Save(state2, nil); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err = w.SaveSnapshot(snap4); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
walSnaps, serr := ValidSnapshotEntries(zap.NewExample(), p)
|
||||||
|
if serr != nil {
|
||||||
|
t.Fatal(serr)
|
||||||
|
}
|
||||||
|
expected := []walpb.Snapshot{snap0, snap1, snap2, snap3}
|
||||||
|
if !reflect.DeepEqual(walSnaps, expected) {
|
||||||
|
t.Errorf("expected walSnaps %+v, got %+v", expected, walSnaps)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user