Introduce grpc-1.30+ compatible client/v3/naming API.

This is not yet implementation, just API and tests to be filled
with implementation in next CLs,
tracked by: https://github.com/etcd-io/etcd/issues/12652

We propose here 3 packages:
 - clientv3/naming/endpoints ->
    That is abstraction layer over etcd that allows to write, read &
    watch Endpoints information. It's independent from GRPC API. It hides
    the storage details.

 - clientv3/naming/endpoints/internal ->
    That contains the grpc's compatible Update class to preserve the
    internal JSON mashalling format.

 - clientv3/naming/resolver ->
   That implements the GRPC resolver API, such that etcd can be
   used for connection.Dial in grpc.

Please see the grpc_naming.md document changes & grpcproxy/cluster.go
new integration, to see how the new abstractions work.

Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
Piotr Tabor
2021-01-21 22:13:38 +01:00
committed by Chao Chen
parent 3663ae13fe
commit 6c0e4d97f1
12 changed files with 590 additions and 38 deletions

View File

@@ -0,0 +1,135 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package naming
import (
"context"
"reflect"
"testing"
etcd "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)
func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal("failed to establish watch", err)
}
e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"}
err = em.AddEndpoint(context.TODO(), "foo/a1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
us := <-w
if us == nil {
t.Fatal("failed to get update", err)
}
wu := endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
}
if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}
err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
}
us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
}
wu = endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}
if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
}
}
// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
c := clus.RandClient()
em, err := endpoints.NewManager(c, "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}),
endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})})
if err != nil {
t.Fatal(err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal(err)
}
updates := <-w
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}
_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}
updates = <-w
if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}

View File

@@ -0,0 +1,70 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package naming
import (
"context"
"testing"
"google.golang.org/grpc"
"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/clientv3/naming/resolver"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)
// This test mimics scenario described in grpc_naming.md doc.
func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")
defer testutil.AfterTest(t)
// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)
em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}
err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())
conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
}
// TODO: send requests to conn, ensure s1 received it.
em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.
conn.GetState() // this line is to avoid compiler warning that conn is unused.
}

View File

@@ -12,44 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
// Package naming provides:
// - subpackage endpoints: an abstraction layer to store and read endpoints
// information from etcd.
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
// services based on the endpoints configuration
//
// To use, first import the packages:
//
// import (
// "go.etcd.io/etcd/clientv3"
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
//
// "go.etcd.io/etcd/clientv3/naming/endpoints"
// "go.etcd.io/etcd/clientv3/naming/resolver"
// "google.golang.org/grpc"
// "google.golang.org/grpc/naming"
// )
//
// First, register new endpoint addresses for a service:
//
// func etcdAdd(c *clientv3.Client, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
// }
//
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
//
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// r := &etcdnaming.GRPCResolver{Client: c}
// b := grpc.RoundRobin(r)
// return grpc.Dial(service, grpc.WithBalancer(b))
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
//
// func etcdDelete(c *clientv3, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
// em := endpoints.NewManager(c, service)
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
// }
//
// Or register an expiring endpoint with a lease:
//
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
// }
package naming

View File

@@ -0,0 +1,96 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoints
import (
"context"
"go.etcd.io/etcd/clientv3"
)
// Endpoint represents a single address the connection can be established with.
//
// Inspired by: https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
// Please document etcd version since which version each field is supported.
type Endpoint struct {
// Addr is the server address on which a connection will be established.
// Since etcd 3.1
Addr string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
// Since etcd 3.1
Metadata interface{}
}
type Operation uint8
const (
// Add indicates an Endpoint is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
// Update describes a single edit action of an Endpoint.
type Update struct {
// Op - action Add or Delete.
Op Operation
Key string
Endpoint Endpoint
}
// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
// UpdateWithOpts describes endpoint update (add or delete) together
// with etcd options (e.g. to attach an endpoint to a lease).
type UpdateWithOpts struct {
Update
Opts []clientv3.OpOption
}
// NewAddUpdateOpts constructs UpdateWithOpts for endpoint registration.
func NewAddUpdateOpts(key string, endpoint Endpoint, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Add, Key: key, Endpoint: endpoint}, Opts: opts}
}
// NewDeleteUpdateOpts constructs UpdateWithOpts for endpoint deletion.
func NewDeleteUpdateOpts(key string, opts ...clientv3.OpOption) *UpdateWithOpts {
return &UpdateWithOpts{Update: Update{Op: Delete, Key: key}, Opts: opts}
}
// Manager can be used to add/remove & inspect endpoints stored in etcd for
// a particular target.
type Manager interface {
// Update allows to atomically add/remove a few endpoints from etcd.
Update(ctx context.Context, updates []*UpdateWithOpts) error
// AddEndpoint registers a single endpoint in etcd.
// For more advanced use-cases use the Update method.
AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error
// DeleteEndpoint deletes a single endpoint stored in etcd.
// For more advanced use-cases use the Update method.
DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error
// List returns all the endpoints for the current target as a map.
List(ctx context.Context) (Key2EndpointMap, error)
// NewWatchChannel creates a channel that populates or endpoint updates.
// Cancel the 'ctx' to close the watcher.
NewWatchChannel(ctx context.Context) (WatchChannel, error)
}

View File

@@ -0,0 +1,135 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoints
// TODO: The API is not yet implemented.
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/naming/endpoints/internal"
)
type endpointManager struct {
// TODO: To be implemented, tracked by: https://github.com/etcd-io/etcd/issues/12652
}
func NewManager(client *clientv3.Client, target string) (Manager, error) {
// To be implemented (https://github.com/etcd-io/etcd/issues/12652)
return nil, fmt.Errorf("Not implemented yet")
}
func (m *endpointManager) Update(ctx context.Context, updates []*UpdateWithOpts) error {
// TODO: For loop in a single transaction:
internalUpdate := &internal.Update{} // translate UpdateWithOpts into json format.
switch internalUpdate.Op {
//case internal.Add:
// var v []byte
// if v, err = json.Marshal(internalUpdate); err != nil {
// return status.Error(codes.InvalidArgument, err.Error())
// }
// _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
//case internal.Delete:
// _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
//default:
// return status.Error(codes.InvalidArgument, "naming: bad naming op")
}
return fmt.Errorf("Not implemented yet")
}
func (m *endpointManager) AddEndpoint(ctx context.Context, key string, endpoint Endpoint, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewAddUpdateOpts(key, endpoint, opts...)})
}
func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts ...clientv3.OpOption) error {
return m.Update(ctx, []*UpdateWithOpts{NewDeleteUpdateOpts(key, opts...)})
}
func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
return nil, fmt.Errorf("Not implemented yet")
// TODO: Implementation to be inspired by:
// Next gets the next set of updates from the etcd resolver.
//// Calls to Next should be serialized; concurrent calls are not safe since
//// there is no way to reconcile the update ordering.
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
// if gw.wch == nil {
// // first Next() returns all addresses
// return gw.firstNext()
// }
// if gw.err != nil {
// return nil, gw.err
// }
//
// // process new events on target/*
// wr, ok := <-gw.wch
// if !ok {
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
// return nil, gw.err
// }
// if gw.err = wr.Err(); gw.err != nil {
// return nil, gw.err
// }
//
// updates := make([]*naming.Update, 0, len(wr.Events))
// for _, e := range wr.Events {
// var jupdate naming.Update
// var err error
// switch e.Type {
// case etcd.EventTypePut:
// err = json.Unmarshal(e.Kv.Value, &jupdate)
// jupdate.Op = naming.Add
// case etcd.EventTypeDelete:
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
// jupdate.Op = naming.Delete
// default:
// continue
// }
// if err == nil {
// updates = append(updates, &jupdate)
// }
// }
// return updates, nil
//}
//
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// // Use serialized request so resolution still works if the target etcd
// // server is partitioned away from the quorum.
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
// if gw.err = err; err != nil {
// return nil, err
// }
//
// updates := make([]*naming.Update, 0, len(resp.Kvs))
// for _, kv := range resp.Kvs {
// var jupdate naming.Update
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
// continue
// }
// updates = append(updates, &jupdate)
// }
//
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
// return updates, nil
//}
}
func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
// TODO: Implementation
return nil, fmt.Errorf("Not implemented yet")
}

View File

@@ -0,0 +1,52 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
// Operation describes action performed on endpoint (addition vs deletion).
// Must stay JSON-format compatible with:
// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Operation
type Operation uint8
const (
// Add indicates a new address is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
// Update defines a persistent (JSON marshalled) format representing
// endpoint within the etcd storage.
//
// As the format can be persisted by one version of etcd client library and
// read by other the format must be kept backward compatible and
// in particular must be superset of the grpc(<=1.29.1) naming.Update structure:
// https://pkg.go.dev/google.golang.org/grpc@v1.29.1/naming#Update
//
// Please document since which version of etcd-client given property is supported.
// Please keep the naming consistent with e.g. https://pkg.go.dev/google.golang.org/grpc/resolver#Address.
//
// Notice that it is not valid having both empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
// Since etcd 3.1.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
// Since etcd 3.1.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
// Since etcd 3.1.
Metadata interface{}
}

View File

@@ -0,0 +1,39 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package resolver
import (
"google.golang.org/grpc/resolver"
"go.etcd.io/etcd/clientv3"
)
type builder struct {
// ...
}
func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// To be implemented...
// Using endpoints.NewWatcher() to subscribe for endpoints changes.
return nil, nil
}
func (b builder) Scheme() string {
return "etcd"
}
func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
return builder{}, nil
}