From b47f721a98a37016ffab42a636156c66a7ff05bd Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 13 Jan 2016 11:07:29 -0800 Subject: [PATCH 1/3] integration: configure cluster with configCluster struct makes discovery, tls, and v3 explicitly part of the cluster information --- integration/cluster_test.go | 94 ++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 75e1bbf04..6e10f1d4a 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -79,7 +79,7 @@ func testCluster(t *testing.T, size int) { func TestTLSClusterOf3(t *testing.T) { defer afterTest(t) - c := NewTLSCluster(t, 3) + c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true}) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -102,7 +102,10 @@ func testClusterUsingDiscovery(t *testing.T, size int) { } cancel() - c := NewClusterByDiscovery(t, size, dc.URL(0)+"/v2/keys") + c := NewClusterByConfig( + t, + &clusterConfig{size: size, discoveryURL: dc.URL(0) + "/v2/keys"}, + ) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -122,7 +125,12 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) { } cancel() - c := NewTLSClusterByDiscovery(t, 3, dc.URL(0)+"/v2/keys") + c := NewClusterByConfig(t, + &clusterConfig{ + size: 3, + usePeerTLS: true, + discoveryURL: dc.URL(0) + "/v2/keys"}, + ) c.Launch(t) defer c.Terminate(t) clusterMustProgress(t, c.Members) @@ -145,12 +153,12 @@ func testDoubleClusterSize(t *testing.T, size int) { func TestDoubleTLSClusterSizeOf3(t *testing.T) { defer afterTest(t) - c := NewTLSCluster(t, 3) + c := NewClusterByConfig(t, &clusterConfig{size: 3, usePeerTLS: true}) c.Launch(t) defer c.Terminate(t) for i := 0; i < 3; i++ { - c.AddTLSMember(t) + c.AddMember(t) } clusterMustProgress(t, c.Members) } @@ -336,14 +344,26 @@ func clusterMustProgress(t *testing.T, membs []*member) { } } -// TODO: support TLS +type clusterConfig struct { + size int + usePeerTLS bool + discoveryURL string + useV3 bool +} + type cluster struct { + cfg *clusterConfig Members []*member } -func fillClusterForMembers(ms []*member) error { +func (c *cluster) fillClusterForMembers() error { + if c.cfg.discoveryURL != "" { + // cluster will be discovered + return nil + } + addrs := make([]string, 0) - for _, m := range ms { + for _, m := range c.Members { scheme := "http" if !m.PeerTLSInfo.Empty() { scheme = "https" @@ -354,7 +374,7 @@ func fillClusterForMembers(ms []*member) error { } clusterStr := strings.Join(addrs, ",") var err error - for _, m := range ms { + for _, m := range c.Members { m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) if err != nil { return err @@ -363,49 +383,31 @@ func fillClusterForMembers(ms []*member) error { return nil } -func newCluster(t *testing.T, size int, usePeerTLS bool) *cluster { - c := &cluster{} - ms := make([]*member, size) - for i := 0; i < size; i++ { - ms[i] = mustNewMember(t, c.name(i), usePeerTLS) +func newCluster(t *testing.T, cfg *clusterConfig) *cluster { + c := &cluster{cfg: cfg} + ms := make([]*member, cfg.size) + for i := 0; i < cfg.size; i++ { + ms[i] = mustNewMember(t, c.name(i), cfg.usePeerTLS) + ms[i].DiscoveryURL = cfg.discoveryURL + ms[i].V3demo = cfg.useV3 } c.Members = ms - if err := fillClusterForMembers(c.Members); err != nil { + if err := c.fillClusterForMembers(); err != nil { t.Fatal(err) } return c } -func newClusterByDiscovery(t *testing.T, size int, usePeerTLS bool, url string) *cluster { - c := &cluster{} - ms := make([]*member, size) - for i := 0; i < size; i++ { - ms[i] = mustNewMember(t, c.name(i), usePeerTLS) - ms[i].DiscoveryURL = url - } - c.Members = ms - return c -} - // NewCluster returns an unlaunched cluster of the given size which has been // set to use static bootstrap. func NewCluster(t *testing.T, size int) *cluster { - return newCluster(t, size, false) + return newCluster(t, &clusterConfig{size: size}) } -// NewClusterUsingDiscovery returns an unlaunched cluster of the given size -// which has been set to use the given url as discovery service to bootstrap. -func NewClusterByDiscovery(t *testing.T, size int, url string) *cluster { - return newClusterByDiscovery(t, size, false, url) -} - -func NewTLSCluster(t *testing.T, size int) *cluster { - return newCluster(t, size, true) -} - -func NewTLSClusterByDiscovery(t *testing.T, size int, url string) *cluster { - return newClusterByDiscovery(t, size, true, url) +// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration +func NewClusterByConfig(t *testing.T, cfg *clusterConfig) *cluster { + return newCluster(t, cfg) } func (c *cluster) Launch(t *testing.T) { @@ -459,10 +461,12 @@ func (c *cluster) HTTPMembers() []client.Member { return ms } -func (c *cluster) addMember(t *testing.T, usePeerTLS bool) { - m := mustNewMember(t, c.name(rand.Int()), usePeerTLS) +func (c *cluster) addMember(t *testing.T) { + m := mustNewMember(t, c.name(rand.Int()), c.cfg.usePeerTLS) + m.V3demo = c.cfg.useV3 + scheme := "http" - if usePeerTLS { + if c.cfg.usePeerTLS { scheme = "https" } @@ -495,11 +499,7 @@ func (c *cluster) addMember(t *testing.T, usePeerTLS bool) { } func (c *cluster) AddMember(t *testing.T) { - c.addMember(t, false) -} - -func (c *cluster) AddTLSMember(t *testing.T) { - c.addMember(t, true) + c.addMember(t) } func (c *cluster) RemoveMember(t *testing.T, id uint64) { From 6949f052c45874674fe1757f59f27d9cc01a5bf9 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 13 Jan 2016 11:57:27 -0800 Subject: [PATCH 2/3] integration: add support for grpc server and client --- integration/cluster_test.go | 91 +++++++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 13 deletions(-) diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 6e10f1d4a..346911948 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -33,13 +33,16 @@ import ( "github.com/coreos/etcd/client" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/etcdhttp" + "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" ) const ( @@ -349,6 +352,7 @@ type clusterConfig struct { usePeerTLS bool discoveryURL string useV3 bool + useGRPC bool } type cluster struct { @@ -387,9 +391,7 @@ func newCluster(t *testing.T, cfg *clusterConfig) *cluster { c := &cluster{cfg: cfg} ms := make([]*member, cfg.size) for i := 0; i < cfg.size; i++ { - ms[i] = mustNewMember(t, c.name(i), cfg.usePeerTLS) - ms[i].DiscoveryURL = cfg.discoveryURL - ms[i].V3demo = cfg.useV3 + ms[i] = c.mustNewMember(t) } c.Members = ms if err := c.fillClusterForMembers(); err != nil { @@ -461,9 +463,21 @@ func (c *cluster) HTTPMembers() []client.Member { return ms } -func (c *cluster) addMember(t *testing.T) { - m := mustNewMember(t, c.name(rand.Int()), c.cfg.usePeerTLS) +func (c *cluster) mustNewMember(t *testing.T) *member { + name := c.name(rand.Int()) + m := mustNewMember(t, name, c.cfg.usePeerTLS) + m.DiscoveryURL = c.cfg.discoveryURL m.V3demo = c.cfg.useV3 + if c.cfg.useGRPC { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + return m +} + +func (c *cluster) addMember(t *testing.T) { + m := c.mustNewMember(t) scheme := "http" if c.cfg.usePeerTLS { @@ -630,12 +644,16 @@ func newListenerWithAddr(t *testing.T, addr string) net.Listener { type member struct { etcdserver.ServerConfig PeerListeners, ClientListeners []net.Listener + grpcListener net.Listener // inited PeerTLSInfo implies to enable peer TLS PeerTLSInfo transport.TLSInfo raftHandler *testutil.PauseableHandler s *etcdserver.EtcdServer hss []*httptest.Server + + grpcServer *grpc.Server + grpcAddr string } // mustNewMember return an inited member with the given name. If usePeerTLS is @@ -694,6 +712,35 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { return m } +// startGRPC starts a grpc server over a unix domain socket on the member +func (m *member) listenGRPC() error { + if m.V3demo == false { + return fmt.Errorf("starting grpc server without v3 configured") + } + m.grpcAddr = m.Name + ".sock" + if err := os.RemoveAll(m.grpcAddr); err != nil { + return err + } + l, err := net.Listen("unix", m.grpcAddr) + if err != nil { + return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) + } + m.grpcListener = l + return nil +} + +// newGrpcClient creates a new grpc client connection to the member +func NewGRPCClient(m *member) (*grpc.ClientConn, error) { + if m.grpcAddr == "" { + return nil, fmt.Errorf("member not configured for grpc") + } + f := func(a string, t time.Duration) (net.Conn, error) { + return net.Dial("unix", a) + } + unixdialer := grpc.WithDialer(f) + return grpc.Dial(m.grpcAddr, unixdialer) +} + // Clone returns a member with the same server configuration. The returned // member will not set PeerListeners and ClientListeners. func (m *member) Clone(t *testing.T) *member { @@ -761,6 +808,12 @@ func (m *member) Launch() error { hs.Start() m.hss = append(m.hss, hs) } + if m.grpcListener != nil { + m.grpcServer = grpc.NewServer() + etcdserverpb.RegisterKVServer(m.grpcServer, v3rpc.NewKVServer(m.s)) + etcdserverpb.RegisterWatchServer(m.grpcServer, v3rpc.NewWatchServer(m.s)) + go m.grpcServer.Serve(m.grpcListener) + } return nil } @@ -794,17 +847,26 @@ func (m *member) Resume() { m.s.ResumeSending() } -// Stop stops the member, but the data dir of the member is preserved. -func (m *member) Stop(t *testing.T) { +// Close stops the member's etcdserver and closes its connections +func (m *member) Close() { + if m.grpcServer != nil { + m.grpcServer.Stop() + m.grpcServer = nil + } m.s.Stop() for _, hs := range m.hss { hs.CloseClientConnections() hs.Close() } +} + +// Stop stops the member, but the data dir of the member is preserved. +func (m *member) Stop(t *testing.T) { + m.Close() m.hss = nil } -// Start starts the member using the preserved data dir. +// Restart starts the member using the preserved data dir. func (m *member) Restart(t *testing.T) error { newPeerListeners := make([]net.Listener, 0) for _, ln := range m.PeerListeners { @@ -816,16 +878,19 @@ func (m *member) Restart(t *testing.T) error { newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String())) } m.ClientListeners = newClientListeners + + if m.grpcListener != nil { + if err := m.listenGRPC(); err != nil { + t.Fatal(err) + } + } + return m.Launch() } // Terminate stops the member and removes the data dir. func (m *member) Terminate(t *testing.T) { - m.s.Stop() - for _, hs := range m.hss { - hs.CloseClientConnections() - hs.Close() - } + m.Close() if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { t.Fatal(err) } From 53186da0a9560836de18184fa492a562ba6e446e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 11 Jan 2016 13:56:51 -0800 Subject: [PATCH 3/3] integration: a few v3 grpc api tests --- integration/v3_grpc_test.go | 195 ++++++++++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 integration/v3_grpc_test.go diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go new file mode 100644 index 000000000..003e6f8c7 --- /dev/null +++ b/integration/v3_grpc_test.go @@ -0,0 +1,195 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package recipe +package integration + +import ( + "math/rand" + "reflect" + "testing" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type clusterV3 struct { + *cluster + conns []*grpc.ClientConn +} + +// newClusterGRPC returns a launched cluster with a grpc client connection +// for each cluster member. +func newClusterGRPC(t *testing.T, cfg *clusterConfig) *clusterV3 { + cfg.useV3 = true + cfg.useGRPC = true + clus := &clusterV3{cluster: NewClusterByConfig(t, cfg)} + for _, m := range clus.Members { + conn, err := NewGRPCClient(m) + if err != nil { + t.Fatal(err) + } + clus.conns = append(clus.conns, conn) + } + clus.Launch(t) + return clus +} + +func (c *clusterV3) Terminate(t *testing.T) { + for _, conn := range c.conns { + if err := conn.Close(); err != nil { + t.Error(err) + } + } + c.cluster.Terminate(t) +} + +func (c *clusterV3) RandConn() *grpc.ClientConn { + return c.conns[rand.Intn(len(c.conns))] +} + +// TestV3PutOverwrite puts a key with the v3 api to a random cluster member, +// overwrites it, then checks that the change was applied. +func TestV3PutOverwrite(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + kvc := pb.NewKVClient(clus.RandConn()) + key := []byte("foo") + reqput := &pb.PutRequest{Key: key, Value: []byte("bar")} + + respput, err := kvc.Put(context.TODO(), reqput) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + + // overwrite + reqput.Value = []byte("baz") + respput2, err := kvc.Put(context.TODO(), reqput) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + if respput2.Header.Revision <= respput.Header.Revision { + t.Fatalf("expected newer revision on overwrite, got %v <= %v", + respput2.Header.Revision, respput.Header.Revision) + } + + reqrange := &pb.RangeRequest{Key: key} + resprange, err := kvc.Range(context.TODO(), reqrange) + if err != nil { + t.Fatalf("couldn't get key (%v)", err) + } + if len(resprange.Kvs) != 1 { + t.Fatalf("expected 1 key, got %v", len(resprange.Kvs)) + } + + kv := resprange.Kvs[0] + if kv.ModRevision <= kv.CreateRevision { + t.Errorf("expected modRev > createRev, got %d <= %d", + kv.ModRevision, kv.CreateRevision) + } + if !reflect.DeepEqual(reqput.Value, kv.Value) { + t.Errorf("expected value %v, got %v", reqput.Value, kv.Value) + } +} + +// TestV3DeleteRange tests various edge cases in teh DeleteRange API. +func TestV3DeleteRange(t *testing.T) { + tests := []struct { + keySet []string + begin string + end string + + wantSet [][]byte + }{ + // delete middle + { + []string{"foo", "foo/abc", "fop"}, + "foo/", "fop", + [][]byte{[]byte("foo"), []byte("fop")}, + }, + // no delete + { + []string{"foo", "foo/abc", "fop"}, + "foo/", "foo/", + [][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, + }, + // delete first + { + []string{"foo", "foo/abc", "fop"}, + "fo", "fop", + [][]byte{[]byte("fop")}, + }, + // delete tail + { + []string{"foo", "foo/abc", "fop"}, + "foo/", "fos", + [][]byte{[]byte("foo")}, + }, + // delete exact + { + []string{"foo", "foo/abc", "fop"}, + "foo/abc", "", + [][]byte{[]byte("foo"), []byte("fop")}, + }, + // delete none, [x,x) + { + []string{"foo"}, + "foo", "foo", + [][]byte{[]byte("foo")}, + }, + } + + for i, tt := range tests { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + kvc := pb.NewKVClient(clus.RandConn()) + + ks := tt.keySet + for j := range ks { + reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}} + _, err := kvc.Put(context.TODO(), reqput) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + } + + dreq := &pb.DeleteRangeRequest{ + Key: []byte(tt.begin), + RangeEnd: []byte(tt.end)} + dresp, err := kvc.DeleteRange(context.TODO(), dreq) + if err != nil { + t.Fatalf("couldn't delete range on test %d (%v)", i, err) + } + + rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}} + rresp, err := kvc.Range(context.TODO(), rreq) + if err != nil { + t.Errorf("couldn't get range on test %v (%v)", i, err) + } + if dresp.Header.Revision != rresp.Header.Revision { + t.Errorf("expected revision %v, got %v", + dresp.Header.Revision, rresp.Header.Revision) + } + + keys := [][]byte{} + for j := range rresp.Kvs { + keys = append(keys, rresp.Kvs[j].Key) + } + if reflect.DeepEqual(tt.wantSet, keys) == false { + t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys) + } + + // can't defer because tcp ports will be in use + clus.Terminate(t) + } +}