Merge pull request #4260 from heyitsanthony/v3-lease-forward-keepalive

lease: forward lease keepalive to leader
This commit is contained in:
Anthony Romano 2016-01-22 12:53:17 -08:00
commit 53def2dc5e
11 changed files with 234 additions and 18 deletions

View File

@ -312,7 +312,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
Info: cfg.corsInfo,
}
ph := etcdhttp.NewPeerHandler(s.Cluster(), s.RaftHandler())
ph := etcdhttp.NewPeerHandler(s)
// Start the peer server in a goroutine
for _, l := range plns {
go func(l net.Listener) {

View File

@ -55,11 +55,7 @@ func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) erro
ttl, err := ls.le.LeaseRenew(lease.LeaseID(req.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
return ErrLeaseNotFound
}
// TODO: handle not primary error by forwarding renew requests to leader
panic("TODO: handle not primary error by forwarding renew requests to leader")
return err
}
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, TTL: ttl}

View File

@ -33,6 +33,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrNoLeader = errors.New("etcdserver: no leader")
)
func isKeyNotFound(err error) bool {

View File

@ -19,15 +19,25 @@ import (
"net/http"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/rafthttp"
)
const (
peerMembersPrefix = "/members"
leasesPrefix = "/leases"
)
// NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.Handler {
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
if l := s.Lessor(); l != nil {
lh = lease.NewHandler(l)
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
}
func newPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
cluster: cluster,
}
@ -37,6 +47,9 @@ func NewPeerHandler(cluster etcdserver.Cluster, raftHandler http.Handler) http.H
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPrefix, mh)
if leaseHandler != nil {
mux.Handle(leasesPrefix, leaseHandler)
}
mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
return mux
}

View File

@ -33,7 +33,7 @@ func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test data"))
})
ph := NewPeerHandler(&fakeCluster{}, h)
ph := newPeerHandler(&fakeCluster{}, h, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

View File

@ -453,6 +453,8 @@ func (s *EtcdServer) Cluster() Cluster { return s.cluster }
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())

View File

@ -18,6 +18,7 @@ import (
"bytes"
"fmt"
"sort"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
@ -112,7 +113,38 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
}
func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
return s.lessor.Renew(id)
ttl, err := s.lessor.Renew(id)
if err == nil {
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
// renewals don't go through raft; forward to leader manually
leader := s.cluster.Member(s.Leader())
for i := 0; i < 5 && leader == nil; i++ {
// wait an election
dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond
select {
case <-time.After(dur):
leader = s.cluster.Member(s.Leader())
case <-s.done:
return -1, ErrStopped
}
}
if leader == nil || len(leader.PeerURLs) == 0 {
return -1, ErrNoLeader
}
for _, url := range leader.PeerURLs {
lurl := url + "/leases"
ttl, err = lease.RenewHTTP(id, lurl, s.cfg.PeerTLSInfo, s.cfg.peerDialTimeout())
if err == nil {
break
}
}
return ttl, err
}
type applyResult struct {

View File

@ -782,7 +782,7 @@ func (m *member) Launch() error {
m.s.SyncTicker = time.Tick(500 * time.Millisecond)
m.s.Start()
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s.Cluster(), m.s.RaftHandler())}
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
for _, ln := range m.PeerListeners {
hs := &httptest.Server{

View File

@ -1010,7 +1010,8 @@ func TestV3RangeRequest(t *testing.T) {
// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
func TestV3LeaseRevoke(t *testing.T) {
testLeaseRemoveLeasedKey(t, func(lc pb.LeaseClient, leaseID int64) error {
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
lc := pb.NewLeaseClient(clus.RandConn())
_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
return err
})
@ -1056,6 +1057,67 @@ func TestV3LeaseCreateByID(t *testing.T) {
}
// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
func TestV3LeaseKeepAlive(t *testing.T) {
testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
lc := pb.NewLeaseClient(clus.RandConn())
lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
lac, err := lc.LeaseKeepAlive(context.TODO())
if err != nil {
return err
}
defer lac.CloseSend()
// renew long enough so lease would've expired otherwise
for i := 0; i < 3; i++ {
if err = lac.Send(lreq); err != nil {
return err
}
lresp, rxerr := lac.Recv()
if rxerr != nil {
return rxerr
}
if lresp.ID != leaseID {
return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
}
time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
}
_, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
return err
})
}
// TestV3LeaseExists creates a lease on a random client, then sends a keepalive on another
// client to confirm it's visible to the whole cluster.
func TestV3LeaseExists(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
// create lease
lresp, err := pb.NewLeaseClient(clus.RandConn()).LeaseCreate(
context.TODO(),
&pb.LeaseCreateRequest{TTL: 30})
if err != nil {
t.Fatal(err)
}
if lresp.Error != "" {
t.Fatal(lresp.Error)
}
// confirm keepalive
lac, err := pb.NewLeaseClient(clus.RandConn()).LeaseKeepAlive(context.TODO())
if err != nil {
t.Fatal(err)
}
defer lac.CloseSend()
if err = lac.Send(&pb.LeaseKeepAliveRequest{ID: lresp.ID}); err != nil {
t.Fatal(err)
}
if _, err = lac.Recv(); err != nil {
t.Fatal(err)
}
}
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// create lease
@ -1078,7 +1140,7 @@ func acquireLeaseAndKey(clus *clusterV3, key string) (int64, error) {
// testLeaseRemoveLeasedKey performs some action while holding a lease with an
// attached key "foo", then confirms the key is gone.
func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) error) {
func testLeaseRemoveLeasedKey(t *testing.T, act func(*clusterV3, int64) error) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
defer clus.Terminate(t)
@ -1087,7 +1149,7 @@ func testLeaseRemoveLeasedKey(t *testing.T, act func(pb.LeaseClient, int64) erro
t.Fatal(err)
}
if err := act(pb.NewLeaseClient(clus.RandConn()), leaseID); err != nil {
if err = act(clus, leaseID); err != nil {
t.Fatal(err)
}

109
lease/http.go Normal file
View File

@ -0,0 +1,109 @@
// Copyright 2016 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lease
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport"
)
// NewHandler returns an http Handler for lease renewals
func NewHandler(l Lessor) http.Handler {
return &leaseHandler{l}
}
type leaseHandler struct{ l Lessor }
func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "error reading body", http.StatusBadRequest)
return
}
lreq := pb.LeaseKeepAliveRequest{}
if err := lreq.Unmarshal(b); err != nil {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
ttl, err := h.l.Renew(LeaseID(lreq.ID))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// TODO: fill out ResponseHeader
resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
v, err := resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/protobuf")
w.Write(v)
}
// RenewHTTP renews a lease at a given primary server.
func RenewHTTP(id LeaseID, url string, tlsInfo transport.TLSInfo, timeout time.Duration) (int64, error) {
// will post lreq protobuf to leader
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
if err != nil {
return -1, err
}
// TODO creating a new transporter for each forward request
// can be expensive; in the future reuse transports and batch requests
rt, err := transport.NewTimeoutTransport(tlsInfo, timeout, 0, 0)
if err != nil {
return -1, err
}
cc := &http.Client{Transport: rt, Timeout: timeout}
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
if err != nil {
// TODO detect if leader failed and retry?
return -1, err
}
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return -1, err
}
if resp.StatusCode != http.StatusOK {
return -1, fmt.Errorf("lease: %s", string(b))
}
lresp := &pb.LeaseKeepAliveResponse{}
if err := lresp.Unmarshal(b); err != nil {
return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
}
if lresp.ID != int64(id) {
return -1, fmt.Errorf("lease: renew id mismatch")
}
return lresp.TTL, err
}

View File

@ -413,16 +413,17 @@ func (l Lease) removeFrom(b backend.Backend) {
// refresh refreshes the expiry of the lease. It extends the expiry at least
// minLeaseTTL second.
func (l *Lease) refresh() {
ttl := l.TTL
if l.TTL < minLeaseTTL {
ttl = minLeaseTTL
l.TTL = minLeaseTTL
}
l.expiry = time.Now().Add(time.Second * time.Duration(ttl))
l.expiry = time.Now().Add(time.Second * time.Duration(l.TTL))
}
// forever sets the expiry of lease to be forever.
func (l *Lease) forever() {
if l.TTL < minLeaseTTL {
l.TTL = minLeaseTTL
}
l.expiry = forever
}