Merge pull request #4274 from xiang90/leasehttp

leasehttp: move lease/http.go to its own pkg
This commit is contained in:
Xiang Li 2016-01-25 10:25:25 +08:00
commit b1a45fe1c8
5 changed files with 30 additions and 26 deletions

View File

@ -19,7 +19,7 @@ import (
"net/http" "net/http"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/rafthttp" "github.com/coreos/etcd/rafthttp"
) )
@ -32,7 +32,7 @@ const (
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler { func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler var lh http.Handler
if l := s.Lessor(); l != nil { if l := s.Lessor(); l != nil {
lh = lease.NewHandler(l) lh = leasehttp.NewHandler(l)
} }
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh) return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
} }

View File

@ -62,8 +62,11 @@ const (
StoreClusterPrefix = "/0" StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1" StoreKeysPrefix = "/1"
purgeFileInterval = 30 * time.Second purgeFileInterval = 30 * time.Second
monitorVersionInterval = 5 * time.Second // monitorVersionInterval should be smaller than the timeout
// on the connection. Or we will not be able to resue the connection
// (since it will timeout).
monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
databaseFilename = "db" databaseFilename = "db"
// max number of in-flight snapshot messages etcdserver allows to have // max number of in-flight snapshot messages etcdserver allows to have
@ -179,9 +182,10 @@ type EtcdServer struct {
// consistent index used to hold the offset of current executing entry // consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry. // It is initialized to 0 before executing any entry.
consistIndex consistentIndex consistIndex consistentIndex
// versionRt used to send requests for peer version
versionRt http.RoundTripper // peerRt used to send requests (version, lease) to peers.
reqIDGen *idutil.Generator peerRt http.RoundTripper
reqIDGen *idutil.Generator
// forceVersionC is used to force the version monitor loop // forceVersionC is used to force the version monitor loop
// to detect the cluster version immediately. // to detect the cluster version immediately.
@ -353,7 +357,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
stats: sstats, stats: sstats,
lstats: lstats, lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond), SyncTicker: time.Tick(500 * time.Millisecond),
versionRt: prt, peerRt: prt,
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
forceVersionC: make(chan struct{}), forceVersionC: make(chan struct{}),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
@ -1223,7 +1227,7 @@ func (s *EtcdServer) monitorVersions() {
continue continue
} }
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt)) v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
if v != nil { if v != nil {
// only keep major.minor version for comparison // only keep major.minor version for comparison
v = &semver.Version{ v = &semver.Version{

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
dstorage "github.com/coreos/etcd/storage" dstorage "github.com/coreos/etcd/storage"
"github.com/coreos/etcd/storage/storagepb" "github.com/coreos/etcd/storage/storagepb"
) )
@ -139,7 +140,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
for _, url := range leader.PeerURLs { for _, url := range leader.PeerURLs {
lurl := url + "/leases" lurl := url + "/leases"
ttl, err = lease.RenewHTTP(id, lurl, s.cfg.PeerTLSInfo, s.cfg.peerDialTimeout()) ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.cfg.peerDialTimeout())
if err == nil { if err == nil {
break break
} }

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package lease package leasehttp
import ( import (
"bytes" "bytes"
@ -22,15 +22,15 @@ import (
"time" "time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/lease"
) )
// NewHandler returns an http Handler for lease renewals // NewHandler returns an http Handler for lease renewals
func NewHandler(l Lessor) http.Handler { func NewHandler(l lease.Lessor) http.Handler {
return &leaseHandler{l} return &leaseHandler{l}
} }
type leaseHandler struct{ l Lessor } type leaseHandler struct{ l lease.Lessor }
func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" { if r.Method != "POST" {
@ -50,7 +50,7 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
ttl, err := h.l.Renew(LeaseID(lreq.ID)) ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest) http.Error(w, err.Error(), http.StatusBadRequest)
return return
@ -69,20 +69,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
// RenewHTTP renews a lease at a given primary server. // RenewHTTP renews a lease at a given primary server.
func RenewHTTP(id LeaseID, url string, tlsInfo transport.TLSInfo, timeout time.Duration) (int64, error) { // TODO: Batch request in future?
func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
// will post lreq protobuf to leader // will post lreq protobuf to leader
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal() lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
if err != nil { if err != nil {
return -1, err return -1, err
} }
// TODO creating a new transporter for each forward request
// can be expensive; in the future reuse transports and batch requests
rt, err := transport.NewTimeoutTransport(tlsInfo, timeout, 0, 0)
if err != nil {
return -1, err
}
cc := &http.Client{Transport: rt, Timeout: timeout} cc := &http.Client{Transport: rt, Timeout: timeout}
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq)) resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
if err != nil { if err != nil {

View File

@ -23,14 +23,19 @@ import (
// NewTimeoutTransport returns a transport created using the given TLS info. // NewTimeoutTransport returns a transport created using the given TLS info.
// If read/write on the created connection blocks longer than its time limit, // If read/write on the created connection blocks longer than its time limit,
// it will return timeout error. // it will return timeout error.
// If read/write timeout is set, transport will not be able to reuse connection.
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) { func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
tr, err := NewTransport(info, dialtimeoutd) tr, err := NewTransport(info, dialtimeoutd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// the timeouted connection will timeout soon after it is idle.
// it should not be put back to http transport as an idle connection for future usage. if rdtimeoutd != 0 || wtimeoutd != 0 {
tr.MaxIdleConnsPerHost = -1 // the timeouted connection will timeout soon after it is idle.
// it should not be put back to http transport as an idle connection for future usage.
tr.MaxIdleConnsPerHost = -1
}
tr.Dial = (&rwTimeoutDialer{ tr.Dial = (&rwTimeoutDialer{
Dialer: net.Dialer{ Dialer: net.Dialer{
Timeout: dialtimeoutd, Timeout: dialtimeoutd,