mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Expose types of entries within the WAL log for access from the tools.
Signed-off-by: Piotr Tabor <ptab@google.com>
This commit is contained in:
parent
0d8aad54ba
commit
58681d3feb
@ -121,8 +121,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// skip crc checking if the record type is crcType
|
// skip crc checking if the record type is CrcType
|
||||||
if rec.Type != crcType {
|
if rec.Type != CrcType {
|
||||||
_, err := d.crc.Write(rec.Data)
|
_, err := d.crc.Write(rec.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -204,13 +204,13 @@ func (d *decoder) LastCRC() uint32 {
|
|||||||
|
|
||||||
func (d *decoder) LastOffset() int64 { return d.lastValidOff }
|
func (d *decoder) LastOffset() int64 { return d.lastValidOff }
|
||||||
|
|
||||||
func mustUnmarshalEntry(d []byte) raftpb.Entry {
|
func MustUnmarshalEntry(d []byte) raftpb.Entry {
|
||||||
var e raftpb.Entry
|
var e raftpb.Entry
|
||||||
pbutil.MustUnmarshal(&e, d)
|
pbutil.MustUnmarshal(&e, d)
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustUnmarshalState(d []byte) raftpb.HardState {
|
func MustUnmarshalState(d []byte) raftpb.HardState {
|
||||||
var s raftpb.HardState
|
var s raftpb.HardState
|
||||||
pbutil.MustUnmarshal(&s, d)
|
pbutil.MustUnmarshal(&s, d)
|
||||||
return s
|
return s
|
||||||
|
@ -50,7 +50,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
|
|||||||
case err == nil:
|
case err == nil:
|
||||||
// update crc of the decoder when necessary
|
// update crc of the decoder when necessary
|
||||||
switch rec.Type {
|
switch rec.Type {
|
||||||
case crcType:
|
case CrcType:
|
||||||
crc := decoder.LastCRC()
|
crc := decoder.LastCRC()
|
||||||
// current crc of decoder must match the crc of the record.
|
// current crc of decoder must match the crc of the record.
|
||||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||||
|
@ -36,11 +36,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
metadataType int64 = iota + 1
|
MetadataType int64 = iota + 1
|
||||||
entryType
|
EntryType
|
||||||
stateType
|
StateType
|
||||||
crcType
|
CrcType
|
||||||
snapshotType
|
SnapshotType
|
||||||
|
|
||||||
// warnSyncDuration is the amount of time allotted to an fsync before
|
// warnSyncDuration is the amount of time allotted to an fsync before
|
||||||
// logging a warning
|
// logging a warning
|
||||||
@ -56,7 +56,7 @@ var (
|
|||||||
|
|
||||||
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
||||||
ErrFileNotFound = errors.New("wal: file not found")
|
ErrFileNotFound = errors.New("wal: file not found")
|
||||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
ErrCRCMismatch = walpb.ErrCRCMismatch
|
||||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||||
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
|
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
|
||||||
@ -166,7 +166,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
if err = w.saveCrc(0); err != nil {
|
if err = w.saveCrc(0); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||||
@ -454,8 +454,8 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
var match bool
|
var match bool
|
||||||
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
||||||
switch rec.Type {
|
switch rec.Type {
|
||||||
case entryType:
|
case EntryType:
|
||||||
e := mustUnmarshalEntry(rec.Data)
|
e := MustUnmarshalEntry(rec.Data)
|
||||||
// 0 <= e.Index-w.start.Index - 1 < len(ents)
|
// 0 <= e.Index-w.start.Index - 1 < len(ents)
|
||||||
if e.Index > w.start.Index {
|
if e.Index > w.start.Index {
|
||||||
// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
|
// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
|
||||||
@ -469,17 +469,17 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
}
|
}
|
||||||
w.enti = e.Index
|
w.enti = e.Index
|
||||||
|
|
||||||
case stateType:
|
case StateType:
|
||||||
state = mustUnmarshalState(rec.Data)
|
state = MustUnmarshalState(rec.Data)
|
||||||
|
|
||||||
case metadataType:
|
case MetadataType:
|
||||||
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
||||||
state.Reset()
|
state.Reset()
|
||||||
return nil, state, nil, ErrMetadataConflict
|
return nil, state, nil, ErrMetadataConflict
|
||||||
}
|
}
|
||||||
metadata = rec.Data
|
metadata = rec.Data
|
||||||
|
|
||||||
case crcType:
|
case CrcType:
|
||||||
crc := decoder.LastCRC()
|
crc := decoder.LastCRC()
|
||||||
// current crc of decoder must match the crc of the record.
|
// current crc of decoder must match the crc of the record.
|
||||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||||
@ -489,7 +489,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
}
|
}
|
||||||
decoder.UpdateCRC(rec.Crc)
|
decoder.UpdateCRC(rec.Crc)
|
||||||
|
|
||||||
case snapshotType:
|
case SnapshotType:
|
||||||
var snap walpb.Snapshot
|
var snap walpb.Snapshot
|
||||||
pbutil.MustUnmarshal(&snap, rec.Data)
|
pbutil.MustUnmarshal(&snap, rec.Data)
|
||||||
if snap.Index == w.start.Index {
|
if snap.Index == w.start.Index {
|
||||||
@ -591,13 +591,13 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
|
|||||||
|
|
||||||
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
||||||
switch rec.Type {
|
switch rec.Type {
|
||||||
case snapshotType:
|
case SnapshotType:
|
||||||
var loadedSnap walpb.Snapshot
|
var loadedSnap walpb.Snapshot
|
||||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||||
snaps = append(snaps, loadedSnap)
|
snaps = append(snaps, loadedSnap)
|
||||||
case stateType:
|
case StateType:
|
||||||
state = mustUnmarshalState(rec.Data)
|
state = MustUnmarshalState(rec.Data)
|
||||||
case crcType:
|
case CrcType:
|
||||||
crc := decoder.LastCRC()
|
crc := decoder.LastCRC()
|
||||||
// current crc of decoder must match the crc of the record.
|
// current crc of decoder must match the crc of the record.
|
||||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||||
@ -665,12 +665,12 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
|
|||||||
|
|
||||||
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
|
||||||
switch rec.Type {
|
switch rec.Type {
|
||||||
case metadataType:
|
case MetadataType:
|
||||||
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
||||||
return nil, ErrMetadataConflict
|
return nil, ErrMetadataConflict
|
||||||
}
|
}
|
||||||
metadata = rec.Data
|
metadata = rec.Data
|
||||||
case crcType:
|
case CrcType:
|
||||||
crc := decoder.LastCRC()
|
crc := decoder.LastCRC()
|
||||||
// Current crc of decoder must match the crc of the record.
|
// Current crc of decoder must match the crc of the record.
|
||||||
// We need not match 0 crc, since the decoder is a new one at this point.
|
// We need not match 0 crc, since the decoder is a new one at this point.
|
||||||
@ -678,7 +678,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
|
|||||||
return nil, ErrCRCMismatch
|
return nil, ErrCRCMismatch
|
||||||
}
|
}
|
||||||
decoder.UpdateCRC(rec.Crc)
|
decoder.UpdateCRC(rec.Crc)
|
||||||
case snapshotType:
|
case SnapshotType:
|
||||||
var loadedSnap walpb.Snapshot
|
var loadedSnap walpb.Snapshot
|
||||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||||
if loadedSnap.Index == snap.Index {
|
if loadedSnap.Index == snap.Index {
|
||||||
@ -689,8 +689,8 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
|
|||||||
}
|
}
|
||||||
// We ignore all entry and state type records as these
|
// We ignore all entry and state type records as these
|
||||||
// are not necessary for validating the WAL contents
|
// are not necessary for validating the WAL contents
|
||||||
case entryType:
|
case EntryType:
|
||||||
case stateType:
|
case StateType:
|
||||||
pbutil.MustUnmarshal(&state, rec.Data)
|
pbutil.MustUnmarshal(&state, rec.Data)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
return nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
||||||
@ -748,7 +748,7 @@ func (w *WAL) cut() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
|
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -905,7 +905,7 @@ func (w *WAL) Close() error {
|
|||||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||||
// TODO: add MustMarshalTo to reduce one allocation.
|
// TODO: add MustMarshalTo to reduce one allocation.
|
||||||
b := pbutil.MustMarshal(e)
|
b := pbutil.MustMarshal(e)
|
||||||
rec := &walpb.Record{Type: entryType, Data: b}
|
rec := &walpb.Record{Type: EntryType, Data: b}
|
||||||
if err := w.encoder.encode(rec); err != nil {
|
if err := w.encoder.encode(rec); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -919,7 +919,7 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
|
|||||||
}
|
}
|
||||||
w.state = *s
|
w.state = *s
|
||||||
b := pbutil.MustMarshal(s)
|
b := pbutil.MustMarshal(s)
|
||||||
rec := &walpb.Record{Type: stateType, Data: b}
|
rec := &walpb.Record{Type: StateType, Data: b}
|
||||||
return w.encoder.encode(rec)
|
return w.encoder.encode(rec)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -968,7 +968,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
|||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
defer w.mu.Unlock()
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
rec := &walpb.Record{Type: snapshotType, Data: b}
|
rec := &walpb.Record{Type: SnapshotType, Data: b}
|
||||||
if err := w.encoder.encode(rec); err != nil {
|
if err := w.encoder.encode(rec); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -980,7 +980,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) tail() *fileutil.LockedFile {
|
func (w *WAL) tail() *fileutil.LockedFile {
|
||||||
|
@ -74,16 +74,16 @@ func TestNew(t *testing.T) {
|
|||||||
|
|
||||||
var wb bytes.Buffer
|
var wb bytes.Buffer
|
||||||
e := newEncoder(&wb, 0, 0)
|
e := newEncoder(&wb, 0, 0)
|
||||||
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
|
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
|
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata")})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
r := &walpb.Record{
|
r := &walpb.Record{
|
||||||
Type: snapshotType,
|
Type: SnapshotType,
|
||||||
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
|
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
|
||||||
}
|
}
|
||||||
if err = e.encode(r); err != nil {
|
if err = e.encode(r); err != nil {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user