mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

This moves the code that creates the listener and the roundTripper for raft communication into the same place, and uses explicit functions to build them. This prevents possible development errors in the future.
187 lines
6.4 KiB
Go
187 lines
6.4 KiB
Go
// Copyright 2015 CoreOS, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rafthttp
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
|
|
"github.com/coreos/etcd/pkg/transport"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/raft/raftpb"
|
|
"github.com/coreos/etcd/version"
|
|
)
|
|
|
|
// errMemberRemoved is the error checkPostResponse returns when a peer
// answers a raft message POST with HTTP 403 Forbidden.
var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
|
|
|
|
// NewListener returns a listener for raft message transfer between peers.
|
|
// It uses timeout listener to identify broken streams promptly.
|
|
func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) {
|
|
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout)
|
|
}
|
|
|
|
// NewRoundTripper returns a roundTripper used to send requests
|
|
// to rafthttp listener of remote peers.
|
|
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
|
|
// It uses timeout transport to pair with remote timeout listeners.
|
|
// It sets no read/write timeout, because message in requests may
|
|
// take long time to write out before reading out the response.
|
|
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
|
|
}
|
|
|
|
// newStreamRoundTripper returns a roundTripper used to send stream requests
|
|
// to rafthttp listener of remote peers.
|
|
// Read/write timeout is set for stream roundTripper to promptly
|
|
// find out broken status, which minimizes the number of messages
|
|
// sent on broken connection.
|
|
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
|
|
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
|
|
}
|
|
|
|
func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
|
|
size := ent.Size()
|
|
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
|
|
return err
|
|
}
|
|
b, err := ent.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = w.Write(b)
|
|
return err
|
|
}
|
|
|
|
func readEntryFrom(r io.Reader, ent *raftpb.Entry) error {
|
|
var l uint64
|
|
if err := binary.Read(r, binary.BigEndian, &l); err != nil {
|
|
return err
|
|
}
|
|
buf := make([]byte, int(l))
|
|
if _, err := io.ReadFull(r, buf); err != nil {
|
|
return err
|
|
}
|
|
return ent.Unmarshal(buf)
|
|
}
|
|
|
|
// createPostRequest creates a HTTP POST request that sends raft message.
|
|
func createPostRequest(u url.URL, path string, body io.Reader, ct string, from, cid types.ID) *http.Request {
|
|
uu := u
|
|
uu.Path = path
|
|
req, err := http.NewRequest("POST", uu.String(), body)
|
|
if err != nil {
|
|
plog.Panicf("unexpected new request error (%v)", err)
|
|
}
|
|
req.Header.Set("Content-Type", ct)
|
|
req.Header.Set("X-Server-From", from.String())
|
|
req.Header.Set("X-Server-Version", version.Version)
|
|
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
|
|
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
|
|
return req
|
|
}
|
|
|
|
// checkPostResponse checks the response of the HTTP POST request that sends
|
|
// raft message.
|
|
func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
|
|
switch resp.StatusCode {
|
|
case http.StatusPreconditionFailed:
|
|
switch strings.TrimSuffix(string(body), "\n") {
|
|
case errIncompatibleVersion.Error():
|
|
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
|
|
return errIncompatibleVersion
|
|
case errClusterIDMismatch.Error():
|
|
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
|
|
to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
|
|
return errClusterIDMismatch
|
|
default:
|
|
return fmt.Errorf("unhandled error %q when precondition failed", string(body))
|
|
}
|
|
case http.StatusForbidden:
|
|
return errMemberRemoved
|
|
case http.StatusNoContent:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
|
|
}
|
|
}
|
|
|
|
// reportCriticalError reports err by attempting a non-blocking send on errc.
// If errc cannot accept the value right away, err is dropped: the fact that
// an error has happened is already reported, which is good enough.
func reportCriticalError(err error, errc chan<- error) {
	select {
	case errc <- err:
		// Delivered.
		return
	default:
		// Nobody can take the error right now; drop it rather than block.
		return
	}
}
|
|
|
|
// compareMajorMinorVersion returns an integer comparing two versions based on
|
|
// their major and minor version. The result will be 0 if a==b, -1 if a < b,
|
|
// and 1 if a > b.
|
|
func compareMajorMinorVersion(a, b *semver.Version) int {
|
|
na := &semver.Version{Major: a.Major, Minor: a.Minor}
|
|
nb := &semver.Version{Major: b.Major, Minor: b.Minor}
|
|
switch {
|
|
case na.LessThan(*nb):
|
|
return -1
|
|
case nb.LessThan(*na):
|
|
return 1
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
// serverVersion returns the server version from the given header.
|
|
func serverVersion(h http.Header) *semver.Version {
|
|
verStr := h.Get("X-Server-Version")
|
|
// backward compatibility with etcd 2.0
|
|
if verStr == "" {
|
|
verStr = "2.0.0"
|
|
}
|
|
return semver.Must(semver.NewVersion(verStr))
|
|
}
|
|
|
|
// serverVersion returns the min cluster version from the given header.
|
|
func minClusterVersion(h http.Header) *semver.Version {
|
|
verStr := h.Get("X-Min-Cluster-Version")
|
|
// backward compatibility with etcd 2.0
|
|
if verStr == "" {
|
|
verStr = "2.0.0"
|
|
}
|
|
return semver.Must(semver.NewVersion(verStr))
|
|
}
|
|
|
|
// checkVersionCompability checks whether the given version is compatible
|
|
// with the local version.
|
|
func checkVersionCompability(name string, server, minCluster *semver.Version) error {
|
|
localServer := semver.Must(semver.NewVersion(version.Version))
|
|
localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion))
|
|
if compareMajorMinorVersion(server, localMinCluster) == -1 {
|
|
return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
|
|
}
|
|
if compareMajorMinorVersion(minCluster, localServer) == 1 {
|
|
return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
|
|
}
|
|
return nil
|
|
}
|