From fef4a795286beda1baebf7ba16763a1c17a8b6c3 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 15 Dec 2016 12:09:20 -0800 Subject: [PATCH 1/3] lease: force leader to apply its pending committed index for lease operations suppose a lease granting request from a follower goes through and is followed by a lease look up or renewal, the leader might not apply the lease grant request locally. So the leader might not find the lease from the lease look up or renewal request, which will result in a lease not found error. To fix this issue, we force the leader to apply its pending committed index before looking up the lease. FIX #6978 --- etcdserver/api/v2http/peer.go | 5 ++- etcdserver/server.go | 2 + integration/v3_lease_test.go | 85 +++++++++++++++++++++++++++++++++++ lease/leasehttp/http.go | 34 ++++++++++++-- lease/leasehttp/http_test.go | 53 +++++++++++++++++++++- 5 files changed, 171 insertions(+), 8 deletions(-) diff --git a/etcdserver/api/v2http/peer.go b/etcdserver/api/v2http/peer.go index 456fa4619..a1abadba8 100644 --- a/etcdserver/api/v2http/peer.go +++ b/etcdserver/api/v2http/peer.go @@ -31,8 +31,9 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer requests. 
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler { var lh http.Handler - if l := s.Lessor(); l != nil { - lh = leasehttp.NewHandler(l) + l := s.Lessor() + if l != nil { + lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() }) } return newPeerHandler(s.Cluster(), s.RaftHandler(), lh) } diff --git a/etcdserver/server.go b/etcdserver/server.go index b62d29191..806c0c3de 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor } +func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) } + func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.cluster.IsIDRemoved(types.ID(m.From)) { plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 4040b0f8f..6347c0c28 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) { } } +// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through. +// it was observed that the immediate lease renewal after granting a lease from follower resulted in lease not found. +// related issue https://github.com/coreos/etcd/issues/6978 +func TestV3LeaseRenewStress(t *testing.T) { + testLeaseStress(t, stressLeaseRenew) +} + +// TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved. +// it was observed that the immediate lease retrieval after granting a lease from follower resulted in lease not found. 
+// related issue https://github.com/coreos/etcd/issues/6978 +func TestV3LeaseTimeToLiveStress(t *testing.T) { + testLeaseStress(t, stressLeaseTimeToLive) +} + +func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) { + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + errc := make(chan error) + + for i := 0; i < 30; i++ { + for j := 0; j < 3; j++ { + go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j) + } + } + + for i := 0; i < 90; i++ { + if err := <-errc; err != nil { + t.Fatal(err) + } + } +} + +func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) { + defer func() { + if tctx.Err() != nil { + reterr = nil + } + }() + lac, err := lc.LeaseKeepAlive(tctx) + if err != nil { + return err + } + for tctx.Err() == nil { + resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60}) + if gerr != nil { + continue + } + err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID}) + if err != nil { + continue + } + rresp, rxerr := lac.Recv() + if rxerr != nil { + continue + } + if rresp.TTL == 0 { + return fmt.Errorf("TTL shouldn't be 0 so soon") + } + } + return nil +} + +func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) { + defer func() { + if tctx.Err() != nil { + reterr = nil + } + }() + for tctx.Err() == nil { + resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60}) + if gerr != nil { + continue + } + _, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID}) + if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound { + return kerr + } + } + return nil +} + func TestV3PutOnNonExistLease(t *testing.T) { defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 1}) diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go index 06eb93565..5e4afde54 100644 --- 
a/lease/leasehttp/http.go +++ b/lease/leasehttp/http.go @@ -16,9 +16,11 @@ package leasehttp import ( "bytes" + "errors" "fmt" "io/ioutil" "net/http" + "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/lease" @@ -30,14 +32,19 @@ import ( var ( LeasePrefix = "/leases" LeaseInternalPrefix = "/leases/internal" + applyTimeout = time.Second + ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out") ) // NewHandler returns an http Handler for lease renewals -func NewHandler(l lease.Lessor) http.Handler { - return &leaseHandler{l} +func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler { + return &leaseHandler{l, waitch} } -type leaseHandler struct{ l lease.Lessor } +type leaseHandler struct { + l lease.Lessor + waitch func() <-chan struct{} +} func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { @@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshalling request", http.StatusBadRequest) return } + select { + case <-h.waitch(): + case <-time.After(applyTimeout): + http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) + return + } ttl, err := h.l.Renew(lease.LeaseID(lreq.ID)) if err != nil { if err == lease.ErrLeaseNotFound { @@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "error unmarshalling request", http.StatusBadRequest) return } - + select { + case <-h.waitch(): + case <-time.After(applyTimeout): + http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) + return + } l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID)) if l == nil { http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound) @@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT return -1, err } + if resp.StatusCode == http.StatusRequestTimeout 
{ + return -1, ErrLeaseHTTPTimeout + } + if resp.StatusCode == http.StatusNotFound { return -1, lease.ErrLeaseNotFound } @@ -196,6 +218,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string errc <- err return } + if resp.StatusCode == http.StatusRequestTimeout { + errc <- ErrLeaseHTTPTimeout + return + } if resp.StatusCode == http.StatusNotFound { errc <- lease.ErrLeaseNotFound return diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index 6219a4a2c..413208104 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -18,11 +18,13 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "time" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" + "golang.org/x/net/context" ) @@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) { t.Fatalf("failed to create lease: %v", err) } - ts := httptest.NewServer(NewHandler(le)) + ts := httptest.NewServer(NewHandler(le, waitReady)) defer ts.Close() ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport) @@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) { t.Fatalf("failed to create lease: %v", err) } - ts := httptest.NewServer(NewHandler(le)) + ts := httptest.NewServer(NewHandler(le, waitReady)) defer ts.Close() resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport) @@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) { t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL) } } + +func TestRenewHTTPTimeout(t *testing.T) { + testApplyTimeout(t, func(l *lease.Lease, serverURL string) error { + _, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport) + return err + }) +} + +func TestTimeToLiveHTTPTimeout(t *testing.T) { + testApplyTimeout(t, func(l *lease.Lease, serverURL string) error { + _, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, 
http.DefaultTransport) + return err + }) +} + +func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { + be, tmpPath := backend.NewTmpBackend(time.Hour, 10000) + defer os.Remove(tmpPath) + defer be.Close() + + le := lease.NewLessor(be, int64(5)) + le.Promote(time.Second) + l, err := le.Grant(1, int64(5)) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + + ts := httptest.NewServer(NewHandler(le, waitNotReady)) + defer ts.Close() + err = f(l, ts.URL) + if err == nil { + t.Fatalf("expected timeout error, got nil") + } + if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 { + t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error()) + } +} + +func waitReady() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func waitNotReady() <-chan struct{} { + return nil +} From 7b7feb46fcf13da75f93797740ffc6034bb585ff Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Fri, 16 Dec 2016 17:11:23 -0800 Subject: [PATCH 2/3] leasehttp: buffer error channel to prevent goroutine leak --- lease/leasehttp/http.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lease/leasehttp/http.go b/lease/leasehttp/http.go index 5e4afde54..256051efc 100644 --- a/lease/leasehttp/http.go +++ b/lease/leasehttp/http.go @@ -206,7 +206,8 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string cc := &http.Client{Transport: rt} var b []byte - errc := make(chan error) + // buffer the errc channel so that sends on errc don't block inside the goroutine + errc := make(chan error, 2) go func() { resp, err := cc.Do(req) if err != nil { From 2faf72f47c2ce1633ed93035473fb7a9ecc771a4 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Thu, 22 Dec 2016 12:03:49 -0800 Subject: [PATCH 3/3] etcdserver: rework update committed index logic --- etcdserver/raft.go | 15 +++++++++++++++ etcdserver/server.go | 19 ++++++++-------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git 
a/etcdserver/raft.go b/etcdserver/raft.go index 50b574211..eae398cce 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) { raftDone: raftDone, } + updateCommittedIndex(&ap, rh) + select { case r.applyc <- ap: case <-r.stopped: @@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) { }() } +func updateCommittedIndex(ap *apply, rh *raftReadyHandler) { + var ci uint64 + if len(ap.entries) != 0 { + ci = ap.entries[len(ap.entries)-1].Index + } + if ap.snapshot.Metadata.Index > ci { + ci = ap.snapshot.Metadata.Index + } + if ci != 0 { + rh.updateCommittedIndex(ci) + } +} + func (r *raftNode) sendMessages(ms []raftpb.Message) { sentAppResp := false for i := len(ms) - 1; i >= 0; i-- { diff --git a/etcdserver/server.go b/etcdserver/server.go index 806c0c3de..d28908eee 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -598,7 +598,8 @@ type etcdProgress struct { // and helps decouple state machine logic from Raft algorithms. // TODO: add a state machine interface to apply the commit entries and do snapshot/recover type raftReadyHandler struct { - leadershipUpdate func() + leadershipUpdate func() + updateCommittedIndex func(uint64) } func (s *EtcdServer) run() { @@ -648,6 +649,12 @@ func (s *EtcdServer) run() { s.r.td.Reset() } }, + updateCommittedIndex: func(ci uint64) { + cci := s.getCommittedIndex() + if ci > cci { + s.setCommittedIndex(ci) + } + }, } s.r.start(rh) @@ -701,16 +708,6 @@ func (s *EtcdServer) run() { for { select { case ap := <-s.r.apply(): - var ci uint64 - if len(ap.entries) != 0 { - ci = ap.entries[len(ap.entries)-1].Index - } - if ap.snapshot.Metadata.Index > ci { - ci = ap.snapshot.Metadata.Index - } - if ci != 0 { - s.setCommittedIndex(ci) - } f := func(context.Context) { s.applyAll(&ep, &ap) } sched.Schedule(f) case leases := <-expiredLeaseC: