diff --git a/wal/wal.go b/wal/wal.go index e63c7d752..553df7300 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -284,31 +284,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } // cut closes current file written and creates a new one ready to append. +// cut first creates a temp wal file and writes necessary headers into it. +// Then cut atomtically rename temp wal file to a wal file. func (w *WAL) cut() error { - // create a new wal file with name sequence + 1 + // close old wal file + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1)) - f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) + ftpath := fpath + ".tmp" + + // create a temp wal file with name sequence + 1 + ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err } - log.Printf("wal: segmented wal file %v is created", fpath) - l, err := fileutil.NewLock(f.Name()) - if err != nil { - return err - } - err = l.Lock() - if err != nil { - return err - } - w.locks = append(w.locks, l) - if err = w.sync(); err != nil { - return err - } - w.f.Close() // update writer and save the previous crc - w.f = f - w.seq++ + w.f = ft prevCrc := w.encoder.crc.Sum32() w.encoder = newEncoder(w.f, prevCrc) if err := w.saveCrc(prevCrc); err != nil { @@ -320,7 +317,45 @@ func (w *WAL) cut() error { if err := w.saveState(&w.state); err != nil { return err } - return w.sync() + // close temp wal file + if err := w.sync(); err != nil { + return err + } + if err := w.f.Close(); err != nil { + return err + } + + // atomically move temp wal file to wal file + if err := os.Rename(ftpath, fpath); err != nil { + return err + } + + // open the wal file and update writer again + f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600) + if err != nil { + return err + } + w.f = f + prevCrc = w.encoder.crc.Sum32() + w.encoder = newEncoder(w.f, prevCrc) + + // lock the new wal file + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return err + } + err = l.Lock() + if err != nil { + return err + } + w.locks = append(w.locks, l) + + // increase the wal seq + w.seq++ + + log.Printf("wal: segmented wal file %v is created", fpath) + + return nil } func (w *WAL) sync() error {