From 447caf1afc71773df70bd6e69ff4e24da5cf90d9 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 2 Oct 2014 17:06:35 -0700 Subject: [PATCH] etcdserver/wal: record info at the head of WAL file --- etcdserver/etcdserverpb/etcdserver.pb.go | 99 ++++++++++++++++++++++++ etcdserver/etcdserverpb/etcdserver.proto | 4 + etcdserver/server.go | 17 +++- raft/raftpb/raft.pb.go | 99 ------------------------ raft/raftpb/raft.proto | 4 - wal/decoder.go | 12 +-- wal/record_test.go | 5 ++ wal/wal.go | 59 +++++++------- wal/wal_test.go | 40 +++------- 9 files changed, 165 insertions(+), 174 deletions(-) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index a6720af13..70d59cee2 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: Request + Info */ package etcdserverpb @@ -51,6 +52,15 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} +type Info struct { + ID uint64 `protobuf:"varint,1,req" json:"ID"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Info) Reset() { *m = Info{} } +func (m *Info) String() string { return proto.CompactTextString(m) } +func (*Info) ProtoMessage() {} + func init() { } func (m *Request) Unmarshal(data []byte) error { @@ -378,6 +388,63 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } +func (m *Info) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.ID |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Request) Size() (n int) { var l int _ = l @@ -408,6 +475,15 @@ func (m *Request) Size() (n int) { } return n } +func (m *Info) Size() (n int) { + var l int + _ = l + n += 1 + sovEtcdserver(uint64(m.ID)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovEtcdserver(x uint64) (n int) { for { @@ -533,6 +609,29 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Info) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Info) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintEtcdserver(data, i, uint64(m.ID)) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Etcdserver(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index e29e86dca..ea2f6ebed 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -25,3 +25,7 @@ message Request { required int64 Time = 15 [(gogoproto.nullable) = false]; required bool Stream = 16 [(gogoproto.nullable) = false]; } + +message Info { + required uint64 ID = 1 [(gogoproto.nullable) = false]; +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 73c9e30ee..c6ed734f0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -116,7 +116,12 @@ func NewServer(cfg *ServerConfig) *EtcdServer { } else if (cfg.ClusterState) != ClusterStateValueNew { log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found") } - if w, err = wal.Create(waldir); err != nil { + i := pb.Info{ID: m.ID} + b, err := i.Marshal() + if err != nil { + log.Fatal(err) + } + if w, err = wal.Create(waldir, b); err != nil { log.Fatal(err) } // TODO: add context for PeerURLs @@ -140,13 +145,17 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if w, err = wal.OpenAtIndex(waldir, index); err != nil { log.Fatal(err) } - wid, st, ents, err := w.ReadAll() + md, st, ents, err := w.ReadAll() if err != nil { log.Fatal(err) } + var info pb.Info + if err := info.Unmarshal(md); err != nil { + log.Fatal(err) + } // TODO(xiangli): save/recovery nodeID? - if wid != 0 { - log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) + if info.ID != m.ID { + log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID) } n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents) } diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index 32bb4d8bb..79fd902ee 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -9,7 +9,6 @@ raft.proto It has these top-level messages: - Info Entry Snapshot Message @@ -98,15 +97,6 @@ func (x *ConfChangeType) UnmarshalJSON(data []byte) error { return nil } -type Info struct { - ID uint64 `protobuf:"varint,1,req" json:"ID"` - XXX_unrecognized []byte `json:"-"` -} - -func (m *Info) Reset() { *m = Info{} } -func (m *Info) String() string { return proto.CompactTextString(m) } -func (*Info) ProtoMessage() {} - type Entry struct { Type EntryType `protobuf:"varint,1,req,enum=raftpb.EntryType" json:"Type"` Term uint64 `protobuf:"varint,2,req" json:"Term"` @@ -177,63 +167,6 @@ func init() { proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value) proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value) } -func (m *Info) Unmarshal(data []byte) error { - l := len(data) - index := 0 - for index < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - switch fieldNum { - case 1: - if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType - } - for shift := uint(0); ; shift += 7 { - if index >= l { - return io.ErrUnexpectedEOF - } - b := data[index] - index++ - m.ID |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - default: - var sizeOfWire int - for { - sizeOfWire++ - wire >>= 7 - if wire == 0 { - break - } - } - index -= sizeOfWire - skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) - if err != nil { - return err - } - if (index + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) - index += skippy - } - } - return nil -} func (m *Entry) Unmarshal(data []byte) error { l := len(data) index := 0 @@ -878,15 +811,6 @@ func (m *ConfChange) Unmarshal(data []byte) error { } return nil } -func (m *Info) Size() (n int) { - var l int - _ = l - n += 1 + sovRaft(uint64(m.ID)) - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} func (m *Entry) Size() (n int) { var l int _ = l @@ -984,29 +908,6 @@ func sovRaft(x uint64) (n int) { func sozRaft(x uint64) (n int) { return sovRaft(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *Info) Marshal() (data []byte, err error) { - size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) - if err != nil { - return nil, err - } - return data[:n], nil -} - -func (m *Info) MarshalTo(data []byte) (n int, err error) { - var i int - _ = i - var l int - _ = l - data[i] = 0x8 - i++ - i = encodeVarintRaft(data, i, uint64(m.ID)) - if m.XXX_unrecognized != nil { - i += copy(data[i:], m.XXX_unrecognized) - } - return i, nil -} func (m *Entry) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 0d4c53ad5..c6db9fa79 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -8,10 +8,6 @@ option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; option (gogoproto.goproto_enum_prefix_all) = false; -message Info { - required uint64 ID = 1 [(gogoproto.nullable) = false]; -} - enum EntryType { EntryNormal = 0; EntryConfChange = 1; diff --git a/wal/decoder.go b/wal/decoder.go index ea295769d..ebdebc525 100644 --- a/wal/decoder.go +++ b/wal/decoder.go @@ -58,19 +58,11 @@ func (d *decoder) close() error { return d.c.Close() } -func mustUnmarshalInfo(d []byte) raftpb.Info { - var i raftpb.Info - if err := i.Unmarshal(d); err != nil { - // crc matched, but we cannot unmarshal the struct?! - // we must be the next winner of the $1B lottery. - panic(err) - } - return i -} - func mustUnmarshalEntry(d []byte) raftpb.Entry { var e raftpb.Entry if err := e.Unmarshal(d); err != nil { + // crc matched, but we cannot unmarshal the struct?! + // we must be the next winner of the $1B lottery. panic(err) } return e diff --git a/wal/record_test.go b/wal/record_test.go index 562152c74..943b5e26f 100644 --- a/wal/record_test.go +++ b/wal/record_test.go @@ -27,6 +27,11 @@ import ( "github.com/coreos/etcd/wal/walpb" ) +var ( + infoData = []byte("\b\xef\xfd\x02") + infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...) +) + func TestReadRecord(t *testing.T) { badInfoRecord := make([]byte, len(infoRecord)) copy(badInfoRecord, infoRecord) diff --git a/wal/wal.go b/wal/wal.go index 05cbf1482..5f71f1fd3 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -23,6 +23,7 @@ import ( "io" "os" "path" + "reflect" "sort" "github.com/coreos/etcd/raft" @@ -31,7 +32,7 @@ import ( ) const ( - infoType int64 = iota + 1 + metadataType int64 = iota + 1 entryType stateType crcType @@ -41,11 +42,11 @@ const ( ) var ( - ErrIDMismatch = errors.New("wal: unmatch id") - ErrFileNotFound = errors.New("wal: file not found") - ErrIndexNotFound = errors.New("wal: index not found in file") - ErrCRCMismatch = errors.New("wal: crc mismatch") - crcTable = crc32.MakeTable(crc32.Castagnoli) + ErrMetadataConflict = errors.New("wal: conflicting metadata found") + ErrFileNotFound = errors.New("wal: file not found") + ErrIndexNotFound = errors.New("wal: index not found in file") + ErrCRCMismatch = errors.New("wal: crc mismatch") + crcTable = crc32.MakeTable(crc32.Castagnoli) ) // WAL is a logical repersentation of the stable storage. @@ -55,6 +56,7 @@ var ( // The WAL will be ready for appending after reading out all the previous records. type WAL struct { dir string // the living directory of the underlay files + md []byte // metadata recorded at the head of each WAL ri uint64 // index of entry to start reading decoder *decoder // decoder to decode records @@ -65,8 +67,9 @@ type WAL struct { encoder *encoder // encoder to encode records } -// Create creates a WAL ready for appending records. -func Create(dirpath string) (*WAL, error) { +// Create creates a WAL ready for appending records. The given metadata is +// recorded at the head of each WAL file, and can be retrieved with ReadAll. +func Create(dirpath string, metadata []byte) (*WAL, error) { if Exist(dirpath) { return nil, os.ErrExist } @@ -82,6 +85,7 @@ func Create(dirpath string) (*WAL, error) { } w := &WAL{ dir: dirpath, + md: metadata, seq: 0, f: f, encoder: newEncoder(f, 0), @@ -89,6 +93,9 @@ func Create(dirpath string) (*WAL, error) { if err := w.saveCrc(0); err != nil { return nil, err } + if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil { + return nil, err + } return w, nil } @@ -154,7 +161,7 @@ func OpenAtIndex(dirpath string, index uint64) (*WAL, error) { // ReadAll reads out all records of the current WAL. // If it cannot read out the expected entry, it will return ErrIndexNotFound. // After ReadAll, the WAL will be ready for appending new records. -func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, err error) { +func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder @@ -168,44 +175,44 @@ func (w *WAL) ReadAll() (id uint64, state raftpb.HardState, ents []raftpb.Entry, w.enti = e.Index case stateType: state = mustUnmarshalState(rec.Data) - case infoType: - i := mustUnmarshalInfo(rec.Data) - if id != 0 && id != i.ID { + case metadataType: + if metadata != nil && !reflect.DeepEqual(metadata, rec.Data) { state.Reset() - return 0, state, nil, ErrIDMismatch + return nil, state, nil, ErrMetadataConflict } - id = i.ID + metadata = rec.Data case crcType: crc := decoder.crc.Sum32() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { state.Reset() - return 0, state, nil, ErrCRCMismatch + return nil, state, nil, ErrCRCMismatch } decoder.updateCRC(rec.Crc) default: state.Reset() - return 0, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) + return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) } } if err != io.EOF { state.Reset() - return 0, state, nil, err + return nil, state, nil, err } if w.enti < w.ri { state.Reset() - return 0, state, nil, ErrIndexNotFound + return nil, state, nil, ErrIndexNotFound } // close decoder, disable reading w.decoder.close() w.ri = 0 + w.md = metadata // create encoder (chain crc with the decoder), enable appending w.encoder = newEncoder(w.f, w.decoder.lastCRC()) w.decoder = nil - return id, state, ents, nil + return metadata, state, ents, nil } // Cut closes current file written and creates a new one ready to append. @@ -224,7 +231,10 @@ func (w *WAL) Cut() error { w.seq++ prevCrc := w.encoder.crc.Sum32() w.encoder = newEncoder(w.f, prevCrc) - return w.saveCrc(prevCrc) + if err := w.saveCrc(prevCrc); err != nil { + return err + } + return w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.md}) } func (w *WAL) Sync() error { @@ -243,15 +253,6 @@ func (w *WAL) Close() { } } -func (w *WAL) SaveInfo(i *raftpb.Info) error { - b, err := i.Marshal() - if err != nil { - panic(err) - } - rec := &walpb.Record{Type: infoType, Data: b} - return w.encoder.encode(rec) -} - func (w *WAL) SaveEntry(e *raftpb.Entry) error { b, err := e.Marshal() if err != nil { diff --git a/wal/wal_test.go b/wal/wal_test.go index b5b269bb5..d8f8f633d 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -27,11 +27,6 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -var ( - infoData = []byte("\b\xef\xfd\x02") - infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...) -) - func TestNew(t *testing.T) { p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { @@ -39,7 +34,7 @@ func TestNew(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p) + w, err := Create(p, nil) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -57,7 +52,7 @@ func TestNewForInitedDir(t *testing.T) { defer os.RemoveAll(p) os.Create(path.Join(p, walName(0, 0))) - if _, err = Create(p); err == nil || err != os.ErrExist { + if _, err = Create(p, nil); err == nil || err != os.ErrExist { t.Errorf("err = %v, want %v", err, os.ErrExist) } } @@ -123,7 +118,7 @@ func TestCut(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p) + w, err := Create(p, nil) if err != nil { t.Fatal(err) } @@ -161,14 +156,10 @@ func TestRecover(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p) + w, err := Create(p, []byte("metadata")) if err != nil { t.Fatal(err) } - i := &raftpb.Info{ID: uint64(0xBAD0)} - if err = w.SaveInfo(i); err != nil { - t.Fatal(err) - } ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} for _, e := range ents { if err = w.SaveEntry(&e); err != nil { @@ -186,13 +177,13 @@ func TestRecover(t *testing.T) { if w, err = OpenAtIndex(p, 0); err != nil { t.Fatal(err) } - id, state, entries, err := w.ReadAll() + metadata, state, entries, err := w.ReadAll() if err != nil { t.Fatal(err) } - if id != i.ID { - t.Errorf("id = %d, want %d", id, i.ID) + if !reflect.DeepEqual(metadata, []byte("metadata")) { + t.Errorf("metadata = %s, want %s", metadata, "metadata") } if !reflect.DeepEqual(entries, ents) { t.Errorf("ents = %+v, want %+v", entries, ents) @@ -278,14 +269,10 @@ func TestRecoverAfterCut(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p) + w, err := Create(p, []byte("metadata")) if err != nil { t.Fatal(err) } - info := &raftpb.Info{ID: uint64(0xBAD1)} - if err = w.SaveInfo(info); err != nil { - t.Fatal(err) - } // TODO(unihorn): remove this when cut can operate on an empty file if err = w.SaveEntry(&raftpb.Entry{}); err != nil { t.Fatal(err) @@ -301,9 +288,6 @@ func TestRecoverAfterCut(t *testing.T) { if err = w.Cut(); err != nil { t.Fatal(err) } - if err = w.SaveInfo(info); err != nil { - t.Fatal(err) - } } w.Close() @@ -323,13 +307,13 @@ func TestRecoverAfterCut(t *testing.T) { } continue } - id, _, entries, err := w.ReadAll() + metadata, _, entries, err := w.ReadAll() if err != nil { t.Errorf("#%d: err = %v, want nil", i, err) continue } - if id != info.ID { - t.Errorf("#%d: id = %d, want %d", i, id, info.ID) + if !reflect.DeepEqual(metadata, []byte("metadata")) { + t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata") } for j, e := range entries { if e.Index != uint64(j+i) { @@ -346,7 +330,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p) + w, err := Create(p, nil) if err != nil { t.Fatal(err) }