*: make sure snapshot save downloads SHA256 checksum

ref. https://github.com/etcd-io/etcd/pull/11896

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee 2020-05-17 15:51:39 -07:00
parent f1ea03a7c8
commit 9bad82fee5
5 changed files with 101 additions and 25 deletions

1
.words
View File

@ -76,6 +76,7 @@ consistentIndex
todo todo
saveWALAndSnap saveWALAndSnap
SHA
subconns subconns
nop nop
SubConns SubConns

View File

@ -20,6 +20,7 @@ import (
"io" "io"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb" pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -68,6 +69,7 @@ type Maintenance interface {
} }
type maintenance struct { type maintenance struct {
lg *zap.Logger
dial func(endpoint string) (pb.MaintenanceClient, func(), error) dial func(endpoint string) (pb.MaintenanceClient, func(), error)
remote pb.MaintenanceClient remote pb.MaintenanceClient
callOpts []grpc.CallOption callOpts []grpc.CallOption
@ -75,6 +77,7 @@ type maintenance struct {
func NewMaintenance(c *Client) Maintenance { func NewMaintenance(c *Client) Maintenance {
api := &maintenance{ api := &maintenance{
lg: c.lg,
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
conn, err := c.Dial(endpoint) conn, err := c.Dial(endpoint)
if err != nil { if err != nil {
@ -93,6 +96,7 @@ func NewMaintenance(c *Client) Maintenance {
func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
api := &maintenance{ api := &maintenance{
lg: c.lg,
dial: func(string) (pb.MaintenanceClient, func(), error) { dial: func(string) (pb.MaintenanceClient, func(), error) {
return remote, func() {}, nil return remote, func() {}, nil
}, },
@ -193,23 +197,32 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
return nil, toErr(ctx, err) return nil, toErr(ctx, err)
} }
m.lg.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe() pr, pw := io.Pipe()
go func() { go func() {
for { for {
resp, err := ss.Recv() resp, err := ss.Recv()
if err != nil { if err != nil {
switch err {
case io.EOF:
m.lg.Info("completed snapshot read; closing")
default:
m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
}
pw.CloseWithError(err) pw.CloseWithError(err)
return return
} }
if resp == nil && err == nil {
break // can "resp == nil && err == nil"
} // before we receive snapshot SHA digest?
// No, server sends EOF with an empty response
// after it sends SHA digest at the end
if _, werr := pw.Write(resp.Blob); werr != nil { if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr) pw.CloseWithError(werr)
return return
} }
} }
pw.Close()
}() }()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
} }

View File

@ -28,6 +28,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver"
@ -88,6 +89,14 @@ type v3Manager struct {
skipHashCheck bool skipHashCheck bool
} }
// hasChecksum returns "true" if the file size "n"
// has appended sha256 hash digest.
func hasChecksum(n int64) bool {
// 512 is chosen because it's a minimum disk sector size
// smaller than (and multiplies to) OS page size in most systems
return (n % 512) == sha256.Size
}
// Save fetches snapshot from remote etcd server and saves data to target path. // Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error { func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
if len(cfg.Endpoints) != 1 { if len(cfg.Endpoints) != 1 {
@ -107,10 +116,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
if err != nil { if err != nil {
return fmt.Errorf("could not open %s (%v)", partpath, err) return fmt.Errorf("could not open %s (%v)", partpath, err)
} }
s.lg.Info( s.lg.Info("created temporary db file", zap.String("path", partpath))
"created temporary db file",
zap.String("path", partpath),
)
now := time.Now() now := time.Now()
var rd io.ReadCloser var rd io.ReadCloser
@ -118,13 +124,15 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
if err != nil { if err != nil {
return err return err
} }
s.lg.Info( s.lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
"fetching snapshot", var size int64
zap.String("endpoint", cfg.Endpoints[0]), size, err = io.Copy(f, rd)
) if err != nil {
if _, err = io.Copy(f, rd); err != nil {
return err return err
} }
if !hasChecksum(size) {
return fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
}
if err = fileutil.Fsync(f); err != nil { if err = fileutil.Fsync(f); err != nil {
return err return err
} }
@ -134,6 +142,7 @@ func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string
s.lg.Info( s.lg.Info(
"fetched snapshot", "fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]), zap.String("endpoint", cfg.Endpoints[0]),
zap.String("size", humanize.Bytes(uint64(size))),
zap.Duration("took", time.Since(now)), zap.Duration("took", time.Since(now)),
) )
@ -346,7 +355,7 @@ func (s *v3Manager) saveDB() error {
if serr != nil { if serr != nil {
return serr return serr
} }
hasHash := (off % 512) == sha256.Size hasHash := hasChecksum(off)
if hasHash { if hasHash {
if err := db.Truncate(off - sha256.Size); err != nil { if err := db.Truncate(off - sha256.Size); err != nil {
return err return err

View File

@ -313,7 +313,7 @@ func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
if s.lg != nil { if s.lg != nil {
s.lg.Warn("failed to parse index from filename", zap.String("path", filename), zap.String("error", err.Error())) s.lg.Warn("failed to parse index from filename", zap.String("path", filename), zap.String("error", err.Error()))
} else { } else {
plog.Warnf("failed to parse index from filename: %s (%v)", filename, err) plog.Warningf("failed to parse index from filename: %s (%v)", filename, err)
} }
continue continue
} }
@ -321,13 +321,13 @@ func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
if s.lg != nil { if s.lg != nil {
s.lg.Warn("found orphaned .snap.db file; deleting", zap.String("path", filename)) s.lg.Warn("found orphaned .snap.db file; deleting", zap.String("path", filename))
} else { } else {
plog.Warnf("found orphaned .snap.db file; deleting: %s", filename) plog.Warningf("found orphaned .snap.db file; deleting: %s", filename)
} }
if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
if s.lg != nil { if s.lg != nil {
s.lg.Warn("failed to remove orphaned .snap.db file", zap.String("path", filename), zap.Error(rmErr)) s.lg.Warn("failed to remove orphaned .snap.db file", zap.String("path", filename), zap.Error(rmErr))
} else { } else {
plog.Warnf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr) plog.Warningf("failed to remove orphaned .snap.db file: %s (%v)", filename, rmErr)
} }
} }
} }

View File

@ -18,7 +18,9 @@ import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"io" "io"
"time"
"github.com/dustin/go-humanize"
"go.etcd.io/etcd/auth" "go.etcd.io/etcd/auth"
"go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
@ -98,6 +100,9 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil return &pb.DefragmentResponse{}, nil
} }
// big enough size to hold >1 OS pages in the buffer
const snapshotSendBufferSize = 32 * 1024
func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error { func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
snap := ms.bg.Backend().Snapshot() snap := ms.bg.Backend().Snapshot()
pr, pw := io.Pipe() pr, pw := io.Pipe()
@ -116,19 +121,46 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
pw.Close() pw.Close()
}() }()
// send file data // record SHA digest of snapshot data
// used for integrity checks during snapshot restore operation
h := sha256.New() h := sha256.New()
br := int64(0)
buf := make([]byte, 32*1024) // buffer just holds read bytes from stream
sz := snap.Size() // response size is multiple of OS page size, fetched in boltdb
for br < sz { // e.g. 4*1024
buf := make([]byte, snapshotSendBufferSize)
sent := int64(0)
total := snap.Size()
size := humanize.Bytes(uint64(total))
start := time.Now()
if ms.lg != nil {
ms.lg.Info("sending database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
)
} else {
plog.Infof("sending database snapshot to client %s [%d bytes]", size, total)
}
for total-sent > 0 {
n, err := io.ReadFull(pr, buf) n, err := io.ReadFull(pr, buf)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
return togRPCError(err) return togRPCError(err)
} }
br += int64(n) sent += int64(n)
// if total is x * snapshotSendBufferSize. it is possible that
// resp.RemainingBytes == 0
// resp.Blob == zero byte but not nil
// does this make server response sent to client nil in proto
// and client stops receiving from snapshot stream before
// server sends snapshot SHA?
// No, the client will still receive non-nil response
// until server closes the stream with EOF
resp := &pb.SnapshotResponse{ resp := &pb.SnapshotResponse{
RemainingBytes: uint64(sz - br), RemainingBytes: uint64(total - sent),
Blob: buf[:n], Blob: buf[:n],
} }
if err = srv.Send(resp); err != nil { if err = srv.Send(resp); err != nil {
@ -137,13 +169,34 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
h.Write(buf[:n]) h.Write(buf[:n])
} }
// send sha // send SHA digest for integrity checks
// during snapshot restore operation
sha := h.Sum(nil) sha := h.Sum(nil)
if ms.lg != nil {
ms.lg.Info("sending database sha256 checksum to client",
zap.Int64("total-bytes", total),
zap.Int("checksum-size", len(sha)),
)
} else {
plog.Infof("sending database sha256 checksum to client [%d bytes]", len(sha))
}
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha} hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
if err := srv.Send(hresp); err != nil { if err := srv.Send(hresp); err != nil {
return togRPCError(err) return togRPCError(err)
} }
if ms.lg != nil {
ms.lg.Info("successfully sent database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("took", humanize.Time(start)),
)
} else {
plog.Infof("successfully sent database snapshot to client %s [%d bytes]", size, total)
}
return nil return nil
} }