*: add kv Hash method (for testing purpose)

This commit is contained in:
Gyu-Ho Lee 2016-02-03 12:06:58 -08:00
parent 9bfe617728
commit 2d197ac9e8
6 changed files with 278 additions and 0 deletions

View File

@ -134,6 +134,14 @@ func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Co
return resp, nil
}
func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
resp, err := s.kv.Hash(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
return resp, nil
}
// fillInHeader populates pb.ResponseHeader from kvServer, except Revision.
func (s *kvServer) fillInHeader(h *pb.ResponseHeader) {
h.ClusterId = uint64(s.clusterID)

View File

@ -29,6 +29,8 @@
TxnResponse
CompactionRequest
CompactionResponse
HashRequest
HashResponse
WatchRequest
WatchCreateRequest
WatchCancelRequest

View File

@ -734,6 +734,21 @@ func (m *CompactionResponse) GetHeader() *ResponseHeader {
return nil
}
type HashRequest struct {
}
func (m *HashRequest) Reset() { *m = HashRequest{} }
func (m *HashRequest) String() string { return proto.CompactTextString(m) }
func (*HashRequest) ProtoMessage() {}
type HashResponse struct {
Hash uint32 `protobuf:"varint,1,opt,name=hash,proto3" json:"hash,omitempty"`
}
func (m *HashResponse) Reset() { *m = HashResponse{} }
func (m *HashResponse) String() string { return proto.CompactTextString(m) }
func (*HashResponse) ProtoMessage() {}
type WatchRequest struct {
// Types that are valid to be assigned to RequestUnion:
// *WatchRequest_CreateRequest
@ -1112,6 +1127,8 @@ func init() {
proto.RegisterType((*TxnResponse)(nil), "etcdserverpb.TxnResponse")
proto.RegisterType((*CompactionRequest)(nil), "etcdserverpb.CompactionRequest")
proto.RegisterType((*CompactionResponse)(nil), "etcdserverpb.CompactionResponse")
proto.RegisterType((*HashRequest)(nil), "etcdserverpb.HashRequest")
proto.RegisterType((*HashResponse)(nil), "etcdserverpb.HashResponse")
proto.RegisterType((*WatchRequest)(nil), "etcdserverpb.WatchRequest")
proto.RegisterType((*WatchCreateRequest)(nil), "etcdserverpb.WatchCreateRequest")
proto.RegisterType((*WatchCancelRequest)(nil), "etcdserverpb.WatchCancelRequest")
@ -1162,6 +1179,10 @@ type KVClient interface {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
}
type kVClient struct {
@ -1217,6 +1238,15 @@ func (c *kVClient) Compact(ctx context.Context, in *CompactionRequest, opts ...g
return out, nil
}
func (c *kVClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) {
out := new(HashResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.KV/Hash", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for KV service
type KVServer interface {
@ -1238,6 +1268,10 @@ type KVServer interface {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
Compact(context.Context, *CompactionRequest) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(context.Context, *HashRequest) (*HashResponse, error)
}
func RegisterKVServer(s *grpc.Server, srv KVServer) {
@ -1304,6 +1338,18 @@ func _KV_Compact_Handler(srv interface{}, ctx context.Context, dec func(interfac
return out, nil
}
func _KV_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HashRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(KVServer).Hash(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _KV_serviceDesc = grpc.ServiceDesc{
ServiceName: "etcdserverpb.KV",
HandlerType: (*KVServer)(nil),
@ -1328,6 +1374,10 @@ var _KV_serviceDesc = grpc.ServiceDesc{
MethodName: "Compact",
Handler: _KV_Compact_Handler,
},
{
MethodName: "Hash",
Handler: _KV_Hash_Handler,
},
},
Streams: []grpc.StreamDesc{},
}
@ -2377,6 +2427,47 @@ func (m *CompactionResponse) MarshalTo(data []byte) (int, error) {
return i, nil
}
func (m *HashRequest) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *HashRequest) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
return i, nil
}
func (m *HashResponse) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *HashResponse) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Hash != 0 {
data[i] = 0x8
i++
i = encodeVarintRpc(data, i, uint64(m.Hash))
}
return i, nil
}
func (m *WatchRequest) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
@ -3391,6 +3482,21 @@ func (m *CompactionResponse) Size() (n int) {
return n
}
func (m *HashRequest) Size() (n int) {
var l int
_ = l
return n
}
func (m *HashResponse) Size() (n int) {
var l int
_ = l
if m.Hash != 0 {
n += 1 + sovRpc(uint64(m.Hash))
}
return n
}
func (m *WatchRequest) Size() (n int) {
var l int
_ = l
@ -5469,6 +5575,125 @@ func (m *CompactionResponse) Unmarshal(data []byte) error {
}
return nil
}
func (m *HashRequest) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: HashRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: HashRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipRpc(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *HashResponse) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: HashResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: HashResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
}
m.Hash = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowRpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Hash |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipRpc(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthRpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *WatchRequest) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0

View File

@ -30,6 +30,11 @@ service KV {
// Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely.
rpc Compact(CompactionRequest) returns (CompactionResponse) {}
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
}
service Watch {
@ -229,6 +234,13 @@ message CompactionResponse {
ResponseHeader header = 1;
}
message HashRequest {
}
message HashResponse {
uint32 hash = 1;
}
message WatchRequest {
oneof request_union {
WatchCreateRequest create_request = 1;

View File

@ -35,6 +35,7 @@ type RaftKV interface {
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error)
}
type Lessor interface {
@ -88,6 +89,14 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
return result.resp.(*pb.CompactionResponse), result.err
}
func (s *EtcdServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, err := s.be.Hash()
if err != nil {
return nil, err
}
return &pb.HashResponse{Hash: h}, nil
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
// no id given? choose one
for r.ID == int64(lease.NoLease) {

View File

@ -413,6 +413,28 @@ func TestV3TxnInvaildRange(t *testing.T) {
}
}
// TestV3Hash tests hash.
func TestV3Hash(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)
kvc := clus.RandClient().KV
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
for i := 0; i < 3; i++ {
_, err := kvc.Put(context.Background(), preq)
if err != nil {
t.Fatalf("couldn't put key (%v)", err)
}
}
resp, err := kvc.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
}
}
// TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
func TestV3WatchFromCurrentRevision(t *testing.T) {
defer testutil.AfterTest(t)