diff --git a/integration/cluster.go b/integration/cluster.go index bc6881126..79e3f6d44 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -188,7 +188,7 @@ func (c *cluster) fillClusterForMembers() error { return nil } -func newCluster(t *testing.T, cfg *ClusterConfig) *cluster { +func newCluster(t testing.TB, cfg *ClusterConfig) *cluster { c := &cluster{cfg: cfg} ms := make([]*member, cfg.Size) for i := 0; i < cfg.Size; i++ { @@ -204,16 +204,16 @@ func newCluster(t *testing.T, cfg *ClusterConfig) *cluster { // NewCluster returns an unlaunched cluster of the given size which has been // set to use static bootstrap. -func NewCluster(t *testing.T, size int) *cluster { +func NewCluster(t testing.TB, size int) *cluster { return newCluster(t, &ClusterConfig{Size: size}) } // NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration -func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster { +func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster { return newCluster(t, cfg) } -func (c *cluster) Launch(t *testing.T) { +func (c *cluster) Launch(t testing.TB) { errc := make(chan error) for _, m := range c.Members { // Members are launched in separate goroutines because if they boot @@ -274,7 +274,7 @@ func (c *cluster) HTTPMembers() []client.Member { return ms } -func (c *cluster) mustNewMember(t *testing.T) *member { +func (c *cluster) mustNewMember(t testing.TB) *member { m := mustNewMember(t, memberConfig{ name: c.name(rand.Int()), @@ -303,7 +303,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member { return m } -func (c *cluster) addMember(t *testing.T) { +func (c *cluster) addMember(t testing.TB) { m := c.mustNewMember(t) scheme := schemeFromTLSInfo(c.cfg.PeerTLS) @@ -335,7 +335,7 @@ func (c *cluster) addMember(t *testing.T) { c.waitMembersMatch(t, c.HTTPMembers()) } -func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { +func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error { cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) @@ -351,17 +351,17 @@ func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error return nil } -func (c *cluster) AddMember(t *testing.T) { +func (c *cluster) AddMember(t testing.TB) { c.addMember(t) } -func (c *cluster) RemoveMember(t *testing.T, id uint64) { +func (c *cluster) RemoveMember(t testing.TB, id uint64) { if err := c.removeMember(t, id); err != nil { t.Fatal(err) } } -func (c *cluster) removeMember(t *testing.T, id uint64) error { +func (c *cluster) removeMember(t testing.TB, id uint64) error { // send remove request to the cluster cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) @@ -392,7 +392,7 @@ func (c *cluster) removeMember(t *testing.T, id uint64) error { return nil } -func (c *cluster) Terminate(t *testing.T) { +func (c *cluster) Terminate(t testing.TB) { var wg sync.WaitGroup wg.Add(len(c.Members)) for _, m := range c.Members { @@ -404,7 +404,7 @@ func (c *cluster) Terminate(t *testing.T) { wg.Wait() } -func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { +func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) { for _, u := range c.URLs() { cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) ma := client.NewMembersAPI(cc) @@ -420,10 +420,10 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { } } -func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) } +func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) } // waitLeader waits until given members agree on the same leader. -func (c *cluster) waitLeader(t *testing.T, membs []*member) int { +func (c *cluster) waitLeader(t testing.TB, membs []*member) int { possibleLead := make(map[uint64]bool) var lead uint64 for _, m := range membs { @@ -516,14 +516,14 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { return reflect.DeepEqual(membs, wmembs) } -func newLocalListener(t *testing.T) net.Listener { +func newLocalListener(t testing.TB) net.Listener { c := atomic.AddInt64(&localListenCount, 1) // Go 1.8+ allows only numbers in port addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid()) return NewListenerWithAddr(t, addr) } -func NewListenerWithAddr(t *testing.T, addr string) net.Listener { +func NewListenerWithAddr(t testing.TB, addr string) net.Listener { l, err := transport.NewUnixListener(addr) if err != nil { t.Fatal(err) @@ -583,7 +583,7 @@ type memberConfig struct { // mustNewMember return an inited member with the given name. If peerTLS is // set, it will use https scheme to communicate between peers. -func mustNewMember(t *testing.T, mcfg memberConfig) *member { +func mustNewMember(t testing.TB, mcfg memberConfig) *member { var err error m := &member{} @@ -759,7 +759,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) { // Clone returns a member with the same server configuration. The returned // member will not set PeerListeners and ClientListeners. -func (m *member) Clone(t *testing.T) *member { +func (m *member) Clone(t testing.TB) *member { mm := &member{} mm.ServerConfig = m.ServerConfig @@ -959,14 +959,14 @@ func (m *member) Launch() error { return nil } -func (m *member) WaitOK(t *testing.T) { +func (m *member) WaitOK(t testing.TB) { m.WaitStarted(t) for m.s.Leader() == 0 { time.Sleep(tickDuration) } } -func (m *member) WaitStarted(t *testing.T) { +func (m *member) WaitStarted(t testing.TB) { cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) kapi := client.NewKeysAPI(cc) for { @@ -981,7 +981,7 @@ func (m *member) WaitStarted(t *testing.T) { } } -func WaitClientV3(t *testing.T, kv clientv3.KV) { +func WaitClientV3(t testing.TB, kv clientv3.KV) { timeout := time.Now().Add(requestTimeout) var err error for time.Now().Before(timeout) { @@ -1035,7 +1035,7 @@ func (m *member) Close() { } // Stop stops the member, but the data dir of the member is preserved. -func (m *member) Stop(t *testing.T) { +func (m *member) Stop(t testing.TB) { lg.Info( "stopping a member", zap.String("name", m.Name), @@ -1069,7 +1069,7 @@ func (m *member) StopNotify() <-chan struct{} { } // Restart starts the member using the preserved data dir. -func (m *member) Restart(t *testing.T) error { +func (m *member) Restart(t testing.TB) error { lg.Info( "restarting a member", zap.String("name", m.Name), @@ -1107,7 +1107,7 @@ func (m *member) Restart(t *testing.T) error { } // Terminate stops the member and removes the data dir. -func (m *member) Terminate(t *testing.T) { +func (m *member) Terminate(t testing.TB) { lg.Info( "terminating a member", zap.String("name", m.Name), @@ -1157,7 +1157,7 @@ func (m *member) Metric(metricName string) (string, error) { } // InjectPartition drops connections from m to others, vice versa. -func (m *member) InjectPartition(t *testing.T, others ...*member) { +func (m *member) InjectPartition(t testing.TB, others ...*member) { for _, other := range others { m.s.CutPeer(other.s.ID()) other.s.CutPeer(m.s.ID()) @@ -1165,14 +1165,14 @@ func (m *member) InjectPartition(t *testing.T, others ...*member) { } // RecoverPartition recovers connections from m to others, vice versa. -func (m *member) RecoverPartition(t *testing.T, others ...*member) { +func (m *member) RecoverPartition(t testing.TB, others ...*member) { for _, other := range others { m.s.MendPeer(other.s.ID()) other.s.MendPeer(m.s.ID()) } } -func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { +func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client { cfgtls := transport.TLSInfo{} if tls != nil { cfgtls = *tls @@ -1185,7 +1185,7 @@ func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) clien return c } -func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport { +func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport { // tick in integration test is short, so 1s dial timeout could play well. tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) if err != nil { @@ -1211,7 +1211,7 @@ type ClusterV3 struct { // NewClusterV3 returns a launched cluster with a grpc client connection // for each cluster member. -func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { +func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 { cfg.UseGRPC = true if os.Getenv("CLIENT_DEBUG") != "" { clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4)) @@ -1240,7 +1240,7 @@ func (c *ClusterV3) TakeClient(idx int) { c.mu.Unlock() } -func (c *ClusterV3) Terminate(t *testing.T) { +func (c *ClusterV3) Terminate(t testing.TB) { c.mu.Lock() for _, client := range c.clients { if client == nil {