Merge pull request #2642 from yichengq/protect-wal

wal: allow at most one WAL function called at one time
This commit is contained in:
Yicheng Qin 2015-04-08 09:42:00 -07:00
commit 252a931666

View File

@ -23,6 +23,7 @@ import (
"os" "os"
"path" "path"
"reflect" "reflect"
"sync"
"time" "time"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
@ -69,6 +70,7 @@ type WAL struct {
start walpb.Snapshot // snapshot to start reading start walpb.Snapshot // snapshot to start reading
decoder *decoder // decoder to decode records decoder *decoder // decoder to decode records
mu sync.Mutex
f *os.File // underlay file opened for appending, sync f *os.File // underlay file opened for appending, sync
seq uint64 // sequence of the wal file currently used for writes seq uint64 // sequence of the wal file currently used for writes
enti uint64 // index of the last entry saved to the wal enti uint64 // index of the last entry saved to the wal
@ -213,6 +215,9 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
// TODO: maybe loose the checking of match. // TODO: maybe loose the checking of match.
// After ReadAll, the WAL will be ready for appending new records. // After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
w.mu.Lock()
defer w.mu.Unlock()
rec := &walpb.Record{} rec := &walpb.Record{}
decoder := w.decoder decoder := w.decoder
@ -335,6 +340,9 @@ func (w *WAL) sync() error {
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4. // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
func (w *WAL) ReleaseLockTo(index uint64) error { func (w *WAL) ReleaseLockTo(index uint64) error {
w.mu.Lock()
defer w.mu.Unlock()
var smaller int var smaller int
found := false found := false
@ -370,6 +378,9 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
} }
func (w *WAL) Close() error { func (w *WAL) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.f != nil { if w.f != nil {
if err := w.sync(); err != nil { if err := w.sync(); err != nil {
return err return err
@ -409,6 +420,9 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
} }
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
w.mu.Lock()
defer w.mu.Unlock()
// short cut, do not call sync // short cut, do not call sync
if raft.IsEmptyHardState(st) && len(ents) == 0 { if raft.IsEmptyHardState(st) && len(ents) == 0 {
return nil return nil
@ -436,6 +450,9 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
} }
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
w.mu.Lock()
defer w.mu.Unlock()
b := pbutil.MustMarshal(&e) b := pbutil.MustMarshal(&e)
rec := &walpb.Record{Type: snapshotType, Data: b} rec := &walpb.Record{Type: snapshotType, Data: b}
if err := w.encoder.encode(rec); err != nil { if err := w.encoder.encode(rec); err != nil {