Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)

Merge pull request #1095 from coreos/nsnap

WIP *: init for on disk snap support

This commit is contained in: commit ba851b2eca
@@ -17,7 +17,10 @@ import (
 	"github.com/coreos/etcd/wait"
 )
 
-const defaultSyncTimeout = time.Second
+const (
+	defaultSyncTimeout = time.Second
+	DefaultSnapCount   = 10000
+)
 
 var (
 	ErrUnknownMethod = errors.New("etcdserver: unknown method")
@@ -33,6 +36,19 @@ type Response struct {
 	err     error
 }
 
+type Storage interface {
+	// Save function saves ents and state to the underlying stable storage.
+	// Save MUST block until st and ents are on stable storage.
+	Save(st raftpb.HardState, ents []raftpb.Entry)
+	// SaveSnap function saves snapshot to the underlying stable storage.
+	SaveSnap(snap raftpb.Snapshot)
+
+	// TODO: WAL should be able to control cut itself. After implementing
+	// self-controlled cut, remove it from this interface.
+	// Cut cuts out a new wal file for saving new state and entries.
+	Cut() error
+}
+
 type Server interface {
 	// Start performs any initialization of the Server necessary for it to
 	// begin serving requests. It must be called before Do or Process.
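Note: the smallest type satisfying this new interface is a no-op; the functional test later in this diff wires exactly such a nopStorage into the server. A minimal sketch:

    type nopStorage struct{}

    func (nopStorage) Save(st raftpb.HardState, ents []raftpb.Entry) {}
    func (nopStorage) SaveSnap(snap raftpb.Snapshot)                 {}
    func (nopStorage) Cut() error                                    { return nil }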
@@ -63,18 +79,21 @@ type EtcdServer struct {
 	// panic.
 	Send SendFunc
 
-	// Save specifies the save function for saving ents to stable storage.
-	// Save MUST block until st and ents are on stable storage. If Send is
-	// nil, server will panic.
-	Save func(st raftpb.HardState, ents []raftpb.Entry)
+	Storage Storage
 
 	Ticker     <-chan time.Time
 	SyncTicker <-chan time.Time
+
+	SnapCount int64 // number of entries to trigger a snapshot
 }
 
 // Start prepares and starts server in a new goroutine. It is no longer safe to
 // modify a server's fields after it has been sent to Start.
 func (s *EtcdServer) Start() {
+	if s.SnapCount == 0 {
+		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
+		s.SnapCount = DefaultSnapCount
+	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
 	go s.run()
@@ -86,12 +105,15 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
 
 func (s *EtcdServer) run() {
 	var syncC <-chan time.Time
+	// snapi indicates the index of the last submitted snapshot request
+	var snapi, appliedi int64
 	for {
 		select {
 		case <-s.Ticker:
 			s.Node.Tick()
 		case rd := <-s.Node.Ready():
-			s.Save(rd.HardState, rd.Entries)
+			s.Storage.Save(rd.HardState, rd.Entries)
+			s.Storage.SaveSnap(rd.Snapshot)
 			s.Send(rd.Messages)
 
 			// TODO(bmizerany): do this in the background, but take
@@ -103,6 +125,24 @@ func (s *EtcdServer) run() {
 					panic("TODO: this is bad, what do we do about it?")
 				}
 				s.w.Trigger(r.Id, s.apply(r))
+				appliedi = e.Index
 			}
 
+			if rd.Snapshot.Index > snapi {
+				snapi = rd.Snapshot.Index
+			}
+
+			// recover from snapshot if it is newer than the current applied index
+			if rd.Snapshot.Index > appliedi {
+				if err := s.Store.Recovery(rd.Snapshot.Data); err != nil {
+					panic("TODO: this is bad, what do we do about it?")
+				}
+				appliedi = rd.Snapshot.Index
+			}
+
+			if appliedi-snapi > s.SnapCount {
+				s.snapshot()
+				snapi = appliedi
+			}
+
 			if rd.SoftState != nil {
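Note: snapi tracks the index covered by the most recent snapshot and appliedi the last entry applied to the store, so a snapshot is taken once the server has applied more than SnapCount entries since the last one. A hypothetical restatement of the trigger condition:

    // shouldSnapshot restates the check above: with the default
    // SnapCount of 10000, applying the 10001st entry past the last
    // snapshot index triggers a new snapshot.
    func shouldSnapshot(appliedi, snapi, snapCount int64) bool {
    	return appliedi-snapi > snapCount
    }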
@@ -241,6 +281,18 @@ func (s *EtcdServer) apply(r pb.Request) Response {
 	}
 }
 
+// TODO: non-blocking snapshot
+func (s *EtcdServer) snapshot() {
+	d, err := s.Store.Save()
+	// TODO: current store will never fail to do a snapshot
+	// what should we do if the store might fail?
+	if err != nil {
+		panic("TODO: this is bad, what do we do about it?")
+	}
+	s.Node.Compact(d)
+	s.Storage.Cut()
+}
+
 // TODO: move the function to /id pkg maybe?
 // GenID generates a random id that is not equal to 0.
 func GenID() int64 {
@@ -162,11 +162,11 @@ func testServer(t *testing.T, ns int64) {
 	tk := time.NewTicker(10 * time.Millisecond)
 	defer tk.Stop()
 	srv := &EtcdServer{
-		Node:   n,
-		Store:  store.New(),
-		Send:   send,
-		Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-		Ticker: tk.C,
+		Node:    n,
+		Store:   store.New(),
+		Send:    send,
+		Storage: &storageRecorder{},
+		Ticker:  tk.C,
 	}
 	srv.Start()
 	// TODO(xiangli): randomize election timeout
@@ -231,11 +231,11 @@ func TestDoProposal(t *testing.T) {
 		// this makes <-tk always successful, which accelerates internal clock
 		close(tk)
 		srv := &EtcdServer{
-			Node:   n,
-			Store:  st,
-			Send:   func(_ []raftpb.Message) {},
-			Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-			Ticker: tk,
+			Node:    n,
+			Store:   st,
+			Send:    func(_ []raftpb.Message) {},
+			Storage: &storageRecorder{},
+			Ticker:  tk,
 		}
 		srv.Start()
 		resp, err := srv.Do(ctx, tt)
@@ -299,11 +299,11 @@ func TestDoProposalStopped(t *testing.T) {
 	close(tk)
 	srv := &EtcdServer{
 		// TODO: use fake node for better testability
-		Node:   n,
-		Store:  st,
-		Send:   func(_ []raftpb.Message) {},
-		Save:   func(_ raftpb.HardState, _ []raftpb.Entry) {},
-		Ticker: tk,
+		Node:    n,
+		Store:   st,
+		Send:    func(_ []raftpb.Message) {},
+		Storage: &storageRecorder{},
+		Ticker:  tk,
 	}
 	srv.Start()
 
@@ -417,7 +417,7 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
 		Node:  n,
 		Store: st,
 		Send:  func(_ []raftpb.Message) {},
-		Save:       func(_ raftpb.HardState, _ []raftpb.Entry) {},
+		Storage:    &storageRecorder{},
 		SyncTicker: syncTicker.C,
 	}
 	srv.Start()
@@ -435,6 +435,73 @@ func TestSyncTriggerDeleteExpriedKeys(t *testing.T) {
 	}
 }
 
+// snapshot should snapshot the store and cut the persistent storage
+// TODO: node.Compact is called... we need to make the node an interface
+func TestSnapshot(t *testing.T) {
+	n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
+	defer n.Stop()
+	st := &storeRecorder{}
+	p := &storageRecorder{}
+	s := &EtcdServer{
+		Store:   st,
+		Storage: p,
+		Node:    n,
+	}
+
+	s.snapshot()
+	action := st.Action()
+	if len(action) != 1 {
+		t.Fatalf("len(action) = %d, want 1", len(action))
+	}
+	if action[0] != "Save" {
+		t.Errorf("action = %s, want Save", action[0])
+	}
+
+	action = p.Action()
+	if len(action) != 1 {
+		t.Fatalf("len(action) = %d, want 1", len(action))
+	}
+	if action[0] != "Cut" {
+		t.Errorf("action = %s, want Cut", action[0])
+	}
+}
+
+// Applied > SnapCount should trigger a SaveSnap event
+// TODO: receiving a snapshot from the raft leader should also
+// trigger SaveSnap and a store.Recovery.
+// We need a fake node!
+func TestTriggerSnap(t *testing.T) {
+	ctx := context.Background()
+	n := raft.Start(0xBAD0, []int64{0xBAD0}, 10, 1)
+	n.Campaign(ctx)
+	st := &storeRecorder{}
+	p := &storageRecorder{}
+	s := &EtcdServer{
+		Store:     st,
+		Send:      func(_ []raftpb.Message) {},
+		Storage:   p,
+		Node:      n,
+		SnapCount: 10,
+	}
+
+	s.Start()
+	for i := 0; int64(i) < s.SnapCount; i++ {
+		s.Do(ctx, pb.Request{Method: "PUT", Id: 1})
+	}
+	time.Sleep(time.Millisecond)
+	s.Stop()
+
+	action := p.Action()
+	// each operation is recorded as a Save
+	// Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap
+	if len(action) != 3+int(s.SnapCount) {
+		t.Fatalf("len(action) = %d, want %d", len(action), 3+int(s.SnapCount))
+	}
+	if action[12] != "SaveSnap" {
+		t.Errorf("action = %s, want SaveSnap", action[12])
+	}
+}
+
 // TODO: test wait trigger correctness in multi-server case
 
 func TestGetBool(t *testing.T) {
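Note on the expected count in TestTriggerSnap: the election produces one Nop entry (one recorded Save), the loop issues SnapCount = 10 Puts (ten more Saves), and crossing SnapCount triggers the snapshot path, which records Cut and then SaveSnap. That is 1 + 10 + 1 + 1 = 13 = 3 + SnapCount actions, with SaveSnap last at index 12.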
@@ -458,23 +525,28 @@ func TestGetBool(t *testing.T) {
 	}
 }
 
-type storeRecorder struct {
+type recorder struct {
 	sync.Mutex
 	action []string
 }
 
-func (s *storeRecorder) record(action string) {
-	s.Lock()
-	s.action = append(s.action, action)
-	s.Unlock()
+func (r *recorder) record(action string) {
+	r.Lock()
+	r.action = append(r.action, action)
+	r.Unlock()
 }
-func (s *storeRecorder) Action() []string {
-	s.Lock()
-	cpy := make([]string, len(s.action))
-	copy(cpy, s.action)
-	s.Unlock()
+func (r *recorder) Action() []string {
+	r.Lock()
+	cpy := make([]string, len(r.action))
+	copy(cpy, r.action)
+	r.Unlock()
 	return cpy
 }
 
+type storeRecorder struct {
+	recorder
+}
+
 func (s *storeRecorder) Version() int { return 0 }
 func (s *storeRecorder) Index() uint64 { return 0 }
 func (s *storeRecorder) Get(_ string, _, _ bool) (*store.Event, error) {
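Note: this refactor hoists the mutex-guarded action log into a shared recorder so that storeRecorder and the new storageRecorder (added further down) both inherit record and Action through embedding. A hypothetical use of the promoted methods:

    p := &storageRecorder{}
    p.record("Cut")   // method promoted from the embedded recorder
    got := p.Action() // returns a copy: []string{"Cut"}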
@@ -509,7 +581,10 @@ func (s *storeRecorder) Watch(_ string, _, _ bool, _ uint64) (store.Watcher, err
 	s.record("Watch")
 	return &stubWatcher{}, nil
 }
-func (s *storeRecorder) Save() ([]byte, error) { return nil, nil }
+func (s *storeRecorder) Save() ([]byte, error) {
+	s.record("Save")
+	return nil, nil
+}
 func (s *storeRecorder) Recovery(b []byte) error   { return nil }
 func (s *storeRecorder) TotalTransactions() uint64 { return 0 }
 func (s *storeRecorder) JsonStats() []byte         { return nil }
@@ -537,3 +612,21 @@ func (w *waitRecorder) Trigger(id int64, x interface{}) {
 func boolp(b bool) *bool { return &b }
 
 func stringp(s string) *string { return &s }
+
+type storageRecorder struct {
+	recorder
+}
+
+func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
+	p.record("Save")
+}
+func (p *storageRecorder) Cut() error {
+	p.record("Cut")
+	return nil
+}
+func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
+	if raft.IsEmptySnap(st) {
+		return
+	}
+	p.record("SaveSnap")
+}
@@ -28,10 +28,10 @@ func TestSet(t *testing.T) {
 	n.Campaign(ctx)
 
 	srv := &etcdserver.EtcdServer{
-		Store: store.New(),
-		Node:  n,
-		Save:  func(st raftpb.HardState, ents []raftpb.Entry) {},
-		Send:  etcdserver.SendFunc(nopSend),
+		Store:   store.New(),
+		Node:    n,
+		Storage: nopStorage{},
+		Send:    etcdserver.SendFunc(nopSend),
 	}
 	srv.Start()
 	defer srv.Stop()
@@ -66,3 +66,9 @@ func TestSet(t *testing.T) {
 }
 
 func stringp(s string) *string { return &s }
+
+type nopStorage struct{}
+
+func (np nopStorage) Save(st raftpb.HardState, ents []raftpb.Entry) {}
+func (np nopStorage) Cut() error                                    { return nil }
+func (np nopStorage) SaveSnap(st raftpb.Snapshot)                   {}
91 main.go
@@ -16,6 +16,7 @@ import (
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
 )
@@ -31,6 +32,7 @@ var (
 	paddr     = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
 	dir       = flag.String("data-dir", "", "Path to the data directory")
 	proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
+	snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 
 	peers = &etcdhttp.Peers{}
 	addrs = &Addrs{}
@@ -70,6 +72,10 @@ func startEtcd() {
 		log.Fatalf("%#x=<addr> must be specified in peers", id)
 	}
 
+	if *snapCount <= 0 {
+		log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount)
+	}
+
 	if *dir == "" {
 		*dir = fmt.Sprintf("%v_etcd_data", *fid)
 		log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
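Note: with this flag in place an operator can tune how often on-disk snapshots are taken. A hypothetical invocation (the flag names come from this diff; the data-dir value is illustrative only):

    etcd -peer-bind-addr=:7001 -data-dir=/var/lib/etcd -snapshot-count=10000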
@@ -77,16 +83,61 @@ func startEtcd() {
 	if err := os.MkdirAll(*dir, privateDirMode); err != nil {
 		log.Fatalf("main: cannot create data directory: %v", err)
 	}
+	snapdir := path.Join(*dir, "snap")
+	if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
+		log.Fatalf("etcd: cannot create snapshot directory: %v", err)
+	}
+	snapshotter := snap.New(snapdir)
 
-	n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal"))
+	waldir := path.Join(*dir, "wal")
+	var w *wal.WAL
+	var n raft.Node
+	st := store.New()
+
+	if !wal.Exist(waldir) {
+		w, err = wal.Create(waldir)
+		if err != nil {
+			log.Fatal(err)
+		}
+		n = raft.Start(id, peers.IDs(), 10, 1)
+	} else {
+		var index int64
+		snapshot, err := snapshotter.Load()
+		if err != nil && err != snap.ErrNoSnapshot {
+			log.Fatal(err)
+		}
+		if snapshot != nil {
+			log.Printf("etcd: restart from snapshot at index %d", snapshot.Index)
+			st.Recovery(snapshot.Data)
+			index = snapshot.Index
+		}
+
+		// restart a node from previous wal
+		if w, err = wal.OpenAtIndex(waldir, index); err != nil {
+			log.Fatal(err)
+		}
+		wid, st, ents, err := w.ReadAll()
+		if err != nil {
+			log.Fatal(err)
+		}
+		// TODO(xiangli): save/recovery nodeID?
+		if wid != 0 {
+			log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
+		}
+		n = raft.Restart(id, peers.IDs(), 10, 1, snapshot, st, ents)
+	}
 
 	s := &etcdserver.EtcdServer{
-		Store: store.New(),
-		Node:  n,
-		Save:  w.Save,
+		Store: st,
+		Node:  n,
+		Storage: struct {
+			*wal.WAL
+			*snap.Snapshotter
+		}{w, snapshotter},
 		Send:       etcdhttp.Sender(*peers),
 		Ticker:     time.Tick(100 * time.Millisecond),
 		SyncTicker: time.Tick(500 * time.Millisecond),
+		SnapCount:  *snapCount,
 	}
 	s.Start()
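Note: the anonymous struct literal above satisfies the new etcdserver.Storage interface through method promotion: the embedded *wal.WAL supplies Save and Cut, and the embedded *snap.Snapshotter supplies SaveSnap. A restated sketch of just that assignment:

    // composition via embedding; w and snapshotter as created above
    var storage etcdserver.Storage = struct {
    	*wal.WAL          // Save, Cut
    	*snap.Snapshotter // SaveSnap
    }{w, snapshotter}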
@@ -109,38 +160,6 @@ func startEtcd() {
 	}
 }
 
-// startRaft starts a raft node from the given wal dir.
-// If the wal dir does not exist, startRaft will start a new raft node.
-// If the wal dir exists, startRaft will restart the previous raft node.
-// startRaft returns the started raft node and the opened wal.
-func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
-	if !wal.Exist(waldir) {
-		w, err := wal.Create(waldir)
-		if err != nil {
-			log.Fatal(err)
-		}
-		n := raft.Start(id, peerIDs, 10, 1)
-		return n, w
-	}
-
-	// restart a node from previous wal
-	// TODO(xiangli): check snapshot; not open from one
-	w, err := wal.OpenAtIndex(waldir, 0)
-	if err != nil {
-		log.Fatal(err)
-	}
-	wid, st, ents, err := w.ReadAll()
-	// TODO(xiangli): save/recovery nodeID?
-	if wid != 0 {
-		log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
-	}
-	if err != nil {
-		log.Fatal(err)
-	}
-	n := raft.Restart(id, peerIDs, 10, 1, st, ents)
-	return n, w
-}
-
 // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
 func startProxy() {
 	ph, err := proxy.NewHandler((*peers).Endpoints())
13 raft/log.go
@@ -11,13 +11,12 @@ const (
 )
 
 type raftLog struct {
-	ents             []pb.Entry
-	unstable         int64
-	committed        int64
-	applied          int64
-	offset           int64
-	snapshot         pb.Snapshot
-	unstableSnapshot pb.Snapshot
+	ents      []pb.Entry
+	unstable  int64
+	committed int64
+	applied   int64
+	offset    int64
+	snapshot  pb.Snapshot
 
 	// want a compact after the number of entries exceeds the threshold
 	// TODO(xiangli) size might be a better criterion
57 raft/node.go
@@ -42,6 +42,9 @@ type Ready struct {
 	// Messages are sent.
 	Entries []pb.Entry
 
+	// Snapshot specifies the snapshot to be saved to stable storage.
+	Snapshot pb.Snapshot
+
 	// CommittedEntries specifies entries to be committed to a
 	// store/state-machine. These have previously been committed to stable
 	// store.
@@ -60,16 +63,22 @@ func IsEmptyHardState(st pb.HardState) bool {
 	return isHardStateEqual(st, emptyState)
 }
 
+func IsEmptySnap(sp pb.Snapshot) bool {
+	return sp.Index == 0
+}
+
 func (rd Ready) containsUpdates() bool {
-	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
+	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || !IsEmptySnap(rd.Snapshot) ||
+		len(rd.Entries) > 0 || len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0
 }
 
 type Node struct {
-	propc  chan pb.Message
-	recvc  chan pb.Message
-	readyc chan Ready
-	tickc  chan struct{}
-	done   chan struct{}
+	propc    chan pb.Message
+	recvc    chan pb.Message
+	compactc chan []byte
+	readyc   chan Ready
+	tickc    chan struct{}
+	done     chan struct{}
 }
 
 // Start returns a new Node given a unique raft id, a list of raft peers, and
@@ -84,9 +93,12 @@ func Start(id int64, peers []int64, election, heartbeat int) Node {
 // Restart is identical to Start but takes an initial State and a slice of
 // entries. Generally this is used when restarting from a stable storage
 // log.
-func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState, ents []pb.Entry) Node {
+func Restart(id int64, peers []int64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
 	n := newNode()
 	r := newRaft(id, peers, election, heartbeat)
+	if snapshot != nil {
+		r.restore(*snapshot)
+	}
 	r.loadState(st)
 	r.loadEnts(ents)
 	go n.run(r)
@@ -95,11 +107,12 @@ func Restart(id int64, peers []int64, election, heartbeat int, st pb.HardState,
 
 func newNode() Node {
 	return Node{
-		propc:  make(chan pb.Message),
-		recvc:  make(chan pb.Message),
-		readyc: make(chan Ready),
-		tickc:  make(chan struct{}),
-		done:   make(chan struct{}),
+		propc:    make(chan pb.Message),
+		recvc:    make(chan pb.Message),
+		compactc: make(chan []byte),
+		readyc:   make(chan Ready),
+		tickc:    make(chan struct{}),
+		done:     make(chan struct{}),
 	}
 }
 
@@ -114,9 +127,10 @@ func (n *Node) run(r *raft) {
 	lead := None
 	prevSoftSt := r.softState()
 	prevHardSt := r.HardState
+	prevSnapi := r.raftLog.snapshot.Index
 
 	for {
-		rd := newReady(r, prevSoftSt, prevHardSt)
+		rd := newReady(r, prevSoftSt, prevHardSt, prevSnapi)
 		if rd.containsUpdates() {
 			readyc = n.readyc
 		} else {
@@ -139,6 +153,8 @@ func (n *Node) run(r *raft) {
 			r.Step(m)
 		case m := <-n.recvc:
 			r.Step(m) // raft never returns an error
+		case d := <-n.compactc:
+			r.compact(d)
 		case <-n.tickc:
 			r.tick()
 		case readyc <- rd:
@@ -148,6 +164,9 @@ func (n *Node) run(r *raft) {
 			if !IsEmptyHardState(rd.HardState) {
 				prevHardSt = rd.HardState
 			}
+			if !IsEmptySnap(rd.Snapshot) {
+				prevSnapi = rd.Snapshot.Index
+			}
 			r.raftLog.resetNextEnts()
 			r.raftLog.resetUnstable()
 			r.msgs = nil
@@ -198,7 +217,14 @@ func (n *Node) Ready() <-chan Ready {
 	return n.readyc
 }
 
-func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
+func (n *Node) Compact(d []byte) {
+	select {
+	case n.compactc <- d:
+	case <-n.done:
+	}
+}
+
+func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
 	rd := Ready{
 		Entries:          r.raftLog.unstableEnts(),
 		CommittedEntries: r.raftLog.nextEnts(),
@@ -210,5 +236,8 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
 	if !isHardStateEqual(r.HardState, prevHardSt) {
 		rd.HardState = r.HardState
 	}
+	if prevSnapi != r.raftLog.snapshot.Index {
+		rd.Snapshot = r.raftLog.snapshot
+	}
 	return rd
 }
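Note: taken together these changes give Ready.Snapshot roughly exactly-once delivery: newReady attaches the log's snapshot only when its index differs from prevSnapi, and run advances prevSnapi once a Ready carrying a non-empty snapshot has been handed to the consumer, which persists it via Storage.SaveSnap in the etcdserver run loop above.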
@@ -126,6 +126,7 @@ func TestReadyContainUpdates(t *testing.T) {
 		{Ready{Entries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{CommittedEntries: make([]raftpb.Entry, 1, 1)}, true},
 		{Ready{Messages: make([]raftpb.Message, 1, 1)}, true},
+		{Ready{Snapshot: raftpb.Snapshot{Index: 1}}, true},
 	}
 
 	for i, tt := range tests {
@@ -185,7 +186,7 @@ func TestNodeRestart(t *testing.T) {
 		CommittedEntries: entries[1 : st.Commit+1],
 	}
 
-	n := Restart(1, []int64{1}, 0, 0, st, entries)
+	n := Restart(1, []int64{1}, 0, 0, nil, st, entries)
 	if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
 		t.Errorf("g = %+v,\n w %+v", g, want)
 	}
@@ -197,6 +198,56 @@
 	}
 }
 
+// TestCompact ensures Node.Compact creates a correct raft snapshot and compacts
+// the raft log (calls raft.compact)
+func TestCompact(t *testing.T) {
+	ctx := context.Background()
+	n := newNode()
+	r := newRaft(1, []int64{1}, 0, 0)
+	go n.run(r)
+
+	n.Campaign(ctx)
+	n.Propose(ctx, []byte("foo"))
+
+	w := raftpb.Snapshot{
+		Term:  1,
+		Index: 2, // one nop + one proposal
+		Data:  []byte("a snapshot"),
+		Nodes: []int64{1},
+	}
+
+	forceGosched()
+	select {
+	case <-n.Ready():
+	default:
+		t.Fatalf("unexpected proposal failure: unable to commit entry")
+	}
+
+	n.Compact(w.Data)
+	forceGosched()
+	select {
+	case rd := <-n.Ready():
+		if !reflect.DeepEqual(rd.Snapshot, w) {
+			t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
+		}
+	default:
+		t.Fatalf("unexpected compact failure: unable to create a snapshot")
+	}
+	forceGosched()
+	// TODO: this tests that run updates snapi correctly... should be tested
+	// separately with other kinds of updates
+	select {
+	case <-n.Ready():
+		t.Fatalf("unexpected more ready")
+	default:
+	}
+	n.Stop()
+
+	if r.raftLog.offset != w.Index {
+		t.Errorf("log.offset = %d, want %d", r.raftLog.offset, w.Index)
+	}
+}
+
 func TestIsStateEqual(t *testing.T) {
 	tests := []struct {
 		st raftpb.HardState
@@ -502,9 +502,6 @@ func (r *raft) delProgress(id int64) {
 }
 
 func (r *raft) loadEnts(ents []pb.Entry) {
-	if !r.raftLog.isEmpty() {
-		panic("cannot load entries when log is not empty")
-	}
 	r.raftLog.load(ents)
 }
@@ -11,6 +11,7 @@ import (
 	"sort"
 	"strings"
 
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap/snappb"
 )
@@ -35,7 +36,14 @@ func New(dir string) *Snapshotter {
 	}
 }
 
-func (s *Snapshotter) Save(snapshot *raftpb.Snapshot) error {
+func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) {
+	if raft.IsEmptySnap(snapshot) {
+		return
+	}
+	s.save(&snapshot)
+}
+
+func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
 	fname := fmt.Sprintf("%016x-%016x%s", snapshot.Term, snapshot.Index, snapSuffix)
 	b, err := snapshot.Marshal()
 	if err != nil {
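Note: the filename format zero-pads term and index to 16 hex digits, so for term 1, index 5 the name is 0000000000000001-0000000000000005 plus the snapSuffix extension. Lexicographic filename order therefore matches (term, index) order, which is presumably how Load picks the newest snapshot (cf. TestLoadNewestSnap below).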
@@ -27,7 +27,7 @@ func TestSaveAndLoad(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -49,7 +49,7 @@ func TestBadCRC(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -79,7 +79,7 @@ func TestFailback(t *testing.T) {
 	}
 
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -134,14 +134,14 @@ func TestLoadNewestSnap(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 	ss := New(dir)
-	err = ss.Save(testSnap)
+	err = ss.save(testSnap)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	newSnap := *testSnap
 	newSnap.Index = 5
-	err = ss.Save(&newSnap)
+	err = ss.save(&newSnap)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -142,6 +142,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
 
 	// create a WAL ready for reading
 	w := &WAL{
 		dir:     dirpath,
+		ri:      index,
 		decoder: newDecoder(rc),
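Note: ri is presumably the read index — recording the index passed to OpenAtIndex lets the WAL skip entries below the recovered snapshot index when reading back on restart.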
|
Loading…
x
Reference in New Issue
Block a user