mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
v3rpc: fill lease header
This commit is contained in:
parent
26e52d2bce
commit
c3de53c23c
@ -40,4 +40,7 @@ func (h *header) fill(rh *pb.ResponseHeader) {
|
||||
rh.ClusterId = uint64(h.clusterID)
|
||||
rh.MemberId = uint64(h.memberID)
|
||||
rh.RaftTerm = h.raftTimer.Term()
|
||||
if rh.Revision == 0 {
|
||||
rh.Revision = h.rev()
|
||||
}
|
||||
}
|
||||
|
@ -25,11 +25,12 @@ import (
|
||||
)
|
||||
|
||||
type LeaseServer struct {
|
||||
le etcdserver.Lessor
|
||||
hdr header
|
||||
le etcdserver.Lessor
|
||||
}
|
||||
|
||||
func NewLeaseServer(le etcdserver.Lessor) pb.LeaseServer {
|
||||
return &LeaseServer{le: le}
|
||||
func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
|
||||
return &LeaseServer{le: s, hdr: newHeader(s)}
|
||||
}
|
||||
|
||||
func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
|
||||
@ -37,15 +38,17 @@ func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest)
|
||||
if err == lease.ErrLeaseExists {
|
||||
return nil, rpctypes.ErrLeaseExist
|
||||
}
|
||||
ls.hdr.fill(resp.Header)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
r, err := ls.le.LeaseRevoke(ctx, rr)
|
||||
resp, err := ls.le.LeaseRevoke(ctx, rr)
|
||||
if err != nil {
|
||||
return nil, rpctypes.ErrLeaseNotFound
|
||||
}
|
||||
return r, nil
|
||||
ls.hdr.fill(resp.Header)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
|
||||
@ -58,6 +61,15 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
|
||||
return err
|
||||
}
|
||||
|
||||
// Create header before we sent out the renew request.
|
||||
// This can make sure that the revision is strictly smaller or equal to
|
||||
// when the keepalive happened at the local server (when the local server is the leader)
|
||||
// or remote leader.
|
||||
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
|
||||
// at rev 4.
|
||||
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
|
||||
ls.hdr.fill(resp.Header)
|
||||
|
||||
ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
|
||||
if err == lease.ErrLeaseNotFound {
|
||||
return rpctypes.ErrLeaseNotFound
|
||||
@ -67,7 +79,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
|
||||
return err
|
||||
}
|
||||
|
||||
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl}
|
||||
resp.TTL = ttl
|
||||
err = stream.Send(resp)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -406,13 +406,15 @@ func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR
|
||||
if err == nil {
|
||||
resp.ID = int64(l.ID)
|
||||
resp.TTL = l.TTL
|
||||
resp.Header = &pb.ResponseHeader{Revision: a.s.KV().Rev()}
|
||||
}
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
|
||||
err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
|
||||
return &pb.LeaseRevokeResponse{}, err
|
||||
return &pb.LeaseRevokeResponse{Header: &pb.ResponseHeader{Revision: a.s.KV().Rev()}}, err
|
||||
}
|
||||
|
||||
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user