diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index e4f63a7f0..07baad0d8 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -10,7 +10,7 @@ It has these top-level messages: Request - Info + Metadata */ package etcdserverpb @@ -52,14 +52,14 @@ func (m *Request) Reset() { *m = Request{} } func (m *Request) String() string { return proto.CompactTextString(m) } func (*Request) ProtoMessage() {} -type Info struct { - ID uint64 `protobuf:"varint,1,req" json:"ID"` +type Metadata struct { + NodeID uint64 `protobuf:"varint,1,req" json:"NodeID"` XXX_unrecognized []byte `json:"-"` } -func (m *Info) Reset() { *m = Info{} } -func (m *Info) String() string { return proto.CompactTextString(m) } -func (*Info) ProtoMessage() {} +func (m *Metadata) Reset() { *m = Metadata{} } +func (m *Metadata) String() string { return proto.CompactTextString(m) } +func (*Metadata) ProtoMessage() {} func init() { } @@ -388,7 +388,7 @@ func (m *Request) Unmarshal(data []byte) error { } return nil } -func (m *Info) Unmarshal(data []byte) error { +func (m *Metadata) Unmarshal(data []byte) error { l := len(data) index := 0 for index < l { @@ -417,7 +417,7 @@ func (m *Info) Unmarshal(data []byte) error { } b := data[index] index++ - m.ID |= (uint64(b) & 0x7F) << shift + m.NodeID |= (uint64(b) & 0x7F) << shift if b < 0x80 { break } @@ -475,10 +475,10 @@ func (m *Request) Size() (n int) { } return n } -func (m *Info) Size() (n int) { +func (m *Metadata) Size() (n int) { var l int _ = l - n += 1 + sovEtcdserver(uint64(m.ID)) + n += 1 + sovEtcdserver(uint64(m.NodeID)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -609,7 +609,7 @@ func (m *Request) MarshalTo(data []byte) (n int, err error) { } return i, nil } -func (m *Info) Marshal() (data []byte, err error) { +func (m *Metadata) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) n, err := m.MarshalTo(data) @@ -619,14 +619,14 @@ func (m *Info) Marshal() (data []byte, err error) { return data[:n], nil } -func (m *Info) MarshalTo(data []byte) (n int, err error) { +func (m *Metadata) MarshalTo(data []byte) (n int, err error) { var i int _ = i var l int _ = l data[i] = 0x8 i++ - i = encodeVarintEtcdserver(data, i, uint64(m.ID)) + i = encodeVarintEtcdserver(data, i, uint64(m.NodeID)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } diff --git a/etcdserver/etcdserverpb/etcdserver.proto b/etcdserver/etcdserverpb/etcdserver.proto index 9c53b81e2..0cf1da9df 100644 --- a/etcdserver/etcdserverpb/etcdserver.proto +++ b/etcdserver/etcdserverpb/etcdserver.proto @@ -26,6 +26,6 @@ message Request { required bool Stream = 16 [(gogoproto.nullable) = false]; } -message Info { - required uint64 ID = 1 [(gogoproto.nullable) = false]; +message Metadata { + required uint64 NodeID = 1 [(gogoproto.nullable) = false]; } diff --git a/etcdserver/server.go b/etcdserver/server.go index 5070c7ee3..e7603ec1d 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -12,6 +12,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/coreos/etcd/discovery" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" @@ -384,11 +385,7 @@ func (s *EtcdServer) sync(timeout time.Duration) { ID: GenID(), Time: time.Now().UnixNano(), } - data, err := req.Marshal() - if err != nil { - log.Printf("marshal request %#v error: %v", req, err) - return - } + data := pbutil.MustMarshal(&req) // There is no promise that node has leader when do SYNC request, // so it uses goroutine to propose. go func() { @@ -447,15 +444,11 @@ func (s *EtcdServer) apply(es []raftpb.Entry) uint64 { switch e.Type { case raftpb.EntryNormal: var r pb.Request - if err := r.Unmarshal(e.Data); err != nil { - panic("TODO: this is bad, what do we do about it?") - } + pbutil.MustUnmarshal(&r, e.Data) s.w.Trigger(r.ID, s.applyRequest(r)) case raftpb.EntryConfChange: var cc raftpb.ConfChange - if err := cc.Unmarshal(e.Data); err != nil { - panic("TODO: this is bad, what do we do about it?") - } + pbutil.MustUnmarshal(&cc, e.Data) s.applyConfChange(cc) s.w.Trigger(cc.ID, nil) default: @@ -541,12 +534,9 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) { } func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) { - i := pb.Info{ID: cfg.ID()} - b, err := i.Marshal() - if err != nil { - log.Fatal(err) - } - if w, err = wal.Create(cfg.WALDir(), b); err != nil { + var err error + metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: cfg.ID()}) + if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { log.Fatal(err) } ids := cfg.Cluster.IDs() @@ -568,15 +558,15 @@ func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil { log.Fatal(err) } - md, st, ents, err := w.ReadAll() + wmetadata, st, ents, err := w.ReadAll() if err != nil { log.Fatal(err) } - var info pb.Info - if err := info.Unmarshal(md); err != nil { - log.Fatal(err) - } - n = raft.RestartNode(info.ID, 10, 1, snapshot, st, ents) + + var metadata pb.Metadata + pbutil.MustUnmarshal(&metadata, wmetadata) + + n = raft.RestartNode(metadata.NodeID, 10, 1, snapshot, st, ents) return } diff --git a/pkg/pbutil/pbutil.go b/pkg/pbutil/pbutil.go new file mode 100644 index 000000000..8353278e8 --- /dev/null +++ b/pkg/pbutil/pbutil.go @@ -0,0 +1,25 @@ +package pbutil + +import "log" + +type Marshaler interface { + Marshal() (data []byte, err error) +} + +type Unmarshaler interface { + Unmarshal(data []byte) error +} + +func MustMarshal(m Marshaler) []byte { + d, err := m.Marshal() + if err != nil { + log.Panicf("pbutil: %v", err) + } + return d +} + +func MustUnmarshal(um Unmarshaler, data []byte) { + if err := um.Unmarshal(data); err != nil { + log.Panicf("pbutil: %v", err) + } +} diff --git a/wal/decoder.go b/wal/decoder.go index ebdebc525..6f82a8f50 100644 --- a/wal/decoder.go +++ b/wal/decoder.go @@ -7,6 +7,7 @@ import ( "io" "github.com/coreos/etcd/pkg/crc" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal/walpb" ) @@ -60,19 +61,13 @@ func (d *decoder) close() error { func mustUnmarshalEntry(d []byte) raftpb.Entry { var e raftpb.Entry - if err := e.Unmarshal(d); err != nil { - // crc matched, but we cannot unmarshal the struct?! - // we must be the next winner of the $1B lottery. - panic(err) - } + pbutil.MustUnmarshal(&e, d) return e } func mustUnmarshalState(d []byte) raftpb.HardState { var s raftpb.HardState - if err := s.Unmarshal(d); err != nil { - panic(err) - } + pbutil.MustUnmarshal(&s, d) return s } diff --git a/wal/wal.go b/wal/wal.go index 5f71f1fd3..cafb40007 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -26,6 +26,7 @@ import ( "reflect" "sort" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal/walpb" @@ -55,8 +56,8 @@ var ( // A just opened WAL is in read mode, and ready for reading records. // The WAL will be ready for appending after reading out all the previous records. type WAL struct { - dir string // the living directory of the underlay files - md []byte // metadata recorded at the head of each WAL + dir string // the living directory of the underlay files + metadata []byte // metadata recorded at the head of each WAL ri uint64 // index of entry to start reading decoder *decoder // decoder to decode records @@ -84,11 +85,11 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { return nil, err } w := &WAL{ - dir: dirpath, - md: metadata, - seq: 0, - f: f, - encoder: newEncoder(f, 0), + dir: dirpath, + metadata: metadata, + seq: 0, + f: f, + encoder: newEncoder(f, 0), } if err := w.saveCrc(0); err != nil { return nil, err @@ -208,7 +209,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. w.decoder.close() w.ri = 0 - w.md = metadata + w.metadata = metadata // create encoder (chain crc with the decoder), enable appending w.encoder = newEncoder(w.f, w.decoder.lastCRC()) w.decoder = nil @@ -234,7 +235,7 @@ func (w *WAL) Cut() error { if err := w.saveCrc(prevCrc); err != nil { return err } - return w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.md}) + return w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}) } func (w *WAL) Sync() error { @@ -254,10 +255,7 @@ func (w *WAL) Close() { } func (w *WAL) SaveEntry(e *raftpb.Entry) error { - b, err := e.Marshal() - if err != nil { - panic(err) - } + b := pbutil.MustMarshal(e) rec := &walpb.Record{Type: entryType, Data: b} if err := w.encoder.encode(rec); err != nil { return err @@ -270,10 +268,7 @@ func (w *WAL) SaveState(s *raftpb.HardState) error { if raft.IsEmptyHardState(*s) { return nil } - b, err := s.Marshal() - if err != nil { - panic(err) - } + b := pbutil.MustMarshal(s) rec := &walpb.Record{Type: stateType, Data: b} return w.encoder.encode(rec) }