Merge 3dc1e4faa8affb8b4167ec02b1942effc0ff78cb into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
KosovGrigorii 2024-09-26 22:00:14 +01:00 committed by GitHub
commit d0304d9613
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 76 additions and 31 deletions

View File

@ -460,6 +460,7 @@ pb.GoFeatures: ""
pb.GoFeatures.legacy_unmarshal_json_enum: ""
walpb.Record: ""
walpb.Record.crc: ""
walpb.Record.created_at: "3.6"
walpb.Record.data: ""
walpb.Record.type: ""
walpb.Snapshot: ""

View File

@ -135,7 +135,7 @@ func TestRepairWriteTearLast(t *testing.T) {
}
return f.Truncate(offset)
}
testRepair(t, makeEnts(50), corruptf, 40)
testRepair(t, makeEnts(50), corruptf, 29)
}
// TestRepairWriteTearMiddle repairs the WAL when there is write tearing

View File

@ -26,6 +26,8 @@ import (
"sync"
"time"
"github.com/jonboulle/clockwork"
"go.uber.org/zap"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
@ -92,12 +94,14 @@ type WAL struct {
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
fp *filePipeline
clock clockwork.Clock
}
// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll
// after the file is Open.
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
func Create(lg *zap.Logger, dirpath string, metadata []byte, walClock ...clockwork.Clock) (*WAL, error) {
if Exist(dirpath) {
return nil, os.ErrExist
}
@ -158,6 +162,12 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
dir: dirpath,
metadata: metadata,
}
if walClock != nil {
w.clock = walClock[0]
} else {
w.clock = clockwork.NewRealClock()
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
@ -166,7 +176,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
if err = w.saveCrc(0); err != nil {
return nil, err
}
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return nil, err
}
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
@ -379,6 +389,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
decoder: NewDecoder(rs...),
readClose: closer,
locks: ls,
clock: clockwork.NewRealClock(),
}
if write {
@ -773,7 +784,7 @@ func (w *WAL) cut() error {
return err
}
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata}); err != nil {
if err = w.encoder.encode(&walpb.Record{Type: MetadataType, Data: w.metadata, CreatedAt: w.clock.Now().Unix()}); err != nil {
return err
}
@ -930,7 +941,7 @@ func (w *WAL) Close() error {
func (w *WAL) saveEntry(e *raftpb.Entry) error {
// TODO: add MustMarshalTo to reduce one allocation.
b := pbutil.MustMarshal(e)
rec := &walpb.Record{Type: EntryType, Data: b}
rec := &walpb.Record{Type: EntryType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil {
return err
}
@ -944,7 +955,7 @@ func (w *WAL) saveState(s *raftpb.HardState) error {
}
w.state = *s
b := pbutil.MustMarshal(s)
rec := &walpb.Record{Type: StateType, Data: b}
rec := &walpb.Record{Type: StateType, Data: b, CreatedAt: w.clock.Now().Unix()}
return w.encoder.encode(rec)
}
@ -996,7 +1007,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
w.mu.Lock()
defer w.mu.Unlock()
rec := &walpb.Record{Type: SnapshotType, Data: b}
rec := &walpb.Record{Type: SnapshotType, Data: b, CreatedAt: w.clock.Now().Unix()}
if err := w.encoder.encode(rec); err != nil {
return err
}
@ -1008,7 +1019,7 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
}
func (w *WAL) saveCrc(prevCrc uint32) error {
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc})
return w.encoder.encode(&walpb.Record{Type: CrcType, Crc: prevCrc, CreatedAt: w.clock.Now().Unix()})
}
func (w *WAL) tail() *fileutil.LockedFile {

View File

@ -29,6 +29,8 @@ import (
"strings"
"testing"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
@ -49,7 +51,7 @@ var (
func TestNew(t *testing.T) {
p := t.TempDir()
w, err := Create(zaptest.NewLogger(t), p, []byte("somedata"))
w, err := Create(zaptest.NewLogger(t), p, []byte("somedata"), clockwork.NewFakeClock())
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
@ -75,17 +77,18 @@ func TestNew(t *testing.T) {
var wb bytes.Buffer
e := newEncoder(&wb, 0, 0)
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0})
err = e.encode(&walpb.Record{Type: CrcType, Crc: 0, CreatedAt: w.clock.Now().Unix()})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata")})
err = e.encode(&walpb.Record{Type: MetadataType, Data: []byte("somedata"), CreatedAt: w.clock.Now().Unix()})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
r := &walpb.Record{
Type: SnapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
Type: SnapshotType,
Data: pbutil.MustMarshal(&walpb.Snapshot{}),
CreatedAt: w.clock.Now().Unix(),
}
if err = e.encode(r); err != nil {
t.Fatalf("err = %v, want nil", err)

View File

@ -11,6 +11,7 @@ import (
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/golang/protobuf/proto"
_ "go.etcd.io/etcd/api/v3/versionpb"
raftpb "go.etcd.io/raft/v3/raftpb"
)
@ -29,6 +30,7 @@ type Record struct {
Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
CreatedAt int64 `protobuf:"varint,4,opt,name=created_at,json=createdAt" json:"created_at"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -119,24 +121,27 @@ func init() {
func init() { proto.RegisterFile("record.proto", fileDescriptor_bf94fd919e302a1d) }
var fileDescriptor_bf94fd919e302a1d = []byte{
// 266 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x90, 0x41, 0x4e, 0xc3, 0x30,
0x14, 0x44, 0x63, 0x92, 0x22, 0x30, 0x65, 0x51, 0x0b, 0xa1, 0x28, 0x8b, 0x10, 0x75, 0x15, 0x09,
0x29, 0x46, 0x70, 0x02, 0xca, 0x9e, 0x45, 0xba, 0x63, 0x83, 0x5c, 0xe7, 0x27, 0x54, 0x6a, 0xf3,
0xad, 0x1f, 0xab, 0x85, 0x9b, 0x70, 0xa4, 0x2c, 0x39, 0x01, 0x82, 0x70, 0x11, 0x64, 0xa7, 0xb0,
0xfa, 0xa3, 0x37, 0x9a, 0x19, 0xcb, 0x7c, 0x4a, 0xa0, 0x91, 0xaa, 0xc2, 0x10, 0x5a, 0x14, 0x93,
0xbd, 0xda, 0x98, 0x55, 0x72, 0xd1, 0x60, 0x83, 0x9e, 0x48, 0xa7, 0x46, 0x33, 0x99, 0x91, 0xaa,
0xad, 0x59, 0x49, 0x77, 0x46, 0x34, 0x7f, 0xe4, 0xc7, 0xa5, 0xcf, 0x8b, 0x98, 0x47, 0xf6, 0xcd,
0x40, 0xcc, 0x32, 0x96, 0x87, 0x8b, 0xa8, 0xff, 0xbc, 0x0a, 0x4a, 0x4f, 0xc4, 0x25, 0x0f, 0x35,
0xe9, 0xf8, 0x28, 0x63, 0xf9, 0xf9, 0xc1, 0x70, 0x40, 0x08, 0x1e, 0x55, 0xca, 0xaa, 0x38, 0xcc,
0x58, 0x3e, 0x2d, 0xbd, 0x9e, 0x13, 0x3f, 0x59, 0xb6, 0xca, 0x74, 0x2f, 0x68, 0x45, 0xc2, 0x27,
0xeb, 0xb6, 0x82, 0x57, 0x5f, 0x19, 0x1d, 0x92, 0x23, 0xf2, 0x6b, 0x40, 0x5b, 0x5f, 0x1a, 0xfd,
0xaf, 0x01, 0x6d, 0xc5, 0x0d, 0xe7, 0x1a, 0xdb, 0xfa, 0xb9, 0xb3, 0xca, 0x82, 0xef, 0x3e, 0xbb,
0x9d, 0x15, 0xe3, 0xcb, 0x8b, 0x07, 0x6c, 0xeb, 0xa5, 0x33, 0xca, 0x53, 0xfd, 0x27, 0x17, 0xf7,
0xfd, 0x77, 0x1a, 0xf4, 0x43, 0xca, 0x3e, 0x86, 0x94, 0x7d, 0x0d, 0x29, 0x7b, 0xff, 0x49, 0x83,
0xa7, 0xeb, 0x06, 0x0b, 0xb0, 0xba, 0x2a, 0xd6, 0x28, 0xdd, 0x95, 0x1d, 0xd0, 0x0e, 0x48, 0xee,
0xee, 0x64, 0x67, 0x91, 0x54, 0x03, 0x72, 0xaf, 0x36, 0xd2, 0xff, 0xd7, 0x6f, 0x00, 0x00, 0x00,
0xff, 0xff, 0xcf, 0xa9, 0xf0, 0x02, 0x45, 0x01, 0x00, 0x00,
// 312 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x90, 0x41, 0x4e, 0xc2, 0x40,
0x18, 0x85, 0x19, 0x5b, 0x8c, 0x0c, 0xb8, 0x60, 0x62, 0x4c, 0xc3, 0xa2, 0x36, 0xac, 0x88, 0x26,
0x1d, 0x23, 0x89, 0x7b, 0xf0, 0x06, 0x65, 0xe7, 0x86, 0x0c, 0xd3, 0x9f, 0xda, 0x04, 0x3a, 0x93,
0xbf, 0x13, 0x50, 0x8f, 0xe0, 0x09, 0xbc, 0x88, 0x77, 0x60, 0xe9, 0x09, 0x8c, 0xd6, 0x8b, 0x98,
0xfe, 0x2d, 0xae, 0xe6, 0xe5, 0x7b, 0x33, 0xf3, 0x5e, 0x1e, 0x1f, 0x20, 0x68, 0x83, 0x69, 0x6c,
0xd1, 0x38, 0x23, 0xba, 0x7b, 0xb5, 0xb1, 0xab, 0xd1, 0x45, 0x66, 0x32, 0x43, 0x44, 0xd6, 0xaa,
0x31, 0x47, 0x43, 0x54, 0x6b, 0x67, 0x57, 0xb2, 0x3e, 0x5a, 0x14, 0x81, 0xd3, 0xa9, 0x54, 0x36,
0x97, 0x3b, 0xc0, 0x32, 0x37, 0x85, 0x5d, 0x1d, 0x55, 0x73, 0x63, 0xfc, 0xca, 0x4f, 0x13, 0x4a,
0x10, 0x01, 0xf7, 0xdd, 0x8b, 0x85, 0x80, 0x45, 0x6c, 0xe2, 0xcd, 0xfd, 0xc3, 0xd7, 0x55, 0x27,
0x21, 0x22, 0x2e, 0xb9, 0xa7, 0x51, 0x07, 0x27, 0x11, 0x9b, 0x9c, 0xb7, 0x46, 0x0d, 0x84, 0xe0,
0x7e, 0xaa, 0x9c, 0x0a, 0xbc, 0x88, 0x4d, 0x06, 0x09, 0x69, 0x71, 0xcd, 0xb9, 0x46, 0x50, 0x0e,
0xd2, 0xa5, 0x72, 0x81, 0x4f, 0x7f, 0xf5, 0xdf, 0x3e, 0x02, 0x6f, 0x1a, 0xdf, 0xd3, 0xcb, 0x5e,
0x6b, 0xcf, 0xdc, 0x18, 0xf9, 0xd9, 0xa2, 0x50, 0xb6, 0x7c, 0x32, 0x4e, 0x8c, 0x78, 0x37, 0x2f,
0x52, 0x78, 0xa6, 0x78, 0xbf, 0x4d, 0x69, 0x10, 0x35, 0x03, 0xdc, 0x52, 0x01, 0xff, 0xbf, 0x19,
0xe0, 0x56, 0xdc, 0x72, 0xae, 0x4d, 0xb1, 0x5e, 0x96, 0x4e, 0x39, 0xa0, 0x1e, 0xfd, 0xbb, 0x61,
0xdc, 0xec, 0x10, 0x3f, 0x98, 0x62, 0xbd, 0xa8, 0x8d, 0xa4, 0xa7, 0x8f, 0x72, 0x3e, 0x3b, 0xfc,
0x84, 0x9d, 0x43, 0x15, 0xb2, 0xcf, 0x2a, 0x64, 0xdf, 0x55, 0xc8, 0xde, 0x7f, 0xc3, 0xce, 0xe3,
0x4d, 0x66, 0xe2, 0x7a, 0xaa, 0x38, 0x37, 0x92, 0x26, 0x2b, 0x01, 0x77, 0x80, 0x72, 0x37, 0x95,
0xa5, 0x33, 0xa8, 0x32, 0x90, 0x7b, 0xb5, 0x91, 0xb4, 0xfe, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff,
0xe2, 0x67, 0xb3, 0x8d, 0x93, 0x01, 0x00, 0x00,
}
func (m *Record) Marshal() (dAtA []byte, err error) {
@ -163,6 +168,9 @@ func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
i = encodeVarintRecord(dAtA, i, uint64(m.CreatedAt))
i--
dAtA[i] = 0x20
if m.Data != nil {
i -= len(m.Data)
copy(dAtA[i:], m.Data)
@ -247,6 +255,7 @@ func (m *Record) Size() (n int) {
l = len(m.Data)
n += 1 + l + sovRecord(uint64(l))
}
n += 1 + sovRecord(uint64(m.CreatedAt))
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -378,6 +387,25 @@ func (m *Record) Unmarshal(dAtA []byte) error {
m.Data = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field CreatedAt", wireType)
}
m.CreatedAt = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRecord
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.CreatedAt |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipRecord(dAtA[iNdEx:])

View File

@ -3,6 +3,7 @@ package walpb;
import "gogoproto/gogo.proto";
import "raftpb/raft.proto";
import "etcd/api/versionpb/version.proto";
option go_package = "go.etcd.io/etcd/server/v3/storage/wal/walpb";
@ -15,6 +16,7 @@ message Record {
optional int64 type = 1 [(gogoproto.nullable) = false];
optional uint32 crc = 2 [(gogoproto.nullable) = false];
optional bytes data = 3;
optional int64 created_at = 4 [(gogoproto.nullable) = false, (versionpb.etcd_version_field)="3.6"];
}
// Keep in sync with raftpb.SnapshotMetadata.