diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 6164ef66a..9da27a4bb 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -20,7 +20,7 @@ import ( "io" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" - + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -68,6 +68,7 @@ type Maintenance interface { } type maintenance struct { + lg *zap.Logger dial func(endpoint string) (pb.MaintenanceClient, func(), error) remote pb.MaintenanceClient callOpts []grpc.CallOption @@ -75,6 +76,7 @@ type maintenance struct { func NewMaintenance(c *Client) Maintenance { api := &maintenance{ + lg: c.lg, dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { conn, err := c.Dial(endpoint) if err != nil { @@ -93,6 +95,7 @@ func NewMaintenance(c *Client) Maintenance { func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { api := &maintenance{ + lg: c.lg, dial: func(string) (pb.MaintenanceClient, func(), error) { return remote, func() {}, nil }, @@ -193,23 +196,32 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { return nil, toErr(ctx, err) } + m.lg.Info("opened snapshot stream; downloading") pr, pw := io.Pipe() go func() { for { resp, err := ss.Recv() if err != nil { + switch err { + case io.EOF: + m.lg.Info("completed snapshot read; closing") + default: + m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err)) + } pw.CloseWithError(err) return } - if resp == nil && err == nil { - break - } + + // can "resp == nil && err == nil" + // before we receive snapshot SHA digest? + // No, server sends EOF with an empty response + // after it sends SHA digest at the end + if _, werr := pw.Write(resp.Blob); werr != nil { pw.CloseWithError(werr) return } } - pw.Close() }() return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil } diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index e5c81892a..f3a3c19f9 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -28,6 +28,7 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/v3/clientv3" "go.etcd.io/etcd/v3/etcdserver" @@ -89,6 +90,14 @@ type v3Manager struct { skipHashCheck bool } +// hasChecksum returns "true" if the file size "n" +// has appended sha256 hash digest. +func hasChecksum(n int64) bool { + // 512 is chosen because it's a minimum disk sector size + // smaller than (and multiplies to) OS page size in most systems + return (n % 512) == sha256.Size +} + // Save fetches snapshot from remote etcd server and saves data to target path. func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { if len(cfg.Endpoints) != 1 { @@ -108,10 +117,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return fmt.Errorf("could not open %s (%v)", partpath, err) } - s.lg.Info( - "created temporary db file", - zap.String("path", partpath), - ) + s.lg.Info("created temporary db file", zap.String("path", partpath)) now := time.Now() var rd io.ReadCloser @@ -119,23 +125,25 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string if err != nil { return err } - s.lg.Info( - "fetching snapshot", - zap.String("endpoint", cfg.Endpoints[0]), - ) - if _, err = io.Copy(f, rd); err != nil { + s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0])) + var size int64 + size, err = io.Copy(f, rd) + if err != nil { return err } + if !hasChecksum(size) { + return fmt.Errorf("sha256 checksum not found [bytes: %d]", size) + } if err = fileutil.Fsync(f); err != nil { return err } if err = f.Close(); err != nil { return err } - s.lg.Info( - "fetched snapshot", + s.lg.Info("fetched snapshot", zap.String("endpoint", cfg.Endpoints[0]), - zap.Duration("took", time.Since(now)), + zap.String("size", humanize.Bytes(uint64(size))), + zap.String("took", humanize.Time(now)), ) if err = os.Rename(partpath, dbPath); err != nil { @@ -362,7 +370,7 @@ func (s *v3Manager) saveDB() error { if serr != nil { return serr } - hasHash := (off % 512) == sha256.Size + hasHash := hasChecksum(off) if hasHash { if err := db.Truncate(off - sha256.Size); err != nil { return err