diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index c31aca01d..36fb37ba4 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -267,6 +267,10 @@ func (c *Cluster) SetID(id types.ID) { c.id = id } func (c *Cluster) SetStore(st store.Store) { c.store = st } +func (c *Cluster) Recover() { + c.members, c.removed = membersFromStore(c.store) +} + // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { diff --git a/etcdserver/server.go b/etcdserver/server.go index ed325a324..a3ce4bd5b 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -349,27 +349,37 @@ func (s *EtcdServer) run() { } s.sender.Send(rd.Messages) - // TODO(bmizerany): do this in the background, but take - // care to apply entries in a single goroutine, and not - // race them. - if len(rd.CommittedEntries) != 0 { - appliedi = s.apply(rd.CommittedEntries) - } - - if rd.Snapshot.Index > snapi { - snapi = rd.Snapshot.Index - } - // recover from snapshot if it is more updated than current applied if rd.Snapshot.Index > appliedi { if err := s.store.Recovery(rd.Snapshot.Data); err != nil { log.Panicf("recovery store error: %v", err) } + s.Cluster.Recover() appliedi = rd.Snapshot.Index } + // TODO(bmizerany): do this in the background, but take + // care to apply entries in a single goroutine, and not + // race them. + if len(rd.CommittedEntries) != 0 { + firsti := rd.CommittedEntries[0].Index + if appliedi == 0 { + appliedi = firsti - 1 + } + if firsti > appliedi+1 { + log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi) + } + var ents []raftpb.Entry + if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) { + ents = rd.CommittedEntries[appliedi+1-firsti:] + } + appliedi = s.apply(ents) + } s.node.Advance() + if rd.Snapshot.Index > snapi { + snapi = rd.Snapshot.Index + } if appliedi-snapi > s.snapCount { s.snapshot(appliedi, nodes) snapi = appliedi diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8dd653b90..1ed391698 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" @@ -900,11 +901,14 @@ func TestRecvSnapshot(t *testing.T) { n := newReadyNode() st := &storeRecorder{} p := &storageRecorder{} + cl := newCluster("abc") + cl.SetStore(store.New()) s := &EtcdServer{ store: st, sender: &nopSender{}, storage: p, node: n, + Cluster: cl, } s.start() @@ -924,15 +928,19 @@ func TestRecvSnapshot(t *testing.T) { } // TestRecvSlowSnapshot tests that slow snapshot will not be applied -// to store. +// to store. The case could happen when server compacts the log and +// raft returns the compacted snapshot. func TestRecvSlowSnapshot(t *testing.T) { n := newReadyNode() st := &storeRecorder{} + cl := newCluster("abc") + cl.SetStore(store.New()) s := &EtcdServer{ store: st, sender: &nopSender{}, storage: &storageRecorder{}, node: n, + Cluster: cl, } s.start() @@ -951,6 +959,45 @@ func TestRecvSlowSnapshot(t *testing.T) { } } +// TestApplySnapshotAndCommittedEntries tests that server applies snapshot +// first and then committed entries. +func TestApplySnapshotAndCommittedEntries(t *testing.T) { + n := newReadyNode() + st := &storeRecorder{} + cl := newCluster("abc") + cl.SetStore(store.New()) + s := &EtcdServer{ + store: st, + sender: &nopSender{}, + storage: &storageRecorder{}, + node: n, + Cluster: cl, + } + + s.start() + req := &pb.Request{Method: "QGET"} + n.readyc <- raft.Ready{ + Snapshot: raftpb.Snapshot{Index: 1}, + CommittedEntries: []raftpb.Entry{ + {Index: 2, Data: pbutil.MustMarshal(req)}, + }, + } + // make goroutines move forward to receive snapshot + testutil.ForceGosched() + s.Stop() + + actions := st.Action() + if len(actions) != 2 { + t.Fatalf("len(action) = %d, want 2", len(actions)) + } + if actions[0].name != "Recovery" { + t.Errorf("actions[0] = %s, want %s", actions[0].name, "Recovery") + } + if actions[1].name != "Get" { + t.Errorf("actions[1] = %s, want %s", actions[1].name, "Get") + } +} + // TestAddMember tests AddMember can propose and perform node addition. func TestAddMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder() diff --git a/raft/log.go b/raft/log.go index 49666c80c..93c800349 100644 --- a/raft/log.go +++ b/raft/log.go @@ -115,10 +115,12 @@ func (l *raftLog) unstableEnts() []pb.Entry { } // nextEnts returns all the available entries for execution. -// all the returned entries will be marked as applied. +// If applied is smaller than the index of snapshot, it returns all committed +// entries after the index of snapshot. func (l *raftLog) nextEnts() (ents []pb.Entry) { - if l.committed > l.applied { - return l.slice(l.applied+1, l.committed+1) + off := max(l.applied, l.snapshot.Index) + if l.committed > off { + return l.slice(off+1, l.committed+1) } return nil } @@ -211,7 +213,6 @@ func (l *raftLog) restore(s pb.Snapshot) { l.ents = []pb.Entry{{Term: s.Term}} l.unstable = s.Index + 1 l.committed = s.Index - l.applied = s.Index l.offset = s.Index l.snapshot = s } diff --git a/raft/log_test.go b/raft/log_test.go index 9654d76aa..eb479b48d 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -326,6 +326,37 @@ func TestCompactionSideEffects(t *testing.T) { } } +func TestNextEnts(t *testing.T) { + snap := pb.Snapshot{Term: 1, Index: 3} + ents := []pb.Entry{ + {Term: 1, Index: 3}, + {Term: 1, Index: 4}, + {Term: 1, Index: 5}, + {Term: 1, Index: 6}, + } + tests := []struct { + applied uint64 + wents []pb.Entry + }{ + {0, ents[1:3]}, + {3, ents[1:3]}, + {4, ents[2:3]}, + {5, nil}, + } + for i, tt := range tests { + raftLog := newLog() + raftLog.restore(snap) + raftLog.load(ents) + raftLog.maybeCommit(5, 1) + raftLog.appliedTo(tt.applied) + + ents := raftLog.nextEnts() + if !reflect.DeepEqual(ents, tt.wents) { + t.Errorf("#%d: ents = %+v, want %+v", i, ents, tt.wents) + } + } +} + func TestUnstableEnts(t *testing.T) { previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { @@ -435,9 +466,6 @@ func TestLogRestore(t *testing.T) { if raftLog.offset != index { t.Errorf("offset = %d, want %d", raftLog.offset, index) } - if raftLog.applied != index { - t.Errorf("applied = %d, want %d", raftLog.applied, index) - } if raftLog.committed != index { t.Errorf("comitted = %d, want %d", raftLog.committed, index) } diff --git a/raft/node.go b/raft/node.go index db2b710f3..274d89e52 100644 --- a/raft/node.go +++ b/raft/node.go @@ -171,6 +171,7 @@ func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st p r := newRaft(id, nil, election, heartbeat) if snapshot != nil { r.restore(*snapshot) + r.raftLog.appliedTo(snapshot.Index) } if !isHardStateEqual(st, emptyState) { r.loadState(st) diff --git a/raft/node_test.go b/raft/node_test.go index 77675a56c..0ad94d949 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -368,6 +368,39 @@ func TestNodeRestart(t *testing.T) { } } +func TestNodeRestartFromSnapshot(t *testing.T) { + snap := &raftpb.Snapshot{ + Data: []byte("some data"), + Nodes: []uint64{1, 2}, + Index: 2, + Term: 1, + } + entries := []raftpb.Entry{ + {Term: 1, Index: 2}, + {Term: 1, Index: 3, Data: []byte("foo")}, + } + st := raftpb.HardState{Term: 1, Commit: 3} + + want := Ready{ + HardState: emptyState, + // commit upto index commit index in st + CommittedEntries: entries[1:], + } + + n := RestartNode(1, 10, 1, snap, st, entries) + if g := <-n.Ready(); !reflect.DeepEqual(g, want) { + t.Errorf("g = %+v,\n w %+v", g, want) + } else { + n.Advance() + } + + select { + case rd := <-n.Ready(): + t.Errorf("unexpected Ready: %+v", rd) + case <-time.After(time.Millisecond): + } +} + // TestCompacts ensures Node.Compact creates a correct raft snapshot and compacts // the raft log (call raft.compact) func TestNodeCompact(t *testing.T) {