mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

The issue is not caused by this code, but by reading the snapshot from disk. etcd assumes that the snapshot of the v3 kv lives in memory; if it does not, etcd does not work well anyway.
156 lines
4.2 KiB
Go
156 lines
4.2 KiB
Go
// Copyright 2015 CoreOS, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rafthttp
|
|
|
|
import (
|
|
"bytes"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/coreos/etcd/pkg/httputil"
|
|
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
|
"github.com/coreos/etcd/pkg/types"
|
|
"github.com/coreos/etcd/raft"
|
|
"github.com/coreos/etcd/snap"
|
|
)
|
|
|
|
// snapResponseReadTimeout is the timeout for reading snapshot response body.
var snapResponseReadTimeout = 5 * time.Second
|
|
|
|
type snapshotSender struct {
|
|
from, to types.ID
|
|
cid types.ID
|
|
|
|
tr http.RoundTripper
|
|
picker *urlPicker
|
|
status *peerStatus
|
|
r Raft
|
|
errorc chan error
|
|
|
|
stopc chan struct{}
|
|
}
|
|
|
|
func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
|
|
return &snapshotSender{
|
|
from: from,
|
|
to: to,
|
|
cid: cid,
|
|
tr: tr,
|
|
picker: picker,
|
|
status: status,
|
|
r: r,
|
|
errorc: errorc,
|
|
stopc: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// stop closes stopc, which makes a pending post cancel its request and return
// errStopped. It must be called at most once: closing a closed channel panics.
func (s *snapshotSender) stop() {
	close(s.stopc)
}
|
|
|
|
func (s *snapshotSender) send(merged snap.Message) {
|
|
m := merged.Message
|
|
|
|
start := time.Now()
|
|
|
|
body := createSnapBody(merged)
|
|
defer body.Close()
|
|
|
|
u := s.picker.pick()
|
|
req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.from, s.cid)
|
|
|
|
err := s.post(req)
|
|
if err != nil {
|
|
// errMemberRemoved is a critical error since a removed member should
|
|
// always be stopped. So we use reportCriticalError to report it to errorc.
|
|
if err == errMemberRemoved {
|
|
reportCriticalError(err, s.errorc)
|
|
}
|
|
s.picker.unreachable(u)
|
|
reportSentFailure(sendSnap, m)
|
|
s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
|
|
s.r.ReportUnreachable(m.To)
|
|
// report SnapshotFailure to raft state machine. After raft state
|
|
// machine knows about it, it would pause a while and retry sending
|
|
// new snapshot message.
|
|
s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
|
if s.status.isActive() {
|
|
plog.Warningf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
|
|
} else {
|
|
plog.Debugf("snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
|
|
}
|
|
return
|
|
}
|
|
reportSentDuration(sendSnap, m, time.Since(start))
|
|
s.status.activate()
|
|
s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
|
|
plog.Infof("snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
|
|
}
|
|
|
|
// post posts the given request.
|
|
// It returns nil when request is sent out and processed successfully.
|
|
func (s *snapshotSender) post(req *http.Request) (err error) {
|
|
cancel := httputil.RequestCanceler(s.tr, req)
|
|
|
|
type responseAndError struct {
|
|
resp *http.Response
|
|
body []byte
|
|
err error
|
|
}
|
|
result := make(chan responseAndError, 1)
|
|
|
|
go func() {
|
|
resp, err := s.tr.RoundTrip(req)
|
|
if err != nil {
|
|
result <- responseAndError{resp, nil, err}
|
|
return
|
|
}
|
|
|
|
// close the response body when timeouts.
|
|
// prevents from reading the body forever when the other side dies right after
|
|
// successfully receives the request body.
|
|
time.AfterFunc(snapResponseReadTimeout, func() { resp.Body.Close() })
|
|
body, err := ioutil.ReadAll(resp.Body)
|
|
result <- responseAndError{resp, body, err}
|
|
}()
|
|
|
|
select {
|
|
case <-s.stopc:
|
|
cancel()
|
|
return errStopped
|
|
case r := <-result:
|
|
if r.err != nil {
|
|
return r.err
|
|
}
|
|
return checkPostResponse(r.resp, r.body, req, s.to)
|
|
}
|
|
}
|
|
|
|
func createSnapBody(merged snap.Message) io.ReadCloser {
|
|
buf := new(bytes.Buffer)
|
|
enc := &messageEncoder{w: buf}
|
|
// encode raft message
|
|
if err := enc.encode(merged.Message); err != nil {
|
|
plog.Panicf("encode message error (%v)", err)
|
|
}
|
|
|
|
return &pioutil.ReaderAndCloser{
|
|
Reader: io.MultiReader(buf, merged.ReadCloser),
|
|
Closer: merged.ReadCloser,
|
|
}
|
|
}
|