mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Move server files to 'server' directory.
26 git mv mvcc wal auth etcdserver etcdmain proxy embed/ lease/ server 36 git mv go.mod go.sum server
This commit is contained in:
196
server/wal/decoder.go
Normal file
196
server/wal/decoder.go
Normal file
@@ -0,0 +1,196 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/crc"
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
)
|
||||
|
||||
const (
	// minSectorSize is the sector size, in bytes, used when splitting record
	// data on sector boundaries to look for torn writes.
	minSectorSize = 512

	// frameSizeBytes is frame size in bytes, including record size and padding size.
	frameSizeBytes = 8
)
|
||||
|
||||
// decoder reads WAL records, one at a time, from an ordered list of readers.
type decoder struct {
	// mu is held by decode for the duration of each record read.
	mu sync.Mutex
	// brs holds the readers still to be consumed; the head is the reader
	// currently being decoded and is dropped once it is exhausted.
	brs []*bufio.Reader

	// lastValidOff is the offset following the last valid decoded record in
	// the current reader; it is reset to 0 when the decoder moves on to the
	// next reader.
	lastValidOff int64
	// crc is the running checksum over the data of the decoded records.
	crc hash.Hash32
}
|
||||
|
||||
func newDecoder(r ...io.Reader) *decoder {
|
||||
readers := make([]*bufio.Reader, len(r))
|
||||
for i := range r {
|
||||
readers[i] = bufio.NewReader(r[i])
|
||||
}
|
||||
return &decoder{
|
||||
brs: readers,
|
||||
crc: crc.New(0, crcTable),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decoder) decode(rec *walpb.Record) error {
|
||||
rec.Reset()
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
return d.decodeRecord(rec)
|
||||
}
|
||||
|
||||
// maxWALEntrySizeLimit is the upper bound on the size of a single WAL record.
//
// The raft max message size is set to 1 MB in etcd server; other projects are
// assumed to set a reasonable message size limit as well, thus an entry
// should never exceed 10 MB.
const maxWALEntrySizeLimit = int64(10 << 20)
|
||||
|
||||
func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||
if len(d.brs) == 0 {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
l, err := readInt64(d.brs[0])
|
||||
if err == io.EOF || (err == nil && l == 0) {
|
||||
// hit end of file or preallocated space
|
||||
d.brs = d.brs[1:]
|
||||
if len(d.brs) == 0 {
|
||||
return io.EOF
|
||||
}
|
||||
d.lastValidOff = 0
|
||||
return d.decodeRecord(rec)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
recBytes, padBytes := decodeFrameSize(l)
|
||||
if recBytes >= maxWALEntrySizeLimit-padBytes {
|
||||
return ErrMaxWALEntrySizeLimitExceeded
|
||||
}
|
||||
|
||||
data := make([]byte, recBytes+padBytes)
|
||||
if _, err = io.ReadFull(d.brs[0], data); err != nil {
|
||||
// ReadFull returns io.EOF only if no bytes were read
|
||||
// the decoder should treat this as an ErrUnexpectedEOF instead.
|
||||
if err == io.EOF {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := rec.Unmarshal(data[:recBytes]); err != nil {
|
||||
if d.isTornEntry(data) {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// skip crc checking if the record type is crcType
|
||||
if rec.Type != crcType {
|
||||
d.crc.Write(rec.Data)
|
||||
if err := rec.Validate(d.crc.Sum32()); err != nil {
|
||||
if d.isTornEntry(data) {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
// record decoded as valid; point last valid offset to end of record
|
||||
d.lastValidOff += frameSizeBytes + recBytes + padBytes
|
||||
return nil
|
||||
}
|
||||
|
||||
// decodeFrameSize unpacks a 64-bit length field into the record size and the
// number of padding bytes that follow the record.
//
// The record size occupies the lower 56 bits. Padding is present only when
// the most significant bit is set (the field is negative as an int64), in
// which case it is held in the lower 3 bits of the most significant byte.
func decodeFrameSize(lenField int64) (recBytes int64, padBytes int64) {
	// mask selecting the lower 56 bits
	const recMask = uint64(1)<<56 - 1

	bits := uint64(lenField)
	recBytes = int64(bits & recMask)
	if lenField < 0 {
		padBytes = int64((bits >> 56) & 0x7)
	}
	return recBytes, padBytes
}
|
||||
|
||||
// isTornEntry determines whether the last entry of the WAL was partially written
|
||||
// and corrupted because of a torn write.
|
||||
func (d *decoder) isTornEntry(data []byte) bool {
|
||||
if len(d.brs) != 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
fileOff := d.lastValidOff + frameSizeBytes
|
||||
curOff := 0
|
||||
chunks := [][]byte{}
|
||||
// split data on sector boundaries
|
||||
for curOff < len(data) {
|
||||
chunkLen := int(minSectorSize - (fileOff % minSectorSize))
|
||||
if chunkLen > len(data)-curOff {
|
||||
chunkLen = len(data) - curOff
|
||||
}
|
||||
chunks = append(chunks, data[curOff:curOff+chunkLen])
|
||||
fileOff += int64(chunkLen)
|
||||
curOff += chunkLen
|
||||
}
|
||||
|
||||
// if any data for a sector chunk is all 0, it's a torn write
|
||||
for _, sect := range chunks {
|
||||
isZero := true
|
||||
for _, v := range sect {
|
||||
if v != 0 {
|
||||
isZero = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if isZero {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (d *decoder) updateCRC(prevCrc uint32) {
|
||||
d.crc = crc.New(prevCrc, crcTable)
|
||||
}
|
||||
|
||||
func (d *decoder) lastCRC() uint32 {
|
||||
return d.crc.Sum32()
|
||||
}
|
||||
|
||||
// lastOffset returns the offset following the last valid decoded record.
func (d *decoder) lastOffset() int64 {
	return d.lastValidOff
}
|
||||
|
||||
func mustUnmarshalEntry(d []byte) raftpb.Entry {
|
||||
var e raftpb.Entry
|
||||
pbutil.MustUnmarshal(&e, d)
|
||||
return e
|
||||
}
|
||||
|
||||
func mustUnmarshalState(d []byte) raftpb.HardState {
|
||||
var s raftpb.HardState
|
||||
pbutil.MustUnmarshal(&s, d)
|
||||
return s
|
||||
}
|
||||
|
||||
// readInt64 reads 8 bytes from r and interprets them as a little-endian
// int64. It returns io.EOF when r yields no bytes at all and
// io.ErrUnexpectedEOF when r ends partway through; the value is 0 on error.
func readInt64(r io.Reader) (int64, error) {
	var raw [8]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	return int64(binary.LittleEndian.Uint64(raw[:])), nil
}
|
||||
75
server/wal/doc.go
Normal file
75
server/wal/doc.go
Normal file
@@ -0,0 +1,75 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package wal provides an implementation of a write ahead log that is used by
|
||||
etcd.
|
||||
|
||||
A WAL is created at a particular directory and is made up of a number of
|
||||
segmented WAL files. Inside of each file the raft state and entries are appended
|
||||
to it with the Save method:
|
||||
|
||||
metadata := []byte{}
|
||||
w, err := wal.Create(zap.NewExample(), "/var/lib/etcd", metadata)
|
||||
...
|
||||
err := w.Save(s, ents)
|
||||
|
||||
After saving a raft snapshot to disk, the SaveSnapshot method should be called to
record it, so that the WAL can match the saved snapshot when restarting.
|
||||
|
||||
err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2})
|
||||
|
||||
When a user has finished using a WAL it must be closed:
|
||||
|
||||
w.Close()
|
||||
|
||||
Each WAL file is a stream of WAL records. A WAL record is a length field and a wal record
|
||||
protobuf. The record protobuf contains a CRC, a type, and a data payload. The length field is a
|
||||
64-bit packed structure holding the length of the remaining logical record data in its lower
|
||||
56 bits and its physical padding in the first three bits of the most significant byte. Each
|
||||
record is 8-byte aligned so that the length field is never torn. The CRC contains the CRC32
|
||||
value of all record protobufs preceding the current record.
|
||||
|
||||
WAL files are placed inside of the directory in the following format:
|
||||
$seq-$index.wal
|
||||
|
||||
The first WAL file to be created will be 0000000000000000-0000000000000000.wal
|
||||
indicating an initial sequence of 0 and an initial raft index of 0. The first
|
||||
entry written to WAL MUST have raft index 0.
|
||||
|
||||
WAL will cut its current tail wal file if its size exceeds 64MB. This will increment an internal
|
||||
sequence number and cause a new file to be created. If the last raft index saved
|
||||
was 0x20 and this is the first time cut has been called on this WAL then the sequence will
|
||||
increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal.
|
||||
If a second cut issues 0x10 entries with incremental index later then the file will be called:
|
||||
0000000000000002-0000000000000031.wal.
|
||||
|
||||
At a later time a WAL can be opened at a particular snapshot. If there is no
|
||||
snapshot, an empty snapshot should be passed in.
|
||||
|
||||
w, err := wal.Open(zap.NewExample(), "/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2})
|
||||
...
|
||||
|
||||
The snapshot must have been written to the WAL.
|
||||
|
||||
Additional items cannot be Saved to this WAL until all of the items from the given
|
||||
snapshot to the end of the WAL are read first:
|
||||
|
||||
metadata, state, ents, err := w.ReadAll()
|
||||
|
||||
This will give you the metadata, the last raftpb.HardState and the slice of
raftpb.Entry items in the log.
|
||||
|
||||
*/
|
||||
package wal
|
||||
124
server/wal/encoder.go
Normal file
124
server/wal/encoder.go
Normal file
@@ -0,0 +1,124 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/crc"
|
||||
"go.etcd.io/etcd/pkg/v3/ioutil"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
)
|
||||
|
||||
// walPageBytes is the alignment for flushing records to the backing Writer.
|
||||
// It should be a multiple of the minimum sector size so that WAL can safely
|
||||
// distinguish between torn writes and ordinary data corruption.
|
||||
const walPageBytes = 8 * minSectorSize
|
||||
|
||||
type encoder struct {
|
||||
mu sync.Mutex
|
||||
bw *ioutil.PageWriter
|
||||
|
||||
crc hash.Hash32
|
||||
buf []byte
|
||||
uint64buf []byte
|
||||
}
|
||||
|
||||
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
|
||||
return &encoder{
|
||||
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
|
||||
crc: crc.New(prevCrc, crcTable),
|
||||
// 1MB buffer
|
||||
buf: make([]byte, 1024*1024),
|
||||
uint64buf: make([]byte, 8),
|
||||
}
|
||||
}
|
||||
|
||||
// newFileEncoder creates a new encoder with current file offset for the page writer.
|
||||
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
|
||||
offset, err := f.Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newEncoder(f, prevCrc, int(offset)), nil
|
||||
}
|
||||
|
||||
func (e *encoder) encode(rec *walpb.Record) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.crc.Write(rec.Data)
|
||||
rec.Crc = e.crc.Sum32()
|
||||
var (
|
||||
data []byte
|
||||
err error
|
||||
n int
|
||||
)
|
||||
|
||||
if rec.Size() > len(e.buf) {
|
||||
data, err = rec.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
n, err = rec.MarshalTo(e.buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = e.buf[:n]
|
||||
}
|
||||
|
||||
lenField, padBytes := encodeFrameSize(len(data))
|
||||
if err = writeUint64(e.bw, lenField, e.uint64buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if padBytes != 0 {
|
||||
data = append(data, make([]byte, padBytes)...)
|
||||
}
|
||||
n, err = e.bw.Write(data)
|
||||
walWriteBytes.Add(float64(n))
|
||||
return err
|
||||
}
|
||||
|
||||
// encodeFrameSize builds the 64-bit length field for a record of dataBytes
// bytes and reports how many padding bytes must follow the record.
//
// Padding forces 8 byte alignment so the length never gets a torn write.
// When padding is needed, the most significant byte of the length field
// carries 0x80 or-ed with the padding count; otherwise the field is just the
// record size.
func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
	padBytes = (8 - dataBytes%8) % 8
	lenField = uint64(dataBytes)
	if padBytes == 0 {
		return lenField, 0
	}
	return lenField | uint64(0x80|padBytes)<<56, padBytes
}
|
||||
|
||||
func (e *encoder) flush() error {
|
||||
e.mu.Lock()
|
||||
n, err := e.bw.FlushN()
|
||||
e.mu.Unlock()
|
||||
walWriteBytes.Add(float64(n))
|
||||
return err
|
||||
}
|
||||
|
||||
func writeUint64(w io.Writer, n uint64, buf []byte) error {
|
||||
// http://golang.org/src/encoding/binary/binary.go
|
||||
binary.LittleEndian.PutUint64(buf, n)
|
||||
nv, err := w.Write(buf)
|
||||
walWriteBytes.Add(float64(nv))
|
||||
return err
|
||||
}
|
||||
105
server/wal/file_pipeline.go
Normal file
105
server/wal/file_pipeline.go
Normal file
@@ -0,0 +1,105 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/fileutil"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// filePipeline pipelines allocating disk space
|
||||
type filePipeline struct {
|
||||
lg *zap.Logger
|
||||
|
||||
// dir to put files
|
||||
dir string
|
||||
// size of files to make, in bytes
|
||||
size int64
|
||||
// count number of files generated
|
||||
count int
|
||||
|
||||
filec chan *fileutil.LockedFile
|
||||
errc chan error
|
||||
donec chan struct{}
|
||||
}
|
||||
|
||||
func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
fp := &filePipeline{
|
||||
lg: lg,
|
||||
dir: dir,
|
||||
size: fileSize,
|
||||
filec: make(chan *fileutil.LockedFile),
|
||||
errc: make(chan error, 1),
|
||||
donec: make(chan struct{}),
|
||||
}
|
||||
go fp.run()
|
||||
return fp
|
||||
}
|
||||
|
||||
// Open returns a fresh file for writing. Rename the file before calling
|
||||
// Open again or there will be file collisions.
|
||||
func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
|
||||
select {
|
||||
case f = <-fp.filec:
|
||||
case err = <-fp.errc:
|
||||
}
|
||||
return f, err
|
||||
}
|
||||
|
||||
func (fp *filePipeline) Close() error {
|
||||
close(fp.donec)
|
||||
return <-fp.errc
|
||||
}
|
||||
|
||||
func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
|
||||
// count % 2 so this file isn't the same as the one last published
|
||||
fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
|
||||
if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
|
||||
fp.lg.Error("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
|
||||
f.Close()
|
||||
return nil, err
|
||||
}
|
||||
fp.count++
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (fp *filePipeline) run() {
|
||||
defer close(fp.errc)
|
||||
for {
|
||||
f, err := fp.alloc()
|
||||
if err != nil {
|
||||
fp.errc <- err
|
||||
return
|
||||
}
|
||||
select {
|
||||
case fp.filec <- f:
|
||||
case <-fp.donec:
|
||||
os.Remove(f.Name())
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
73
server/wal/file_pipeline_test.go
Normal file
73
server/wal/file_pipeline_test.go
Normal file
@@ -0,0 +1,73 @@
|
||||
// Copyright 2018 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestFilePipeline(t *testing.T) {
|
||||
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tdir)
|
||||
|
||||
fp := newFilePipeline(zap.NewExample(), tdir, SegmentSizeBytes)
|
||||
defer fp.Close()
|
||||
|
||||
f, ferr := fp.Open()
|
||||
if ferr != nil {
|
||||
t.Fatal(ferr)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func TestFilePipelineFailPreallocate(t *testing.T) {
|
||||
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tdir)
|
||||
|
||||
fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64)
|
||||
defer fp.Close()
|
||||
|
||||
f, ferr := fp.Open()
|
||||
if f != nil || ferr == nil { // no space left on device
|
||||
t.Fatal("expected error on invalid pre-allocate size, but no error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilePipelineFailLockFile(t *testing.T) {
|
||||
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
os.RemoveAll(tdir)
|
||||
|
||||
fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64)
|
||||
defer fp.Close()
|
||||
|
||||
f, ferr := fp.Open()
|
||||
if f != nil || ferr == nil { // no such file or directory
|
||||
t.Fatal("expected error on invalid pre-allocate size, but no error")
|
||||
}
|
||||
}
|
||||
42
server/wal/metrics.go
Normal file
42
server/wal/metrics.go
Normal file
@@ -0,0 +1,42 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
var (
|
||||
walFsyncSec = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "disk",
|
||||
Name: "wal_fsync_duration_seconds",
|
||||
Help: "The latency distributions of fsync called by WAL.",
|
||||
|
||||
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
|
||||
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
|
||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||
})
|
||||
|
||||
walWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "disk",
|
||||
Name: "wal_write_bytes_total",
|
||||
Help: "Total number of bytes written in WAL.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(walFsyncSec)
|
||||
prometheus.MustRegister(walWriteBytes)
|
||||
}
|
||||
87
server/wal/record_test.go
Normal file
87
server/wal/record_test.go
Normal file
@@ -0,0 +1,87 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
)
|
||||
|
||||
// infoData is the data payload of the fixture record.
var infoData = []byte("\b\xef\xfd\x02")

// infoRecord is a complete framed record: an 8-byte little-endian length
// field (0x0e), the marshaled record header, and infoData as the payload.
var infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
|
||||
|
||||
func TestReadRecord(t *testing.T) {
|
||||
badInfoRecord := make([]byte, len(infoRecord))
|
||||
copy(badInfoRecord, infoRecord)
|
||||
badInfoRecord[len(badInfoRecord)-1] = 'a'
|
||||
|
||||
tests := []struct {
|
||||
data []byte
|
||||
wr *walpb.Record
|
||||
we error
|
||||
}{
|
||||
{infoRecord, &walpb.Record{Type: 1, Crc: crc32.Checksum(infoData, crcTable), Data: infoData}, nil},
|
||||
{[]byte(""), &walpb.Record{}, io.EOF},
|
||||
{infoRecord[:8], &walpb.Record{}, io.ErrUnexpectedEOF},
|
||||
{infoRecord[:len(infoRecord)-len(infoData)-8], &walpb.Record{}, io.ErrUnexpectedEOF},
|
||||
{infoRecord[:len(infoRecord)-len(infoData)], &walpb.Record{}, io.ErrUnexpectedEOF},
|
||||
{infoRecord[:len(infoRecord)-8], &walpb.Record{}, io.ErrUnexpectedEOF},
|
||||
{badInfoRecord, &walpb.Record{}, walpb.ErrCRCMismatch},
|
||||
}
|
||||
|
||||
rec := &walpb.Record{}
|
||||
for i, tt := range tests {
|
||||
buf := bytes.NewBuffer(tt.data)
|
||||
decoder := newDecoder(ioutil.NopCloser(buf))
|
||||
e := decoder.decode(rec)
|
||||
if !reflect.DeepEqual(rec, tt.wr) {
|
||||
t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr)
|
||||
}
|
||||
if !errors.Is(e, tt.we) {
|
||||
t.Errorf("#%d: err = %v, want %v", i, e, tt.we)
|
||||
}
|
||||
rec = &walpb.Record{}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteRecord(t *testing.T) {
|
||||
b := &walpb.Record{}
|
||||
typ := int64(0xABCD)
|
||||
d := []byte("Hello world!")
|
||||
buf := new(bytes.Buffer)
|
||||
e := newEncoder(buf, 0, 0)
|
||||
e.encode(&walpb.Record{Type: typ, Data: d})
|
||||
e.flush()
|
||||
decoder := newDecoder(ioutil.NopCloser(buf))
|
||||
err := decoder.decode(b)
|
||||
if err != nil {
|
||||
t.Errorf("err = %v, want nil", err)
|
||||
}
|
||||
if b.Type != typ {
|
||||
t.Errorf("type = %d, want %d", b.Type, typ)
|
||||
}
|
||||
if !reflect.DeepEqual(b.Data, d) {
|
||||
t.Errorf("data = %v, want %v", b.Data, d)
|
||||
}
|
||||
}
|
||||
114
server/wal/repair.go
Normal file
114
server/wal/repair.go
Normal file
@@ -0,0 +1,114 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Repair tries to repair ErrUnexpectedEOF in the
|
||||
// last wal file by truncating.
|
||||
func Repair(lg *zap.Logger, dirpath string) bool {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
f, err := openLast(lg, dirpath)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
lg.Info("repairing", zap.String("path", f.Name()))
|
||||
|
||||
rec := &walpb.Record{}
|
||||
decoder := newDecoder(f)
|
||||
for {
|
||||
lastOffset := decoder.lastOffset()
|
||||
err := decoder.decode(rec)
|
||||
switch err {
|
||||
case nil:
|
||||
// update crc of the decoder when necessary
|
||||
switch rec.Type {
|
||||
case crcType:
|
||||
crc := decoder.crc.Sum32()
|
||||
// current crc of decoder must match the crc of the record.
|
||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||
if crc != 0 && rec.Validate(crc) != nil {
|
||||
return false
|
||||
}
|
||||
decoder.updateCRC(rec.Crc)
|
||||
}
|
||||
continue
|
||||
|
||||
case io.EOF:
|
||||
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
|
||||
return true
|
||||
|
||||
case io.ErrUnexpectedEOF:
|
||||
bf, bferr := os.Create(f.Name() + ".broken")
|
||||
if bferr != nil {
|
||||
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
|
||||
return false
|
||||
}
|
||||
defer bf.Close()
|
||||
|
||||
if _, err = f.Seek(0, io.SeekStart); err != nil {
|
||||
lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
if _, err = io.Copy(bf, f); err != nil {
|
||||
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
if err = f.Truncate(lastOffset); err != nil {
|
||||
lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
if err = fileutil.Fsync(f.File); err != nil {
|
||||
lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
walFsyncSec.Observe(time.Since(start).Seconds())
|
||||
|
||||
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
|
||||
return true
|
||||
|
||||
default:
|
||||
lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// openLast opens the last wal file for read and write.
|
||||
func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) {
|
||||
names, err := readWALNames(lg, dirpath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
last := filepath.Join(dirpath, names[len(names)-1])
|
||||
return fileutil.LockFile(last, os.O_RDWR, fileutil.PrivateFileMode)
|
||||
}
|
||||
238
server/wal/repair_test.go
Normal file
238
server/wal/repair_test.go
Normal file
@@ -0,0 +1,238 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// corruptFunc damages the WAL stored in the given directory. The second
// argument is the write offset of the tail file at the time the WAL was
// closed.
type corruptFunc func(string, int64) error
|
||||
|
||||
// TestRepairTruncate ensures a truncated file can be repaired
|
||||
func TestRepairTruncate(t *testing.T) {
|
||||
corruptf := func(p string, offset int64) error {
|
||||
f, err := openLast(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
return f.Truncate(offset - 4)
|
||||
}
|
||||
|
||||
testRepair(t, makeEnts(10), corruptf, 9)
|
||||
}
|
||||
|
||||
func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expectedEnts int) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
// create WAL
|
||||
w, err := Create(zap.NewExample(), p, nil)
|
||||
defer func() {
|
||||
if err = w.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, es := range ents {
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
offset, err := w.tail().Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
err = corrupt(p, offset)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// verify we broke the wal
|
||||
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, _, err = w.ReadAll()
|
||||
if err != io.ErrUnexpectedEOF {
|
||||
t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
// repair the wal
|
||||
if ok := Repair(zap.NewExample(), p); !ok {
|
||||
t.Fatalf("'Repair' returned '%v', want 'true'", ok)
|
||||
}
|
||||
|
||||
// read it back
|
||||
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, walEnts, err := w.ReadAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(walEnts) != expectedEnts {
|
||||
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts)
|
||||
}
|
||||
|
||||
// write some more entries to repaired log
|
||||
for i := 1; i <= 10; i++ {
|
||||
es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}}
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
w.Close()
|
||||
|
||||
// read back entries following repair, ensure it's all there
|
||||
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, walEnts, err = w.ReadAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(walEnts) != expectedEnts+10 {
|
||||
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10)
|
||||
}
|
||||
}
|
||||
|
||||
func makeEnts(ents int) (ret [][]raftpb.Entry) {
|
||||
for i := 1; i <= ents; i++ {
|
||||
ret = append(ret, []raftpb.Entry{{Index: uint64(i)}})
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// TestRepairWriteTearLast repairs the WAL in case the last record is a torn write
|
||||
// that straddled two sectors.
|
||||
func TestRepairWriteTearLast(t *testing.T) {
|
||||
corruptf := func(p string, offset int64) error {
|
||||
f, err := openLast(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
// 512 bytes perfectly aligns the last record, so use 1024
|
||||
if offset < 1024 {
|
||||
return fmt.Errorf("got offset %d, expected >1024", offset)
|
||||
}
|
||||
if terr := f.Truncate(1024); terr != nil {
|
||||
return terr
|
||||
}
|
||||
return f.Truncate(offset)
|
||||
}
|
||||
testRepair(t, makeEnts(50), corruptf, 40)
|
||||
}
|
||||
|
||||
// TestRepairWriteTearMiddle repairs the WAL when there is write tearing
|
||||
// in the middle of a record.
|
||||
func TestRepairWriteTearMiddle(t *testing.T) {
|
||||
corruptf := func(p string, offset int64) error {
|
||||
f, err := openLast(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
// corrupt middle of 2nd record
|
||||
_, werr := f.WriteAt(make([]byte, 512), 4096+512)
|
||||
return werr
|
||||
}
|
||||
ents := makeEnts(5)
|
||||
// 4096 bytes of data so a middle sector is easy to corrupt
|
||||
dat := make([]byte, 4096)
|
||||
for i := range dat {
|
||||
dat[i] = byte(i)
|
||||
}
|
||||
for i := range ents {
|
||||
ents[i][0].Data = dat
|
||||
}
|
||||
testRepair(t, ents, corruptf, 1)
|
||||
}
|
||||
|
||||
func TestRepairFailDeleteDir(t *testing.T) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
w, err := Create(zap.NewExample(), p, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
oldSegmentSizeBytes := SegmentSizeBytes
|
||||
SegmentSizeBytes = 64
|
||||
defer func() {
|
||||
SegmentSizeBytes = oldSegmentSizeBytes
|
||||
}()
|
||||
for _, es := range makeEnts(50) {
|
||||
if err = w.Save(raftpb.HardState{}, es); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
_, serr := w.tail().Seek(0, io.SeekCurrent)
|
||||
if serr != nil {
|
||||
t.Fatal(serr)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
f, err := openLast(zap.NewExample(), p)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if terr := f.Truncate(20); terr != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, _, _, err = w.ReadAll()
|
||||
if err != io.ErrUnexpectedEOF {
|
||||
t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
os.RemoveAll(p)
|
||||
if Repair(zap.NewExample(), p) {
|
||||
t.Fatal("expect 'Repair' fail on unexpected directory deletion")
|
||||
}
|
||||
}
|
||||
112
server/wal/util.go
Normal file
112
server/wal/util.go
Normal file
@@ -0,0 +1,112 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/fileutil"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// errBadWALName is returned by parseWALName for names that do not end in
// ".wal".
var errBadWALName = errors.New("bad wal name")
|
||||
|
||||
// Exist returns true if there are any files in a given directory.
|
||||
func Exist(dir string) bool {
|
||||
names, err := fileutil.ReadDir(dir, fileutil.WithExt(".wal"))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return len(names) != 0
|
||||
}
|
||||
|
||||
// searchIndex returns the last array index of names whose raft index section is
|
||||
// equal to or smaller than the given index.
|
||||
// The given names MUST be sorted.
|
||||
func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {
|
||||
for i := len(names) - 1; i >= 0; i-- {
|
||||
name := names[i]
|
||||
_, curIndex, err := parseWALName(name)
|
||||
if err != nil {
|
||||
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
|
||||
}
|
||||
if index >= curIndex {
|
||||
return i, true
|
||||
}
|
||||
}
|
||||
return -1, false
|
||||
}
|
||||
|
||||
// names should have been sorted based on sequence number.
|
||||
// isValidSeq checks whether seq increases continuously.
|
||||
func isValidSeq(lg *zap.Logger, names []string) bool {
|
||||
var lastSeq uint64
|
||||
for _, name := range names {
|
||||
curSeq, _, err := parseWALName(name)
|
||||
if err != nil {
|
||||
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
|
||||
}
|
||||
if lastSeq != 0 && lastSeq != curSeq-1 {
|
||||
return false
|
||||
}
|
||||
lastSeq = curSeq
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) {
|
||||
names, err := fileutil.ReadDir(dirpath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wnames := checkWalNames(lg, names)
|
||||
if len(wnames) == 0 {
|
||||
return nil, ErrFileNotFound
|
||||
}
|
||||
return wnames, nil
|
||||
}
|
||||
|
||||
func checkWalNames(lg *zap.Logger, names []string) []string {
|
||||
wnames := make([]string, 0)
|
||||
for _, name := range names {
|
||||
if _, _, err := parseWALName(name); err != nil {
|
||||
// don't complain about left over tmp files
|
||||
if !strings.HasSuffix(name, ".tmp") {
|
||||
lg.Warn(
|
||||
"ignored file in WAL directory",
|
||||
zap.String("path", name),
|
||||
)
|
||||
}
|
||||
continue
|
||||
}
|
||||
wnames = append(wnames, name)
|
||||
}
|
||||
return wnames
|
||||
}
|
||||
|
||||
func parseWALName(str string) (seq, index uint64, err error) {
|
||||
if !strings.HasSuffix(str, ".wal") {
|
||||
return 0, 0, errBadWALName
|
||||
}
|
||||
_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
|
||||
return seq, index, err
|
||||
}
|
||||
|
||||
// walName formats a WAL file name from a sequence number and a raft index.
// Both are written as 16-digit, zero-padded lowercase hexadecimal, joined by
// "-" and followed by the ".wal" extension.
func walName(seq, index uint64) string {
	const layout = "%016x-%016x.wal"
	return fmt.Sprintf(layout, seq, index)
}
|
||||
988
server/wal/wal.go
Normal file
988
server/wal/wal.go
Normal file
@@ -0,0 +1,988 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/pkg/v3/fileutil"
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/raft/v3"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/v3/wal/walpb"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Record types stored in walpb.Record.Type; the numbering starts at 1.
const (
	// metadataType marks a record whose Data holds the WAL metadata.
	metadataType int64 = iota + 1
	// entryType marks a record whose Data is a marshaled raft entry.
	entryType
	// stateType marks a record whose Data is a marshaled raft hard state.
	stateType
	// crcType marks a record carrying the CRC the decoder is checked
	// against and then continues from.
	crcType
	// snapshotType marks a record whose Data is a marshaled walpb.Snapshot.
	snapshotType
)

// warnSyncDuration is the amount of time allotted to an fsync before
// logging a warning.
const warnSyncDuration = time.Second
|
||||
|
||||
// SegmentSizeBytes is the preallocated size of each wal segment file.
// The actual size might be larger than this. In general, the default
// value should be used, but this is defined as an exported variable
// so that tests can set a different segment size.
var SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB

// Errors returned by the wal package.
var (
	// ErrMetadataConflict is returned when two metadata records in the
	// same WAL carry different data.
	ErrMetadataConflict = errors.New("wal: conflicting metadata found")
	// ErrFileNotFound is returned when no usable WAL file is found in the
	// directory.
	ErrFileNotFound = errors.New("wal: file not found")
	// ErrCRCMismatch is returned when a CRC record does not validate
	// against the decoder's running CRC.
	ErrCRCMismatch = errors.New("wal: crc mismatch")
	// ErrSnapshotMismatch is returned when a snapshot record has the
	// requested index but a different term.
	ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
	// ErrSnapshotNotFound is returned when no snapshot record with the
	// requested index was read.
	ErrSnapshotNotFound = errors.New("wal: snapshot not found")
	// ErrSliceOutOfRange is returned by ReadAll when an entry's index
	// would land beyond the end of the entries read so far.
	ErrSliceOutOfRange              = errors.New("wal: slice bounds out of range")
	ErrMaxWALEntrySizeLimitExceeded = errors.New("wal: max entry size limit exceeded")
	// ErrDecoderNotFound is returned by ReadAll when the WAL has no
	// decoder.
	ErrDecoderNotFound = errors.New("wal: decoder not found")
)

// crcTable is the Castagnoli CRC-32 table used for WAL checksums.
var crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
|
||||
// WAL is a logical representation of the stable storage.
|
||||
// WAL is either in read mode or append mode but not both.
|
||||
// A newly created WAL is in append mode, and ready for appending records.
|
||||
// A just opened WAL is in read mode, and ready for reading records.
|
||||
// The WAL will be ready for appending after reading out all the previous records.
|
||||
type WAL struct {
|
||||
lg *zap.Logger
|
||||
|
||||
dir string // the living directory of the underlay files
|
||||
|
||||
// dirFile is a fd for the wal directory for syncing on Rename
|
||||
dirFile *os.File
|
||||
|
||||
metadata []byte // metadata recorded at the head of each WAL
|
||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||
|
||||
start walpb.Snapshot // snapshot to start reading
|
||||
decoder *decoder // decoder to decode records
|
||||
readClose func() error // closer for decode reader
|
||||
|
||||
unsafeNoSync bool // if set, do not fsync
|
||||
|
||||
mu sync.Mutex
|
||||
enti uint64 // index of the last entry saved to the wal
|
||||
encoder *encoder // encoder to encode records
|
||||
|
||||
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
|
||||
fp *filePipeline
|
||||
}
|
||||
|
||||
// Create creates a WAL ready for appending records. The given metadata is
|
||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll
|
||||
// after the file is Open.
|
||||
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
||||
if Exist(dirpath) {
|
||||
return nil, os.ErrExist
|
||||
}
|
||||
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
|
||||
// keep temporary wal directory so WAL initialization appears atomic
|
||||
tmpdirpath := filepath.Clean(dirpath) + ".tmp"
|
||||
if fileutil.Exist(tmpdirpath) {
|
||||
if err := os.RemoveAll(tmpdirpath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
defer os.RemoveAll(tmpdirpath)
|
||||
|
||||
if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
|
||||
lg.Warn(
|
||||
"failed to create a temporary WAL directory",
|
||||
zap.String("tmp-dir-path", tmpdirpath),
|
||||
zap.String("dir-path", dirpath),
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := filepath.Join(tmpdirpath, walName(0, 0))
|
||||
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
||||
if err != nil {
|
||||
lg.Warn(
|
||||
"failed to flock an initial WAL file",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
if _, err = f.Seek(0, io.SeekEnd); err != nil {
|
||||
lg.Warn(
|
||||
"failed to seek an initial WAL file",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
|
||||
lg.Warn(
|
||||
"failed to preallocate an initial WAL file",
|
||||
zap.String("path", p),
|
||||
zap.Int64("segment-bytes", SegmentSizeBytes),
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w := &WAL{
|
||||
lg: lg,
|
||||
dir: dirpath,
|
||||
metadata: metadata,
|
||||
}
|
||||
w.encoder, err = newFileEncoder(f.File, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.locks = append(w.locks, f)
|
||||
if err = w.saveCrc(0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logDirPath := w.dir
|
||||
if w, err = w.renameWAL(tmpdirpath); err != nil {
|
||||
lg.Warn(
|
||||
"failed to rename the temporary WAL directory",
|
||||
zap.String("tmp-dir-path", tmpdirpath),
|
||||
zap.String("dir-path", logDirPath),
|
||||
zap.Error(err),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var perr error
|
||||
defer func() {
|
||||
if perr != nil {
|
||||
w.cleanupWAL(lg)
|
||||
}
|
||||
}()
|
||||
|
||||
// directory was renamed; sync parent dir to persist rename
|
||||
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
|
||||
if perr != nil {
|
||||
lg.Warn(
|
||||
"failed to open the parent data directory",
|
||||
zap.String("parent-dir-path", filepath.Dir(w.dir)),
|
||||
zap.String("dir-path", w.dir),
|
||||
zap.Error(perr),
|
||||
)
|
||||
return nil, perr
|
||||
}
|
||||
dirCloser := func() error {
|
||||
if perr = pdir.Close(); perr != nil {
|
||||
lg.Warn(
|
||||
"failed to close the parent data directory file",
|
||||
zap.String("parent-dir-path", filepath.Dir(w.dir)),
|
||||
zap.String("dir-path", w.dir),
|
||||
zap.Error(perr),
|
||||
)
|
||||
return perr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
start := time.Now()
|
||||
if perr = fileutil.Fsync(pdir); perr != nil {
|
||||
dirCloser()
|
||||
lg.Warn(
|
||||
"failed to fsync the parent data directory file",
|
||||
zap.String("parent-dir-path", filepath.Dir(w.dir)),
|
||||
zap.String("dir-path", w.dir),
|
||||
zap.Error(perr),
|
||||
)
|
||||
return nil, perr
|
||||
}
|
||||
walFsyncSec.Observe(time.Since(start).Seconds())
|
||||
if err = dirCloser(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (w *WAL) SetUnsafeNoFsync() {
|
||||
w.unsafeNoSync = true
|
||||
}
|
||||
|
||||
func (w *WAL) cleanupWAL(lg *zap.Logger) {
|
||||
var err error
|
||||
if err = w.Close(); err != nil {
|
||||
lg.Panic("failed to close WAL during cleanup", zap.Error(err))
|
||||
}
|
||||
brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
|
||||
if err = os.Rename(w.dir, brokenDirName); err != nil {
|
||||
lg.Panic(
|
||||
"failed to rename WAL during cleanup",
|
||||
zap.Error(err),
|
||||
zap.String("source-path", w.dir),
|
||||
zap.String("rename-path", brokenDirName),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
|
||||
if err := os.RemoveAll(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// On non-Windows platforms, hold the lock while renaming. Releasing
|
||||
// the lock and trying to reacquire it quickly can be flaky because
|
||||
// it's possible the process will fork to spawn a process while this is
|
||||
// happening. The fds are set up as close-on-exec by the Go runtime,
|
||||
// but there is a window between the fork and the exec where another
|
||||
// process holds the lock.
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
if _, ok := err.(*os.LinkError); ok {
|
||||
return w.renameWALUnlock(tmpdirpath)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
|
||||
df, err := fileutil.OpenDir(w.dir)
|
||||
w.dirFile = df
|
||||
return w, err
|
||||
}
|
||||
|
||||
func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
|
||||
// rename of directory with locked files doesn't work on windows/cifs;
|
||||
// close the WAL to release the locks so the directory can be renamed.
|
||||
w.lg.Info(
|
||||
"closing WAL to release flock and retry directory renaming",
|
||||
zap.String("from", tmpdirpath),
|
||||
zap.String("to", w.dir),
|
||||
)
|
||||
w.Close()
|
||||
|
||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// reopen and relock
|
||||
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
|
||||
if oerr != nil {
|
||||
return nil, oerr
|
||||
}
|
||||
if _, _, _, err := newWAL.ReadAll(); err != nil {
|
||||
newWAL.Close()
|
||||
return nil, err
|
||||
}
|
||||
return newWAL, nil
|
||||
}
|
||||
|
||||
// Open opens the WAL at the given snap.
|
||||
// The snap SHOULD have been previously saved to the WAL, or the following
|
||||
// ReadAll will fail.
|
||||
// The returned WAL is ready to read and the first record will be the one after
|
||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||
// previous records.
|
||||
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||
w, err := openAtIndex(lg, dirpath, snap, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OpenForRead only opens the wal files for read.
|
||||
// Write on a read only wal panics.
|
||||
func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||
return openAtIndex(lg, dirpath, snap, false)
|
||||
}
|
||||
|
||||
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create a WAL ready for reading
|
||||
w := &WAL{
|
||||
lg: lg,
|
||||
dir: dirpath,
|
||||
start: snap,
|
||||
decoder: newDecoder(rs...),
|
||||
readClose: closer,
|
||||
locks: ls,
|
||||
}
|
||||
|
||||
if write {
|
||||
// write reuses the file descriptors from read; don't close so
|
||||
// WAL can append without dropping the file lock
|
||||
w.readClose = nil
|
||||
if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
|
||||
closer()
|
||||
return nil, err
|
||||
}
|
||||
w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
|
||||
}
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
|
||||
names, err := readWALNames(lg, dirpath)
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
nameIndex, ok := searchIndex(lg, names, snap.Index)
|
||||
if !ok || !isValidSeq(lg, names[nameIndex:]) {
|
||||
err = ErrFileNotFound
|
||||
return nil, -1, err
|
||||
}
|
||||
|
||||
return names, nameIndex, nil
|
||||
}
|
||||
|
||||
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
|
||||
rcs := make([]io.ReadCloser, 0)
|
||||
rs := make([]io.Reader, 0)
|
||||
ls := make([]*fileutil.LockedFile, 0)
|
||||
for _, name := range names[nameIndex:] {
|
||||
p := filepath.Join(dirpath, name)
|
||||
if write {
|
||||
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
|
||||
if err != nil {
|
||||
closeAll(lg, rcs...)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
ls = append(ls, l)
|
||||
rcs = append(rcs, l)
|
||||
} else {
|
||||
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
|
||||
if err != nil {
|
||||
closeAll(lg, rcs...)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
ls = append(ls, nil)
|
||||
rcs = append(rcs, rf)
|
||||
}
|
||||
rs = append(rs, rcs[len(rcs)-1])
|
||||
}
|
||||
|
||||
closer := func() error { return closeAll(lg, rcs...) }
|
||||
|
||||
return rs, ls, closer, nil
|
||||
}
|
||||
|
||||
// ReadAll reads out records of the current WAL.
|
||||
// If opened in write mode, it must read out all records until EOF. Or an error
|
||||
// will be returned.
|
||||
// If opened in read mode, it will try to read all records if possible.
|
||||
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
|
||||
// If loaded snap doesn't match with the expected one, it will return
|
||||
// all the records and error ErrSnapshotMismatch.
|
||||
// TODO: detect not-last-snap error.
|
||||
// TODO: maybe loose the checking of match.
|
||||
// After ReadAll, the WAL will be ready for appending new records.
|
||||
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
rec := &walpb.Record{}
|
||||
|
||||
if w.decoder == nil {
|
||||
return nil, state, nil, ErrDecoderNotFound
|
||||
}
|
||||
decoder := w.decoder
|
||||
|
||||
var match bool
|
||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||
switch rec.Type {
|
||||
case entryType:
|
||||
e := mustUnmarshalEntry(rec.Data)
|
||||
// 0 <= e.Index-w.start.Index - 1 < len(ents)
|
||||
if e.Index > w.start.Index {
|
||||
// prevent "panic: runtime error: slice bounds out of range [:13038096702221461992] with capacity 0"
|
||||
up := e.Index - w.start.Index - 1
|
||||
if up > uint64(len(ents)) {
|
||||
// return error before append call causes runtime panic
|
||||
return nil, state, nil, ErrSliceOutOfRange
|
||||
}
|
||||
ents = append(ents[:up], e)
|
||||
}
|
||||
w.enti = e.Index
|
||||
|
||||
case stateType:
|
||||
state = mustUnmarshalState(rec.Data)
|
||||
|
||||
case metadataType:
|
||||
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
||||
state.Reset()
|
||||
return nil, state, nil, ErrMetadataConflict
|
||||
}
|
||||
metadata = rec.Data
|
||||
|
||||
case crcType:
|
||||
crc := decoder.crc.Sum32()
|
||||
// current crc of decoder must match the crc of the record.
|
||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||
if crc != 0 && rec.Validate(crc) != nil {
|
||||
state.Reset()
|
||||
return nil, state, nil, ErrCRCMismatch
|
||||
}
|
||||
decoder.updateCRC(rec.Crc)
|
||||
|
||||
case snapshotType:
|
||||
var snap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&snap, rec.Data)
|
||||
if snap.Index == w.start.Index {
|
||||
if snap.Term != w.start.Term {
|
||||
state.Reset()
|
||||
return nil, state, nil, ErrSnapshotMismatch
|
||||
}
|
||||
match = true
|
||||
}
|
||||
|
||||
default:
|
||||
state.Reset()
|
||||
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
||||
}
|
||||
}
|
||||
|
||||
switch w.tail() {
|
||||
case nil:
|
||||
// We do not have to read out all entries in read mode.
|
||||
// The last record maybe a partial written one, so
|
||||
// ErrunexpectedEOF might be returned.
|
||||
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
state.Reset()
|
||||
return nil, state, nil, err
|
||||
}
|
||||
default:
|
||||
// We must read all of the entries if WAL is opened in write mode.
|
||||
if err != io.EOF {
|
||||
state.Reset()
|
||||
return nil, state, nil, err
|
||||
}
|
||||
// decodeRecord() will return io.EOF if it detects a zero record,
|
||||
// but this zero record may be followed by non-zero records from
|
||||
// a torn write. Overwriting some of these non-zero records, but
|
||||
// not all, will cause CRC errors on WAL open. Since the records
|
||||
// were never fully synced to disk in the first place, it's safe
|
||||
// to zero them out to avoid any CRC errors from new writes.
|
||||
if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
|
||||
return nil, state, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
err = nil
|
||||
if !match {
|
||||
err = ErrSnapshotNotFound
|
||||
}
|
||||
|
||||
// close decoder, disable reading
|
||||
if w.readClose != nil {
|
||||
w.readClose()
|
||||
w.readClose = nil
|
||||
}
|
||||
w.start = walpb.Snapshot{}
|
||||
|
||||
w.metadata = metadata
|
||||
|
||||
if w.tail() != nil {
|
||||
// create encoder (chain crc with the decoder), enable appending
|
||||
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
w.decoder = nil
|
||||
|
||||
return metadata, state, ents, err
|
||||
}
|
||||
|
||||
// ValidSnapshotEntries returns all the valid snapshot entries in the wal logs in the given directory.
|
||||
// Snapshot entries are valid if their index is less than or equal to the most recent committed hardstate.
|
||||
func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, error) {
|
||||
var snaps []walpb.Snapshot
|
||||
var state raftpb.HardState
|
||||
var err error
|
||||
|
||||
rec := &walpb.Record{}
|
||||
names, err := readWALNames(lg, walDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// open wal files in read mode, so that there is no conflict
|
||||
// when the same WAL is opened elsewhere in write mode
|
||||
rs, _, closer, err := openWALFiles(lg, walDir, names, 0, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if closer != nil {
|
||||
closer()
|
||||
}
|
||||
}()
|
||||
|
||||
// create a new decoder from the readers on the WAL files
|
||||
decoder := newDecoder(rs...)
|
||||
|
||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||
switch rec.Type {
|
||||
case snapshotType:
|
||||
var loadedSnap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||
snaps = append(snaps, loadedSnap)
|
||||
case stateType:
|
||||
state = mustUnmarshalState(rec.Data)
|
||||
case crcType:
|
||||
crc := decoder.crc.Sum32()
|
||||
// current crc of decoder must match the crc of the record.
|
||||
// do no need to match 0 crc, since the decoder is a new one at this case.
|
||||
if crc != 0 && rec.Validate(crc) != nil {
|
||||
return nil, ErrCRCMismatch
|
||||
}
|
||||
decoder.updateCRC(rec.Crc)
|
||||
}
|
||||
}
|
||||
// We do not have to read out all the WAL entries
|
||||
// as the decoder is opened in read mode.
|
||||
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// filter out any snaps that are newer than the committed hardstate
|
||||
n := 0
|
||||
for _, s := range snaps {
|
||||
if s.Index <= state.Commit {
|
||||
snaps[n] = s
|
||||
n++
|
||||
}
|
||||
}
|
||||
snaps = snaps[:n:n]
|
||||
|
||||
return snaps, nil
|
||||
}
|
||||
|
||||
// Verify reads through the given WAL and verifies that it is not corrupted.
|
||||
// It creates a new decoder to read through the records of the given WAL.
|
||||
// It does not conflict with any open WAL, but it is recommended not to
|
||||
// call this function after opening the WAL for writing.
|
||||
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
|
||||
// If the loaded snap doesn't match with the expected one, it will
|
||||
// return error ErrSnapshotMismatch.
|
||||
func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
|
||||
var metadata []byte
|
||||
var err error
|
||||
var match bool
|
||||
|
||||
rec := &walpb.Record{}
|
||||
|
||||
if lg == nil {
|
||||
lg = zap.NewNop()
|
||||
}
|
||||
names, nameIndex, err := selectWALFiles(lg, walDir, snap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// open wal files in read mode, so that there is no conflict
|
||||
// when the same WAL is opened elsewhere in write mode
|
||||
rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if closer != nil {
|
||||
closer()
|
||||
}
|
||||
}()
|
||||
|
||||
// create a new decoder from the readers on the WAL files
|
||||
decoder := newDecoder(rs...)
|
||||
|
||||
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
||||
switch rec.Type {
|
||||
case metadataType:
|
||||
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
||||
return ErrMetadataConflict
|
||||
}
|
||||
metadata = rec.Data
|
||||
case crcType:
|
||||
crc := decoder.crc.Sum32()
|
||||
// Current crc of decoder must match the crc of the record.
|
||||
// We need not match 0 crc, since the decoder is a new one at this point.
|
||||
if crc != 0 && rec.Validate(crc) != nil {
|
||||
return ErrCRCMismatch
|
||||
}
|
||||
decoder.updateCRC(rec.Crc)
|
||||
case snapshotType:
|
||||
var loadedSnap walpb.Snapshot
|
||||
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
|
||||
if loadedSnap.Index == snap.Index {
|
||||
if loadedSnap.Term != snap.Term {
|
||||
return ErrSnapshotMismatch
|
||||
}
|
||||
match = true
|
||||
}
|
||||
// We ignore all entry and state type records as these
|
||||
// are not necessary for validating the WAL contents
|
||||
case entryType:
|
||||
case stateType:
|
||||
default:
|
||||
return fmt.Errorf("unexpected block type %d", rec.Type)
|
||||
}
|
||||
}
|
||||
|
||||
// We do not have to read out all the WAL entries
|
||||
// as the decoder is opened in read mode.
|
||||
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
||||
return err
|
||||
}
|
||||
|
||||
if !match {
|
||||
return ErrSnapshotNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// cut closes current file written and creates a new one ready to append.
|
||||
// cut first creates a temp wal file and writes necessary headers into it.
|
||||
// Then cut atomically rename temp wal file to a wal file.
|
||||
func (w *WAL) cut() error {
|
||||
// close old wal file; truncate to avoid wasting space if an early cut
|
||||
off, serr := w.tail().Seek(0, io.SeekCurrent)
|
||||
if serr != nil {
|
||||
return serr
|
||||
}
|
||||
|
||||
if err := w.tail().Truncate(off); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
||||
|
||||
// create a temp wal file with name sequence + 1, or truncate the existing one
|
||||
newTail, err := w.fp.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update writer and save the previous crc
|
||||
w.locks = append(w.locks, newTail)
|
||||
prevCrc := w.encoder.crc.Sum32()
|
||||
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = w.saveCrc(prevCrc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = w.saveState(&w.state); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// atomically move temp wal file to wal file
|
||||
if err = w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
off, err = w.tail().Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
||||
return err
|
||||
}
|
||||
start := time.Now()
|
||||
if err = fileutil.Fsync(w.dirFile); err != nil {
|
||||
return err
|
||||
}
|
||||
walFsyncSec.Observe(time.Since(start).Seconds())
|
||||
|
||||
// reopen newTail with its new path so calls to Name() match the wal filename format
|
||||
newTail.Close()
|
||||
|
||||
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = newTail.Seek(off, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.locks[len(w.locks)-1] = newTail
|
||||
|
||||
prevCrc = w.encoder.crc.Sum32()
|
||||
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.lg.Info("created a new WAL segment", zap.String("path", fpath))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) sync() error {
|
||||
if w.unsafeNoSync {
|
||||
return nil
|
||||
}
|
||||
if w.encoder != nil {
|
||||
if err := w.encoder.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
start := time.Now()
|
||||
err := fileutil.Fdatasync(w.tail().File)
|
||||
|
||||
took := time.Since(start)
|
||||
if took > warnSyncDuration {
|
||||
w.lg.Warn(
|
||||
"slow fdatasync",
|
||||
zap.Duration("took", took),
|
||||
zap.Duration("expected-duration", warnSyncDuration),
|
||||
)
|
||||
}
|
||||
walFsyncSec.Observe(took.Seconds())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WAL) Sync() error {
|
||||
return w.sync()
|
||||
}
|
||||
|
||||
// ReleaseLockTo releases the locks, which has smaller index than the given index
|
||||
// except the largest one among them.
|
||||
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
|
||||
// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
|
||||
func (w *WAL) ReleaseLockTo(index uint64) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if len(w.locks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var smaller int
|
||||
found := false
|
||||
for i, l := range w.locks {
|
||||
_, lockIndex, err := parseWALName(filepath.Base(l.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lockIndex >= index {
|
||||
smaller = i - 1
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if no lock index is greater than the release index, we can
|
||||
// release lock up to the last one(excluding).
|
||||
if !found {
|
||||
smaller = len(w.locks) - 1
|
||||
}
|
||||
|
||||
if smaller <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < smaller; i++ {
|
||||
if w.locks[i] == nil {
|
||||
continue
|
||||
}
|
||||
w.locks[i].Close()
|
||||
}
|
||||
w.locks = w.locks[smaller:]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the current WAL file and directory.
|
||||
func (w *WAL) Close() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.fp != nil {
|
||||
w.fp.Close()
|
||||
w.fp = nil
|
||||
}
|
||||
|
||||
if w.tail() != nil {
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, l := range w.locks {
|
||||
if l == nil {
|
||||
continue
|
||||
}
|
||||
if err := l.Close(); err != nil {
|
||||
w.lg.Error("failed to close WAL", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return w.dirFile.Close()
|
||||
}
|
||||
|
||||
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
||||
// TODO: add MustMarshalTo to reduce one allocation.
|
||||
b := pbutil.MustMarshal(e)
|
||||
rec := &walpb.Record{Type: entryType, Data: b}
|
||||
if err := w.encoder.encode(rec); err != nil {
|
||||
return err
|
||||
}
|
||||
w.enti = e.Index
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) saveState(s *raftpb.HardState) error {
|
||||
if raft.IsEmptyHardState(*s) {
|
||||
return nil
|
||||
}
|
||||
w.state = *s
|
||||
b := pbutil.MustMarshal(s)
|
||||
rec := &walpb.Record{Type: stateType, Data: b}
|
||||
return w.encoder.encode(rec)
|
||||
}
|
||||
|
||||
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// short cut, do not call sync
|
||||
if raft.IsEmptyHardState(st) && len(ents) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
mustSync := raft.MustSync(st, w.state, len(ents))
|
||||
|
||||
// TODO(xiangli): no more reference operator
|
||||
for i := range ents {
|
||||
if err := w.saveEntry(&ents[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := w.saveState(&st); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
curOff, err := w.tail().Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if curOff < SegmentSizeBytes {
|
||||
if mustSync {
|
||||
return w.sync()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return w.cut()
|
||||
}
|
||||
|
||||
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
||||
b := pbutil.MustMarshal(&e)
|
||||
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
rec := &walpb.Record{Type: snapshotType, Data: b}
|
||||
if err := w.encoder.encode(rec); err != nil {
|
||||
return err
|
||||
}
|
||||
// update enti only when snapshot is ahead of last index
|
||||
if w.enti < e.Index {
|
||||
w.enti = e.Index
|
||||
}
|
||||
return w.sync()
|
||||
}
|
||||
|
||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||
}
|
||||
|
||||
func (w *WAL) tail() *fileutil.LockedFile {
|
||||
if len(w.locks) > 0 {
|
||||
return w.locks[len(w.locks)-1]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *WAL) seq() uint64 {
|
||||
t := w.tail()
|
||||
if t == nil {
|
||||
return 0
|
||||
}
|
||||
seq, _, err := parseWALName(filepath.Base(t.Name()))
|
||||
if err != nil {
|
||||
w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
|
||||
}
|
||||
return seq
|
||||
}
|
||||
|
||||
func closeAll(lg *zap.Logger, rcs ...io.ReadCloser) error {
|
||||
stringArr := make([]string, 0)
|
||||
for _, f := range rcs {
|
||||
if err := f.Close(); err != nil {
|
||||
lg.Warn("failed to close: ", zap.Error(err))
|
||||
stringArr = append(stringArr, err.Error())
|
||||
}
|
||||
}
|
||||
if len(stringArr) == 0 {
|
||||
return nil
|
||||
}
|
||||
return errors.New(strings.Join(stringArr, ", "))
|
||||
}
|
||||
70
server/wal/wal_bench_test.go
Normal file
70
server/wal/wal_bench_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package wal
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
)
|
||||
|
||||
func BenchmarkWrite100EntryWithoutBatch(b *testing.B) { benchmarkWriteEntry(b, 100, 0) }
|
||||
func BenchmarkWrite100EntryBatch10(b *testing.B) { benchmarkWriteEntry(b, 100, 10) }
|
||||
func BenchmarkWrite100EntryBatch100(b *testing.B) { benchmarkWriteEntry(b, 100, 100) }
|
||||
func BenchmarkWrite100EntryBatch500(b *testing.B) { benchmarkWriteEntry(b, 100, 500) }
|
||||
func BenchmarkWrite100EntryBatch1000(b *testing.B) { benchmarkWriteEntry(b, 100, 1000) }
|
||||
|
||||
func BenchmarkWrite1000EntryWithoutBatch(b *testing.B) { benchmarkWriteEntry(b, 1000, 0) }
|
||||
func BenchmarkWrite1000EntryBatch10(b *testing.B) { benchmarkWriteEntry(b, 1000, 10) }
|
||||
func BenchmarkWrite1000EntryBatch100(b *testing.B) { benchmarkWriteEntry(b, 1000, 100) }
|
||||
func BenchmarkWrite1000EntryBatch500(b *testing.B) { benchmarkWriteEntry(b, 1000, 500) }
|
||||
func BenchmarkWrite1000EntryBatch1000(b *testing.B) { benchmarkWriteEntry(b, 1000, 1000) }
|
||||
|
||||
func benchmarkWriteEntry(b *testing.B, size int, batch int) {
|
||||
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(p)
|
||||
|
||||
w, err := Create(zap.NewExample(), p, []byte("somedata"))
|
||||
if err != nil {
|
||||
b.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
data := make([]byte, size)
|
||||
for i := 0; i < size; i++ {
|
||||
data[i] = byte(i)
|
||||
}
|
||||
e := &raftpb.Entry{Data: data}
|
||||
|
||||
b.ResetTimer()
|
||||
n := 0
|
||||
b.SetBytes(int64(e.Size()))
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := w.saveEntry(e)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
n++
|
||||
if n > batch {
|
||||
w.sync()
|
||||
n = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
1111
server/wal/wal_test.go
Normal file
1111
server/wal/wal_test.go
Normal file
File diff suppressed because it is too large
Load Diff
29
server/wal/walpb/record.go
Normal file
29
server/wal/walpb/record.go
Normal file
@@ -0,0 +1,29 @@
|
||||
// Copyright 2015 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package walpb
|
||||
|
||||
import "errors"
|
||||
|
||||
// ErrCRCMismatch is returned by Record.Validate when the record's CRC differs
// from the expected one.
var ErrCRCMismatch = errors.New("walpb: crc mismatch")
|
||||
|
||||
func (rec *Record) Validate(crc uint32) error {
|
||||
if rec.Crc == crc {
|
||||
return nil
|
||||
}
|
||||
rec.Reset()
|
||||
return ErrCRCMismatch
|
||||
}
|
||||
556
server/wal/walpb/record.pb.go
Normal file
556
server/wal/walpb/record.pb.go
Normal file
@@ -0,0 +1,556 @@
|
||||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: record.proto
|
||||
|
||||
package walpb
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
|
||||
_ "github.com/gogo/protobuf/gogoproto"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type Record struct {
|
||||
Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
|
||||
Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
|
||||
Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Record) Reset() { *m = Record{} }
|
||||
func (m *Record) String() string { return proto.CompactTextString(m) }
|
||||
func (*Record) ProtoMessage() {}
|
||||
func (*Record) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_bf94fd919e302a1d, []int{0}
|
||||
}
|
||||
func (m *Record) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *Record) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Record.Merge(m, src)
|
||||
}
|
||||
func (m *Record) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *Record) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Record.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Record proto.InternalMessageInfo
|
||||
|
||||
type Snapshot struct {
|
||||
Index uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
|
||||
Term uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Snapshot) Reset() { *m = Snapshot{} }
|
||||
func (m *Snapshot) String() string { return proto.CompactTextString(m) }
|
||||
func (*Snapshot) ProtoMessage() {}
|
||||
func (*Snapshot) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_bf94fd919e302a1d, []int{1}
|
||||
}
|
||||
func (m *Snapshot) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *Snapshot) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_Snapshot.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *Snapshot) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Snapshot.Merge(m, src)
|
||||
}
|
||||
func (m *Snapshot) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *Snapshot) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Snapshot.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Snapshot proto.InternalMessageInfo
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Record)(nil), "walpb.Record")
|
||||
proto.RegisterType((*Snapshot)(nil), "walpb.Snapshot")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("record.proto", fileDescriptor_bf94fd919e302a1d) }
|
||||
|
||||
var fileDescriptor_bf94fd919e302a1d = []byte{
|
||||
// 186 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x4a, 0x4d, 0xce,
|
||||
0x2f, 0x4a, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2d, 0x4f, 0xcc, 0x29, 0x48, 0x92,
|
||||
0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x8b, 0xe8, 0x83, 0x58, 0x10, 0x49, 0x25, 0x3f, 0x2e, 0xb6,
|
||||
0x20, 0xb0, 0x62, 0x21, 0x09, 0x2e, 0x96, 0x92, 0xca, 0x82, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d,
|
||||
0x66, 0x27, 0x96, 0x13, 0xf7, 0xe4, 0x19, 0x82, 0xc0, 0x22, 0x42, 0x62, 0x5c, 0xcc, 0xc9, 0x45,
|
||||
0xc9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xbc, 0x50, 0x09, 0x90, 0x80, 0x90, 0x10, 0x17, 0x4b, 0x4a,
|
||||
0x62, 0x49, 0xa2, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x98, 0xad, 0xe4, 0xc0, 0xc5, 0x11,
|
||||
0x9c, 0x97, 0x58, 0x50, 0x9c, 0x91, 0x5f, 0x22, 0x24, 0xc5, 0xc5, 0x9a, 0x99, 0x97, 0x92, 0x5a,
|
||||
0x01, 0x36, 0x92, 0x05, 0xaa, 0x13, 0x22, 0x04, 0xb6, 0x2d, 0xb5, 0x28, 0x17, 0x6c, 0x28, 0x0b,
|
||||
0xdc, 0xb6, 0xd4, 0xa2, 0x5c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc,
|
||||
0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x00, 0x01, 0x00, 0x00,
|
||||
0xff, 0xff, 0x7f, 0x5e, 0x5c, 0x46, 0xd3, 0x00, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *Record) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *Record) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Record) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Data != nil {
|
||||
i -= len(m.Data)
|
||||
copy(dAtA[i:], m.Data)
|
||||
i = encodeVarintRecord(dAtA, i, uint64(len(m.Data)))
|
||||
i--
|
||||
dAtA[i] = 0x1a
|
||||
}
|
||||
i = encodeVarintRecord(dAtA, i, uint64(m.Crc))
|
||||
i--
|
||||
dAtA[i] = 0x10
|
||||
i = encodeVarintRecord(dAtA, i, uint64(m.Type))
|
||||
i--
|
||||
dAtA[i] = 0x8
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *Snapshot) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *Snapshot) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Snapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
i = encodeVarintRecord(dAtA, i, uint64(m.Term))
|
||||
i--
|
||||
dAtA[i] = 0x10
|
||||
i = encodeVarintRecord(dAtA, i, uint64(m.Index))
|
||||
i--
|
||||
dAtA[i] = 0x8
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintRecord(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovRecord(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return base
|
||||
}
|
||||
func (m *Record) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
n += 1 + sovRecord(uint64(m.Type))
|
||||
n += 1 + sovRecord(uint64(m.Crc))
|
||||
if m.Data != nil {
|
||||
l = len(m.Data)
|
||||
n += 1 + l + sovRecord(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *Snapshot) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
n += 1 + sovRecord(uint64(m.Index))
|
||||
n += 1 + sovRecord(uint64(m.Term))
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovRecord(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
func sozRecord(x uint64) (n int) {
|
||||
return sovRecord(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (m *Record) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: Record: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: Record: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
|
||||
}
|
||||
m.Type = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Type |= int64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 2:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Crc", wireType)
|
||||
}
|
||||
m.Crc = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Crc |= uint32(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 3:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.Data == nil {
|
||||
m.Data = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipRecord(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
if (iNdEx + skippy) < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *Snapshot) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: Snapshot: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
|
||||
}
|
||||
m.Index = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Index |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 2:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
|
||||
}
|
||||
m.Term = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Term |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipRecord(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
if (iNdEx + skippy) < 0 {
|
||||
return ErrInvalidLengthRecord
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipRecord(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
depth := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowRecord
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthRecord
|
||||
}
|
||||
iNdEx += length
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
if depth == 0 {
|
||||
return 0, ErrUnexpectedEndOfGroupRecord
|
||||
}
|
||||
depth--
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthRecord
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
}
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthRecord = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowRecord = fmt.Errorf("proto: integer overflow")
|
||||
ErrUnexpectedEndOfGroupRecord = fmt.Errorf("proto: unexpected end of group")
|
||||
)
|
||||
20
server/wal/walpb/record.proto
Normal file
20
server/wal/walpb/record.proto
Normal file
@@ -0,0 +1,20 @@
|
||||
syntax = "proto2";
|
||||
package walpb;
|
||||
|
||||
import "gogoproto/gogo.proto";
|
||||
|
||||
option (gogoproto.marshaler_all) = true;
|
||||
option (gogoproto.sizer_all) = true;
|
||||
option (gogoproto.unmarshaler_all) = true;
|
||||
option (gogoproto.goproto_getters_all) = false;
|
||||
|
||||
message Record {
|
||||
optional int64 type = 1 [(gogoproto.nullable) = false];
|
||||
optional uint32 crc = 2 [(gogoproto.nullable) = false];
|
||||
optional bytes data = 3;
|
||||
}
|
||||
|
||||
message Snapshot {
|
||||
optional uint64 index = 1 [(gogoproto.nullable) = false];
|
||||
optional uint64 term = 2 [(gogoproto.nullable) = false];
|
||||
}
|
||||
Reference in New Issue
Block a user