diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index ee3172c7f..6ccbec035 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -27,7 +27,6 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/rafthttp" "go.uber.org/zap" ) @@ -161,7 +160,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { Node: n, storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r} srv.r.start(nil) @@ -189,7 +188,7 @@ func TestConfgChangeBlocksApply(t *testing.T) { Node: n, storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r} diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 0ef6cfb93..f22b64b1a 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net/http" "os" "path" "path/filepath" @@ -182,7 +183,7 @@ func TestApplyRepeat(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) s := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -574,7 +575,7 @@ func TestApplyConfChangeShouldStop(t *testing.T) { r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -616,7 +617,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -664,7 +665,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -708,7 +709,7 @@ func TestDoProposal(t *testing.T) { Node: newNodeCommitter(), storage: mockstorage.NewStorageRecorder(""), raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -891,7 +892,7 @@ func TestSyncTrigger(t *testing.T) { lg: zap.NewExample(), Node: n, raftStorage: raft.NewMemoryStorage(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), storage: mockstorage.NewStorageRecorder(""), }) @@ -1020,7 +1021,7 @@ func TestSnapshotOrdering(t *testing.T) { rs := raft.NewMemoryStorage() p := mockstorage.NewStorageRecorderStream(testdir) - tr, snapDoneC := rafthttp.NewSnapTransporter(snapdir) + tr, snapDoneC := newSnapTransporter(snapdir) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, @@ -1091,7 +1092,7 @@ func TestTriggerSnap(t *testing.T) { Node: newNodeCommitter(), raftStorage: raft.NewMemoryStorage(), storage: p, - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1151,7 +1152,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { } rs := raft.NewMemoryStorage() - tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) + tr, snapDoneC := newSnapTransporter(testdir) r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) }, @@ -1245,7 +1246,7 @@ func TestAddMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) s := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1289,7 +1290,7 @@ func TestRemoveMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) s := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1332,7 +1333,7 @@ func TestUpdateMember(t *testing.T) { Node: n, raftStorage: raft.NewMemoryStorage(), storage: mockstorage.NewStorageRecorder(""), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) s := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1422,7 +1423,7 @@ func TestPublishStopped(t *testing.T) { r := newRaftNode(raftNodeConfig{ lg: zap.NewExample(), Node: newNodeNop(), - transport: rafthttp.NewNopTransporter(), + transport: newNopTransporter(), }) srv := &EtcdServer{ lgMu: new(sync.RWMutex), @@ -1727,3 +1728,43 @@ func newTestCluster(membs []*membership.Member) *membership.RaftCluster { } return c } + +type nopTransporter struct{} + +func newNopTransporter() rafthttp.Transporter { + return &nopTransporter{} +} + +func (s *nopTransporter) Start() error { return nil } +func (s *nopTransporter) Handler() http.Handler { return nil } +func (s *nopTransporter) Send(m []raftpb.Message) {} +func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {} +func (s *nopTransporter) AddRemote(id types.ID, us []string) {} +func (s *nopTransporter) AddPeer(id types.ID, us []string) {} +func (s *nopTransporter) RemovePeer(id types.ID) {} +func (s *nopTransporter) RemoveAllPeers() {} +func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } +func (s *nopTransporter) ActivePeers() int { return 0 } +func (s *nopTransporter) Stop() {} +func (s *nopTransporter) Pause() {} +func (s *nopTransporter) Resume() {} + +type snapTransporter struct { + nopTransporter + snapDoneC chan raftsnap.Message + snapDir string +} + +func newSnapTransporter(snapDir string) (rafthttp.Transporter, <-chan raftsnap.Message) { + ch := make(chan raftsnap.Message, 1) + tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir} + return tr, ch +} + +func (s *snapTransporter) SendSnapshot(m raftsnap.Message) { + ss := raftsnap.New(zap.NewExample(), s.snapDir) + ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) + m.CloseWithError(nil) + s.snapDoneC <- m +} diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 27d37a12e..9127625a8 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -454,43 +454,3 @@ func (t *Transport) ActivePeers() (cnt int) { } return cnt } - -type nopTransporter struct{} - -func NewNopTransporter() Transporter { - return &nopTransporter{} -} - -func (s *nopTransporter) Start() error { return nil } -func (s *nopTransporter) Handler() http.Handler { return nil } -func (s *nopTransporter) Send(m []raftpb.Message) {} -func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {} -func (s *nopTransporter) AddRemote(id types.ID, us []string) {} -func (s *nopTransporter) AddPeer(id types.ID, us []string) {} -func (s *nopTransporter) RemovePeer(id types.ID) {} -func (s *nopTransporter) RemoveAllPeers() {} -func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} -func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } -func (s *nopTransporter) ActivePeers() int { return 0 } -func (s *nopTransporter) Stop() {} -func (s *nopTransporter) Pause() {} -func (s *nopTransporter) Resume() {} - -type snapTransporter struct { - nopTransporter - snapDoneC chan raftsnap.Message - snapDir string -} - -func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) { - ch := make(chan raftsnap.Message, 1) - tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir} - return tr, ch -} - -func (s *snapTransporter) SendSnapshot(m raftsnap.Message) { - ss := raftsnap.New(zap.NewExample(), s.snapDir) - ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) - m.CloseWithError(nil) - s.snapDoneC <- m -}