mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #3970 from xiang90/snapshot
*: rewrite snapshot merging and sending for v3
This commit is contained in:
commit
9b26753dbf
@ -121,8 +121,6 @@ func (c *ServerConfig) WALDir() string {
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) StorageDir() string { return path.Join(c.MemberDir(), "storage") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
|
||||
// ReqTimeout returns timeout for request to finish.
|
||||
|
@ -109,7 +109,7 @@ type raftNode struct {
|
||||
|
||||
// utility
|
||||
ticker <-chan time.Time
|
||||
raftStorage *raftStorage
|
||||
raftStorage *raft.MemoryStorage
|
||||
storage Storage
|
||||
// transport specifies the transport to send and receive msgs to members.
|
||||
// Sending messages MUST NOT block. It is okay to drop messages, since
|
||||
@ -126,7 +126,6 @@ type raftNode struct {
|
||||
// TODO: Ideally raftNode should get rid of the passed in server structure.
|
||||
func (r *raftNode) start(s *EtcdServer) {
|
||||
r.s = s
|
||||
r.raftStorage.raftStarted = true
|
||||
r.applyc = make(chan apply)
|
||||
r.stopped = make(chan struct{})
|
||||
r.done = make(chan struct{})
|
||||
@ -245,7 +244,7 @@ func advanceTicksForElection(n raft.Node, electionTicks int) {
|
||||
}
|
||||
}
|
||||
|
||||
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raftStorage, w *wal.WAL) {
|
||||
func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
||||
var err error
|
||||
member := cl.MemberByName(cfg.Name)
|
||||
metadata := pbutil.MustMarshal(
|
||||
@ -270,7 +269,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
|
||||
}
|
||||
id = member.ID
|
||||
plog.Infof("starting member %s in cluster %s", id, cl.ID())
|
||||
s = newRaftStorage()
|
||||
s = raft.NewMemoryStorage()
|
||||
c := &raft.Config{
|
||||
ID: uint64(id),
|
||||
ElectionTick: cfg.ElectionTicks,
|
||||
@ -287,7 +286,7 @@ func startNode(cfg *ServerConfig, cl *cluster, ids []types.ID) (id types.ID, n r
|
||||
return
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
|
||||
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
@ -297,7 +296,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
|
||||
plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
|
||||
cl := newCluster("")
|
||||
cl.SetID(cid)
|
||||
s := newRaftStorage()
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
@ -319,7 +318,7 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *clust
|
||||
return id, cl, n, s, w
|
||||
}
|
||||
|
||||
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raftStorage, *wal.WAL) {
|
||||
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *cluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
@ -351,7 +350,7 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
|
||||
plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
|
||||
cl := newCluster("")
|
||||
cl.SetID(cid)
|
||||
s := newRaftStorage()
|
||||
s := raft.NewMemoryStorage()
|
||||
if snapshot != nil {
|
||||
s.ApplySnapshot(*snapshot)
|
||||
}
|
||||
|
@ -1,64 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
type raftStorage struct {
|
||||
*raft.MemoryStorage
|
||||
// snapStore is the place to request snapshot when v3demo is enabled.
|
||||
// If snapStore is nil, it uses the snapshot saved in MemoryStorage.
|
||||
snapStore *snapshotStore
|
||||
// raftStarted indicates whether raft starts to function. If not, it cannot
|
||||
// request snapshot, and should get snapshot from MemoryStorage.
|
||||
raftStarted bool
|
||||
}
|
||||
|
||||
func newRaftStorage() *raftStorage {
|
||||
return &raftStorage{
|
||||
MemoryStorage: raft.NewMemoryStorage(),
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *raftStorage) reqsnap() <-chan struct{} {
|
||||
if rs.snapStore == nil {
|
||||
return nil
|
||||
}
|
||||
return rs.snapStore.reqsnapc
|
||||
}
|
||||
|
||||
func (rs *raftStorage) raftsnap() chan<- raftpb.Snapshot {
|
||||
if rs.snapStore == nil {
|
||||
return nil
|
||||
}
|
||||
return rs.snapStore.raftsnapc
|
||||
}
|
||||
|
||||
// Snapshot returns raft snapshot. If snapStore is nil or raft is not started, this method
|
||||
// returns snapshot saved in MemoryStorage. Otherwise, this method
|
||||
// returns snapshot from snapStore.
|
||||
func (rs *raftStorage) Snapshot() (raftpb.Snapshot, error) {
|
||||
if rs.snapStore == nil || !rs.raftStarted {
|
||||
return rs.MemoryStorage.Snapshot()
|
||||
}
|
||||
snap, err := rs.snapStore.getSnap()
|
||||
if err != nil {
|
||||
return raftpb.Snapshot{}, err
|
||||
}
|
||||
return snap.raft(), nil
|
||||
}
|
@ -153,7 +153,7 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
|
||||
r := raftNode{
|
||||
Node: n,
|
||||
storage: &storageRecorder{},
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: &nopTransporter{},
|
||||
}
|
||||
r.start(&EtcdServer{r: r})
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -65,6 +64,9 @@ const (
|
||||
monitorVersionInterval = 5 * time.Second
|
||||
|
||||
databaseFilename = "db"
|
||||
// max number of in-flight snapshot messages etcdserver allows to have.
|
||||
// This number is more than enough for most clusters with 5 machines.
|
||||
maxInFlightMsgSnap = 16
|
||||
)
|
||||
|
||||
var (
|
||||
@ -177,19 +179,23 @@ type EtcdServer struct {
|
||||
// forceVersionC is used to force the version monitor loop
|
||||
// to detect the cluster version immediately.
|
||||
forceVersionC chan struct{}
|
||||
|
||||
msgSnapC chan raftpb.Message
|
||||
}
|
||||
|
||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||
// configuration is considered static for the lifetime of the EtcdServer.
|
||||
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
var s *raftStorage
|
||||
var id types.ID
|
||||
var cl *cluster
|
||||
var (
|
||||
w *wal.WAL
|
||||
n raft.Node
|
||||
s *raft.MemoryStorage
|
||||
id types.ID
|
||||
cl *cluster
|
||||
)
|
||||
|
||||
if !cfg.V3demo && fileutil.Exist(path.Join(cfg.StorageDir(), databaseFilename)) {
|
||||
if !cfg.V3demo && fileutil.Exist(path.Join(cfg.SnapDir(), databaseFilename)) {
|
||||
return nil, errors.New("experimental-v3demo cannot be disabled once it is enabled")
|
||||
}
|
||||
|
||||
@ -340,18 +346,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
versionRt: prt,
|
||||
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
|
||||
forceVersionC: make(chan struct{}),
|
||||
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
|
||||
}
|
||||
|
||||
if cfg.V3demo {
|
||||
err = os.MkdirAll(cfg.StorageDir(), privateDirMode)
|
||||
if err != nil && err != os.ErrExist {
|
||||
return nil, err
|
||||
}
|
||||
srv.kv = dstorage.New(path.Join(cfg.StorageDir(), databaseFilename), &srv.consistIndex)
|
||||
srv.kv = dstorage.New(path.Join(cfg.SnapDir(), databaseFilename), &srv.consistIndex)
|
||||
if err := srv.kv.Restore(); err != nil {
|
||||
plog.Fatalf("v3 storage restore error: %v", err)
|
||||
}
|
||||
s.snapStore = newSnapshotStore(cfg.StorageDir(), srv.kv)
|
||||
}
|
||||
|
||||
// TODO: move transport initialization near the definition of remote
|
||||
@ -361,7 +363,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
ID: id,
|
||||
ClusterID: cl.ID(),
|
||||
Raft: srv,
|
||||
SnapSaver: s.snapStore,
|
||||
Snapshotter: ss,
|
||||
ServerStats: sstats,
|
||||
LeaderStats: lstats,
|
||||
ErrorC: srv.errorc,
|
||||
@ -383,10 +385,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
}
|
||||
srv.r.transport = tr
|
||||
|
||||
if cfg.V3demo {
|
||||
s.snapStore.tr = tr
|
||||
}
|
||||
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
@ -465,9 +463,6 @@ func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
|
||||
// and clears the used snapshot from the snapshot store.
|
||||
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
|
||||
s.r.ReportSnapshot(id, status)
|
||||
if s.cfg.V3demo {
|
||||
s.r.raftStorage.snapStore.clearUsedSnap()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) run() {
|
||||
@ -496,12 +491,12 @@ func (s *EtcdServer) run() {
|
||||
}
|
||||
|
||||
if s.cfg.V3demo {
|
||||
snapfn, err := s.r.raftStorage.snapStore.getSnapFilePath(apply.snapshot.Metadata.Index)
|
||||
snapfn, err := s.r.storage.DBFilePath(apply.snapshot.Metadata.Index)
|
||||
if err != nil {
|
||||
plog.Panicf("get snapshot file path error: %v", err)
|
||||
plog.Panicf("get database snapshot file path error: %v", err)
|
||||
}
|
||||
|
||||
fn := path.Join(s.cfg.StorageDir(), databaseFilename)
|
||||
fn := path.Join(s.cfg.SnapDir(), databaseFilename)
|
||||
if err := os.Rename(snapfn, fn); err != nil {
|
||||
plog.Panicf("rename snapshot file error: %v", err)
|
||||
}
|
||||
@ -514,7 +509,6 @@ func (s *EtcdServer) run() {
|
||||
oldKV := s.kv
|
||||
// TODO: swap the kv pointer atomically
|
||||
s.kv = newKV
|
||||
s.r.raftStorage.snapStore.kv = newKV
|
||||
|
||||
// Closing oldKV might block until all the txns
|
||||
// on the kv are finished.
|
||||
@ -571,9 +565,9 @@ func (s *EtcdServer) run() {
|
||||
s.snapshot(appliedi, confState)
|
||||
snapi = appliedi
|
||||
}
|
||||
case <-s.r.raftStorage.reqsnap():
|
||||
s.r.raftStorage.raftsnap() <- s.createRaftSnapshot(appliedi, confState)
|
||||
plog.Infof("requested snapshot created at %d", appliedi)
|
||||
case m := <-s.msgSnapC:
|
||||
merged := s.createMergedSnapshotMessage(m, appliedi, confState)
|
||||
s.r.transport.SendSnapshot(merged)
|
||||
case err := <-s.errorc:
|
||||
plog.Errorf("%s", err)
|
||||
plog.Infof("the data-dir used by this member must be removed.")
|
||||
@ -828,7 +822,24 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
|
||||
if s.cluster.IsIDRemoved(types.ID(ms[i].To)) {
|
||||
ms[i].To = 0
|
||||
}
|
||||
|
||||
if s.cfg.V3demo {
|
||||
if ms[i].Type == raftpb.MsgSnap {
|
||||
// There are two separate data stores when v3 demo is enabled: the store for v2,
|
||||
// and the KV for v3.
|
||||
// The msgSnap only contains the most recent snapshot of store without KV.
|
||||
// So we need to redirect the msgSnap to etcd server main loop for merging in the
|
||||
// current store snapshot and KV snapshot.
|
||||
select {
|
||||
case s.msgSnapC <- ms[i]:
|
||||
default:
|
||||
// drop msgSnap if the inflight chan is full.
|
||||
}
|
||||
ms[i].To = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.r.transport.Send(ms)
|
||||
}
|
||||
|
||||
@ -998,29 +1009,6 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// createRaftSnapshot creates a raft snapshot that includes the state of store for v2 api.
|
||||
func (s *EtcdServer) createRaftSnapshot(snapi uint64, confState raftpb.ConfState) raftpb.Snapshot {
|
||||
snapt, err := s.r.raftStorage.Term(snapi)
|
||||
if err != nil {
|
||||
log.Panicf("get term should never fail: %v", err)
|
||||
}
|
||||
|
||||
clone := s.store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
if err != nil {
|
||||
plog.Panicf("store save should never fail: %v", err)
|
||||
}
|
||||
|
||||
return raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: snapi,
|
||||
Term: snapt,
|
||||
ConfState: confState,
|
||||
},
|
||||
Data: d,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
clone := s.store.Clone()
|
||||
@ -1068,9 +1056,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
plog.Panicf("unexpected compaction error %v", err)
|
||||
}
|
||||
plog.Infof("compacted raft log at %d", compacti)
|
||||
if s.cfg.V3demo && s.r.raftStorage.snapStore.closeSnapBefore(compacti) {
|
||||
plog.Infof("closed snapshot stored due to compaction at %d", compacti)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,6 @@ package etcdserver
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
@ -33,6 +32,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
@ -523,7 +523,7 @@ func TestDoProposal(t *testing.T) {
|
||||
r: raftNode{
|
||||
Node: newNodeCommitter(),
|
||||
storage: &storageRecorder{},
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: &nopTransporter{},
|
||||
},
|
||||
store: st,
|
||||
@ -675,7 +675,7 @@ func TestSyncTrigger(t *testing.T) {
|
||||
cfg: &ServerConfig{TickMs: 1},
|
||||
r: raftNode{
|
||||
Node: n,
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: &nopTransporter{},
|
||||
storage: &storageRecorder{},
|
||||
},
|
||||
@ -712,51 +712,9 @@ func TestSyncTrigger(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateRaftSnapshot(t *testing.T) {
|
||||
s := newRaftStorage()
|
||||
s.Append([]raftpb.Entry{{Index: 1, Term: 1}})
|
||||
st := &storeRecorder{}
|
||||
srv := &EtcdServer{
|
||||
r: raftNode{
|
||||
raftStorage: s,
|
||||
},
|
||||
store: st,
|
||||
}
|
||||
|
||||
snap := srv.createRaftSnapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
|
||||
wdata, err := st.Save()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
wsnap := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: 1,
|
||||
Term: 1,
|
||||
ConfState: raftpb.ConfState{Nodes: []uint64{1}},
|
||||
},
|
||||
Data: wdata,
|
||||
}
|
||||
if !reflect.DeepEqual(snap, wsnap) {
|
||||
t.Errorf("snap = %+v, want %+v", snap, wsnap)
|
||||
}
|
||||
|
||||
gaction := st.Action()
|
||||
// the third action is store.Save used in testing
|
||||
if len(gaction) != 3 {
|
||||
t.Fatalf("len(action) = %d, want 3", len(gaction))
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "Clone"}) {
|
||||
t.Errorf("action = %s, want Clone", gaction[0])
|
||||
}
|
||||
if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "SaveNoCopy"}) {
|
||||
t.Errorf("action = %s, want SaveNoCopy", gaction[1])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// snapshot should snapshot the store and cut the persistent
|
||||
func TestSnapshot(t *testing.T) {
|
||||
s := newRaftStorage()
|
||||
s := raft.NewMemoryStorage()
|
||||
s.Append([]raftpb.Entry{{Index: 1}})
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
@ -800,7 +758,7 @@ func TestTriggerSnap(t *testing.T) {
|
||||
snapCount: uint64(snapc),
|
||||
r: raftNode{
|
||||
Node: newNodeCommitter(),
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
storage: p,
|
||||
transport: &nopTransporter{},
|
||||
},
|
||||
@ -841,7 +799,7 @@ func TestRecvSnapshot(t *testing.T) {
|
||||
Node: n,
|
||||
transport: &nopTransporter{},
|
||||
storage: p,
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
},
|
||||
store: st,
|
||||
cluster: cl,
|
||||
@ -874,7 +832,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
||||
st := &storeRecorder{}
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(store.New())
|
||||
storage := newRaftStorage()
|
||||
storage := raft.NewMemoryStorage()
|
||||
s := &EtcdServer{
|
||||
cfg: &ServerConfig{},
|
||||
r: raftNode{
|
||||
@ -923,7 +881,7 @@ func TestAddMember(t *testing.T) {
|
||||
s := &EtcdServer{
|
||||
r: raftNode{
|
||||
Node: n,
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
storage: &storageRecorder{},
|
||||
transport: &nopTransporter{},
|
||||
},
|
||||
@ -963,7 +921,7 @@ func TestRemoveMember(t *testing.T) {
|
||||
s := &EtcdServer{
|
||||
r: raftNode{
|
||||
Node: n,
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
storage: &storageRecorder{},
|
||||
transport: &nopTransporter{},
|
||||
},
|
||||
@ -1002,7 +960,7 @@ func TestUpdateMember(t *testing.T) {
|
||||
s := &EtcdServer{
|
||||
r: raftNode{
|
||||
Node: n,
|
||||
raftStorage: newRaftStorage(),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
storage: &storageRecorder{},
|
||||
transport: &nopTransporter{},
|
||||
},
|
||||
@ -1355,12 +1313,19 @@ func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
||||
p.Record(testutil.Action{Name: "Save"})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
||||
if !raft.IsEmptySnap(st) {
|
||||
p.Record(testutil.Action{Name: "SaveSnap"})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) DBFilePath(id uint64) (string, error) {
|
||||
p.Record(testutil.Action{Name: "DBFilePath"})
|
||||
return fmt.Sprintf("%016x.snap.db", id), nil
|
||||
}
|
||||
|
||||
func (p *storageRecorder) Close() error { return nil }
|
||||
|
||||
type nodeRecorder struct{ testutil.Recorder }
|
||||
@ -1477,16 +1442,16 @@ func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
||||
|
||||
type nopTransporter struct{}
|
||||
|
||||
func (s *nopTransporter) Start() error { return nil }
|
||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
||||
func (s *nopTransporter) SnapshotReady(rc io.ReadCloser, index uint64) {}
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
func (s *nopTransporter) Resume() {}
|
||||
func (s *nopTransporter) Start() error { return nil }
|
||||
func (s *nopTransporter) Handler() http.Handler { return nil }
|
||||
func (s *nopTransporter) Send(m []raftpb.Message) {}
|
||||
func (s *nopTransporter) SendSnapshot(m snap.Message) {}
|
||||
func (s *nopTransporter) AddRemote(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) AddPeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) RemovePeer(id types.ID) {}
|
||||
func (s *nopTransporter) RemoveAllPeers() {}
|
||||
func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {}
|
||||
func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} }
|
||||
func (s *nopTransporter) Stop() {}
|
||||
func (s *nopTransporter) Pause() {}
|
||||
func (s *nopTransporter) Resume() {}
|
||||
|
71
etcdserver/snapshot_merge.go
Normal file
71
etcdserver/snapshot_merge.go
Normal file
@ -0,0 +1,71 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
dstorage "github.com/coreos/etcd/storage"
|
||||
)
|
||||
|
||||
// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
|
||||
// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
|
||||
// as ReadCloser.
|
||||
func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapi uint64, confState raftpb.ConfState) snap.Message {
|
||||
snapt, err := s.r.raftStorage.Term(snapi)
|
||||
if err != nil {
|
||||
log.Panicf("get term should never fail: %v", err)
|
||||
}
|
||||
|
||||
// get a snapshot of v2 store as []byte
|
||||
clone := s.store.Clone()
|
||||
d, err := clone.SaveNoCopy()
|
||||
if err != nil {
|
||||
plog.Panicf("store save should never fail: %v", err)
|
||||
}
|
||||
|
||||
// get a snapshot of v3 KV as readCloser
|
||||
rc := newSnapshotReaderCloser(s.kv.Snapshot())
|
||||
|
||||
// put the []byte snapshot of store into raft snapshot and return the merged snapshot with
|
||||
// KV readCloser snapshot.
|
||||
snapshot := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: snapi,
|
||||
Term: snapt,
|
||||
ConfState: confState,
|
||||
},
|
||||
Data: d,
|
||||
}
|
||||
m.Snapshot = snapshot
|
||||
|
||||
return snap.Message{
|
||||
Message: m,
|
||||
ReadCloser: rc,
|
||||
}
|
||||
}
|
||||
|
||||
func newSnapshotReaderCloser(snapshot dstorage.Snapshot) io.ReadCloser {
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
_, err := snapshot.WriteTo(pw)
|
||||
pw.CloseWithError(err)
|
||||
snapshot.Close()
|
||||
}()
|
||||
return pr
|
||||
}
|
@ -1,260 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
dstorage "github.com/coreos/etcd/storage"
|
||||
)
|
||||
|
||||
// clearUnusedSnapshotInterval specifies the time interval to wait
|
||||
// before clearing unused snapshot.
|
||||
// The newly created snapshot should be retrieved within one heartbeat
|
||||
// interval because raft state machine retries to send snapshot
|
||||
// to slow follower when receiving MsgHeartbeatResp from the follower.
|
||||
// Set it as 5s to match the upper limit of heartbeat interval.
|
||||
const clearUnusedSnapshotInterval = 5 * time.Second
|
||||
|
||||
type snapshot struct {
|
||||
r raftpb.Snapshot
|
||||
|
||||
io.ReadCloser // used to read out v3 snapshot
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newSnapshot(r raftpb.Snapshot, kv dstorage.Snapshot) *snapshot {
|
||||
done := make(chan struct{})
|
||||
pr, pw := io.Pipe()
|
||||
go func() {
|
||||
_, err := kv.WriteTo(pw)
|
||||
pw.CloseWithError(err)
|
||||
kv.Close()
|
||||
close(done)
|
||||
}()
|
||||
return &snapshot{
|
||||
r: r,
|
||||
ReadCloser: pr,
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *snapshot) raft() raftpb.Snapshot { return s.r }
|
||||
|
||||
func (s *snapshot) isClosed() bool {
|
||||
select {
|
||||
case <-s.done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: remove snapshotStore. getSnap part could be put into memoryStorage,
|
||||
// while SaveFrom could be put into another struct, or even put into dstorage package.
|
||||
type snapshotStore struct {
|
||||
// dir to save snapshot data
|
||||
dir string
|
||||
kv dstorage.KV
|
||||
tr rafthttp.Transporter
|
||||
|
||||
// send empty to reqsnapc to notify the channel receiver to send back latest
|
||||
// snapshot to snapc
|
||||
reqsnapc chan struct{}
|
||||
// a chan to receive the requested raft snapshot
|
||||
// snapshotStore will receive from the chan immediately after it sends empty to reqsnapc
|
||||
raftsnapc chan raftpb.Snapshot
|
||||
|
||||
mu sync.Mutex // protect belowing vars
|
||||
// snap is nil iff there is no snapshot stored
|
||||
snap *snapshot
|
||||
inUse bool
|
||||
createOnce sync.Once // ensure at most one snapshot is created when no snapshot stored
|
||||
|
||||
clock clockwork.Clock
|
||||
}
|
||||
|
||||
func newSnapshotStore(dir string, kv dstorage.KV) *snapshotStore {
|
||||
return &snapshotStore{
|
||||
dir: dir,
|
||||
kv: kv,
|
||||
reqsnapc: make(chan struct{}),
|
||||
raftsnapc: make(chan raftpb.Snapshot),
|
||||
clock: clockwork.NewRealClock(),
|
||||
}
|
||||
}
|
||||
|
||||
// getSnap returns a snapshot.
|
||||
// If there is no available snapshot, ErrSnapshotTemporarilyUnavaliable will be returned.
|
||||
//
|
||||
// If the snapshot stored is in use, it returns ErrSnapshotTemporarilyUnavailable.
|
||||
// If there is no snapshot stored, it creates new snapshot
|
||||
// asynchronously and returns ErrSnapshotTemporarilyUnavailable, so
|
||||
// caller could get snapshot later when the snapshot is created.
|
||||
// Otherwise, it returns the snapshot stored.
|
||||
//
|
||||
// The created snapshot is cleared from the snapshot store if it is
|
||||
// either unused after clearUnusedSnapshotInterval, or explicitly cleared
|
||||
// through clearUsedSnap after using.
|
||||
// closeSnapBefore is used to close outdated snapshot,
|
||||
// so the snapshot will be cleared faster when in use.
|
||||
//
|
||||
// snapshot store stores at most one snapshot at a time.
|
||||
// If raft state machine wants to send two snapshot messages to two followers,
|
||||
// the second snapshot message will keep getting snapshot and succeed only after
|
||||
// the first message is sent. This increases the time used to send messages,
|
||||
// but it is acceptable because this should happen seldomly.
|
||||
func (ss *snapshotStore) getSnap() (*snapshot, error) {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
|
||||
if ss.inUse {
|
||||
return nil, raft.ErrSnapshotTemporarilyUnavailable
|
||||
}
|
||||
|
||||
if ss.snap == nil {
|
||||
// create snapshot asynchronously
|
||||
ss.createOnce.Do(func() { go ss.createSnap() })
|
||||
return nil, raft.ErrSnapshotTemporarilyUnavailable
|
||||
}
|
||||
|
||||
ss.inUse = true
|
||||
// give transporter the generated snapshot that is ready to send out
|
||||
ss.tr.SnapshotReady(ss.snap, ss.snap.raft().Metadata.Index)
|
||||
return ss.snap, nil
|
||||
}
|
||||
|
||||
// clearUsedSnap clears the snapshot from the snapshot store after it
|
||||
// is used.
|
||||
// After clear, snapshotStore could create new snapshot when getSnap.
|
||||
func (ss *snapshotStore) clearUsedSnap() {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
if !ss.inUse {
|
||||
plog.Panicf("unexpected clearUsedSnap when snapshot is not in use")
|
||||
}
|
||||
ss.clear()
|
||||
}
|
||||
|
||||
// closeSnapBefore closes the stored snapshot if its index is not greater
|
||||
// than the given compact index.
|
||||
// If it closes the snapshot, it returns true.
|
||||
func (ss *snapshotStore) closeSnapBefore(index uint64) bool {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
if ss.snap != nil && ss.snap.raft().Metadata.Index <= index {
|
||||
if err := ss.snap.Close(); err != nil {
|
||||
plog.Errorf("snapshot close error (%v)", err)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// createSnap creates a new snapshot and stores it into the snapshot store.
|
||||
// It also sets a timer to clear the snapshot if it is not in use after
|
||||
// some time interval.
|
||||
// It should only be called in snapshotStore functions.
|
||||
func (ss *snapshotStore) createSnap() {
|
||||
// ask to generate v2 snapshot
|
||||
ss.reqsnapc <- struct{}{}
|
||||
// generate KV snapshot
|
||||
kvsnap := ss.kv.Snapshot()
|
||||
raftsnap := <-ss.raftsnapc
|
||||
snap := newSnapshot(raftsnap, kvsnap)
|
||||
|
||||
ss.mu.Lock()
|
||||
ss.snap = snap
|
||||
ss.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
<-ss.clock.After(clearUnusedSnapshotInterval)
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
if snap == ss.snap && !ss.inUse {
|
||||
ss.clear()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// clear clears snapshot related variables in snapshotStore. It closes
|
||||
// the snapshot stored and sets the variables to initial values.
|
||||
// It should only be called in snapshotStore functions.
|
||||
func (ss *snapshotStore) clear() {
|
||||
if err := ss.snap.Close(); err != nil {
|
||||
plog.Errorf("snapshot close error (%v)", err)
|
||||
}
|
||||
ss.snap = nil
|
||||
ss.inUse = false
|
||||
ss.createOnce = sync.Once{}
|
||||
}
|
||||
|
||||
// SaveFrom saves snapshot at the given index from the given reader.
|
||||
// If the snapshot with the given index has been saved successfully, it keeps
|
||||
// the original saved snapshot and returns error.
|
||||
// The function guarantees that SaveFrom always saves either complete
|
||||
// snapshot or no snapshot, even if the call is aborted because program
|
||||
// is hard killed.
|
||||
func (ss *snapshotStore) SaveFrom(r io.Reader, index uint64) error {
|
||||
f, err := ioutil.TempFile(ss.dir, "tmp")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(f, r)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
os.Remove(f.Name())
|
||||
return err
|
||||
}
|
||||
fn := path.Join(ss.dir, fmt.Sprintf("%016x.db", index))
|
||||
if fileutil.Exist(fn) {
|
||||
os.Remove(f.Name())
|
||||
return fmt.Errorf("snapshot to save has existed")
|
||||
}
|
||||
err = os.Rename(f.Name(), fn)
|
||||
if err != nil {
|
||||
os.Remove(f.Name())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getSnapFilePath returns the file path for the snapshot with given index.
|
||||
// If the snapshot does not exist, it returns error.
|
||||
func (ss *snapshotStore) getSnapFilePath(index uint64) (string, error) {
|
||||
fns, err := fileutil.ReadDir(ss.dir)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
wfn := fmt.Sprintf("%016x.db", index)
|
||||
for _, fn := range fns {
|
||||
if fn == wfn {
|
||||
return path.Join(ss.dir, fn), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("snapshot file doesn't exist")
|
||||
}
|
@ -1,205 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package etcdserver
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
dstorage "github.com/coreos/etcd/storage"
|
||||
"github.com/coreos/etcd/storage/storagepb"
|
||||
)
|
||||
|
||||
func TestSnapshotStoreCreateSnap(t *testing.T) {
|
||||
snap := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{Index: 1},
|
||||
}
|
||||
ss := newSnapshotStore("", &nopKV{})
|
||||
fakeClock := clockwork.NewFakeClock()
|
||||
ss.clock = fakeClock
|
||||
go func() {
|
||||
<-ss.reqsnapc
|
||||
ss.raftsnapc <- snap
|
||||
}()
|
||||
|
||||
// create snapshot
|
||||
ss.createSnap()
|
||||
if !reflect.DeepEqual(ss.snap.raft(), snap) {
|
||||
t.Errorf("raftsnap = %+v, want %+v", ss.snap.raft(), snap)
|
||||
}
|
||||
|
||||
// unused snapshot is cleared after clearUnusedSnapshotInterval
|
||||
fakeClock.BlockUntil(1)
|
||||
fakeClock.Advance(clearUnusedSnapshotInterval)
|
||||
testutil.WaitSchedule()
|
||||
ss.mu.Lock()
|
||||
if ss.snap != nil {
|
||||
t.Errorf("snap = %+v, want %+v", ss.snap, nil)
|
||||
}
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestSnapshotStoreGetSnap(t *testing.T) {
|
||||
snap := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{Index: 1},
|
||||
}
|
||||
ss := newSnapshotStore("", &nopKV{})
|
||||
fakeClock := clockwork.NewFakeClock()
|
||||
ss.clock = fakeClock
|
||||
ss.tr = &nopTransporter{}
|
||||
go func() {
|
||||
<-ss.reqsnapc
|
||||
ss.raftsnapc <- snap
|
||||
}()
|
||||
|
||||
// get snap when no snapshot stored
|
||||
_, err := ss.getSnap()
|
||||
if err != raft.ErrSnapshotTemporarilyUnavailable {
|
||||
t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
|
||||
}
|
||||
|
||||
// wait for asynchronous snapshot creation to finish
|
||||
testutil.WaitSchedule()
|
||||
// get the created snapshot
|
||||
s, err := ss.getSnap()
|
||||
if err != nil {
|
||||
t.Fatalf("getSnap error = %v, want nil", err)
|
||||
}
|
||||
if !reflect.DeepEqual(s.raft(), snap) {
|
||||
t.Errorf("raftsnap = %+v, want %+v", s.raft(), snap)
|
||||
}
|
||||
if !ss.inUse {
|
||||
t.Errorf("inUse = %v, want true", ss.inUse)
|
||||
}
|
||||
|
||||
// get snap when snapshot stored has been in use
|
||||
_, err = ss.getSnap()
|
||||
if err != raft.ErrSnapshotTemporarilyUnavailable {
|
||||
t.Fatalf("getSnap error = %v, want %v", err, raft.ErrSnapshotTemporarilyUnavailable)
|
||||
}
|
||||
|
||||
// clean up
|
||||
fakeClock.Advance(clearUnusedSnapshotInterval)
|
||||
}
|
||||
|
||||
func TestSnapshotStoreClearUsedSnap(t *testing.T) {
|
||||
s := &fakeSnapshot{}
|
||||
var once sync.Once
|
||||
once.Do(func() {})
|
||||
ss := &snapshotStore{
|
||||
snap: newSnapshot(raftpb.Snapshot{}, s),
|
||||
inUse: true,
|
||||
createOnce: once,
|
||||
}
|
||||
|
||||
ss.clearUsedSnap()
|
||||
// wait for underlying KV snapshot closed
|
||||
testutil.WaitSchedule()
|
||||
s.mu.Lock()
|
||||
if !s.closed {
|
||||
t.Errorf("snapshot closed = %v, want true", s.closed)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if ss.snap != nil {
|
||||
t.Errorf("snapshot = %v, want nil", ss.snap)
|
||||
}
|
||||
if ss.inUse {
|
||||
t.Errorf("isUse = %v, want false", ss.inUse)
|
||||
}
|
||||
// test createOnce is reset
|
||||
if ss.createOnce == once {
|
||||
t.Errorf("createOnce fails to reset")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotStoreCloseSnapBefore(t *testing.T) {
|
||||
snapIndex := uint64(5)
|
||||
|
||||
tests := []struct {
|
||||
index uint64
|
||||
wok bool
|
||||
}{
|
||||
{snapIndex - 2, false},
|
||||
{snapIndex - 1, false},
|
||||
{snapIndex, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
rs := raftpb.Snapshot{
|
||||
Metadata: raftpb.SnapshotMetadata{Index: 5},
|
||||
}
|
||||
s := &fakeSnapshot{}
|
||||
ss := &snapshotStore{
|
||||
snap: newSnapshot(rs, s),
|
||||
}
|
||||
|
||||
ok := ss.closeSnapBefore(tt.index)
|
||||
if ok != tt.wok {
|
||||
t.Errorf("#%d: closeSnapBefore = %v, want %v", i, ok, tt.wok)
|
||||
}
|
||||
if ok {
|
||||
// wait for underlying KV snapshot closed
|
||||
testutil.WaitSchedule()
|
||||
s.mu.Lock()
|
||||
if !s.closed {
|
||||
t.Errorf("#%d: snapshot closed = %v, want true", i, s.closed)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type nopKV struct{}
|
||||
|
||||
func (kv *nopKV) Rev() int64 { return 0 }
|
||||
func (kv *nopKV) Range(key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
func (kv *nopKV) Put(key, value []byte) (rev int64) { return 0 }
|
||||
func (kv *nopKV) DeleteRange(key, end []byte) (n, rev int64) { return 0, 0 }
|
||||
func (kv *nopKV) TxnBegin() int64 { return 0 }
|
||||
func (kv *nopKV) TxnEnd(txnID int64) error { return nil }
|
||||
func (kv *nopKV) TxnRange(txnID int64, key, end []byte, limit, rangeRev int64) (kvs []storagepb.KeyValue, rev int64, err error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
func (kv *nopKV) TxnPut(txnID int64, key, value []byte) (rev int64, err error) { return 0, nil }
|
||||
func (kv *nopKV) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
|
||||
return 0, 0, nil
|
||||
}
|
||||
func (kv *nopKV) Compact(rev int64) error { return nil }
|
||||
func (kv *nopKV) Hash() (uint32, error) { return 0, nil }
|
||||
func (kv *nopKV) Snapshot() dstorage.Snapshot { return &fakeSnapshot{} }
|
||||
func (kv *nopKV) Commit() {}
|
||||
func (kv *nopKV) Restore() error { return nil }
|
||||
func (kv *nopKV) Close() error { return nil }
|
||||
|
||||
// fakeSnapshot is a test snapshot that holds no data and only remembers
// whether Close has been called. The closed flag is guarded by mu so tests
// can read it while another goroutine closes the snapshot.
type fakeSnapshot struct {
	mu     sync.Mutex
	closed bool
}

// Size always reports an empty snapshot.
func (s *fakeSnapshot) Size() int64 {
	return 0
}

// WriteTo writes nothing to w and reports zero bytes written.
func (s *fakeSnapshot) WriteTo(w io.Writer) (int64, error) {
	return 0, nil
}

// Close marks the snapshot as closed and always succeeds.
func (s *fakeSnapshot) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	return nil
}
|
@ -35,6 +35,9 @@ type Storage interface {
|
||||
Save(st raftpb.HardState, ents []raftpb.Entry) error
|
||||
// SaveSnap function saves snapshot to the underlying stable storage.
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
// DBFilePath returns the file path of database snapshot saved with given
|
||||
// id.
|
||||
DBFilePath(id uint64) (string, error)
|
||||
// Close closes the Storage and performs finalization.
|
||||
Close() error
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
pioutil "github.com/coreos/etcd/pkg/ioutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
|
||||
@ -118,16 +119,16 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
type snapshotHandler struct {
|
||||
r Raft
|
||||
snapSaver SnapshotSaver
|
||||
cid types.ID
|
||||
r Raft
|
||||
snapshotter *snap.Snapshotter
|
||||
cid types.ID
|
||||
}
|
||||
|
||||
func newSnapshotHandler(r Raft, snapSaver SnapshotSaver, cid types.ID) http.Handler {
|
||||
func newSnapshotHandler(r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
|
||||
return &snapshotHandler{
|
||||
r: r,
|
||||
snapSaver: snapSaver,
|
||||
cid: cid,
|
||||
r: r,
|
||||
snapshotter: snapshotter,
|
||||
cid: cid,
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,14 +169,14 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// save snapshot
|
||||
if err := h.snapSaver.SaveFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
|
||||
// save incoming database snapshot.
|
||||
if err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index); err != nil {
|
||||
msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
|
||||
plog.Error(msg)
|
||||
http.Error(w, msg, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
plog.Infof("received and saved snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
|
||||
plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
|
||||
|
||||
if err := h.r.Process(context.TODO(), m); err != nil {
|
||||
switch v := err.(type) {
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/version"
|
||||
)
|
||||
|
||||
@ -340,9 +341,10 @@ type fakePeerGetter struct {
|
||||
func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
|
||||
|
||||
type fakePeer struct {
|
||||
msgs []raftpb.Message
|
||||
urls types.URLs
|
||||
connc chan *outgoingConn
|
||||
msgs []raftpb.Message
|
||||
snapMsgs []snap.Message
|
||||
urls types.URLs
|
||||
connc chan *outgoingConn
|
||||
}
|
||||
|
||||
func newFakePeer() *fakePeer {
|
||||
@ -352,6 +354,7 @@ func newFakePeer() *fakePeer {
|
||||
}
|
||||
|
||||
func (pr *fakePeer) send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) }
|
||||
func (pr *fakePeer) sendSnap(m snap.Message) { pr.snapMsgs = append(pr.snapMsgs, m) }
|
||||
func (pr *fakePeer) update(urls types.URLs) { pr.urls = urls }
|
||||
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
|
||||
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -57,6 +58,11 @@ type Peer interface {
|
||||
// When it fails to send message out, it will report the status to underlying
|
||||
// raft.
|
||||
send(m raftpb.Message)
|
||||
|
||||
// sendSnap sends the merged snapshot message to the remote peer. Its behavior
|
||||
// is similar to send.
|
||||
sendSnap(m snap.Message)
|
||||
|
||||
// update updates the urls of remote peer.
|
||||
update(urls types.URLs)
|
||||
// attachOutgoingConn attaches the outgoing connection to the peer for
|
||||
@ -110,7 +116,7 @@ type peer struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, snapst *snapshotStore, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
|
||||
func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
|
||||
picker := newURLPicker(urls)
|
||||
status := newPeerStatus(to)
|
||||
p := &peer{
|
||||
@ -121,7 +127,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
|
||||
msgAppV2Writer: startStreamWriter(to, status, fs, r),
|
||||
writer: startStreamWriter(to, status, fs, r),
|
||||
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
|
||||
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
|
||||
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, r, errorc),
|
||||
sendc: make(chan raftpb.Message),
|
||||
recvc: make(chan raftpb.Message, recvBufSize),
|
||||
propc: make(chan raftpb.Message, maxPendingProposals),
|
||||
@ -158,10 +164,6 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
|
||||
if paused {
|
||||
continue
|
||||
}
|
||||
if p.v3demo && isMsgSnap(m) {
|
||||
go p.snapSender.send(m)
|
||||
continue
|
||||
}
|
||||
writec, name := p.pick(m)
|
||||
select {
|
||||
case writec <- m:
|
||||
@ -209,6 +211,10 @@ func (p *peer) send(m raftpb.Message) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peer) sendSnap(m snap.Message) {
|
||||
go p.snapSender.send(m)
|
||||
}
|
||||
|
||||
func (p *peer) update(urls types.URLs) {
|
||||
select {
|
||||
case p.newURLsC <- urls:
|
||||
|
@ -24,7 +24,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/httputil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
)
|
||||
|
||||
type snapshotSender struct {
|
||||
@ -34,14 +34,13 @@ type snapshotSender struct {
|
||||
tr http.RoundTripper
|
||||
picker *urlPicker
|
||||
status *peerStatus
|
||||
snapst *snapshotStore
|
||||
r Raft
|
||||
errorc chan error
|
||||
|
||||
stopc chan struct{}
|
||||
}
|
||||
|
||||
func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, snapst *snapshotStore, r Raft, errorc chan error) *snapshotSender {
|
||||
func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender {
|
||||
return &snapshotSender{
|
||||
from: from,
|
||||
to: to,
|
||||
@ -49,7 +48,6 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty
|
||||
tr: tr,
|
||||
picker: picker,
|
||||
status: status,
|
||||
snapst: snapst,
|
||||
r: r,
|
||||
errorc: errorc,
|
||||
stopc: make(chan struct{}),
|
||||
@ -58,10 +56,12 @@ func newSnapshotSender(tr http.RoundTripper, picker *urlPicker, from, to, cid ty
|
||||
|
||||
// stop closes s.stopc. Closing a channel twice panics, so stop must be
// called at most once.
func (s *snapshotSender) stop() {
	close(s.stopc)
}
|
||||
|
||||
func (s *snapshotSender) send(m raftpb.Message) {
|
||||
func (s *snapshotSender) send(merged snap.Message) {
|
||||
m := merged.Message
|
||||
|
||||
start := time.Now()
|
||||
|
||||
body := createSnapBody(m, s.snapst)
|
||||
body := createSnapBody(merged)
|
||||
defer body.Close()
|
||||
|
||||
u := s.picker.pick()
|
||||
@ -142,20 +142,16 @@ type readCloser struct {
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// createSnapBody creates the request body for the given raft snapshot message.
|
||||
// Callers should close body when done reading from it.
|
||||
func createSnapBody(m raftpb.Message, snapst *snapshotStore) io.ReadCloser {
|
||||
func createSnapBody(merged snap.Message) io.ReadCloser {
|
||||
buf := new(bytes.Buffer)
|
||||
enc := &messageEncoder{w: buf}
|
||||
// encode raft message
|
||||
if err := enc.encode(m); err != nil {
|
||||
if err := enc.encode(merged.Message); err != nil {
|
||||
plog.Panicf("encode message error (%v)", err)
|
||||
}
|
||||
// get snapshot
|
||||
rc := snapst.get(m.Snapshot.Metadata.Index)
|
||||
|
||||
return &readCloser{
|
||||
Reader: io.MultiReader(buf, rc),
|
||||
Closer: rc,
|
||||
Reader: io.MultiReader(buf, merged.ReadCloser),
|
||||
Closer: merged.ReadCloser,
|
||||
}
|
||||
}
|
||||
|
@ -1,45 +0,0 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rafthttp
|
||||
|
||||
import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// snapshotStore is the store of snapshot. Caller could put one
|
||||
// snapshot into the store, and get it later.
|
||||
// snapshotStore stores at most one snapshot at a time, or it panics.
|
||||
type snapshotStore struct {
|
||||
rc io.ReadCloser
|
||||
// index of the stored snapshot
|
||||
// index is 0 if and only if there is no snapshot stored.
|
||||
index uint64
|
||||
}
|
||||
|
||||
func (s *snapshotStore) put(rc io.ReadCloser, index uint64) {
|
||||
if s.index != 0 {
|
||||
plog.Panicf("unexpected put when there is one snapshot stored")
|
||||
}
|
||||
s.rc, s.index = rc, index
|
||||
}
|
||||
|
||||
func (s *snapshotStore) get(index uint64) io.ReadCloser {
|
||||
if s.index == index {
|
||||
// set index to 0 to indicate no snapshot stored
|
||||
s.index = 0
|
||||
return s.rc
|
||||
}
|
||||
return nil
|
||||
}
|
@ -15,7 +15,6 @@
|
||||
package rafthttp
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@ -29,6 +28,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
)
|
||||
|
||||
var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
|
||||
@ -40,12 +40,6 @@ type Raft interface {
|
||||
ReportSnapshot(id uint64, status raft.SnapshotStatus)
|
||||
}
|
||||
|
||||
// SnapshotSaver is the interface that wraps the SaveFrom method.
|
||||
type SnapshotSaver interface {
|
||||
// SaveFrom saves the snapshot data at the given index from the given reader.
|
||||
SaveFrom(r io.Reader, index uint64) error
|
||||
}
|
||||
|
||||
type Transporter interface {
|
||||
// Start starts the given Transporter.
|
||||
// Start MUST be called before calling other functions in the interface.
|
||||
@ -62,6 +56,9 @@ type Transporter interface {
|
||||
// If the id cannot be found in the transport, the message
|
||||
// will be ignored.
|
||||
Send(m []raftpb.Message)
|
||||
// SendSnapshot sends out the given snapshot message to a remote peer.
|
||||
// The behavior of SendSnapshot is similar to Send.
|
||||
SendSnapshot(m snap.Message)
|
||||
// AddRemote adds a remote with given peer urls into the transport.
|
||||
// A remote helps newly joined member to catch up the progress of cluster,
|
||||
// and will not be used after that.
|
||||
@ -86,14 +83,6 @@ type Transporter interface {
|
||||
// If the connection is active since peer was added, it returns the adding time.
|
||||
// If the connection is currently inactive, it returns zero time.
|
||||
ActiveSince(id types.ID) time.Time
|
||||
// SnapshotReady accepts a snapshot at the given index that is ready to send out.
|
||||
// It is expected that caller sends a raft snapshot message with
|
||||
// the given index soon, and the accepted snapshot will be sent out
|
||||
// together. After sending, snapshot sent status is reported
|
||||
// through Raft.SnapshotStatus.
|
||||
// SnapshotReady MUST not be called when the snapshot sent status of previous
|
||||
// accepted one has not been reported.
|
||||
SnapshotReady(rc io.ReadCloser, index uint64)
|
||||
// Stop closes the connections and stops the transporter.
|
||||
Stop()
|
||||
}
|
||||
@ -108,10 +97,10 @@ type Transport struct {
|
||||
DialTimeout time.Duration // maximum duration before timing out dial of the request
|
||||
TLSInfo transport.TLSInfo // TLS information used when creating connection
|
||||
|
||||
ID types.ID // local member ID
|
||||
ClusterID types.ID // raft cluster ID for request validation
|
||||
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
|
||||
SnapSaver SnapshotSaver // used to save snapshot in v3 snapshot messages
|
||||
ID types.ID // local member ID
|
||||
ClusterID types.ID // raft cluster ID for request validation
|
||||
Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
|
||||
Snapshotter *snap.Snapshotter
|
||||
ServerStats *stats.ServerStats // used to record general transportation statistics
|
||||
// used to record transportation statistics with followers when
|
||||
// performing as leader in raft protocol
|
||||
@ -130,8 +119,6 @@ type Transport struct {
|
||||
remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
|
||||
peers map[types.ID]Peer // peers map
|
||||
|
||||
snapst *snapshotStore
|
||||
|
||||
prober probing.Prober
|
||||
}
|
||||
|
||||
@ -147,7 +134,6 @@ func (t *Transport) Start() error {
|
||||
}
|
||||
t.remotes = make(map[types.ID]*remote)
|
||||
t.peers = make(map[types.ID]Peer)
|
||||
t.snapst = &snapshotStore{}
|
||||
t.prober = probing.NewProber(t.pipelineRt)
|
||||
return nil
|
||||
}
|
||||
@ -155,7 +141,7 @@ func (t *Transport) Start() error {
|
||||
func (t *Transport) Handler() http.Handler {
|
||||
pipelineHandler := newPipelineHandler(t.Raft, t.ClusterID)
|
||||
streamHandler := newStreamHandler(t, t.Raft, t.ID, t.ClusterID)
|
||||
snapHandler := newSnapshotHandler(t.Raft, t.SnapSaver, t.ClusterID)
|
||||
snapHandler := newSnapshotHandler(t.Raft, t.Snapshotter, t.ClusterID)
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle(RaftPrefix, pipelineHandler)
|
||||
mux.Handle(RaftStreamPrefix+"/", streamHandler)
|
||||
@ -240,7 +226,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) {
|
||||
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
|
||||
}
|
||||
fs := t.LeaderStats.Follower(id.String())
|
||||
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.snapst, t.Raft, fs, t.ErrorC, t.V3demo)
|
||||
t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.V3demo)
|
||||
addPeerToProber(t.prober, id.String(), us)
|
||||
}
|
||||
|
||||
@ -296,8 +282,13 @@ func (t *Transport) ActiveSince(id types.ID) time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (t *Transport) SnapshotReady(rc io.ReadCloser, index uint64) {
|
||||
t.snapst.put(rc, index)
|
||||
func (t *Transport) SendSnapshot(m snap.Message) {
|
||||
p := t.peers[types.ID(m.To)]
|
||||
if p == nil {
|
||||
m.ReadCloser.Close()
|
||||
return
|
||||
}
|
||||
p.sendSnap(m)
|
||||
}
|
||||
|
||||
type Pausable interface {
|
||||
|
67
snap/db.go
Normal file
67
snap/db.go
Normal file
@ -0,0 +1,67 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package snap
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
)
|
||||
|
||||
// SaveDBFrom saves snapshot of the database from the given reader. It
|
||||
// guarantees the save operation is atomic.
|
||||
func (s *Snapshotter) SaveDBFrom(r io.Reader, id uint64) error {
|
||||
f, err := ioutil.TempFile(s.dir, "tmp")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(f, r)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
os.Remove(f.Name())
|
||||
return err
|
||||
}
|
||||
fn := path.Join(s.dir, fmt.Sprintf("%016x.snap.db", id))
|
||||
if fileutil.Exist(fn) {
|
||||
os.Remove(f.Name())
|
||||
return nil
|
||||
}
|
||||
err = os.Rename(f.Name(), fn)
|
||||
if err != nil {
|
||||
os.Remove(f.Name())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DBFilePath returns the file path for the snapshot of the database with
|
||||
// given id. If the snapshot does not exist, it returns error.
|
||||
func (s *Snapshotter) DBFilePath(id uint64) (string, error) {
|
||||
fns, err := fileutil.ReadDir(s.dir)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
wfn := fmt.Sprintf("%016x.snap.db", id)
|
||||
for _, fn := range fns {
|
||||
if fn == wfn {
|
||||
return path.Join(s.dir, fn), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("snap: snapshot file doesn't exist")
|
||||
}
|
34
snap/message.go
Normal file
34
snap/message.go
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package snap
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
// Message is a struct that contains a raft Message and a ReadCloser. The type
|
||||
// of raft message MUST be MsgSnap, which contains the raft meta-data and an
|
||||
// additional data []byte field that contains the snapshot of the actual state
|
||||
// machine.
|
||||
// Message contains the ReadCloser field for handling large snapshots. This avoids
|
||||
// copying the entire snapshot into a byte array, which consumes a lot of memory.
|
||||
//
|
||||
// User of Message should close the ReadCloser after sending it.
|
||||
type Message struct {
|
||||
raftpb.Message
|
||||
ReadCloser io.ReadCloser
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user