From fd01db9e6033651926a9092a63b79fff923ef189 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 10 May 2017 12:19:09 -0700 Subject: [PATCH 1/3] grpcproxy, etcdmain: add lock and election services to proxy --- etcdmain/grpc_proxy.go | 6 ++++ proxy/grpcproxy/election.go | 65 +++++++++++++++++++++++++++++++++++++ proxy/grpcproxy/lock.go | 38 ++++++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 proxy/grpcproxy/election.go create mode 100644 proxy/grpcproxy/lock.go diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index ae5af8bbf..b2cc25c3a 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -24,6 +24,8 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/namespace" + "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" + "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/debugutil" "github.com/coreos/etcd/pkg/transport" @@ -154,6 +156,8 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { leasep, _ := grpcproxy.NewLeaseProxy(client) mainp := grpcproxy.NewMaintenanceProxy(client) authp := grpcproxy.NewAuthProxy(client) + electionp := grpcproxy.NewElectionProxy(client) + lockp := grpcproxy.NewLockProxy(client) server := grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), @@ -165,6 +169,8 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { pb.RegisterLeaseServer(server, leasep) pb.RegisterMaintenanceServer(server, mainp) pb.RegisterAuthServer(server, authp) + v3electionpb.RegisterElectionServer(server, electionp) + v3lockpb.RegisterLockServer(server, lockp) errc := make(chan error) diff --git a/proxy/grpcproxy/election.go b/proxy/grpcproxy/election.go new file mode 100644 index 000000000..27115a81d --- /dev/null +++ b/proxy/grpcproxy/election.go @@ -0,0 +1,65 @@ +// Copyright 2017 The etcd Lockors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" +) + +type electionProxy struct { + client *clientv3.Client +} + +func NewElectionProxy(client *clientv3.Client) v3electionpb.ElectionServer { + return &electionProxy{client: client} +} + +func (ep *electionProxy) Campaign(ctx context.Context, req *v3electionpb.CampaignRequest) (*v3electionpb.CampaignResponse, error) { + return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Campaign(ctx, req) +} + +func (ep *electionProxy) Proclaim(ctx context.Context, req *v3electionpb.ProclaimRequest) (*v3electionpb.ProclaimResponse, error) { + return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Proclaim(ctx, req) +} + +func (ep *electionProxy) Leader(ctx context.Context, req *v3electionpb.LeaderRequest) (*v3electionpb.LeaderResponse, error) { + return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Leader(ctx, req) +} + +func (ep *electionProxy) Observe(req *v3electionpb.LeaderRequest, s v3electionpb.Election_ObserveServer) error { + conn := ep.client.ActiveConnection() + ctx, cancel := context.WithCancel(s.Context()) + defer cancel() + sc, err := v3electionpb.NewElectionClient(conn).Observe(ctx, req) + if err != nil { + return err + } + for { + rr, err := sc.Recv() + if err != nil { + return err + } + if err = s.Send(rr); err != nil { + return err + } + } +} + +func (ep *electionProxy) Resign(ctx context.Context, req *v3electionpb.ResignRequest) (*v3electionpb.ResignResponse, error) { + return v3electionpb.NewElectionClient(ep.client.ActiveConnection()).Resign(ctx, req) +} diff --git a/proxy/grpcproxy/lock.go b/proxy/grpcproxy/lock.go new file mode 100644 index 000000000..804aff64a --- /dev/null +++ b/proxy/grpcproxy/lock.go @@ -0,0 +1,38 @@ +// Copyright 2017 The etcd Lockors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "golang.org/x/net/context" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" +) + +type lockProxy struct { + client *clientv3.Client +} + +func NewLockProxy(client *clientv3.Client) v3lockpb.LockServer { + return &lockProxy{client: client} +} + +func (lp *lockProxy) Lock(ctx context.Context, req *v3lockpb.LockRequest) (*v3lockpb.LockResponse, error) { + return v3lockpb.NewLockClient(lp.client.ActiveConnection()).Lock(ctx, req) +} + +func (lp *lockProxy) Unlock(ctx context.Context, req *v3lockpb.UnlockRequest) (*v3lockpb.UnlockResponse, error) { + return v3lockpb.NewLockClient(lp.client.ActiveConnection()).Unlock(ctx, req) +} From 713e006bc68b4b7129be29d8d8ebebc64acec574 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 10 May 2017 12:51:05 -0700 Subject: [PATCH 2/3] adpater: adapters for lock and election services --- .../adapter/election_client_adapter.go | 79 +++++++++++++++++++ .../grpcproxy/adapter/lock_client_adapter.go | 36 +++++++++ 2 files changed, 115 insertions(+) create mode 100644 proxy/grpcproxy/adapter/election_client_adapter.go create mode 100644 proxy/grpcproxy/adapter/lock_client_adapter.go diff --git a/proxy/grpcproxy/adapter/election_client_adapter.go b/proxy/grpcproxy/adapter/election_client_adapter.go new file mode 100644 index 000000000..383c1b9d8 --- /dev/null +++ b/proxy/grpcproxy/adapter/election_client_adapter.go @@ -0,0 +1,79 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +import ( + "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type es2ec struct{ es v3electionpb.ElectionServer } + +func ElectionServerToElectionClient(es v3electionpb.ElectionServer) v3electionpb.ElectionClient { + return &es2ec{es} +} + +func (s *es2ec) Campaign(ctx context.Context, r *v3electionpb.CampaignRequest, opts ...grpc.CallOption) (*v3electionpb.CampaignResponse, error) { + return s.es.Campaign(ctx, r) +} + +func (s *es2ec) Proclaim(ctx context.Context, r *v3electionpb.ProclaimRequest, opts ...grpc.CallOption) (*v3electionpb.ProclaimResponse, error) { + return s.es.Proclaim(ctx, r) +} + +func (s *es2ec) Leader(ctx context.Context, r *v3electionpb.LeaderRequest, opts ...grpc.CallOption) (*v3electionpb.LeaderResponse, error) { + return s.es.Leader(ctx, r) +} + +func (s *es2ec) Resign(ctx context.Context, r *v3electionpb.ResignRequest, opts ...grpc.CallOption) (*v3electionpb.ResignResponse, error) { + return s.es.Resign(ctx, r) +} + +func (s *es2ec) Observe(ctx context.Context, in *v3electionpb.LeaderRequest, opts ...grpc.CallOption) (v3electionpb.Election_ObserveClient, error) { + cs := newPipeStream(ctx, func(ss chanServerStream) error { + return s.es.Observe(in, &es2ecServerStream{ss}) + }) + return &es2ecClientStream{cs}, nil +} + +// es2ecClientStream implements Election_ObserveClient +type es2ecClientStream struct{ chanClientStream } + +// es2ecServerStream implements Election_ObserveServer +type es2ecServerStream struct{ chanServerStream } + +func (s *es2ecClientStream) Send(rr *v3electionpb.LeaderRequest) error { + return s.SendMsg(rr) +} +func (s *es2ecClientStream) Recv() (*v3electionpb.LeaderResponse, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*v3electionpb.LeaderResponse), nil +} + +func (s *es2ecServerStream) Send(rr *v3electionpb.LeaderResponse) error { + return s.SendMsg(rr) +} +func (s *es2ecServerStream) Recv() (*v3electionpb.LeaderRequest, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*v3electionpb.LeaderRequest), nil +} diff --git a/proxy/grpcproxy/adapter/lock_client_adapter.go b/proxy/grpcproxy/adapter/lock_client_adapter.go new file mode 100644 index 000000000..05e5cb020 --- /dev/null +++ b/proxy/grpcproxy/adapter/lock_client_adapter.go @@ -0,0 +1,36 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adapter + +import ( + "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +type ls2lsc struct{ ls v3lockpb.LockServer } + +func LockServerToLockClient(ls v3lockpb.LockServer) v3lockpb.LockClient { + return &ls2lsc{ls} +} + +func (s *ls2lsc) Lock(ctx context.Context, r *v3lockpb.LockRequest, opts ...grpc.CallOption) (*v3lockpb.LockResponse, error) { + return s.ls.Lock(ctx, r) +} + +func (s *ls2lsc) Unlock(ctx context.Context, r *v3lockpb.UnlockRequest, opts ...grpc.CallOption) (*v3lockpb.UnlockResponse, error) { + return s.ls.Unlock(ctx, r) +} From 00da3ca72596c3d6b6ed64bf7028bc01fc12ba6a Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 10 May 2017 12:53:39 -0700 Subject: [PATCH 3/3] integration: add lock and election services to proxy tests --- integration/cluster.go | 4 ++++ integration/cluster_direct.go | 4 ++++ integration/cluster_proxy.go | 4 ++++ integration/v3election_grpc_test.go | 4 ++-- integration/v3lock_grpc_test.go | 2 +- 5 files changed, 15 insertions(+), 3 deletions(-) diff --git a/integration/cluster.go b/integration/cluster.go index 7af9d77a1..3278bc232 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -935,4 +935,8 @@ type grpcAPI struct { Maintenance pb.MaintenanceClient // Auth is the authentication API for the client's connection. Auth pb.AuthClient + // Lock is the lock API for the client's connection. + Lock lockpb.LockClient + // Election is the election API for the client's connection. + Election epb.ElectionClient } diff --git a/integration/cluster_direct.go b/integration/cluster_direct.go index 84b2a796c..ff97e6146 100644 --- a/integration/cluster_direct.go +++ b/integration/cluster_direct.go @@ -18,6 +18,8 @@ package integration import ( "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" + "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" ) @@ -29,6 +31,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { pb.NewWatchClient(c.ActiveConnection()), pb.NewMaintenanceClient(c.ActiveConnection()), pb.NewAuthClient(c.ActiveConnection()), + v3lockpb.NewLockClient(c.ActiveConnection()), + v3electionpb.NewElectionClient(c.ActiveConnection()), } } diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 8593b5064..3916553be 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -58,6 +58,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { lp, lpch := grpcproxy.NewLeaseProxy(c) mp := grpcproxy.NewMaintenanceProxy(c) clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs + lockp := grpcproxy.NewLockProxy(c) + electp := grpcproxy.NewElectionProxy(c) grpc := grpcAPI{ adapter.ClusterServerToClusterClient(clp), @@ -66,6 +68,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { adapter.WatchServerToWatchClient(wp), adapter.MaintenanceServerToMaintenanceClient(mp), pb.NewAuthClient(c.ActiveConnection()), + adapter.LockServerToLockClient(lockp), + adapter.ElectionServerToElectionClient(electp), } proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} return grpc diff --git a/integration/v3election_grpc_test.go b/integration/v3election_grpc_test.go index b573b6e12..be320286e 100644 --- a/integration/v3election_grpc_test.go +++ b/integration/v3election_grpc_test.go @@ -41,7 +41,7 @@ func TestV3ElectionCampaign(t *testing.T) { t.Fatal(err2) } - lc := epb.NewElectionClient(clus.Client(0).ActiveConnection()) + lc := toGRPC(clus.Client(0)).Election req1 := &epb.CampaignRequest{Name: []byte("foo"), Lease: lease1.ID, Value: []byte("abc")} l1, lerr1 := lc.Campaign(context.TODO(), req1) if lerr1 != nil { @@ -94,7 +94,7 @@ func TestV3ElectionObserve(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 1}) defer clus.Terminate(t) - lc := epb.NewElectionClient(clus.Client(0).ActiveConnection()) + lc := toGRPC(clus.Client(0)).Election // observe leadership events observec := make(chan struct{}) diff --git a/integration/v3lock_grpc_test.go b/integration/v3lock_grpc_test.go index 04b7281d6..a66a8cf46 100644 --- a/integration/v3lock_grpc_test.go +++ b/integration/v3lock_grpc_test.go @@ -40,7 +40,7 @@ func TestV3LockLockWaiter(t *testing.T) { t.Fatal(err2) } - lc := lockpb.NewLockClient(clus.Client(0).ActiveConnection()) + lc := toGRPC(clus.Client(0)).Lock l1, lerr1 := lc.Lock(context.TODO(), &lockpb.LockRequest{Name: []byte("foo"), Lease: lease1.ID}) if lerr1 != nil { t.Fatal(lerr1)