From 5ede18be7401f034679c175ae6eb1aef884debbe Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 23 Jan 2015 10:50:43 -0800 Subject: [PATCH] raft: separate compact and createsnap in memory storage --- etcdserver/server.go | 25 +++-- etcdserver/server_test.go | 12 +- raft/log_test.go | 4 +- raft/raft_test.go | 3 +- raft/storage.go | 70 ++++++++---- raft/storage_test.go | 223 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 294 insertions(+), 43 deletions(-) create mode 100644 raft/storage_test.go diff --git a/etcdserver/server.go b/etcdserver/server.go index bfd663a9f..cc8f527d5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -817,27 +817,32 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { if err != nil { log.Panicf("etcdserver: store save should never fail: %v", err) } - err = s.r.raftStorage.Compact(snapi, confState, d) + snap, err := s.r.raftStorage.CreateSnapshot(snapi, confState, d) if err != nil { // the snapshot was done asynchronously with the progress of raft. - // raft might have already got a newer snapshot and called compact. + // raft might have already got a newer snapshot. + if err == raft.ErrSnapOutOfDate { + return + } + log.Panicf("etcdserver: unexpected create snapshot error %v", err) + } + if err := s.r.storage.SaveSnap(snap); err != nil { + log.Fatalf("etcdserver: save snapshot error: %v", err) + } + + err = s.r.raftStorage.Compact(snapi) + if err != nil { + // the compaction was done asynchronously with the progress of raft. + // the raft log might already have been compacted.
if err == raft.ErrCompacted { return } log.Panicf("etcdserver: unexpected compaction error %v", err) } log.Printf("etcdserver: compacted log at index %d", snapi) - if err := s.r.storage.Cut(); err != nil { log.Panicf("etcdserver: rotate wal file should never fail: %v", err) } - snap, err := s.r.raftStorage.Snapshot() - if err != nil { - log.Panicf("etcdserver: snapshot error: %v", err) - } - if err := s.r.storage.SaveSnap(snap); err != nil { - log.Fatalf("etcdserver: save snapshot error: %v", err) - } log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 29dd90350..5393b6f91 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -723,11 +723,11 @@ func TestSnapshot(t *testing.T) { if len(gaction) != 2 { t.Fatalf("len(action) = %d, want 2", len(gaction)) } - if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Cut"}) { - t.Errorf("action = %s, want Cut", gaction[0]) + if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[0]) } - if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[1]) + if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Cut"}) { + t.Errorf("action = %s, want Cut", gaction[1]) } } @@ -755,12 +755,12 @@ func TestTriggerSnap(t *testing.T) { gaction := p.Action() // each operation is recorded as a Save - // (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + Cut + SaveSnap + // (SnapCount+1) * Puts + SaveSnap + Cut = (SnapCount+1) * Save + SaveSnap + Cut wcnt := 3 + snapc if len(gaction) != wcnt { t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } - if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { + if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) } } diff --git
a/raft/log_test.go b/raft/log_test.go index 37403c9ad..3aef304bf 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -290,7 +290,7 @@ func TestCompactionSideEffects(t *testing.T) { raftLog.appliedTo(raftLog.committed) offset := uint64(500) - storage.Compact(offset, nil, nil) + storage.Compact(offset) if raftLog.lastIndex() != lastIndex { t.Errorf("lastIndex = %d, want %d", raftLog.lastIndex(), lastIndex) @@ -520,7 +520,7 @@ func TestCompaction(t *testing.T) { raftLog.appliedTo(raftLog.committed) for j := 0; j < len(tt.compact); j++ { - err := storage.Compact(tt.compact[j], nil, nil) + err := storage.Compact(tt.compact[j]) if err != nil { if tt.wallow { t.Errorf("#%d.%d allow = %t, want %t", i, j, false, tt.wallow) diff --git a/raft/raft_test.go b/raft/raft_test.go index 8608533e0..5ea2c90f1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1412,7 +1412,8 @@ func TestSlowNodeRestore(t *testing.T) { } lead := nt.peers[1].(*raft) nextEnts(lead, nt.storage[1]) - nt.storage[1].Compact(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil) + nt.storage[1].Compact(lead.raftLog.applied) nt.recover() // trigger a snapshot diff --git a/raft/storage.go b/raft/storage.go index efd3bc902..49c72ac1e 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -26,6 +26,10 @@ import ( // index is unavailable because it predates the last snapshot. var ErrCompacted = errors.New("requested index is unavailable due to compaction") +// ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested +// index is older than the existing snapshot.
+var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot") + var ErrUnavailable = errors.New("requested entry at index is unavailable") // Storage is an interface that may be implemented by the application @@ -92,7 +96,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error { func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() - offset := ms.snapshot.Metadata.Index + offset := ms.ents[0].Index if lo <= offset { return nil, ErrCompacted } @@ -107,7 +111,7 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { func (ms *MemoryStorage) Term(i uint64) (uint64, error) { ms.Lock() defer ms.Unlock() - offset := ms.snapshot.Metadata.Index + offset := ms.ents[0].Index if i < offset { return 0, ErrCompacted } @@ -118,14 +122,14 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) { func (ms *MemoryStorage) LastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() - return ms.snapshot.Metadata.Index + uint64(len(ms.ents)) - 1, nil + return ms.ents[0].Index + uint64(len(ms.ents)) - 1, nil } // FirstIndex implements the Storage interface. func (ms *MemoryStorage) FirstIndex() (uint64, error) { ms.Lock() defer ms.Unlock() - return ms.snapshot.Metadata.Index + 1, nil + return ms.ents[0].Index + 1, nil } // Snapshot implements the Storage interface. @@ -141,50 +145,68 @@ func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error { ms.Lock() defer ms.Unlock() + // TODO: return snapOutOfDate? ms.snapshot = snap ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}} return nil } -// Compact discards all log entries prior to i. Creates a snapshot -// which can be retrieved with the Snapshot() method and can be used -// to reconstruct the state at that point. +// CreateSnapshot makes a snapshot which can be retrieved with the Snapshot() method and +// can be used to reconstruct the state at that point.
// If any configuration changes have been made since the last compaction, // the result of the last ApplyConfChange must be passed in. -// It is the application's responsibility to not attempt to compact an index -// greater than raftLog.applied. -func (ms *MemoryStorage) Compact(i uint64, cs *pb.ConfState, data []byte) error { +func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) { ms.Lock() defer ms.Unlock() - offset := ms.snapshot.Metadata.Index - if i <= offset { - return ErrCompacted + if i <= ms.snapshot.Metadata.Index { + return pb.Snapshot{}, ErrSnapOutOfDate } + + offset := ms.ents[0].Index if i > offset+uint64(len(ms.ents))-1 { - log.Panicf("compact %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1) + log.Panicf("snapshot %d is out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1) } - i -= offset - ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i) - ents[0].Term = ms.ents[i].Term - ents = append(ents, ms.ents[i+1:]...) - ms.ents = ents - ms.snapshot.Metadata.Index += i - ms.snapshot.Metadata.Term = ents[0].Term + + ms.snapshot.Metadata.Index = i + ms.snapshot.Metadata.Term = ms.ents[i-offset].Term if cs != nil { ms.snapshot.Metadata.ConfState = *cs } ms.snapshot.Data = data + return ms.snapshot, nil +} + +// Compact discards all log entries prior to compactIndex. +// It is the application's responsibility to not attempt to compact an index +// greater than raftLog.applied. +func (ms *MemoryStorage) Compact(compactIndex uint64) error { + offset := ms.ents[0].Index + if compactIndex <= offset { + return ErrCompacted + } + if compactIndex > offset+uint64(len(ms.ents))-1 { + log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, offset+uint64(len(ms.ents))-1) + } + + i := compactIndex - offset + ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i) + ents[0].Index = ms.ents[i].Index + ents[0].Term = ms.ents[i].Term + ents = append(ents, ms.ents[i+1:]...)
+ ms.ents = ents return nil } // Append the new entries to storage. +// TODO (xiangli): ensure the entries are continuous and +// entries[0].Index > ms.entries[0].Index func (ms *MemoryStorage) Append(entries []pb.Entry) error { ms.Lock() defer ms.Unlock() if len(entries) == 0 { return nil } - first := ms.snapshot.Metadata.Index + 1 + first := ms.ents[0].Index + 1 last := entries[0].Index + uint64(len(entries)) - 1 // shortcut if there is no new entry. @@ -196,7 +218,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error { entries = entries[first-entries[0].Index:] } - offset := entries[0].Index - ms.snapshot.Metadata.Index + offset := entries[0].Index - ms.ents[0].Index switch { case uint64(len(ms.ents)) > offset: ms.ents = append([]pb.Entry{}, ms.ents[:offset]...) @@ -205,7 +227,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error { ms.ents = append(ms.ents, entries...) default: log.Panicf("missing log entry [last: %d, append at: %d]", - ms.snapshot.Metadata.Index+uint64(len(ms.ents)), entries[0].Index) + ms.ents[0].Index+uint64(len(ms.ents)), entries[0].Index) } return nil } diff --git a/raft/storage_test.go b/raft/storage_test.go new file mode 100644 index 000000000..f136e353d --- /dev/null +++ b/raft/storage_test.go @@ -0,0 +1,223 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package raft + +import ( + "reflect" + "testing" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +// TODO(xiangli): Test panic cases + +func TestStorageTerm(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + tests := []struct { + i uint64 + + werr error + wterm uint64 + }{ + {2, ErrCompacted, 0}, + {3, nil, 3}, + {4, nil, 4}, + {5, nil, 5}, + } + + for i, tt := range tests { + s := &MemoryStorage{ents: ents} + term, err := s.Term(tt.i) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if term != tt.wterm { + t.Errorf("#%d: term = %d, want %d", i, term, tt.wterm) + } + } +} + +func TestStorageEntries(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + tests := []struct { + lo, hi uint64 + + werr error + wentries []pb.Entry + }{ + {2, 6, ErrCompacted, nil}, + {3, 4, ErrCompacted, nil}, + {4, 5, nil, []pb.Entry{{Index: 4, Term: 4}}}, + {4, 6, nil, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 5}}}, + } + + for i, tt := range tests { + s := &MemoryStorage{ents: ents} + entries, err := s.Entries(tt.lo, tt.hi) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if !reflect.DeepEqual(entries, tt.wentries) { + t.Errorf("#%d: entries = %v, want %v", i, entries, tt.wentries) + } + } +} + +func TestStorageLastIndex(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + s := &MemoryStorage{ents: ents} + + last, err := s.LastIndex() + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if last != 5 { + t.Errorf("term = %d, want %d", last, 5) + } + + s.Append([]pb.Entry{{Index: 6, Term: 5}}) + last, err = s.LastIndex() + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if last != 6 { + t.Errorf("last = %d, want %d", last, 5) + } +} + +func TestStorageFirstIndex(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 
5, Term: 5}} + s := &MemoryStorage{ents: ents} + + first, err := s.FirstIndex() + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if first != 4 { + t.Errorf("first = %d, want %d", first, 4) + } + + s.Compact(4) + first, err = s.FirstIndex() + if err != nil { + t.Errorf("err = %v, want nil", err) + } + if first != 5 { + t.Errorf("first = %d, want %d", first, 5) + } +} + +func TestStorageCompact(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + tests := []struct { + i uint64 + + werr error + windex uint64 + wterm uint64 + wlen int + }{ + {2, ErrCompacted, 3, 3, 3}, + {3, ErrCompacted, 3, 3, 3}, + {4, nil, 4, 4, 2}, + {5, nil, 5, 5, 1}, + } + + for i, tt := range tests { + s := &MemoryStorage{ents: ents} + err := s.Compact(tt.i) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if s.ents[0].Index != tt.windex { + t.Errorf("#%d: index = %d, want %d", i, s.ents[0].Index, tt.windex) + } + if s.ents[0].Term != tt.wterm { + t.Errorf("#%d: term = %d, want %d", i, s.ents[0].Term, tt.wterm) + } + if len(s.ents) != tt.wlen { + t.Errorf("#%d: len = %d, want %d", i, len(s.ents), tt.wlen) + } + } +} + +func TestStorageCreateSnapshot(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + cs := &pb.ConfState{Nodes: []uint64{1, 2, 3}} + data := []byte("data") + + tests := []struct { + i uint64 + + werr error + wsnap pb.Snapshot + }{ + {4, nil, pb.Snapshot{Data: data, Metadata: pb.SnapshotMetadata{Index: 4, Term: 4, ConfState: *cs}}}, + {5, nil, pb.Snapshot{Data: data, Metadata: pb.SnapshotMetadata{Index: 5, Term: 5, ConfState: *cs}}}, + } + + for i, tt := range tests { + s := &MemoryStorage{ents: ents} + snap, err := s.CreateSnapshot(tt.i, cs, data) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if !reflect.DeepEqual(snap, tt.wsnap) { + t.Errorf("#%d: snap = %+v, want %+v", i, snap, tt.wsnap) + } + } +} + 
+func TestStorageAppend(t *testing.T) { + ents := []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}} + tests := []struct { + entries []pb.Entry + + werr error + wentries []pb.Entry + }{ + { + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}, + nil, + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}}, + }, + { + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 6}, {Index: 5, Term: 6}}, + nil, + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 6}, {Index: 5, Term: 6}}, + }, + { + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}}, + nil, + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}}, + }, + { + []pb.Entry{{Index: 6, Term: 5}}, + nil, + []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 5}, {Index: 6, Term: 5}}, + }, + } + + for i, tt := range tests { + s := &MemoryStorage{ents: ents} + err := s.Append(tt.entries) + if err != tt.werr { + t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) + } + if !reflect.DeepEqual(s.ents, tt.wentries) { + t.Errorf("#%d: entries = %v, want %v", i, s.ents, tt.wentries) + } + } +}