From 44de670de78f867670940b0e0793fb3688d85668 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Wed, 8 Apr 2015 00:25:18 -0700 Subject: [PATCH] wal: allow at most one WAL function called at one time SaveSnap and Save are called in separate goroutines now. Allow at most one WAL function being called at one time to protect internal fields and guarantee execution order. Or one possible bug is that the new cut file is started with snapshot entry instead of crc entry. --- wal/wal.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/wal/wal.go b/wal/wal.go index 1bc2f2c4e..e63c7d752 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -23,6 +23,7 @@ import ( "os" "path" "reflect" + "sync" "time" "github.com/coreos/etcd/pkg/fileutil" @@ -69,6 +70,7 @@ type WAL struct { start walpb.Snapshot // snapshot to start reading decoder *decoder // decoder to decode records + mu sync.Mutex f *os.File // underlay file opened for appending, sync seq uint64 // sequence of the wal file currently used for writes enti uint64 // index of the last entry saved to the wal @@ -213,6 +215,9 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { // TODO: maybe loose the checking of match. // After ReadAll, the WAL will be ready for appending new records. func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { + w.mu.Lock() + defer w.mu.Unlock() + rec := &walpb.Record{} decoder := w.decoder @@ -335,6 +340,9 @@ func (w *WAL) sync() error { // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4. func (w *WAL) ReleaseLockTo(index uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + var smaller int found := false @@ -370,6 +378,9 @@ func (w *WAL) ReleaseLockTo(index uint64) error { } func (w *WAL) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.f != nil { if err := w.sync(); err != nil { return err @@ -409,6 +420,9 @@ func (w *WAL) saveState(s *raftpb.HardState) error { } func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { + w.mu.Lock() + defer w.mu.Unlock() + // short cut, do not call sync if raft.IsEmptyHardState(st) && len(ents) == 0 { return nil @@ -436,6 +450,9 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { } func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + b := pbutil.MustMarshal(&e) rec := &walpb.Record{Type: snapshotType, Data: b} if err := w.encoder.encode(rec); err != nil {