tests: Make using bridge optional

Marek Siarkowicz 2021-09-24 13:55:34 +02:00
parent 994bd08723
commit f324894e8f
19 changed files with 119 additions and 89 deletions
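The change itself is mechanical: test clusters no longer get the connection bridge by default, and the tests that cut, pause, or outlive client connections opt in through ClusterConfig.UseBridge (or memberConfig.useBridge). As a sketch of what the opt-in looks like from a test (hypothetical test name; the import path is assumed from the suites below and is not part of this commit):

package clientv3test

import (
	"context"
	"testing"

	"go.etcd.io/etcd/tests/v3/integration"
)

// TestPutThroughBridge is a hypothetical example, not code from this commit.
func TestPutThroughBridge(t *testing.T) {
	integration.BeforeTest(t)
	// UseBridge interposes a proxy between the client and the member's gRPC
	// socket, so the test can later sever or stall traffic without closing
	// the client's logical gRPC connection.
	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
	defer clus.Terminate(t)

	if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
		t.Fatal(err)
	}
}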

View File

@@ -29,8 +29,8 @@ type Dialer interface {
// to disconnect grpc network connections without closing the logical grpc connection.
type bridge struct {
dialer Dialer
-l net.Listener
-conns map[*bridgeConn]struct{}
+l      net.Listener
+conns  map[*bridgeConn]struct{}
stopc chan struct{}
pausec chan struct{}
@@ -43,8 +43,8 @@ type bridge struct {
func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) {
b := &bridge{
// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
-dialer: dialer,
-l: listener,
+dialer: dialer,
+l:      listener,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}),
pausec: make(chan struct{}),

View File

@@ -38,6 +38,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2,
GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings
+UseBridge: true,
})
defer clus.Terminate(t)
@@ -170,6 +171,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
+UseBridge: true,
})
defer clus.Terminate(t)

View File

@@ -35,6 +35,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 3,
SkipCreatingClient: true,
+UseBridge: true,
})
defer clus.Terminate(t)
@@ -278,6 +279,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl
cfg := &integration.ClusterConfig{
Size: 2,
SkipCreatingClient: true,
+UseBridge: true,
}
if linearizable {
cfg.Size = 3

View File

@@ -712,7 +712,7 @@ func TestKVGetRetry(t *testing.T) {
integration.BeforeTest(t)
clusterSize := 3
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize, UseBridge: true})
defer clus.Terminate(t)
// because killing leader and following election
@@ -765,7 +765,7 @@ func TestKVGetRetry(t *testing.T) {
func TestKVPutFailGetRetry(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)
@@ -876,7 +876,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {

View File

@@ -190,7 +190,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
// TODO: change this line to get a cluster client
@@ -416,7 +416,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@@ -462,7 +462,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) {
func TestLeaseKeepAliveInitTimeout(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@@ -495,7 +495,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) {
func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@@ -530,7 +530,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) {
func TestLeaseTimeToLive(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
c := clus.RandClient()
@@ -656,7 +656,7 @@ func TestLeaseLeases(t *testing.T) {
func TestLeaseRenewLostQuorum(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@@ -728,7 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) {
// transient cluster failure.
func TestV3LeaseFailureOverlap(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
numReqs := 5
@@ -782,7 +782,7 @@ func TestV3LeaseFailureOverlap(t *testing.T) {
func TestLeaseWithRequireLeader(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
c := clus.Client(0)

View File

@@ -194,7 +194,7 @@ func TestLeasingPutInvalidateExisting(t *testing.T) {
// TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased.
func TestLeasingGetNoLeaseTTL(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -223,7 +223,7 @@ func TestLeasingGetNoLeaseTTL(t *testing.T) {
// when the etcd cluster is partitioned.
func TestLeasingGetSerializable(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -325,7 +325,7 @@ func TestLeasingRevGet(t *testing.T) {
// TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server.
func TestLeasingGetWithOpts(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -417,7 +417,7 @@ func TestLeasingConcurrentPut(t *testing.T) {
func TestLeasingDisconnectedGet(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -549,7 +549,7 @@ func TestLeasingOverwriteResponse(t *testing.T) {
func TestLeasingOwnerPutResponse(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -616,7 +616,7 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) {
func TestLeasingTxnOwnerGet(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
client := clus.Client(0)
@@ -772,7 +772,7 @@ func TestLeasingTxnOwnerDelete(t *testing.T) {
func TestLeasingTxnOwnerIf(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -866,7 +866,7 @@ func TestLeasingTxnOwnerIf(t *testing.T) {
func TestLeasingTxnCancel(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -1084,7 +1084,7 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) {
func TestLeasingOwnerPutError(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -1105,7 +1105,7 @@ func TestLeasingOwnerPutError(t *testing.T) {
func TestLeasingOwnerDeleteError(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -1126,7 +1126,7 @@ func TestLeasingOwnerDeleteError(t *testing.T) {
func TestLeasingNonOwnerPutError(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
@@ -1200,7 +1200,7 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) {
func TestLeasingDeleteRangeBounds(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/")
@@ -1375,7 +1375,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) {
// disconnected when trying to submit revoke txn.
func TestLeasingReconnectOwnerRevoke(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@@ -1436,7 +1436,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
// disconnected and the watch is compacted.
func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
@@ -1489,7 +1489,7 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
// not cause inconsistency between the server and the client.
func TestLeasingReconnectOwnerConsistency(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@@ -1649,7 +1649,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
// TestLeasingReconnectTxn checks that Txn is resilient to disconnects.
func TestLeasingReconnectTxn(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@@ -1685,7 +1685,7 @@ func TestLeasingReconnectTxn(t *testing.T) {
// not cause inconsistency between the server and the client.
func TestLeasingReconnectNonOwnerGet(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@@ -1813,7 +1813,7 @@ func TestLeasingDo(t *testing.T) {
func TestLeasingTxnOwnerPutBranch(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
@@ -1907,7 +1907,7 @@ func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, the
func TestLeasingSessionExpire(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
@@ -1983,7 +1983,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) {
for i := range tests {
t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))

View File

@@ -142,7 +142,7 @@ func TestMaintenanceSnapshotError(t *testing.T) {
func TestMaintenanceSnapshotErrorInflight(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
// take about 1-second to read snapshot

View File

@@ -30,7 +30,7 @@ func TestDetectKvOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{
@@ -97,7 +97,7 @@ func TestDetectTxnOrderViolation(t *testing.T) {
var errOrderViolation = errors.New("DetectedOrderViolation")
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{

View File

@@ -80,7 +80,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
func TestUnresolvableOrderViolation(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true})
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{

View File

@@ -53,7 +53,7 @@ func TestTxnError(t *testing.T) {
func TestTxnWriteFail(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)
@@ -103,7 +103,7 @@ func TestTxnReadRetry(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kv := clus.Client(0)

View File

@@ -47,7 +47,7 @@ type watchctx struct {
func runWatchTest(t *testing.T, f watcherTest) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
wclientMember := rand.Intn(3)
@@ -348,7 +348,7 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
func TestWatchResumeInitRev(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)
@@ -404,7 +404,7 @@ func TestWatchResumeInitRev(t *testing.T) {
func TestWatchResumeCompacted(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
// create a waiting watcher at rev 1
@@ -955,7 +955,7 @@ func TestWatchWithCreatedNotification(t *testing.T) {
func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
integration.BeforeTest(t)
-cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer cluster.Terminate(t)
client := cluster.RandClient()
@@ -1063,7 +1063,7 @@ func TestWatchOverlapDropConnContextCancel(t *testing.T) {
func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
n := 100
@@ -1154,7 +1154,7 @@ func TestWatchCancelAndCloseClient(t *testing.T) {
// then closes the watcher interface to ensure correct clean up.
func TestWatchStressResumeClose(t *testing.T) {
integration.BeforeTest(t)
-clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.Client(0)

View File

@@ -153,6 +153,9 @@ type ClusterConfig struct {
// UseIP is true to use only IP for gRPC requests.
UseIP bool
+// UseBridge adds bridge between client and grpc server. Should be used in tests that
+// want to manipulate connection or require connection not breaking despite server stop/restart.
+UseBridge bool
EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
@@ -313,6 +316,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member {
clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
useIP: c.cfg.UseIP,
+useBridge: c.cfg.UseBridge,
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
@@ -582,6 +586,7 @@ type member struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
+useBridge bool
isLearner bool
closed bool
@@ -605,6 +610,7 @@ type memberConfig struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
+useBridge bool
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
@@ -698,6 +704,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member {
m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
m.useIP = mcfg.useIP
+m.useBridge = mcfg.useBridge
m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
@@ -730,35 +737,53 @@ func memberLogger(t testutil.TB, name string) *zap.Logger {
// listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error {
-// prefix with localhost so cert has right domain
-m.grpcURL = "localhost:" + m.Name
-m.Logger.Info("LISTEN GRPC", zap.String("m.grpcURL", m.grpcURL), zap.String("m.Name", m.Name))
-if m.useIP { // for IP-only TLS certs
-m.grpcURL = "127.0.0.1:" + m.Name
-}
-grpcListener, err := transport.NewUnixListener(m.grpcURL)
+grpcAddr := m.grpcAddr()
+m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name))
+grpcListener, err := transport.NewUnixListener(grpcAddr)
if err != nil {
-return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcURL, err)
+return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err)
}
-bridgeAddr := m.grpcURL + "0"
-bridgeListener, err := transport.NewUnixListener(bridgeAddr)
-if err != nil {
-grpcListener.Close()
-return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcURL, err)
+m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr
+if m.useBridge {
+_, err = m.addBridge()
+if err != nil {
+grpcListener.Close()
+return err
+}
}
-m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcURL}, bridgeListener)
-if err != nil {
-bridgeListener.Close()
-grpcListener.Close()
-return err
-}
-m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr
m.grpcListener = grpcListener
return nil
}
+func (m *member) addBridge() (*bridge, error) {
+grpcAddr := m.grpcAddr()
+bridgeAddr := grpcAddr + "0"
+m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name))
+bridgeListener, err := transport.NewUnixListener(bridgeAddr)
+if err != nil {
+return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err)
+}
+m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener)
+if err != nil {
+bridgeListener.Close()
+return nil, err
+}
+m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr
+return m.grpcBridge, nil
+}
+func (m *member) grpcAddr() string {
+// prefix with localhost so cert has right domain
+addr := "localhost:" + m.Name
+if m.useIP { // for IP-only TLS certs
+addr = "127.0.0.1:" + m.Name
+}
+return addr
+}
type dialer struct {
network string
-addr string
+addr    string
}
func (d dialer) Dial() (net.Conn, error) {
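Worth noting from the refactor above: the member now always serves gRPC on grpcAddr() and only conditionally fronts it with the bridge, whose socket is the gRPC address with a trailing "0" (matching the "port" comment in newBridge); m.grpcURL ends up pointing at whichever socket clients should dial. A standalone sketch of that address scheme (memberAddrs is a hypothetical helper, not code from this commit):

package main

import "fmt"

// memberAddrs mirrors grpcAddr()/addBridge() above: the server always listens
// on grpcAddr; with the bridge enabled, clients are pointed at the bridge
// socket, which is grpcAddr plus a trailing "0".
func memberAddrs(name, scheme string, useBridge bool) (grpcAddr, clientURL string) {
	grpcAddr = "localhost:" + name // prefix with localhost so cert has right domain
	clientURL = scheme + "://" + grpcAddr
	if useBridge {
		clientURL = scheme + "://" + grpcAddr + "0" // bridge socket
	}
	return grpcAddr, clientURL
}

func main() {
	fmt.Println(memberAddrs("m0", "unix", true)) // localhost:m0 unix://localhost:m00
}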

View File

@@ -173,7 +173,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
}
func TestForceNewCluster(t *testing.T) {
-c := NewCluster(t, 3)
+c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
@@ -283,7 +283,7 @@ func testIssue2746(t *testing.T, members int) {
func TestIssue2904(t *testing.T) {
BeforeTest(t)
// start 1-member cluster to ensure member 0 is the leader of the cluster.
-c := NewCluster(t, 1)
+c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@@ -319,7 +319,7 @@ func TestIssue2904(t *testing.T) {
func TestIssue3699(t *testing.T) {
// start a cluster of 3 nodes a, b, c
BeforeTest(t)
-c := NewCluster(t, 3)
+c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@@ -371,7 +371,7 @@ func TestIssue3699(t *testing.T) {
// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
BeforeTest(t)
-c := NewCluster(t, 3)
+c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@@ -415,7 +415,7 @@ func TestRejectUnhealthyAdd(t *testing.T) {
// if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
BeforeTest(t)
-c := NewCluster(t, 5)
+c := newCluster(t, &ClusterConfig{Size: 5, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@@ -464,7 +464,7 @@ func TestRestartRemoved(t *testing.T) {
BeforeTest(t)
// 1. start single-member cluster
-c := NewCluster(t, 1)
+c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true})
for _, m := range c.Members {
m.ServerConfig.StrictReconfigCheck = true
}
@@ -540,7 +540,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {
func TestSpeedyTerminate(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
// Stop/Restart so requests will time out on lost leaders
for i := 0; i < 3; i++ {
clus.Members[i].Stop(t)

View File

@@ -46,7 +46,7 @@ func TestPauseMember(t *testing.T) {
func TestRestartMember(t *testing.T) {
BeforeTest(t)
-c := NewCluster(t, 3)
+c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true})
c.Launch(t)
defer c.Terminate(t)
@@ -88,7 +88,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) {
func TestSnapshotAndRestartMember(t *testing.T) {
BeforeTest(t)
-m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"})
+m := mustNewMember(t, memberConfig{name: "snapAndRestartTest", useBridge: true})
m.SnapshotCount = 100
m.Launch()
defer m.Terminate(t)

View File

@@ -35,7 +35,7 @@ func TestV3StorageQuotaApply(t *testing.T) {
BeforeTest(t)
quotasize := int64(16 * os.Getpagesize())
-clus := NewClusterV3(t, &ClusterConfig{Size: 2})
+clus := NewClusterV3(t, &ClusterConfig{Size: 2, UseBridge: true})
defer clus.Terminate(t)
kvc0 := toGRPC(clus.Client(0)).KV
kvc1 := toGRPC(clus.Client(1)).KV
@@ -147,7 +147,7 @@ func TestV3AlarmDeactivate(t *testing.T) {
func TestV3CorruptAlarm(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
var wg sync.WaitGroup

View File

@@ -61,7 +61,7 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) {
// See https://github.com/etcd-io/etcd/issues/7322 for more detail.
func TestV3KVInflightRangeRequests(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.RandClient()

View File

@@ -88,7 +88,7 @@ func TestV3PutOverwrite(t *testing.T) {
// TestPutRestart checks if a put after an unrelated member restart succeeds
func TestV3PutRestart(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
kvIdx := rand.Intn(3)
@@ -1207,7 +1207,7 @@ func TestV3Hash(t *testing.T) {
// TestV3HashRestart ensures that hash stays the same after restart.
func TestV3HashRestart(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
cli := clus.RandClient()
@@ -1240,7 +1240,7 @@ func TestV3StorageQuotaAPI(t *testing.T) {
BeforeTest(t)
quotasize := int64(16 * os.Getpagesize())
-clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
// Set a quota on one node
clus.Members[0].QuotaBackendBytes = quotasize
@@ -1851,7 +1851,7 @@ func TestGRPCRequireLeader(t *testing.T) {
func TestGRPCStreamRequireLeader(t *testing.T) {
BeforeTest(t)
-cfg := ClusterConfig{Size: 3}
+cfg := ClusterConfig{Size: 3, UseBridge: true}
clus := newClusterV3NoClients(t, &cfg)
defer clus.Terminate(t)

View File

@@ -36,7 +36,7 @@ import (
func TestV3LeasePromote(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true})
defer clus.Terminate(t)
// create lease
@@ -237,6 +237,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
Size: 3,
EnableLeaseCheckpoint: true,
LeaseCheckpointInterval: leaseInterval,
+UseBridge: true,
})
defer clus.Terminate(t)
@@ -649,7 +650,7 @@ const fiveMinTTL int64 = 300
func TestV3LeaseRecoverAndRevoke(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
kvc := toGRPC(clus.Client(0)).KV
@@ -700,7 +701,7 @@ func TestV3LeaseRecoverAndRevoke(t *testing.T) {
func TestV3LeaseRevokeAndRecover(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
kvc := toGRPC(clus.Client(0)).KV
@@ -752,7 +753,7 @@ func TestV3LeaseRevokeAndRecover(t *testing.T) {
func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
kvc := toGRPC(clus.Client(0)).KV
@@ -808,7 +809,7 @@ func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
kvc := toGRPC(clus.Client(0)).KV

View File

@@ -1040,7 +1040,7 @@ func TestWatchWithProgressNotify(t *testing.T) {
// TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams.
func TestV3WatchClose(t *testing.T) {
BeforeTest(t)
-clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true})
defer clus.Terminate(t)
c := clus.Client(0)