wal: make record a protobuf type

This commit is contained in:
Xiang Li
2014-08-04 15:46:12 -07:00
committed by Yicheng Qin
parent d0dc7427dd
commit 659eb5fd2a
6 changed files with 301 additions and 76 deletions

View File

@@ -20,43 +20,28 @@ import (
"io"
)
type block struct {
t int64
d []byte
}
func writeRecord(w io.Writer, rec *Record) error {
data, err := rec.Marshal()
if err != nil {
return err
}
func writeBlock(w io.Writer, t int64, d []byte) error {
if err := writeInt64(w, t); err != nil {
if err := writeInt64(w, int64(len(data))); err != nil {
return err
}
if err := writeInt64(w, int64(len(d))); err != nil {
return err
}
_, err := w.Write(d)
_, err = w.Write(data)
return err
}
func readBlock(r io.Reader, b *block) error {
t, err := readInt64(r)
if err != nil {
return err
}
func readRecord(r io.Reader, rec *Record) error {
rec.Reset()
l, err := readInt64(r)
if err != nil {
return unexpectedEOF(err)
return err
}
d := make([]byte, l)
if _, err = io.ReadFull(r, d); err != nil {
return unexpectedEOF(err)
return err
}
b.t = t
b.d = d
return nil
}
func unexpectedEOF(err error) error {
if err == io.EOF {
return io.ErrUnexpectedEOF
}
return err
return rec.Unmarshal(d)
}

223
wal/record.pb.go Normal file
View File

@@ -0,0 +1,223 @@
// Code generated by protoc-gen-gogo.
// source: record.proto
// DO NOT EDIT!
/*
Package wal is a generated protocol buffer package.
It is generated from these files:
record.proto
It has these top-level messages:
Record
*/
package wal
import proto "code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"
// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
import io "io"
import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type Record struct {
Type int64 `protobuf:"varint,1,req,name=type" json:"type"`
Crc int32 `protobuf:"varint,2,req,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
func init() {
}
func (m *Record) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Type |= (int64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Crc |= (int32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Data = append(m.Data, data[index:postIndex]...)
index = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Record) Size() (n int) {
var l int
_ = l
n += 1 + sovRecord(uint64(m.Type))
n += 1 + sovRecord(uint64(uint32(m.Crc)))
if m.Data != nil {
l = len(m.Data)
n += 1 + l + sovRecord(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovRecord(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozRecord(x uint64) (n int) {
return sovRecord(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Record) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *Record) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRecord(data, i, uint64(m.Type))
data[i] = 0x10
i++
i = encodeVarintRecord(data, i, uint64(uint32(m.Crc)))
if m.Data != nil {
data[i] = 0x1a
i++
i = encodeVarintRecord(data, i, uint64(len(m.Data)))
i += copy(data[i:], m.Data)
}
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Record(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Record(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintRecord(data []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return offset + 1
}

14
wal/record.proto Normal file
View File

@@ -0,0 +1,14 @@
package wal;
import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
message Record {
required int64 type = 1 [(gogoproto.nullable) = false];
required int32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}

View File

@@ -23,47 +23,47 @@ import (
"testing"
)
func TestReadBlock(t *testing.T) {
func TestReadRecord(t *testing.T) {
tests := []struct {
data []byte
wb *block
wr *Record
we error
}{
{infoBlock, &block{1, infoData}, nil},
{[]byte(""), &block{}, io.EOF},
{infoBlock[:len(infoBlock)-len(infoData)-8], &block{}, io.ErrUnexpectedEOF},
{infoBlock[:len(infoBlock)-len(infoData)], &block{}, io.ErrUnexpectedEOF},
{infoBlock[:len(infoBlock)-8], &block{}, io.ErrUnexpectedEOF},
{infoRecord, &Record{Type: 1, Crc: 0, Data: infoData}, nil},
{[]byte(""), &Record{}, io.EOF},
{infoRecord[:len(infoRecord)-len(infoData)-8], &Record{}, io.ErrUnexpectedEOF},
{infoRecord[:len(infoRecord)-len(infoData)], &Record{}, io.ErrUnexpectedEOF},
{infoRecord[:len(infoRecord)-8], &Record{}, io.ErrUnexpectedEOF},
}
b := &block{}
rec := &Record{}
for i, tt := range tests {
buf := bytes.NewBuffer(tt.data)
e := readBlock(buf, b)
if !reflect.DeepEqual(b, tt.wb) {
t.Errorf("#%d: block = %v, want %v", i, b, tt.wb)
e := readRecord(buf, rec)
if !reflect.DeepEqual(rec, tt.wr) {
t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr)
}
if !reflect.DeepEqual(e, tt.we) {
t.Errorf("#%d: err = %v, want %v", i, e, tt.we)
}
b = &block{}
rec = &Record{}
}
}
func TestWriteBlock(t *testing.T) {
b := &block{}
func TestWriteRecord(t *testing.T) {
b := &Record{}
typ := int64(0xABCD)
d := []byte("Hello world!")
buf := new(bytes.Buffer)
writeBlock(buf, typ, d)
err := readBlock(buf, b)
writeRecord(buf, &Record{Type: typ, Crc: 0, Data: d})
err := readRecord(buf, b)
if err != nil {
t.Errorf("err = %v, want nil", err)
}
if b.t != typ {
t.Errorf("type = %d, want %d", b.t, typ)
if b.Type != typ {
t.Errorf("type = %d, want %d", b.Type, typ)
}
if !reflect.DeepEqual(b.d, d) {
t.Errorf("data = %v, want %v", b.d, d)
if !reflect.DeepEqual(b.Data, d) {
t.Errorf("data = %v, want %v", b.Data, d)
}
}

View File

@@ -92,7 +92,8 @@ func (w *WAL) SaveInfo(i *raft.Info) error {
if err != nil {
panic(err)
}
return writeBlock(w.bw, infoType, b)
rec := &Record{Type: infoType, Data: b}
return writeRecord(w.bw, rec)
}
func (w *WAL) SaveEntry(e *raft.Entry) error {
@@ -101,7 +102,8 @@ func (w *WAL) SaveEntry(e *raft.Entry) error {
if err != nil {
panic(err)
}
return writeBlock(w.bw, entryType, b)
rec := &Record{Type: entryType, Data: b}
return writeRecord(w.bw, rec)
}
func (w *WAL) SaveState(s *raft.State) error {
@@ -110,7 +112,8 @@ func (w *WAL) SaveState(s *raft.State) error {
if err != nil {
panic(err)
}
return writeBlock(w.bw, stateType, b)
rec := &Record{Type: stateType, Data: b}
return writeRecord(w.bw, rec)
}
func (w *WAL) checkAtHead() error {
@@ -136,38 +139,38 @@ func (w *WAL) LoadNode() (*Node, error) {
return nil, err
}
br := bufio.NewReader(w.f)
b := &block{}
rec := &Record{}
err := readBlock(br, b)
err := readRecord(br, rec)
if err != nil {
return nil, err
}
if b.t != infoType {
return nil, fmt.Errorf("the first block of wal is not infoType but %d", b.t)
if rec.Type != infoType {
return nil, fmt.Errorf("the first block of wal is not infoType but %d", rec.Type)
}
i, err := loadInfo(b.d)
i, err := loadInfo(rec.Data)
if err != nil {
return nil, err
}
ents := make([]raft.Entry, 0)
var state raft.State
for err = readBlock(br, b); err == nil; err = readBlock(br, b) {
switch b.t {
for err = readRecord(br, rec); err == nil; err = readRecord(br, rec) {
switch rec.Type {
case entryType:
e, err := loadEntry(b.d)
e, err := loadEntry(rec.Data)
if err != nil {
return nil, err
}
ents = append(ents[:e.Index-1], e)
case stateType:
s, err := loadState(b.d)
s, err := loadState(rec.Data)
if err != nil {
return nil, err
}
state = s
default:
return nil, fmt.Errorf("unexpected block type %d", b.t)
return nil, fmt.Errorf("unexpected block type %d", rec.Type)
}
}
if err != io.EOF {

View File

@@ -27,14 +27,14 @@ import (
)
var (
infoData = []byte("\b\xef\xfd\x02")
infoBlock = append([]byte("\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00"), infoData...)
infoData = []byte("\b\xef\xfd\x02")
infoRecord = append([]byte("\n\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x00\x1a\x04"), infoData...)
stateData = []byte("\b\x01\x10\x01\x18\x01")
stateBlock = append([]byte("\x03\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00"), stateData...)
stateData = []byte("\b\x01\x10\x01\x18\x01")
stateRecord = append([]byte("\f\x00\x00\x00\x00\x00\x00\x00\b\x03\x10\x00\x1a\x06"), stateData...)
entryData = []byte("\b\x01\x10\x01\x18\x01\x22\x01\x01")
entryBlock = append([]byte("\x02\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00"), entryData...)
entryData = []byte("\b\x01\x10\x01\x18\x01\x22\x01\x01")
entryRecord = append([]byte("\x0f\x00\x00\x00\x00\x00\x00\x00\b\x02\x10\x00\x1a\t"), entryData...)
)
func TestNew(t *testing.T) {
@@ -79,8 +79,8 @@ func TestSaveEntry(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(b, entryBlock) {
t.Errorf("ent = %q, want %q", b, entryBlock)
if !reflect.DeepEqual(b, entryRecord) {
t.Errorf("ent = %q, want %q", b, entryRecord)
}
err = os.Remove(p)
@@ -104,15 +104,15 @@ func TestSaveInfo(t *testing.T) {
// make sure we can only write info at the head of the wal file
// still in buffer
err = w.SaveInfo(i)
if err == nil || err.Error() != "cannot write info at 20, expect 0" {
t.Errorf("err = %v, want cannot write info at 20, expect 0", err)
if err == nil || err.Error() != "cannot write info at 18, expect 0" {
t.Errorf("err = %v, want cannot write info at 18, expect 0", err)
}
// sync to disk
w.Sync()
err = w.SaveInfo(i)
if err == nil || err.Error() != "cannot write info at 20, expect 0" {
t.Errorf("err = %v, want cannot write info at 20, expect 0", err)
if err == nil || err.Error() != "cannot write info at 18, expect 0" {
t.Errorf("err = %v, want cannot write info at 18, expect 0", err)
}
w.Close()
@@ -120,8 +120,8 @@ func TestSaveInfo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(b, infoBlock) {
t.Errorf("ent = %q, want %q", b, infoBlock)
if !reflect.DeepEqual(b, infoRecord) {
t.Errorf("ent = %q, want %q", b, infoRecord)
}
err = os.Remove(p)
@@ -147,8 +147,8 @@ func TestSaveState(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(b, stateBlock) {
t.Errorf("ent = %q, want %q", b, stateBlock)
if !reflect.DeepEqual(b, stateRecord) {
t.Errorf("ent = %q, want %q", b, stateRecord)
}
err = os.Remove(p)