From be7d573366e14cfcf3dd7c8a91e87d8d7dd5c398 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 21 Jan 2016 23:37:56 -0800 Subject: [PATCH 1/2] lease: store server-decided TTL in lease If actual TTL is not stored in lease, the client will receive the correct TTL and therefore won't be able to keepalive correctly. --- lease/lessor.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index 5898aee02..f01fa0412 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -413,16 +413,17 @@ func (l Lease) removeFrom(b backend.Backend) { // refresh refreshes the expiry of the lease. It extends the expiry at least // minLeaseTTL second. func (l *Lease) refresh() { - ttl := l.TTL if l.TTL < minLeaseTTL { - ttl = minLeaseTTL + l.TTL = minLeaseTTL } - - l.expiry = time.Now().Add(time.Second * time.Duration(ttl)) + l.expiry = time.Now().Add(time.Second * time.Duration(l.TTL)) } // forever sets the expiry of lease to be forever. func (l *Lease) forever() { + if l.TTL < minLeaseTTL { + l.TTL = minLeaseTTL + } l.expiry = forever } From 2e157530a00502d18897d83c65fcba81967cbfaf Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 19 Jan 2016 21:09:09 -0800 Subject: [PATCH 2/2] etcdhttp, lease, v3api: forward keepalives to leader keepalives don't go through raft so let follower peers announce keepalives to the leader through the peer http handler --- etcdmain/etcd.go | 2 +- etcdserver/api/v3rpc/lease.go | 6 +- etcdserver/errors.go | 1 + etcdserver/etcdhttp/peer.go | 17 ++++- etcdserver/etcdhttp/peer_test.go | 2 +- etcdserver/server.go | 2 + etcdserver/v3demo_server.go | 34 +++++++++- integration/cluster_test.go | 2 +- integration/v3_grpc_test.go | 68 ++++++++++++++++++- lease/http.go | 109 +++++++++++++++++++++++++++++++ 10 files changed, 229 insertions(+), 14 deletions(-) create mode 100644 lease/http.go diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index d76bf5998..bf494cbe5 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -312,7 +312,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()), Info: cfg.corsInfo, } - ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler()) + ph := etcdhttp.NewPeerHandler(s) // Start the peer server in a goroutine for _, l := range plns { go func(l net.Listener) { diff --git a/etcdserver/api/v3rpc/lease.go b/etcdserver/api/v3rpc/lease.go index a0a450bd9..a978346bc 100644 --- a/etcdserver/api/v3rpc/lease.go +++ b/etcdserver/api/v3rpc/lease.go @@ -55,11 +55,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID)) if err != nil { - if err == lease.ErrLeaseNotFound { - return ErrLeaseNotFound - } - // TODO: handle not primary error by forwarding renew requests to leader - panic("TODO: handle not primary error by forwarding renew requests to leader") + return err } resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl} diff --git a/etcdserver/errors.go b/etcdserver/errors.go index bd9a07fd4..72a721908 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -33,6 +33,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") + ErrNoLeader = errors.New("etcdserver: no leader") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index 756b6baf7..73441fe35 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -19,15 +19,25 @@ import ( "net/http" "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/lease" "github.com/coreos/etcd/rafthttp" ) const ( peerMembersPrefix = "/members" + leasesPrefix = "/leases" ) -// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. -func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler { +// NewPeerHandler generates an http.Handler to handle etcd peer requests. +func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler { + var lh http.Handler + if l := s.Lessor(); l != nil { + lh = lease.NewHandler(l) + } + return newPeerHandler(s.Cluster(), s.RaftHandler(), lh) +} + +func newPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { mh := &peerMembersHandler{ cluster: cluster, } @@ -37,6 +47,9 @@ func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.H mux.Handle(rafthttp.RaftPrefix, raftHandler) mux.Handle(rafthttp.RaftPrefix+"/", raftHandler) mux.Handle(peerMembersPrefix, mh) + if leaseHandler != nil { + mux.Handle(leasesPrefix, leaseHandler) + } mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion)) return mux } diff --git a/etcdserver/etcdhttp/peer_test.go b/etcdserver/etcdhttp/peer_test.go index 68b2f0e70..f704576a1 100644 --- a/etcdserver/etcdhttp/peer_test.go +++ b/etcdserver/etcdhttp/peer_test.go @@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test data")) }) - ph := NewPeerHandler(&fakeCluster{}, h) + ph := newPeerHandler(&fakeCluster{}, h, nil) srv := httptest.NewServer(ph) defer srv.Close() diff --git a/etcdserver/server.go b/etcdserver/server.go index f7085bee3..4acd871f0 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -453,6 +453,8 @@ func (s *EtcdServer) Cluster() Cluster { return s.cluster } func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } +func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor } + func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.cluster.IsIDRemoved(types.ID(m.From)) { plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) diff --git a/etcdserver/v3demo_server.go b/etcdserver/v3demo_server.go index 9f6c646ad..fb0fe1326 100644 --- a/etcdserver/v3demo_server.go +++ b/etcdserver/v3demo_server.go @@ -18,6 +18,7 @@ import ( "bytes" "fmt" "sort" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -112,7 +113,38 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) } func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { - return s.lessor.Renew(id) + ttl, err := s.lessor.Renew(id) + if err == nil { + return ttl, nil + } + if err != lease.ErrNotPrimary { + return -1, err + } + + // renewals don't go through raft; forward to leader manually + leader := s.cluster.Member(s.Leader()) + for i := 0; i < 5 && leader == nil; i++ { + // wait an election + dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond + select { + case <-time.After(dur): + leader = s.cluster.Member(s.Leader()) + case <-s.done: + return -1, ErrStopped + } + } + if leader == nil || len(leader.PeerURLs) == 0 { + return -1, ErrNoLeader + } + + for _, url := range leader.PeerURLs { + lurl := url + "/leases" + ttl, err = lease.RenewHTTP(id, lurl, s.cfg.PeerTLSInfo, s.cfg.peerDialTimeout()) + if err == nil { + break + } + } + return ttl, err } type applyResult struct { diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 1c80bd9cf..acc290877 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -782,7 +782,7 @@ func (m *member) Launch() error { m.s.SyncTicker = time.Tick(500 * time.Millisecond) m.s.Start() - m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())} + m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} for _, ln := range m.PeerListeners { hs := &httptest.Server{ diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 64438ff43..2bdae66c9 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1010,7 +1010,8 @@ func TestV3RangeRequest(t *testing.T) { // TestV3LeaseRevoke ensures a key is deleted once its lease is revoked. func TestV3LeaseRevoke(t *testing.T) { - testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error { + testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + lc := pb.NewLeaseClient(clus.RandConn()) _, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) return err }) @@ -1056,6 +1057,67 @@ func TestV3LeaseCreateByID(t *testing.T) { } +// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive. +func TestV3LeaseKeepAlive(t *testing.T) { + testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error { + lc := pb.NewLeaseClient(clus.RandConn()) + lreq := &pb.LeaseKeepAliveRequest{ID: leaseID} + lac, err := lc.LeaseKeepAlive(context.TODO()) + if err != nil { + return err + } + defer lac.CloseSend() + + // renew long enough so lease would've expired otherwise + for i := 0; i < 3; i++ { + if err = lac.Send(lreq); err != nil { + return err + } + lresp, rxerr := lac.Recv() + if rxerr != nil { + return rxerr + } + if lresp.ID != leaseID { + return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID) + } + time.Sleep(time.Duration(lresp.TTL/2) * time.Second) + } + _, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID}) + return err + }) +} + +// TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another +// client to confirm it's visible to the whole cluster. +func TestV3LeaseExists(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + defer clus.Terminate(t) + + // create lease + lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate( + context.TODO(), + &pb.LeaseCreateRequest{TTL: 30}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + + // confirm keepalive + lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(context.TODO()) + if err != nil { + t.Fatal(err) + } + defer lac.CloseSend() + if err = lac.Send(&pb.LeaseKeepAliveRequest{ID: lresp.ID}); err != nil { + t.Fatal(err) + } + if _, err = lac.Recv(); err != nil { + t.Fatal(err) + } +} + // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) { // create lease @@ -1078,7 +1140,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) { // testLeaseRemoveLeasedKey performs some action while holding a lease with an // attached key "foo", then confirms the key is gone. -func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) { +func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) { clus := newClusterGRPC(t, &clusterConfig{size: 3}) defer clus.Terminate(t) @@ -1087,7 +1149,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) erro t.Fatal(err) } - if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil { + if err = act(clus, leaseID); err != nil { t.Fatal(err) } diff --git a/lease/http.go b/lease/http.go new file mode 100644 index 000000000..bd4ffa24d --- /dev/null +++ b/lease/http.go @@ -0,0 +1,109 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/transport" +) + +// NewHandler returns an http Handler for lease renewals +func NewHandler(l Lessor) http.Handler { + return &leaseHandler{l} +} + +type leaseHandler struct{ l Lessor } + +func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "error reading body", http.StatusBadRequest) + return + } + + lreq := pb.LeaseKeepAliveRequest{} + if err := lreq.Unmarshal(b); err != nil { + http.Error(w, "error unmarshalling request", http.StatusBadRequest) + return + } + + ttl, err := h.l.Renew(LeaseID(lreq.ID)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // TODO: fill out ResponseHeader + resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl} + v, err := resp.Marshal() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/protobuf") + w.Write(v) +} + +// RenewHTTP renews a lease at a given primary server. +func RenewHTTP(id LeaseID, url string, tlsInfo transport.TLSInfo, timeout time.Duration) (int64, error) { + // will post lreq protobuf to leader + lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal() + if err != nil { + return -1, err + } + + // TODO creating a new transporter for each forward request + // can be expensive; in the future reuse transports and batch requests + rt, err := transport.NewTimeoutTransport(tlsInfo, timeout, 0, 0) + if err != nil { + return -1, err + } + + cc := &http.Client{Transport: rt, Timeout: timeout} + resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq)) + if err != nil { + // TODO detect if leader failed and retry? + return -1, err + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return -1, err + } + + if resp.StatusCode != http.StatusOK { + return -1, fmt.Errorf("lease: %s", string(b)) + } + + lresp := &pb.LeaseKeepAliveResponse{} + if err := lresp.Unmarshal(b); err != nil { + return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b)) + } + if lresp.ID != int64(id) { + return -1, fmt.Errorf("lease: renew id mismatch") + } + return lresp.TTL, err +}