etcd/rafthttp/util.go
Yicheng Qin 1f21ccf166 rafthttp: support sending v3 snapshot message
Use snapshotSender to send v3 snapshot message. It puts raft snapshot
message and v3 snapshot into request body, then sends it to the target peer.
When it receives http.StatusNoContent, it knows the message has been
received and processed successfully.

As receiver, snapHandler saves v3 snapshot and then processes the raft snapshot
message, then respond with http.StatusNoContent.
2015-10-13 23:11:28 -07:00

160 lines
5.1 KiB
Go

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
)
var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
size := ent.Size()
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
return err
}
b, err := ent.Marshal()
if err != nil {
return err
}
_, err = w.Write(b)
return err
}
func readEntryFrom(r io.Reader, ent *raftpb.Entry) error {
var l uint64
if err := binary.Read(r, binary.BigEndian, &l); err != nil {
return err
}
buf := make([]byte, int(l))
if _, err := io.ReadFull(r, buf); err != nil {
return err
}
return ent.Unmarshal(buf)
}
// createPostRequest creates a HTTP POST request that sends raft message.
func createPostRequest(u url.URL, path string, body io.Reader, ct string, from, cid types.ID) *http.Request {
uu := u
uu.Path = path
req, err := http.NewRequest("POST", uu.String(), body)
if err != nil {
plog.Panicf("unexpected new request error (%v)", err)
}
req.Header.Set("Content-Type", ct)
req.Header.Set("X-Server-From", from.String())
req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
return req
}
// checkPostResponse checks the response of the HTTP POST request that sends
// raft message.
func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
switch resp.StatusCode {
case http.StatusPreconditionFailed:
switch strings.TrimSuffix(string(body), "\n") {
case errIncompatibleVersion.Error():
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
return errIncompatibleVersion
case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
return errClusterIDMismatch
default:
return fmt.Errorf("unhandled error %q when precondition failed", string(body))
}
case http.StatusForbidden:
return errMemberRemoved
case http.StatusNoContent:
return nil
default:
return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
}
}
// reportErr reports the given error through sending it into
// the given error channel.
// If the error channel is filled up when sending error, it drops the error
// because the fact that error has happened is reported, which is
// good enough.
func reportCriticalError(err error, errc chan<- error) {
select {
case errc <- err:
default:
}
}
// compareMajorMinorVersion returns an integer comparing two versions based on
// their major and minor version. The result will be 0 if a==b, -1 if a < b,
// and 1 if a > b.
func compareMajorMinorVersion(a, b *semver.Version) int {
na := &semver.Version{Major: a.Major, Minor: a.Minor}
nb := &semver.Version{Major: b.Major, Minor: b.Minor}
switch {
case na.LessThan(*nb):
return -1
case nb.LessThan(*na):
return 1
default:
return 0
}
}
// serverVersion returns the server version from the given header.
func serverVersion(h http.Header) *semver.Version {
verStr := h.Get("X-Server-Version")
// backward compatibility with etcd 2.0
if verStr == "" {
verStr = "2.0.0"
}
return semver.Must(semver.NewVersion(verStr))
}
// serverVersion returns the min cluster version from the given header.
func minClusterVersion(h http.Header) *semver.Version {
verStr := h.Get("X-Min-Cluster-Version")
// backward compatibility with etcd 2.0
if verStr == "" {
verStr = "2.0.0"
}
return semver.Must(semver.NewVersion(verStr))
}
// checkVersionCompability checks whether the given version is compatible
// with the local version.
func checkVersionCompability(name string, server, minCluster *semver.Version) error {
localServer := semver.Must(semver.NewVersion(version.Version))
localMinCluster := semver.Must(semver.NewVersion(version.MinClusterVersion))
if compareMajorMinorVersion(server, localMinCluster) == -1 {
return fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
}
if compareMajorMinorVersion(minCluster, localServer) == 1 {
return fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
}
return nil
}