From d0c29cc6108a0b2752a67b6e2a17aac97fc295be Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 28 Sep 2016 09:41:24 -0700 Subject: [PATCH 1/2] pkg/ioutil: configure pageOffset in NewPageWriter --- pkg/ioutil/pagewriter.go | 5 ++++- pkg/ioutil/pagewriter_test.go | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/ioutil/pagewriter.go b/pkg/ioutil/pagewriter.go index ed22d9420..72de1593d 100644 --- a/pkg/ioutil/pagewriter.go +++ b/pkg/ioutil/pagewriter.go @@ -38,9 +38,12 @@ type PageWriter struct { bufWatermarkBytes int } -func NewPageWriter(w io.Writer, pageBytes int) *PageWriter { +// NewPageWriter creates a new PageWriter. pageBytes is the number of bytes +// to write per page. pageOffset is the starting offset of io.Writer. +func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter { return &PageWriter{ w: w, + pageOffset: pageOffset, pageBytes: pageBytes, buf: make([]byte, defaultBufferBytes+pageBytes), bufWatermarkBytes: defaultBufferBytes, diff --git a/pkg/ioutil/pagewriter_test.go b/pkg/ioutil/pagewriter_test.go index 62cbd6492..cdeaba926 100644 --- a/pkg/ioutil/pagewriter_test.go +++ b/pkg/ioutil/pagewriter_test.go @@ -25,7 +25,7 @@ func TestPageWriterRandom(t *testing.T) { pageBytes := 128 buf := make([]byte, 4*defaultBufferBytes) cw := &checkPageWriter{pageBytes: pageBytes, t: t} - w := NewPageWriter(cw, pageBytes) + w := NewPageWriter(cw, pageBytes, 0) n := 0 for i := 0; i < 4096; i++ { c, err := w.Write(buf[:rand.Intn(len(buf))]) @@ -51,7 +51,7 @@ func TestPageWriterPartialSlack(t *testing.T) { pageBytes := 128 buf := make([]byte, defaultBufferBytes) cw := &checkPageWriter{pageBytes: 64, t: t} - w := NewPageWriter(cw, pageBytes) + w := NewPageWriter(cw, pageBytes, 0) // put writer in non-zero page offset if _, err := w.Write(buf[:64]); err != nil { t.Fatal(err) @@ -82,6 +82,35 @@ func TestPageWriterPartialSlack(t *testing.T) { } } +// TestPageWriterOffset tests if page writer correctly repositions when offset is given. +func TestPageWriterOffset(t *testing.T) { + defaultBufferBytes = 1024 + pageBytes := 128 + buf := make([]byte, defaultBufferBytes) + cw := &checkPageWriter{pageBytes: 64, t: t} + w := NewPageWriter(cw, pageBytes, 0) + if _, err := w.Write(buf[:64]); err != nil { + t.Fatal(err) + } + if err := w.Flush(); err != nil { + t.Fatal(err) + } + if w.pageOffset != 64 { + t.Fatalf("w.pageOffset expected 64, got %d", w.pageOffset) + } + + w = NewPageWriter(cw, w.pageOffset, pageBytes) + if _, err := w.Write(buf[:64]); err != nil { + t.Fatal(err) + } + if err := w.Flush(); err != nil { + t.Fatal(err) + } + if w.pageOffset != 0 { + t.Fatalf("w.pageOffset expected 0, got %d", w.pageOffset) + } +} + // checkPageWriter implements an io.Writer that fails a test on unaligned writes. type checkPageWriter struct { pageBytes int From f5588526cca812acc68505e38c08098f797d6671 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 28 Sep 2016 11:03:24 -0700 Subject: [PATCH 2/2] wal: set PageWriter offset in file encoder --- wal/encoder.go | 14 ++++++++++++-- wal/record_test.go | 2 +- wal/wal.go | 20 ++++++++++++++++---- wal/wal_test.go | 4 ++-- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/wal/encoder.go b/wal/encoder.go index 4a6603654..efe58928c 100644 --- a/wal/encoder.go +++ b/wal/encoder.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "hash" "io" + "os" "sync" "github.com/coreos/etcd/pkg/crc" @@ -39,9 +40,9 @@ type encoder struct { uint64buf []byte } -func newEncoder(w io.Writer, prevCrc uint32) *encoder { +func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder { return &encoder{ - bw: ioutil.NewPageWriter(w, walPageBytes), + bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset), crc: crc.New(prevCrc, crcTable), // 1MB buffer buf: make([]byte, 1024*1024), @@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder { } } +// newFileEncoder creates a new encoder with current file offset for the page writer. +func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) { + offset, err := f.Seek(0, os.SEEK_CUR) + if err != nil { + return nil, err + } + return newEncoder(f, prevCrc, int(offset)), nil +} + func (e *encoder) encode(rec *walpb.Record) error { e.mu.Lock() defer e.mu.Unlock() diff --git a/wal/record_test.go b/wal/record_test.go index ddbc37d86..2a6904a81 100644 --- a/wal/record_test.go +++ b/wal/record_test.go @@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) { typ := int64(0xABCD) d := []byte("Hello world!") buf := new(bytes.Buffer) - e := newEncoder(buf, 0) + e := newEncoder(buf, 0, 0) e.encode(&walpb.Record{Type: typ, Data: d}) e.flush() decoder := newDecoder(ioutil.NopCloser(buf)) diff --git a/wal/wal.go b/wal/wal.go index 0bd85c166..7719e930b 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -122,7 +122,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { w := &WAL{ dir: dirpath, metadata: metadata, - encoder: newEncoder(f, 0), + } + w.encoder, err = newFileEncoder(f.File, 0) + if err != nil { + return nil, err } w.locks = append(w.locks, f) if err = w.saveCrc(0); err != nil { @@ -343,7 +346,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. if w.tail() != nil { // create encoder (chain crc with the decoder), enable appending - w.encoder = newEncoder(w.tail(), w.decoder.lastCRC()) + w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC()) + if err != nil { + return + } } w.decoder = nil @@ -377,7 +383,10 @@ func (w *WAL) cut() error { // update writer and save the previous crc w.locks = append(w.locks, newTail) prevCrc := w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder, err = newFileEncoder(w.tail().File, prevCrc) + if err != nil { + return err + } if err = w.saveCrc(prevCrc); err != nil { return err } @@ -416,7 +425,10 @@ func (w *WAL) cut() error { w.locks[len(w.locks)-1] = newTail prevCrc = w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder, err = newFileEncoder(w.tail().File, prevCrc) + if err != nil { + return err + } plog.Infof("segmented wal file %v is created", fpath) return nil diff --git a/wal/wal_test.go b/wal/wal_test.go index 6fc6f782e..a762ee255 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -61,7 +61,7 @@ func TestNew(t *testing.T) { } var wb bytes.Buffer - e := newEncoder(&wb, 0) + e := newEncoder(&wb, 0, 0) err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) if err != nil { t.Fatalf("err = %v, want nil", err) @@ -528,7 +528,7 @@ func TestSaveEmpty(t *testing.T) { var buf bytes.Buffer var est raftpb.HardState w := WAL{ - encoder: newEncoder(&buf, 0), + encoder: newEncoder(&buf, 0, 0), } if err := w.saveState(&est); err != nil { t.Errorf("err = %v, want nil", err)