diff --git a/proxy/grpcproxy/cluster.go b/proxy/grpcproxy/cluster.go index 8a2fa16c1..899fb9be6 100644 --- a/proxy/grpcproxy/cluster.go +++ b/proxy/grpcproxy/cluster.go @@ -15,38 +15,163 @@ package grpcproxy import ( + "fmt" + "os" + "sync" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/clientv3/naming" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "golang.org/x/net/context" + "golang.org/x/time/rate" + "google.golang.org/grpc" + gnaming "google.golang.org/grpc/naming" ) +// allow maximum 1 retry per second +const resolveRetryRate = 1 + type clusterProxy struct { - client *clientv3.Client + clus clientv3.Cluster + ctx context.Context + gr *naming.GRPCResolver + + // advertise client URL + advaddr string + prefix string + + umu sync.RWMutex + umap map[string]gnaming.Update } -func NewClusterProxy(c *clientv3.Client) pb.ClusterServer { - return &clusterProxy{ - client: c, +// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints. +// The returned channel is closed when there is grpc-proxy endpoint registered +// and the client's context is canceled so the 'register' loop returns. +func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) { + cp := &clusterProxy{ + clus: c.Cluster, + ctx: c.Ctx(), + gr: &naming.GRPCResolver{Client: c}, + + advaddr: advaddr, + prefix: prefix, + umap: make(map[string]gnaming.Update), + } + + donec := make(chan struct{}) + if advaddr != "" && prefix != "" { + go func() { + defer close(donec) + cp.resolve(prefix) + }() + return cp, donec + } + + close(donec) + return cp, donec +} + +func (cp *clusterProxy) resolve(prefix string) { + rm := rate.NewLimiter(rate.Limit(resolveRetryRate), resolveRetryRate) + for rm.Wait(cp.ctx) == nil { + wa, err := cp.gr.Resolve(prefix) + if err != nil { + plog.Warningf("failed to resolve %q (%v)", prefix, err) + continue + } + cp.monitor(wa) + } +} + +func (cp *clusterProxy) monitor(wa gnaming.Watcher) { + for cp.ctx.Err() == nil { + ups, err := wa.Next() + if err != nil { + plog.Warningf("clusterProxy watcher error (%v)", err) + if grpc.ErrorDesc(err) == naming.ErrWatcherClosed.Error() { + return + } + } + + cp.umu.Lock() + for i := range ups { + switch ups[i].Op { + case gnaming.Add: + cp.umap[ups[i].Addr] = *ups[i] + case gnaming.Delete: + delete(cp.umap, ups[i].Addr) + } + } + cp.umu.Unlock() } } func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) { - conn := cp.client.ActiveConnection() - return pb.NewClusterClient(conn).MemberAdd(ctx, r) + mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs) + if err != nil { + return nil, err + } + resp := (pb.MemberAddResponse)(*mresp) + return &resp, err } func (cp *clusterProxy) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) { - conn := cp.client.ActiveConnection() - return pb.NewClusterClient(conn).MemberRemove(ctx, r) + mresp, err := cp.clus.MemberRemove(ctx, r.ID) + if err != nil { + return nil, err + } + resp := (pb.MemberRemoveResponse)(*mresp) + return &resp, err } func (cp *clusterProxy) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) { - conn := cp.client.ActiveConnection() - return pb.NewClusterClient(conn).MemberUpdate(ctx, r) + mresp, err := cp.clus.MemberUpdate(ctx, r.ID, r.PeerURLs) + if err != nil { + return nil, err + } + resp := (pb.MemberUpdateResponse)(*mresp) + return &resp, err } -func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) { - conn := cp.client.ActiveConnection() - return pb.NewClusterClient(conn).MemberList(ctx, r) +func (cp *clusterProxy) membersFromUpdates() ([]*pb.Member, error) { + cp.umu.RLock() + defer cp.umu.RUnlock() + mbs := make([]*pb.Member, 0, len(cp.umap)) + for addr, upt := range cp.umap { + m, err := decodeMeta(fmt.Sprint(upt.Metadata)) + if err != nil { + return nil, err + } + mbs = append(mbs, &pb.Member{Name: m.Name, ClientURLs: []string{addr}}) + } + return mbs, nil +} + +// MemberList wraps member list API with following rules: +// - If 'advaddr' is not empty and 'prefix' is not empty, return registered member lists via resolver +// - If 'advaddr' is not empty and 'prefix' is not empty and registered grpc-proxy members haven't been fetched, return the 'advaddr' +// - If 'advaddr' is not empty and 'prefix' is empty, return 'advaddr' without forcing it to 'register' +// - If 'advaddr' is empty, forward to member list API +func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) { + if cp.advaddr != "" { + if cp.prefix != "" { + mbs, err := cp.membersFromUpdates() + if err != nil { + return nil, err + } + if len(mbs) > 0 { + return &pb.MemberListResponse{Members: mbs}, nil + } + } + // prefix is empty or no grpc-proxy members haven't been registered + hostname, _ := os.Hostname() + return &pb.MemberListResponse{Members: []*pb.Member{{Name: hostname, ClientURLs: []string{cp.advaddr}}}}, nil + } + mresp, err := cp.clus.MemberList(ctx) + if err != nil { + return nil, err + } + resp := (pb.MemberListResponse)(*mresp) + return &resp, err } diff --git a/proxy/grpcproxy/cluster_test.go b/proxy/grpcproxy/cluster_test.go new file mode 100644 index 000000000..031a95684 --- /dev/null +++ b/proxy/grpcproxy/cluster_test.go @@ -0,0 +1,121 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpcproxy + +import ( + "net" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" + + "golang.org/x/net/context" + "google.golang.org/grpc" +) + +func TestClusterProxyMemberList(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t) + defer cts.close(t) + + cfg := clientv3.Config{ + Endpoints: []string{cts.caddr}, + DialTimeout: 5 * time.Second, + } + client, err := clientv3.New(cfg) + if err != nil { + t.Fatalf("err %v, want nil", err) + } + defer client.Close() + + // wait some time for register-loop to write keys + time.Sleep(time.Second) + + var mresp *clientv3.MemberListResponse + mresp, err = client.Cluster.MemberList(context.Background()) + if err != nil { + t.Fatalf("err %v, want nil", err) + } + + if len(mresp.Members) != 1 { + t.Fatalf("len(mresp.Members) expected 1, got %d (%+v)", len(mresp.Members), mresp.Members) + } + if len(mresp.Members[0].ClientURLs) != 1 { + t.Fatalf("len(mresp.Members[0].ClientURLs) expected 1, got %d (%+v)", len(mresp.Members[0].ClientURLs), mresp.Members[0].ClientURLs[0]) + } + if mresp.Members[0].ClientURLs[0] != cts.caddr { + t.Fatalf("mresp.Members[0].ClientURLs[0] expected %q, got %q", cts.caddr, mresp.Members[0].ClientURLs[0]) + } +} + +type clusterproxyTestServer struct { + cp pb.ClusterServer + c *clientv3.Client + server *grpc.Server + l net.Listener + donec <-chan struct{} + caddr string +} + +func (cts *clusterproxyTestServer) close(t *testing.T) { + cts.server.Stop() + cts.l.Close() + cts.c.Close() + select { + case <-cts.donec: + return + case <-time.After(5 * time.Second): + t.Fatalf("register-loop took too long to return") + } +} + +func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer { + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + } + client, err := clientv3.New(cfg) + if err != nil { + t.Fatal(err) + } + + cts := &clusterproxyTestServer{ + c: client, + } + cts.l, err = net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + var opts []grpc.ServerOption + cts.server = grpc.NewServer(opts...) + go cts.server.Serve(cts.l) + + // wait some time for free port 0 to be resolved + time.Sleep(500 * time.Millisecond) + + Register(client, "test-prefix", cts.l.Addr().String(), 7) + cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix") + cts.caddr = cts.l.Addr().String() + pb.RegisterClusterServer(cts.server, cts.cp) + + return cts +} diff --git a/proxy/grpcproxy/register.go b/proxy/grpcproxy/register.go index 31379b3bc..598c71f07 100644 --- a/proxy/grpcproxy/register.go +++ b/proxy/grpcproxy/register.go @@ -15,6 +15,9 @@ package grpcproxy import ( + "encoding/json" + "os" + "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/clientv3/naming" @@ -26,10 +29,10 @@ import ( // allow maximum 1 retry per second const registerRetryRate = 1 -// register registers itself as a grpc-proxy server by writing prefixed-key +// Register registers itself as a grpc-proxy server by writing prefixed-key // with session of specified TTL (in seconds). The returned channel is closed // when the client's context is canceled. -func register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} { +func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} { rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate) donec := make(chan struct{}) @@ -65,10 +68,27 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (* } gr := &naming.GRPCResolver{Client: c} - if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil { + if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil { return nil, err } plog.Infof("registered %q with %d-second lease", addr, ttl) return ss, nil } + +// meta represents metadata of proxy register. +type meta struct { + Name string `json:"name"` +} + +func getMeta() string { + hostname, _ := os.Hostname() + bts, _ := json.Marshal(meta{Name: hostname}) + return string(bts) +} + +func decodeMeta(s string) (meta, error) { + m := meta{} + err := json.Unmarshal([]byte(s), &m) + return m, err +} diff --git a/proxy/grpcproxy/register_test.go b/proxy/grpcproxy/register_test.go index 6ce87a269..7679e0e7b 100644 --- a/proxy/grpcproxy/register_test.go +++ b/proxy/grpcproxy/register_test.go @@ -26,7 +26,7 @@ import ( gnaming "google.golang.org/grpc/naming" ) -func Test_register(t *testing.T) { +func TestRegister(t *testing.T) { defer testutil.AfterTest(t) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) @@ -44,7 +44,7 @@ func Test_register(t *testing.T) { t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups) } - donec := register(cli, testPrefix, paddr, 5) + donec := Register(cli, testPrefix, paddr, 5) ups, err = wa.Next() if err != nil {