Merge pull request #2044 from yichengq/278

wal: record mark when snapshotting
This commit is contained in:
Yicheng Qin 2015-01-07 08:26:33 -08:00
commit 6b237416e1
11 changed files with 264 additions and 62 deletions

View File

@ -28,6 +28,7 @@ import (
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
func NewBackupCommand() cli.Command {
@ -57,16 +58,16 @@ func handleBackup(c *cli.Context) {
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
}
var index uint64
var walsnap walpb.Snapshot
if snapshot != nil {
index = snapshot.Metadata.Index
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
newss := snap.New(destSnap)
if err := newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
}
}
w, err := wal.OpenNotInUse(srcWAL, index)
w, err := wal.OpenNotInUse(srcWAL, walsnap)
if err != nil {
log.Fatal(err)
}

View File

@ -26,10 +26,15 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
cfg.Cluster.SetID(cid)
// discard the previously uncommitted entries

View File

@ -46,6 +46,7 @@ import (
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
@ -222,7 +223,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if cfg.ShouldDiscover() {
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
var index uint64
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
@ -232,7 +232,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
log.Panicf("etcdserver: recovered store from snapshot error: %v", err)
}
log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index)
index = snapshot.Metadata.Index
}
cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st)
cfg.Print()
@ -240,9 +239,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster)
}
if !cfg.ForceNewCluster {
id, n, s, w = restartNode(cfg, index+1, snapshot)
id, n, s, w = restartNode(cfg, snapshot)
} else {
id, n, s, w = restartAsStandaloneNode(cfg, index+1, snapshot)
id, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
default:
return nil, fmt.Errorf("unsupported bootstrap config")
@ -876,8 +875,12 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
return
}
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
cfg.Cluster.SetID(cid)
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)

View File

@ -10,6 +10,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
type Storage interface {
@ -43,6 +44,14 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
if err != nil {
return err
}
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
err = st.WAL.SaveSnapshot(walsnap)
if err != nil {
return err
}
err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
if err != nil {
return err
@ -50,9 +59,9 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
return nil
}
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
var err error
if w, err = wal.Open(waldir, index); err != nil {
if w, err = wal.Open(waldir, snap); err != nil {
log.Fatalf("etcdserver: open wal error: %v", err)
}
var wmetadata []byte

View File

@ -12,6 +12,7 @@ import (
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
func snapDir4(dataDir string) string {
@ -106,13 +107,18 @@ func Migrate4To2(dataDir string, name string) error {
log.Printf("Log migration successful")
// migrate snapshot (if necessary) and logs
var walsnap walpb.Snapshot
if snap2 != nil {
walsnap.Index, walsnap.Term = snap2.Metadata.Index, snap2.Metadata.Term
ss := snap.New(sd2)
if err := ss.SaveSnap(*snap2); err != nil {
return err
}
log.Printf("Snapshot migration successful")
}
if err = w.SaveSnapshot(walsnap); err != nil {
return err
}
return nil
}

View File

@ -13,6 +13,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
func main() {
@ -24,20 +25,20 @@ func main() {
ss := snap.New(snapDir(*from))
snapshot, err := ss.Load()
var index uint64
var walsnap walpb.Snapshot
switch err {
case nil:
index = snapshot.Metadata.Index
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes)
fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
snapshot.Metadata.Term, index, nodes)
walsnap.Term, walsnap.Index, nodes)
case snap.ErrNoSnapshot:
fmt.Printf("Snapshot:\nempty\n")
default:
log.Fatalf("Failed loading snapshot: %v", err)
}
w, err := wal.Open(walDir(*from), index+1)
w, err := wal.Open(walDir(*from), walsnap)
if err != nil {
log.Fatalf("Failed opening WAL: %v", err)
}

View File

@ -27,6 +27,11 @@ to it with the Save method:
...
err := w.Save(s, ents)
After saving an raft snapshot to disk, SaveSnapshot method should be called to
record it. So WAL can match with the saved snapshot when restarting.
err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2})
When a user has finished using a WAL it must be closed:
w.Close()
@ -46,21 +51,20 @@ Cut has been called on this WAL then the sequence will increment from 0x0 to
Cut issues 0x10 entries with incremental index later then the file will be called:
0000000000000002-0000000000000031.wal.
At a later time a WAL can be opened at a particular raft index:
At a later time a WAL can be opened at a particular snapshot. If there is no
snapshot, an empty snapshot should be passed in.
w, err := wal.Open("/var/lib/etcd", 0)
w, err := wal.Open("/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2})
...
The raft index must have been written to the WAL. When opening without a
snapshot the raft index should always be 0. When opening with a snapshot
the raft index should be the index of the last entry covered by the snapshot.
The snapshot must have been written to the WAL.
Additional items cannot be Saved to this WAL until all of the items from 0 to
the end of the WAL are read first:
Additional items cannot be Saved to this WAL until all of the items from the given
snapshot to the end of the WAL are read first:
id, state, ents, err := w.ReadAll()
metadata, state, ents, err := w.ReadAll()
This will give you the raft node id, the last raft.State and the slice of
This will give you the metadata, the last raft.State and the slice of
raft.Entry items in the log.
*/

View File

@ -38,6 +38,7 @@ const (
entryType
stateType
crcType
snapshotType
// the owner can make/remove files inside the directory
privateDirMode = 0700
@ -47,6 +48,8 @@ var (
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
ErrFileNotFound = errors.New("wal: file not found")
ErrCRCMismatch = errors.New("wal: crc mismatch")
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
crcTable = crc32.MakeTable(crc32.Castagnoli)
)
@ -60,8 +63,8 @@ type WAL struct {
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL
ri uint64 // index of entry to start reading
decoder *decoder // decoder to decode records
start walpb.Snapshot // snapshot to start reading
decoder *decoder // decoder to decode records
f *os.File // underlay file opened for appending, sync
seq uint64 // sequence of the wal file currently used for writes
@ -110,29 +113,29 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
if err = w.sync(); err != nil {
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
return nil, err
}
return w, nil
}
// Open opens the WAL at the given index.
// The index SHOULD have been previously committed to the WAL, or the following
// Open opens the WAL at the given snap.
// The snap SHOULD have been previously saved to the WAL, or the following
// ReadAll will fail.
// The returned WAL is ready to read and the first record will be the given
// index. The WAL cannot be appended to before reading out all of its
// The returned WAL is ready to read and the first record will be the one after
// the given snap. The WAL cannot be appended to before reading out all of its
// previous records.
func Open(dirpath string, index uint64) (*WAL, error) {
return openAtIndex(dirpath, index, true)
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, true)
}
// OpenNotInUse only opens the wal files that are not in use.
// Other than that, it is similar to Open.
func OpenNotInUse(dirpath string, index uint64) (*WAL, error) {
return openAtIndex(dirpath, index, false)
func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(dirpath, snap, false)
}
func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) {
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return nil, err
@ -142,7 +145,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
return nil, ErrFileNotFound
}
nameIndex, ok := searchIndex(names, index)
nameIndex, ok := searchIndex(names, snap.Index)
if !ok || !isValidSeq(names[nameIndex:]) {
return nil, ErrFileNotFound
}
@ -189,7 +192,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
// create a WAL ready for reading
w := &WAL{
dir: dirpath,
ri: index,
start: snap,
decoder: newDecoder(rc),
f: f,
@ -200,18 +203,23 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) {
}
// ReadAll reads out all records of the current WAL.
// If it cannot read out the expected entry, it will return ErrIndexNotFound.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If loaded snap doesn't match with the expected one, it will return
// ErrSnapshotMismatch.
// TODO: detect not-last-snap error.
// TODO: maybe loose the checking of match.
// After ReadAll, the WAL will be ready for appending new records.
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
rec := &walpb.Record{}
decoder := w.decoder
var match bool
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
switch rec.Type {
case entryType:
e := mustUnmarshalEntry(rec.Data)
if e.Index >= w.ri {
ents = append(ents[:e.Index-w.ri], e)
if e.Index > w.start.Index {
ents = append(ents[:e.Index-w.start.Index-1], e)
}
w.enti = e.Index
case stateType:
@ -231,6 +239,16 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return nil, state, nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
case snapshotType:
var snap walpb.Snapshot
pbutil.MustUnmarshal(&snap, rec.Data)
if snap.Index == w.start.Index {
if snap.Term != w.start.Term {
state.Reset()
return nil, state, nil, ErrSnapshotMismatch
}
match = true
}
default:
state.Reset()
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
@ -240,10 +258,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
state.Reset()
return nil, state, nil, err
}
if !match {
state.Reset()
return nil, state, nil, ErrSnapshotNotFound
}
// close decoder, disable reading
w.decoder.close()
w.ri = 0
w.start = walpb.Snapshot{}
w.metadata = metadata
// create encoder (chain crc with the decoder), enable appending
@ -374,6 +396,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
return w.sync()
}
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
b := pbutil.MustMarshal(&e)
rec := &walpb.Record{Type: snapshotType, Data: b}
if err := w.encoder.encode(rec); err != nil {
return err
}
// update enti only when snapshot is ahead of last index
if w.enti < e.Index {
w.enti = e.Index
}
return w.sync()
}
func (w *WAL) saveCrc(prevCrc uint32) error {
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
}

View File

@ -24,6 +24,7 @@ import (
"reflect"
"testing"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal/walpb"
)
@ -58,6 +59,13 @@ func TestNew(t *testing.T) {
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
r := &walpb.Record{
Type: snapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
}
if err = e.encode(r); err != nil {
t.Fatalf("err = %v, want nil", err)
}
e.flush()
if !reflect.DeepEqual(gd, wb.Bytes()) {
t.Errorf("data = %v, want %v", gd, wb.Bytes())
@ -90,7 +98,7 @@ func TestOpenAtIndex(t *testing.T) {
}
f.Close()
w, err := Open(dir, 0)
w, err := Open(dir, walpb.Snapshot{})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -109,7 +117,7 @@ func TestOpenAtIndex(t *testing.T) {
}
f.Close()
w, err = Open(dir, 5)
w, err = Open(dir, walpb.Snapshot{Index: 5})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -126,7 +134,7 @@ func TestOpenAtIndex(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(emptydir)
if _, err = Open(emptydir, 0); err != ErrFileNotFound {
if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
}
}
@ -168,6 +176,10 @@ func TestCut(t *testing.T) {
if err := w.Cut(); err != nil {
t.Fatal(err)
}
snap := walpb.Snapshot{Index: 2, Term: 1}
if err := w.SaveSnapshot(snap); err != nil {
t.Fatal(err)
}
wname = walName(2, 2)
if g := path.Base(w.f.Name()); g != wname {
t.Errorf("name = %s, want %s", g, wname)
@ -183,7 +195,7 @@ func TestCut(t *testing.T) {
defer f.Close()
nw := &WAL{
decoder: newDecoder(f),
ri: 2,
start: snap,
}
_, gst, _, err := nw.ReadAll()
if err != nil {
@ -205,7 +217,10 @@ func TestRecover(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
t.Fatal(err)
}
ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
for _, e := range ents {
if err = w.SaveEntry(&e); err != nil {
t.Fatal(err)
@ -219,7 +234,7 @@ func TestRecover(t *testing.T) {
}
w.Close()
if w, err = Open(p, 0); err != nil {
if w, err = Open(p, walpb.Snapshot{}); err != nil {
t.Fatal(err)
}
metadata, state, entries, err := w.ReadAll()
@ -319,14 +334,10 @@ func TestRecoverAfterCut(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// TODO(unihorn): remove this when cut can operate on an empty file
if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
t.Fatal(err)
}
if err = w.Cut(); err != nil {
t.Fatal(err)
}
for i := 1; i < 10; i++ {
for i := 0; i < 10; i++ {
if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
t.Fatal(err)
}
e := raftpb.Entry{Index: uint64(i)}
if err = w.SaveEntry(&e); err != nil {
t.Fatal(err)
@ -342,7 +353,7 @@ func TestRecoverAfterCut(t *testing.T) {
}
for i := 0; i < 10; i++ {
w, err := Open(p, uint64(i))
w, err := Open(p, walpb.Snapshot{Index: uint64(i)})
if err != nil {
if i <= 4 {
if err != ErrFileNotFound {
@ -362,8 +373,8 @@ func TestRecoverAfterCut(t *testing.T) {
t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
}
for j, e := range entries {
if e.Index != uint64(j+i) {
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
if e.Index != uint64(j+i+1) {
t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
}
}
w.Close()
@ -381,12 +392,15 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
t.Fatal(err)
}
if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
t.Fatal(err)
}
w.Close()
w, err = Open(p, 1)
w, err = Open(p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}

View File

@ -10,6 +10,7 @@
It has these top-level messages:
Record
Snapshot
*/
package walpb
@ -38,6 +39,16 @@ func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
type Snapshot struct {
Index uint64 `protobuf:"varint,1,req,name=index" json:"index"`
Term uint64 `protobuf:"varint,2,req,name=term" json:"term"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Snapshot) Reset() { *m = Snapshot{} }
func (m *Snapshot) String() string { return proto.CompactTextString(m) }
func (*Snapshot) ProtoMessage() {}
func init() {
}
func (m *Record) Unmarshal(data []byte) error {
@ -134,6 +145,78 @@ func (m *Record) Unmarshal(data []byte) error {
}
return nil
}
func (m *Snapshot) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Index |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
m.Term |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (m *Record) Size() (n int) {
var l int
_ = l
@ -148,6 +231,16 @@ func (m *Record) Size() (n int) {
}
return n
}
func (m *Snapshot) Size() (n int) {
var l int
_ = l
n += 1 + sovRecord(uint64(m.Index))
n += 1 + sovRecord(uint64(m.Term))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovRecord(x uint64) (n int) {
for {
@ -194,6 +287,32 @@ func (m *Record) MarshalTo(data []byte) (n int, err error) {
}
return i, nil
}
func (m *Snapshot) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
data[i] = 0x8
i++
i = encodeVarintRecord(data, i, uint64(m.Index))
data[i] = 0x10
i++
i = encodeVarintRecord(data, i, uint64(m.Term))
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64Record(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)

View File

@ -12,3 +12,8 @@ message Record {
required uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
}
message Snapshot {
required uint64 index = 1 [(gogoproto.nullable) = false];
required uint64 term = 2 [(gogoproto.nullable) = false];
}