From f59da0e453074da4b15bae6e49fe362bc55515b7 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 12 Jun 2015 16:06:02 -0700 Subject: [PATCH] *:fix point-in-time backup Backup process should be able to read all WALs until io.EOF to generate a point-in-time backup. Our WAL file is append-only. And the backup process will lock all files before start reading, which can prevent the gc routine from removing any files in the middle. --- etcdctl/command/backup_command.go | 2 +- wal/wal.go | 88 +++++++++++++++++++------------ wal/wal_test.go | 17 +++--- 3 files changed, 64 insertions(+), 43 deletions(-) diff --git a/etcdctl/command/backup_command.go b/etcdctl/command/backup_command.go index 7431596f1..a745673e1 100644 --- a/etcdctl/command/backup_command.go +++ b/etcdctl/command/backup_command.go @@ -66,7 +66,7 @@ func handleBackup(c *cli.Context) { } } - w, err := wal.OpenNotInUse(srcWAL, walsnap) + w, err := wal.OpenForRead(srcWAL, walsnap) if err != nil { log.Fatal(err) } diff --git a/wal/wal.go b/wal/wal.go index 607565423..fe853fca2 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -137,13 +137,13 @@ func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { return openAtIndex(dirpath, snap, true) } -// OpenNotInUse only opens the wal files that are not in use. -// Other than that, it is similar to Open. -func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) { +// OpenForRead only opens the wal files for read. +// Write on a read only wal panics. +func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) { return openAtIndex(dirpath, snap, false) } -func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { +func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { names, err := fileutil.ReadDir(dirpath) if err != nil { return nil, err @@ -172,11 +172,8 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { } err = l.TryLock() if err != nil { - if all { + if write { return nil, err - } else { - plog.Warningf("opened all the files until %s, since it is still in use by an etcd server", name) - break } } rcs = append(rcs, f) @@ -184,33 +181,40 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { } rc := MultiReadCloser(rcs...) - // open the lastest wal file for appending - seq, _, err := parseWalName(names[len(names)-1]) - if err != nil { - rc.Close() - return nil, err - } - last := path.Join(dirpath, names[len(names)-1]) - f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0) - if err != nil { - rc.Close() - return nil, err - } - // create a WAL ready for reading w := &WAL{ dir: dirpath, start: snap, decoder: newDecoder(rc), - - f: f, - seq: seq, - locks: ls, + locks: ls, } + + if write { + // open the lastest wal file for appending + seq, _, err := parseWalName(names[len(names)-1]) + if err != nil { + rc.Close() + return nil, err + } + last := path.Join(dirpath, names[len(names)-1]) + + f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0) + if err != nil { + rc.Close() + return nil, err + } + + w.f = f + w.seq = seq + } + return w, nil } -// ReadAll reads out all records of the current WAL. +// ReadAll reads out records of the current WAL. +// If opened in write mode, it must read out all records until EOF. Or an error +// will be returned. +// If opened in read mode, it will try to read all records if possible. // If it cannot read out the expected snap, it will return ErrSnapshotNotFound. // If loaded snap doesn't match with the expected one, it will return // all the records and error ErrSnapshotMismatch. @@ -265,10 +269,24 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) } } - if err != io.EOF { - state.Reset() - return nil, state, nil, err + + switch w.f { + case nil: + // We do not have to read out all entries in read mode. + // The last record maybe a partial written one, so + // ErrunexpectedEOF might be returned. + if err != io.EOF && err != io.ErrUnexpectedEOF { + state.Reset() + return nil, state, nil, err + } + default: + // We must read all of the entries if WAL is opened in write mode. + if err != io.EOF { + state.Reset() + return nil, state, nil, err + } } + err = nil if !match { err = ErrSnapshotNotFound @@ -279,10 +297,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. w.start = walpb.Snapshot{} w.metadata = metadata - // create encoder (chain crc with the decoder), enable appending - w.encoder = newEncoder(w.f, w.decoder.lastCRC()) - w.decoder = nil - lastIndexSaved.Set(float64(w.enti)) + + if w.f != nil { + // create encoder (chain crc with the decoder), enable appending + w.encoder = newEncoder(w.f, w.decoder.lastCRC()) + w.decoder = nil + lastIndexSaved.Set(float64(w.enti)) + } + return metadata, state, ents, err } diff --git a/wal/wal_test.go b/wal/wal_test.go index 045f96051..50a4d5ba1 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -404,12 +404,11 @@ func TestOpenAtUncommittedIndex(t *testing.T) { w.Close() } -// TestOpenNotInUse tests that OpenNotInUse can load all files that are -// not in use at that point. +// TestOpenForRead tests that OpenForRead can load all files. // The tests creates WAL directory, and cut out multiple WAL files. Then -// it releases the lock of part of data, and excepts that OpenNotInUse -// can read out all unlocked data. -func TestOpenNotInUse(t *testing.T) { +// it releases the lock of part of data, and excepts that OpenForRead +// can read out all files even if some are locked for write. +func TestOpenForRead(t *testing.T) { p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { t.Fatal(err) @@ -435,8 +434,8 @@ func TestOpenNotInUse(t *testing.T) { unlockIndex := uint64(5) w.ReleaseLockTo(unlockIndex) - // 1,2,3 are avaliable. - w2, err := OpenNotInUse(p, walpb.Snapshot{}) + // All are avaliable for read + w2, err := OpenForRead(p, walpb.Snapshot{}) defer w2.Close() if err != nil { t.Fatal(err) @@ -445,8 +444,8 @@ func TestOpenNotInUse(t *testing.T) { if err != nil { t.Fatalf("err = %v, want nil", err) } - if g := ents[len(ents)-1].Index; g != unlockIndex-2 { - t.Errorf("last index read = %d, want %d", g, unlockIndex-2) + if g := ents[len(ents)-1].Index; g != 9 { + t.Errorf("last index read = %d, want %d", g, 9) } }