diff --git a/etcdctl/command/backup_command.go b/etcdctl/command/backup_command.go index 0f52c9a01..d9cdfd1a2 100644 --- a/etcdctl/command/backup_command.go +++ b/etcdctl/command/backup_command.go @@ -28,6 +28,7 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" ) func NewBackupCommand() cli.Command { @@ -57,16 +58,16 @@ func handleBackup(c *cli.Context) { if err != nil && err != snap.ErrNoSnapshot { log.Fatal(err) } - var index uint64 + var walsnap walpb.Snapshot if snapshot != nil { - index = snapshot.Metadata.Index + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term newss := snap.New(destSnap) if err := newss.SaveSnap(*snapshot); err != nil { log.Fatal(err) } } - w, err := wal.OpenNotInUse(srcWAL, index) + w, err := wal.OpenNotInUse(srcWAL, walsnap) if err != nil { log.Fatal(err) } diff --git a/etcdserver/force_cluster.go b/etcdserver/force_cluster.go index f77da48a5..810ed1c31 100644 --- a/etcdserver/force_cluster.go +++ b/etcdserver/force_cluster.go @@ -26,10 +26,15 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" ) -func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { - w, id, cid, st, ents := readWAL(cfg.WALDir(), index) +func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) cfg.Cluster.SetID(cid) // discard the previously uncommitted entries diff --git a/etcdserver/server.go b/etcdserver/server.go index 0983101e9..3094a17ea 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -46,6 +46,7 @@ import ( "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -219,7 +220,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if cfg.ShouldDiscover() { log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } - var index uint64 snapshot, err := ss.Load() if err != nil && err != snap.ErrNoSnapshot { return nil, err @@ -229,7 +229,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { log.Panicf("etcdserver: recovered store from snapshot error: %v", err) } log.Printf("etcdserver: recovered store from snapshot at index %d", snapshot.Metadata.Index) - index = snapshot.Metadata.Index } cfg.Cluster = NewClusterFromStore(cfg.Cluster.token, st) cfg.Print() @@ -237,9 +236,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { log.Printf("etcdserver: loaded cluster information from store: %s", cfg.Cluster) } if !cfg.ForceNewCluster { - id, n, s, w = restartNode(cfg, index+1, snapshot) + id, n, s, w = restartNode(cfg, snapshot) } else { - id, n, s, w = restartAsStandaloneNode(cfg, index+1, snapshot) + id, n, s, w = restartAsStandaloneNode(cfg, snapshot) } default: return nil, fmt.Errorf("unsupported bootstrap config") @@ -860,6 +859,9 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { log.Fatalf("etcdserver: create wal error: %v", err) } + if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil { + log.Fatalf("etcdserver: save empty snapshot error: %v", err) + } peers := make([]raft.Peer, len(ids)) for i, id := range ids { ctx, err := json.Marshal((*cfg.Cluster).Member(id)) @@ -875,8 +877,12 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * return } -func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { - w, id, cid, st, ents := readWAL(cfg.WALDir(), index) +func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) { + var walsnap walpb.Snapshot + if snapshot != nil { + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term + } + w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap) cfg.Cluster.SetID(cid) log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 9f1f5cc6d..978151b2a 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -10,6 +10,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" ) type Storage interface { @@ -43,6 +44,14 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { if err != nil { return err } + walsnap := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + err = st.WAL.SaveSnapshot(walsnap) + if err != nil { + return err + } err = st.WAL.ReleaseLockTo(snap.Metadata.Index) if err != nil { return err @@ -50,9 +59,9 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { return nil } -func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { +func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { var err error - if w, err = wal.Open(waldir, index); err != nil { + if w, err = wal.Open(waldir, snap); err != nil { log.Fatalf("etcdserver: open wal error: %v", err) } var wmetadata []byte diff --git a/migrate/etcd4.go b/migrate/etcd4.go index 4c1baf508..03be69db4 100644 --- a/migrate/etcd4.go +++ b/migrate/etcd4.go @@ -12,6 +12,7 @@ import ( raftpb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" ) func snapDir4(dataDir string) string { @@ -106,13 +107,18 @@ func Migrate4To2(dataDir string, name string) error { log.Printf("Log migration successful") // migrate snapshot (if necessary) and logs + var walsnap walpb.Snapshot if snap2 != nil { + walsnap.Index, walsnap.Term = snap2.Metadata.Index, snap2.Metadata.Term ss := snap.New(sd2) if err := ss.SaveSnap(*snap2); err != nil { return err } log.Printf("Snapshot migration successful") } + if err = w.SaveSnapshot(walsnap); err != nil { + return err + } return nil } diff --git a/tools/etcd-dump-logs/main.go b/tools/etcd-dump-logs/main.go index db87806ec..8f5812146 100644 --- a/tools/etcd-dump-logs/main.go +++ b/tools/etcd-dump-logs/main.go @@ -13,6 +13,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" ) func main() { @@ -24,20 +25,20 @@ func main() { ss := snap.New(snapDir(*from)) snapshot, err := ss.Load() - var index uint64 + var walsnap walpb.Snapshot switch err { case nil: - index = snapshot.Metadata.Index + walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term nodes := genIDSlice(snapshot.Metadata.ConfState.Nodes) fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n", - snapshot.Metadata.Term, index, nodes) + walsnap.Term, walsnap.Index, nodes) case snap.ErrNoSnapshot: fmt.Printf("Snapshot:\nempty\n") default: log.Fatalf("Failed loading snapshot: %v", err) } - w, err := wal.Open(walDir(*from), index+1) + w, err := wal.Open(walDir(*from), walsnap) if err != nil { log.Fatalf("Failed opening WAL: %v", err) } diff --git a/wal/wal.go b/wal/wal.go index 35d3cbae4..e4e47dc76 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -38,6 +38,7 @@ const ( entryType stateType crcType + snapshotType // the owner can make/remove files inside the directory privateDirMode = 0700 @@ -47,6 +48,8 @@ var ( ErrMetadataConflict = errors.New("wal: conflicting metadata found") ErrFileNotFound = errors.New("wal: file not found") ErrCRCMismatch = errors.New("wal: crc mismatch") + ErrSnapshotMismatch = errors.New("wal: snapshot mismatch") + ErrSnapshotNotFound = errors.New("wal: snapshot not found") crcTable = crc32.MakeTable(crc32.Castagnoli) ) @@ -60,8 +63,8 @@ type WAL struct { metadata []byte // metadata recorded at the head of each WAL state raftpb.HardState // hardstate recorded at the head of WAL - ri uint64 // index of entry to start reading - decoder *decoder // decoder to decode records + start walpb.Snapshot // snapshot to start reading + decoder *decoder // decoder to decode records f *os.File // underlay file opened for appending, sync seq uint64 // sequence of the wal file currently used for writes @@ -116,23 +119,23 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { return w, nil } -// Open opens the WAL at the given index. -// The index SHOULD have been previously committed to the WAL, or the following +// Open opens the WAL at the given snap. +// The snap SHOULD have been previously saved to the WAL, or the following // ReadAll will fail. -// The returned WAL is ready to read and the first record will be the given -// index. The WAL cannot be appended to before reading out all of its +// The returned WAL is ready to read and the first record will be the one after +// the given snap. The WAL cannot be appended to before reading out all of its // previous records. -func Open(dirpath string, index uint64) (*WAL, error) { - return openAtIndex(dirpath, index, true) +func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, true) } // OpenNotInUse only opens the wal files that are not in use. // Other than that, it is similar to Open. -func OpenNotInUse(dirpath string, index uint64) (*WAL, error) { - return openAtIndex(dirpath, index, false) +func OpenNotInUse(dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(dirpath, snap, false) } -func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) { +func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { names, err := fileutil.ReadDir(dirpath) if err != nil { return nil, err @@ -142,7 +145,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) { return nil, ErrFileNotFound } - nameIndex, ok := searchIndex(names, index) + nameIndex, ok := searchIndex(names, snap.Index) if !ok || !isValidSeq(names[nameIndex:]) { return nil, ErrFileNotFound } @@ -189,7 +192,7 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) { // create a WAL ready for reading w := &WAL{ dir: dirpath, - ri: index, + start: snap, decoder: newDecoder(rc), f: f, @@ -200,18 +203,23 @@ func openAtIndex(dirpath string, index uint64, all bool) (*WAL, error) { } // ReadAll reads out all records of the current WAL. -// If it cannot read out the expected entry, it will return ErrIndexNotFound. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If loaded snap doesn't match with the expected one, it will return +// ErrSnapshotMismatch. +// TODO: detect not-last-snap error. +// TODO: maybe loose the checking of match. // After ReadAll, the WAL will be ready for appending new records. func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { rec := &walpb.Record{} decoder := w.decoder + var match bool for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { switch rec.Type { case entryType: e := mustUnmarshalEntry(rec.Data) - if e.Index >= w.ri { - ents = append(ents[:e.Index-w.ri], e) + if e.Index > w.start.Index { + ents = append(ents[:e.Index-w.start.Index-1], e) } w.enti = e.Index case stateType: @@ -231,6 +239,16 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return nil, state, nil, ErrCRCMismatch } decoder.updateCRC(rec.Crc) + case snapshotType: + var snap walpb.Snapshot + pbutil.MustUnmarshal(&snap, rec.Data) + if snap.Index == w.start.Index { + if snap.Term != w.start.Term { + state.Reset() + return nil, state, nil, ErrSnapshotMismatch + } + match = true + } default: state.Reset() return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) @@ -240,10 +258,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. state.Reset() return nil, state, nil, err } + if !match { + state.Reset() + return nil, state, nil, ErrSnapshotNotFound + } // close decoder, disable reading w.decoder.close() - w.ri = 0 + w.start = walpb.Snapshot{} w.metadata = metadata // create encoder (chain crc with the decoder), enable appending @@ -374,6 +396,19 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { return w.sync() } +func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + b := pbutil.MustMarshal(&e) + rec := &walpb.Record{Type: snapshotType, Data: b} + if err := w.encoder.encode(rec); err != nil { + return err + } + // update enti only when snapshot is ahead of last index + if w.enti < e.Index { + w.enti = e.Index + } + return w.sync() +} + func (w *WAL) saveCrc(prevCrc uint32) error { return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc}) } diff --git a/wal/wal_test.go b/wal/wal_test.go index 94734df10..98a2a3386 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -90,7 +90,7 @@ func TestOpenAtIndex(t *testing.T) { } f.Close() - w, err := Open(dir, 0) + w, err := Open(dir, walpb.Snapshot{}) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -109,7 +109,7 @@ func TestOpenAtIndex(t *testing.T) { } f.Close() - w, err = Open(dir, 5) + w, err = Open(dir, walpb.Snapshot{Index: 5}) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -126,7 +126,7 @@ func TestOpenAtIndex(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(emptydir) - if _, err = Open(emptydir, 0); err != ErrFileNotFound { + if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound { t.Errorf("err = %v, want %v", err, ErrFileNotFound) } } @@ -168,6 +168,10 @@ func TestCut(t *testing.T) { if err := w.Cut(); err != nil { t.Fatal(err) } + snap := walpb.Snapshot{Index: 2, Term: 1} + if err := w.SaveSnapshot(snap); err != nil { + t.Fatal(err) + } wname = walName(2, 2) if g := path.Base(w.f.Name()); g != wname { t.Errorf("name = %s, want %s", g, wname) @@ -183,7 +187,7 @@ func TestCut(t *testing.T) { defer f.Close() nw := &WAL{ decoder: newDecoder(f), - ri: 2, + start: snap, } _, gst, _, err := nw.ReadAll() if err != nil { @@ -205,7 +209,10 @@ func TestRecover(t *testing.T) { if err != nil { t.Fatal(err) } - ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} + if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { + t.Fatal(err) + } + ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} for _, e := range ents { if err = w.SaveEntry(&e); err != nil { t.Fatal(err) @@ -219,7 +226,7 @@ func TestRecover(t *testing.T) { } w.Close() - if w, err = Open(p, 0); err != nil { + if w, err = Open(p, walpb.Snapshot{}); err != nil { t.Fatal(err) } metadata, state, entries, err := w.ReadAll() @@ -319,14 +326,10 @@ func TestRecoverAfterCut(t *testing.T) { if err != nil { t.Fatal(err) } - // TODO(unihorn): remove this when cut can operate on an empty file - if err = w.SaveEntry(&raftpb.Entry{}); err != nil { - t.Fatal(err) - } - if err = w.Cut(); err != nil { - t.Fatal(err) - } - for i := 1; i < 10; i++ { + for i := 0; i < 10; i++ { + if err = w.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil { + t.Fatal(err) + } e := raftpb.Entry{Index: uint64(i)} if err = w.SaveEntry(&e); err != nil { t.Fatal(err) @@ -342,7 +345,7 @@ func TestRecoverAfterCut(t *testing.T) { } for i := 0; i < 10; i++ { - w, err := Open(p, uint64(i)) + w, err := Open(p, walpb.Snapshot{Index: uint64(i)}) if err != nil { if i <= 4 { if err != ErrFileNotFound { @@ -362,8 +365,8 @@ func TestRecoverAfterCut(t *testing.T) { t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata") } for j, e := range entries { - if e.Index != uint64(j+i) { - t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i) + if e.Index != uint64(j+i+1) { + t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1) } } w.Close() @@ -381,12 +384,15 @@ func TestOpenAtUncommittedIndex(t *testing.T) { if err != nil { t.Fatal(err) } + if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil { + t.Fatal(err) + } if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil { t.Fatal(err) } w.Close() - w, err = Open(p, 1) + w, err = Open(p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } diff --git a/wal/walpb/record.pb.go b/wal/walpb/record.pb.go index 1f93b7521..c33687074 100644 --- a/wal/walpb/record.pb.go +++ b/wal/walpb/record.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: Record + Snapshot */ package walpb @@ -38,6 +39,16 @@ func (m *Record) Reset() { *m = Record{} } func (m *Record) String() string { return proto.CompactTextString(m) } func (*Record) ProtoMessage() {} +type Snapshot struct { + Index uint64 `protobuf:"varint,1,req,name=index" json:"index"` + Term uint64 `protobuf:"varint,2,req,name=term" json:"term"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Snapshot) Reset() { *m = Snapshot{} } +func (m *Snapshot) String() string { return proto.CompactTextString(m) } +func (*Snapshot) ProtoMessage() {} + func init() { } func (m *Record) Unmarshal(data []byte) error { @@ -134,6 +145,78 @@ func (m *Record) Unmarshal(data []byte) error { } return nil } +func (m *Snapshot) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Index |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + m.Term |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} func (m *Record) Size() (n int) { var l int _ = l @@ -148,6 +231,16 @@ func (m *Record) Size() (n int) { } return n } +func (m *Snapshot) Size() (n int) { + var l int + _ = l + n += 1 + sovRecord(uint64(m.Index)) + n += 1 + sovRecord(uint64(m.Term)) + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} func sovRecord(x uint64) (n int) { for { @@ -194,6 +287,32 @@ func (m *Record) MarshalTo(data []byte) (n int, err error) { } return i, nil } +func (m *Snapshot) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Snapshot) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintRecord(data, i, uint64(m.Index)) + data[i] = 0x10 + i++ + i = encodeVarintRecord(data, i, uint64(m.Term)) + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} func encodeFixed64Record(data []byte, offset int, v uint64) int { data[offset] = uint8(v) data[offset+1] = uint8(v >> 8) diff --git a/wal/walpb/record.proto b/wal/walpb/record.proto index a236f0481..c99e37e04 100644 --- a/wal/walpb/record.proto +++ b/wal/walpb/record.proto @@ -12,3 +12,8 @@ message Record { required uint32 crc = 2 [(gogoproto.nullable) = false]; optional bytes data = 3; } + +message Snapshot { + required uint64 index = 1 [(gogoproto.nullable) = false]; + required uint64 term = 2 [(gogoproto.nullable) = false]; +}