Merge pull request #13636 from serathius/remove-v2-client

Remove v2 server
This commit is contained in:
Marek Siarkowicz
2022-02-02 11:41:56 +01:00
committed by GitHub
30 changed files with 140 additions and 6753 deletions

View File

@@ -17,6 +17,7 @@ package integration
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"log"
@@ -37,7 +38,6 @@ import (
"go.etcd.io/etcd/client/pkg/v3/tlsutil"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/grpc_testing"
"go.etcd.io/etcd/raft/v3"
@@ -58,6 +58,8 @@ import (
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
@@ -226,44 +228,20 @@ func (c *Cluster) Launch(t testutil.TB) {
}
}
// wait Cluster to be stable to receive future client requests
c.WaitMembersMatch(t, c.HTTPMembers())
c.WaitMembersMatch(t, c.ProtoMembers())
c.waitVersion()
for _, m := range c.Members {
t.Logf(" - %v -> %v (%v)", m.Name, m.ID(), m.GRPCURL())
}
}
func (c *Cluster) URL(i int) string {
return c.Members[i].ClientURLs[0].String()
}
// URLs returns a list of all active client URLs in the Cluster
func (c *Cluster) URLs() []string {
return getMembersURLs(c.Members)
}
func getMembersURLs(members []*Member) []string {
urls := make([]string, 0)
for _, m := range members {
select {
case <-m.Server.StopNotify():
continue
default:
}
for _, u := range m.ClientURLs {
urls = append(urls, u.String())
}
}
return urls
}
// HTTPMembers returns a list of all active members as client.Members
func (c *Cluster) HTTPMembers() []client.Member {
ms := []client.Member{}
// ProtoMembers returns a list of all active members as client.Members
func (c *Cluster) ProtoMembers() []*pb.Member {
ms := []*pb.Member{}
for _, m := range c.Members {
pScheme := SchemeFromTLSInfo(m.PeerTLSInfo)
cScheme := SchemeFromTLSInfo(m.ClientTLSInfo)
cm := client.Member{Name: m.Name}
cm := &pb.Member{Name: m.Name}
for _, ln := range m.PeerListeners {
cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
}
@@ -318,9 +296,8 @@ func (c *Cluster) addMember(t testutil.TB) types.URLs {
// send add request to the Cluster
var err error
for i := 0; i < len(c.Members); i++ {
clientURL := c.URL(i)
peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
if err = c.AddMemberByURL(t, clientURL, peerURL); err == nil {
if err = c.AddMemberByURL(t, c.Members[i].Client, peerURL); err == nil {
break
}
}
@@ -339,22 +316,20 @@ func (c *Cluster) addMember(t testutil.TB) types.URLs {
}
c.Members = append(c.Members, m)
// wait Cluster to be stable to receive future client requests
c.WaitMembersMatch(t, c.HTTPMembers())
c.WaitMembersMatch(t, c.ProtoMembers())
return m.PeerURLs
}
func (c *Cluster) AddMemberByURL(t testutil.TB, clientURL, peerURL string) error {
cc := MustNewHTTPClient(t, []string{clientURL}, c.Cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
func (c *Cluster) AddMemberByURL(t testutil.TB, cc *clientv3.Client, peerURL string) error {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
_, err := ma.Add(ctx, peerURL)
_, err := cc.MemberAdd(ctx, []string{peerURL})
cancel()
if err != nil {
return err
}
// wait for the add node entry applied in the Cluster
members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
members := append(c.ProtoMembers(), &pb.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
c.WaitMembersMatch(t, members)
return nil
}
@@ -364,18 +339,11 @@ func (c *Cluster) AddMember(t testutil.TB) types.URLs {
return c.addMember(t)
}
func (c *Cluster) MustRemoveMember(t testutil.TB, id uint64) {
if err := c.RemoveMember(t, id); err != nil {
t.Fatal(err)
}
}
func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error {
func (c *Cluster) RemoveMember(t testutil.TB, cc *clientv3.Client, id uint64) error {
// send remove request to the Cluster
cc := MustNewHTTPClient(t, c.URLs(), c.Cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
err := ma.Remove(ctx, types.ID(id).String())
_, err := cc.MemberRemove(ctx, id)
cancel()
if err != nil {
return err
@@ -398,19 +366,29 @@ func (c *Cluster) RemoveMember(t testutil.TB, id uint64) error {
}
}
c.Members = newMembers
c.WaitMembersMatch(t, c.HTTPMembers())
c.WaitMembersMatch(t, c.ProtoMembers())
return nil
}
func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []client.Member) {
for _, u := range c.URLs() {
cc := MustNewHTTPClient(t, []string{u}, c.Cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
func (c *Cluster) WaitMembersMatch(t testutil.TB, membs []*pb.Member) {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
defer cancel()
for _, m := range c.Members {
cc := ToGRPC(m.Client)
select {
case <-m.Server.StopNotify():
continue
default:
}
for {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
ms, err := ma.List(ctx)
cancel()
if err == nil && isMembersEqual(ms, membs) {
resp, err := cc.Cluster.MemberList(ctx, &pb.MemberListRequest{Linearizable: false})
if errors.Is(err, context.DeadlineExceeded) {
t.Fatal(err)
}
if err != nil {
continue
}
if isMembersEqual(resp.Members, membs) {
break
}
time.Sleep(TickDuration)
@@ -429,13 +407,14 @@ func (c *Cluster) WaitMembersForLeader(t testutil.TB, membs []*Member) int {
for _, m := range membs {
possibleLead[uint64(m.Server.ID())] = true
}
cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
kapi := client.NewKeysAPI(cc)
cc, err := c.ClusterClient()
if err != nil {
t.Fatal(err)
}
// ensure leader is up via linearizable get
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*TickDuration+time.Second)
_, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
_, err := cc.Get(ctx, "0")
cancel()
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
@@ -503,13 +482,10 @@ func (c *Cluster) waitVersion() {
// isMembersEqual checks whether two members equal except ID field.
// The given wmembs should always set ID field to empty string.
func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
func isMembersEqual(membs []*pb.Member, wmembs []*pb.Member) bool {
sort.Sort(SortableMemberSliceByPeerURLs(membs))
sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
for i := range membs {
membs[i].ID = ""
}
return reflect.DeepEqual(membs, wmembs)
return cmp.Equal(membs, wmembs, cmpopts.IgnoreFields(pb.Member{}, "ID", "PeerURLs", "ClientURLs"))
}
func newLocalListener(t testutil.TB) net.Listener {
@@ -1075,11 +1051,9 @@ func (m *Member) WaitOK(t testutil.TB) {
}
func (m *Member) WaitStarted(t testutil.TB) {
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
kapi := client.NewKeysAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
_, err := kapi.Get(ctx, "/", nil)
_, err := m.Client.Get(ctx, "/", clientv3.WithSerializable())
if err != nil {
time.Sleep(TickDuration)
continue
@@ -1323,29 +1297,7 @@ func (m *Member) ReadyNotify() <-chan struct{} {
return m.Server.ReadyNotify()
}
func MustNewHTTPClient(t testutil.TB, eps []string, tls *transport.TLSInfo) client.Client {
cfgtls := transport.TLSInfo{}
if tls != nil {
cfgtls = *tls
}
cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
c, err := client.New(cfg)
if err != nil {
t.Fatal(err)
}
return c
}
func mustNewTransport(t testutil.TB, tlsInfo transport.TLSInfo) *http.Transport {
// tick in integration test is short, so 1s dial timeout could play well.
tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}
return tr
}
type SortableMemberSliceByPeerURLs []client.Member
type SortableMemberSliceByPeerURLs []*pb.Member
func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {