mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
wal: use buffer
This commit is contained in:
15
wal/block.go
15
wal/block.go
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
type block struct {
|
||||
t int64
|
||||
l int64
|
||||
d []byte
|
||||
}
|
||||
|
||||
@@ -22,22 +21,24 @@ func writeBlock(w io.Writer, t int64, d []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func readBlock(r io.Reader) (*block, error) {
|
||||
func readBlock(r io.Reader, b *block) error {
|
||||
t, err := readInt64(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
l, err := readInt64(r)
|
||||
if err != nil {
|
||||
return nil, unexpectedEOF(err)
|
||||
return unexpectedEOF(err)
|
||||
}
|
||||
d := make([]byte, l)
|
||||
n, err := r.Read(d)
|
||||
if err != nil {
|
||||
return nil, unexpectedEOF(err)
|
||||
return unexpectedEOF(err)
|
||||
}
|
||||
if n != int(l) {
|
||||
return nil, fmt.Errorf("len(data) = %d, want %d", n, l)
|
||||
return fmt.Errorf("len(data) = %d, want %d", n, l)
|
||||
}
|
||||
return &block{t, l, d}, nil
|
||||
b.t = t
|
||||
b.d = d
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -13,31 +13,34 @@ func TestReadBlock(t *testing.T) {
|
||||
wb *block
|
||||
we error
|
||||
}{
|
||||
{infoBlock, &block{1, 8, infoData}, nil},
|
||||
{[]byte(""), nil, io.EOF},
|
||||
{infoBlock[:len(infoBlock)-len(infoData)-8], nil, io.ErrUnexpectedEOF},
|
||||
{infoBlock[:len(infoBlock)-len(infoData)], nil, io.ErrUnexpectedEOF},
|
||||
{infoBlock[:len(infoBlock)-8], nil, io.ErrUnexpectedEOF},
|
||||
{infoBlock, &block{1, infoData}, nil},
|
||||
{[]byte(""), &block{}, io.EOF},
|
||||
{infoBlock[:len(infoBlock)-len(infoData)-8], &block{}, io.ErrUnexpectedEOF},
|
||||
{infoBlock[:len(infoBlock)-len(infoData)], &block{}, io.ErrUnexpectedEOF},
|
||||
{infoBlock[:len(infoBlock)-8], &block{}, io.ErrUnexpectedEOF},
|
||||
}
|
||||
|
||||
b := &block{}
|
||||
for i, tt := range tests {
|
||||
buf := bytes.NewBuffer(tt.data)
|
||||
b, e := readBlock(buf)
|
||||
e := readBlock(buf, b)
|
||||
if !reflect.DeepEqual(b, tt.wb) {
|
||||
t.Errorf("#%d: block = %v, want %v", i, b, tt.wb)
|
||||
}
|
||||
if !reflect.DeepEqual(e, tt.we) {
|
||||
t.Errorf("#%d: err = %v, want %v", i, e, tt.we)
|
||||
}
|
||||
b = &block{}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteBlock(t *testing.T) {
|
||||
b := &block{}
|
||||
typ := int64(0xABCD)
|
||||
d := []byte("Hello world!")
|
||||
buf := new(bytes.Buffer)
|
||||
writeBlock(buf, typ, d)
|
||||
b, err := readBlock(buf)
|
||||
err := readBlock(buf, b)
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
}
|
||||
|
||||
30
wal/wal.go
30
wal/wal.go
@@ -19,8 +19,9 @@ var (
|
||||
)
|
||||
|
||||
type WAL struct {
|
||||
f *os.File
|
||||
bw *bufio.Writer
|
||||
f *os.File
|
||||
bw *bufio.Writer
|
||||
buf *bytes.Buffer
|
||||
}
|
||||
|
||||
func New(path string) (*WAL, error) {
|
||||
@@ -35,7 +36,8 @@ func New(path string) (*WAL, error) {
|
||||
return nil, err
|
||||
}
|
||||
bw := bufio.NewWriter(f)
|
||||
return &WAL{f, bw}, nil
|
||||
buf := new(bytes.Buffer)
|
||||
return &WAL{f, bw, buf}, nil
|
||||
}
|
||||
|
||||
func Open(path string) (*WAL, error) {
|
||||
@@ -44,7 +46,8 @@ func Open(path string) (*WAL, error) {
|
||||
return nil, err
|
||||
}
|
||||
bw := bufio.NewWriter(f)
|
||||
return &WAL{f, bw}, nil
|
||||
buf := new(bytes.Buffer)
|
||||
return &WAL{f, bw, buf}, nil
|
||||
}
|
||||
|
||||
func (w *WAL) Close() {
|
||||
@@ -58,13 +61,12 @@ func (w *WAL) SaveInfo(id int64) error {
|
||||
if err := w.checkAtHead(); err != nil {
|
||||
return err
|
||||
}
|
||||
// cache the buffer?
|
||||
buf := new(bytes.Buffer)
|
||||
err := binary.Write(buf, binary.LittleEndian, id)
|
||||
w.buf.Reset()
|
||||
err := binary.Write(w.buf, binary.LittleEndian, id)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return writeBlock(w.bw, infoType, buf.Bytes())
|
||||
return writeBlock(w.bw, infoType, w.buf.Bytes())
|
||||
}
|
||||
|
||||
func (w *WAL) SaveEntry(e *raft.Entry) error {
|
||||
@@ -77,13 +79,12 @@ func (w *WAL) SaveEntry(e *raft.Entry) error {
|
||||
}
|
||||
|
||||
func (w *WAL) SaveState(s *raft.State) error {
|
||||
// cache the buffer?
|
||||
buf := new(bytes.Buffer)
|
||||
err := binary.Write(buf, binary.LittleEndian, s)
|
||||
w.buf.Reset()
|
||||
err := binary.Write(w.buf, binary.LittleEndian, s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return writeBlock(w.bw, stateType, buf.Bytes())
|
||||
return writeBlock(w.bw, stateType, w.buf.Bytes())
|
||||
}
|
||||
|
||||
func (w *WAL) Flush() error {
|
||||
@@ -112,8 +113,9 @@ func (w *WAL) LoadNode() (*Node, error) {
|
||||
return nil, err
|
||||
}
|
||||
br := bufio.NewReader(w.f)
|
||||
b := &block{}
|
||||
|
||||
b, err := readBlock(br)
|
||||
err := readBlock(br, b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -127,7 +129,7 @@ func (w *WAL) LoadNode() (*Node, error) {
|
||||
|
||||
ents := make([]raft.Entry, 0)
|
||||
var state raft.State
|
||||
for b, err = readBlock(br); err == nil; b, err = readBlock(br) {
|
||||
for err = readBlock(br, b); err == nil; err = readBlock(br, b) {
|
||||
switch b.t {
|
||||
case entryType:
|
||||
e, err := loadEntry(b.d)
|
||||
|
||||
Reference in New Issue
Block a user