mirror of
				https://github.com/etcd-io/etcd.git
				synced 2024-09-27 06:25:44 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			362 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			362 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2018 The etcd Authors
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package rpcpb
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"crypto/tls"
 | |
| 	"fmt"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.etcd.io/etcd/clientv3"
 | |
| 	"go.etcd.io/etcd/clientv3/snapshot"
 | |
| 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 | |
| 	"go.etcd.io/etcd/pkg/transport"
 | |
| 
 | |
| 	"github.com/dustin/go-humanize"
 | |
| 	"go.uber.org/zap"
 | |
| 	grpc "google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/credentials"
 | |
| )
 | |
| 
 | |
| // ElectionTimeout returns an election timeout duration.
 | |
| func (m *Member) ElectionTimeout() time.Duration {
 | |
| 	return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond
 | |
| }
 | |
| 
 | |
| // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
 | |
| func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
 | |
| 	dialOpts := []grpc.DialOption{
 | |
| 		grpc.WithTimeout(5 * time.Second),
 | |
| 		grpc.WithBlock(),
 | |
| 	}
 | |
| 
 | |
| 	secure := false
 | |
| 	for _, cu := range m.Etcd.AdvertiseClientURLs {
 | |
| 		u, err := url.Parse(cu)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if u.Scheme == "https" { // TODO: handle unix
 | |
| 			secure = true
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if secure {
 | |
| 		// assume save TLS assets are already stord on disk
 | |
| 		tlsInfo := transport.TLSInfo{
 | |
| 			CertFile:      m.ClientCertPath,
 | |
| 			KeyFile:       m.ClientKeyPath,
 | |
| 			TrustedCAFile: m.ClientTrustedCAPath,
 | |
| 
 | |
| 			// TODO: remove this with generated certs
 | |
| 			// only need it for auto TLS
 | |
| 			InsecureSkipVerify: true,
 | |
| 		}
 | |
| 		tlsConfig, err := tlsInfo.ClientConfig()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		creds := credentials.NewTLS(tlsConfig)
 | |
| 		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
 | |
| 	} else {
 | |
| 		dialOpts = append(dialOpts, grpc.WithInsecure())
 | |
| 	}
 | |
| 	dialOpts = append(dialOpts, opts...)
 | |
| 	return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
 | |
| }
 | |
| 
 | |
| // CreateEtcdClientConfig creates a client configuration from member.
 | |
| func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
 | |
| 	secure := false
 | |
| 	for _, cu := range m.Etcd.AdvertiseClientURLs {
 | |
| 		var u *url.URL
 | |
| 		u, err = url.Parse(cu)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if u.Scheme == "https" { // TODO: handle unix
 | |
| 			secure = true
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	cfg = &clientv3.Config{
 | |
| 		Endpoints:   []string{m.EtcdClientEndpoint},
 | |
| 		DialTimeout: 10 * time.Second,
 | |
| 		DialOptions: opts,
 | |
| 	}
 | |
| 	if secure {
 | |
| 		// assume save TLS assets are already stord on disk
 | |
| 		tlsInfo := transport.TLSInfo{
 | |
| 			CertFile:      m.ClientCertPath,
 | |
| 			KeyFile:       m.ClientKeyPath,
 | |
| 			TrustedCAFile: m.ClientTrustedCAPath,
 | |
| 
 | |
| 			// TODO: remove this with generated certs
 | |
| 			// only need it for auto TLS
 | |
| 			InsecureSkipVerify: true,
 | |
| 		}
 | |
| 		var tlsConfig *tls.Config
 | |
| 		tlsConfig, err = tlsInfo.ClientConfig()
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		cfg.TLS = tlsConfig
 | |
| 	}
 | |
| 	return cfg, err
 | |
| }
 | |
| 
 | |
| // CreateEtcdClient creates a client from member.
 | |
| func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
 | |
| 	cfg, err := m.CreateEtcdClientConfig(opts...)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return clientv3.New(*cfg)
 | |
| }
 | |
| 
 | |
| // CheckCompact ensures that historical data before given revision has been compacted.
 | |
| func (m *Member) CheckCompact(rev int64) error {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 | |
| 	wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
 | |
| 	wr, ok := <-wch
 | |
| 	cancel()
 | |
| 
 | |
| 	if !ok {
 | |
| 		return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	if wr.CompactRevision != rev {
 | |
| 		return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Defrag runs defragmentation on this member.
 | |
| func (m *Member) Defrag() error {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 | |
| 	_, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
 | |
| 	cancel()
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // RevHash fetches current revision and hash on this member.
 | |
| func (m *Member) RevHash() (int64, int64, error) {
 | |
| 	conn, err := m.DialEtcdGRPCServer()
 | |
| 	if err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	mt := pb.NewMaintenanceClient(conn)
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 | |
| 	resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
 | |
| 	cancel()
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return 0, 0, err
 | |
| 	}
 | |
| 
 | |
| 	return resp.Header.Revision, int64(resp.Hash), nil
 | |
| }
 | |
| 
 | |
| // Rev fetches current revision on this member.
 | |
| func (m *Member) Rev(ctx context.Context) (int64, error) {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	return resp.Header.Revision, nil
 | |
| }
 | |
| 
 | |
| // Compact compacts member storage with given revision.
 | |
| // It blocks until it's physically done.
 | |
| func (m *Member) Compact(rev int64, timeout time.Duration) error {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 | |
| 	_, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
 | |
| 	cancel()
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // IsLeader returns true if this member is the current cluster leader.
 | |
| func (m *Member) IsLeader() (bool, error) {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
 | |
| 	if err != nil {
 | |
| 		return false, err
 | |
| 	}
 | |
| 	return resp.Header.MemberId == resp.Leader, nil
 | |
| }
 | |
| 
 | |
| // WriteHealthKey writes a health key to this member.
 | |
| func (m *Member) WriteHealthKey() error {
 | |
| 	cli, err := m.CreateEtcdClient()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	defer cli.Close()
 | |
| 
 | |
| 	// give enough time-out in case expensive requests (range/delete) are pending
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 | |
| 	_, err = cli.Put(ctx, "health", "good")
 | |
| 	cancel()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // SaveSnapshot downloads a snapshot file from this member, locally.
 | |
| // It's meant to requested remotely, so that local member can store
 | |
| // snapshot file on local disk.
 | |
| func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
 | |
| 	// remove existing snapshot first
 | |
| 	if err = os.RemoveAll(m.SnapshotPath); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	var ccfg *clientv3.Config
 | |
| 	ccfg, err = m.CreateEtcdClientConfig()
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
 | |
| 	}
 | |
| 
 | |
| 	lg.Info(
 | |
| 		"snapshot save START",
 | |
| 		zap.String("member-name", m.Etcd.Name),
 | |
| 		zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
 | |
| 		zap.String("snapshot-path", m.SnapshotPath),
 | |
| 	)
 | |
| 	now := time.Now()
 | |
| 	mgr := snapshot.NewV3(lg)
 | |
| 	if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	took := time.Since(now)
 | |
| 
 | |
| 	var fi os.FileInfo
 | |
| 	fi, err = os.Stat(m.SnapshotPath)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	var st snapshot.Status
 | |
| 	st, err = mgr.Status(m.SnapshotPath)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	m.SnapshotInfo = &SnapshotInfo{
 | |
| 		MemberName:        m.Etcd.Name,
 | |
| 		MemberClientURLs:  m.Etcd.AdvertiseClientURLs,
 | |
| 		SnapshotPath:      m.SnapshotPath,
 | |
| 		SnapshotFileSize:  humanize.Bytes(uint64(fi.Size())),
 | |
| 		SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)),
 | |
| 		SnapshotTotalKey:  int64(st.TotalKey),
 | |
| 		SnapshotHash:      int64(st.Hash),
 | |
| 		SnapshotRevision:  st.Revision,
 | |
| 		Took:              fmt.Sprintf("%v", took),
 | |
| 	}
 | |
| 	lg.Info(
 | |
| 		"snapshot save END",
 | |
| 		zap.String("member-name", m.SnapshotInfo.MemberName),
 | |
| 		zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
 | |
| 		zap.String("snapshot-path", m.SnapshotPath),
 | |
| 		zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
 | |
| 		zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
 | |
| 		zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
 | |
| 		zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
 | |
| 		zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
 | |
| 		zap.String("took", m.SnapshotInfo.Took),
 | |
| 	)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // RestoreSnapshot restores a cluster from a given snapshot file on disk.
 | |
| // It's meant to requested remotely, so that local member can load the
 | |
| // snapshot file from local disk.
 | |
| func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
 | |
| 	if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	lg.Info(
 | |
| 		"snapshot restore START",
 | |
| 		zap.String("member-name", m.Etcd.Name),
 | |
| 		zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
 | |
| 		zap.String("snapshot-path", m.SnapshotPath),
 | |
| 	)
 | |
| 	now := time.Now()
 | |
| 	mgr := snapshot.NewV3(lg)
 | |
| 	err = mgr.Restore(snapshot.RestoreConfig{
 | |
| 		SnapshotPath:        m.SnapshotInfo.SnapshotPath,
 | |
| 		Name:                m.EtcdOnSnapshotRestore.Name,
 | |
| 		OutputDataDir:       m.EtcdOnSnapshotRestore.DataDir,
 | |
| 		OutputWALDir:        m.EtcdOnSnapshotRestore.WALDir,
 | |
| 		PeerURLs:            m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
 | |
| 		InitialCluster:      m.EtcdOnSnapshotRestore.InitialCluster,
 | |
| 		InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
 | |
| 		SkipHashCheck:       false,
 | |
| 		// TODO: set SkipHashCheck it true, to recover from existing db file
 | |
| 	})
 | |
| 	took := time.Since(now)
 | |
| 	lg.Info(
 | |
| 		"snapshot restore END",
 | |
| 		zap.String("member-name", m.SnapshotInfo.MemberName),
 | |
| 		zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
 | |
| 		zap.String("snapshot-path", m.SnapshotPath),
 | |
| 		zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
 | |
| 		zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
 | |
| 		zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
 | |
| 		zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
 | |
| 		zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
 | |
| 		zap.String("took", took.String()),
 | |
| 		zap.Error(err),
 | |
| 	)
 | |
| 	return err
 | |
| }
 | 
