From 08ea9cb756245ff35c2cfb73a4dfb65cb85c03b4 Mon Sep 17 00:00:00 2001 From: wpedrak Date: Wed, 31 Mar 2021 11:04:19 +0200 Subject: [PATCH] etcdserver: integration test covering first commit in current term notification --- tests/go.mod | 1 + tests/go.sum | 1 + tests/integration/v3_leadership_test.go | 82 ++++++++++++++++++++++++- 3 files changed, 83 insertions(+), 1 deletion(-) diff --git a/tests/go.mod b/tests/go.mod index 4a43b433c..1e28eb328 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -37,6 +37,7 @@ require ( go.etcd.io/etcd/server/v3 v3.5.0-alpha.0 go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba google.golang.org/grpc v1.36.1 gopkg.in/yaml.v2 v2.3.0 diff --git a/tests/go.sum b/tests/go.sum index e8e94809b..891b40c59 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -367,6 +367,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/tests/integration/v3_leadership_test.go b/tests/integration/v3_leadership_test.go index 7fd5ff1ca..72aaf46df 100644 --- a/tests/integration/v3_leadership_test.go +++ b/tests/integration/v3_leadership_test.go @@ -16,12 +16,14 @@ package integration import ( "context" + "fmt" "strings" "testing" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "golang.org/x/sync/errgroup" ) func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) } @@ -36,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) { oldLeadIdx := clus.WaitLeader(t) oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID()) - // ensure followers go through leader transition while learship transfer + // ensure followers go through leader transition while leadership transfer idc := make(chan uint64) stopc := make(chan struct{}) defer close(stopc) @@ -179,3 +181,81 @@ func TestTransferLeadershipWithLearner(t *testing.T) { t.Error("timed out waiting for leader transition") } } + +func TestFirstCommitNotification(t *testing.T) { + BeforeTest(t) + clusterSize := 3 + cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize}) + defer cluster.Terminate(t) + + oldLeaderIdx := cluster.WaitLeader(t) + oldLeaderClient := cluster.Client(oldLeaderIdx) + + newLeaderIdx := (oldLeaderIdx + 1) % clusterSize + newLeaderId := uint64(cluster.Members[newLeaderIdx].ID()) + + notifiers := make(map[int]<-chan struct{}, clusterSize) + for i, clusterMember := range cluster.Members { + notifiers[i] = clusterMember.s.FirstCommitInTermNotify() + } + + _, err := oldLeaderClient.MoveLeader(context.Background(), newLeaderId) + + if err != nil { + t.Errorf("got error during leadership transfer: %v", err) + } + + leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + group, groupContext := errgroup.WithContext(ctx) + + for i, notifier := range notifiers { + member, notifier := cluster.Members[i], notifier + group.Go(func() error { + return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier) + }) + } + + err = group.Wait() + if err != nil { + t.Error(err) + } +} + +func checkFirstCommitNotification( + ctx context.Context, + member *member, + leaderAppliedIndex uint64, + notifier <-chan struct{}, +) error { + // wait until server applies all the changes of leader + for member.s.AppliedIndex() < leaderAppliedIndex { + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(100 * time.Millisecond) + } + } + + select { + case msg, ok := <-notifier: + if ok { + return fmt.Errorf( + "member with ID %d got message via notifier, msg: %v", + member.ID(), + msg, + ) + } + default: + return fmt.Errorf( + "notification was not triggered, member ID: %d", + member.ID(), + ) + } + + return nil +}