etcdserver: renaming db happens after snapshot persists to wal and snap files

If a follower receives a snapshot from the leader and crashes after the snapshot has been persisted to the .wal and .snap files but before xxx.snap.db has been renamed to db, then the restarted follower loads the old db together with the new .wal and .snap files. This causes a mismatch between the snapshot metadata index and the consistent index stored in the db. This PR forces an ordering: saving/renaming the db must happen only after the snapshot has been persisted to the wal and snap files, which guarantees that the wal and snap files are never older than the db. On server restart, the etcd server checks whether the snap index is greater than the db consistent index; if so, it attempts to load the xxx.snap.db file whose xxx matches the snap index, and panics if no such file exists.

FIXES #7628
commit 8b7b7222dd (parent 505bf8c708)
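In outline, the restart-time recovery decision described above looks like the following minimal sketch. This is not the commit's code: recoverDBIfStale, snapIndex, and dbIndex are placeholder names, and the real implementation is checkAndRecoverDB in etcdserver/util.go, shown in the diff below.

package sketch

import (
	"fmt"
	"os"
	"path/filepath"
)

// recoverDBIfStale promotes xxx.snap.db to db when the db lags the snapshot.
func recoverDBIfStale(snapIndex, dbIndex uint64, snapdir, dbpath string) error {
	if snapIndex <= dbIndex {
		return nil // db already reflects the snapshot; nothing to recover
	}
	// The wal/snap files are newer than the db: look for the raw snapshot
	// database saved earlier as <snapdir>/<index as %016x>.snap.db.
	snapfn := filepath.Join(snapdir, fmt.Sprintf("%016x.snap.db", snapIndex))
	if _, err := os.Stat(snapfn); err != nil {
		// no matching snapshot db: etcd panics here rather than serve stale data
		return fmt.Errorf("stale db and no snapshot db at %s: %v", snapfn, err)
	}
	// rename is atomic on POSIX filesystems, so a crash mid-recovery is safe
	return os.Rename(snapfn, dbpath)
}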
etcdserver/raft.go

@@ -83,7 +83,8 @@ type RaftTimer interface {
 type apply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
-	raftDone <-chan struct{} // rx {} after raft has persisted messages
+	// notifyc synchronizes etcd server applies with the raft node
+	notifyc chan struct{}
 }

 type raftNode struct {

@@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				}
 			}

-			raftDone := make(chan struct{}, 1)
+			notifyc := make(chan struct{}, 1)
 			ap := apply{
 				entries:  rd.CommittedEntries,
 				snapshot: rd.Snapshot,
-				raftDone: raftDone,
+				notifyc:  notifyc,
 			}

 			updateCommittedIndex(&ap, rh)

@@ -227,6 +228,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
 					plog.Fatalf("raft save snapshot error: %v", err)
 				}
+				// etcdserver now claims the snapshot has been persisted onto the disk
+				notifyc <- struct{}{}
+
 				// gofail: var raftAfterSaveSnap struct{}
 				r.raftStorage.ApplySnapshot(rd.Snapshot)
 				plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)

@@ -240,7 +244,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 			msgs := r.processMessages(rd.Messages)

 			// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
-			raftDone <- struct{}{}
+			notifyc <- struct{}{}

 			// Candidate or follower needs to wait for all pending configuration
 			// changes to be applied before sending messages.

@@ -259,9 +263,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 			if waitApply {
 				// blocks until 'applyAll' calls 'applyWait.Trigger'
 				// to be in sync with scheduled config-change job
-				// (assume raftDone has cap of 1)
+				// (assume notifyc has cap of 1)
 				select {
-				case raftDone <- struct{}{}:
+				case notifyc <- struct{}{}:
 				case <-r.stopped:
 					return
 				}

@@ -271,7 +275,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				r.transport.Send(msgs)
 			} else {
 				// leader already processed 'MsgSnap' and signaled
-				raftDone <- struct{}{}
+				notifyc <- struct{}{}
 			}

 			r.Advance()
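The renamed channel now carries one extra signal: besides the existing "log entries are on disk" notification, the raft loop also signals once an incoming snapshot has been saved, and the apply side receives in the same order. A self-contained sketch of this cap-1 notification pattern (raftLoop and applyLoop are illustrative names, not etcd's):

package main

import "fmt"

func raftLoop(notifyc chan struct{}, hasSnapshot bool) {
	if hasSnapshot {
		notifyc <- struct{}{} // snapshot persisted to disk (persistence elided)
	}
	notifyc <- struct{}{} // raft log entries persisted (elided)
}

func applyLoop(notifyc chan struct{}, hasSnapshot bool) {
	if hasSnapshot {
		<-notifyc // block until the snapshot is on disk
		fmt.Println("snapshot on disk; safe to open/rename the snapshot db")
	}
	<-notifyc // block until log disk writes finish before triggering a snapshot
	fmt.Println("raft log on disk; safe to trigger a snapshot")
}

func main() {
	// capacity 1 lets the raft loop signal without blocking when the
	// apply loop is not yet waiting
	notifyc := make(chan struct{}, 1)
	go raftLoop(notifyc, true)
	applyLoop(notifyc, true)
}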
etcdserver/server.go

@@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	bepath := filepath.Join(cfg.SnapDir(), databaseFilename)
 	beExist := fileutil.Exist(bepath)

-	var be backend.Backend
-	beOpened := make(chan struct{})
-	go func() {
-		be = newBackend(bepath, cfg.QuotaBackendBytes)
-		beOpened <- struct{}{}
-	}()
-
-	select {
-	case <-beOpened:
-	case <-time.After(time.Second):
-		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
-		plog.Warningf("waiting for it to exit before starting...")
-		<-beOpened
-	}
+	be := openBackend(bepath, cfg.QuotaBackendBytes)

 	defer func() {
 		if err != nil {
@@ -385,6 +372,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 			plog.Panicf("recovered store from snapshot error: %v", err)
 		}
 		plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
+
+		be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir())
+		if err != nil {
+			plog.Panicf("recovering backend from snapshot error: %v", err)
+		}
 	}
 	cfg.Print()
 	if !cfg.ForceNewCluster {
@@ -778,7 +770,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
 	// storage, since the raft routine might be slower than apply routine.
-	<-apply.raftDone
+	<-apply.notifyc

 	s.triggerSnapshot(ep)
 	select {
@@ -803,6 +795,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			apply.snapshot.Metadata.Index, ep.appliedi)
 	}

+	// wait for raftNode to persist the snapshot onto the disk
+	<-apply.notifyc
+
 	snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
 	if err != nil {
 		plog.Panicf("get database snapshot file path error: %v", err)
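The receive added here is the heart of the fix: applySnapshot must not look up or rename xxx.snap.db until raftNode has confirmed the snapshot is on disk, which preserves the invariant that the wal and snap files are never older than the db.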
etcdserver/util.go

@@ -15,11 +15,18 @@
 package etcdserver

 import (
+	"fmt"
+	"os"
 	"time"

 	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/lease"
+	"github.com/coreos/etcd/mvcc"
+	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/snap"
 )

 // isConnectedToQuorumSince checks whether the local member is connected to the
@@ -95,3 +102,55 @@ func (nc *notifier) notify(err error) {
 	nc.err = err
 	close(nc.c)
 }
+
+// checkAndRecoverDB attempts to recover the db in the scenario where the
+// etcd server crashed before updating its db on disk
+// but after persisting a snapshot received from the leader;
+// the snapshot can then be newer than the db
+// (snapshot.Metadata.Index > db.consistentIndex).
+//
+// when that happens:
+// 1. find the xxx.snap.db that matches the snap index.
+// 2. rename xxx.snap.db to db.
+// 3. open the new db as the backend.
+func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) {
+	var cIndex consistentIndex
+	kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
+	defer kv.Close()
+	kvindex := kv.ConsistentIndex()
+	if snapshot.Metadata.Index <= kvindex {
+		return oldbe, nil
+	}
+
+	id := snapshot.Metadata.Index
+	snapfn, err := snap.DBFilePathFromID(snapdir, id)
+	if err != nil {
+		return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err)
+	}
+
+	bepath := snapdir + databaseFilename
+	if err := os.Rename(snapfn, bepath); err != nil {
+		return nil, fmt.Errorf("rename snapshot file error: %v", err)
+	}
+
+	oldbe.Close()
+	be = openBackend(bepath, quotaBackendBytes)
+	return be, nil
+}
+
+func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) {
+	beOpened := make(chan struct{})
+	go func() {
+		be = newBackend(bepath, quotaBackendBytes)
+		beOpened <- struct{}{}
+	}()
+
+	select {
+	case <-beOpened:
+	case <-time.After(time.Second):
+		plog.Warningf("another etcd process is running with the same data dir and holding the file lock.")
+		plog.Warningf("waiting for it to exit before starting...")
+		<-beOpened
+	}
+	return be
+}
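The goroutine-plus-select shape in openBackend exists because newBackend can block indefinitely on the database file lock while another process holds it; rather than hang silently, the server logs a warning after one second and keeps waiting. The same pattern generalizes to any slow blocking call — a sketch under assumed names (warnIfSlow is not etcd code):

package main

import (
	"log"
	"time"
)

// warnIfSlow runs fn, logs msg if fn has not returned within d, and then
// continues to wait for fn to finish. The warning is informational only.
func warnIfSlow(fn func(), d time.Duration, msg string) {
	done := make(chan struct{})
	go func() {
		fn()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(d):
		log.Println(msg)
		<-done // keep waiting for fn after warning
	}
}

func main() {
	warnIfSlow(func() { time.Sleep(2 * time.Second) }, time.Second,
		"operation is taking longer than expected; still waiting...")
}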
snap/db.go
@@ -15,6 +15,7 @@
 package snap

 import (
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -24,6 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 )

+var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist")
+
 // SaveDBFrom saves snapshot of the database from the given reader. It
 // guarantees the save operation is atomic.
 func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
@@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) {
 // DBFilePath returns the file path for the snapshot of the database with
 // given id. If the snapshot does not exist, it returns error.
 func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
-	fns, err := fileutil.ReadDir(s.dir)
+	return DBFilePathFromID(s.dir, id)
+}
+
+func DBFilePathFromID(dbPath string, id uint64) (string, error) {
+	fns, err := fileutil.ReadDir(dbPath)
 	if err != nil {
 		return "", err
 	}
 	wfn := fmt.Sprintf("%016x.snap.db", id)
 	for _, fn := range fns {
 		if fn == wfn {
-			return filepath.Join(s.dir, fn), nil
+			return filepath.Join(dbPath, fn), nil
 		}
 	}
-	return "", fmt.Errorf("snap: snapshot file doesn't exist")
+	return "", ErrNoDBSnapshot
 }
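With the sentinel error, callers can distinguish "no snapshot db on disk for this index" from an I/O failure. A usage sketch (loadSnapshotDB is an illustrative caller, not part of the commit; the recovery path in checkAndRecoverDB above treats any error as fatal):

package sketch

import (
	"fmt"

	"github.com/coreos/etcd/snap"
)

func loadSnapshotDB(snapdir string, index uint64) (string, error) {
	path, err := snap.DBFilePathFromID(snapdir, index)
	if err == snap.ErrNoDBSnapshot {
		// no raw database snapshot for this index; the caller decides whether
		// to panic (as etcd does on restart) or proceed without it
		return "", fmt.Errorf("no snapshot db for index %d: %v", index, err)
	}
	if err != nil {
		return "", err // I/O error while reading the snapshot directory
	}
	return path, nil
}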