Merge pull request #6544 from gyuho/page-offset

wal, ioutil: set page offset for encoder
This commit is contained in:
Gyu-Ho Lee 2016-09-28 11:37:24 -07:00 committed by GitHub
commit 2353cbca71
6 changed files with 66 additions and 12 deletions

View File

@ -38,9 +38,12 @@ type PageWriter struct {
bufWatermarkBytes int
}
func NewPageWriter(w io.Writer, pageBytes int) *PageWriter {
// NewPageWriter creates a new PageWriter. pageBytes is the number of bytes
// to write per page. pageOffset is the starting offset of io.Writer.
func NewPageWriter(w io.Writer, pageBytes, pageOffset int) *PageWriter {
return &PageWriter{
w: w,
pageOffset: pageOffset,
pageBytes: pageBytes,
buf: make([]byte, defaultBufferBytes+pageBytes),
bufWatermarkBytes: defaultBufferBytes,

View File

@ -25,7 +25,7 @@ func TestPageWriterRandom(t *testing.T) {
pageBytes := 128
buf := make([]byte, 4*defaultBufferBytes)
cw := &checkPageWriter{pageBytes: pageBytes, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
n := 0
for i := 0; i < 4096; i++ {
c, err := w.Write(buf[:rand.Intn(len(buf))])
@ -51,7 +51,7 @@ func TestPageWriterPartialSlack(t *testing.T) {
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes)
w := NewPageWriter(cw, pageBytes, 0)
// put writer in non-zero page offset
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
@ -82,6 +82,35 @@ func TestPageWriterPartialSlack(t *testing.T) {
}
}
// TestPageWriterOffset tests if page writer correctly repositions when offset is given.
func TestPageWriterOffset(t *testing.T) {
defaultBufferBytes = 1024
pageBytes := 128
buf := make([]byte, defaultBufferBytes)
cw := &checkPageWriter{pageBytes: 64, t: t}
w := NewPageWriter(cw, pageBytes, 0)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 64 {
t.Fatalf("w.pageOffset expected 64, got %d", w.pageOffset)
}
w = NewPageWriter(cw, w.pageOffset, pageBytes)
if _, err := w.Write(buf[:64]); err != nil {
t.Fatal(err)
}
if err := w.Flush(); err != nil {
t.Fatal(err)
}
if w.pageOffset != 0 {
t.Fatalf("w.pageOffset expected 0, got %d", w.pageOffset)
}
}
// checkPageWriter implements an io.Writer that fails a test on unaligned writes.
type checkPageWriter struct {
pageBytes int

View File

@ -18,6 +18,7 @@ import (
"encoding/binary"
"hash"
"io"
"os"
"sync"
"github.com/coreos/etcd/pkg/crc"
@ -39,9 +40,9 @@ type encoder struct {
uint64buf []byte
}
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
return &encoder{
bw: ioutil.NewPageWriter(w, walPageBytes),
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
crc: crc.New(prevCrc, crcTable),
// 1MB buffer
buf: make([]byte, 1024*1024),
@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
}
}
// newFileEncoder creates a new encoder with current file offset for the page writer.
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
return nil, err
}
return newEncoder(f, prevCrc, int(offset)), nil
}
func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock()
defer e.mu.Unlock()

View File

@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) {
typ := int64(0xABCD)
d := []byte("Hello world!")
buf := new(bytes.Buffer)
e := newEncoder(buf, 0)
e := newEncoder(buf, 0, 0)
e.encode(&walpb.Record{Type: typ, Data: d})
e.flush()
decoder := newDecoder(ioutil.NopCloser(buf))

View File

@ -122,7 +122,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
w := &WAL{
dir: dirpath,
metadata: metadata,
encoder: newEncoder(f, 0),
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
}
w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil {
@ -343,7 +346,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
if err != nil {
return
}
}
w.decoder = nil
@ -377,7 +383,10 @@ func (w *WAL) cut() error {
// update writer and save the previous crc
w.locks = append(w.locks, newTail)
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil {
return err
}
@ -416,7 +425,10 @@ func (w *WAL) cut() error {
w.locks[len(w.locks)-1] = newTail
prevCrc = w.encoder.crc.Sum32()
w.encoder = newEncoder(w.tail(), prevCrc)
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
if err != nil {
return err
}
plog.Infof("segmented wal file %v is created", fpath)
return nil

View File

@ -61,7 +61,7 @@ func TestNew(t *testing.T) {
}
var wb bytes.Buffer
e := newEncoder(&wb, 0)
e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
if err != nil {
t.Fatalf("err = %v, want nil", err)
@ -528,7 +528,7 @@ func TestSaveEmpty(t *testing.T) {
var buf bytes.Buffer
var est raftpb.HardState
w := WAL{
encoder: newEncoder(&buf, 0),
encoder: newEncoder(&buf, 0, 0),
}
if err := w.saveState(&est); err != nil {
t.Errorf("err = %v, want nil", err)