v3rpc: move Hash RPC to Maintenance service

This commit is contained in:
Anthony Romano 2016-03-28 17:15:58 -07:00
parent 6e3a0948e4
commit 3fbacf4be2
10 changed files with 158 additions and 139 deletions

View File

@ -0,0 +1,43 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"github.com/coreos/etcd/etcdserver"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
)
type header struct {
clusterID int64
memberID int64
raftTimer etcdserver.RaftTimer
rev func() int64
}
func newHeader(s *etcdserver.EtcdServer) header {
return header{
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
raftTimer: s,
rev: func() int64 { return s.KV().Rev() },
}
}
// fill populates pb.ResponseHeader using etcdserver information
func (h *header) fill(rh *pb.ResponseHeader) {
rh.ClusterId = uint64(h.clusterID)
rh.MemberId = uint64(h.memberID)
rh.RaftTerm = h.raftTimer.Term()
}

View File

@ -38,20 +38,12 @@ var (
) )
type kvServer struct { type kvServer struct {
clusterID int64 hdr header
memberID int64 kv etcdserver.RaftKV
raftTimer etcdserver.RaftTimer
kv etcdserver.RaftKV
} }
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &kvServer{ return &kvServer{hdr: newHeader(s), kv: s}
clusterID: int64(s.Cluster().ID()),
memberID: int64(s.ID()),
raftTimer: s,
kv: s,
}
} }
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
@ -67,7 +59,7 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp
if resp.Header == nil { if resp.Header == nil {
plog.Panic("unexpected nil resp.Header") plog.Panic("unexpected nil resp.Header")
} }
s.fillInHeader(resp.Header) s.hdr.fill(resp.Header)
return resp, err return resp, err
} }
@ -84,7 +76,7 @@ func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse,
if resp.Header == nil { if resp.Header == nil {
plog.Panic("unexpected nil resp.Header") plog.Panic("unexpected nil resp.Header")
} }
s.fillInHeader(resp.Header) s.hdr.fill(resp.Header)
return resp, err return resp, err
} }
@ -101,7 +93,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*
if resp.Header == nil { if resp.Header == nil {
plog.Panic("unexpected nil resp.Header") plog.Panic("unexpected nil resp.Header")
} }
s.fillInHeader(resp.Header) s.hdr.fill(resp.Header)
return resp, err return resp, err
} }
@ -118,7 +110,7 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse,
if resp.Header == nil { if resp.Header == nil {
plog.Panic("unexpected nil resp.Header") plog.Panic("unexpected nil resp.Header")
} }
s.fillInHeader(resp.Header) s.hdr.fill(resp.Header)
return resp, err return resp, err
} }
@ -131,26 +123,10 @@ func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.Co
if resp.Header == nil { if resp.Header == nil {
plog.Panic("unexpected nil resp.Header") plog.Panic("unexpected nil resp.Header")
} }
s.fillInHeader(resp.Header) s.hdr.fill(resp.Header)
return resp, nil return resp, nil
} }
func (s *kvServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
resp, err := s.kv.Hash(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
s.fillInHeader(resp.Header)
return resp, nil
}
// fillInHeader populates pb.ResponseHeader from kvServer, except Revision.
func (s *kvServer) fillInHeader(h *pb.ResponseHeader) {
h.ClusterId = uint64(s.clusterID)
h.MemberId = uint64(s.memberID)
h.RaftTerm = s.raftTimer.Term()
}
func checkRangeRequest(r *pb.RangeRequest) error { func checkRangeRequest(r *pb.RangeRequest) error {
if len(r.Key) == 0 { if len(r.Key) == 0 {
return rpctypes.ErrEmptyKey return rpctypes.ErrEmptyKey

View File

@ -30,12 +30,13 @@ type Alarmer interface {
} }
type maintenanceServer struct { type maintenanceServer struct {
bg BackendGetter bg BackendGetter
a Alarmer a Alarmer
hdr header
} }
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
return &maintenanceServer{bg: s, a: s} return &maintenanceServer{bg: s, a: s, hdr: newHeader(s)}
} }
func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
@ -49,6 +50,16 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
return &pb.DefragmentResponse{}, nil return &pb.DefragmentResponse{}, nil
} }
func (s *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, err := s.bg.Backend().Hash()
if err != nil {
return nil, togRPCError(err)
}
resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.hdr.rev()}, Hash: h}
s.hdr.fill(resp.Header)
return resp, nil
}
func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
plog.Warningf("alarming %+v", ar) plog.Warningf("alarming %+v", ar)
return ms.a.Alarm(ctx, ar) return ms.a.Alarm(ctx, ar)

View File

@ -94,7 +94,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
err error err error
) )
if txnID != noTxn { if txnID != noTxn {
rev, err = a.s.getKV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease)) rev, err = a.s.KV().TxnPut(txnID, p.Key, p.Value, lease.LeaseID(p.Lease))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -105,7 +105,7 @@ func (a *applierV3backend) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse,
return nil, lease.ErrLeaseNotFound return nil, lease.ErrLeaseNotFound
} }
} }
rev = a.s.getKV().Put(p.Key, p.Value, leaseID) rev = a.s.KV().Put(p.Key, p.Value, leaseID)
} }
resp.Header.Revision = rev resp.Header.Revision = rev
return resp, nil return resp, nil
@ -126,12 +126,12 @@ func (a *applierV3backend) DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (
} }
if txnID != noTxn { if txnID != noTxn {
n, rev, err = a.s.getKV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd) n, rev, err = a.s.KV().TxnDeleteRange(txnID, dr.Key, dr.RangeEnd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
n, rev = a.s.getKV().DeleteRange(dr.Key, dr.RangeEnd) n, rev = a.s.KV().DeleteRange(dr.Key, dr.RangeEnd)
} }
resp.Deleted = n resp.Deleted = n
@ -164,12 +164,12 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
} }
if txnID != noTxn { if txnID != noTxn {
kvs, rev, err = a.s.getKV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision) kvs, rev, err = a.s.KV().TxnRange(txnID, r.Key, r.RangeEnd, limit, r.Revision)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
kvs, rev, err = a.s.getKV().Range(r.Key, r.RangeEnd, limit, r.Revision) kvs, rev, err = a.s.KV().Range(r.Key, r.RangeEnd, limit, r.Revision)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -235,9 +235,9 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// When executing the operations of txn, we need to hold the txn lock. // When executing the operations of txn, we need to hold the txn lock.
// So the reader will not see any intermediate results. // So the reader will not see any intermediate results.
txnID := a.s.getKV().TxnBegin() txnID := a.s.KV().TxnBegin()
defer func() { defer func() {
err := a.s.getKV().TxnEnd(txnID) err := a.s.KV().TxnEnd(txnID)
if err != nil { if err != nil {
panic(fmt.Sprint("unexpected error when closing txn", txnID)) panic(fmt.Sprint("unexpected error when closing txn", txnID))
} }
@ -264,7 +264,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
// It returns the revision at which the comparison happens. If the comparison // It returns the revision at which the comparison happens. If the comparison
// succeeds, the it returns true. Otherwise it returns false. // succeeds, the it returns true. Otherwise it returns false.
func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) { func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
ckvs, rev, err := a.s.getKV().Range(c.Key, nil, 1, 0) ckvs, rev, err := a.s.KV().Range(c.Key, nil, 1, 0)
if err != nil { if err != nil {
if err == dstorage.ErrTxnIDMismatch { if err == dstorage.ErrTxnIDMismatch {
panic("unexpected txn ID mismatch error") panic("unexpected txn ID mismatch error")
@ -365,12 +365,12 @@ func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestUnion) *pb.R
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) { func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, error) {
resp := &pb.CompactionResponse{} resp := &pb.CompactionResponse{}
resp.Header = &pb.ResponseHeader{} resp.Header = &pb.ResponseHeader{}
err := a.s.getKV().Compact(compaction.Revision) err := a.s.KV().Compact(compaction.Revision)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// get the current revision. which key to get is not important. // get the current revision. which key to get is not important.
_, resp.Header.Revision, _ = a.s.getKV().Range([]byte("compaction"), nil, 1, 0) _, resp.Header.Revision, _ = a.s.KV().Range([]byte("compaction"), nil, 1, 0)
return resp, err return resp, err
} }
@ -559,10 +559,10 @@ func (a *applierV3backend) checkRequestRange(reqs []*pb.RequestUnion) error {
continue continue
} }
if greq.Revision > a.s.getKV().Rev() { if greq.Revision > a.s.KV().Rev() {
return dstorage.ErrFutureRev return dstorage.ErrFutureRev
} }
if greq.Revision < a.s.getKV().FirstRev() { if greq.Revision < a.s.KV().FirstRev() {
return dstorage.ErrCompacted return dstorage.ErrCompacted
} }
} }

View File

@ -1657,10 +1657,6 @@ type KVClient interface {
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error) Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
} }
type kVClient struct { type kVClient struct {
@ -1716,15 +1712,6 @@ func (c *kVClient) Compact(ctx context.Context, in *CompactionRequest, opts ...g
return out, nil return out, nil
} }
func (c *kVClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) {
out := new(HashResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.KV/Hash", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for KV service // Server API for KV service
type KVServer interface { type KVServer interface {
@ -1746,10 +1733,6 @@ type KVServer interface {
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
Compact(context.Context, *CompactionRequest) (*CompactionResponse, error) Compact(context.Context, *CompactionRequest) (*CompactionResponse, error)
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
Hash(context.Context, *HashRequest) (*HashResponse, error)
} }
func RegisterKVServer(s *grpc.Server, srv KVServer) { func RegisterKVServer(s *grpc.Server, srv KVServer) {
@ -1816,18 +1799,6 @@ func _KV_Compact_Handler(srv interface{}, ctx context.Context, dec func(interfac
return out, nil return out, nil
} }
func _KV_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HashRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(KVServer).Hash(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _KV_serviceDesc = grpc.ServiceDesc{ var _KV_serviceDesc = grpc.ServiceDesc{
ServiceName: "etcdserverpb.KV", ServiceName: "etcdserverpb.KV",
HandlerType: (*KVServer)(nil), HandlerType: (*KVServer)(nil),
@ -1852,10 +1823,6 @@ var _KV_serviceDesc = grpc.ServiceDesc{
MethodName: "Compact", MethodName: "Compact",
Handler: _KV_Compact_Handler, Handler: _KV_Compact_Handler,
}, },
{
MethodName: "Hash",
Handler: _KV_Hash_Handler,
},
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
} }
@ -2274,10 +2241,13 @@ var _Cluster_serviceDesc = grpc.ServiceDesc{
// Client API for Maintenance service // Client API for Maintenance service
type MaintenanceClient interface { type MaintenanceClient interface {
// TODO: move Hash from kv to Maintenance
Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error)
// Alarm activates, deactivates, and queries alarms regarding cluster health. // Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error)
Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error)
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error)
} }
type maintenanceClient struct { type maintenanceClient struct {
@ -2288,15 +2258,6 @@ func NewMaintenanceClient(cc *grpc.ClientConn) MaintenanceClient {
return &maintenanceClient{cc} return &maintenanceClient{cc}
} }
func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) {
out := new(DefragmentResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) { func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ...grpc.CallOption) (*AlarmResponse, error) {
out := new(AlarmResponse) out := new(AlarmResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Alarm", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Alarm", in, out, c.cc, opts...)
@ -2306,31 +2267,40 @@ func (c *maintenanceClient) Alarm(ctx context.Context, in *AlarmRequest, opts ..
return out, nil return out, nil
} }
// Server API for Maintenance service func (c *maintenanceClient) Defragment(ctx context.Context, in *DefragmentRequest, opts ...grpc.CallOption) (*DefragmentResponse, error) {
out := new(DefragmentResponse)
type MaintenanceServer interface { err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Defragment", in, out, c.cc, opts...)
// TODO: move Hash from kv to Maintenance
Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error)
// Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error)
}
func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
s.RegisterService(&_Maintenance_serviceDesc, srv)
}
func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(DefragmentRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Defragment(ctx, in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (c *maintenanceClient) Hash(ctx context.Context, in *HashRequest, opts ...grpc.CallOption) (*HashResponse, error) {
out := new(HashResponse)
err := grpc.Invoke(ctx, "/etcdserverpb.Maintenance/Hash", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Maintenance service
type MaintenanceServer interface {
// Alarm activates, deactivates, and queries alarms regarding cluster health.
Alarm(context.Context, *AlarmRequest) (*AlarmResponse, error)
Defragment(context.Context, *DefragmentRequest) (*DefragmentResponse, error)
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
Hash(context.Context, *HashRequest) (*HashResponse, error)
}
func RegisterMaintenanceServer(s *grpc.Server, srv MaintenanceServer) {
s.RegisterService(&_Maintenance_serviceDesc, srv)
}
func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(AlarmRequest) in := new(AlarmRequest)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -2343,17 +2313,45 @@ func _Maintenance_Alarm_Handler(srv interface{}, ctx context.Context, dec func(i
return out, nil return out, nil
} }
func _Maintenance_Defragment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(DefragmentRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Defragment(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
func _Maintenance_Hash_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HashRequest)
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(MaintenanceServer).Hash(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _Maintenance_serviceDesc = grpc.ServiceDesc{ var _Maintenance_serviceDesc = grpc.ServiceDesc{
ServiceName: "etcdserverpb.Maintenance", ServiceName: "etcdserverpb.Maintenance",
HandlerType: (*MaintenanceServer)(nil), HandlerType: (*MaintenanceServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{
MethodName: "Alarm",
Handler: _Maintenance_Alarm_Handler,
},
{ {
MethodName: "Defragment", MethodName: "Defragment",
Handler: _Maintenance_Defragment_Handler, Handler: _Maintenance_Defragment_Handler,
}, },
{ {
MethodName: "Alarm", MethodName: "Hash",
Handler: _Maintenance_Alarm_Handler, Handler: _Maintenance_Hash_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},

View File

@ -30,11 +30,6 @@ service KV {
// Compact compacts the event history in etcd. User should compact the // Compact compacts the event history in etcd. User should compact the
// event history periodically, or it will grow infinitely. // event history periodically, or it will grow infinitely.
rpc Compact(CompactionRequest) returns (CompactionResponse) {} rpc Compact(CompactionRequest) returns (CompactionResponse) {}
// Hash returns the hash of local KV state for consistency checking purpose.
// This is designed for testing purpose. Do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
} }
service Watch { service Watch {
@ -77,11 +72,15 @@ service Cluster {
} }
service Maintenance { service Maintenance {
// TODO: move Hash from kv to Maintenance
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
// Alarm activates, deactivates, and queries alarms regarding cluster health. // Alarm activates, deactivates, and queries alarms regarding cluster health.
rpc Alarm(AlarmRequest) returns (AlarmResponse) {} rpc Alarm(AlarmRequest) returns (AlarmResponse) {}
rpc Defragment(DefragmentRequest) returns (DefragmentResponse) {}
// Hash returns the hash of the local KV state for consistency checking purpose.
// This is designed for testing; do not use this in production when there
// are ongoing transactions.
rpc Hash(HashRequest) returns (HashResponse) {}
} }
service Auth { service Auth {

View File

@ -1195,7 +1195,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
} }
// commit v3 storage because WAL file before snapshot index // commit v3 storage because WAL file before snapshot index
// could be removed after SaveSnap. // could be removed after SaveSnap.
s.getKV().Commit() s.KV().Commit()
// SaveSnap saves the snapshot and releases the locked wal files // SaveSnap saves the snapshot and releases the locked wal files
// to the snapshot index. // to the snapshot index.
if err = s.r.storage.SaveSnap(snap); err != nil { if err = s.r.storage.SaveSnap(snap); err != nil {
@ -1334,7 +1334,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
} }
} }
func (s *EtcdServer) getKV() dstorage.ConsistentWatchableKV { return s.kv } func (s *EtcdServer) KV() dstorage.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) Backend() backend.Backend { func (s *EtcdServer) Backend() backend.Backend {
s.bemu.Lock() s.bemu.Lock()
defer s.bemu.Unlock() defer s.bemu.Unlock()

View File

@ -38,7 +38,6 @@ type RaftKV interface {
DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error)
} }
type Lessor interface { type Lessor interface {
@ -109,14 +108,6 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.
return resp, result.err return resp, result.err
} }
func (s *EtcdServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
h, err := s.be.Hash()
if err != nil {
return nil, err
}
return &pb.HashResponse{Header: &pb.ResponseHeader{Revision: s.kv.Rev()}, Hash: h}, nil
}
func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) { func (s *EtcdServer) LeaseCreate(ctx context.Context, r *pb.LeaseCreateRequest) (*pb.LeaseCreateResponse, error) {
// no id given? choose one // no id given? choose one
for r.ID == int64(lease.NoLease) { for r.ID == int64(lease.NoLease) {
@ -225,6 +216,4 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
} }
// Watchable returns a watchable interface attached to the etcdserver. // Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() dstorage.Watchable { func (s *EtcdServer) Watchable() dstorage.Watchable { return s.KV() }
return s.getKV()
}

View File

@ -440,7 +440,10 @@ func TestV3Hash(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3}) clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t) defer clus.Terminate(t)
kvc := toGRPC(clus.RandClient()).KV cli := clus.RandClient()
kvc := toGRPC(cli).KV
m := toGRPC(cli).Maintenance
preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
@ -450,7 +453,7 @@ func TestV3Hash(t *testing.T) {
} }
} }
resp, err := kvc.Hash(context.Background(), &pb.HashRequest{}) resp, err := m.Hash(context.Background(), &pb.HashRequest{})
if err != nil || resp.Hash == 0 { if err != nil || resp.Hash == 0 {
t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash) t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
} }

View File

@ -303,9 +303,9 @@ func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
kvc := pb.NewKVClient(conn) m := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := kvc.Hash(ctx, &pb.HashRequest{}) resp, err := m.Hash(ctx, &pb.HashRequest{})
cancel() cancel()
conn.Close() conn.Close()
if err != nil { if err != nil {