From ea94d19147dcd1d022f1b1cdf0925d7374981309 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 12 Dec 2014 12:48:44 -0800 Subject: [PATCH] *: lock the in using files; do not purge locked the wal files --- etcdserver/server.go | 30 +++----------- etcdserver/storage.go | 45 +++++++++++++++++++++ pkg/fileutil/lock.go | 60 ++++++++++++++++++++++++++++ pkg/fileutil/lock_test.go | 82 ++++++++++++++++++++++++++++++++++++++ pkg/fileutil/purge.go | 21 +++++++++- pkg/fileutil/purge_test.go | 70 +++++++++++++++++++++++++++++++- wal/wal.go | 65 +++++++++++++++++++++++++++++- wal/wal_test.go | 2 + 8 files changed, 346 insertions(+), 29 deletions(-) create mode 100644 etcdserver/storage.go create mode 100644 pkg/fileutil/lock.go create mode 100644 pkg/fileutil/lock_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index d4a0b68a0..31f292538 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -90,21 +90,6 @@ type Response struct { err error } -type Storage interface { - // Save function saves ents and state to the underlying stable storage. - // Save MUST block until st and ents are on stable storage. - Save(st raftpb.HardState, ents []raftpb.Entry) error - // SaveSnap function saves snapshot to the underlying stable storage. - SaveSnap(snap raftpb.Snapshot) error - - // TODO: WAL should be able to control cut itself. After implement self-controlled cut, - // remove it in this interface. - // Cut cuts out a new wal file for saving new state and entries. - Cut() error - // Close closes the Storage and performs finalization. - Close() error -} - type Server interface { // Start performs any initialization of the Server necessary for it to // begin serving requests. It must be called before Do or Process. @@ -295,15 +280,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, Cluster: cfg.Cluster, - storage: struct { - *wal.WAL - *snap.Snapshotter - }{w, ss}, - stats: sstats, - lstats: lstats, - Ticker: time.Tick(100 * time.Millisecond), - SyncTicker: time.Tick(500 * time.Millisecond), - snapCount: cfg.SnapCount, + storage: NewStorage(w, ss), + stats: sstats, + lstats: lstats, + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), + snapCount: cfg.SnapCount, } srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats) for _, m := range getOtherMembers(cfg.Cluster, cfg.Name) { diff --git a/etcdserver/storage.go b/etcdserver/storage.go new file mode 100644 index 000000000..61d2ee41c --- /dev/null +++ b/etcdserver/storage.go @@ -0,0 +1,45 @@ +package etcdserver + +import ( + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/wal" +) + +type Storage interface { + // Save function saves ents and state to the underlying stable storage. + // Save MUST block until st and ents are on stable storage. + Save(st raftpb.HardState, ents []raftpb.Entry) error + // SaveSnap function saves snapshot to the underlying stable storage. + SaveSnap(snap raftpb.Snapshot) error + + // TODO: WAL should be able to control cut itself. After implement self-controlled cut, + // remove it in this interface. + // Cut cuts out a new wal file for saving new state and entries. + Cut() error + // Close closes the Storage and performs finalization. + Close() error +} + +type storage struct { + *wal.WAL + *snap.Snapshotter +} + +func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { + return &storage{w, s} +} + +// SaveSnap saves the snapshot to disk and release the locked +// wal files since they will not be used. +func (st *storage) SaveSnap(snap raftpb.Snapshot) error { + err := st.Snapshotter.SaveSnap(snap) + if err != nil { + return err + } + err = st.WAL.ReleaseLockTo(snap.Metadata.Index) + if err != nil { + return err + } + return nil +} diff --git a/pkg/fileutil/lock.go b/pkg/fileutil/lock.go new file mode 100644 index 000000000..d3c8e9eff --- /dev/null +++ b/pkg/fileutil/lock.go @@ -0,0 +1,60 @@ +package fileutil + +import ( + "errors" + "os" + "syscall" +) + +var ( + ErrLocked = errors.New("file already locked") +) + +type Lock interface { + Name() string + TryLock() error + Lock() error + Unlock() error + Destroy() error +} + +type lock struct { + fd int + file *os.File +} + +func (l *lock) Name() string { + return l.file.Name() +} + +// TryLock acquires exclusivity on the lock without blocking +func (l *lock) TryLock() error { + err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil && err == syscall.EWOULDBLOCK { + return ErrLocked + } + return err +} + +// Lock acquires exclusivity on the lock without blocking +func (l *lock) Lock() error { + return syscall.Flock(l.fd, syscall.LOCK_EX) +} + +// Unlock unlocks the lock +func (l *lock) Unlock() error { + return syscall.Flock(l.fd, syscall.LOCK_UN) +} + +func (l *lock) Destroy() error { + return l.file.Close() +} + +func NewLock(file string) (Lock, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + l := &lock{int(f.Fd()), f} + return l, nil +} diff --git a/pkg/fileutil/lock_test.go b/pkg/fileutil/lock_test.go new file mode 100644 index 000000000..a2b577f4b --- /dev/null +++ b/pkg/fileutil/lock_test.go @@ -0,0 +1,82 @@ +package fileutil + +import ( + "io/ioutil" + "os" + "testing" + "time" +) + +func TestLockAndUnlock(t *testing.T) { + f, err := ioutil.TempFile("", "lock") + if err != nil { + t.Fatal(err) + } + f.Close() + defer func() { + err := os.Remove(f.Name()) + if err != nil { + t.Fatal(err) + } + }() + + // lock the file + l, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + defer l.Destroy() + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + // try lock a locked file + dupl, err := NewLock(f.Name()) + if err != nil { + t.Fatal(err) + } + err = dupl.TryLock() + if err != ErrLocked { + t.Errorf("err = %v, want %v", err, ErrLocked) + } + + // unlock the file + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + + // try lock the unlocked file + err = dupl.TryLock() + if err != nil { + t.Errorf("err = %v, want %v", err, nil) + } + defer dupl.Destroy() + + // blocking on locked file + locked := make(chan struct{}, 1) + go func() { + l.Lock() + locked <- struct{}{} + }() + + select { + case <-locked: + t.Error("unexpected unblocking") + case <-time.After(10 * time.Millisecond): + } + + // unlock + err = dupl.Unlock() + if err != nil { + t.Fatal(err) + } + + // the previously blocked routine should be unblocked + select { + case <-locked: + case <-time.After(10 * time.Millisecond): + t.Error("unexpected blocking") + } +} diff --git a/pkg/fileutil/purge.go b/pkg/fileutil/purge.go index 8b3e02894..721482638 100644 --- a/pkg/fileutil/purge.go +++ b/pkg/fileutil/purge.go @@ -27,12 +27,29 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration, sort.Strings(newfnames) for len(newfnames) > int(max) { f := path.Join(dirname, newfnames[0]) - err := os.Remove(f) + l, err := NewLock(f) if err != nil { errC <- err return } - log.Printf("filePurge: successfully remvoed file %s", f) + err = l.TryLock() + if err != nil { + break + } + err = os.Remove(f) + if err != nil { + errC <- err + return + } + err = l.Unlock() + if err != nil { + log.Printf("filePurge: unlock %s error %v", l.Name(), err) + } + err = l.Destroy() + if err != nil { + log.Printf("filePurge: destroy lock %s error %v", l.Name(), err) + } + log.Printf("filePurge: successfully removed file %s", f) newfnames = newfnames[1:] } select { diff --git a/pkg/fileutil/purge_test.go b/pkg/fileutil/purge_test.go index b60804bd7..6ecde6408 100644 --- a/pkg/fileutil/purge_test.go +++ b/pkg/fileutil/purge_test.go @@ -31,7 +31,7 @@ func TestPurgeFile(t *testing.T) { if err != nil { t.Fatal(err) } - time.Sleep(time.Millisecond) + time.Sleep(2 * time.Millisecond) } fnames, err := ReadDir(dir) if err != nil { @@ -48,3 +48,71 @@ func TestPurgeFile(t *testing.T) { } close(stop) } + +func TestPurgeFileHoldingLock(t *testing.T) { + dir, err := ioutil.TempDir("", "purgefile") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + + for i := 0; i < 10; i++ { + _, err := os.Create(path.Join(dir, fmt.Sprintf("%d.test", i))) + if err != nil { + t.Fatal(err) + } + } + + // create a purge barrier at 5 + l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5))) + err = l.Lock() + if err != nil { + t.Fatal(err) + } + + stop := make(chan struct{}) + errch := PurgeFile(dir, "test", 3, time.Millisecond, stop) + time.Sleep(5 * time.Millisecond) + + fnames, err := ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames := []string{"5.test", "6.test", "7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + // remove the purge barrier + err = l.Unlock() + if err != nil { + t.Fatal(err) + } + err = l.Destroy() + if err != nil { + t.Fatal(err) + } + + time.Sleep(5 * time.Millisecond) + + fnames, err = ReadDir(dir) + if err != nil { + t.Fatal(err) + } + wnames = []string{"7.test", "8.test", "9.test"} + if !reflect.DeepEqual(fnames, wnames) { + t.Errorf("filenames = %v, want %v", fnames, wnames) + } + select { + case err := <-errch: + t.Errorf("unexpected purge error %v", err) + case <-time.After(time.Millisecond): + } + + close(stop) +} diff --git a/wal/wal.go b/wal/wal.go index 9585ba76b..5f56934d8 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -67,6 +67,8 @@ type WAL struct { seq uint64 // sequence of the wal file currently used for writes enti uint64 // index of the last entry saved to the wal encoder *encoder // encoder to encode records + + locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing) } // Create creates a WAL ready for appending records. The given metadata is @@ -85,6 +87,15 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { if err != nil { return nil, err } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + err = l.Lock() + if err != nil { + return nil, err + } + w := &WAL{ dir: dirpath, metadata: metadata, @@ -92,6 +103,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { f: f, encoder: newEncoder(f, 0), } + w.locks = append(w.locks, l) if err := w.saveCrc(0); err != nil { return nil, err } @@ -129,12 +141,22 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) { // open the wal files for reading rcs := make([]io.ReadCloser, 0) + ls := make([]fileutil.Lock, 0) for _, name := range names[nameIndex:] { f, err := os.Open(path.Join(dirpath, name)) if err != nil { return nil, err } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return nil, err + } + err = l.TryLock() + if err != nil { + return nil, err + } rcs = append(rcs, f) + ls = append(ls, l) } rc := MultiReadCloser(rcs...) @@ -157,8 +179,9 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) { ri: index, decoder: newDecoder(rc), - f: f, - seq: seq, + f: f, + seq: seq, + locks: ls, } return w, nil } @@ -224,6 +247,15 @@ func (w *WAL) Cut() error { if err != nil { return err } + l, err := fileutil.NewLock(f.Name()) + if err != nil { + return err + } + err = l.Lock() + if err != nil { + return err + } + w.locks = append(w.locks, l) if err = w.sync(); err != nil { return err } @@ -255,6 +287,30 @@ func (w *WAL) sync() error { return w.f.Sync() } +// ReleaseLockTo releases the locks w is holding, which +// have index smaller or equal to the given index. +func (w *WAL) ReleaseLockTo(index uint64) error { + for _, l := range w.locks { + _, i, err := parseWalName(path.Base(l.Name())) + if err != nil { + return err + } + if i > index { + return nil + } + err = l.Unlock() + if err != nil { + return err + } + err = l.Destroy() + if err != nil { + return err + } + w.locks = w.locks[1:] + } + return nil +} + func (w *WAL) Close() error { if w.f != nil { if err := w.sync(); err != nil { @@ -264,6 +320,11 @@ func (w *WAL) Close() error { return err } } + for _, l := range w.locks { + // TODO: log the error + l.Unlock() + l.Destroy() + } return nil } diff --git a/wal/wal_test.go b/wal/wal_test.go index 37528fefe..41b24b53b 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -238,6 +238,7 @@ func TestRecover(t *testing.T) { if !reflect.DeepEqual(state, s) { t.Errorf("state = %+v, want %+v", state, s) } + w.Close() } func TestSearchIndex(t *testing.T) { @@ -365,6 +366,7 @@ func TestRecoverAfterCut(t *testing.T) { t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i) } } + w.Close() } }