From 0df732c052c8b810fd052553537cfc7a1d5311e2 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 17 Mar 2016 09:51:33 -0700 Subject: [PATCH] wal: pre-create segment files Pipeline file creation and allocation so it overlaps writes to the log. Fixes #4773 --- wal/file_pipeline.go | 95 ++++++++++++++++++++++++++++++++++++++++++++ wal/wal.go | 26 +++++++----- 2 files changed, 112 insertions(+), 9 deletions(-) create mode 100644 wal/file_pipeline.go diff --git a/wal/file_pipeline.go b/wal/file_pipeline.go new file mode 100644 index 000000000..6db9f66d7 --- /dev/null +++ b/wal/file_pipeline.go @@ -0,0 +1,95 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "fmt" + "os" + "path" + + "github.com/coreos/etcd/pkg/fileutil" +) + +// filePipeline pipelines allocating disk space +type filePipeline struct { + // dir to put files + dir string + // size of files to make, in bytes + size int64 + // count number of files generated + count int + + filec chan *fileutil.LockedFile + errc chan error + donec chan struct{} +} + +func newFilePipeline(dir string, fileSize int64) *filePipeline { + fp := &filePipeline{ + dir: dir, + size: fileSize, + filec: make(chan *fileutil.LockedFile), + errc: make(chan error, 1), + donec: make(chan struct{}), + } + go fp.run() + return fp +} + +// Open returns a fresh file for writing +func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) { + select { + case f = <-fp.filec: + case err = <-fp.errc: + } + return +} + +func (fp *filePipeline) Close() error { + close(fp.donec) + return <-fp.errc +} + +func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { + fpath := path.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count)) + if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, 0600); err != nil { + return nil, err + } + if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + f.Close() + return nil, err + } + fp.count++ + return f, nil +} + +func (fp *filePipeline) run() { + defer close(fp.errc) + for { + f, err := fp.alloc() + if err != nil { + fp.errc <- err + return + } + select { + case fp.filec <- f: + case <-fp.donec: + os.Remove(f.Name()) + f.Close() + return + } + } +} diff --git a/wal/wal.go b/wal/wal.go index fa692dfdf..3b7d9849a 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -80,6 +80,7 @@ type WAL struct { encoder *encoder // encoder to encode records locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing) + fp *filePipeline } // Create creates a WAL ready for appending records. The given metadata is @@ -109,6 +110,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { dir: dirpath, metadata: metadata, encoder: newEncoder(f, 0), + fp: newFilePipeline(dirpath, segmentSizeBytes), } w.locks = append(w.locks, f) if err := w.saveCrc(0); err != nil { @@ -206,6 +208,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) plog.Errorf("failed to allocate space when creating new wal file (%v)", err) return nil, err } + w.fp = newFilePipeline(w.dir, segmentSizeBytes) } return w, nil @@ -332,10 +335,9 @@ func (w *WAL) cut() error { } fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1)) - ftpath := fpath + ".tmp" // create a temp wal file with name sequence + 1, or truncate the existing one - newTail, err := fileutil.LockFile(ftpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + newTail, err := w.fp.Open() if err != nil { return err } @@ -357,7 +359,13 @@ func (w *WAL) cut() error { if err = w.sync(); err != nil { return err } - if err = os.Rename(ftpath, fpath); err != nil { + + off, err = w.tail().Seek(0, os.SEEK_CUR) + if err != nil { + return err + } + + if err = os.Rename(newTail.Name(), fpath); err != nil { return err } newTail.Close() @@ -365,7 +373,7 @@ func (w *WAL) cut() error { if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, 0600); err != nil { return err } - if _, err = newTail.Seek(0, os.SEEK_END); err != nil { + if _, err = newTail.Seek(off, os.SEEK_SET); err != nil { return err } @@ -374,11 +382,6 @@ func (w *WAL) cut() error { prevCrc = w.encoder.crc.Sum32() w.encoder = newEncoder(w.tail(), prevCrc) - if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes, true); err != nil { - plog.Errorf("failed to allocate space when creating new wal file (%v)", err) - return err - } - plog.Infof("segmented wal file %v is created", fpath) return nil } @@ -443,6 +446,11 @@ func (w *WAL) Close() error { w.mu.Lock() defer w.mu.Unlock() + if w.fp != nil { + w.fp.Close() + w.fp = nil + } + if w.tail() != nil { if err := w.sync(); err != nil { return err