From 2d197ac9e8d1ef13d3b464eea1373d3246810f77 Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Wed, 3 Feb 2016 12:06:58 -0800 Subject: [PATCH] *: add kv Hash method (for testing purpose) --- etcdserver/api/v3rpc/key.go | 8 + etcdserver/etcdserverpb/etcdserver.pb.go | 2 + etcdserver/etcdserverpb/rpc.pb.go | 225 +++++++++++++++++++++++ etcdserver/etcdserverpb/rpc.proto | 12 ++ etcdserver/v3demo_server.go | 9 + integration/v3_grpc_test.go | 22 +++ 6 files changed, 278 insertions(+) diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index e82397c71..ad4abe3c8 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -134,6 +134,14 @@ func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Co return resp, nil } +func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { + resp, err := s.kv.Hash(ctx, r) + if err != nil { + return nil, togRPCError(err) + } + return resp, nil +} + // fillInHeader populates pb.ResponseHeader from kvServer, except Revision. func (s *kvServer) fillInHeader(h *pb.ResponseHeader) { h.ClusterId = uint64(s.clusterID) diff --git a/etcdserver/etcdserverpb/etcdserver.pb.go b/etcdserver/etcdserverpb/etcdserver.pb.go index 3c78c88d2..974cea4c2 100644 --- a/etcdserver/etcdserverpb/etcdserver.pb.go +++ b/etcdserver/etcdserverpb/etcdserver.pb.go @@ -29,6 +29,8 @@ TxnResponse CompactionRequest CompactionResponse + HashRequest + HashResponse WatchRequest WatchCreateRequest WatchCancelRequest diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index abef5139e..24d6c4340 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -734,6 +734,21 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader { return nil } +type HashRequest struct { +} + +func (m *HashRequest) Reset() { *m = HashRequest{} } +func (m *HashRequest) String() string { return proto.CompactTextString(m) } +func (*HashRequest) ProtoMessage() {} + +type HashResponse struct { + Hash uint32 `protobuf:"varint,1,opt,name=hash,proto3" json:"hash,omitempty"` +} + +func (m *HashResponse) Reset() { *m = HashResponse{} } +func (m *HashResponse) String() string { return proto.CompactTextString(m) } +func (*HashResponse) ProtoMessage() {} + type WatchRequest struct { // Types that are valid to be assigned to RequestUnion: // *WatchRequest_CreateRequest @@ -1112,6 +1127,8 @@ func init() { proto.RegisterType((*TxnResponse)(nil), "etcdserverpb.TxnResponse") proto.RegisterType((*CompactionRequest)(nil), "etcdserverpb.CompactionRequest") proto.RegisterType((*CompactionResponse)(nil), "etcdserverpb.CompactionResponse") + proto.RegisterType((*HashRequest)(nil), "etcdserverpb.HashRequest") + proto.RegisterType((*HashResponse)(nil), "etcdserverpb.HashResponse") proto.RegisterType((*WatchRequest)(nil), "etcdserverpb.WatchRequest") proto.RegisterType((*WatchCreateRequest)(nil), "etcdserverpb.WatchCreateRequest") proto.RegisterType((*WatchCancelRequest)(nil), "etcdserverpb.WatchCancelRequest") @@ -1162,6 +1179,10 @@ type KVClient interface { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error) + // Hash returns the hash of local KV state for consistency checking purpose. + // This is designed for testing purpose. Do not use this in production when there + // are ongoing transactions. + Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) } type kVClient struct { @@ -1217,6 +1238,15 @@ func (c *kVClient) Compact(ctx context.Context, in *CompactionRequest, opts ...g return out, nil } +func (c *kVClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) { + out := new(HashResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.KV/Hash", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // Server API for KV service type KVServer interface { @@ -1238,6 +1268,10 @@ type KVServer interface { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. Compact(context.Context, *CompactionRequest) (*CompactionResponse, error) + // Hash returns the hash of local KV state for consistency checking purpose. + // This is designed for testing purpose. Do not use this in production when there + // are ongoing transactions. + Hash(context.Context, *HashRequest) (*HashResponse, error) } func RegisterKVServer(s *grpc.Server, srv KVServer) { @@ -1304,6 +1338,18 @@ func _KV_Compact_Handler(srv interface{}, ctx context.Context, dec func(interfac return out, nil } +func _KV_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(HashRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(KVServer).Hash(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + var _KV_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.KV", HandlerType: (*KVServer)(nil), @@ -1328,6 +1374,10 @@ var _KV_serviceDesc = grpc.ServiceDesc{ MethodName: "Compact", Handler: _KV_Compact_Handler, }, + { + MethodName: "Hash", + Handler: _KV_Hash_Handler, + }, }, Streams: []grpc.StreamDesc{}, } @@ -2377,6 +2427,47 @@ func (m *CompactionResponse) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *HashRequest) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *HashRequest) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *HashResponse) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *HashResponse) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Hash != 0 { + data[i] = 0x8 + i++ + i = encodeVarintRpc(data, i, uint64(m.Hash)) + } + return i, nil +} + func (m *WatchRequest) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -3391,6 +3482,21 @@ func (m *CompactionResponse) Size() (n int) { return n } +func (m *HashRequest) Size() (n int) { + var l int + _ = l + return n +} + +func (m *HashResponse) Size() (n int) { + var l int + _ = l + if m.Hash != 0 { + n += 1 + sovRpc(uint64(m.Hash)) + } + return n +} + func (m *WatchRequest) Size() (n int) { var l int _ = l @@ -5469,6 +5575,125 @@ func (m *CompactionResponse) Unmarshal(data []byte) error { } return nil } +func (m *HashRequest) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HashRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HashRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *HashResponse) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HashResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HashResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + m.Hash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Hash |= (uint32(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRpc(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *WatchRequest) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index 7d75d2a0b..4d4d1af6c 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -30,6 +30,11 @@ service KV { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. rpc Compact(CompactionRequest) returns (CompactionResponse) {} + + // Hash returns the hash of local KV state for consistency checking purpose. + // This is designed for testing purpose. Do not use this in production when there + // are ongoing transactions. + rpc Hash(HashRequest) returns (HashResponse) {} } service Watch { @@ -229,6 +234,13 @@ message CompactionResponse { ResponseHeader header = 1; } +message HashRequest { +} + +message HashResponse { + uint32 hash = 1; +} + message WatchRequest { oneof request_union { WatchCreateRequest create_request = 1; diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 8b1296856..2a825d8ac 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -35,6 +35,7 @@ type RaftKV interface { DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) + Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) } type Lessor interface { @@ -88,6 +89,14 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. return result.resp.(*pb.CompactionResponse), result.err } +func (s *EtcdServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { + h, err := s.be.Hash() + if err != nil { + return nil, err + } + return &pb.HashResponse{Hash: h}, nil +} + func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { // no id given? choose one for r.ID == int64(lease.NoLease) { diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 73e215cdc..6d8628353 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -413,6 +413,28 @@ func TestV3TxnInvaildRange(t *testing.T) { } } +// TestV3Hash tests hash. +func TestV3Hash(t *testing.T) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + kvc := clus.RandClient().KV + preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} + + for i := 0; i < 3; i++ { + _, err := kvc.Put(context.Background(), preq) + if err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + } + + resp, err := kvc.Hash(context.Background(), &pb.HashRequest{}) + if err != nil || resp.Hash == 0 { + t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash) + } +} + // TestV3WatchFromCurrentRevision tests Watch APIs from current revision. func TestV3WatchFromCurrentRevision(t *testing.T) { defer testutil.AfterTest(t)