mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
v3rpc, etcdserver, leasehttp: ctxize Renew with request timeout
Would retry a few times before returning a not primary error that the client should never see. Instead, use proper timeouts and then return a request timeout error on failure. Fixes #6922
This commit is contained in:
parent
f6042890b7
commit
be1f36d97c
@ -79,14 +79,14 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
|
||||
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
|
||||
ls.hdr.fill(resp.Header)
|
||||
|
||||
ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
|
||||
ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
|
||||
if err == lease.ErrLeaseNotFound {
|
||||
err = nil
|
||||
ttl = 0
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return togRPCError(err)
|
||||
}
|
||||
|
||||
resp.TTL = ttl
|
||||
|
@ -17,7 +17,6 @@ package etcdserver
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -27,7 +26,6 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/lease/leasehttp"
|
||||
"github.com/coreos/etcd/lease/leasepb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/raft"
|
||||
|
||||
@ -70,7 +68,7 @@ type Lessor interface {
|
||||
|
||||
// LeaseRenew renews the lease with given ID. The renewed TTL is returned. Or an error
|
||||
// is returned.
|
||||
LeaseRenew(id lease.LeaseID) (int64, error)
|
||||
LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
|
||||
|
||||
// LeaseTimeToLive retrieves lease information.
|
||||
LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
|
||||
@ -306,7 +304,7 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
|
||||
return result.resp.(*pb.LeaseRevokeResponse), nil
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
|
||||
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
|
||||
ttl, err := s.lessor.Renew(id)
|
||||
if err == nil { // already requested to primary lessor(leader)
|
||||
return ttl, nil
|
||||
@ -315,21 +313,24 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// renewals don't go through raft; forward to leader manually
|
||||
leader, err := s.waitLeader()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
|
||||
defer cancel()
|
||||
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeasePrefix
|
||||
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout())
|
||||
if err == nil {
|
||||
break
|
||||
// renewals don't go through raft; forward to leader manually
|
||||
for cctx.Err() == nil && err != nil {
|
||||
leader, lerr := s.waitLeader(cctx)
|
||||
if lerr != nil {
|
||||
return -1, lerr
|
||||
}
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeasePrefix
|
||||
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
|
||||
if err == nil || err == lease.ErrLeaseNotFound {
|
||||
return ttl, err
|
||||
}
|
||||
}
|
||||
err = convertEOFToNoLeader(err)
|
||||
}
|
||||
return ttl, err
|
||||
return -1, ErrTimeout
|
||||
}
|
||||
|
||||
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
|
||||
@ -352,39 +353,32 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// manually request to leader
|
||||
leader, err := s.waitLeader()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
|
||||
defer cancel()
|
||||
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeaseInternalPrefix
|
||||
var iresp *leasepb.LeaseInternalResponse
|
||||
iresp, err = leasehttp.TimeToLiveHTTP(ctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
|
||||
if err == nil {
|
||||
return iresp.LeaseTimeToLiveResponse, nil
|
||||
// forward to leader
|
||||
for cctx.Err() == nil {
|
||||
leader, err := s.waitLeader(cctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + leasehttp.LeaseInternalPrefix
|
||||
resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
|
||||
if err == nil {
|
||||
return resp.LeaseTimeToLiveResponse, nil
|
||||
}
|
||||
if err == lease.ErrLeaseNotFound {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
err = convertEOFToNoLeader(err)
|
||||
}
|
||||
return nil, err
|
||||
return nil, ErrTimeout
|
||||
}
|
||||
|
||||
// convertEOFToNoLeader converts EOF erros to ErrNoLeader because
|
||||
// lease renew, timetolive requests to followers are forwarded to leader,
|
||||
// and follower might not be able to reach leader from transient network
|
||||
// errors (often EOF errors). By returning ErrNoLeader, signal clients
|
||||
// to retry its requests.
|
||||
func convertEOFToNoLeader(err error) error {
|
||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||
return ErrNoLeader
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *EtcdServer) waitLeader() (*membership.Member, error) {
|
||||
func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
|
||||
leader := s.cluster.Member(s.Leader())
|
||||
for i := 0; i < 5 && leader == nil; i++ {
|
||||
for leader == nil {
|
||||
// wait an election
|
||||
dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
|
||||
select {
|
||||
@ -392,6 +386,8 @@ func (s *EtcdServer) waitLeader() (*membership.Member, error) {
|
||||
leader = s.cluster.Member(s.Leader())
|
||||
case <-s.stopping:
|
||||
return nil, ErrStopped
|
||||
case <-ctx.Done():
|
||||
return nil, ErrNoLeader
|
||||
}
|
||||
}
|
||||
if leader == nil || len(leader.PeerURLs) == 0 {
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/lease"
|
||||
@ -125,15 +124,22 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// RenewHTTP renews a lease at a given primary server.
|
||||
// TODO: Batch request in future?
|
||||
func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
|
||||
func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
|
||||
// will post lreq protobuf to leader
|
||||
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
cc := &http.Client{Transport: rt, Timeout: timeout}
|
||||
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
|
||||
cc := &http.Client{Transport: rt}
|
||||
req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/protobuf")
|
||||
req.Cancel = ctx.Done()
|
||||
|
||||
resp, err := cc.Do(req)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ func TestRenewHTTP(t *testing.T) {
|
||||
ts := httptest.NewServer(NewHandler(le))
|
||||
defer ts.Close()
|
||||
|
||||
ttl, err := RenewHTTP(l.ID, ts.URL+LeasePrefix, http.DefaultTransport, time.Second)
|
||||
ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user