diff --git a/etcdserver/api/v2http/peer.go b/etcdserver/api/v2http/peer.go
index 456fa4619..a1abadba8 100644
--- a/etcdserver/api/v2http/peer.go
+++ b/etcdserver/api/v2http/peer.go
@@ -31,8 +31,9 @@ const (
 // NewPeerHandler generates an http.Handler to handle etcd peer requests.
 func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
 	var lh http.Handler
-	if l := s.Lessor(); l != nil {
-		lh = leasehttp.NewHandler(l)
+	l := s.Lessor()
+	if l != nil {
+		lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
 	}
 	return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
 }
diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 50b574211..eae398cce 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				raftDone: raftDone,
 			}
 
+			updateCommittedIndex(&ap, rh)
+
 			select {
 			case r.applyc <- ap:
 			case <-r.stopped:
@@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
+func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
+	var ci uint64
+	if len(ap.entries) != 0 {
+		ci = ap.entries[len(ap.entries)-1].Index
+	}
+	if ap.snapshot.Metadata.Index > ci {
+		ci = ap.snapshot.Metadata.Index
+	}
+	if ci != 0 {
+		rh.updateCommittedIndex(ci)
+	}
+}
+
 func (r *raftNode) sendMessages(ms []raftpb.Message) {
 	sentAppResp := false
 	for i := len(ms) - 1; i >= 0; i-- {
diff --git a/etcdserver/server.go b/etcdserver/server.go
index b62d29191..d28908eee 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()
 
 func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
 
+func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
+
 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 	if s.cluster.IsIDRemoved(types.ID(m.From)) {
 		plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
@@ -596,7 +598,8 @@ type etcdProgress struct {
 // and helps decouple state machine logic from Raft algorithms.
 // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
 type raftReadyHandler struct {
-	leadershipUpdate func()
+	leadershipUpdate     func()
+	updateCommittedIndex func(uint64)
 }
 
 func (s *EtcdServer) run() {
@@ -646,6 +649,12 @@
 				s.r.td.Reset()
 			}
 		},
+		updateCommittedIndex: func(ci uint64) {
+			cci := s.getCommittedIndex()
+			if ci > cci {
+				s.setCommittedIndex(ci)
+			}
+		},
 	}
 	s.r.start(rh)
 
@@ -699,16 +708,6 @@
 	for {
 		select {
 		case ap := <-s.r.apply():
-			var ci uint64
-			if len(ap.entries) != 0 {
-				ci = ap.entries[len(ap.entries)-1].Index
-			}
-			if ap.snapshot.Metadata.Index > ci {
-				ci = ap.snapshot.Metadata.Index
-			}
-			if ci != 0 {
-				s.setCommittedIndex(ci)
-			}
 			f := func(context.Context) { s.applyAll(&ep, &ap) }
 			sched.Schedule(f)
 		case leases := <-expiredLeaseC:
diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go
index 4040b0f8f..6347c0c28 100644
--- a/integration/v3_lease_test.go
+++ b/integration/v3_lease_test.go
@@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) {
 	}
 }
 
+// TestV3LeaseRenewStress keeps creating leases and renewing them immediately to ensure the renewals go through.
+// it was observed that an immediate lease renewal after granting a lease from a follower resulted in lease not found.
+// related issue https://github.com/coreos/etcd/issues/6978
+func TestV3LeaseRenewStress(t *testing.T) {
+	testLeaseStress(t, stressLeaseRenew)
+}
+
+// TestV3LeaseTimeToLiveStress keeps creating leases and retrieving them immediately to ensure the leases can be retrieved.
+// it was observed that an immediate lease retrieval after granting a lease from a follower resulted in lease not found.
+// related issue https://github.com/coreos/etcd/issues/6978
+func TestV3LeaseTimeToLiveStress(t *testing.T) {
+	testLeaseStress(t, stressLeaseTimeToLive)
+}
+
+func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	errc := make(chan error)
+
+	for i := 0; i < 30; i++ {
+		for j := 0; j < 3; j++ {
+			go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
+		}
+	}
+
+	for i := 0; i < 90; i++ {
+		if err := <-errc; err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
+func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
+	defer func() {
+		if tctx.Err() != nil {
+			reterr = nil
+		}
+	}()
+	lac, err := lc.LeaseKeepAlive(tctx)
+	if err != nil {
+		return err
+	}
+	for tctx.Err() == nil {
+		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
+		if gerr != nil {
+			continue
+		}
+		err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
+		if err != nil {
+			continue
+		}
+		rresp, rxerr := lac.Recv()
+		if rxerr != nil {
+			continue
+		}
+		if rresp.TTL == 0 {
+			return fmt.Errorf("TTL shouldn't be 0 so soon")
+		}
+	}
+	return nil
+}
+
+func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
+	defer func() {
+		if tctx.Err() != nil {
+			reterr = nil
+		}
+	}()
+	for tctx.Err() == nil {
+		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
+		if gerr != nil {
+			continue
+		}
+		_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
+		if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
+			return kerr
+		}
+	}
+	return nil
+}
+
 func TestV3PutOnNonExistLease(t *testing.T) {
 	defer testutil.AfterTest(t)
 	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go
index 06eb93565..256051efc 100644
--- a/lease/leasehttp/http.go
+++ b/lease/leasehttp/http.go
@@ -16,9 +16,11 @@ package leasehttp
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/lease"
@@ -30,14 +32,19 @@ import (
 var (
 	LeasePrefix         = "/leases"
 	LeaseInternalPrefix = "/leases/internal"
+	applyTimeout        = time.Second
+	ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
 )
 
 // NewHandler returns an http Handler for lease renewals
-func NewHandler(l lease.Lessor) http.Handler {
-	return &leaseHandler{l}
+func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
+	return &leaseHandler{l, waitch}
 }
 
-type leaseHandler struct{ l lease.Lessor }
+type leaseHandler struct {
+	l      lease.Lessor
+	waitch func() <-chan struct{}
+}
 
 func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "POST" {
@@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
 			return
 		}
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
 		ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
 		if err != nil {
 			if err == lease.ErrLeaseNotFound {
@@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
 			return
 		}
-
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
 		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
 		if l == nil {
 			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
@@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
 		return -1, err
 	}
 
+	if resp.StatusCode == http.StatusRequestTimeout {
+		return -1, ErrLeaseHTTPTimeout
+	}
+
 	if resp.StatusCode == http.StatusNotFound {
 		return -1, lease.ErrLeaseNotFound
 	}
@@ -184,7 +206,8 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
 
 	cc := &http.Client{Transport: rt}
 	var b []byte
-	errc := make(chan error)
+	// buffer errc channel so that errc doesn't block inside the goroutine
+	errc := make(chan error, 2)
 	go func() {
 		resp, err := cc.Do(req)
 		if err != nil {
@@ -196,6 +219,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
 			errc <- err
 			return
 		}
+		if resp.StatusCode == http.StatusRequestTimeout {
+			errc <- ErrLeaseHTTPTimeout
+			return
+		}
 		if resp.StatusCode == http.StatusNotFound {
 			errc <- lease.ErrLeaseNotFound
 			return
diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go
index 6219a4a2c..413208104 100644
--- a/lease/leasehttp/http_test.go
+++ b/lease/leasehttp/http_test.go
@@ -18,11 +18,13 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"os"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
+
 	"golang.org/x/net/context"
 )
 
@@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) {
 		t.Fatalf("failed to create lease: %v", err)
 	}
 
-	ts := httptest.NewServer(NewHandler(le))
+	ts := httptest.NewServer(NewHandler(le, waitReady))
 	defer ts.Close()
 
 	ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
@@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 		t.Fatalf("failed to create lease: %v", err)
 	}
 
-	ts := httptest.NewServer(NewHandler(le))
+	ts := httptest.NewServer(NewHandler(le, waitReady))
 	defer ts.Close()
 
 	resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
@@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) {
 		t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
 	}
 }
+
+func TestRenewHTTPTimeout(t *testing.T) {
+	testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
+		_, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport)
+		return err
+	})
+}
+
+func TestTimeToLiveHTTPTimeout(t *testing.T) {
+	testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
+		_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, http.DefaultTransport)
+		return err
+	})
+}
+
+func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
+	be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
+	defer os.Remove(tmpPath)
+	defer be.Close()
+
+	le := lease.NewLessor(be, int64(5))
+	le.Promote(time.Second)
+	l, err := le.Grant(1, int64(5))
+	if err != nil {
+		t.Fatalf("failed to create lease: %v", err)
+	}
+
+	ts := httptest.NewServer(NewHandler(le, waitNotReady))
+	defer ts.Close()
+	err = f(l, ts.URL)
+	if err == nil {
+		t.Fatalf("expected timeout error, got nil")
+	}
+	if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
+		t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
+	}
+}
+
+func waitReady() <-chan struct{} {
+	ch := make(chan struct{})
+	close(ch)
+	return ch
+}
+
+func waitNotReady() <-chan struct{} {
+	return nil
+}