mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: rename "snap" to "raftsnap" package
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
6a70a931d3
commit
bb2c48b38a
@ -26,7 +26,7 @@ import (
|
|||||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
"github.com/coreos/etcd/version"
|
"github.com/coreos/etcd/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -136,11 +136,11 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
type snapshotHandler struct {
|
type snapshotHandler struct {
|
||||||
tr Transporter
|
tr Transporter
|
||||||
r Raft
|
r Raft
|
||||||
snapshotter *snap.Snapshotter
|
snapshotter *raftsnap.Snapshotter
|
||||||
cid types.ID
|
cid types.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSnapshotHandler(tr Transporter, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
|
func newSnapshotHandler(tr Transporter, r Raft, snapshotter *raftsnap.Snapshotter, cid types.ID) http.Handler {
|
||||||
return &snapshotHandler{
|
return &snapshotHandler{
|
||||||
tr: tr,
|
tr: tr,
|
||||||
r: r,
|
r: r,
|
||||||
|
@ -29,7 +29,7 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
"github.com/coreos/etcd/version"
|
"github.com/coreos/etcd/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -356,7 +356,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
|
|||||||
|
|
||||||
type fakePeer struct {
|
type fakePeer struct {
|
||||||
msgs []raftpb.Message
|
msgs []raftpb.Message
|
||||||
snapMsgs []snap.Message
|
snapMsgs []raftsnap.Message
|
||||||
peerURLs types.URLs
|
peerURLs types.URLs
|
||||||
connc chan *outgoingConn
|
connc chan *outgoingConn
|
||||||
paused bool
|
paused bool
|
||||||
@ -377,7 +377,7 @@ func (pr *fakePeer) send(m raftpb.Message) {
|
|||||||
pr.msgs = append(pr.msgs, m)
|
pr.msgs = append(pr.msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pr *fakePeer) sendSnap(m snap.Message) {
|
func (pr *fakePeer) sendSnap(m raftsnap.Message) {
|
||||||
if pr.paused {
|
if pr.paused {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
|
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
@ -63,7 +63,7 @@ type Peer interface {
|
|||||||
|
|
||||||
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
|
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
|
||||||
// is similar to send.
|
// is similar to send.
|
||||||
sendSnap(m snap.Message)
|
sendSnap(m raftsnap.Message)
|
||||||
|
|
||||||
// update updates the urls of remote peer.
|
// update updates the urls of remote peer.
|
||||||
update(urls types.URLs)
|
update(urls types.URLs)
|
||||||
@ -233,7 +233,7 @@ func (p *peer) send(m raftpb.Message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) sendSnap(m snap.Message) {
|
func (p *peer) sendSnap(m raftsnap.Message) {
|
||||||
go p.snapSender.send(m)
|
go p.snapSender.send(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -63,7 +63,7 @@ func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *pe
|
|||||||
|
|
||||||
func (s *snapshotSender) stop() { close(s.stopc) }
|
func (s *snapshotSender) stop() { close(s.stopc) }
|
||||||
|
|
||||||
func (s *snapshotSender) send(merged snap.Message) {
|
func (s *snapshotSender) send(merged raftsnap.Message) {
|
||||||
m := merged.Message
|
m := merged.Message
|
||||||
|
|
||||||
body := createSnapBody(merged)
|
body := createSnapBody(merged)
|
||||||
@ -142,7 +142,7 @@ func (s *snapshotSender) post(req *http.Request) (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func createSnapBody(merged snap.Message) io.ReadCloser {
|
func createSnapBody(merged raftsnap.Message) io.ReadCloser {
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
enc := &messageEncoder{w: buf}
|
enc := &messageEncoder{w: buf}
|
||||||
// encode raft message
|
// encode raft message
|
||||||
|
@ -27,7 +27,7 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type strReaderCloser struct{ *strings.Reader }
|
type strReaderCloser struct{ *strings.Reader }
|
||||||
@ -82,7 +82,7 @@ func TestSnapshotSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size))
|
sent, files := testSnapshotSend(t, raftsnap.NewMessage(tt.m, tt.rc, tt.size))
|
||||||
if tt.wsent != sent {
|
if tt.wsent != sent {
|
||||||
t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
|
t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
|
||||||
}
|
}
|
||||||
@ -92,7 +92,7 @@ func TestSnapshotSend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
|
func testSnapshotSend(t *testing.T, sm *raftsnap.Message) (bool, []os.FileInfo) {
|
||||||
d, err := ioutil.TempDir(os.TempDir(), "snapdir")
|
d, err := ioutil.TempDir(os.TempDir(), "snapdir")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -102,7 +102,7 @@ func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
|
|||||||
r := &fakeRaft{}
|
r := &fakeRaft{}
|
||||||
tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
|
tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
|
||||||
ch := make(chan struct{}, 1)
|
ch := make(chan struct{}, 1)
|
||||||
h := &syncHandler{newSnapshotHandler(tr, r, snap.New(d), types.ID(1)), ch}
|
h := &syncHandler{newSnapshotHandler(tr, r, raftsnap.New(d), types.ID(1)), ch}
|
||||||
srv := httptest.NewServer(h)
|
srv := httptest.NewServer(h)
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/types"
|
"github.com/coreos/etcd/pkg/types"
|
||||||
"github.com/coreos/etcd/raft"
|
"github.com/coreos/etcd/raft"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/snap"
|
"github.com/coreos/etcd/raftsnap"
|
||||||
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
"github.com/xiang90/probing"
|
"github.com/xiang90/probing"
|
||||||
@ -60,7 +60,7 @@ type Transporter interface {
|
|||||||
Send(m []raftpb.Message)
|
Send(m []raftpb.Message)
|
||||||
// SendSnapshot sends out the given snapshot message to a remote peer.
|
// SendSnapshot sends out the given snapshot message to a remote peer.
|
||||||
// The behavior of SendSnapshot is similar to Send.
|
// The behavior of SendSnapshot is similar to Send.
|
||||||
SendSnapshot(m snap.Message)
|
SendSnapshot(m raftsnap.Message)
|
||||||
// AddRemote adds a remote with given peer urls into the transport.
|
// AddRemote adds a remote with given peer urls into the transport.
|
||||||
// A remote helps newly joined member to catch up the progress of cluster,
|
// A remote helps newly joined member to catch up the progress of cluster,
|
||||||
// and will not be used after that.
|
// and will not be used after that.
|
||||||
@ -107,7 +107,7 @@ type Transport struct {
|
|||||||
URLs types.URLs // local peer URLs
|
URLs types.URLs // local peer URLs
|
||||||
ClusterID types.ID // raft cluster ID for request validation
|
ClusterID types.ID // raft cluster ID for request validation
|
||||||
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
|
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
|
||||||
Snapshotter *snap.Snapshotter
|
Snapshotter *raftsnap.Snapshotter
|
||||||
ServerStats *stats.ServerStats // used to record general transportation statistics
|
ServerStats *stats.ServerStats // used to record general transportation statistics
|
||||||
// used to record transportation statistics with followers when
|
// used to record transportation statistics with followers when
|
||||||
// performing as leader in raft protocol
|
// performing as leader in raft protocol
|
||||||
@ -346,7 +346,7 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
|
|||||||
return time.Time{}
|
return time.Time{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transport) SendSnapshot(m snap.Message) {
|
func (t *Transport) SendSnapshot(m raftsnap.Message) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
p := t.peers[types.ID(m.To)]
|
p := t.peers[types.ID(m.To)]
|
||||||
@ -384,7 +384,7 @@ func NewNopTransporter() Transporter {
|
|||||||
func (s *nopTransporter) Start() error { return nil }
|
func (s *nopTransporter) Start() error { return nil }
|
||||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||||
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
|
func (s *nopTransporter) SendSnapshot(m raftsnap.Message) {}
|
||||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||||
@ -397,18 +397,18 @@ func (s *nopTransporter) Resume() {}
|
|||||||
|
|
||||||
type snapTransporter struct {
|
type snapTransporter struct {
|
||||||
nopTransporter
|
nopTransporter
|
||||||
snapDoneC chan snap.Message
|
snapDoneC chan raftsnap.Message
|
||||||
snapDir string
|
snapDir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) {
|
func NewSnapTransporter(snapDir string) (Transporter, <-chan raftsnap.Message) {
|
||||||
ch := make(chan snap.Message, 1)
|
ch := make(chan raftsnap.Message, 1)
|
||||||
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
|
tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir}
|
||||||
return tr, ch
|
return tr, ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *snapTransporter) SendSnapshot(m snap.Message) {
|
func (s *snapTransporter) SendSnapshot(m raftsnap.Message) {
|
||||||
ss := snap.New(s.snapDir)
|
ss := raftsnap.New(s.snapDir)
|
||||||
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1)
|
||||||
m.CloseWithError(nil)
|
m.CloseWithError(nil)
|
||||||
s.snapDoneC <- m
|
s.snapDoneC <- m
|
||||||
|
Loading…
x
Reference in New Issue
Block a user