mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

This commit adds a functionality of attaching an auth token to gRPC connection as a per RPC credential. For doing this, this commit lets clientv3.Client.Dial() create a dedicated gRPC connection for doing authentication. With the dedicated connection, the client calls Authenticate() RPC and obtain its token. The token is attached to the main gRPC connection with grpc.WithPerRPCCredentials(). This commit also adds a new option --username to etcdctl (v3). With this option, etcdctl attaches its auth token to the main gRPC connection (currently it is not used at all).
371 lines
8.9 KiB
Go
371 lines
8.9 KiB
Go
// Copyright 2016 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package clientv3
|
|
|
|
import (
|
|
"errors"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/time/rate"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
var (
|
|
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
|
|
|
// minConnRetryWait is the minimum time between reconnects to avoid flooding
|
|
minConnRetryWait = time.Second
|
|
)
|
|
|
|
// Client provides and manages an etcd v3 client session.
|
|
type Client struct {
|
|
Cluster
|
|
KV
|
|
Lease
|
|
Watcher
|
|
Auth
|
|
Maintenance
|
|
|
|
conn *grpc.ClientConn
|
|
cfg Config
|
|
creds *credentials.TransportAuthenticator
|
|
mu sync.RWMutex // protects connection selection and error list
|
|
errors []error // errors passed to retryConnection
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
// fields below are managed by connMonitor
|
|
|
|
// reconnc accepts writes which signal the client should reconnect
|
|
reconnc chan error
|
|
// newconnc is closed on successful connect and set to a fresh channel
|
|
newconnc chan struct{}
|
|
lastConnErr error
|
|
|
|
// Username is a username of authentication
|
|
Username string
|
|
// Password is a password of authentication
|
|
Password string
|
|
}
|
|
|
|
// New creates a new etcdv3 client from a given configuration.
|
|
func New(cfg Config) (*Client, error) {
|
|
if cfg.RetryDialer == nil {
|
|
cfg.RetryDialer = dialEndpointList
|
|
}
|
|
if len(cfg.Endpoints) == 0 {
|
|
return nil, ErrNoAvailableEndpoints
|
|
}
|
|
|
|
return newClient(&cfg)
|
|
}
|
|
|
|
// NewFromURL creates a new etcdv3 client from a URL.
|
|
func NewFromURL(url string) (*Client, error) {
|
|
return New(Config{Endpoints: []string{url}})
|
|
}
|
|
|
|
// NewFromConfigFile creates a new etcdv3 client from a configuration file.
|
|
func NewFromConfigFile(path string) (*Client, error) {
|
|
cfg, err := configFromFile(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return New(*cfg)
|
|
}
|
|
|
|
// Close shuts down the client's etcd connections.
|
|
func (c *Client) Close() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.cancel == nil {
|
|
return nil
|
|
}
|
|
c.cancel()
|
|
c.cancel = nil
|
|
connc := c.newconnc
|
|
c.mu.Unlock()
|
|
c.connStartRetry(nil)
|
|
c.Watcher.Close()
|
|
c.Lease.Close()
|
|
<-connc
|
|
c.mu.Lock()
|
|
if c.lastConnErr != c.ctx.Err() {
|
|
return c.lastConnErr
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Ctx is a context for "out of band" messages (e.g., for sending
|
|
// "clean up" message when another context is canceled). It is
|
|
// canceled on client Close().
|
|
func (c *Client) Ctx() context.Context { return c.ctx }
|
|
|
|
// Endpoints lists the registered endpoints for the client.
|
|
func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
|
|
|
|
// Errors returns all errors that have been observed since called last.
|
|
func (c *Client) Errors() (errs []error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
errs = c.errors
|
|
c.errors = nil
|
|
return errs
|
|
}
|
|
|
|
type authTokenCredential struct {
|
|
token string
|
|
}
|
|
|
|
func (cred authTokenCredential) RequireTransportSecurity() bool {
|
|
return false
|
|
}
|
|
|
|
func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) {
|
|
return map[string]string{
|
|
"token": cred.token,
|
|
}, nil
|
|
}
|
|
|
|
// Dial establishes a connection for a given endpoint using the client's config
|
|
func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
|
|
opts := []grpc.DialOption{
|
|
grpc.WithBlock(),
|
|
grpc.WithTimeout(c.cfg.DialTimeout),
|
|
}
|
|
if c.creds != nil {
|
|
opts = append(opts, grpc.WithTransportCredentials(*c.creds))
|
|
} else {
|
|
opts = append(opts, grpc.WithInsecure())
|
|
}
|
|
|
|
proto := "tcp"
|
|
if url, uerr := url.Parse(endpoint); uerr == nil && url.Scheme == "unix" {
|
|
proto = "unix"
|
|
// strip unix:// prefix so certs work
|
|
endpoint = url.Host
|
|
}
|
|
f := func(a string, t time.Duration) (net.Conn, error) {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return nil, c.ctx.Err()
|
|
default:
|
|
}
|
|
return net.DialTimeout(proto, a, t)
|
|
}
|
|
opts = append(opts, grpc.WithDialer(f))
|
|
|
|
if c.Username != "" && c.Password != "" {
|
|
auth, err := newAuthenticator(endpoint, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer auth.close()
|
|
|
|
resp, err := auth.authenticate(c.ctx, c.Username, c.Password)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts = append(opts, grpc.WithPerRPCCredentials(authTokenCredential{token: resp.Token}))
|
|
}
|
|
|
|
conn, err := grpc.Dial(endpoint, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
// WithRequireLeader requires client requests to only succeed
|
|
// when the cluster has a leader.
|
|
func WithRequireLeader(ctx context.Context) context.Context {
|
|
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
|
|
return metadata.NewContext(ctx, md)
|
|
}
|
|
|
|
func newClient(cfg *Config) (*Client, error) {
|
|
if cfg == nil {
|
|
cfg = &Config{RetryDialer: dialEndpointList}
|
|
}
|
|
var creds *credentials.TransportAuthenticator
|
|
if cfg.TLS != nil {
|
|
c := credentials.NewTLS(cfg.TLS)
|
|
creds = &c
|
|
}
|
|
|
|
// use a temporary skeleton client to bootstrap first connection
|
|
ctx, cancel := context.WithCancel(context.TODO())
|
|
conn, err := cfg.RetryDialer(&Client{cfg: *cfg, creds: creds, ctx: ctx, Username: cfg.Username, Password: cfg.Password})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client := &Client{
|
|
conn: conn,
|
|
cfg: *cfg,
|
|
creds: creds,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
reconnc: make(chan error, 1),
|
|
newconnc: make(chan struct{}),
|
|
}
|
|
|
|
if cfg.Username != "" && cfg.Password != "" {
|
|
client.Username = cfg.Username
|
|
client.Password = cfg.Password
|
|
}
|
|
|
|
go client.connMonitor()
|
|
|
|
client.Cluster = NewCluster(client)
|
|
client.KV = NewKV(client)
|
|
client.Lease = NewLease(client)
|
|
client.Watcher = NewWatcher(client)
|
|
client.Auth = NewAuth(client)
|
|
client.Maintenance = NewMaintenance(client)
|
|
if cfg.Logger != nil {
|
|
logger.Set(cfg.Logger)
|
|
} else {
|
|
// disable client side grpc by default
|
|
logger.Set(log.New(ioutil.Discard, "", 0))
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// ActiveConnection returns the current in-use connection
|
|
func (c *Client) ActiveConnection() *grpc.ClientConn {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.conn
|
|
}
|
|
|
|
// retryConnection establishes a new connection
|
|
func (c *Client) retryConnection(err error) (newConn *grpc.ClientConn, dialErr error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if err != nil {
|
|
c.errors = append(c.errors, err)
|
|
}
|
|
if c.conn != nil {
|
|
c.conn.Close()
|
|
if st, _ := c.conn.State(); st != grpc.Shutdown {
|
|
// wait so grpc doesn't leak sleeping goroutines
|
|
c.conn.WaitForStateChange(context.Background(), st)
|
|
}
|
|
}
|
|
if c.cancel == nil {
|
|
// client has called Close() so don't try to dial out
|
|
return nil, c.ctx.Err()
|
|
}
|
|
|
|
c.conn, dialErr = c.cfg.RetryDialer(c)
|
|
if dialErr != nil {
|
|
c.errors = append(c.errors, dialErr)
|
|
}
|
|
return c.conn, dialErr
|
|
}
|
|
|
|
// connStartRetry schedules a reconnect if one is not already running
|
|
func (c *Client) connStartRetry(err error) {
|
|
c.mu.Lock()
|
|
ch := c.reconnc
|
|
defer c.mu.Unlock()
|
|
select {
|
|
case ch <- err:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// connWait waits for a reconnect to be processed
|
|
func (c *Client) connWait(ctx context.Context, err error) (*grpc.ClientConn, error) {
|
|
c.mu.Lock()
|
|
ch := c.newconnc
|
|
c.mu.Unlock()
|
|
c.connStartRetry(err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-ch:
|
|
}
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
return c.conn, c.lastConnErr
|
|
}
|
|
|
|
// connMonitor monitors the connection and handles retries
|
|
func (c *Client) connMonitor() {
|
|
var err error
|
|
|
|
defer func() {
|
|
_, err = c.retryConnection(c.ctx.Err())
|
|
c.mu.Lock()
|
|
c.lastConnErr = err
|
|
close(c.newconnc)
|
|
c.mu.Unlock()
|
|
}()
|
|
|
|
limiter := rate.NewLimiter(rate.Every(minConnRetryWait), 1)
|
|
for limiter.Wait(c.ctx) == nil {
|
|
select {
|
|
case err = <-c.reconnc:
|
|
case <-c.ctx.Done():
|
|
return
|
|
}
|
|
conn, connErr := c.retryConnection(err)
|
|
c.mu.Lock()
|
|
c.lastConnErr = connErr
|
|
c.conn = conn
|
|
close(c.newconnc)
|
|
c.newconnc = make(chan struct{})
|
|
c.reconnc = make(chan error, 1)
|
|
c.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// dialEndpointList attempts to connect to each endpoint in order until a
|
|
// connection is established.
|
|
func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
|
|
var err error
|
|
for _, ep := range c.Endpoints() {
|
|
conn, curErr := c.Dial(ep)
|
|
if curErr != nil {
|
|
err = curErr
|
|
} else {
|
|
return conn, nil
|
|
}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// isHaltErr returns true if the given error and context indicate no forward
|
|
// progress can be made, even after reconnecting.
|
|
func isHaltErr(ctx context.Context, err error) bool {
|
|
isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ")
|
|
return isRPCError || ctx.Err() != nil
|
|
}
|