mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
integration: add TestKVForLearner
Adding TestKVForLearner. Also adding test utility functions for clientv3 integration tests.
This commit is contained in:
parent
43ed94f769
commit
57a11eb1e1
@ -16,7 +16,6 @@ package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -204,27 +203,11 @@ func TestMemberAddForLearner(t *testing.T) {
|
||||
t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner)
|
||||
}
|
||||
|
||||
numOfLearners, err := getNumberOfLearners(clus)
|
||||
learners, err := clus.GetLearnerMembers()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get the number of learners in cluster: %v", err)
|
||||
t.Fatalf("failed to get the learner members in cluster: %v", err)
|
||||
}
|
||||
if numOfLearners != 1 {
|
||||
t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners)
|
||||
if len(learners) != 1 {
|
||||
t.Errorf("Added 1 learner node to cluster, got %d", len(learners))
|
||||
}
|
||||
}
|
||||
|
||||
// getNumberOfLearners return the number of learner nodes in cluster using MemberList API
|
||||
func getNumberOfLearners(clus *integration.ClusterV3) (int, error) {
|
||||
cli := clus.RandClient()
|
||||
resp, err := cli.MemberList(context.Background())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to list member %v", err)
|
||||
}
|
||||
numberOfLearners := 0
|
||||
for _, m := range resp.Members {
|
||||
if m.IsLearner {
|
||||
numberOfLearners++
|
||||
}
|
||||
}
|
||||
return numberOfLearners, nil
|
||||
}
|
||||
|
@ -971,3 +971,79 @@ func TestKVLargeRequests(t *testing.T) {
|
||||
clus.Terminate(t)
|
||||
}
|
||||
}
|
||||
|
||||
// TestKVForLearner ensures learner member only accepts serializable read request.
|
||||
func TestKVForLearner(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
// we have to add and launch learner member after initial cluster was created, because
|
||||
// bootstrapping a cluster with learner member is not supported.
|
||||
clus.AddAndLaunchLearnerMember(t)
|
||||
|
||||
learners, err := clus.GetLearnerMembers()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get the learner members in cluster: %v", err)
|
||||
}
|
||||
if len(learners) != 1 {
|
||||
t.Fatalf("added 1 learner to cluster, got %d", len(learners))
|
||||
}
|
||||
|
||||
if len(clus.Members) != 4 {
|
||||
t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members))
|
||||
}
|
||||
// note:
|
||||
// 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members
|
||||
// 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config,
|
||||
// because the implementation of integration test has diverged from embed/etcd.go.
|
||||
learnerEp := clus.Members[3].GRPCAddr()
|
||||
cfg := clientv3.Config{
|
||||
Endpoints: []string{learnerEp},
|
||||
DialTimeout: 5 * time.Second,
|
||||
DialOptions: []grpc.DialOption{grpc.WithBlock()},
|
||||
}
|
||||
// this cli only has endpoint of the learner member
|
||||
cli, err := clientv3.New(cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create clientv3: %v", err)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
tests := []struct {
|
||||
op clientv3.Op
|
||||
wErr bool
|
||||
}{
|
||||
{
|
||||
op: clientv3.OpGet("foo", clientv3.WithSerializable()),
|
||||
wErr: false,
|
||||
},
|
||||
{
|
||||
op: clientv3.OpGet("foo"),
|
||||
wErr: true,
|
||||
},
|
||||
{
|
||||
op: clientv3.OpPut("foo", "bar"),
|
||||
wErr: true,
|
||||
},
|
||||
{
|
||||
op: clientv3.OpDelete("foo"),
|
||||
wErr: true,
|
||||
},
|
||||
{
|
||||
op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil),
|
||||
wErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for idx, test := range tests {
|
||||
_, err := cli.Do(context.TODO(), test.op)
|
||||
if err != nil && !test.wErr {
|
||||
t.Errorf("%d: expect no error, got %v", idx, err)
|
||||
}
|
||||
if err == nil && test.wErr {
|
||||
t.Errorf("%d: expect error, got nil", idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -559,6 +559,8 @@ type member struct {
|
||||
clientMaxCallSendMsgSize int
|
||||
clientMaxCallRecvMsgSize int
|
||||
useIP bool
|
||||
|
||||
isLearner bool
|
||||
}
|
||||
|
||||
func (m *member) GRPCAddr() string { return m.grpcAddr }
|
||||
@ -1272,3 +1274,121 @@ type grpcAPI struct {
|
||||
// Election is the election API for the client's connection.
|
||||
Election epb.ElectionClient
|
||||
}
|
||||
|
||||
// GetLearnerMembers returns the list of learner members in cluster using MemberList API.
|
||||
func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
|
||||
cli := c.Client(0)
|
||||
resp, err := cli.MemberList(context.Background())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list member %v", err)
|
||||
}
|
||||
var learners []*pb.Member
|
||||
for _, m := range resp.Members {
|
||||
if m.IsLearner {
|
||||
learners = append(learners, m)
|
||||
}
|
||||
}
|
||||
return learners, nil
|
||||
}
|
||||
|
||||
// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
|
||||
// via v3 MemberAdd API, and then launches the new member.
|
||||
func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
|
||||
m := c.mustNewMember(t)
|
||||
m.isLearner = true
|
||||
|
||||
scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
|
||||
peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}
|
||||
|
||||
cli := c.Client(0)
|
||||
_, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to add learner member %v", err)
|
||||
}
|
||||
|
||||
m.InitialPeerURLsMap = types.URLsMap{}
|
||||
for _, mm := range c.Members {
|
||||
m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
|
||||
}
|
||||
m.InitialPeerURLsMap[m.Name] = m.PeerURLs
|
||||
m.NewCluster = false
|
||||
|
||||
if err := m.Launch(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
c.Members = append(c.Members, m)
|
||||
|
||||
c.waitMembersMatch(t)
|
||||
}
|
||||
|
||||
// getMembers returns a list of members in cluster, in format of etcdserverpb.Member
|
||||
func (c *ClusterV3) getMembers() []*pb.Member {
|
||||
var mems []*pb.Member
|
||||
for _, m := range c.Members {
|
||||
mem := &pb.Member{
|
||||
Name: m.Name,
|
||||
PeerURLs: m.PeerURLs.StringSlice(),
|
||||
ClientURLs: m.ClientURLs.StringSlice(),
|
||||
IsLearner: m.isLearner,
|
||||
}
|
||||
mems = append(mems, mem)
|
||||
}
|
||||
return mems
|
||||
}
|
||||
|
||||
// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
|
||||
// local 'c.Members', which is the local recording of members in the testing cluster. With
|
||||
// the exception that the local recording c.Members does not have info on Member.ID, which
|
||||
// is generated when the member is been added to cluster.
|
||||
//
|
||||
// Note:
|
||||
// A successful match means the Member.clientURLs are matched. This means member has already
|
||||
// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
|
||||
// write request (in v2 server). Therefore, at this point, any raft log entries prior to this
|
||||
// would have already been applied.
|
||||
//
|
||||
// If a new member was added to an existing cluster, at this point, it has finished publishing
|
||||
// its own server attributes to the cluster. And therefore by the same argument, it has already
|
||||
// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
|
||||
// the new member has the correct view of the cluster configuration.
|
||||
//
|
||||
// Special note on learner member:
|
||||
// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
|
||||
// the learner member, its initial view of the cluster created by peerURLs map does not have info
|
||||
// on whether or not the new member itself is learner. But at this point, a successful match does
|
||||
// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
|
||||
// which was used to add the learner itself to the cluster, and therefore it has the correct info
|
||||
// on learner.
|
||||
func (c *ClusterV3) waitMembersMatch(t testing.TB) {
|
||||
wMembers := c.getMembers()
|
||||
sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
|
||||
cli := c.Client(0)
|
||||
for {
|
||||
resp, err := cli.MemberList(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list member %v", err)
|
||||
}
|
||||
|
||||
if len(resp.Members) != len(wMembers) {
|
||||
continue
|
||||
}
|
||||
sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
|
||||
for _, m := range resp.Members {
|
||||
m.ID = 0
|
||||
}
|
||||
if reflect.DeepEqual(resp.Members, wMembers) {
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(tickDuration)
|
||||
}
|
||||
}
|
||||
|
||||
type SortableProtoMemberSliceByPeerURLs []*pb.Member
|
||||
|
||||
func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
|
||||
func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
|
||||
return p[i].PeerURLs[0] < p[j].PeerURLs[0]
|
||||
}
|
||||
func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||
|
Loading…
x
Reference in New Issue
Block a user