mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			259 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2017 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| // +build !cluster_proxy
 | |
| 
 | |
| package integration
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/coreos/etcd/clientv3"
 | |
| 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 | |
| 	"github.com/coreos/etcd/integration"
 | |
| 	"github.com/coreos/etcd/pkg/testutil"
 | |
| )
 | |
| 
 | |
// errExpected is the sentinel that the partition-test ops below return in
// place of the timeout-style errors they are prepared to tolerate, so the
// shared driver can tell a tolerated failure apart from an unexpected one.
var (
	errExpected = errors.New("expected error")
)
 | |
| 
 | |
| // TestBalancerUnderNetworkPartitionPut tests when one member becomes isolated,
 | |
| // first Put request fails, and following retry succeeds with client balancer
 | |
| // switching to others.
 | |
| func TestBalancerUnderNetworkPartitionPut(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Put(ctx, "a", "b")
 | |
| 		if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
 | |
| 			return errExpected
 | |
| 		}
 | |
| 		return err
 | |
| 	}, time.Second)
 | |
| }
 | |
| 
 | |
| func TestBalancerUnderNetworkPartitionDelete(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Delete(ctx, "a")
 | |
| 		if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
 | |
| 			return errExpected
 | |
| 		}
 | |
| 		return err
 | |
| 	}, time.Second)
 | |
| }
 | |
| 
 | |
| func TestBalancerUnderNetworkPartitionTxn(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Txn(ctx).
 | |
| 			If(clientv3.Compare(clientv3.Version("foo"), "=", 0)).
 | |
| 			Then(clientv3.OpPut("foo", "bar")).
 | |
| 			Else(clientv3.OpPut("foo", "baz")).Commit()
 | |
| 		if err == context.DeadlineExceeded || isServerCtxTimeout(err) || err == rpctypes.ErrTimeout {
 | |
| 			return errExpected
 | |
| 		}
 | |
| 		return err
 | |
| 	}, time.Second)
 | |
| }
 | |
| 
 | |
| // TestBalancerUnderNetworkPartitionLinearizableGetWithLongTimeout tests
 | |
| // when one member becomes isolated, first quorum Get request succeeds
 | |
| // by switching endpoints within the timeout (long enough to cover endpoint switch).
 | |
| func TestBalancerUnderNetworkPartitionLinearizableGetWithLongTimeout(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Get(ctx, "a")
 | |
| 		return err
 | |
| 	}, 7*time.Second)
 | |
| }
 | |
| 
 | |
| // TestBalancerUnderNetworkPartitionLinearizableGetWithShortTimeout tests
 | |
| // when one member becomes isolated, first quorum Get request fails,
 | |
| // and following retry succeeds with client balancer switching to others.
 | |
| func TestBalancerUnderNetworkPartitionLinearizableGetWithShortTimeout(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Get(ctx, "a")
 | |
| 		if err == context.DeadlineExceeded || isServerCtxTimeout(err) {
 | |
| 			return errExpected
 | |
| 		}
 | |
| 		return err
 | |
| 	}, time.Second)
 | |
| }
 | |
| 
 | |
| func TestBalancerUnderNetworkPartitionSerializableGet(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartition(t, func(cli *clientv3.Client, ctx context.Context) error {
 | |
| 		_, err := cli.Get(ctx, "a", clientv3.WithSerializable())
 | |
| 		return err
 | |
| 	}, time.Second)
 | |
| }
 | |
| 
 | |
| func testBalancerUnderNetworkPartition(t *testing.T, op func(*clientv3.Client, context.Context) error, timeout time.Duration) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
 | |
| 		Size:               3,
 | |
| 		SkipCreatingClient: true,
 | |
| 	})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
 | |
| 
 | |
| 	// expect pin eps[0]
 | |
| 	ccfg := clientv3.Config{
 | |
| 		Endpoints:   []string{eps[0]},
 | |
| 		DialTimeout: 3 * time.Second,
 | |
| 	}
 | |
| 	cli, err := clientv3.New(ccfg)
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	// wait for eps[0] to be pinned
 | |
| 	mustWaitPinReady(t, cli)
 | |
| 
 | |
| 	// add other endpoints for later endpoint switch
 | |
| 	cli.SetEndpoints(eps...)
 | |
| 	clus.Members[0].InjectPartition(t, clus.Members[1:]...)
 | |
| 
 | |
| 	for i := 0; i < 2; i++ {
 | |
| 		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 | |
| 		err = op(cli, ctx)
 | |
| 		cancel()
 | |
| 		if err == nil {
 | |
| 			break
 | |
| 		}
 | |
| 		if err != errExpected {
 | |
| 			t.Errorf("#%d: expected %v, got %v", i, errExpected, err)
 | |
| 		}
 | |
| 		// give enough time for endpoint switch
 | |
| 		// TODO: remove random sleep by syncing directly with balancer
 | |
| 		if i == 0 {
 | |
| 			time.Sleep(5 * time.Second)
 | |
| 		}
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		t.Errorf("balancer did not switch in time (%v)", err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection ensures balancer
 | |
| // switches endpoint when leader fails and linearizable get requests returns
 | |
| // "etcdserver: request timed out".
 | |
| func TestBalancerUnderNetworkPartitionLinearizableGetLeaderElection(t *testing.T) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
 | |
| 		Size:               3,
 | |
| 		SkipCreatingClient: true,
 | |
| 	})
 | |
| 	defer clus.Terminate(t)
 | |
| 	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
 | |
| 
 | |
| 	lead := clus.WaitLeader(t)
 | |
| 
 | |
| 	timeout := 3 * clus.Members[(lead+1)%2].ServerConfig.ReqTimeout()
 | |
| 
 | |
| 	cli, err := clientv3.New(clientv3.Config{
 | |
| 		Endpoints:   []string{eps[(lead+1)%2]},
 | |
| 		DialTimeout: 1 * time.Second,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	// wait for non-leader to be pinned
 | |
| 	mustWaitPinReady(t, cli)
 | |
| 
 | |
| 	// add all eps to list, so that when the original pined one fails
 | |
| 	// the client can switch to other available eps
 | |
| 	cli.SetEndpoints(eps[lead], eps[(lead+1)%2])
 | |
| 
 | |
| 	// isolate leader
 | |
| 	clus.Members[lead].InjectPartition(t, clus.Members[(lead+1)%3], clus.Members[(lead+2)%3])
 | |
| 
 | |
| 	// expects balancer endpoint switch while ongoing leader election
 | |
| 	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
 | |
| 	_, err = cli.Get(ctx, "a")
 | |
| 	cancel()
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestBalancerUnderNetworkPartitionWatchLeader(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartitionWatch(t, true)
 | |
| }
 | |
| 
 | |
| func TestBalancerUnderNetworkPartitionWatchFollower(t *testing.T) {
 | |
| 	testBalancerUnderNetworkPartitionWatch(t, false)
 | |
| }
 | |
| 
 | |
| // testBalancerUnderNetworkPartitionWatch ensures watch stream
 | |
| // to a partitioned node be closed when context requires leader.
 | |
| func testBalancerUnderNetworkPartitionWatch(t *testing.T, isolateLeader bool) {
 | |
| 	defer testutil.AfterTest(t)
 | |
| 
 | |
| 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{
 | |
| 		Size:               3,
 | |
| 		SkipCreatingClient: true,
 | |
| 	})
 | |
| 	defer clus.Terminate(t)
 | |
| 
 | |
| 	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr(), clus.Members[2].GRPCAddr()}
 | |
| 
 | |
| 	target := clus.WaitLeader(t)
 | |
| 	if !isolateLeader {
 | |
| 		target = (target + 1) % 3
 | |
| 	}
 | |
| 
 | |
| 	// pin eps[target]
 | |
| 	watchCli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer watchCli.Close()
 | |
| 
 | |
| 	// wait for eps[target] to be pinned
 | |
| 	mustWaitPinReady(t, watchCli)
 | |
| 
 | |
| 	// add all eps to list, so that when the original pined one fails
 | |
| 	// the client can switch to other available eps
 | |
| 	watchCli.SetEndpoints(eps...)
 | |
| 
 | |
| 	wch := watchCli.Watch(clientv3.WithRequireLeader(context.Background()), "foo", clientv3.WithCreatedNotify())
 | |
| 	select {
 | |
| 	case <-wch:
 | |
| 	case <-time.After(integration.RequestWaitTimeout):
 | |
| 		t.Fatal("took too long to create watch")
 | |
| 	}
 | |
| 
 | |
| 	// isolate eps[target]
 | |
| 	clus.Members[target].InjectPartition(t,
 | |
| 		clus.Members[(target+1)%3],
 | |
| 		clus.Members[(target+2)%3],
 | |
| 	)
 | |
| 
 | |
| 	select {
 | |
| 	case ev := <-wch:
 | |
| 		if len(ev.Events) != 0 {
 | |
| 			t.Fatal("expected no event")
 | |
| 		}
 | |
| 		if err = ev.Err(); err != rpctypes.ErrNoLeader {
 | |
| 			t.Fatalf("expected %v, got %v", rpctypes.ErrNoLeader, err)
 | |
| 		}
 | |
| 	case <-time.After(integration.RequestWaitTimeout): // enough time to detect leader lost
 | |
| 		t.Fatal("took too long to detect leader lost")
 | |
| 	}
 | |
| }
 | 
