mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #3796 from yichengq/fix-get-version
etcdserver: not reuse connections for peer transport
This commit is contained in:
commit
3d15526c35
@ -210,7 +210,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
|
||||
}
|
||||
var l net.Listener
|
||||
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -29,8 +29,8 @@ import (
|
||||
|
||||
// isMemberBootstrapped tries to check if the given member has been bootstrapped
|
||||
// in the given cluster.
|
||||
func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
|
||||
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr)
|
||||
func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool {
|
||||
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
|
||||
// response, an error is returned.
|
||||
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
|
||||
// 10 second is enough for building connection and finishing request.
|
||||
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) {
|
||||
return getClusterFromRemotePeers(urls, 10*time.Second, true, tr)
|
||||
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) {
|
||||
return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
|
||||
}
|
||||
|
||||
// If logerr is true, it prints out more error messages.
|
||||
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) {
|
||||
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Transport: rt,
|
||||
Timeout: timeout,
|
||||
}
|
||||
for _, u := range urls {
|
||||
@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string {
|
||||
// The key of the returned map is the member's ID. The value of the returned map
|
||||
// is the semver versions string, including server and cluster.
|
||||
// If it fails to get the version of a member, the key will be nil.
|
||||
func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions {
|
||||
func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
|
||||
members := cl.Members()
|
||||
vers := make(map[string]*version.Versions)
|
||||
for _, m := range members {
|
||||
@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver
|
||||
vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv}
|
||||
continue
|
||||
}
|
||||
ver, err := getVersion(m, tr)
|
||||
ver, err := getVersion(m, rt)
|
||||
if err != nil {
|
||||
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
|
||||
vers[m.ID.String()] = nil
|
||||
@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
|
||||
// cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version
|
||||
// out of the range.
|
||||
// We set this rule since when the local member joins, another member might be offline.
|
||||
func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool {
|
||||
vers := getVersions(cl, local, tr)
|
||||
func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool {
|
||||
vers := getVersions(cl, local, rt)
|
||||
minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
|
||||
maxV := semver.Must(semver.NewVersion(version.Version))
|
||||
maxV = &semver.Version{
|
||||
@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min
|
||||
|
||||
// getVersion returns the Versions of the given member via its
|
||||
// peerURLs. Returns the last error if it fails to get the version.
|
||||
func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) {
|
||||
func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
|
||||
cc := &http.Client{
|
||||
Transport: tr,
|
||||
Transport: rt,
|
||||
}
|
||||
var (
|
||||
err error
|
||||
|
@ -40,7 +40,6 @@ import (
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/runtime"
|
||||
"github.com/coreos/etcd/pkg/timeutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/pkg/wait"
|
||||
"github.com/coreos/etcd/raft"
|
||||
@ -171,8 +170,8 @@ type EtcdServer struct {
|
||||
// consistent index used to hold the offset of current executing entry
|
||||
// It is initialized to 0 before executing any entry.
|
||||
consistIndex consistentIndex
|
||||
// versionTr used to send requests for peer version
|
||||
versionTr *http.Transport
|
||||
// versionRt used to send requests for peer version
|
||||
versionRt http.RoundTripper
|
||||
reqIDGen *idutil.Generator
|
||||
|
||||
// forceVersionC is used to force the version monitor loop
|
||||
@ -211,7 +210,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
|
||||
pt, err := transport.NewTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout())
|
||||
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -225,14 +224,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt)
|
||||
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
|
||||
}
|
||||
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
|
||||
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
|
||||
}
|
||||
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) {
|
||||
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
|
||||
return nil, fmt.Errorf("incomptible with current running cluster")
|
||||
}
|
||||
|
||||
@ -250,7 +249,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
return nil, err
|
||||
}
|
||||
m := cl.MemberByName(cfg.Name)
|
||||
if isMemberBootstrapped(cl, cfg.Name, pt) {
|
||||
if isMemberBootstrapped(cl, cfg.Name, prt) {
|
||||
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
|
||||
}
|
||||
if cfg.ShouldDiscover() {
|
||||
@ -338,7 +337,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
stats: sstats,
|
||||
lstats: lstats,
|
||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||
versionTr: pt,
|
||||
versionRt: prt,
|
||||
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||
forceVersionC: make(chan struct{}),
|
||||
}
|
||||
@ -1090,7 +1089,7 @@ func (s *EtcdServer) monitorVersions() {
|
||||
continue
|
||||
}
|
||||
|
||||
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr))
|
||||
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt))
|
||||
if v != nil {
|
||||
// only keep major.minor version for comparasion
|
||||
v = &semver.Version{
|
||||
|
@ -137,14 +137,11 @@ type Transport struct {
|
||||
|
||||
func (t *Transport) Start() error {
|
||||
var err error
|
||||
// Read/write timeout is set for stream roundTripper to promptly
|
||||
// find out broken status, which minimizes the number of messages
|
||||
// sent on broken connection.
|
||||
t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout)
|
||||
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.pipelineRt, err = transport.NewTransport(t.TLSInfo, t.DialTimeout)
|
||||
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -18,11 +18,14 @@ import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/version"
|
||||
@ -30,6 +33,30 @@ import (
|
||||
|
||||
var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
|
||||
|
||||
// NewListener returns a listener for raft message transfer between peers.
|
||||
// It uses timeout listener to identify broken streams promptly.
|
||||
func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) {
|
||||
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout)
|
||||
}
|
||||
|
||||
// NewRoundTripper returns a roundTripper used to send requests
|
||||
// to rafthttp listener of remote peers.
|
||||
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
|
||||
// It uses timeout transport to pair with remote timeout listeners.
|
||||
// It sets no read/write timeout, because message in requests may
|
||||
// take long time to write out before reading out the response.
|
||||
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
|
||||
}
|
||||
|
||||
// newStreamRoundTripper returns a roundTripper used to send stream requests
|
||||
// to rafthttp listener of remote peers.
|
||||
// Read/write timeout is set for stream roundTripper to promptly
|
||||
// find out broken status, which minimizes the number of messages
|
||||
// sent on broken connection.
|
||||
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
|
||||
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
|
||||
}
|
||||
|
||||
func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
|
||||
size := ent.Size()
|
||||
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user