mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: lease forwarding should resue transport
This commit is contained in:
parent
72ffa74476
commit
1aa312fcce
@ -179,9 +179,10 @@ type EtcdServer struct {
|
||||
// consistent index used to hold the offset of current executing entry
|
||||
// It is initialized to 0 before executing any entry.
|
||||
consistIndex consistentIndex
|
||||
// versionRt used to send requests for peer version
|
||||
versionRt http.RoundTripper
|
||||
reqIDGen *idutil.Generator
|
||||
|
||||
// peerRt used to send requests (version, lease) to peers.
|
||||
peerRt http.RoundTripper
|
||||
reqIDGen *idutil.Generator
|
||||
|
||||
// forceVersionC is used to force the version monitor loop
|
||||
// to detect the cluster version immediately.
|
||||
@ -353,7 +354,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||
versionRt: prt,
|
||||
peerRt: prt,
|
||||
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||
forceVersionC: make(chan struct{}),
|
||||
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
|
||||
@ -1223,7 +1224,7 @@ func (s *EtcdServer) monitorVersions() {
|
||||
continue
|
||||
}
|
||||
|
||||
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt))
|
||||
v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
|
||||
if v != nil {
|
||||
// only keep major.minor version for comparison
|
||||
v = &semver.Version{
|
||||
|
@ -140,7 +140,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) {
|
||||
|
||||
for _, url := range leader.PeerURLs {
|
||||
lurl := url + "/leases"
|
||||
ttl, err = leasehttp.RenewHTTP(id, lurl, s.cfg.PeerTLSInfo, s.cfg.peerDialTimeout())
|
||||
ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.cfg.peerDialTimeout())
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
)
|
||||
|
||||
// NewHandler returns an http Handler for lease renewals
|
||||
@ -70,20 +69,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// RenewHTTP renews a lease at a given primary server.
|
||||
func RenewHTTP(id lease.LeaseID, url string, tlsInfo transport.TLSInfo, timeout time.Duration) (int64, error) {
|
||||
// TODO: Batch request in future?
|
||||
func RenewHTTP(id lease.LeaseID, url string, rt http.RoundTripper, timeout time.Duration) (int64, error) {
|
||||
// will post lreq protobuf to leader
|
||||
lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// TODO creating a new transporter for each forward request
|
||||
// can be expensive; in the future reuse transports and batch requests
|
||||
rt, err := transport.NewTimeoutTransport(tlsInfo, timeout, 0, 0)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
cc := &http.Client{Transport: rt, Timeout: timeout}
|
||||
resp, err := cc.Post(url, "application/protobuf", bytes.NewReader(lreq))
|
||||
if err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user