mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
tests: Allow configuring integration tests to use TCP
This commit is contained in:
parent
7272a9585d
commit
6e04e8ae42
@ -161,6 +161,8 @@ type ClusterConfig struct {
|
||||
// UseBridge adds bridge between client and grpc server. Should be used in tests that
|
||||
// want to manipulate connection or require connection not breaking despite server stop/restart.
|
||||
UseBridge bool
|
||||
// UseTCP configures server listen on tcp socket. If disabled unix socket is used.
|
||||
UseTCP bool
|
||||
|
||||
EnableLeaseCheckpoint bool
|
||||
LeaseCheckpointInterval time.Duration
|
||||
@ -216,7 +218,7 @@ func newCluster(t testutil.TB, cfg *ClusterConfig) *cluster {
|
||||
c := &cluster{cfg: cfg}
|
||||
ms := make([]*member, cfg.Size)
|
||||
for i := 0; i < cfg.Size; i++ {
|
||||
ms[i] = c.mustNewMember(t, int32(i))
|
||||
ms[i] = c.mustNewMember(t, int64(i))
|
||||
}
|
||||
c.Members = ms
|
||||
if err := c.fillClusterForMembers(); err != nil {
|
||||
@ -303,11 +305,11 @@ func (c *cluster) HTTPMembers() []client.Member {
|
||||
return ms
|
||||
}
|
||||
|
||||
func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {
|
||||
func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member {
|
||||
m := mustNewMember(t,
|
||||
memberConfig{
|
||||
name: c.generateMemberName(),
|
||||
memberNumber: number,
|
||||
memberNumber: memberNumber,
|
||||
authToken: c.cfg.AuthToken,
|
||||
peerTLS: c.cfg.PeerTLS,
|
||||
clientTLS: c.cfg.ClientTLS,
|
||||
@ -323,6 +325,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {
|
||||
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
|
||||
useIP: c.cfg.UseIP,
|
||||
useBridge: c.cfg.UseBridge,
|
||||
useTCP: c.cfg.UseTCP,
|
||||
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
|
||||
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
|
||||
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
|
||||
@ -338,7 +341,7 @@ func (c *cluster) mustNewMember(t testutil.TB, number int32) *member {
|
||||
|
||||
// addMember return PeerURLs of the added member.
|
||||
func (c *cluster) addMember(t testutil.TB) types.URLs {
|
||||
m := c.mustNewMember(t,0)
|
||||
m := c.mustNewMember(t, 0)
|
||||
|
||||
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
|
||||
|
||||
@ -567,8 +570,8 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
|
||||
|
||||
type member struct {
|
||||
config.ServerConfig
|
||||
uniqNumber int32
|
||||
memberNumber int32
|
||||
UniqNumber int64
|
||||
MemberNumber int64
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
grpcListener net.Listener
|
||||
// PeerTLSInfo enables peer TLS when set
|
||||
@ -595,6 +598,7 @@ type member struct {
|
||||
clientMaxCallRecvMsgSize int
|
||||
useIP bool
|
||||
useBridge bool
|
||||
useTCP bool
|
||||
|
||||
isLearner bool
|
||||
closed bool
|
||||
@ -604,7 +608,8 @@ func (m *member) GRPCURL() string { return m.grpcURL }
|
||||
|
||||
type memberConfig struct {
|
||||
name string
|
||||
memberNumber int32
|
||||
uniqNumber int64
|
||||
memberNumber int64
|
||||
peerTLS *transport.TLSInfo
|
||||
clientTLS *transport.TLSInfo
|
||||
authToken string
|
||||
@ -620,6 +625,7 @@ type memberConfig struct {
|
||||
clientMaxCallRecvMsgSize int
|
||||
useIP bool
|
||||
useBridge bool
|
||||
useTCP bool
|
||||
enableLeaseCheckpoint bool
|
||||
leaseCheckpointInterval time.Duration
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
@ -630,8 +636,8 @@ type memberConfig struct {
|
||||
func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
||||
var err error
|
||||
m := &member{
|
||||
uniqNumber: atomic.AddInt32(&uniqueNumber, 1),
|
||||
memberNumber: mcfg.memberNumber,
|
||||
MemberNumber: mcfg.memberNumber,
|
||||
UniqNumber: atomic.AddInt64(&localListenCount, 1),
|
||||
}
|
||||
|
||||
peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
|
||||
@ -717,6 +723,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
|
||||
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
|
||||
m.useIP = mcfg.useIP
|
||||
m.useBridge = mcfg.useBridge
|
||||
m.useTCP = mcfg.useTCP
|
||||
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
|
||||
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
|
||||
|
||||
@ -749,13 +756,14 @@ func memberLogger(t testutil.TB, name string) *zap.Logger {
|
||||
// listenGRPC starts a grpc server over a unix domain socket on the member
|
||||
func (m *member) listenGRPC() error {
|
||||
// prefix with localhost so cert has right domain
|
||||
grpcAddr := m.grpcAddr()
|
||||
network, host, port := m.grpcAddr()
|
||||
grpcAddr := host + ":" + port
|
||||
m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
|
||||
grpcListener, err := transport.NewUnixListener(grpcAddr)
|
||||
grpcListener, err := net.Listen(network, grpcAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
|
||||
}
|
||||
m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr
|
||||
m.grpcURL = fmt.Sprintf("%s://%s", m.clientScheme(), grpcAddr)
|
||||
if m.useBridge {
|
||||
_, err = m.addBridge()
|
||||
if err != nil {
|
||||
@ -767,20 +775,36 @@ func (m *member) listenGRPC() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *member) clientScheme() string {
|
||||
switch {
|
||||
case m.useTCP && m.ClientTLSInfo != nil:
|
||||
return "https"
|
||||
case m.useTCP && m.ClientTLSInfo == nil:
|
||||
return "http"
|
||||
case !m.useTCP && m.ClientTLSInfo != nil:
|
||||
return "unixs"
|
||||
case !m.useTCP && m.ClientTLSInfo == nil:
|
||||
return "unix"
|
||||
}
|
||||
m.Logger.Panic("Failed to determine client schema")
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *member) addBridge() (*bridge, error) {
|
||||
grpcAddr := m.grpcAddr()
|
||||
network, host, port := m.grpcAddr()
|
||||
grpcAddr := host + ":" + port
|
||||
bridgeAddr := grpcAddr + "0"
|
||||
m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name))
|
||||
bridgeListener, err := transport.NewUnixListener(bridgeAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err)
|
||||
return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", bridgeAddr, err)
|
||||
}
|
||||
m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener)
|
||||
m.grpcBridge, err = newBridge(dialer{network: network, addr: grpcAddr}, bridgeListener)
|
||||
if err != nil {
|
||||
bridgeListener.Close()
|
||||
return nil, err
|
||||
}
|
||||
m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr
|
||||
m.grpcURL = m.clientScheme() + "://" + bridgeAddr
|
||||
return m.grpcBridge, nil
|
||||
}
|
||||
|
||||
@ -791,13 +815,25 @@ func (m *member) Bridge() *bridge {
|
||||
return m.grpcBridge
|
||||
}
|
||||
|
||||
func (m *member) grpcAddr() string {
|
||||
func (m *member) grpcAddr() (network, host, port string) {
|
||||
// prefix with localhost so cert has right domain
|
||||
host := "localhost"
|
||||
host = "localhost"
|
||||
if m.useIP { // for IP-only TLS certs
|
||||
host = "127.0.0.1"
|
||||
}
|
||||
return fmt.Sprintf("%s:%d", host, baseGRPCPort + m.uniqNumber * 10 + m.memberNumber)
|
||||
network = "unix"
|
||||
if m.useTCP {
|
||||
network = "tcp"
|
||||
}
|
||||
port = m.Name
|
||||
if m.useTCP {
|
||||
port = fmt.Sprintf("%d", GrpcPortNumber(m.UniqNumber, m.MemberNumber))
|
||||
}
|
||||
return network, host, port
|
||||
}
|
||||
|
||||
func GrpcPortNumber(uniqNumber, memberNumber int64) int64 {
|
||||
return baseGRPCPort + uniqNumber*10 + memberNumber
|
||||
}
|
||||
|
||||
type dialer struct {
|
||||
@ -1573,7 +1609,7 @@ func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j],
|
||||
|
||||
// MustNewMember creates a new member instance based on the response of V3 Member Add API.
|
||||
func (c *ClusterV3) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) *member {
|
||||
m := c.mustNewMember(t,0)
|
||||
m := c.mustNewMember(t, 0)
|
||||
m.isLearner = resp.Member.IsLearner
|
||||
m.NewCluster = false
|
||||
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -1594,8 +1595,10 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
|
||||
|
||||
clus.Members[0].ClientTLSInfo = &testTLSInfo
|
||||
clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()}
|
||||
clus.Members[0].grpcURL = strings.Replace(clus.Members[0].grpcURL, "http://", "https://", 1)
|
||||
client, err := NewClientV3(clus.Members[0])
|
||||
if client != nil || err == nil {
|
||||
client.Close()
|
||||
t.Fatalf("expected no client")
|
||||
} else if err != context.DeadlineExceeded {
|
||||
t.Fatalf("unexpected error (%v)", err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user