wal: never leave a corrupted wal file

If the process dies during wal.cut(), it might leave a corrupted wal
file. This commit solves the problem by creating a temp wal file first,
then atomically rename it to a wal file when we are sure it is vaild.
This commit is contained in:
Xiang Li 2015-04-08 15:50:24 -07:00
parent 2141308524
commit 53792ccbdc

View File

@ -284,31 +284,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomtically rename temp wal file to a wal file.
func (w *WAL) cut() error {
// create a new wal file with name sequence + 1
// close old wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
ftpath := fpath + ".tmp"
// create a temp wal file with name sequence + 1
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
log.Printf("wal: segmented wal file %v is created", fpath)
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
if err = w.sync(); err != nil {
return err
}
w.f.Close()
// update writer and save the previous crc
w.f = f
w.seq++
w.f = ft
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
if err := w.saveCrc(prevCrc); err != nil {
@ -320,7 +317,45 @@ func (w *WAL) cut() error {
if err := w.saveState(&w.state); err != nil {
return err
}
return w.sync()
// close temp wal file
if err := w.sync(); err != nil {
return err
}
if err := w.f.Close(); err != nil {
return err
}
// atomically move temp wal file to wal file
if err := os.Rename(ftpath, fpath); err != nil {
return err
}
// open the wal file and update writer again
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return err
}
w.f = f
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
// lock the new wal file
l, err := fileutil.NewLock(f.Name())
if err != nil {
return err
}
err = l.Lock()
if err != nil {
return err
}
w.locks = append(w.locks, l)
// increase the wal seq
w.seq++
log.Printf("wal: segmented wal file %v is created", fpath)
return nil
}
func (w *WAL) sync() error {