From 225e618b8f12e5fc80844ba74bc66b2952bc69cc Mon Sep 17 00:00:00 2001 From: Blake Mizerany Date: Tue, 26 Aug 2014 16:19:49 -0700 Subject: [PATCH] raft: add Id to Entry --- raft/entry.pb.go | 21 +++++++++++++++++++++ raft/entry.proto | 1 + raft/example_test.go | 8 ++++---- raft/node.go | 4 ++-- raft/state.pb.go | 35 +++++++++++++---------------------- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/raft/entry.pb.go b/raft/entry.pb.go index 0d6ff698c..48d1b19f3 100644 --- a/raft/entry.pb.go +++ b/raft/entry.pb.go @@ -7,6 +7,7 @@ It is generated from these files: entry.proto + state.proto It has these top-level messages: Entry @@ -32,6 +33,7 @@ type Entry struct { Term int64 `protobuf:"varint,2,req,name=term" json:"term"` Index int64 `protobuf:"varint,3,req,name=index" json:"index"` Data []byte `protobuf:"bytes,4,opt,name=data" json:"data,omitempty"` + Id int64 `protobuf:"varint,5,req,name=id" json:"id"` XXX_unrecognized []byte `json:"-"` } @@ -127,6 +129,21 @@ func (m *Entry) Unmarshal(data []byte) error { } m.Data = append(m.Data, data[index:postIndex]...) index = postIndex + case 5: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Id |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: var sizeOfWire int for { @@ -160,6 +177,7 @@ func (m *Entry) Size() (n int) { l = len(m.Data) n += 1 + l + sovEntry(uint64(l)) } + n += 1 + sovEntry(uint64(m.Id)) if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -209,6 +227,9 @@ func (m *Entry) MarshalTo(data []byte) (n int, err error) { i = encodeVarintEntry(data, i, uint64(len(m.Data))) i += copy(data[i:], m.Data) } + data[i] = 0x28 + i++ + i = encodeVarintEntry(data, i, uint64(m.Id)) if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } diff --git a/raft/entry.proto b/raft/entry.proto index b9606c3fa..57252e728 100644 --- a/raft/entry.proto +++ b/raft/entry.proto @@ -12,4 +12,5 @@ message Entry { required int64 term = 2 [(gogoproto.nullable) = false]; required int64 index = 3 [(gogoproto.nullable) = false]; optional bytes data = 4; + required int64 id = 5 [(gogoproto.nullable) = false]; } diff --git a/raft/example_test.go b/raft/example_test.go index aef6758e6..c8aa75acb 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -18,12 +18,12 @@ func Example_Node() { // ReadState blocks until there is new state ready. rd := <-n.Ready() if !prev.Equal(rd.State) { - saveStateToDisk(st) + saveStateToDisk(rd.State) prev = rd.State } - saveToDisk(ents) - go applyToStore(cents) - sendMessages(msgs) + saveToDisk(rd.Entries) + go applyToStore(rd.CommittedEntries) + sendMessages(rd.Messages) } } diff --git a/raft/node.go b/raft/node.go index 272a8a5f9..a2a4c9f00 100644 --- a/raft/node.go +++ b/raft/node.go @@ -110,8 +110,8 @@ func (n *Node) Tick() error { } // Propose proposes data be appended to the log. -func (n *Node) Propose(ctx context.Context, data []byte) error { - return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Data: data}}}}) +func (n *Node) Propose(ctx context.Context, id int64, data []byte) error { + return n.Step(ctx, []Message{{Type: msgProp, Entries: []Entry{{Id: id, Data: data}}}}) } // Step advances the state machine using msgs. Proposals are priotized last so diff --git a/raft/state.pb.go b/raft/state.pb.go index 4152a7a84..5824dc7dd 100644 --- a/raft/state.pb.go +++ b/raft/state.pb.go @@ -2,15 +2,6 @@ // source: state.proto // DO NOT EDIT! -/* - Package raft is a generated protocol buffer package. - - It is generated from these files: - state.proto - - It has these top-level messages: - State -*/ package raft import proto "code.google.com/p/gogoprotobuf/proto" @@ -19,8 +10,8 @@ import math "math" // discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" -import io "io" -import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto" +import io1 "io" +import code_google_com_p_gogoprotobuf_proto1 "code.google.com/p/gogoprotobuf/proto" // Reference proto, json, and math imports to suppress error if they are not otherwise used. var _ = proto.Marshal @@ -48,7 +39,7 @@ func (m *State) Unmarshal(data []byte) error { var wire uint64 for shift := uint(0); ; shift += 7 { if index >= l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } b := data[index] index++ @@ -62,11 +53,11 @@ func (m *State) Unmarshal(data []byte) error { switch fieldNum { case 1: if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType + return code_google_com_p_gogoprotobuf_proto1.ErrWrongType } for shift := uint(0); ; shift += 7 { if index >= l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } b := data[index] index++ @@ -77,11 +68,11 @@ func (m *State) Unmarshal(data []byte) error { } case 2: if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType + return code_google_com_p_gogoprotobuf_proto1.ErrWrongType } for shift := uint(0); ; shift += 7 { if index >= l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } b := data[index] index++ @@ -92,11 +83,11 @@ func (m *State) Unmarshal(data []byte) error { } case 3: if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType + return code_google_com_p_gogoprotobuf_proto1.ErrWrongType } for shift := uint(0); ; shift += 7 { if index >= l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } b := data[index] index++ @@ -107,11 +98,11 @@ func (m *State) Unmarshal(data []byte) error { } case 4: if wireType != 0 { - return code_google_com_p_gogoprotobuf_proto.ErrWrongType + return code_google_com_p_gogoprotobuf_proto1.ErrWrongType } for shift := uint(0); ; shift += 7 { if index >= l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } b := data[index] index++ @@ -130,12 +121,12 @@ func (m *State) Unmarshal(data []byte) error { } } index -= sizeOfWire - skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + skippy, err := code_google_com_p_gogoprotobuf_proto1.Skip(data[index:]) if err != nil { return err } if (index + skippy) > l { - return io.ErrUnexpectedEOF + return io1.ErrUnexpectedEOF } m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) index += skippy