diff --git a/etcdserver/raft.go b/etcdserver/raft.go index b87ceea54..dcb894f82 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -83,7 +83,8 @@ type RaftTimer interface { type apply struct { entries []raftpb.Entry snapshot raftpb.Snapshot - raftDone <-chan struct{} // rx {} after raft has persisted messages + // notifyc synchronizes etcd server applies with the raft node + notifyc chan struct{} } type raftNode struct { @@ -190,11 +191,11 @@ func (r *raftNode) start(rh *raftReadyHandler) { } } - raftDone := make(chan struct{}, 1) + notifyc := make(chan struct{}, 1) ap := apply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, - raftDone: raftDone, + notifyc: notifyc, } updateCommittedIndex(&ap, rh) @@ -227,6 +228,9 @@ func (r *raftNode) start(rh *raftReadyHandler) { if err := r.storage.SaveSnap(rd.Snapshot); err != nil { plog.Fatalf("raft save snapshot error: %v", err) } + // etcdserver now claim the snapshot has been persisted onto the disk + notifyc <- struct{}{} + // gofail: var raftAfterSaveSnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index) @@ -240,7 +244,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { msgs := r.processMessages(rd.Messages) // now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots - raftDone <- struct{}{} + notifyc <- struct{}{} // Candidate or follower needs to wait for all pending configuration // changes to be applied before sending messages. @@ -259,9 +263,9 @@ func (r *raftNode) start(rh *raftReadyHandler) { if waitApply { // blocks until 'applyAll' calls 'applyWait.Trigger' // to be in sync with scheduled config-change job - // (assume raftDone has cap of 1) + // (assume notifyc has cap of 1) select { - case raftDone <- struct{}{}: + case notifyc <- struct{}{}: case <-r.stopped: return } @@ -271,7 +275,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.transport.Send(msgs) } else { // leader already processed 'MsgSnap' and signaled - raftDone <- struct{}{} + notifyc <- struct{}{} } r.Advance() diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index 3396e60f0..757826cc9 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -211,7 +211,7 @@ func TestConfgChangeBlocksApply(t *testing.T) { } // finish apply, unblock raft routine - <-ap.raftDone + <-ap.notifyc select { case <-continueC: diff --git a/etcdserver/server.go b/etcdserver/server.go index 33430276b..1c2a95c05 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -274,20 +274,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { bepath := filepath.Join(cfg.SnapDir(), databaseFilename) beExist := fileutil.Exist(bepath) - var be backend.Backend - beOpened := make(chan struct{}) - go func() { - be = newBackend(bepath, cfg.QuotaBackendBytes) - beOpened <- struct{}{} - }() - - select { - case <-beOpened: - case <-time.After(time.Second): - plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") - plog.Warningf("waiting for it to exit before starting...") - <-beOpened - } + be := openBackend(bepath, cfg.QuotaBackendBytes) defer func() { if err != nil { @@ -385,6 +372,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) + + be, err = checkAndRecoverDB(snapshot, be, cfg.QuotaBackendBytes, cfg.SnapDir()) + if err != nil { + plog.Panicf("recovering backend from snapshot error: %v", err) + } } cfg.Print() if !cfg.ForceNewCluster { @@ -778,7 +770,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { // wait for the raft routine to finish the disk writes before triggering a // snapshot. or applied index might be greater than the last index in raft // storage, since the raft routine might be slower than apply routine. - <-apply.raftDone + <-apply.notifyc s.triggerSnapshot(ep) select { @@ -803,6 +795,9 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { apply.snapshot.Metadata.Index, ep.appliedi) } + // wait for raftNode to persist snashot onto the disk + <-apply.notifyc + snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index) if err != nil { plog.Panicf("get database snapshot file path error: %v", err) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 55f32d522..0f2b0edf7 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -951,6 +951,89 @@ func TestSnapshot(t *testing.T) { <-ch } +// TestSnapshotOrdering ensures raft persists snapshot onto disk before +// snapshot db is applied. +func TestSnapshotOrdering(t *testing.T) { + n := newNopReadyNode() + st := store.New() + cl := membership.NewCluster("abc") + cl.SetStore(st) + + testdir, err := ioutil.TempDir(os.TempDir(), "testsnapdir") + if err != nil { + t.Fatalf("couldn't open tempdir (%v)", err) + } + defer os.RemoveAll(testdir) + if err := os.MkdirAll(testdir+"/member/snap", 0755); err != nil { + t.Fatalf("couldn't make snap dir (%v)", err) + } + + rs := raft.NewMemoryStorage() + p := mockstorage.NewStorageRecorderStream(testdir) + tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) + r := newRaftNode(raftNodeConfig{ + isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, + Node: n, + transport: tr, + storage: p, + raftStorage: rs, + }) + s := &EtcdServer{ + Cfg: &ServerConfig{ + DataDir: testdir, + }, + r: *r, + store: st, + cluster: cl, + SyncTicker: &time.Ticker{}, + } + s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster} + + be, tmpPath := backend.NewDefaultTmpBackend() + defer os.RemoveAll(tmpPath) + s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex) + s.be = be + + s.start() + defer s.Stop() + + actionc := p.Chan() + n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}} + if ac := <-actionc; ac.Name != "Save" { + // MsgSnap triggers raftNode to call Save() + t.Fatalf("expect save() is called, but got %v", ac.Name) + } + + // get the snapshot sent by the transport + snapMsg := <-snapDoneC + + // Snapshot first triggers raftnode to persists the snapshot onto disk + // before renaming db snapshot file to db + snapMsg.Snapshot.Metadata.Index = 1 + n.readyc <- raft.Ready{Snapshot: snapMsg.Snapshot} + var seenSaveSnap bool + timer := time.After(5 * time.Second) + for { + select { + case ac := <-actionc: + switch ac.Name { + // DBFilePath() is called immediately before snapshot renaming. + case "DBFilePath": + if !seenSaveSnap { + t.Fatalf("DBFilePath called before SaveSnap") + } + return + case "SaveSnap": + seenSaveSnap = true + default: + continue + } + case <-timer: + t.Fatalf("timeout waiting on actions") + } + } +} + // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() diff --git a/etcdserver/util.go b/etcdserver/util.go index e3896ffc2..de7ef179a 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -15,11 +15,18 @@ package etcdserver import ( + "fmt" + "os" "time" "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" + "github.com/coreos/etcd/snap" ) // isConnectedToQuorumSince checks whether the local member is connected to the @@ -95,3 +102,55 @@ func (nc *notifier) notify(err error) { nc.err = err close(nc.c) } + +// checkAndRecoverDB attempts to recover db in the scenario when +// etcd server crashes before updating its in-state db +// and after persisting snapshot to disk from syncing with leader, +// snapshot can be newer than db where +// (snapshot.Metadata.Index > db.consistentIndex ). +// +// when that happen: +// 1. find xxx.snap.db that matches snap index. +// 2. rename xxx.snap.db to db. +// 3. open the new db as the backend. +func checkAndRecoverDB(snapshot *raftpb.Snapshot, oldbe backend.Backend, quotaBackendBytes int64, snapdir string) (be backend.Backend, err error) { + var cIndex consistentIndex + kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex) + defer kv.Close() + kvindex := kv.ConsistentIndex() + if snapshot.Metadata.Index <= kvindex { + return oldbe, nil + } + + id := snapshot.Metadata.Index + snapfn, err := snap.DBFilePathFromID(snapdir, id) + if err != nil { + return nil, fmt.Errorf("finding %v error: %v", snapdir+fmt.Sprintf("%016x.snap.db", id), err) + } + + bepath := snapdir + databaseFilename + if err := os.Rename(snapfn, bepath); err != nil { + return nil, fmt.Errorf("rename snapshot file error: %v", err) + } + + oldbe.Close() + be = openBackend(bepath, quotaBackendBytes) + return be, nil +} + +func openBackend(bepath string, quotaBackendBytes int64) (be backend.Backend) { + beOpened := make(chan struct{}) + go func() { + be = newBackend(bepath, quotaBackendBytes) + beOpened <- struct{}{} + }() + + select { + case <-beOpened: + case <-time.After(time.Second): + plog.Warningf("another etcd process is running with the same data dir and holding the file lock.") + plog.Warningf("waiting for it to exit before starting...") + <-beOpened + } + return be +} diff --git a/snap/db.go b/snap/db.go index ae3c743f8..77d109141 100644 --- a/snap/db.go +++ b/snap/db.go @@ -15,6 +15,7 @@ package snap import ( + "errors" "fmt" "io" "io/ioutil" @@ -24,6 +25,8 @@ import ( "github.com/coreos/etcd/pkg/fileutil" ) +var ErrNoDBSnapshot = errors.New("snap: snapshot file doesn't exist") + // SaveDBFrom saves snapshot of the database from the given reader. It // guarantees the save operation is atomic. func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { @@ -60,15 +63,19 @@ func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) (int64, error) { // DBFilePath returns the file path for the snapshot of the database with // given id. If the snapshot does not exist, it returns error. func (s *Snapshotter) DBFilePath(id uint64) (string, error) { - fns, err := fileutil.ReadDir(s.dir) + return DBFilePathFromID(s.dir, id) +} + +func DBFilePathFromID(dbPath string, id uint64) (string, error) { + fns, err := fileutil.ReadDir(dbPath) if err != nil { return "", err } wfn := fmt.Sprintf("%016x.snap.db", id) for _, fn := range fns { if fn == wfn { - return filepath.Join(s.dir, fn), nil + return filepath.Join(dbPath, fn), nil } } - return "", fmt.Errorf("snap: snapshot file doesn't exist") + return "", ErrNoDBSnapshot }