mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
638 lines
16 KiB
Go
638 lines
16 KiB
Go
// Copyright 2015 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package wal
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/pkg/fileutil"
|
|
"github.com/coreos/etcd/pkg/pbutil"
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/wal/walpb"
|
|
|
|
"github.com/coreos/pkg/capnslog"
|
|
)
|
|
|
|
// Record type tags written into walpb.Record.Type. They are numbered
// consecutively starting at 1.
const (
	metadataType int64 = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)

// warnSyncDuration is the amount of time allotted to an fsync before
// logging a warning.
const warnSyncDuration = time.Second
|
|
|
|
var (
|
|
// SegmentSizeBytes is the preallocated size of each wal segment file.
|
|
// The actual size might be larger than this. In general, the default
|
|
// value should be used, but this is defined as an exported variable
|
|
// so that tests can set a different segment size.
|
|
SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
|
|
|
|
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "wal")
|
|
|
|
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
|
|
ErrFileNotFound = errors.New("wal: file not found")
|
|
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
|
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
|
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
|
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
|
)
|
|
|
|
// WAL is a logical representation of the stable storage.
|
|
// WAL is either in read mode or append mode but not both.
|
|
// A newly created WAL is in append mode, and ready for appending records.
|
|
// A just opened WAL is in read mode, and ready for reading records.
|
|
// The WAL will be ready for appending after reading out all the previous records.
|
|
type WAL struct {
|
|
dir string // the living directory of the underlay files
|
|
|
|
// dirFile is a fd for the wal directory for syncing on Rename
|
|
dirFile *os.File
|
|
|
|
metadata []byte // metadata recorded at the head of each WAL
|
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
|
|
|
start walpb.Snapshot // snapshot to start reading
|
|
decoder *decoder // decoder to decode records
|
|
readClose func() error // closer for decode reader
|
|
|
|
mu sync.Mutex
|
|
enti uint64 // index of the last entry saved to the wal
|
|
encoder *encoder // encoder to encode records
|
|
|
|
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
|
|
fp *filePipeline
|
|
}
|
|
|
|
// Create creates a WAL ready for appending records. The given metadata is
|
|
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
|
|
func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|
if Exist(dirpath) {
|
|
return nil, os.ErrExist
|
|
}
|
|
|
|
// keep temporary wal directory so WAL initialization appears atomic
|
|
tmpdirpath := path.Clean(dirpath) + ".tmp"
|
|
if fileutil.Exist(tmpdirpath) {
|
|
if err := os.RemoveAll(tmpdirpath); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
p := path.Join(tmpdirpath, walName(0, 0))
|
|
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
|
return nil, err
|
|
}
|
|
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
w := &WAL{
|
|
dir: dirpath,
|
|
metadata: metadata,
|
|
}
|
|
w.encoder, err = newFileEncoder(f.File, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
w.locks = append(w.locks, f)
|
|
if err = w.saveCrc(0); err != nil {
|
|
return nil, err
|
|
}
|
|
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
|
return nil, err
|
|
}
|
|
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if w, err = w.renameWal(tmpdirpath); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// directory was renamed; sync parent dir to persist rename
|
|
pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
|
if perr != nil {
|
|
return nil, perr
|
|
}
|
|
if perr = fileutil.Fsync(pdir); perr != nil {
|
|
return nil, perr
|
|
}
|
|
if perr = pdir.Close(); err != nil {
|
|
return nil, perr
|
|
}
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// Open opens the WAL at the given snap.
|
|
// The snap SHOULD have been previously saved to the WAL, or the following
|
|
// ReadAll will fail.
|
|
// The returned WAL is ready to read and the first record will be the one after
|
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
|
// previous records.
|
|
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
|
w, err := openAtIndex(dirpath, snap, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
|
return nil, err
|
|
}
|
|
return w, nil
|
|
}
|
|
|
|
// OpenForRead only opens the wal files for read.
|
|
// Write on a read only wal panics.
|
|
func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
|
return openAtIndex(dirpath, snap, false)
|
|
}
|
|
|
|
func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
|
|
names, err := readWalNames(dirpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nameIndex, ok := searchIndex(names, snap.Index)
|
|
if !ok || !isValidSeq(names[nameIndex:]) {
|
|
return nil, ErrFileNotFound
|
|
}
|
|
|
|
// open the wal files
|
|
rcs := make([]io.ReadCloser, 0)
|
|
rs := make([]io.Reader, 0)
|
|
ls := make([]*fileutil.LockedFile, 0)
|
|
for _, name := range names[nameIndex:] {
|
|
p := path.Join(dirpath, name)
|
|
if write {
|
|
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
|
|
if err != nil {
|
|
closeAll(rcs...)
|
|
return nil, err
|
|
}
|
|
ls = append(ls, l)
|
|
rcs = append(rcs, l)
|
|
} else {
|
|
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
|
|
if err != nil {
|
|
closeAll(rcs...)
|
|
return nil, err
|
|
}
|
|
ls = append(ls, nil)
|
|
rcs = append(rcs, rf)
|
|
}
|
|
rs = append(rs, rcs[len(rcs)-1])
|
|
}
|
|
|
|
closer := func() error { return closeAll(rcs...) }
|
|
|
|
// create a WAL ready for reading
|
|
w := &WAL{
|
|
dir: dirpath,
|
|
start: snap,
|
|
decoder: newDecoder(rs...),
|
|
readClose: closer,
|
|
locks: ls,
|
|
}
|
|
|
|
if write {
|
|
// write reuses the file descriptors from read; don't close so
|
|
// WAL can append without dropping the file lock
|
|
w.readClose = nil
|
|
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
|
|
closer()
|
|
return nil, err
|
|
}
|
|
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
|
|
}
|
|
|
|
return w, nil
|
|
}
|
|
|
|
// ReadAll reads out records of the current WAL.
|
|
// If opened in write mode, it must read out all records until EOF. Or an error
|
|
// will be returned.
|
|
// If opened in read mode, it will try to read all records if possible.
|
|
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
|
|
// If loaded snap doesn't match with the expected one, it will return
|
|
// all the records and error ErrSnapshotMismatch.
|
|
// TODO: detect not-last-snap error.
|
|
// TODO: maybe loose the checking of match.
|
|
// After ReadAll, the WAL will be ready for appending new records.
|
|
func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
rec := &walpb.Record{}
|
|
decoder := w.decoder
|
|
|
|
var match bool
|
|
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
|
|
switch rec.Type {
|
|
case entryType:
|
|
e := mustUnmarshalEntry(rec.Data)
|
|
if e.Index > w.start.Index {
|
|
ents = append(ents[:e.Index-w.start.Index-1], e)
|
|
}
|
|
w.enti = e.Index
|
|
case stateType:
|
|
state = mustUnmarshalState(rec.Data)
|
|
case metadataType:
|
|
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
|
state.Reset()
|
|
return nil, state, nil, ErrMetadataConflict
|
|
}
|
|
metadata = rec.Data
|
|
case crcType:
|
|
crc := decoder.crc.Sum32()
|
|
// current crc of decoder must match the crc of the record.
|
|
// do no need to match 0 crc, since the decoder is a new one at this case.
|
|
if crc != 0 && rec.Validate(crc) != nil {
|
|
state.Reset()
|
|
return nil, state, nil, ErrCRCMismatch
|
|
}
|
|
decoder.updateCRC(rec.Crc)
|
|
case snapshotType:
|
|
var snap walpb.Snapshot
|
|
pbutil.MustUnmarshal(&snap, rec.Data)
|
|
if snap.Index == w.start.Index {
|
|
if snap.Term != w.start.Term {
|
|
state.Reset()
|
|
return nil, state, nil, ErrSnapshotMismatch
|
|
}
|
|
match = true
|
|
}
|
|
default:
|
|
state.Reset()
|
|
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
|
}
|
|
}
|
|
|
|
switch w.tail() {
|
|
case nil:
|
|
// We do not have to read out all entries in read mode.
|
|
// The last record maybe a partial written one, so
|
|
// ErrunexpectedEOF might be returned.
|
|
if err != io.EOF && err != io.ErrUnexpectedEOF {
|
|
state.Reset()
|
|
return nil, state, nil, err
|
|
}
|
|
default:
|
|
// We must read all of the entries if WAL is opened in write mode.
|
|
if err != io.EOF {
|
|
state.Reset()
|
|
return nil, state, nil, err
|
|
}
|
|
// decodeRecord() will return io.EOF if it detects a zero record,
|
|
// but this zero record may be followed by non-zero records from
|
|
// a torn write. Overwriting some of these non-zero records, but
|
|
// not all, will cause CRC errors on WAL open. Since the records
|
|
// were never fully synced to disk in the first place, it's safe
|
|
// to zero them out to avoid any CRC errors from new writes.
|
|
if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
|
|
return nil, state, nil, err
|
|
}
|
|
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
|
|
return nil, state, nil, err
|
|
}
|
|
}
|
|
|
|
err = nil
|
|
if !match {
|
|
err = ErrSnapshotNotFound
|
|
}
|
|
|
|
// close decoder, disable reading
|
|
if w.readClose != nil {
|
|
w.readClose()
|
|
w.readClose = nil
|
|
}
|
|
w.start = walpb.Snapshot{}
|
|
|
|
w.metadata = metadata
|
|
|
|
if w.tail() != nil {
|
|
// create encoder (chain crc with the decoder), enable appending
|
|
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
w.decoder = nil
|
|
|
|
return metadata, state, ents, err
|
|
}
|
|
|
|
// cut closes current file written and creates a new one ready to append.
|
|
// cut first creates a temp wal file and writes necessary headers into it.
|
|
// Then cut atomically rename temp wal file to a wal file.
|
|
func (w *WAL) cut() error {
|
|
// close old wal file; truncate to avoid wasting space if an early cut
|
|
off, serr := w.tail().Seek(0, os.SEEK_CUR)
|
|
if serr != nil {
|
|
return serr
|
|
}
|
|
if err := w.tail().Truncate(off); err != nil {
|
|
return err
|
|
}
|
|
if err := w.sync(); err != nil {
|
|
return err
|
|
}
|
|
|
|
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
|
|
|
// create a temp wal file with name sequence + 1, or truncate the existing one
|
|
newTail, err := w.fp.Open()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// update writer and save the previous crc
|
|
w.locks = append(w.locks, newTail)
|
|
prevCrc := w.encoder.crc.Sum32()
|
|
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = w.saveCrc(prevCrc); err != nil {
|
|
return err
|
|
}
|
|
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
|
|
return err
|
|
}
|
|
if err = w.saveState(&w.state); err != nil {
|
|
return err
|
|
}
|
|
// atomically move temp wal file to wal file
|
|
if err = w.sync(); err != nil {
|
|
return err
|
|
}
|
|
|
|
off, err = w.tail().Seek(0, os.SEEK_CUR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
|
return err
|
|
}
|
|
if err = fileutil.Fsync(w.dirFile); err != nil {
|
|
return err
|
|
}
|
|
|
|
newTail.Close()
|
|
|
|
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
|
return err
|
|
}
|
|
if _, err = newTail.Seek(off, os.SEEK_SET); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.locks[len(w.locks)-1] = newTail
|
|
|
|
prevCrc = w.encoder.crc.Sum32()
|
|
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
plog.Infof("segmented wal file %v is created", fpath)
|
|
return nil
|
|
}
|
|
|
|
func (w *WAL) sync() error {
|
|
if w.encoder != nil {
|
|
if err := w.encoder.flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
start := time.Now()
|
|
err := fileutil.Fdatasync(w.tail().File)
|
|
|
|
duration := time.Since(start)
|
|
if duration > warnSyncDuration {
|
|
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
|
|
}
|
|
syncDurations.Observe(duration.Seconds())
|
|
|
|
return err
|
|
}
|
|
|
|
// ReleaseLockTo releases the locks, which has smaller index than the given index
|
|
// except the largest one among them.
|
|
// For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release
|
|
// lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4.
|
|
func (w *WAL) ReleaseLockTo(index uint64) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
var smaller int
|
|
found := false
|
|
|
|
for i, l := range w.locks {
|
|
_, lockIndex, err := parseWalName(path.Base(l.Name()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if lockIndex >= index {
|
|
smaller = i - 1
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// if no lock index is greater than the release index, we can
|
|
// release lock up to the last one(excluding).
|
|
if !found && len(w.locks) != 0 {
|
|
smaller = len(w.locks) - 1
|
|
}
|
|
|
|
if smaller <= 0 {
|
|
return nil
|
|
}
|
|
|
|
for i := 0; i < smaller; i++ {
|
|
if w.locks[i] == nil {
|
|
continue
|
|
}
|
|
w.locks[i].Close()
|
|
}
|
|
w.locks = w.locks[smaller:]
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *WAL) Close() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.fp != nil {
|
|
w.fp.Close()
|
|
w.fp = nil
|
|
}
|
|
|
|
if w.tail() != nil {
|
|
if err := w.sync(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for _, l := range w.locks {
|
|
if l == nil {
|
|
continue
|
|
}
|
|
if err := l.Close(); err != nil {
|
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
}
|
|
}
|
|
|
|
return w.dirFile.Close()
|
|
}
|
|
|
|
func (w *WAL) saveEntry(e *raftpb.Entry) error {
|
|
// TODO: add MustMarshalTo to reduce one allocation.
|
|
b := pbutil.MustMarshal(e)
|
|
rec := &walpb.Record{Type: entryType, Data: b}
|
|
if err := w.encoder.encode(rec); err != nil {
|
|
return err
|
|
}
|
|
w.enti = e.Index
|
|
return nil
|
|
}
|
|
|
|
func (w *WAL) saveState(s *raftpb.HardState) error {
|
|
if raft.IsEmptyHardState(*s) {
|
|
return nil
|
|
}
|
|
w.state = *s
|
|
b := pbutil.MustMarshal(s)
|
|
rec := &walpb.Record{Type: stateType, Data: b}
|
|
return w.encoder.encode(rec)
|
|
}
|
|
|
|
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
// short cut, do not call sync
|
|
if raft.IsEmptyHardState(st) && len(ents) == 0 {
|
|
return nil
|
|
}
|
|
|
|
mustSync := mustSync(st, w.state, len(ents))
|
|
|
|
// TODO(xiangli): no more reference operator
|
|
for i := range ents {
|
|
if err := w.saveEntry(&ents[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := w.saveState(&st); err != nil {
|
|
return err
|
|
}
|
|
|
|
curOff, err := w.tail().Seek(0, os.SEEK_CUR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if curOff < SegmentSizeBytes {
|
|
if mustSync {
|
|
return w.sync()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return w.cut()
|
|
}
|
|
|
|
func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
|
b := pbutil.MustMarshal(&e)
|
|
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
rec := &walpb.Record{Type: snapshotType, Data: b}
|
|
if err := w.encoder.encode(rec); err != nil {
|
|
return err
|
|
}
|
|
// update enti only when snapshot is ahead of last index
|
|
if w.enti < e.Index {
|
|
w.enti = e.Index
|
|
}
|
|
return w.sync()
|
|
}
|
|
|
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
|
}
|
|
|
|
func (w *WAL) tail() *fileutil.LockedFile {
|
|
if len(w.locks) > 0 {
|
|
return w.locks[len(w.locks)-1]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *WAL) seq() uint64 {
|
|
t := w.tail()
|
|
if t == nil {
|
|
return 0
|
|
}
|
|
seq, _, err := parseWalName(path.Base(t.Name()))
|
|
if err != nil {
|
|
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
|
}
|
|
return seq
|
|
}
|
|
|
|
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
|
|
// Persistent state on all servers:
|
|
// (Updated on stable storage before responding to RPCs)
|
|
// currentTerm
|
|
// votedFor
|
|
// log entries[]
|
|
return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
|
|
}
|
|
|
|
// closeAll closes every given ReadCloser and returns the first error
// encountered, or nil if all of them closed cleanly.
//
// A failing Close does not stop the iteration: the remaining closers are
// still closed so that one bad descriptor cannot leak all the others.
func closeAll(rcs ...io.ReadCloser) error {
	var firstErr error
	for _, f := range rcs {
		if err := f.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
|