diff --git a/etcd/etcd_test.go b/etcd/etcd_test.go index bd0ed49cb..1739a0ce3 100644 --- a/etcd/etcd_test.go +++ b/etcd/etcd_test.go @@ -230,7 +230,7 @@ func TestTakingSnapshot(t *testing.T) { cl.Participant(0).Set("/foo", false, "bar", store.Permanent) } snap := cl.Participant(0).node.GetSnap() - if snap.Index != defaultCompact { + if snap.Index != int64(defaultCompact) { t.Errorf("snap.Index = %d, want %d", snap.Index, defaultCompact) } } @@ -273,7 +273,7 @@ func TestRestoreSnapshotFromLeader(t *testing.T) { } // check new proposal could be committed in the new machine - wch, err := ts.Participant().Watch("/foo", false, false, defaultCompact) + wch, err := ts.Participant().Watch("/foo", false, false, uint64(defaultCompact)) if err != nil { t.Errorf("watch err = %v", err) } @@ -314,34 +314,43 @@ func TestSaveSnapshot(t *testing.T) { func TestRestoreSnapshotFromDisk(t *testing.T) { defer afterTest(t) - cl := testCluster{Size: 1} - cl.Start() - defer cl.Destroy() + tests := []int{1, 3, 5} - lead, _ := cl.Leader() - for i := 0; i < defaultCompact+10; i++ { - cl.Participant(lead).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent) - } + // TODO(xiangli): tunable compact; reduce testing time + oldDefaultCompact := defaultCompact + defaultCompact = 10 + defer func() { defaultCompact = oldDefaultCompact }() - cl.Stop() - cl.Restart() + for _, tt := range tests { + cl := testCluster{Size: tt} + cl.Start() + defer cl.Destroy() - lead, _ = cl.Leader() - // check store is recovered - for i := 0; i < defaultCompact+10; i++ { - ev, err := cl.Participant(lead).Store.Get(fmt.Sprint("/foo", i), false, false) - if err != nil { - t.Errorf("get err = %v", err) - continue + lead, _ := cl.Leader() + for i := 0; i < defaultCompact+10; i++ { + cl.Participant(lead).Set(fmt.Sprint("/foo", i), false, fmt.Sprint("bar", i), store.Permanent) } - w := fmt.Sprint("bar", i) - if g := *ev.Node.Value; g != w { - t.Errorf("value = %v, want %v", g, w) + + cl.Stop() + cl.Restart() + + lead, _ = cl.Leader() + // check store is recovered + for i := 0; i < defaultCompact+10; i++ { + ev, err := cl.Participant(lead).Store.Get(fmt.Sprint("/foo", i), false, false) + if err != nil { + t.Errorf("get err = %v", err) + continue + } + w := fmt.Sprint("bar", i) + if g := *ev.Node.Value; g != w { + t.Errorf("value = %v, want %v", g, w) + } + } + // check new proposal could be submitted + if _, err := cl.Participant(lead).Set("/foo", false, "bar", store.Permanent); err != nil { + t.Fatal(err) } - } - // check new proposal could be submitted - if _, err := cl.Participant(lead).Set("/foo", false, "bar", store.Permanent); err != nil { - t.Fatal(err) } } diff --git a/etcd/participant.go b/etcd/participant.go index 5bb62a176..701d91ee1 100644 --- a/etcd/participant.go +++ b/etcd/participant.go @@ -22,6 +22,7 @@ import ( "log" "math/rand" "net/http" + "net/url" "os" "path" "time" @@ -37,7 +38,6 @@ import ( const ( defaultHeartbeat = 1 defaultElection = 5 - defaultCompact = 10000 maxBufferedProposal = 128 @@ -58,6 +58,8 @@ const ( ) var ( + defaultCompact = 10000 + tmpErr = fmt.Errorf("try again") stopErr = fmt.Errorf("server is stopped") raftStopErr = fmt.Errorf("raft is stopped") @@ -158,6 +160,24 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura panic(err) } log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index) + + for _, node := range s.Nodes { + pp := path.Join(v2machineKVPrefix, fmt.Sprint(node)) + ev, err := p.Store.Get(pp, false, false) + if err != nil { + panic(err) + } + q, err := url.ParseQuery(*ev.Node.Value) + if err != nil { + panic(err) + } + peer, err := p.peerHub.add(node, q["raft"][0]) + if err != nil { + panic(err) + } + peer.participate() + } + snapIndex = s.Index } n, err := wal.Read(walDir, snapIndex)