Merge pull request #4126 from heyitsanthony/testutil-recorder-stream

remove WaitSchedule() from etcdserver tests
This commit is contained in:
Anthony Romano 2016-01-05 10:19:14 -08:00
commit 23ddb9ff30
10 changed files with 456 additions and 307 deletions

View File

@ -406,7 +406,7 @@ func TestClusterGenID(t *testing.T) {
}
previd := cs.ID()
cs.SetStore(&storeRecorder{})
cs.SetStore(store.NewNop())
cs.AddMember(newTestMember(3, nil, "", nil))
cs.genID()
if cs.ID() == previd {
@ -447,7 +447,7 @@ func TestNodeToMemberBad(t *testing.T) {
}
func TestClusterAddMember(t *testing.T) {
st := &storeRecorder{}
st := store.NewRecorder()
c := newTestCluster(nil)
c.SetStore(st)
c.AddMember(newTestMember(1, nil, "node1", nil))
@ -492,7 +492,7 @@ func TestClusterMembers(t *testing.T) {
}
func TestClusterRemoveMember(t *testing.T) {
st := &storeRecorder{}
st := store.NewRecorder()
c := newTestCluster(nil)
c.SetStore(st)
c.RemoveMember(1)

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/rafthttp"
)
func TestGetIDs(t *testing.T) {
@ -149,12 +150,12 @@ func TestCreateConfigChangeEnts(t *testing.T) {
}
func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
n := newReadyNode()
n := newNopReadyNode()
r := raftNode{
Node: n,
storage: &storageRecorder{},
raftStorage: raft.NewMemoryStorage(),
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
}
r.start(&EtcdServer{r: r})
n.readyc <- raft.Ready{}

View File

@ -18,7 +18,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"reflect"
@ -35,7 +34,7 @@ import (
"github.com/coreos/etcd/pkg/wait"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/rafthttp"
dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/store"
)
@ -52,7 +51,7 @@ func TestDoLocalAction(t *testing.T) {
}{
{
pb.Request{Method: "GET", ID: 1, Wait: true},
Response{Watcher: &nopWatcher{}}, nil, []testutil.Action{{Name: "Watch"}},
Response{Watcher: store.NewNopWatcher()}, nil, []testutil.Action{{Name: "Watch"}},
},
{
pb.Request{Method: "GET", ID: 1},
@ -80,7 +79,7 @@ func TestDoLocalAction(t *testing.T) {
},
}
for i, tt := range tests {
st := &storeRecorder{}
st := store.NewRecorder()
srv := &EtcdServer{
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -133,7 +132,7 @@ func TestDoBadLocalAction(t *testing.T) {
},
}
for i, tt := range tests {
st := &errStoreRecorder{err: storeErr}
st := store.NewErrRecorder(storeErr)
srv := &EtcdServer{
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -377,7 +376,7 @@ func TestApplyRequest(t *testing.T) {
}
for i, tt := range tests {
st := &storeRecorder{}
st := store.NewRecorder()
srv := &EtcdServer{store: st}
resp := srv.applyRequest(tt.req)
@ -394,7 +393,7 @@ func TestApplyRequest(t *testing.T) {
func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
cl := newTestCluster([]*Member{{ID: 1}})
srv := &EtcdServer{
store: &storeRecorder{},
store: store.NewRecorder(),
cluster: cl,
}
req := pb.Request{
@ -452,7 +451,7 @@ func TestApplyConfChangeError(t *testing.T) {
},
}
for i, tt := range tests {
n := &nodeRecorder{}
n := newNodeRecorder()
srv := &EtcdServer{
r: raftNode{Node: n},
cluster: cl,
@ -469,7 +468,7 @@ func TestApplyConfChangeError(t *testing.T) {
Params: []interface{}{cc},
},
}
if g := n.Action(); !reflect.DeepEqual(g, w) {
if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
t.Errorf("#%d: action = %+v, want %+v", i, g, w)
}
}
@ -484,8 +483,8 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
id: 1,
r: raftNode{
Node: &nodeRecorder{},
transport: &nopTransporter{},
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
cluster: cl,
}
@ -524,8 +523,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
srv := &EtcdServer{
id: 2,
r: raftNode{
Node: &nodeRecorder{},
transport: &nopTransporter{},
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
cluster: cl,
w: wait.New(),
@ -558,14 +557,14 @@ func TestDoProposal(t *testing.T) {
{Method: "GET", ID: 1, Quorum: true},
}
for i, tt := range tests {
st := &storeRecorder{}
st := store.NewRecorder()
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: newNodeCommitter(),
storage: &storageRecorder{},
raftStorage: raft.NewMemoryStorage(),
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -589,10 +588,10 @@ func TestDoProposal(t *testing.T) {
}
func TestDoProposalCancelled(t *testing.T) {
wait := &waitRecorder{}
wait := wait.NewRecorder()
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: &nodeRecorder{}},
r: raftNode{Node: newNodeNop()},
w: wait,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -604,16 +603,16 @@ func TestDoProposalCancelled(t *testing.T) {
t.Fatalf("err = %v, want %v", err, ErrCanceled)
}
w := []testutil.Action{{Name: "Register"}, {Name: "Trigger"}}
if !reflect.DeepEqual(wait.action, w) {
t.Errorf("wait.action = %+v, want %+v", wait.action, w)
if !reflect.DeepEqual(wait.Action(), w) {
t.Errorf("wait.action = %+v, want %+v", wait.Action(), w)
}
}
func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: &nodeRecorder{}},
w: &waitRecorder{},
r: raftNode{Node: newNodeNop()},
w: wait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
ctx, _ := context.WithTimeout(context.Background(), 0)
@ -626,8 +625,8 @@ func TestDoProposalTimeout(t *testing.T) {
func TestDoProposalStopped(t *testing.T) {
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: &nodeRecorder{}},
w: &waitRecorder{},
r: raftNode{Node: newNodeNop()},
w: wait.NewNop(),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.done = make(chan struct{})
@ -640,7 +639,7 @@ func TestDoProposalStopped(t *testing.T) {
// TestSync tests sync 1. is nonblocking 2. proposes SYNC request.
func TestSync(t *testing.T) {
n := &nodeRecorder{}
n := newNodeRecorder()
srv := &EtcdServer{
r: raftNode{Node: n},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -658,9 +657,7 @@ func TestSync(t *testing.T) {
t.Fatal("sync should be non-blocking but did not return after 1s!")
}
testutil.WaitSchedule()
action := n.Action()
action, _ := n.Wait(1)
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
@ -680,7 +677,7 @@ func TestSync(t *testing.T) {
// TestSyncTimeout tests the case that sync 1. is non-blocking 2. cancel request
// after timeout
func TestSyncTimeout(t *testing.T) {
n := &nodeProposalBlockerRecorder{}
n := newProposalBlockerRecorder()
srv := &EtcdServer{
r: raftNode{Node: n},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -698,10 +695,8 @@ func TestSyncTimeout(t *testing.T) {
t.Fatal("sync should be non-blocking but did not return after 1s!")
}
// give time for goroutine in sync to cancel
testutil.WaitSchedule()
w := []testutil.Action{{Name: "Propose blocked"}}
if g := n.Action(); !reflect.DeepEqual(g, w) {
if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
t.Errorf("action = %v, want %v", g, w)
}
}
@ -717,26 +712,29 @@ func TestSyncTrigger(t *testing.T) {
r: raftNode{
Node: n,
raftStorage: raft.NewMemoryStorage(),
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
storage: &storageRecorder{},
},
store: &storeRecorder{},
store: store.NewNop(),
SyncTicker: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
srv.start()
defer srv.Stop()
// trigger the server to become a leader and accept sync requests
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
RaftState: raft.StateLeader,
},
}
// trigger a sync request
st <- time.Time{}
testutil.WaitSchedule()
action := n.Action()
// trigger the server to become a leader and accept sync requests
go func() {
srv.start()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{
RaftState: raft.StateLeader,
},
}
// trigger a sync request
st <- time.Time{}
}()
action, _ := n.Wait(1)
go srv.Stop()
if len(action) != 1 {
t.Fatalf("len(action) = %d, want 1", len(action))
}
@ -751,26 +749,28 @@ func TestSyncTrigger(t *testing.T) {
if req.Method != "SYNC" {
t.Fatalf("unexpected proposed request: %#v", req.Method)
}
// wait on stop message
<-n.Chan()
}
// snapshot should snapshot the store and cut the persistent
func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage()
s.Append([]raftpb.Entry{{Index: 1}})
st := &storeRecorder{}
st := store.NewRecorder()
p := &storageRecorder{}
srv := &EtcdServer{
cfg: &ServerConfig{},
r: raftNode{
Node: &nodeRecorder{},
Node: newNodeNop(),
raftStorage: s,
storage: p,
},
store: st,
}
srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
testutil.WaitSchedule()
gaction := st.Action()
gaction, _ := st.Wait(2)
if len(gaction) != 2 {
t.Fatalf("len(action) = %d, want 1", len(gaction))
}
@ -792,7 +792,7 @@ func TestSnapshot(t *testing.T) {
// Applied > SnapCount should trigger a SaveSnap event
func TestTriggerSnap(t *testing.T) {
snapc := 10
st := &storeRecorder{}
st := store.NewRecorder()
p := &storageRecorder{}
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
@ -801,7 +801,7 @@ func TestTriggerSnap(t *testing.T) {
Node: newNodeCommitter(),
raftStorage: raft.NewMemoryStorage(),
storage: p,
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
store: st,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -810,14 +810,14 @@ func TestTriggerSnap(t *testing.T) {
for i := 0; i < snapc+1; i++ {
srv.Do(context.Background(), pb.Request{Method: "PUT"})
}
srv.Stop()
// wait for snapshot goroutine to finish
testutil.WaitSchedule()
gaction := p.Action()
wcnt := 2 + snapc
gaction, _ := p.Wait(wcnt)
srv.Stop()
// each operation is recorded as a Save
// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
wcnt := 2 + snapc
if len(gaction) != wcnt {
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
}
@ -833,7 +833,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
// snapshots that may queue up at once without dropping
maxInFlightMsgSnap = 16
)
n := newReadyNode()
n := newNopReadyNode()
cl := newCluster("abc")
cl.SetStore(store.New())
@ -847,7 +847,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
}
rs := raft.NewMemoryStorage()
tr := newSnapTransporter(testdir)
tr, snapDoneC := rafthttp.NewSnapTransporter(testdir)
s := &EtcdServer{
cfg: &ServerConfig{
V3demo: true,
@ -896,7 +896,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
n.readyc <- raft.Ready{Messages: []raftpb.Message{{Type: raftpb.MsgSnap}}}
// get the snapshot sent by the transport
snapMsg := <-tr.snapDoneC
snapMsg := <-snapDoneC
// If the snapshot trails applied records, recovery will panic
// since there's no allocated snapshot at the place of the
// snapshot record. This only happens when the applier and the
@ -923,8 +923,8 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
// TestRecvSnapshot tests when it receives a snapshot from raft leader,
// it should trigger storage.SaveSnap and also store.Recover.
func TestRecvSnapshot(t *testing.T) {
n := newReadyNode()
st := &storeRecorder{}
n := newNopReadyNode()
st := store.NewRecorder()
p := &storageRecorder{}
cl := newCluster("abc")
cl.SetStore(store.New())
@ -932,7 +932,7 @@ func TestRecvSnapshot(t *testing.T) {
cfg: &ServerConfig{},
r: raftNode{
Node: n,
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
storage: p,
raftStorage: raft.NewMemoryStorage(),
},
@ -963,8 +963,8 @@ func TestRecvSnapshot(t *testing.T) {
// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
// first and then committed entries.
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
n := newReadyNode()
st := &storeRecorder{}
n := newNopReadyNode()
st := store.NewRecorder()
cl := newCluster("abc")
cl.SetStore(store.New())
storage := raft.NewMemoryStorage()
@ -974,7 +974,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
Node: n,
storage: &storageRecorder{},
raftStorage: storage,
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
store: st,
cluster: cl,
@ -989,10 +989,9 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
},
}
// make goroutines move forward to receive snapshot
testutil.WaitSchedule()
actions, _ := st.Wait(2)
s.Stop()
actions := st.Action()
if len(actions) != 2 {
t.Fatalf("len(action) = %d, want 2", len(actions))
}
@ -1018,7 +1017,7 @@ func TestAddMember(t *testing.T) {
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
cfg: &ServerConfig{},
store: st,
@ -1058,7 +1057,7 @@ func TestRemoveMember(t *testing.T) {
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
cfg: &ServerConfig{},
store: st,
@ -1097,7 +1096,7 @@ func TestUpdateMember(t *testing.T) {
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: &storageRecorder{},
transport: &nopTransporter{},
transport: rafthttp.NewNopTransporter(),
},
store: st,
cluster: cl,
@ -1124,11 +1123,11 @@ func TestUpdateMember(t *testing.T) {
// TODO: test server could stop itself when being removed
func TestPublish(t *testing.T) {
n := &nodeRecorder{}
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := &waitWithResponse{ch: ch}
w := wait.NewWithResponse(ch)
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
id: 1,
@ -1173,11 +1172,11 @@ func TestPublishStopped(t *testing.T) {
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{
Node: &nodeRecorder{},
transport: &nopTransporter{},
Node: newNodeNop(),
transport: rafthttp.NewNopTransporter(),
},
cluster: &cluster{},
w: &waitRecorder{},
w: wait.NewNop(),
done: make(chan struct{}),
stop: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1188,11 +1187,11 @@ func TestPublishStopped(t *testing.T) {
// TestPublishRetry tests that publish will keep retry until success.
func TestPublishRetry(t *testing.T) {
n := &nodeRecorder{}
n := newNodeRecorder()
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n},
w: &waitRecorder{},
w: wait.NewNop(),
done: make(chan struct{}),
reqIDGen: idutil.NewGenerator(0, time.Time{}),
}
@ -1208,11 +1207,11 @@ func TestPublishRetry(t *testing.T) {
}
func TestUpdateVersion(t *testing.T) {
n := &nodeRecorder{}
n := newNodeRecorder()
ch := make(chan interface{}, 1)
// simulate that request has gone through consensus
ch <- Response{}
w := &waitWithResponse{ch: ch}
w := wait.NewWithResponse(ch)
srv := &EtcdServer{
id: 1,
cfg: &ServerConfig{TickMs: 1},
@ -1312,166 +1311,11 @@ func TestGetOtherPeerURLs(t *testing.T) {
}
}
// storeRecorder records all the methods it receives.
// storeRecorder DOES NOT work as a actual store.
// It always returns invalid empty response and no error.
type storeRecorder struct{ testutil.Recorder }
func (s *storeRecorder) Version() int { return 0 }
func (s *storeRecorder) Index() uint64 { return 0 }
func (s *storeRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) {
s.Record(testutil.Action{
Name: "Get",
Params: []interface{}{path, recursive, sorted},
})
return &store.Event{}, nil
}
func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*store.Event, error) {
s.Record(testutil.Action{
Name: "Set",
Params: []interface{}{path, dir, val, expr},
})
return &store.Event{}, nil
}
func (s *storeRecorder) Update(path, val string, expr time.Time) (*store.Event, error) {
s.Record(testutil.Action{
Name: "Update",
Params: []interface{}{path, val, expr},
})
return &store.Event{}, nil
}
func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*store.Event, error) {
s.Record(testutil.Action{
Name: "Create",
Params: []interface{}{path, dir, val, uniq, exp},
})
return &store.Event{}, nil
}
func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*store.Event, error) {
s.Record(testutil.Action{
Name: "CompareAndSwap",
Params: []interface{}{path, prevVal, prevIdx, val, expr},
})
return &store.Event{}, nil
}
func (s *storeRecorder) Delete(path string, dir, recursive bool) (*store.Event, error) {
s.Record(testutil.Action{
Name: "Delete",
Params: []interface{}{path, dir, recursive},
})
return &store.Event{}, nil
}
func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*store.Event, error) {
s.Record(testutil.Action{
Name: "CompareAndDelete",
Params: []interface{}{path, prevVal, prevIdx},
})
return &store.Event{}, nil
}
func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, error) {
s.Record(testutil.Action{Name: "Watch"})
return &nopWatcher{}, nil
}
func (s *storeRecorder) Save() ([]byte, error) {
s.Record(testutil.Action{Name: "Save"})
return nil, nil
}
func (s *storeRecorder) Recovery(b []byte) error {
s.Record(testutil.Action{Name: "Recovery"})
return nil
}
func (s *storeRecorder) SaveNoCopy() ([]byte, error) {
s.Record(testutil.Action{Name: "SaveNoCopy"})
return nil, nil
}
func (s *storeRecorder) Clone() store.Store {
s.Record(testutil.Action{Name: "Clone"})
return s
}
func (s *storeRecorder) JsonStats() []byte { return nil }
func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
s.Record(testutil.Action{
Name: "DeleteExpiredKeys",
Params: []interface{}{cutoff},
})
}
type nopWatcher struct{}
func (w *nopWatcher) EventChan() chan *store.Event { return nil }
func (w *nopWatcher) StartIndex() uint64 { return 0 }
func (w *nopWatcher) Remove() {}
// errStoreRecorder is a storeRecorder, but returns the given error on
// Get, Watch methods.
type errStoreRecorder struct {
storeRecorder
err error
}
func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*store.Event, error) {
s.storeRecorder.Get(path, recursive, sorted)
return nil, s.err
}
func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (store.Watcher, error) {
s.storeRecorder.Watch(path, recursive, sorted, index)
return nil, s.err
}
type waitRecorder struct {
action []testutil.Action
}
func (w *waitRecorder) Register(id uint64) <-chan interface{} {
w.action = append(w.action, testutil.Action{Name: "Register"})
return nil
}
func (w *waitRecorder) Trigger(id uint64, x interface{}) {
w.action = append(w.action, testutil.Action{Name: "Trigger"})
}
type waitWithResponse struct {
ch <-chan interface{}
}
func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
return w.ch
}
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
type storageRecorder struct {
testutil.Recorder
dbPath string // must have '/' suffix if set
}
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
p.Record(testutil.Action{Name: "Save"})
return nil
}
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
if !raft.IsEmptySnap(st) {
p.Record(testutil.Action{Name: "SaveSnap"})
}
return nil
}
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
p.Record(testutil.Action{Name: "DBFilePath"})
path := p.dbPath
if path != "" {
path = path + "/"
}
return fmt.Sprintf("%s%016x.snap.db", path, id), nil
}
func (p *storageRecorder) Close() error { return nil }
type nodeRecorder struct{ testutil.Recorder }
func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
func newNodeNop() raft.Node { return newNodeRecorder() }
func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
func (n *nodeRecorder) Campaign(ctx context.Context) error {
n.Record(testutil.Action{Name: "Campaign"})
@ -1513,22 +1357,42 @@ type nodeProposalBlockerRecorder struct {
nodeRecorder
}
func newProposalBlockerRecorder() *nodeProposalBlockerRecorder {
return &nodeProposalBlockerRecorder{*newNodeRecorder()}
}
func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte) error {
<-ctx.Done()
n.Record(testutil.Action{Name: "Propose blocked"})
return nil
}
type nodeConfChangeCommitterRecorder struct {
// readyNode is a nodeRecorder with a user-writeable ready channel
type readyNode struct {
nodeRecorder
readyc chan raft.Ready
index uint64
}
func newReadyNode() *readyNode {
return &readyNode{
nodeRecorder{testutil.NewRecorderStream()},
make(chan raft.Ready, 1)}
}
func newNopReadyNode() *readyNode {
return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
}
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
type nodeConfChangeCommitterRecorder struct {
readyNode
index uint64
}
func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
readyc := make(chan raft.Ready, 1)
return &nodeConfChangeCommitterRecorder{readyc: readyc}
return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
}
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
data, err := conf.Marshal()
if err != nil {
@ -1549,14 +1413,12 @@ func (n *nodeConfChangeCommitterRecorder) ApplyConfChange(conf raftpb.ConfChange
// nodeCommitter commits proposed data immediately.
type nodeCommitter struct {
nodeRecorder
readyc chan raft.Ready
index uint64
readyNode
index uint64
}
func newNodeCommitter() *nodeCommitter {
readyc := make(chan raft.Ready, 1)
return &nodeCommitter{readyc: readyc}
func newNodeCommitter() raft.Node {
return &nodeCommitter{*newNopReadyNode(), 0}
}
func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
n.index++
@ -1567,53 +1429,3 @@ func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
}
return nil
}
func (n *nodeCommitter) Ready() <-chan raft.Ready {
return n.readyc
}
type readyNode struct {
nodeRecorder
readyc chan raft.Ready
}
func newReadyNode() *readyNode {
readyc := make(chan raft.Ready, 1)
return &readyNode{readyc: readyc}
}
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
type nopTransporter struct{}
func (s *nopTransporter) Start() error { return nil }
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}
type snapTransporter struct {
nopTransporter
snapDoneC chan snap.Message
snapDir string
}
func newSnapTransporter(snapDir string) *snapTransporter {
return &snapTransporter{
snapDoneC: make(chan snap.Message, 1),
snapDir: snapDir,
}
}
func (s *snapTransporter) SendSnapshot(m snap.Message) {
ss := snap.New(s.snapDir)
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
m.CloseWithError(nil)
s.snapDoneC <- m
}

View File

@ -15,13 +15,16 @@
package etcdserver
import (
"fmt"
"io"
"os"
"path"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/version"
@ -144,3 +147,31 @@ func makeMemberDir(dir string) error {
}
return nil
}
type storageRecorder struct {
testutil.RecorderBuffered
dbPath string // must have '/' suffix if set
}
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
p.Record(testutil.Action{Name: "Save"})
return nil
}
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
if !raft.IsEmptySnap(st) {
p.Record(testutil.Action{Name: "SaveSnap"})
}
return nil
}
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
p.Record(testutil.Action{Name: "DBFilePath"})
path := p.dbPath
if path != "" {
path = path + "/"
}
return fmt.Sprintf("%s%016x.snap.db", path, id), nil
}
func (p *storageRecorder) Close() error { return nil }

View File

@ -14,27 +14,120 @@
package testutil
import "sync"
import (
"errors"
"fmt"
"sync"
"time"
)
type Action struct {
Name string
Params []interface{}
}
type Recorder struct {
type Recorder interface {
// Record publishes an Action (e.g., function call) which will
// be reflected by Wait() or Chan()
Record(a Action)
// Wait waits until at least n Actions are availble or returns with error
Wait(n int) ([]Action, error)
// Action returns immediately available Actions
Action() []Action
// Chan returns the channel for actions published by Record
Chan() <-chan Action
}
// RecorderBuffered appends all Actions to a slice
type RecorderBuffered struct {
sync.Mutex
actions []Action
}
func (r *Recorder) Record(a Action) {
func (r *RecorderBuffered) Record(a Action) {
r.Lock()
r.actions = append(r.actions, a)
r.Unlock()
}
func (r *Recorder) Action() []Action {
func (r *RecorderBuffered) Action() []Action {
r.Lock()
cpy := make([]Action, len(r.actions))
copy(cpy, r.actions)
r.Unlock()
return cpy
}
func (r *RecorderBuffered) Wait(n int) (acts []Action, err error) {
// legacy racey behavior
WaitSchedule()
acts = r.Action()
if len(acts) < n {
err = newLenErr(n, len(r.actions))
}
return acts, err
}
func (r *RecorderBuffered) Chan() <-chan Action {
ch := make(chan Action)
go func() {
acts := r.Action()
for i := range acts {
ch <- acts[i]
}
close(ch)
}()
return ch
}
// RecorderStream writes all Actions to an unbuffered channel
type recorderStream struct {
ch chan Action
}
func NewRecorderStream() Recorder {
return &recorderStream{ch: make(chan Action)}
}
func (r *recorderStream) Record(a Action) {
r.ch <- a
}
func (r *recorderStream) Action() (acts []Action) {
for {
select {
case act := <-r.ch:
acts = append(acts, act)
default:
return acts
}
}
return acts
}
func (r *recorderStream) Chan() <-chan Action {
return r.ch
}
func (r *recorderStream) Wait(n int) ([]Action, error) {
acts := make([]Action, n)
timeoutC := time.After(5 * time.Second)
for i := 0; i < n; i++ {
select {
case acts[i] = <-r.ch:
case <-timeoutC:
acts = acts[:i]
return acts, newLenErr(n, i)
}
}
// extra wait to catch any Action spew
select {
case act := <-r.ch:
acts = append(acts, act)
case <-time.After(10 * time.Millisecond):
}
return acts, nil
}
func newLenErr(expected int, actual int) error {
s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected)
return errors.New(s)
}

View File

@ -18,6 +18,8 @@ package wait
import (
"sync"
"github.com/coreos/etcd/pkg/testutil"
)
type Wait interface {
@ -55,3 +57,39 @@ func (w *List) Trigger(id uint64, x interface{}) {
close(ch)
}
}
type WaitRecorder struct {
Wait
testutil.Recorder
}
type waitRecorder struct {
testutil.RecorderBuffered
}
func NewRecorder() *WaitRecorder {
wr := &waitRecorder{}
return &WaitRecorder{Wait: wr, Recorder: wr}
}
func NewNop() Wait { return NewRecorder() }
func (w *waitRecorder) Register(id uint64) <-chan interface{} {
w.Record(testutil.Action{Name: "Register"})
return nil
}
func (w *waitRecorder) Trigger(id uint64, x interface{}) {
w.Record(testutil.Action{Name: "Trigger"})
}
type waitWithResponse struct {
ch <-chan interface{}
}
func NewWithResponse(ch <-chan interface{}) Wait {
return &waitWithResponse{ch: ch}
}
func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
return w.ch
}
func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}

View File

@ -308,3 +308,42 @@ func (t *Transport) Resume() {
p.(Pausable).Resume()
}
}
type nopTransporter struct{}
func NewNopTransporter() Transporter {
return &nopTransporter{}
}
func (s *nopTransporter) Start() error { return nil }
func (s *nopTransporter) Handler() http.Handler { return nil }
func (s *nopTransporter) Send(m []raftpb.Message) {}
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
func (s *nopTransporter) RemovePeer(id types.ID) {}
func (s *nopTransporter) RemoveAllPeers() {}
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
func (s *nopTransporter) Stop() {}
func (s *nopTransporter) Pause() {}
func (s *nopTransporter) Resume() {}
type snapTransporter struct {
nopTransporter
snapDoneC chan snap.Message
snapDir string
}
func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) {
ch := make(chan snap.Message, 1)
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
return tr, ch
}
func (s *snapTransporter) SendSnapshot(m snap.Message) {
ss := snap.New(s.snapDir)
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
m.CloseWithError(nil)
s.snapDoneC <- m
}

View File

@ -473,8 +473,11 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
}
func newFakeStore() *store {
b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
b := &fakeBackend{&fakeBatchTx{
Recorder: &testutil.RecorderBuffered{},
rangeRespc: make(chan rangeResp, 5)}}
fi := &fakeIndex{
Recorder: &testutil.RecorderBuffered{},
indexGetRespc: make(chan indexGetResp, 1),
indexRangeRespc: make(chan indexRangeResp, 1),
indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
)
@ -743,3 +744,126 @@ func (s *store) JsonStats() []byte {
s.Stats.Watchers = uint64(s.WatcherHub.count)
return s.Stats.toJson()
}
// StoreRecorder provides a Store interface with a testutil.Recorder
type StoreRecorder struct {
Store
testutil.Recorder
}
// storeRecorder records all the methods it receives.
// storeRecorder DOES NOT work as a actual store.
// It always returns invalid empty response and no error.
type storeRecorder struct {
Store
testutil.RecorderBuffered
}
func NewNop() Store { return &storeRecorder{} }
func NewRecorder() *StoreRecorder {
sr := &storeRecorder{}
return &StoreRecorder{Store: sr, Recorder: sr}
}
func (s *storeRecorder) Version() int { return 0 }
func (s *storeRecorder) Index() uint64 { return 0 }
func (s *storeRecorder) Get(path string, recursive, sorted bool) (*Event, error) {
s.Record(testutil.Action{
Name: "Get",
Params: []interface{}{path, recursive, sorted},
})
return &Event{}, nil
}
func (s *storeRecorder) Set(path string, dir bool, val string, expr time.Time) (*Event, error) {
s.Record(testutil.Action{
Name: "Set",
Params: []interface{}{path, dir, val, expr},
})
return &Event{}, nil
}
func (s *storeRecorder) Update(path, val string, expr time.Time) (*Event, error) {
s.Record(testutil.Action{
Name: "Update",
Params: []interface{}{path, val, expr},
})
return &Event{}, nil
}
func (s *storeRecorder) Create(path string, dir bool, val string, uniq bool, exp time.Time) (*Event, error) {
s.Record(testutil.Action{
Name: "Create",
Params: []interface{}{path, dir, val, uniq, exp},
})
return &Event{}, nil
}
func (s *storeRecorder) CompareAndSwap(path, prevVal string, prevIdx uint64, val string, expr time.Time) (*Event, error) {
s.Record(testutil.Action{
Name: "CompareAndSwap",
Params: []interface{}{path, prevVal, prevIdx, val, expr},
})
return &Event{}, nil
}
func (s *storeRecorder) Delete(path string, dir, recursive bool) (*Event, error) {
s.Record(testutil.Action{
Name: "Delete",
Params: []interface{}{path, dir, recursive},
})
return &Event{}, nil
}
func (s *storeRecorder) CompareAndDelete(path, prevVal string, prevIdx uint64) (*Event, error) {
s.Record(testutil.Action{
Name: "CompareAndDelete",
Params: []interface{}{path, prevVal, prevIdx},
})
return &Event{}, nil
}
func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (Watcher, error) {
s.Record(testutil.Action{Name: "Watch"})
return NewNopWatcher(), nil
}
func (s *storeRecorder) Save() ([]byte, error) {
s.Record(testutil.Action{Name: "Save"})
return nil, nil
}
func (s *storeRecorder) Recovery(b []byte) error {
s.Record(testutil.Action{Name: "Recovery"})
return nil
}
func (s *storeRecorder) SaveNoCopy() ([]byte, error) {
s.Record(testutil.Action{Name: "SaveNoCopy"})
return nil, nil
}
func (s *storeRecorder) Clone() Store {
s.Record(testutil.Action{Name: "Clone"})
return s
}
func (s *storeRecorder) JsonStats() []byte { return nil }
func (s *storeRecorder) DeleteExpiredKeys(cutoff time.Time) {
s.Record(testutil.Action{
Name: "DeleteExpiredKeys",
Params: []interface{}{cutoff},
})
}
// errStoreRecorder is a storeRecorder, but returns the given error on
// Get, Watch methods.
type errStoreRecorder struct {
storeRecorder
err error
}
func NewErrRecorder(err error) *StoreRecorder {
sr := &errStoreRecorder{err: err}
return &StoreRecorder{Store: sr, Recorder: sr}
}
func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) {
s.storeRecorder.Get(path, recursive, sorted)
return nil, s.err
}
func (s *errStoreRecorder) Watch(path string, recursive, sorted bool, index uint64) (Watcher, error) {
s.storeRecorder.Watch(path, recursive, sorted, index)
return nil, s.err
}

View File

@ -85,3 +85,11 @@ func (w *watcher) Remove() {
w.remove()
}
}
// nopWatcher is a watcher that receives nothing, always blocking.
type nopWatcher struct{}
func NewNopWatcher() Watcher { return &nopWatcher{} }
func (w *nopWatcher) EventChan() chan *Event { return nil }
func (w *nopWatcher) StartIndex() uint64 { return 0 }
func (w *nopWatcher) Remove() {}