etcdserver: integration test covering first commit in current term notification

This commit is contained in:
wpedrak 2021-03-31 11:04:19 +02:00
parent 3991a8c9fa
commit 08ea9cb756
3 changed files with 83 additions and 1 deletions

View File

@ -37,6 +37,7 @@ require (
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.36.1
gopkg.in/yaml.v2 v2.3.0

View File

@ -367,6 +367,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -16,12 +16,14 @@ package integration
import (
"context"
"fmt"
"strings"
"testing"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"golang.org/x/sync/errgroup"
)
func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) }
@ -36,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) {
oldLeadIdx := clus.WaitLeader(t)
oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID())
// ensure followers go through leader transition while learship transfer
// ensure followers go through leader transition while leadership transfer
idc := make(chan uint64)
stopc := make(chan struct{})
defer close(stopc)
@ -179,3 +181,81 @@ func TestTransferLeadershipWithLearner(t *testing.T) {
t.Error("timed out waiting for leader transition")
}
}
func TestFirstCommitNotification(t *testing.T) {
BeforeTest(t)
clusterSize := 3
cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize})
defer cluster.Terminate(t)
oldLeaderIdx := cluster.WaitLeader(t)
oldLeaderClient := cluster.Client(oldLeaderIdx)
newLeaderIdx := (oldLeaderIdx + 1) % clusterSize
newLeaderId := uint64(cluster.Members[newLeaderIdx].ID())
notifiers := make(map[int]<-chan struct{}, clusterSize)
for i, clusterMember := range cluster.Members {
notifiers[i] = clusterMember.s.FirstCommitInTermNotify()
}
_, err := oldLeaderClient.MoveLeader(context.Background(), newLeaderId)
if err != nil {
t.Errorf("got error during leadership transfer: %v", err)
}
leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
group, groupContext := errgroup.WithContext(ctx)
for i, notifier := range notifiers {
member, notifier := cluster.Members[i], notifier
group.Go(func() error {
return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier)
})
}
err = group.Wait()
if err != nil {
t.Error(err)
}
}
func checkFirstCommitNotification(
ctx context.Context,
member *member,
leaderAppliedIndex uint64,
notifier <-chan struct{},
) error {
// wait until server applies all the changes of leader
for member.s.AppliedIndex() < leaderAppliedIndex {
select {
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(100 * time.Millisecond)
}
}
select {
case msg, ok := <-notifier:
if ok {
return fmt.Errorf(
"member with ID %d got message via notifier, msg: %v",
member.ID(),
msg,
)
}
default:
return fmt.Errorf(
"notification was not triggered, member ID: %d",
member.ID(),
)
}
return nil
}