Merge pull request #8263 from fanminshi/hash_by_rev

api: hash by rev
Xiang Li authored on 2017-07-26 11:22:33 -07:00, committed by GitHub
commit 2a348fb8e9
12 changed files with 1068 additions and 382 deletions


@ -858,15 +858,15 @@
"tags": [
"Maintenance"
],
"summary": "Hash returns the hash of the local KV state for consistency checking purpose.\nThis is designed for testing; do not use this in production when there\nare ongoing transactions.",
"operationId": "Hash",
"summary": "HashKV computes the hash of all MVCC keys up to a given revision.",
"operationId": "HashKV",
"parameters": [
{
"name": "body",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/etcdserverpbHashRequest"
"$ref": "#/definitions/etcdserverpbHashKVRequest"
}
}
],
@ -874,7 +874,7 @@
"200": {
"description": "(empty)",
"schema": {
"$ref": "#/definitions/etcdserverpbHashResponse"
"$ref": "#/definitions/etcdserverpbHashKVResponse"
}
}
}
@ -1552,6 +1552,34 @@
}
}
},
"etcdserverpbHashKVRequest": {
"type": "object",
"properties": {
"revision": {
"description": "revision is the key-value store revision for the hash operation.",
"type": "string",
"format": "int64"
}
}
},
"etcdserverpbHashKVResponse": {
"type": "object",
"properties": {
"compact_revision": {
"description": "compact_revision is the compacted revision of key-value store when hash begins.",
"type": "string",
"format": "int64"
},
"hash": {
"description": "hash is the hash value computed from the responding member's MVCC keys up to a given revision.",
"type": "integer",
"format": "int64"
},
"header": {
"$ref": "#/definitions/etcdserverpbResponseHeader"
}
}
},
"etcdserverpbHashRequest": {
"type": "object"
},
@ -1559,7 +1587,7 @@
"type": "object",
"properties": {
"hash": {
"description": "hash is the hash value computed from the responding member's key-value store.",
"description": "hash is the hash value computed from the responding member's KV's backend.",
"type": "integer",
"format": "int64"
},
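
The new RPC is reachable through the generated Maintenance client. Below is a minimal sketch of calling it from Go; the endpoint address and error handling are illustrative assumptions, while the client constructor and message types come from the generated etcdserverpb package touched by this commit.

package main

import (
    "context"
    "fmt"
    "log"

    pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
    "google.golang.org/grpc"
)

func main() {
    // Assumed endpoint of a reachable etcd member.
    conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    mc := pb.NewMaintenanceClient(conn)
    // Revision 0 asks for a hash computed at the member's current revision.
    resp, err := mc.HashKV(context.Background(), &pb.HashKVRequest{Revision: 0})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("hash=%d revision=%d compact_revision=%d\n",
        resp.Hash, resp.Header.Revision, resp.CompactRevision)
}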


@ -137,6 +137,17 @@ func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.H
return resp, nil
}
func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
h, rev, compactRev, err := ms.kg.KV().HashByRev(r.Revision)
if err != nil {
return nil, togRPCError(err)
}
resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h, CompactRevision: compactRev}
ms.hdr.fill(resp.Header)
return resp, nil
}
func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
return ms.a.Alarm(ctx, ar)
}
@ -203,6 +214,13 @@ func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (
return ams.maintenanceServer.Hash(ctx, r)
}
func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
if err := ams.isAuthenticated(ctx); err != nil {
return nil, err
}
return ams.maintenanceServer.HashKV(ctx, r)
}
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
return ams.maintenanceServer.Status(ctx, ar)
}
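
Since the stated purpose of the hash is consistency checking, one natural use of the handler above is to ask several members for a hash at the same revision and compare the answers. The helper below is a hypothetical sketch, not code from this commit; it assumes the same pb alias for etcdserverpb as in the earlier example plus the standard context and fmt packages.

// checkConsistency hashes every member's MVCC keys at the same revision and
// reports the first mismatch. Hashes are only comparable when they cover the
// same revision range, so the compact revisions must agree as well.
func checkConsistency(ctx context.Context, members []pb.MaintenanceClient, rev int64) error {
    var refHash uint32
    var refCompact int64
    for i, mc := range members {
        resp, err := mc.HashKV(ctx, &pb.HashKVRequest{Revision: rev})
        if err != nil {
            return err
        }
        if i == 0 {
            refHash, refCompact = resp.Hash, resp.CompactRevision
            continue
        }
        if resp.CompactRevision != refCompact {
            return fmt.Errorf("compact revision mismatch: %d vs %d", resp.CompactRevision, refCompact)
        }
        if resp.Hash != refHash {
            return fmt.Errorf("hash mismatch at revision %d: %d vs %d", rev, resp.Hash, refHash)
        }
    }
    return nil
}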


@ -32,6 +32,8 @@
CompactionRequest
CompactionResponse
HashRequest
HashKVRequest
HashKVResponse
HashResponse
SnapshotRequest
SnapshotResponse


@ -340,6 +340,19 @@ func request_Maintenance_Hash_0(ctx context.Context, marshaler runtime.Marshaler
}
func request_Maintenance_HashKV_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.MaintenanceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq etcdserverpb.HashKVRequest
var metadata runtime.ServerMetadata
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil {
return nil, metadata, grpc.Errorf(codes.InvalidArgument, "%v", err)
}
msg, err := client.HashKV(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func request_Maintenance_Snapshot_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.MaintenanceClient, req *http.Request, pathParams map[string]string) (etcdserverpb.Maintenance_SnapshotClient, runtime.ServerMetadata, error) {
var protoReq etcdserverpb.SnapshotRequest
var metadata runtime.ServerMetadata
@ -1320,6 +1333,34 @@ func RegisterMaintenanceHandler(ctx context.Context, mux *runtime.ServeMux, conn
})
mux.Handle("POST", pattern_Maintenance_HashKV_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if cn, ok := w.(http.CloseNotifier); ok {
go func(done <-chan struct{}, closed <-chan bool) {
select {
case <-done:
case <-closed:
cancel()
}
}(ctx.Done(), cn.CloseNotify())
}
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateContext(ctx, req)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Maintenance_HashKV_0(rctx, inboundMarshaler, client, req, pathParams)
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
return
}
forward_Maintenance_HashKV_0(ctx, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
mux.Handle("POST", pattern_Maintenance_Snapshot_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -1388,6 +1429,8 @@ var (
pattern_Maintenance_Hash_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "hash"}, ""))
pattern_Maintenance_HashKV_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "hash"}, ""))
pattern_Maintenance_Snapshot_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "snapshot"}, ""))
pattern_Maintenance_MoveLeader_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "transfer-leadership"}, ""))
@ -1402,6 +1445,8 @@ var (
forward_Maintenance_Hash_0 = runtime.ForwardResponseMessage
forward_Maintenance_HashKV_0 = runtime.ForwardResponseMessage
forward_Maintenance_Snapshot_0 = runtime.ForwardResponseStream
forward_Maintenance_MoveLeader_0 = runtime.ForwardResponseMessage

File diff suppressed because it is too large.


@ -174,7 +174,7 @@ service Maintenance {
};
}
// Hash returns the hash of the local KV state for consistency checking purpose.
// Hash computes the hash of the KV's backend.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {
@ -184,6 +184,14 @@ service Maintenance {
};
}
// HashKV computes the hash of all MVCC keys up to a given revision.
rpc HashKV(HashKVRequest) returns (HashKVResponse) {
option (google.api.http) = {
post: "/v3alpha/maintenance/hash"
body: "*"
};
}
// Snapshot sends a snapshot of the entire backend from a member over a stream to a client.
rpc Snapshot(SnapshotRequest) returns (stream SnapshotResponse) {
option (google.api.http) = {
@ -581,9 +589,22 @@ message CompactionResponse {
message HashRequest {
}
message HashKVRequest {
// revision is the key-value store revision for the hash operation.
int64 revision = 1;
}
message HashKVResponse {
ResponseHeader header = 1;
// hash is the hash value computed from the responding member's MVCC keys up to a given revision.
uint32 hash = 2;
// compact_revision is the compacted revision of key-value store when hash begins.
int64 compact_revision = 3;
}
message HashResponse {
ResponseHeader header = 1;
// hash is the hash value computed from the responding member's key-value store.
// hash is the hash value computed from the responding member's KV's backend.
uint32 hash = 2;
}
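
To make the field semantics concrete, here is an illustrative exchange that reuses the mc client from the earlier sketch. The numbers are assumptions for illustration only: suppose the member's current revision is 100 and it has been compacted at revision 90.

// Revision 0 means "hash at the member's current revision".
resp, err := mc.HashKV(ctx, &pb.HashKVRequest{Revision: 0})
if err != nil {
    log.Fatal(err)
}
// Given the assumed state (current revision 100, compacted at 90):
//   resp.Header.Revision == 100  // revision the hash was computed at
//   resp.CompactRevision == 90   // the hash covers MVCC keys at revisions in (90, 100]
//   resp.Hash                    // CRC-32C over those keys and values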


@ -147,6 +147,55 @@ func TestV3CompactCurrentRev(t *testing.T) {
}
}
// TestV3HashKV ensures that multiple calls of HashKV on the same node return the same hash and compact revision.
func TestV3HashKV(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV
mvc := toGRPC(clus.RandClient()).Maintenance
for i := 0; i < 10; i++ {
resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
if err != nil {
t.Fatal(err)
}
rev := resp.Header.Revision
hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{Revision: 0})
if err != nil {
t.Fatal(err)
}
if rev != hresp.Header.Revision {
t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
}
prevHash := hresp.Hash
prevCompactRev := hresp.CompactRevision
for i := 0; i < 10; i++ {
hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{Revision: 0})
if err != nil {
t.Fatal(err)
}
if rev != hresp.Header.Revision {
t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
}
if prevHash != hresp.Hash {
t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
}
if prevCompactRev != hresp.CompactRevision {
t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
}
prevHash = hresp.Hash
prevCompactRev = hresp.CompactRevision
}
}
}
func TestV3TxnTooManyOps(t *testing.T) {
defer testutil.AfterTest(t)
maxTxnOps := uint(128)


@ -107,10 +107,12 @@ type KV interface {
// Write creates a write transaction.
Write() TxnWrite
// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)
// HashByRev computes the hash of all MVCC revisions up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)
// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)
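
At the storage layer, HashByRev can be exercised directly against an mvcc store. The sketch below is modeled on the tests in this commit (temporary backend, FakeLessor) and handles the two error values the implementation returns; import paths follow the repository layout of the time.

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/coreos/etcd/lease"
    "github.com/coreos/etcd/mvcc"
    "github.com/coreos/etcd/mvcc/backend"
)

func main() {
    b, tmpPath := backend.NewDefaultTmpBackend()
    s := mvcc.NewStore(b, &lease.FakeLessor{}, nil)
    defer os.Remove(tmpPath)
    defer s.Close()

    s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

    // Passing 0 hashes everything up to the current revision.
    hash, currentRev, compactRev, err := s.HashByRev(0)
    switch err {
    case nil:
        fmt.Printf("hash=%d current=%d compacted=%d\n", hash, currentRev, compactRev)
    case mvcc.ErrCompacted:
        log.Fatalf("requested revision already compacted (compacted at %d)", compactRev)
    case mvcc.ErrFutureRev:
        log.Fatalf("requested revision is in the future (current is %d)", currentRev)
    default:
        log.Fatal(err)
    }
}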


@ -17,6 +17,7 @@ package mvcc
import (
"encoding/binary"
"errors"
"hash/crc32"
"math"
"sync"
"sync/atomic"
@ -44,6 +45,8 @@ var (
ErrClosed = errors.New("mvcc: closed")
plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
emptyKeep = make(map[revision]struct{})
)
const (
@ -98,6 +101,12 @@ type store struct {
fifoSched schedule.Scheduler
stopc chan struct{}
// keepMu protects keep
keepMu sync.RWMutex
// keep contains all revisions <= compactMainRev to be kept for the
// ongoing compaction; nil otherwise.
keep map[revision]struct{}
}
// NewStore returns a new store. It is useful to create a store inside
@ -160,6 +169,63 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err
}
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
s.mu.Lock()
s.revMu.RLock()
compactRev, currentRev = s.compactMainRev, s.currentRev
s.revMu.RUnlock()
if rev > 0 && rev <= compactRev {
s.mu.Unlock()
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
s.mu.Unlock()
return 0, currentRev, 0, ErrFutureRev
}
s.keepMu.Lock()
if s.keep == nil {
// ForceCommit ensures that txnRead begins after backend
// has committed all the changes from the prev completed compaction.
s.b.ForceCommit()
s.keep = emptyKeep
}
keep := s.keep
s.keepMu.Unlock()
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
s.mu.Unlock()
if rev == 0 {
rev = currentRev
}
upper := revision{main: rev + 1}
lower := revision{main: compactRev + 1}
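// The loop below hashes every key still present in the backend with main
// revision at most rev. While a compaction is in flight, keys at or below
// compactRev that the compaction will delete (those absent from keep) are
// skipped so the result matches the post-compaction state.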
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
h.Write(keyBucketName)
err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
kr := bytesToRev(k)
if !upper.GreaterThan(kr) {
return nil
}
// skip revisions that are scheduled for deletion by an ongoing
// compaction; if no compaction is in progress, nothing is skipped.
if lower.GreaterThan(kr) && len(keep) > 0 {
if _, ok := keep[kr]; !ok {
return nil
}
}
h.Write(k)
h.Write(v)
return nil
})
return h.Sum32(), currentRev, compactRev, err
}
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
@ -191,6 +257,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.b.ForceCommit()
keep := s.kvindex.Compact(rev)
s.keepMu.Lock()
s.keep = keep
s.keepMu.Unlock()
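// From this point until the scheduled compaction job below finishes,
// concurrent HashByRev calls consult s.keep, so revisions the compaction
// retains are still hashed while the ones it will delete are skipped.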
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
@ -202,6 +271,9 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
return
}
close(ch)
s.keepMu.Lock()
s.keep = nil
s.keepMu.Unlock()
}
s.fifoSched.Schedule(j)


@ -22,6 +22,7 @@ import (
mrand "math/rand"
"os"
"reflect"
"sync"
"testing"
"time"
@ -510,6 +511,78 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
}
type hashKVResult struct {
hash uint32
compactRev int64
}
// TestHashKVWhenCompacting ensures that HashKV returns the correct hash while compaction is in progress.
func TestHashKVWhenCompacting(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{}, nil)
defer os.Remove(tmpPath)
rev := 1000
for i := 2; i <= rev; i++ {
s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
}
hashCompactc := make(chan hashKVResult, 1)
donec := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
hash, _, compactRev, err := s.HashByRev(int64(rev))
if err != nil {
t.Fatal(err)
}
select {
case <-donec:
return
case hashCompactc <- hashKVResult{hash, compactRev}:
}
}
}()
}
go func() {
defer close(donec)
revHash := make(map[int64]uint32)
for round := 0; round < 1000; round++ {
r := <-hashCompactc
if revHash[r.compactRev] == 0 {
revHash[r.compactRev] = r.hash
}
if r.hash != revHash[r.compactRev] {
t.Fatalf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 100; i >= 0; i-- {
_, err := s.Compact(int64(rev - 1 - i))
if err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}
}()
select {
case <-donec:
wg.Wait()
case <-time.After(10 * time.Second):
testutil.FatalStack(t, "timeout")
}
}
func TestTxnPut(t *testing.T) {
// assign arbitrary size
bytesN := 30


@ -43,6 +43,10 @@ func (s *mts2mtc) Hash(ctx context.Context, r *pb.HashRequest, opts ...grpc.Call
return s.mts.Hash(ctx, r)
}
func (s *mts2mtc) HashKV(ctx context.Context, r *pb.HashKVRequest, opts ...grpc.CallOption) (*pb.HashKVResponse, error) {
return s.mts.HashKV(ctx, r)
}
func (s *mts2mtc) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest, opts ...grpc.CallOption) (*pb.MoveLeaderResponse, error) {
return s.mts.MoveLeader(ctx, r)
}


@ -68,6 +68,11 @@ func (mp *maintenanceProxy) Hash(ctx context.Context, r *pb.HashRequest) (*pb.Ha
return pb.NewMaintenanceClient(conn).Hash(ctx, r)
}
func (mp *maintenanceProxy) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
conn := mp.client.ActiveConnection()
return pb.NewMaintenanceClient(conn).HashKV(ctx, r)
}
func (mp *maintenanceProxy) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
conn := mp.client.ActiveConnection()
return pb.NewMaintenanceClient(conn).Alarm(ctx, r)