functional/rpcpb: use new snapshot package interface

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-12 10:39:18 -07:00
parent 90e5af76f3
commit 8ae2bbf3c4

View File

@ -16,6 +16,7 @@ package rpcpb
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"net/url" "net/url"
"os" "os"
@ -24,7 +25,6 @@ import (
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/snapshot" "github.com/coreos/etcd/snapshot"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
@ -80,11 +80,12 @@ func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn,
return grpc.Dial(m.EtcdClientEndpoint, dialOpts...) return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
} }
// CreateEtcdClient creates a client from member. // CreateEtcdClientConfig creates a client configuration from member.
func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) { func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
secure := false secure := false
for _, cu := range m.Etcd.AdvertiseClientURLs { for _, cu := range m.Etcd.AdvertiseClientURLs {
u, err := url.Parse(cu) var u *url.URL
u, err = url.Parse(cu)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -93,7 +94,7 @@ func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, er
} }
} }
cfg := clientv3.Config{ cfg = &clientv3.Config{
Endpoints: []string{m.EtcdClientEndpoint}, Endpoints: []string{m.EtcdClientEndpoint},
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
DialOptions: opts, DialOptions: opts,
@ -109,13 +110,23 @@ func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, er
// only need it for auto TLS // only need it for auto TLS
InsecureSkipVerify: true, InsecureSkipVerify: true,
} }
tlsConfig, err := tlsInfo.ClientConfig() var tlsConfig *tls.Config
tlsConfig, err = tlsInfo.ClientConfig()
if err != nil { if err != nil {
return nil, err return nil, err
} }
cfg.TLS = tlsConfig cfg.TLS = tlsConfig
} }
return clientv3.New(cfg) return cfg, err
}
// CreateEtcdClient creates a client from member.
func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
cfg, err := m.CreateEtcdClientConfig(opts...)
if err != nil {
return nil, err
}
return clientv3.New(*cfg)
} }
// CheckCompact ensures that historical data before given revision has been compacted. // CheckCompact ensures that historical data before given revision has been compacted.
@ -247,12 +258,11 @@ func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
return err return err
} }
var cli *clientv3.Client var ccfg *clientv3.Config
cli, err = m.CreateEtcdClient() ccfg, err = m.CreateEtcdClientConfig()
if err != nil { if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint) return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
} }
defer cli.Close()
lg.Info( lg.Info(
"snapshot save START", "snapshot save START",
@ -261,8 +271,8 @@ func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
zap.String("snapshot-path", m.SnapshotPath), zap.String("snapshot-path", m.SnapshotPath),
) )
now := time.Now() now := time.Now()
mgr := snapshot.NewV3(cli, lg) mgr := snapshot.NewV3(lg)
if err = mgr.Save(context.Background(), m.SnapshotPath); err != nil { if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
return err return err
} }
took := time.Since(now) took := time.Since(now)
@ -314,17 +324,6 @@ func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
return err return err
} }
var initialCluster types.URLsMap
initialCluster, err = types.NewURLsMap(m.EtcdOnSnapshotRestore.InitialCluster)
if err != nil {
return err
}
var peerURLs types.URLs
peerURLs, err = types.NewURLs(m.EtcdOnSnapshotRestore.AdvertisePeerURLs)
if err != nil {
return err
}
lg.Info( lg.Info(
"snapshot restore START", "snapshot restore START",
zap.String("member-name", m.Etcd.Name), zap.String("member-name", m.Etcd.Name),
@ -332,17 +331,17 @@ func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
zap.String("snapshot-path", m.SnapshotPath), zap.String("snapshot-path", m.SnapshotPath),
) )
now := time.Now() now := time.Now()
mgr := snapshot.NewV3(nil, lg) mgr := snapshot.NewV3(lg)
err = mgr.Restore(m.SnapshotInfo.SnapshotPath, snapshot.RestoreConfig{ err = mgr.Restore(snapshot.RestoreConfig{
SnapshotPath: m.SnapshotInfo.SnapshotPath,
Name: m.EtcdOnSnapshotRestore.Name, Name: m.EtcdOnSnapshotRestore.Name,
OutputDataDir: m.EtcdOnSnapshotRestore.DataDir, OutputDataDir: m.EtcdOnSnapshotRestore.DataDir,
OutputWALDir: m.EtcdOnSnapshotRestore.WALDir, OutputWALDir: m.EtcdOnSnapshotRestore.WALDir,
InitialCluster: initialCluster, PeerURLs: m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
InitialCluster: m.EtcdOnSnapshotRestore.InitialCluster,
InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken, InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
PeerURLs: peerURLs,
SkipHashCheck: false, SkipHashCheck: false,
// TODO: set SkipHashCheck it true, to recover from existing db file
// TODO: SkipHashCheck == true, for recover from existing db file
}) })
took := time.Since(now) took := time.Since(now)
lg.Info( lg.Info(