diff --git a/etcdserver/api/v3rpc/header.go b/etcdserver/api/v3rpc/header.go new file mode 100644 index 000000000..dcb559219 --- /dev/null +++ b/etcdserver/api/v3rpc/header.go @@ -0,0 +1,43 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3rpc + +import ( + "github.com/coreos/etcd/etcdserver" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" +) + +type header struct { + clusterID int64 + memberID int64 + raftTimer etcdserver.RaftTimer + rev func() int64 +} + +func newHeader(s *etcdserver.EtcdServer) header { + return header{ + clusterID: int64(s.Cluster().ID()), + memberID: int64(s.ID()), + raftTimer: s, + rev: func() int64 { return s.KV().Rev() }, + } +} + +// fill populates pb.ResponseHeader using etcdserver information +func (h *header) fill(rh *pb.ResponseHeader) { + rh.ClusterId = uint64(h.clusterID) + rh.MemberId = uint64(h.memberID) + rh.RaftTerm = h.raftTimer.Term() +} diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 5c6c07bf2..33039f15b 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -38,20 +38,12 @@ var ( ) type kvServer struct { - clusterID int64 - memberID int64 - raftTimer etcdserver.RaftTimer - - kv etcdserver.RaftKV + hdr header + kv etcdserver.RaftKV } func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { - return &kvServer{ - clusterID: int64(s.Cluster().ID()), - memberID: int64(s.ID()), - raftTimer: s, - kv: s, - } + return &kvServer{hdr: newHeader(s), kv: s} } func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -67,7 +59,7 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp if resp.Header == nil { plog.Panic("unexpected nil resp.Header") } - s.fillInHeader(resp.Header) + s.hdr.fill(resp.Header) return resp, err } @@ -84,7 +76,7 @@ func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, if resp.Header == nil { plog.Panic("unexpected nil resp.Header") } - s.fillInHeader(resp.Header) + s.hdr.fill(resp.Header) return resp, err } @@ -101,7 +93,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (* if resp.Header == nil { plog.Panic("unexpected nil resp.Header") } - s.fillInHeader(resp.Header) + s.hdr.fill(resp.Header) return resp, err } @@ -118,7 +110,7 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, if resp.Header == nil { plog.Panic("unexpected nil resp.Header") } - s.fillInHeader(resp.Header) + s.hdr.fill(resp.Header) return resp, err } @@ -131,26 +123,10 @@ func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Co if resp.Header == nil { plog.Panic("unexpected nil resp.Header") } - s.fillInHeader(resp.Header) + s.hdr.fill(resp.Header) return resp, nil } -func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { - resp, err := s.kv.Hash(ctx, r) - if err != nil { - return nil, togRPCError(err) - } - s.fillInHeader(resp.Header) - return resp, nil -} - -// fillInHeader populates pb.ResponseHeader from kvServer, except Revision. -func (s *kvServer) fillInHeader(h *pb.ResponseHeader) { - h.ClusterId = uint64(s.clusterID) - h.MemberId = uint64(s.memberID) - h.RaftTerm = s.raftTimer.Term() -} - func checkRangeRequest(r *pb.RangeRequest) error { if len(r.Key) == 0 { return rpctypes.ErrEmptyKey diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index 8c3664ffa..da95cfed9 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -30,12 +30,13 @@ type Alarmer interface { } type maintenanceServer struct { - bg BackendGetter - a Alarmer + bg BackendGetter + a Alarmer + hdr header } func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { - return &maintenanceServer{bg: s, a: s} + return &maintenanceServer{bg: s, a: s, hdr: newHeader(s)} } func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { @@ -49,6 +50,16 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe return &pb.DefragmentResponse{}, nil } +func (s *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { + h, err := s.bg.Backend().Hash() + if err != nil { + return nil, togRPCError(err) + } + resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.hdr.rev()}, Hash: h} + s.hdr.fill(resp.Header) + return resp, nil +} + func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { plog.Warningf("alarming %+v", ar) return ms.a.Alarm(ctx, ar) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 4b4f0f28c..827288f76 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -94,7 +94,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, err error ) if txnID != noTxn { - rev, err = a.s.getKV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) + rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) if err != nil { return nil, err } @@ -105,7 +105,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, return nil, lease.ErrLeaseNotFound } } - rev = a.s.getKV().Put(p.Key, p.Value, leaseID) + rev = a.s.KV().Put(p.Key, p.Value, leaseID) } resp.Header.Revision = rev return resp, nil @@ -126,12 +126,12 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) ( } if txnID != noTxn { - n, rev, err = a.s.getKV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) + n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) if err != nil { return nil, err } } else { - n, rev = a.s.getKV().DeleteRange(dr.Key, dr.RangeEnd) + n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd) } resp.Deleted = n @@ -164,12 +164,12 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp } if txnID != noTxn { - kvs, rev, err = a.s.getKV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision) + kvs, rev, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision) if err != nil { return nil, err } } else { - kvs, rev, err = a.s.getKV().Range(r.Key, r.RangeEnd, limit, r.Revision) + kvs, rev, err = a.s.KV().Range(r.Key, r.RangeEnd, limit, r.Revision) if err != nil { return nil, err } @@ -235,9 +235,9 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // When executing the operations of txn, we need to hold the txn lock. // So the reader will not see any intermediate results. - txnID := a.s.getKV().TxnBegin() + txnID := a.s.KV().TxnBegin() defer func() { - err := a.s.getKV().TxnEnd(txnID) + err := a.s.KV().TxnEnd(txnID) if err != nil { panic(fmt.Sprint("unexpected error when closing txn", txnID)) } @@ -264,7 +264,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // It returns the revision at which the comparison happens. If the comparison // succeeds, the it returns true. Otherwise it returns false. func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { - ckvs, rev, err := a.s.getKV().Range(c.Key, nil, 1, 0) + ckvs, rev, err := a.s.KV().Range(c.Key, nil, 1, 0) if err != nil { if err == dstorage.ErrTxnIDMismatch { panic("unexpected txn ID mismatch error") @@ -365,12 +365,12 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} - err := a.s.getKV().Compact(compaction.Revision) + err := a.s.KV().Compact(compaction.Revision) if err != nil { return nil, err } // get the current revision. which key to get is not important. - _, resp.Header.Revision, _ = a.s.getKV().Range([]byte("compaction"), nil, 1, 0) + _, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0) return resp, err } @@ -559,10 +559,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestUnion) error { continue } - if greq.Revision > a.s.getKV().Rev() { + if greq.Revision > a.s.KV().Rev() { return dstorage.ErrFutureRev } - if greq.Revision < a.s.getKV().FirstRev() { + if greq.Revision < a.s.KV().FirstRev() { return dstorage.ErrCompacted } } diff --git a/etcdserver/etcdserverpb/rpc.pb.go b/etcdserver/etcdserverpb/rpc.pb.go index d9e54728c..a52f7a2ba 100644 --- a/etcdserver/etcdserverpb/rpc.pb.go +++ b/etcdserver/etcdserverpb/rpc.pb.go @@ -1657,10 +1657,6 @@ type KVClient interface { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error) - // Hash returns the hash of local KV state for consistency checking purpose. - // This is designed for testing purpose. Do not use this in production when there - // are ongoing transactions. - Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) } type kVClient struct { @@ -1716,15 +1712,6 @@ func (c *kVClient) Compact(ctx context.Context, in *CompactionRequest, opts ...g return out, nil } -func (c *kVClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) { - out := new(HashResponse) - err := grpc.Invoke(ctx, "/etcdserverpb.KV/Hash", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // Server API for KV service type KVServer interface { @@ -1746,10 +1733,6 @@ type KVServer interface { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. Compact(context.Context, *CompactionRequest) (*CompactionResponse, error) - // Hash returns the hash of local KV state for consistency checking purpose. - // This is designed for testing purpose. Do not use this in production when there - // are ongoing transactions. - Hash(context.Context, *HashRequest) (*HashResponse, error) } func RegisterKVServer(s *grpc.Server, srv KVServer) { @@ -1816,18 +1799,6 @@ func _KV_Compact_Handler(srv interface{}, ctx context.Context, dec func(interfac return out, nil } -func _KV_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { - in := new(HashRequest) - if err := dec(in); err != nil { - return nil, err - } - out, err := srv.(KVServer).Hash(ctx, in) - if err != nil { - return nil, err - } - return out, nil -} - var _KV_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.KV", HandlerType: (*KVServer)(nil), @@ -1852,10 +1823,6 @@ var _KV_serviceDesc = grpc.ServiceDesc{ MethodName: "Compact", Handler: _KV_Compact_Handler, }, - { - MethodName: "Hash", - Handler: _KV_Hash_Handler, - }, }, Streams: []grpc.StreamDesc{}, } @@ -2274,10 +2241,13 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{ // Client API for Maintenance service type MaintenanceClient interface { - // TODO: move Hash from kv to Maintenance - Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) // Alarm activates, deactivates, and queries alarms regarding cluster health. Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) + Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) + // Hash returns the hash of the local KV state for consistency checking purpose. + // This is designed for testing; do not use this in production when there + // are ongoing transactions. + Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) } type maintenanceClient struct { @@ -2288,15 +2258,6 @@ func NewMaintenanceClient(cc *grpc.ClientConn) MaintenanceClient { return &maintenanceClient{cc} } -func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) { - out := new(DefragmentResponse) - err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) { out := new(AlarmResponse) err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Alarm", in, out, c.cc, opts...) @@ -2306,31 +2267,40 @@ func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts .. return out, nil } -// Server API for Maintenance service - -type MaintenanceServer interface { - // TODO: move Hash from kv to Maintenance - Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error) - // Alarm activates, deactivates, and queries alarms regarding cluster health. - Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error) -} - -func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { - s.RegisterService(&_Maintenance_serviceDesc, srv) -} - -func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { - in := new(DefragmentRequest) - if err := dec(in); err != nil { - return nil, err - } - out, err := srv.(MaintenanceServer).Defragment(ctx, in) +func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) { + out := new(DefragmentResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...) if err != nil { return nil, err } return out, nil } +func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) { + out := new(HashResponse) + err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Hash", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Maintenance service + +type MaintenanceServer interface { + // Alarm activates, deactivates, and queries alarms regarding cluster health. + Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error) + Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error) + // Hash returns the hash of the local KV state for consistency checking purpose. + // This is designed for testing; do not use this in production when there + // are ongoing transactions. + Hash(context.Context, *HashRequest) (*HashResponse, error) +} + +func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) { + s.RegisterService(&_Maintenance_serviceDesc, srv) +} + func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(AlarmRequest) if err := dec(in); err != nil { @@ -2343,17 +2313,45 @@ func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(i return out, nil } +func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(DefragmentRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(MaintenanceServer).Defragment(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + +func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { + in := new(HashRequest) + if err := dec(in); err != nil { + return nil, err + } + out, err := srv.(MaintenanceServer).Hash(ctx, in) + if err != nil { + return nil, err + } + return out, nil +} + var _Maintenance_serviceDesc = grpc.ServiceDesc{ ServiceName: "etcdserverpb.Maintenance", HandlerType: (*MaintenanceServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "Alarm", + Handler: _Maintenance_Alarm_Handler, + }, { MethodName: "Defragment", Handler: _Maintenance_Defragment_Handler, }, { - MethodName: "Alarm", - Handler: _Maintenance_Alarm_Handler, + MethodName: "Hash", + Handler: _Maintenance_Hash_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/etcdserver/etcdserverpb/rpc.proto b/etcdserver/etcdserverpb/rpc.proto index cbed75a9b..aa0d01d5f 100644 --- a/etcdserver/etcdserverpb/rpc.proto +++ b/etcdserver/etcdserverpb/rpc.proto @@ -30,11 +30,6 @@ service KV { // Compact compacts the event history in etcd. User should compact the // event history periodically, or it will grow infinitely. rpc Compact(CompactionRequest) returns (CompactionResponse) {} - - // Hash returns the hash of local KV state for consistency checking purpose. - // This is designed for testing purpose. Do not use this in production when there - // are ongoing transactions. - rpc Hash(HashRequest) returns (HashResponse) {} } service Watch { @@ -77,11 +72,15 @@ service Cluster { } service Maintenance { - // TODO: move Hash from kv to Maintenance - rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {} - // Alarm activates, deactivates, and queries alarms regarding cluster health. rpc Alarm(AlarmRequest) returns (AlarmResponse) {} + + rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {} + + // Hash returns the hash of the local KV state for consistency checking purpose. + // This is designed for testing; do not use this in production when there + // are ongoing transactions. + rpc Hash(HashRequest) returns (HashResponse) {} } service Auth { diff --git a/etcdserver/server.go b/etcdserver/server.go index 881df64d1..dab41d0c5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1195,7 +1195,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } // commit v3 storage because WAL file before snapshot index // could be removed after SaveSnap. - s.getKV().Commit() + s.KV().Commit() // SaveSnap saves the snapshot and releases the locked wal files // to the snapshot index. if err = s.r.storage.SaveSnap(snap); err != nil { @@ -1334,7 +1334,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv } +func (s *EtcdServer) KV() dstorage.ConsistentWatchableKV { return s.kv } func (s *EtcdServer) Backend() backend.Backend { s.bemu.Lock() defer s.bemu.Unlock() diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 395ed749d..95509a99d 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -38,7 +38,6 @@ type RaftKV interface { DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) - Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) } type Lessor interface { @@ -109,14 +108,6 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. return resp, result.err } -func (s *EtcdServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) { - h, err := s.be.Hash() - if err != nil { - return nil, err - } - return &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.kv.Rev()}, Hash: h}, nil -} - func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { // no id given? choose one for r.ID == int64(lease.NoLease) { @@ -225,6 +216,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern } // Watchable returns a watchable interface attached to the etcdserver. -func (s *EtcdServer) Watchable() dstorage.Watchable { - return s.getKV() -} +func (s *EtcdServer) Watchable() dstorage.Watchable { return s.KV() } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 57de58507..c19d193cc 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -440,7 +440,10 @@ func TestV3Hash(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) defer clus.Terminate(t) - kvc := toGRPC(clus.RandClient()).KV + cli := clus.RandClient() + kvc := toGRPC(cli).KV + m := toGRPC(cli).Maintenance + preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} for i := 0; i < 3; i++ { @@ -450,7 +453,7 @@ func TestV3Hash(t *testing.T) { } } - resp, err := kvc.Hash(context.Background(), &pb.HashRequest{}) + resp, err := m.Hash(context.Background(), &pb.HashRequest{}) if err != nil || resp.Hash == 0 { t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash) } diff --git a/tools/functional-tester/etcd-tester/cluster.go b/tools/functional-tester/etcd-tester/cluster.go index 81a56af61..d12c754e0 100644 --- a/tools/functional-tester/etcd-tester/cluster.go +++ b/tools/functional-tester/etcd-tester/cluster.go @@ -303,9 +303,9 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) if err != nil { return nil, nil, err } - kvc := pb.NewKVClient(conn) + m := pb.NewMaintenanceClient(conn) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - resp, err := kvc.Hash(ctx, &pb.HashRequest{}) + resp, err := m.Hash(ctx, &pb.HashRequest{}) cancel() conn.Close() if err != nil {