mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #15577 from jmhbnz/add-round-robin-test
tests: Add new test for round robin resolver
This commit is contained in:
commit
4485db379e
@ -18,6 +18,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
@ -93,14 +95,16 @@ func (ss *StubServer) Addr() string {
|
||||
|
||||
type dummyStubServer struct {
|
||||
testpb.UnimplementedTestServiceServer
|
||||
body []byte
|
||||
counter uint64
|
||||
}
|
||||
|
||||
func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||
func (d *dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
|
||||
newCount := atomic.AddUint64(&d.counter, 1)
|
||||
|
||||
return &testpb.SimpleResponse{
|
||||
Payload: &testpb.Payload{
|
||||
Type: testpb.PayloadType_COMPRESSABLE,
|
||||
Body: d.body,
|
||||
Body: []byte(strconv.FormatUint(newCount, 10)),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@ -108,5 +112,5 @@ func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*tes
|
||||
// NewDummyStubServer creates a simple test server that serves Unary calls with
|
||||
// responses with the given payload.
|
||||
func NewDummyStubServer(body []byte) *StubServer {
|
||||
return New(dummyStubServer{body: body})
|
||||
return New(&dummyStubServer{})
|
||||
}
|
||||
|
@ -17,8 +17,9 @@ package naming_test
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/v3/naming/endpoints"
|
||||
@ -29,25 +30,23 @@ import (
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
)
|
||||
|
||||
// This test mimics scenario described in grpc_naming.md doc.
|
||||
func testEtcdGrpcResolver(t *testing.T, lbPolicy string) {
|
||||
|
||||
func TestEtcdGrpcResolver(t *testing.T) {
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
s1PayloadBody := []byte{'1'}
|
||||
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
|
||||
// Setup two new dummy stub servers
|
||||
payloadBody := []byte{'1'}
|
||||
s1 := grpc_testing.NewDummyStubServer(payloadBody)
|
||||
if err := s1.Start(nil); err != nil {
|
||||
t.Fatal("failed to start dummy grpc server (s1)", err)
|
||||
}
|
||||
defer s1.Stop()
|
||||
|
||||
s2PayloadBody := []byte{'2'}
|
||||
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
|
||||
s2 := grpc_testing.NewDummyStubServer(payloadBody)
|
||||
if err := s2.Start(nil); err != nil {
|
||||
t.Fatal("failed to start dummy grpc server (s2)", err)
|
||||
}
|
||||
defer s2.Stop()
|
||||
|
||||
// Create new cluster with endpoint manager with two endpoints
|
||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
@ -64,53 +63,88 @@ func TestEtcdGrpcResolver(t *testing.T) {
|
||||
t.Fatal("failed to add foo", err)
|
||||
}
|
||||
|
||||
err = em.AddEndpoint(context.TODO(), "foo/e2", e2)
|
||||
if err != nil {
|
||||
t.Fatal("failed to add foo", err)
|
||||
}
|
||||
|
||||
b, err := resolver.NewBuilder(clus.Client(1))
|
||||
if err != nil {
|
||||
t.Fatal("failed to new resolver builder", err)
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
|
||||
// Create connection with provided lb policy
|
||||
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b),
|
||||
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, lbPolicy)))
|
||||
if err != nil {
|
||||
t.Fatal("failed to connect to foo", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Send an initial request that should go to e1
|
||||
c := testpb.NewTestServiceClient(conn)
|
||||
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
t.Fatal("failed to invoke rpc to foo (e1)", err)
|
||||
}
|
||||
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
|
||||
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), payloadBody) {
|
||||
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
|
||||
}
|
||||
|
||||
em.DeleteEndpoint(context.TODO(), "foo/e1")
|
||||
em.AddEndpoint(context.TODO(), "foo/e2", e2)
|
||||
|
||||
// We use a loop with deadline of 30s to avoid test getting flake
|
||||
// as it's asynchronous for gRPC Client to update underlying connections.
|
||||
maxRetries := 300
|
||||
retryPeriod := 100 * time.Millisecond
|
||||
retries := 0
|
||||
for {
|
||||
time.Sleep(retryPeriod)
|
||||
retries++
|
||||
|
||||
resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
|
||||
// Send more requests
|
||||
lastResponse := []byte{'1'}
|
||||
totalRequests := 100
|
||||
for i := 1; i < totalRequests; i++ {
|
||||
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
if retries < maxRetries {
|
||||
continue
|
||||
}
|
||||
t.Fatal("failed to invoke rpc to foo (e2)", err)
|
||||
t.Fatal("failed to invoke rpc to foo", err)
|
||||
}
|
||||
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
|
||||
if retries < maxRetries {
|
||||
continue
|
||||
}
|
||||
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
|
||||
|
||||
t.Logf("Response: %v", string(resp.GetPayload().GetBody()))
|
||||
|
||||
if resp.GetPayload() == nil {
|
||||
t.Fatalf("unexpected response from foo: %s", resp.GetPayload().GetBody())
|
||||
}
|
||||
break
|
||||
lastResponse = resp.GetPayload().GetBody()
|
||||
}
|
||||
|
||||
// If the load balancing policy is pick first then return payload should equal number of requests
|
||||
t.Logf("Last response: %v", string(lastResponse))
|
||||
if lbPolicy == "pick_first" {
|
||||
if string(lastResponse) != "100" {
|
||||
t.Fatalf("unexpected total responses from foo: %s", string(lastResponse))
|
||||
}
|
||||
}
|
||||
|
||||
// If the load balancing policy is round robin we should see roughly half total requests served by each server
|
||||
if lbPolicy == "round_robin" {
|
||||
responses, err := strconv.Atoi(string(lastResponse))
|
||||
if err != nil {
|
||||
t.Fatalf("couldn't convert to int: %s", string(lastResponse))
|
||||
}
|
||||
|
||||
// Allow 10% tolerance as round robin is not perfect and we don't want the test to flake
|
||||
expected := float64(totalRequests) * 0.5
|
||||
assert.InEpsilon(t, float64(expected), float64(responses), 0.1, "unexpected total responses from foo: %s", string(lastResponse))
|
||||
}
|
||||
}
|
||||
|
||||
// TestEtcdGrpcResolverPickFirst mimics scenarios described in grpc_naming.md doc.
|
||||
func TestEtcdGrpcResolverPickFirst(t *testing.T) {
|
||||
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
// Pick first is the default load balancer policy for grpc-go
|
||||
testEtcdGrpcResolver(t, "pick_first")
|
||||
}
|
||||
|
||||
// TestEtcdGrpcResolverRoundRobin mimics scenarios described in grpc_naming.md doc.
|
||||
func TestEtcdGrpcResolverRoundRobin(t *testing.T) {
|
||||
|
||||
integration2.BeforeTest(t)
|
||||
|
||||
// Round robin is a common alternative for more production oriented scenarios
|
||||
testEtcdGrpcResolver(t, "round_robin")
|
||||
}
|
||||
|
||||
func TestEtcdEndpointManager(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user