Merge pull request #1236 from unihorn/153

wal: record node id at the head of WAL file
This commit is contained in:
Yicheng Qin 2014-10-10 12:09:17 -07:00
commit f8b338d423
9 changed files with 165 additions and 174 deletions

View File

@ -10,6 +10,7 @@
It has these top-level messages:
Request
Info
*/
package etcdserverpb
@ -51,6 +52,15 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
// Info is the message etcdserver marshals and hands to wal.Create as the WAL
// metadata; it carries the ID of the local node.
type Info struct {
// ID is protobuf field 1, a required varint.
ID uint64 `protobuf:"varint,1,req" json:"ID"`
// XXX_unrecognized collects the raw bytes of fields Unmarshal does not know;
// MarshalTo copies them back out after the ID. It is excluded from JSON.
XXX_unrecognized []byte `json:"-"`
}
// Reset restores m to the zero Info.
func (m *Info) Reset() { *m = Info{} }
// String renders m through proto.CompactTextString.
func (m *Info) String() string { return proto.CompactTextString(m) }
// ProtoMessage is an empty marker method.
func (*Info) ProtoMessage() {}
// init has an empty body.
func init() {
}
func (m *Request) Unmarshal(data []byte) error {
@ -378,6 +388,63 @@ func (m *Request) Unmarshal(data []byte) error {
}
return nil
}
// Unmarshal decodes the protobuf wire-format bytes in data into m.
//
// Field 1 (ID) must be varint-encoded (wire type 0); any other wire type
// yields ErrWrongType. Every other field is skipped and its raw bytes
// (tag included) are appended to m.XXX_unrecognized. io.ErrUnexpectedEOF is
// returned when data ends in the middle of a tag, a varint or a skipped field.
//
// If field 1 occurs more than once, the last occurrence wins. m.ID is only
// assigned once a complete varint has been decoded.
func (m *Info) Unmarshal(data []byte) error {
	l := len(data)
	index := 0
	for index < l {
		// Decode the field tag, itself a varint: 7 payload bits per byte,
		// least-significant group first, high bit set on every byte but the last.
		var wire uint64
		for shift := uint(0); ; shift += 7 {
			if index >= l {
				return io.ErrUnexpectedEOF
			}
			b := data[index]
			index++
			wire |= (uint64(b) & 0x7F) << shift
			if b < 0x80 {
				break
			}
		}
		fieldNum := int32(wire >> 3)
		wireType := int(wire & 0x7)
		switch fieldNum {
		case 1:
			if wireType != 0 {
				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
			}
			// Accumulate into a fresh local instead of OR-ing into m.ID:
			// m may already hold a non-zero ID (reused struct, or field 1
			// repeated in data) and stale bits must not leak into the result.
			var id uint64
			for shift := uint(0); ; shift += 7 {
				if index >= l {
					return io.ErrUnexpectedEOF
				}
				b := data[index]
				index++
				id |= (uint64(b) & 0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			m.ID = id
		default:
			// Count the bytes the tag occupied and rewind to its first byte,
			// so the unknown field is preserved together with its tag.
			var sizeOfWire int
			for {
				sizeOfWire++
				wire >>= 7
				if wire == 0 {
					break
				}
			}
			index -= sizeOfWire
			// skippy is used below as the length of the whole field.
			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
			if err != nil {
				return err
			}
			if (index + skippy) > l {
				return io.ErrUnexpectedEOF
			}
			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
			index += skippy
		}
	}
	return nil
}
func (m *Request) Size() (n int) {
var l int
_ = l
@ -408,6 +475,15 @@ func (m *Request) Size() (n int) {
}
return n
}
// Size returns the number of bytes Marshal allocates for m: one byte for the
// field-1 tag, sovEtcdserver(ID) bytes for the ID, plus the length of any
// preserved unrecognized bytes.
func (m *Info) Size() (n int) {
	n = 1 + sovEtcdserver(uint64(m.ID))
	// len of a nil slice is 0, so no nil guard is needed.
	n += len(m.XXX_unrecognized)
	return n
}
func sovEtcdserver(x uint64) (n int) {
for {
@ -533,6 +609,29 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
// Marshal encodes m into a newly allocated buffer of m.Size() bytes and
// returns the written prefix of it. An error from MarshalTo is returned with
// a nil slice.
func (m *Info) Marshal() (data []byte, err error) {
	buf := make([]byte, m.Size())
	written, err := m.MarshalTo(buf)
	if err != nil {
		return nil, err
	}
	return buf[:written], nil
}
// MarshalTo writes m into data and returns the number of bytes written.
// data must be large enough for the encoding; Marshal sizes it with Size.
// The returned error is always nil.
func (m *Info) MarshalTo(data []byte) (n int, err error) {
	// 0x8 is the tag for field 1 with wire type 0 (1<<3 | 0).
	data[0] = 0x8
	pos := encodeVarintEtcdserver(data, 1, uint64(m.ID))
	if m.XXX_unrecognized != nil {
		// Re-emit the bytes of unknown fields kept by Unmarshal.
		pos += copy(data[pos:], m.XXX_unrecognized)
	}
	return pos, nil
}
func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)

View File

@ -25,3 +25,7 @@ message Request {
required int64 Time = 15 [(gogoproto.nullable) = false];
required bool Stream = 16 [(gogoproto.nullable) = false];
}
// Info is what etcdserver marshals and passes to wal.Create as the WAL
// metadata: the ID of the local node.
message Info {
required uint64 ID = 1 [(gogoproto.nullable) = false];
}

View File

@ -116,7 +116,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
} else if (cfg.ClusterState) != ClusterStateValueNew {
log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
}
if w, err = wal.Create(waldir); err != nil {
i := pb.Info{ID: m.ID}
b, err := i.Marshal()
if err != nil {
log.Fatal(err)
}
if w, err = wal.Create(waldir, b); err != nil {
log.Fatal(err)
}
// TODO: add context for PeerURLs
@ -140,13 +145,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
if w, err = wal.OpenAtIndex(waldir, index); err != nil {
log.Fatal(err)
}
wid, st, ents, err := w.ReadAll()
md, st, ents, err := w.ReadAll()
if err != nil {
log.Fatal(err)
}
var info pb.Info
if err := info.Unmarshal(md); err != nil {
log.Fatal(err)
}
// TODO(xiangli): save/recovery nodeID?
if wid != 0 {
log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
if info.ID != m.ID {
log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
}
n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents)
}

View File

@ -9,7 +9,6 @@
raft.proto
It has these top-level messages:
Info
Entry
Snapshot
Message
@ -98,15 +97,6 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
return nil
}
// Info carries a node ID; the wal package's SaveInfo marshals it into an
// infoType record.
type Info struct {
// ID is protobuf field 1, a required varint.
ID uint64 `protobuf:"varint,1,req" json:"ID"`
// XXX_unrecognized collects the raw bytes of fields Unmarshal does not know;
// MarshalTo copies them back out after the ID. It is excluded from JSON.
XXX_unrecognized []byte `json:"-"`
}
// Reset restores m to the zero Info.
func (m *Info) Reset() { *m = Info{} }
// String renders m through proto.CompactTextString.
func (m *Info) String() string { return proto.CompactTextString(m) }
// ProtoMessage is an empty marker method.
func (*Info) ProtoMessage() {}
type Entry struct {
Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"`
Term uint64 `protobuf:"varint,2,req" json:"Term"`
@ -177,63 +167,6 @@ func init() {
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
}
// Unmarshal decodes the protobuf wire-format bytes in data into m.
//
// Field 1 (ID) must be varint-encoded (wire type 0); any other wire type
// yields ErrWrongType. Every other field is skipped and its raw bytes
// (tag included) are appended to m.XXX_unrecognized. io.ErrUnexpectedEOF is
// returned when data ends in the middle of a tag, a varint or a skipped field.
//
// If field 1 occurs more than once, the last occurrence wins. m.ID is only
// assigned once a complete varint has been decoded.
func (m *Info) Unmarshal(data []byte) error {
	l := len(data)
	index := 0
	for index < l {
		// Decode the field tag, itself a varint: 7 payload bits per byte,
		// least-significant group first, high bit set on every byte but the last.
		var wire uint64
		for shift := uint(0); ; shift += 7 {
			if index >= l {
				return io.ErrUnexpectedEOF
			}
			b := data[index]
			index++
			wire |= (uint64(b) & 0x7F) << shift
			if b < 0x80 {
				break
			}
		}
		fieldNum := int32(wire >> 3)
		wireType := int(wire & 0x7)
		switch fieldNum {
		case 1:
			if wireType != 0 {
				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
			}
			// Accumulate into a fresh local instead of OR-ing into m.ID:
			// m may already hold a non-zero ID (reused struct, or field 1
			// repeated in data) and stale bits must not leak into the result.
			var id uint64
			for shift := uint(0); ; shift += 7 {
				if index >= l {
					return io.ErrUnexpectedEOF
				}
				b := data[index]
				index++
				id |= (uint64(b) & 0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			m.ID = id
		default:
			// Count the bytes the tag occupied and rewind to its first byte,
			// so the unknown field is preserved together with its tag.
			var sizeOfWire int
			for {
				sizeOfWire++
				wire >>= 7
				if wire == 0 {
					break
				}
			}
			index -= sizeOfWire
			// skippy is used below as the length of the whole field.
			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
			if err != nil {
				return err
			}
			if (index + skippy) > l {
				return io.ErrUnexpectedEOF
			}
			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
			index += skippy
		}
	}
	return nil
}
func (m *Entry) Unmarshal(data []byte) error {
l := len(data)
index := 0
@ -878,15 +811,6 @@ func (m *ConfChange) Unmarshal(data []byte) error {
}
return nil
}
// Size returns the number of bytes Marshal allocates for m: one byte for the
// field-1 tag, sovRaft(ID) bytes for the ID, plus the length of any preserved
// unrecognized bytes.
func (m *Info) Size() (n int) {
	n = 1 + sovRaft(uint64(m.ID))
	// len of a nil slice is 0, so no nil guard is needed.
	n += len(m.XXX_unrecognized)
	return n
}
func (m *Entry) Size() (n int) {
var l int
_ = l
@ -984,29 +908,6 @@ func sovRaft(x uint64) (n int) {
// sozRaft returns sovRaft of the zigzag transform of x: x shifted left by one
// bit, XOR-ed with the sign of x (interpreted as int64) spread over all 64 bits.
func sozRaft(x uint64) (n int) {
	sign := uint64(int64(x) >> 63) // all ones when the top bit of x is set, else zero
	zigzag := (x << 1) ^ sign
	return sovRaft(zigzag)
}
// Marshal encodes m into a newly allocated buffer of m.Size() bytes and
// returns the written prefix of it. An error from MarshalTo is returned with
// a nil slice.
func (m *Info) Marshal() (data []byte, err error) {
	buf := make([]byte, m.Size())
	written, err := m.MarshalTo(buf)
	if err != nil {
		return nil, err
	}
	return buf[:written], nil
}
// MarshalTo writes m into data and returns the number of bytes written.
// data must be large enough for the encoding; Marshal sizes it with Size.
// The returned error is always nil.
func (m *Info) MarshalTo(data []byte) (n int, err error) {
	// 0x8 is the tag for field 1 with wire type 0 (1<<3 | 0).
	data[0] = 0x8
	pos := encodeVarintRaft(data, 1, uint64(m.ID))
	if m.XXX_unrecognized != nil {
		// Re-emit the bytes of unknown fields kept by Unmarshal.
		pos += copy(data[pos:], m.XXX_unrecognized)
	}
	return pos, nil
}
func (m *Entry) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)

View File

@ -8,10 +8,6 @@ option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
option (gogoproto.goproto_enum_prefix_all) = false;
// Info carries the ID of a node.
message Info {
required uint64 ID = 1 [(gogoproto.nullable) = false];
}
enum EntryType {
EntryNormal = 0;
EntryConfChange = 1;

View File

@ -58,19 +58,11 @@ func (d *decoder) close() error {
return d.c.Close()
}
// mustUnmarshalInfo decodes d into a raftpb.Info and panics if decoding fails.
func mustUnmarshalInfo(d []byte) raftpb.Info {
	info := raftpb.Info{}
	err := info.Unmarshal(d)
	if err != nil {
		// The crc matched, yet the struct cannot be unmarshaled: this is not
		// expected to happen in practice, so treat it as fatal.
		panic(err)
	}
	return info
}
func mustUnmarshalEntry(d []byte) raftpb.Entry {
var e raftpb.Entry
if err := e.Unmarshal(d); err != nil {
// crc matched, but we cannot unmarshal the struct?!
// we must be the next winner of the $1B lottery.
panic(err)
}
return e

View File

@ -27,6 +27,11 @@ import (
"github.com/coreos/etcd/wal/walpb"
)
// Test fixtures holding raw encoded bytes.
var (
// infoData is a protobuf message with a single field: tag 0x08 (field 1,
// varint) followed by the varint 0xef 0xfd 0x02, i.e. the value 0xBEEF.
infoData = []byte("\b\xef\xfd\x02")
// infoRecord is an 8-byte little-endian length prefix (14) followed by a
// 14-byte protobuf message: field 1 = 1, field 2 = a varint, and field 3 =
// a 4-byte length-delimited payload, which is infoData.
// NOTE(review): presumably these are walpb.Record's Type, Crc and Data — confirm.
infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
)
func TestReadRecord(t *testing.T) {
badInfoRecord := make([]byte, len(infoRecord))
copy(badInfoRecord, infoRecord)

View File

@ -23,6 +23,7 @@ import (
"io"
"os"
"path"
"reflect"
"sort"
"github.com/coreos/etcd/raft"
@ -31,7 +32,7 @@ import (
)
const (
infoType int64 = iota + 1
metadataType int64 = iota + 1
entryType
stateType
crcType
@ -41,11 +42,11 @@ const (
)
var (
ErrIDMismatch = errors.New("wal: unmatch id")
ErrFileNotFound = errors.New("wal: file not found")
ErrIndexNotFound = errors.New("wal: index not found in file")
ErrCRCMismatch = errors.New("wal: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
ErrFileNotFound = errors.New("wal: file not found")
ErrIndexNotFound = errors.New("wal: index not found in file")
ErrCRCMismatch = errors.New("wal: crc mismatch")
crcTable = crc32.MakeTable(crc32.Castagnoli)
)
// WAL is a logical representation of the stable storage.
@ -55,6 +56,7 @@ var (
// The WAL will be ready for appending after reading out all the previous records.
type WAL struct {
dir string // the living directory of the underlay files
md []byte // metadata recorded at the head of each WAL
ri uint64 // index of entry to start reading
decoder *decoder // decoder to decode records
@ -65,8 +67,9 @@ type WAL struct {
encoder *encoder // encoder to encode records
}
// Create creates a WAL ready for appending records.
func Create(dirpath string) (*WAL, error) {
// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
func Create(dirpath string, metadata []byte) (*WAL, error) {
if Exist(dirpath) {
return nil, os.ErrExist
}
@ -82,6 +85,7 @@ func Create(dirpath string) (*WAL, error) {
}
w := &WAL{
dir: dirpath,
md: metadata,
seq: 0,
f: f,
encoder: newEncoder(f, 0),
@ -89,6 +93,9 @@ func Create(dirpath string) (*WAL, error) {
if err := w.saveCrc(0); err != nil {
return nil, err
}
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
return w, nil
}
@ -154,7 +161,7 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) {
// ReadAll reads out all records of the current WAL.
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, err error) {
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
rec := &walpb.Record{}
decoder := w.decoder
@ -168,44 +175,44 @@ func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry,
w.enti = e.Index
case stateType:
state = mustUnmarshalState(rec.Data)
case infoType:
i := mustUnmarshalInfo(rec.Data)
if id != 0 && id != i.ID {
case metadataType:
if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) {
state.Reset()
return 0, state, nil, ErrIDMismatch
return nil, state, nil, ErrMetadataConflict
}
id = i.ID
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
// current crc of decoder must match the crc of the record.
// there is no need to match a 0 crc, since the decoder is a new one in this case.
if crc != 0 && rec.Validate(crc) != nil {
state.Reset()
return 0, state, nil, ErrCRCMismatch
return nil, state, nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
default:
state.Reset()
return 0, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
}
}
if err != io.EOF {
state.Reset()
return 0, state, nil, err
return nil, state, nil, err
}
if w.enti < w.ri {
state.Reset()
return 0, state, nil, ErrIndexNotFound
return nil, state, nil, ErrIndexNotFound
}
// close decoder, disable reading
w.decoder.close()
w.ri = 0
w.md = metadata
// create encoder (chain crc with the decoder), enable appending
w.encoder = newEncoder(w.f, w.decoder.lastCRC())
w.decoder = nil
return id, state, ents, nil
return metadata, state, ents, nil
}
// Cut closes current file written and creates a new one ready to append.
@ -224,7 +231,10 @@ func (w *WAL) Cut() error {
w.seq++
prevCrc := w.encoder.crc.Sum32()
w.encoder = newEncoder(w.f, prevCrc)
return w.saveCrc(prevCrc)
if err := w.saveCrc(prevCrc); err != nil {
return err
}
return w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.md})
}
func (w *WAL) Sync() error {
@ -243,15 +253,6 @@ func (w *WAL) Close() {
}
}
// SaveInfo marshals i and encodes it through the WAL's encoder as a record of
// type infoType. It panics if i cannot be marshaled and returns the encoder's
// error otherwise.
func (w *WAL) SaveInfo(i *raftpb.Info) error {
	data, err := i.Marshal()
	if err != nil {
		panic(err)
	}
	return w.encoder.encode(&walpb.Record{Type: infoType, Data: data})
}
func (w *WAL) SaveEntry(e *raftpb.Entry) error {
b, err := e.Marshal()
if err != nil {

View File

@ -27,11 +27,6 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)
// Test fixtures holding raw encoded bytes.
var (
// infoData is a protobuf message with a single field: tag 0x08 (field 1,
// varint) followed by the varint 0xef 0xfd 0x02, i.e. the value 0xBEEF.
infoData = []byte("\b\xef\xfd\x02")
// infoRecord is an 8-byte little-endian length prefix (14) followed by a
// 14-byte protobuf message: field 1 = 1, field 2 = a varint, and field 3 =
// a 4-byte length-delimited payload, which is infoData.
// NOTE(review): presumably these are walpb.Record's Type, Crc and Data — confirm.
infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
)
func TestNew(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
@ -39,7 +34,7 @@ func TestNew(t *testing.T) {
}
defer os.RemoveAll(p)
w, err := Create(p)
w, err := Create(p, nil)
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -57,7 +52,7 @@ func TestNewForInitedDir(t *testing.T) {
defer os.RemoveAll(p)
os.Create(path.Join(p, walName(0, 0)))
if _, err = Create(p); err == nil || err != os.ErrExist {
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
t.Errorf("err = %v, want %v", err, os.ErrExist)
}
}
@ -123,7 +118,7 @@ func TestCut(t *testing.T) {
}
defer os.RemoveAll(p)
w, err := Create(p)
w, err := Create(p, nil)
if err != nil {
t.Fatal(err)
}
@ -161,14 +156,10 @@ func TestRecover(t *testing.T) {
}
defer os.RemoveAll(p)
w, err := Create(p)
w, err := Create(p, []byte("metadata"))
if err != nil {
t.Fatal(err)
}
i := &raftpb.Info{ID: uint64(0xBAD0)}
if err = w.SaveInfo(i); err != nil {
t.Fatal(err)
}
ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
for _, e := range ents {
if err = w.SaveEntry(&e); err != nil {
@ -186,13 +177,13 @@ func TestRecover(t *testing.T) {
if w, err = OpenAtIndex(p, 0); err != nil {
t.Fatal(err)
}
id, state, entries, err := w.ReadAll()
metadata, state, entries, err := w.ReadAll()
if err != nil {
t.Fatal(err)
}
if id != i.ID {
t.Errorf("id = %d, want %d", id, i.ID)
if !reflect.DeepEqual(metadata, []byte("metadata")) {
t.Errorf("metadata = %s, want %s", metadata, "metadata")
}
if !reflect.DeepEqual(entries, ents) {
t.Errorf("ents = %+v, want %+v", entries, ents)
@ -278,14 +269,10 @@ func TestRecoverAfterCut(t *testing.T) {
}
defer os.RemoveAll(p)
w, err := Create(p)
w, err := Create(p, []byte("metadata"))
if err != nil {
t.Fatal(err)
}
info := &raftpb.Info{ID: uint64(0xBAD1)}
if err = w.SaveInfo(info); err != nil {
t.Fatal(err)
}
// TODO(unihorn): remove this when cut can operate on an empty file
if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
t.Fatal(err)
@ -301,9 +288,6 @@ func TestRecoverAfterCut(t *testing.T) {
if err = w.Cut(); err != nil {
t.Fatal(err)
}
if err = w.SaveInfo(info); err != nil {
t.Fatal(err)
}
}
w.Close()
@ -323,13 +307,13 @@ func TestRecoverAfterCut(t *testing.T) {
}
continue
}
id, _, entries, err := w.ReadAll()
metadata, _, entries, err := w.ReadAll()
if err != nil {
t.Errorf("#%d: err = %v, want nil", i, err)
continue
}
if id != info.ID {
t.Errorf("#%d: id = %d, want %d", i, id, info.ID)
if !reflect.DeepEqual(metadata, []byte("metadata")) {
t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
}
for j, e := range entries {
if e.Index != uint64(j+i) {
@ -346,7 +330,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
}
defer os.RemoveAll(p)
w, err := Create(p)
w, err := Create(p, nil)
if err != nil {
t.Fatal(err)
}