From 4f57bb313f1c82942dcfb871a6286e4301bd8bd3 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 28 Jun 2016 15:14:45 -0700 Subject: [PATCH] clientv3: add grpc naming resolver --- clientv3/naming/grpc.go | 115 +++++++++++++++++++++++++++++++++++ clientv3/naming/grpc_test.go | 77 +++++++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 clientv3/naming/grpc.go create mode 100644 clientv3/naming/grpc_test.go diff --git a/clientv3/naming/grpc.go b/clientv3/naming/grpc.go new file mode 100644 index 000000000..100899ea0 --- /dev/null +++ b/clientv3/naming/grpc.go @@ -0,0 +1,115 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming + +import ( + "encoding/json" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" + "golang.org/x/net/context" + "google.golang.org/grpc/naming" +) + +const ( + gRPCNamingPrefix = "/github.com/grpc/" +) + +// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes. +type GRPCResolver struct { + // Client is an initialized etcd client + Client *clientv3.Client + // Timeout for update/delete request. + Timeout time.Duration +} + +func (gr *GRPCResolver) Add(target string, addr string, metadata interface{}) error { + update := naming.Update{ + Addr: addr, + Metadata: metadata, + } + val, err := json.Marshal(update) + if err != nil { + return err + } + + ctx := context.Background() + if gr.Timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) + defer cancel() + } + + _, err = gr.Client.KV.Put(ctx, gRPCNamingPrefix+target, string(val)) + return err +} + +func (gr *GRPCResolver) Delete(target string) error { + ctx := context.Background() + if gr.Timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(context.Background(), gr.Timeout) + defer cancel() + } + + _, err := gr.Client.Delete(ctx, gRPCNamingPrefix+target) + return err +} + +func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) { + cctx, cancel := context.WithCancel(context.Background()) + + wch := gr.Client.Watch(cctx, gRPCNamingPrefix+target) + + w := &gRPCWatcher{ + cancel: cancel, + wch: wch, + } + + return w, nil +} + +type gRPCWatcher struct { + cancel context.CancelFunc + wch clientv3.WatchChan +} + +func (gw *gRPCWatcher) Next() ([]*naming.Update, error) { + wr, ok := <-gw.wch + if !ok { + return nil, wr.Err() + } + + updates := make([]*naming.Update, 0, len(wr.Events)) + + for _, e := range wr.Events { + switch e.Type { + case mvccpb.PUT: + var jupdate naming.Update + err := json.Unmarshal(e.Kv.Value, &jupdate) + if err != nil { + continue + } + updates = append(updates, &jupdate) + case mvccpb.DELETE: + updates = append(updates, &naming.Update{Op: naming.Delete}) + } + } + + return updates, nil +} + +func (gw *gRPCWatcher) Close() { gw.cancel() } diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go new file mode 100644 index 000000000..8d0248376 --- /dev/null +++ b/clientv3/naming/grpc_test.go @@ -0,0 +1,77 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package naming + +import ( + "reflect" + "testing" + + "google.golang.org/grpc/naming" + + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestGRPCResolver(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + r := GRPCResolver{ + Client: clus.RandClient(), + } + + w, err := r.Resolve("foo") + if err != nil { + t.Fatal("failed to resolve foo", err) + } + defer w.Close() + + err = r.Add("foo", "127.0.0.1", "metadata") + if err != nil { + t.Fatal("failed to add foo", err) + } + + us, err := w.Next() + if err != nil { + t.Fatal("failed to get udpate", err) + } + + wu := &naming.Update{ + Op: naming.Add, + Addr: "127.0.0.1", + Metadata: "metadata", + } + + if !reflect.DeepEqual(us[0], wu) { + t.Fatalf("up = %#v, want %#v", us[0], wu) + } + + err = r.Delete("foo") + + us, err = w.Next() + if err != nil { + t.Fatal("failed to get udpate", err) + } + + wu = &naming.Update{ + Op: naming.Delete, + } + + if !reflect.DeepEqual(us[0], wu) { + t.Fatalf("up = %#v, want %#v", us[0], wu) + } +}