mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: introduce log storage interface.
This change splits the raftLog.entries array into an in-memory "unstable" list and a pluggable interface for retrieving entries that have been persisted to disk. An in-memory implementation of this interface is provided which behaves the same as the old version; in a future commit etcdserver could replace the MemoryStorage with one backed by the WAL.
This commit is contained in:
parent
ac77971f99
commit
25b6590547
@ -28,7 +28,7 @@ import (
|
||||
"github.com/coreos/etcd/wal"
|
||||
)
|
||||
|
||||
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) {
|
||||
func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
||||
cfg.Cluster.SetID(cid)
|
||||
|
||||
@ -53,8 +53,9 @@ func restartAsStandaloneNode(cfg *ServerConfig, index uint64, snapshot *raftpb.S
|
||||
}
|
||||
|
||||
log.Printf("etcdserver: forcing restart of member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
|
||||
return id, n, w
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents, s)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
// getIDs returns an ordered set of IDs included in the given snapshot and
|
||||
|
@ -156,8 +156,9 @@ type EtcdServer struct {
|
||||
|
||||
Cluster *Cluster
|
||||
|
||||
node raft.Node
|
||||
store store.Store
|
||||
node raft.Node
|
||||
raftStorage *raft.MemoryStorage
|
||||
store store.Store
|
||||
|
||||
stats *stats.ServerStats
|
||||
lstats *stats.LeaderStats
|
||||
@ -190,6 +191,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
st := store.New()
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
haveWAL := wal.Exist(cfg.WALDir())
|
||||
switch {
|
||||
@ -204,7 +206,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
}
|
||||
cfg.Cluster.SetID(cl.id)
|
||||
cfg.Cluster.SetStore(st)
|
||||
id, n, w = startNode(cfg, nil)
|
||||
id, n, s, w = startNode(cfg, nil)
|
||||
case !haveWAL && cfg.NewCluster:
|
||||
if err := cfg.VerifyBootstrapConfig(); err != nil {
|
||||
return nil, err
|
||||
@ -221,7 +223,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
}
|
||||
cfg.Cluster.SetStore(st)
|
||||
log.Printf("etcdserver: initial cluster members: %s", cfg.Cluster)
|
||||
id, n, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
if cfg.ShouldDiscover() {
|
||||
log.Printf("etcdserver: warn: ignoring discovery: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
|
||||
@ -241,9 +243,9 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
log.Printf("etcdserver: loaded peers from snapshot: %s", cfg.Cluster)
|
||||
}
|
||||
if !cfg.ForceNewCluster {
|
||||
id, n, w = restartNode(cfg, index, snapshot)
|
||||
id, n, s, w = restartNode(cfg, index, snapshot)
|
||||
} else {
|
||||
id, n, w = restartAsStandaloneNode(cfg, index, snapshot)
|
||||
id, n, s, w = restartAsStandaloneNode(cfg, index, snapshot)
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported bootstrap config")
|
||||
@ -256,12 +258,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
lstats := stats.NewLeaderStats(id.String())
|
||||
|
||||
shub := newSendHub(cfg.Transport, cfg.Cluster, sstats, lstats)
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
node: n,
|
||||
id: id,
|
||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
Cluster: cfg.Cluster,
|
||||
srv := &EtcdServer{
|
||||
store: st,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
id: id,
|
||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
Cluster: cfg.Cluster,
|
||||
storage: struct {
|
||||
*wal.WAL
|
||||
*snap.Snapshotter
|
||||
@ -273,7 +276,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
SyncTicker: time.Tick(500 * time.Millisecond),
|
||||
snapCount: cfg.SnapCount,
|
||||
}
|
||||
return s, nil
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||
@ -327,6 +330,7 @@ func (s *EtcdServer) run() {
|
||||
}
|
||||
}
|
||||
|
||||
s.raftStorage.Append(rd.Entries)
|
||||
if err := s.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
log.Fatalf("etcdserver: save state and entries error: %v", err)
|
||||
}
|
||||
@ -722,7 +726,7 @@ func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
||||
return nil, fmt.Errorf("etcdserver: could not retrieve cluster information from the given urls")
|
||||
}
|
||||
|
||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *wal.WAL) {
|
||||
func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
|
||||
var err error
|
||||
member := cfg.Cluster.MemberByName(cfg.Name)
|
||||
metadata := pbutil.MustMarshal(
|
||||
@ -744,7 +748,8 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, w *
|
||||
}
|
||||
id = member.ID
|
||||
log.Printf("etcdserver: start node %s in cluster %s", id, cfg.Cluster.ID())
|
||||
n = raft.StartNode(uint64(id), peers, 10, 1)
|
||||
s = raft.NewMemoryStorage()
|
||||
n = raft.StartNode(uint64(id), peers, 10, 1, s)
|
||||
return
|
||||
}
|
||||
|
||||
@ -762,13 +767,14 @@ func getOtherPeerURLs(cl ClusterInfo, self string) []string {
|
||||
return us
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *wal.WAL) {
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (types.ID, raft.Node, *raft.MemoryStorage, *wal.WAL) {
|
||||
w, id, cid, st, ents := readWAL(cfg.WALDir(), index)
|
||||
cfg.Cluster.SetID(cid)
|
||||
|
||||
log.Printf("etcdserver: restart member %s in cluster %s at commit index %d", id, cfg.Cluster.ID(), st.Commit)
|
||||
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents)
|
||||
return id, n, w
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.RestartNode(uint64(id), 10, 1, snapshot, st, ents, s)
|
||||
return id, n, s, w
|
||||
}
|
||||
|
||||
func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
|
@ -519,24 +519,30 @@ func testServer(t *testing.T, ns uint64) {
|
||||
members := mustMakePeerSlice(t, ids...)
|
||||
for i := uint64(0); i < ns; i++ {
|
||||
id := i + 1
|
||||
n := raft.StartNode(id, members, 10, 1)
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(id, members, 10, 1, s)
|
||||
tk := time.NewTicker(10 * time.Millisecond)
|
||||
defer tk.Stop()
|
||||
st := store.New()
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(st)
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
store: st,
|
||||
sender: &fakeSender{ss},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk.C,
|
||||
Cluster: cl,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
store: st,
|
||||
sender: &fakeSender{ss},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk.C,
|
||||
Cluster: cl,
|
||||
}
|
||||
srv.start()
|
||||
ss[i] = srv
|
||||
}
|
||||
|
||||
// Start the servers after they're all created to avoid races in send().
|
||||
for i := uint64(0); i < ns; i++ {
|
||||
ss[i].start()
|
||||
}
|
||||
|
||||
for i := 1; i <= 10; i++ {
|
||||
r := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -587,7 +593,8 @@ func TestDoProposal(t *testing.T) {
|
||||
|
||||
for i, tt := range tests {
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
||||
st := &storeRecorder{}
|
||||
tk := make(chan time.Time)
|
||||
// this makes <-tk always successful, which accelerates internal clock
|
||||
@ -595,12 +602,13 @@ func TestDoProposal(t *testing.T) {
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(store.New())
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk,
|
||||
Cluster: cl,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk,
|
||||
Cluster: cl,
|
||||
}
|
||||
srv.start()
|
||||
resp, err := srv.Do(ctx, tt)
|
||||
@ -623,14 +631,16 @@ func TestDoProposal(t *testing.T) {
|
||||
func TestDoProposalCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// node cannot make any progress because there are two nodes
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
|
||||
st := &storeRecorder{}
|
||||
wait := &waitRecorder{}
|
||||
srv := &EtcdServer{
|
||||
// TODO: use fake node for better testability
|
||||
node: n,
|
||||
store: st,
|
||||
w: wait,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
store: st,
|
||||
w: wait,
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
@ -671,18 +681,20 @@ func TestDoProposalStopped(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
// node cannot make any progress because there are two nodes
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1)
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0, 0xBAD1), 10, 1, s)
|
||||
st := &storeRecorder{}
|
||||
tk := make(chan time.Time)
|
||||
// this makes <-tk always successful, which accelarates internal clock
|
||||
close(tk)
|
||||
srv := &EtcdServer{
|
||||
// TODO: use fake node for better testability
|
||||
node: n,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Ticker: tk,
|
||||
}
|
||||
srv.start()
|
||||
|
||||
@ -788,11 +800,12 @@ func TestSyncTrigger(t *testing.T) {
|
||||
}
|
||||
st := make(chan time.Time, 1)
|
||||
srv := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
SyncTicker: st,
|
||||
node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
SyncTicker: st,
|
||||
}
|
||||
srv.start()
|
||||
// trigger the server to become a leader and accept sync requests
|
||||
@ -822,17 +835,19 @@ func TestSyncTrigger(t *testing.T) {
|
||||
// snapshot should snapshot the store and cut the persistent
|
||||
// TODO: node.Compact is called... we need to make the node an interface
|
||||
func TestSnapshot(t *testing.T) {
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
||||
defer n.Stop()
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
storage: p,
|
||||
node: n,
|
||||
srv := &EtcdServer{
|
||||
store: st,
|
||||
storage: p,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
}
|
||||
|
||||
s.snapshot(0, []uint64{1})
|
||||
srv.snapshot(0, []uint64{1})
|
||||
gaction := st.Action()
|
||||
if len(gaction) != 1 {
|
||||
t.Fatalf("len(action) = %d, want 1", len(gaction))
|
||||
@ -853,8 +868,10 @@ func TestSnapshot(t *testing.T) {
|
||||
// Applied > SnapCount should trigger a SaveSnap event
|
||||
func TestTriggerSnap(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1)
|
||||
<-n.Ready()
|
||||
s := raft.NewMemoryStorage()
|
||||
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
|
||||
rd := <-n.Ready()
|
||||
s.Append(rd.Entries)
|
||||
n.Advance()
|
||||
n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 0xBAD0})
|
||||
n.Campaign(ctx)
|
||||
@ -862,26 +879,27 @@ func TestTriggerSnap(t *testing.T) {
|
||||
p := &storageRecorder{}
|
||||
cl := newCluster("abc")
|
||||
cl.SetStore(store.New())
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: p,
|
||||
node: n,
|
||||
snapCount: 10,
|
||||
Cluster: cl,
|
||||
srv := &EtcdServer{
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: p,
|
||||
node: n,
|
||||
raftStorage: s,
|
||||
snapCount: 10,
|
||||
Cluster: cl,
|
||||
}
|
||||
|
||||
s.start()
|
||||
for i := 0; uint64(i) < s.snapCount-1; i++ {
|
||||
s.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
||||
srv.start()
|
||||
for i := 0; uint64(i) < srv.snapCount-1; i++ {
|
||||
srv.Do(ctx, pb.Request{Method: "PUT", ID: 1})
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
s.Stop()
|
||||
srv.Stop()
|
||||
|
||||
gaction := p.Action()
|
||||
// each operation is recorded as a Save
|
||||
// BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap
|
||||
wcnt := 2 + int(s.snapCount)
|
||||
wcnt := 2 + int(srv.snapCount)
|
||||
if len(gaction) != wcnt {
|
||||
t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
|
||||
}
|
||||
@ -897,10 +915,11 @@ func TestRecvSnapshot(t *testing.T) {
|
||||
st := &storeRecorder{}
|
||||
p := &storageRecorder{}
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: p,
|
||||
node: n,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: p,
|
||||
node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
}
|
||||
|
||||
s.start()
|
||||
@ -925,10 +944,11 @@ func TestRecvSlowSnapshot(t *testing.T) {
|
||||
n := newReadyNode()
|
||||
st := &storeRecorder{}
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
node: n,
|
||||
store: st,
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
}
|
||||
|
||||
s.start()
|
||||
@ -959,11 +979,12 @@ func TestAddMember(t *testing.T) {
|
||||
cl := newTestCluster(nil)
|
||||
cl.SetStore(store.New())
|
||||
s := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Cluster: cl,
|
||||
node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Cluster: cl,
|
||||
}
|
||||
s.start()
|
||||
m := Member{ID: 1234, RaftAttributes: RaftAttributes{PeerURLs: []string{"foo"}}}
|
||||
@ -994,11 +1015,12 @@ func TestRemoveMember(t *testing.T) {
|
||||
}
|
||||
cl := newTestCluster([]Member{{ID: 1234}})
|
||||
s := &EtcdServer{
|
||||
node: n,
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Cluster: cl,
|
||||
node: n,
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
store: &storeRecorder{},
|
||||
sender: &nopSender{},
|
||||
storage: &storageRecorder{},
|
||||
Cluster: cl,
|
||||
}
|
||||
s.start()
|
||||
err := s.RemoveMember(context.TODO(), 1234)
|
||||
|
@ -60,7 +60,7 @@ func mustTemp(pre, body string) string {
|
||||
func ltoa(l *raftLog) string {
|
||||
s := fmt.Sprintf("committed: %d\n", l.committed)
|
||||
s += fmt.Sprintf("applied: %d\n", l.applied)
|
||||
for i, e := range l.ents {
|
||||
for i, e := range l.allEntries() {
|
||||
s += fmt.Sprintf("#%d: %+v\n", i, e)
|
||||
}
|
||||
return s
|
||||
|
@ -26,7 +26,7 @@ func saveStateToDisk(st pb.HardState) {}
|
||||
func saveToDisk(ents []pb.Entry) {}
|
||||
|
||||
func Example_Node() {
|
||||
n := StartNode(0, nil, 0, 0)
|
||||
n := StartNode(0, nil, 0, 0, nil)
|
||||
|
||||
// stuff to n happens in other goroutines
|
||||
|
||||
|
156
raft/log.go
156
raft/log.go
@ -24,33 +24,54 @@ import (
|
||||
)
|
||||
|
||||
type raftLog struct {
|
||||
ents []pb.Entry
|
||||
unstable uint64
|
||||
// storage contains all stable entries since the last snapshot.
|
||||
storage Storage
|
||||
// unstableEnts contains all entries that have not yet been written
|
||||
// to storage.
|
||||
unstableEnts []pb.Entry
|
||||
// unstableEnts[i] has raft log position i+unstable. Note that
|
||||
// unstable may be less than the highest log position in storage;
|
||||
// this means that the next write to storage will truncate the log
|
||||
// before persisting unstableEnts.
|
||||
unstable uint64
|
||||
// committed is the highest log position that is known to be in
|
||||
// stable storage on a quorum of nodes.
|
||||
// Invariant: committed < unstable
|
||||
committed uint64
|
||||
applied uint64
|
||||
offset uint64
|
||||
snapshot pb.Snapshot
|
||||
// applied is the highest log position that the application has
|
||||
// been instructed to apply to its state machine.
|
||||
// Invariant: applied <= committed
|
||||
applied uint64
|
||||
snapshot pb.Snapshot
|
||||
}
|
||||
|
||||
func newLog() *raftLog {
|
||||
func newLog(storage Storage) *raftLog {
|
||||
if storage == nil {
|
||||
storage = NewMemoryStorage()
|
||||
}
|
||||
lastIndex, err := storage.GetLastIndex()
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
return &raftLog{
|
||||
ents: make([]pb.Entry, 1),
|
||||
unstable: 0,
|
||||
committed: 0,
|
||||
applied: 0,
|
||||
storage: storage,
|
||||
unstable: lastIndex + 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *raftLog) load(ents []pb.Entry) {
|
||||
if l.offset != ents[0].Index {
|
||||
// TODO(bdarnell): does this method need to support other Storage impls or does it go away?
|
||||
ms := l.storage.(*MemoryStorage)
|
||||
if ms.offset != ents[0].Index {
|
||||
panic("entries loaded don't match offset index")
|
||||
}
|
||||
l.ents = ents
|
||||
l.unstable = l.offset + uint64(len(ents))
|
||||
ms.ents = ents
|
||||
l.unstable = ms.offset + uint64(len(ents))
|
||||
}
|
||||
|
||||
func (l *raftLog) String() string {
|
||||
return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
|
||||
return fmt.Sprintf("unstable=%d committed=%d applied=%d", l.unstable, l.committed, l.applied)
|
||||
|
||||
}
|
||||
|
||||
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
|
||||
@ -78,7 +99,15 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
|
||||
}
|
||||
|
||||
func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
|
||||
l.ents = append(l.slice(l.offset, after+1), ents...)
|
||||
if after < l.unstable {
|
||||
// The log is being truncated to before our current unstable
|
||||
// portion, so discard it and reset unstable.
|
||||
l.unstableEnts = nil
|
||||
l.unstable = after + 1
|
||||
}
|
||||
// Truncate any unstable entries that are being replaced, then
|
||||
// append the new ones.
|
||||
l.unstableEnts = append(l.unstableEnts[0:1+after-l.unstable], ents...)
|
||||
l.unstable = min(l.unstable, after+1)
|
||||
return l.lastIndex()
|
||||
}
|
||||
@ -104,13 +133,12 @@ func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (l *raftLog) unstableEnts() []pb.Entry {
|
||||
ents := l.slice(l.unstable, l.lastIndex()+1)
|
||||
if ents == nil {
|
||||
func (l *raftLog) unstableEntries() []pb.Entry {
|
||||
if len(l.unstableEnts) == 0 {
|
||||
return nil
|
||||
}
|
||||
cpy := make([]pb.Entry, len(ents))
|
||||
copy(cpy, ents)
|
||||
cpy := make([]pb.Entry, len(l.unstableEnts))
|
||||
copy(cpy, l.unstableEnts)
|
||||
return cpy
|
||||
}
|
||||
|
||||
@ -123,6 +151,25 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *raftLog) firstIndex() uint64 {
|
||||
index, err := l.storage.GetFirstIndex()
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
func (l *raftLog) lastIndex() uint64 {
|
||||
if len(l.unstableEnts) > 0 {
|
||||
return l.unstable + uint64(len(l.unstableEnts)) - 1
|
||||
}
|
||||
index, err := l.storage.GetLastIndex()
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
return index
|
||||
}
|
||||
|
||||
func (l *raftLog) appliedTo(i uint64) {
|
||||
if i == 0 {
|
||||
return
|
||||
@ -137,12 +184,13 @@ func (l *raftLog) stableTo(i uint64) {
|
||||
if i == 0 {
|
||||
return
|
||||
}
|
||||
l.unstableEnts = l.unstableEnts[i+1-l.unstable:]
|
||||
l.unstable = i + 1
|
||||
}
|
||||
|
||||
func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset }
|
||||
|
||||
func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }
|
||||
func (l *raftLog) lastTerm() uint64 {
|
||||
return l.term(l.lastIndex())
|
||||
}
|
||||
|
||||
func (l *raftLog) term(i uint64) uint64 {
|
||||
if e := l.at(i); e != nil {
|
||||
@ -155,12 +203,19 @@ func (l *raftLog) entries(i uint64) []pb.Entry {
|
||||
// never send out the first entry
|
||||
// first entry is only used for matching
|
||||
// prevLogTerm
|
||||
if i == l.offset {
|
||||
if i == 0 {
|
||||
panic("cannot return the first entry in log")
|
||||
}
|
||||
return l.slice(i, l.lastIndex()+1)
|
||||
}
|
||||
|
||||
// allEntries returns all entries in the log, including the initial
|
||||
// entry that is only used for prevLogTerm validation. This method
|
||||
// should only be used for testing.
|
||||
func (l *raftLog) allEntries() []pb.Entry {
|
||||
return l.slice(l.firstIndex(), l.lastIndex()+1)
|
||||
}
|
||||
|
||||
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
|
||||
// by comparing the index and term of the last entries in the existing logs.
|
||||
// If the logs have last entries with different terms, then the log with the
|
||||
@ -193,12 +248,22 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
||||
// the number of entries after compaction will be returned.
|
||||
func (l *raftLog) compact(i uint64) uint64 {
|
||||
if l.isOutOfAppliedBounds(i) {
|
||||
panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.applied))
|
||||
panic(fmt.Sprintf("compact %d out of bounds (applied up to %d)", i, l.applied))
|
||||
}
|
||||
err := l.storage.Compact(i)
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
l.ents = l.slice(i, l.lastIndex()+1)
|
||||
l.unstable = max(i+1, l.unstable)
|
||||
l.offset = i
|
||||
return uint64(len(l.ents))
|
||||
firstIndex, err := l.storage.GetFirstIndex()
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
lastIndex, err := l.storage.GetLastIndex()
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
return lastIndex - firstIndex
|
||||
}
|
||||
|
||||
func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64) {
|
||||
@ -211,19 +276,23 @@ func (l *raftLog) snap(d []byte, index, term uint64, nodes []uint64) {
|
||||
}
|
||||
|
||||
func (l *raftLog) restore(s pb.Snapshot) {
|
||||
l.ents = []pb.Entry{{Term: s.Term}}
|
||||
l.storage = &MemoryStorage{
|
||||
ents: []pb.Entry{{Term: s.Term}},
|
||||
offset: s.Index,
|
||||
}
|
||||
l.unstable = s.Index + 1
|
||||
l.unstableEnts = nil
|
||||
l.committed = s.Index
|
||||
l.applied = s.Index
|
||||
l.offset = s.Index
|
||||
l.snapshot = s
|
||||
}
|
||||
|
||||
func (l *raftLog) at(i uint64) *pb.Entry {
|
||||
if l.isOutOfBounds(i) {
|
||||
ents := l.slice(i, i+1)
|
||||
if len(ents) == 0 {
|
||||
return nil
|
||||
}
|
||||
return &l.ents[i-l.offset]
|
||||
return &ents[0]
|
||||
}
|
||||
|
||||
// slice returns a slice of log entries from lo through hi-1, inclusive.
|
||||
@ -234,18 +303,35 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
|
||||
if l.isOutOfBounds(lo) || l.isOutOfBounds(hi-1) {
|
||||
return nil
|
||||
}
|
||||
return l.ents[lo-l.offset : hi-l.offset]
|
||||
var ents []pb.Entry
|
||||
if lo < l.unstable {
|
||||
storedEnts, err := l.storage.GetEntries(lo, min(hi, l.unstable))
|
||||
if err != nil {
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
ents = append(ents, storedEnts...)
|
||||
}
|
||||
if len(l.unstableEnts) > 0 && hi > l.unstable {
|
||||
var firstUnstable uint64
|
||||
if lo < l.unstable {
|
||||
firstUnstable = l.unstable
|
||||
} else {
|
||||
firstUnstable = lo
|
||||
}
|
||||
ents = append(ents, l.unstableEnts[firstUnstable-l.unstable:hi-l.unstable]...)
|
||||
}
|
||||
return ents
|
||||
}
|
||||
|
||||
func (l *raftLog) isOutOfBounds(i uint64) bool {
|
||||
if i < l.offset || i > l.lastIndex() {
|
||||
if i < l.firstIndex() || i > l.lastIndex() {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *raftLog) isOutOfAppliedBounds(i uint64) bool {
|
||||
if i < l.offset || i > l.applied {
|
||||
if i < l.firstIndex() || i > l.applied {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
@ -49,7 +49,7 @@ func TestFindConflict(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
raftLog := newLog()
|
||||
raftLog := newLog(nil)
|
||||
raftLog.append(raftLog.lastIndex(), previousEnts...)
|
||||
|
||||
gconflict := raftLog.findConflict(tt.from, tt.ents)
|
||||
@ -61,7 +61,7 @@ func TestFindConflict(t *testing.T) {
|
||||
|
||||
func TestIsUpToDate(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}}
|
||||
raftLog := newLog()
|
||||
raftLog := newLog(nil)
|
||||
raftLog.append(raftLog.lastIndex(), previousEnts...)
|
||||
tests := []struct {
|
||||
lastIndex uint64
|
||||
@ -92,7 +92,6 @@ func TestIsUpToDate(t *testing.T) {
|
||||
|
||||
func TestAppend(t *testing.T) {
|
||||
previousEnts := []pb.Entry{{Term: 1}, {Term: 2}}
|
||||
previousUnstable := uint64(3)
|
||||
tests := []struct {
|
||||
after uint64
|
||||
ents []pb.Entry
|
||||
@ -133,9 +132,10 @@ func TestAppend(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
raftLog := newLog()
|
||||
raftLog.append(raftLog.lastIndex(), previousEnts...)
|
||||
raftLog.unstable = previousUnstable
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(previousEnts)
|
||||
raftLog := newLog(storage)
|
||||
|
||||
index := raftLog.append(tt.after, tt.ents...)
|
||||
if index != tt.windex {
|
||||
t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
|
||||
@ -241,7 +241,7 @@ func TestLogMaybeAppend(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
raftLog := newLog()
|
||||
raftLog := newLog(nil)
|
||||
raftLog.append(raftLog.lastIndex(), previousEnts...)
|
||||
raftLog.committed = commit
|
||||
func() {
|
||||
@ -278,40 +278,50 @@ func TestLogMaybeAppend(t *testing.T) {
|
||||
// a compaction.
|
||||
func TestCompactionSideEffects(t *testing.T) {
|
||||
var i uint64
|
||||
// Populate the log with 1000 entries; 750 in stable storage and 250 in unstable.
|
||||
lastIndex := uint64(1000)
|
||||
unstableIndex := uint64(750)
|
||||
lastTerm := lastIndex
|
||||
raftLog := newLog()
|
||||
|
||||
for i = 0; i < lastIndex; i++ {
|
||||
raftLog.append(uint64(i), pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
|
||||
storage := NewMemoryStorage()
|
||||
for i = 1; i <= unstableIndex; i++ {
|
||||
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
|
||||
}
|
||||
raftLog := newLog(storage)
|
||||
for i = unstableIndex; i < lastIndex; i++ {
|
||||
raftLog.append(i, pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
|
||||
}
|
||||
|
||||
ok := raftLog.maybeCommit(lastIndex, lastTerm)
|
||||
if !ok {
|
||||
t.Fatalf("maybeCommit returned false")
|
||||
}
|
||||
raftLog.maybeCommit(lastIndex, lastTerm)
|
||||
raftLog.appliedTo(raftLog.committed)
|
||||
|
||||
raftLog.compact(500)
|
||||
offset := uint64(500)
|
||||
raftLog.compact(offset)
|
||||
|
||||
if raftLog.lastIndex() != lastIndex {
|
||||
t.Errorf("lastIndex = %d, want %d", raftLog.lastIndex(), lastIndex)
|
||||
}
|
||||
|
||||
for i := raftLog.offset; i <= raftLog.lastIndex(); i++ {
|
||||
for i := offset; i <= raftLog.lastIndex(); i++ {
|
||||
if raftLog.term(i) != i {
|
||||
t.Errorf("term(%d) = %d, want %d", i, raftLog.term(i), i)
|
||||
}
|
||||
}
|
||||
|
||||
for i := raftLog.offset; i <= raftLog.lastIndex(); i++ {
|
||||
for i := offset; i <= raftLog.lastIndex(); i++ {
|
||||
if !raftLog.matchTerm(i, i) {
|
||||
t.Errorf("matchTerm(%d) = false, want true", i)
|
||||
}
|
||||
}
|
||||
|
||||
unstableEnts := raftLog.unstableEnts()
|
||||
if g := len(unstableEnts); g != 500 {
|
||||
t.Errorf("len(unstableEntries) = %d, want = %d", g, 500)
|
||||
unstableEnts := raftLog.unstableEntries()
|
||||
if g := len(unstableEnts); g != 250 {
|
||||
t.Errorf("len(unstableEntries) = %d, want = %d", g, 250)
|
||||
}
|
||||
if unstableEnts[0].Index != 501 {
|
||||
t.Errorf("Index = %d, want = %d", unstableEnts[0].Index, 501)
|
||||
if unstableEnts[0].Index != 751 {
|
||||
t.Errorf("Index = %d, want = %d", unstableEnts[0].Index, 751)
|
||||
}
|
||||
|
||||
prev := raftLog.lastIndex()
|
||||
@ -338,10 +348,11 @@ func TestUnstableEnts(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
raftLog := newLog()
|
||||
raftLog.append(0, previousEnts...)
|
||||
raftLog.unstable = tt.unstable
|
||||
ents := raftLog.unstableEnts()
|
||||
storage := NewMemoryStorage()
|
||||
storage.Append(previousEnts[:tt.unstable-1])
|
||||
raftLog := newLog(storage)
|
||||
raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...)
|
||||
ents := raftLog.unstableEntries()
|
||||
raftLog.stableTo(raftLog.lastIndex())
|
||||
if !reflect.DeepEqual(ents, tt.wents) {
|
||||
t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
|
||||
@ -374,22 +385,23 @@ func TestCompaction(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if tt.wallow == true {
|
||||
t.Errorf("%d: allow = %v, want %v", i, false, true)
|
||||
t.Errorf("%d: allow = %v, want %v: %v", i, false, true, r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
raftLog := newLog()
|
||||
storage := NewMemoryStorage()
|
||||
for i := uint64(0); i < tt.lastIndex; i++ {
|
||||
raftLog.append(uint64(i), pb.Entry{})
|
||||
storage.Append([]pb.Entry{{}})
|
||||
}
|
||||
raftLog := newLog(storage)
|
||||
raftLog.maybeCommit(tt.applied, 0)
|
||||
raftLog.appliedTo(raftLog.committed)
|
||||
|
||||
for j := 0; j < len(tt.compact); j++ {
|
||||
raftLog.compact(tt.compact[j])
|
||||
if len(raftLog.ents) != tt.wleft[j] {
|
||||
t.Errorf("#%d.%d len = %d, want %d", i, j, len(raftLog.ents), tt.wleft[j])
|
||||
if len(raftLog.allEntries()) != tt.wleft[j] {
|
||||
t.Errorf("#%d.%d len = %d, want %d", i, j, len(raftLog.allEntries()), tt.wleft[j])
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -398,7 +410,7 @@ func TestCompaction(t *testing.T) {
|
||||
|
||||
func TestLogRestore(t *testing.T) {
|
||||
var i uint64
|
||||
raftLog := newLog()
|
||||
raftLog := newLog(nil)
|
||||
for i = 0; i < 100; i++ {
|
||||
raftLog.append(i, pb.Entry{Term: i + 1})
|
||||
}
|
||||
@ -408,11 +420,11 @@ func TestLogRestore(t *testing.T) {
|
||||
raftLog.restore(pb.Snapshot{Index: index, Term: term})
|
||||
|
||||
// only has the guard entry
|
||||
if len(raftLog.ents) != 1 {
|
||||
t.Errorf("len = %d, want 0", len(raftLog.ents))
|
||||
if len(raftLog.allEntries()) != 1 {
|
||||
t.Errorf("len = %d, want 1", len(raftLog.allEntries()))
|
||||
}
|
||||
if raftLog.offset != index {
|
||||
t.Errorf("offset = %d, want %d", raftLog.offset, index)
|
||||
if raftLog.firstIndex() != index {
|
||||
t.Errorf("firstIndex = %d, want %d", raftLog.firstIndex(), index)
|
||||
}
|
||||
if raftLog.applied != index {
|
||||
t.Errorf("applied = %d, want %d", raftLog.applied, index)
|
||||
@ -431,7 +443,9 @@ func TestLogRestore(t *testing.T) {
|
||||
func TestIsOutOfBounds(t *testing.T) {
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
l := &raftLog{offset: offset, ents: make([]pb.Entry, num)}
|
||||
l := newLog(nil)
|
||||
l.restore(pb.Snapshot{Index: offset})
|
||||
l.append(offset, make([]pb.Entry, num)...)
|
||||
|
||||
tests := []struct {
|
||||
index uint64
|
||||
@ -440,8 +454,8 @@ func TestIsOutOfBounds(t *testing.T) {
|
||||
{offset - 1, true},
|
||||
{offset, false},
|
||||
{offset + num/2, false},
|
||||
{offset + num - 1, false},
|
||||
{offset + num, true},
|
||||
{offset + num, false},
|
||||
{offset + num + 1, true},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -457,9 +471,10 @@ func TestAt(t *testing.T) {
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
|
||||
l := &raftLog{offset: offset}
|
||||
l := newLog(nil)
|
||||
l.restore(pb.Snapshot{Index: offset})
|
||||
for i = 0; i < num; i++ {
|
||||
l.ents = append(l.ents, pb.Entry{Term: i})
|
||||
l.append(offset+i-1, pb.Entry{Term: i})
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
@ -486,9 +501,10 @@ func TestSlice(t *testing.T) {
|
||||
offset := uint64(100)
|
||||
num := uint64(100)
|
||||
|
||||
l := &raftLog{offset: offset}
|
||||
l := newLog(nil)
|
||||
l.restore(pb.Snapshot{Index: offset})
|
||||
for i = 0; i < num; i++ {
|
||||
l.ents = append(l.ents, pb.Entry{Term: i})
|
||||
l.append(offset+i-1, pb.Entry{Term: i})
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
|
12
raft/node.go
12
raft/node.go
@ -144,9 +144,9 @@ type Peer struct {
|
||||
// StartNode returns a new Node given a unique raft id, a list of raft peers, and
|
||||
// the election and heartbeat timeouts in units of ticks.
|
||||
// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log.
|
||||
func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
|
||||
func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, nil, election, heartbeat)
|
||||
r := newRaft(id, nil, election, heartbeat, storage)
|
||||
|
||||
for _, peer := range peers {
|
||||
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
|
||||
@ -166,9 +166,11 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int) Node {
|
||||
// RestartNode is identical to StartNode but takes an initial State and a slice
|
||||
// of entries. Generally this is used when restarting from a stable storage
|
||||
// log.
|
||||
func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry) Node {
|
||||
// TODO(bdarnell): remove args that are unnecessary with storage.
|
||||
// Maybe this function goes away and is replaced by StartNode with a non-empty Storage.
|
||||
func RestartNode(id uint64, election, heartbeat int, snapshot *pb.Snapshot, st pb.HardState, ents []pb.Entry, storage Storage) Node {
|
||||
n := newNode()
|
||||
r := newRaft(id, nil, election, heartbeat)
|
||||
r := newRaft(id, nil, election, heartbeat, storage)
|
||||
if snapshot != nil {
|
||||
r.restore(*snapshot)
|
||||
}
|
||||
@ -387,7 +389,7 @@ func (n *node) Compact(index uint64, nodes []uint64, d []byte) {
|
||||
|
||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi uint64) Ready {
|
||||
rd := Ready{
|
||||
Entries: r.raftLog.unstableEnts(),
|
||||
Entries: r.raftLog.unstableEntries(),
|
||||
CommittedEntries: r.raftLog.nextEnts(),
|
||||
Messages: r.msgs,
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ func BenchmarkOneNode(b *testing.B) {
|
||||
defer cancel()
|
||||
|
||||
n := newNode()
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
go n.run(r)
|
||||
|
||||
defer n.Stop()
|
||||
|
@ -112,7 +112,7 @@ func TestNodeStepUnblock(t *testing.T) {
|
||||
// who is the current leader.
|
||||
func TestBlockProposal(t *testing.T) {
|
||||
n := newNode()
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
go n.run(r)
|
||||
defer n.Stop()
|
||||
|
||||
@ -175,7 +175,6 @@ func TestNode(t *testing.T) {
|
||||
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
|
||||
HardState: raftpb.HardState{Term: 1, Commit: 2},
|
||||
Entries: []raftpb.Entry{
|
||||
{},
|
||||
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
|
||||
{Term: 1, Index: 2},
|
||||
},
|
||||
@ -190,12 +189,15 @@ func TestNode(t *testing.T) {
|
||||
CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}},
|
||||
},
|
||||
}
|
||||
n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
|
||||
storage := NewMemoryStorage()
|
||||
n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage)
|
||||
n.ApplyConfChange(cc)
|
||||
n.Campaign(ctx)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) {
|
||||
g := <-n.Ready()
|
||||
if !reflect.DeepEqual(g, wants[0]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
|
||||
} else {
|
||||
storage.Append(g.Entries)
|
||||
n.Advance()
|
||||
}
|
||||
|
||||
@ -203,6 +205,7 @@ func TestNode(t *testing.T) {
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, wants[1]) {
|
||||
t.Errorf("#%d: g = %+v,\n w %+v", 2, g, wants[1])
|
||||
} else {
|
||||
storage.Append(g.Entries)
|
||||
n.Advance()
|
||||
}
|
||||
|
||||
@ -227,7 +230,7 @@ func TestNodeRestart(t *testing.T) {
|
||||
CommittedEntries: entries[1 : st.Commit+1],
|
||||
}
|
||||
|
||||
n := RestartNode(1, 10, 1, nil, st, entries)
|
||||
n := RestartNode(1, 10, 1, nil, st, entries, nil)
|
||||
if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
|
||||
t.Errorf("g = %+v,\n w %+v", g, want)
|
||||
} else {
|
||||
@ -246,7 +249,8 @@ func TestNodeRestart(t *testing.T) {
|
||||
func TestNodeCompact(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
n := newNode()
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
storage := NewMemoryStorage()
|
||||
r := newRaft(1, []uint64{1}, 10, 1, storage)
|
||||
go n.run(r)
|
||||
|
||||
n.Campaign(ctx)
|
||||
@ -261,7 +265,8 @@ func TestNodeCompact(t *testing.T) {
|
||||
|
||||
testutil.ForceGosched()
|
||||
select {
|
||||
case <-n.Ready():
|
||||
case rd := <-n.Ready():
|
||||
storage.Append(rd.Entries)
|
||||
n.Advance()
|
||||
default:
|
||||
t.Fatalf("unexpected proposal failure: unable to commit entry")
|
||||
@ -274,6 +279,7 @@ func TestNodeCompact(t *testing.T) {
|
||||
if !reflect.DeepEqual(rd.Snapshot, w) {
|
||||
t.Errorf("snap = %+v, want %+v", rd.Snapshot, w)
|
||||
}
|
||||
storage.Append(rd.Entries)
|
||||
n.Advance()
|
||||
default:
|
||||
t.Fatalf("unexpected compact failure: unable to create a snapshot")
|
||||
@ -288,8 +294,8 @@ func TestNodeCompact(t *testing.T) {
|
||||
}
|
||||
n.Stop()
|
||||
|
||||
if r.raftLog.offset != w.Index {
|
||||
t.Errorf("log.offset = %d, want %d", r.raftLog.offset, w.Index)
|
||||
if r.raftLog.firstIndex() != w.Index {
|
||||
t.Errorf("log.offset = %d, want %d", r.raftLog.firstIndex(), w.Index)
|
||||
}
|
||||
}
|
||||
|
||||
@ -297,7 +303,7 @@ func TestNodeAdvance(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
n := StartNode(1, []Peer{{ID: 1}}, 10, 1)
|
||||
n := StartNode(1, []Peer{{ID: 1}}, 10, 1, nil)
|
||||
n.ApplyConfChange(raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1})
|
||||
n.Campaign(ctx)
|
||||
<-n.Ready()
|
||||
|
@ -119,14 +119,14 @@ type raft struct {
|
||||
step stepFunc
|
||||
}
|
||||
|
||||
func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
|
||||
func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
|
||||
if id == None {
|
||||
panic("cannot use none id")
|
||||
}
|
||||
r := &raft{
|
||||
id: id,
|
||||
lead: None,
|
||||
raftLog: newLog(),
|
||||
raftLog: newLog(storage),
|
||||
prs: make(map[uint64]*progress),
|
||||
electionTimeout: election,
|
||||
heartbeatTimeout: heartbeat,
|
||||
@ -517,7 +517,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
||||
}
|
||||
|
||||
func (r *raft) needSnapshot(i uint64) bool {
|
||||
if i < r.raftLog.offset {
|
||||
if i < r.raftLog.firstIndex() {
|
||||
if r.raftLog.snapshot.Term == 0 {
|
||||
panic("need non-empty snapshot")
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func TestLeaderUpdateTermFromMessage(t *testing.T) {
|
||||
// it immediately reverts to follower state.
|
||||
// Reference: section 5.1
|
||||
func testUpdateTermFromMessage(t *testing.T, state StateType) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
switch state {
|
||||
case StateFollower:
|
||||
r.becomeFollower(1, 2)
|
||||
@ -82,7 +82,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.step = fakeStep
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
|
||||
@ -96,7 +96,7 @@ func TestRejectStaleTermMessage(t *testing.T) {
|
||||
// TestStartAsFollower tests that when servers start up, they begin as followers.
|
||||
// Reference: section 5.2
|
||||
func TestStartAsFollower(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
if r.state != StateFollower {
|
||||
t.Errorf("state = %s, want %s", r.state, StateFollower)
|
||||
}
|
||||
@ -109,7 +109,7 @@ func TestStartAsFollower(t *testing.T) {
|
||||
func TestLeaderBcastBeat(t *testing.T) {
|
||||
// heartbeat interval
|
||||
hi := 1
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, hi)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, hi, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
for i := 0; i < 10; i++ {
|
||||
@ -151,7 +151,7 @@ func TestCandidateStartNewElection(t *testing.T) {
|
||||
func testNonleaderStartElection(t *testing.T, state StateType) {
|
||||
// election timeout
|
||||
et := 10
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, nil)
|
||||
switch state {
|
||||
case StateFollower:
|
||||
r.becomeFollower(1, 2)
|
||||
@ -215,7 +215,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {
|
||||
{5, map[uint64]bool{}, StateCandidate},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1)
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, nil)
|
||||
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
for id, vote := range tt.votes {
|
||||
@ -248,7 +248,7 @@ func TestFollowerVote(t *testing.T) {
|
||||
{2, 1, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
|
||||
|
||||
r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
|
||||
@ -274,7 +274,7 @@ func TestCandidateFallback(t *testing.T) {
|
||||
{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
if r.state != StateCandidate {
|
||||
t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
|
||||
@ -303,7 +303,7 @@ func TestCandidateElectionTimeoutRandomized(t *testing.T) {
|
||||
// Reference: section 5.2
|
||||
func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
|
||||
et := 10
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, et, 1, nil)
|
||||
timeouts := make(map[int]bool)
|
||||
for round := 0; round < 50*et; round++ {
|
||||
switch state {
|
||||
@ -345,7 +345,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||
rs := make([]*raft, size)
|
||||
ids := idsBySize(size)
|
||||
for k := range rs {
|
||||
rs[k] = newRaft(ids[k], ids, et, 1)
|
||||
rs[k] = newRaft(ids[k], ids, et, 1, nil)
|
||||
}
|
||||
conflicts := 0
|
||||
for round := 0; round < 1000; round++ {
|
||||
@ -387,7 +387,7 @@ func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
|
||||
// Also, it writes the new entry into stable storage.
|
||||
// Reference: section 5.3
|
||||
func TestLeaderStartReplication(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r)
|
||||
@ -412,7 +412,7 @@ func TestLeaderStartReplication(t *testing.T) {
|
||||
if !reflect.DeepEqual(msgs, wmsgs) {
|
||||
t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
|
||||
}
|
||||
if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, wents) {
|
||||
if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, wents) {
|
||||
t.Errorf("ents = %+v, want %+v", g, wents)
|
||||
}
|
||||
}
|
||||
@ -425,7 +425,7 @@ func TestLeaderStartReplication(t *testing.T) {
|
||||
// servers eventually find out.
|
||||
// Reference: section 5.3
|
||||
func TestLeaderCommitEntry(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r)
|
||||
@ -478,7 +478,7 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
|
||||
{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1)
|
||||
r := newRaft(1, idsBySize(tt.size), 10, 1, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
commitNoopEntry(r)
|
||||
@ -510,7 +510,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
|
||||
{{Term: 1, Index: 1}},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.loadEnts(append([]pb.Entry{{}}, tt...))
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
r.becomeCandidate()
|
||||
@ -566,7 +566,7 @@ func TestFollowerCommitEntry(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.becomeFollower(1, 2)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
|
||||
@ -601,7 +601,7 @@ func TestFollowerCheckMsgApp(t *testing.T) {
|
||||
{3, 3, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.loadEnts(ents)
|
||||
r.loadState(pb.HardState{Commit: 2})
|
||||
r.becomeFollower(2, 2)
|
||||
@ -656,16 +656,16 @@ func TestFollowerAppendEntries(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.loadEnts([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}})
|
||||
r.becomeFollower(2, 2)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
|
||||
|
||||
if g := r.raftLog.ents; !reflect.DeepEqual(g, tt.wents) {
|
||||
if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) {
|
||||
t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
|
||||
}
|
||||
if g := r.raftLog.unstableEnts(); !reflect.DeepEqual(g, tt.wunstable) {
|
||||
if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, tt.wunstable) {
|
||||
t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
|
||||
}
|
||||
}
|
||||
@ -724,10 +724,10 @@ func TestLeaderSyncFollowerLog(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
lead := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
lead.loadEnts(ents)
|
||||
lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
|
||||
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1)
|
||||
follower := newRaft(2, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
follower.loadEnts(tt)
|
||||
follower.loadState(pb.HardState{Term: term - 1})
|
||||
// It is necessary to have a three-node cluster.
|
||||
@ -757,7 +757,7 @@ func TestVoteRequest(t *testing.T) {
|
||||
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
r.Step(pb.Message{
|
||||
From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
|
||||
})
|
||||
@ -818,7 +818,7 @@ func TestVoter(t *testing.T) {
|
||||
{[]pb.Entry{{}, {Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.loadEnts(tt.ents)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
|
||||
@ -853,7 +853,7 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
|
||||
{3, 3},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.loadEnts(ents)
|
||||
r.loadState(pb.HardState{Term: 2})
|
||||
// become leader at term 3
|
||||
@ -891,6 +891,7 @@ func commitNoopEntry(r *raft) {
|
||||
}
|
||||
// ignore further messages to refresh followers' commmit index
|
||||
r.readMessages()
|
||||
r.raftLog.storage.(*MemoryStorage).Append(r.raftLog.unstableEntries())
|
||||
r.raftLog.appliedTo(r.raftLog.committed)
|
||||
r.raftLog.stableTo(r.raftLog.lastIndex())
|
||||
}
|
||||
|
@ -30,6 +30,11 @@ import (
|
||||
|
||||
// nextEnts returns the appliable entries and updates the applied index
|
||||
func nextEnts(r *raft) (ents []pb.Entry) {
|
||||
// Transfer all unstable entries to "stable" storage.
|
||||
memStorage := r.raftLog.storage.(*MemoryStorage)
|
||||
memStorage.Append(r.raftLog.unstableEntries())
|
||||
r.raftLog.stableTo(r.raftLog.lastIndex())
|
||||
|
||||
ents = r.raftLog.nextEnts()
|
||||
r.raftLog.appliedTo(r.raftLog.committed)
|
||||
return ents
|
||||
@ -280,9 +285,9 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDuelingCandidates(t *testing.T) {
|
||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1)
|
||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1)
|
||||
a := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
b := newRaft(2, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
c := newRaft(3, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
|
||||
nt := newNetwork(a, b, c)
|
||||
nt.cut(1, 3)
|
||||
@ -293,7 +298,11 @@ func TestDuelingCandidates(t *testing.T) {
|
||||
nt.recover()
|
||||
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
|
||||
|
||||
wlog := &raftLog{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}, committed: 1}
|
||||
wlog := &raftLog{
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, pb.Entry{Data: nil, Term: 1, Index: 1}}},
|
||||
committed: 1,
|
||||
unstable: 2,
|
||||
}
|
||||
tests := []struct {
|
||||
sm *raft
|
||||
state StateType
|
||||
@ -302,7 +311,7 @@ func TestDuelingCandidates(t *testing.T) {
|
||||
}{
|
||||
{a, StateFollower, 2, wlog},
|
||||
{b, StateFollower, 2, wlog},
|
||||
{c, StateFollower, 2, newLog()},
|
||||
{c, StateFollower, 2, newLog(nil)},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@ -345,7 +354,13 @@ func TestCandidateConcede(t *testing.T) {
|
||||
if g := a.Term; g != 1 {
|
||||
t.Errorf("term = %d, want %d", g, 1)
|
||||
}
|
||||
wantLog := ltoa(&raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2})
|
||||
wantLog := ltoa(&raftLog{
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
|
||||
},
|
||||
unstable: 3,
|
||||
committed: 2,
|
||||
})
|
||||
for i, p := range tt.peers {
|
||||
if sm, ok := p.(*raft); ok {
|
||||
l := ltoa(sm.raftLog)
|
||||
@ -378,10 +393,13 @@ func TestOldMessages(t *testing.T) {
|
||||
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgApp, Term: 1, Entries: []pb.Entry{{Term: 1}}})
|
||||
|
||||
l := &raftLog{
|
||||
ents: []pb.Entry{
|
||||
{}, {Data: nil, Term: 1, Index: 1},
|
||||
{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{
|
||||
{}, {Data: nil, Term: 1, Index: 1},
|
||||
{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
|
||||
},
|
||||
},
|
||||
unstable: 4,
|
||||
committed: 3,
|
||||
}
|
||||
base := ltoa(l)
|
||||
@ -432,9 +450,14 @@ func TestProposal(t *testing.T) {
|
||||
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
|
||||
|
||||
wantLog := newLog()
|
||||
wantLog := newLog(nil)
|
||||
if tt.success {
|
||||
wantLog = &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}}, committed: 2}
|
||||
wantLog = &raftLog{
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
|
||||
},
|
||||
unstable: 3,
|
||||
committed: 2}
|
||||
}
|
||||
base := ltoa(wantLog)
|
||||
for i, p := range tt.peers {
|
||||
@ -468,7 +491,12 @@ func TestProposalByProxy(t *testing.T) {
|
||||
// propose via follower
|
||||
tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
|
||||
wantLog := &raftLog{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}}, committed: 2}
|
||||
wantLog := &raftLog{
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
|
||||
},
|
||||
unstable: 3,
|
||||
committed: 2}
|
||||
base := ltoa(wantLog)
|
||||
for i, p := range tt.peers {
|
||||
if sm, ok := p.(*raft); ok {
|
||||
@ -513,13 +541,15 @@ func TestCompact(t *testing.T) {
|
||||
raftLog: &raftLog{
|
||||
committed: 2,
|
||||
applied: 2,
|
||||
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
||||
storage: &MemoryStorage{
|
||||
ents: []pb.Entry{{}, {Term: 1}, {Term: 1}, {Term: 1}},
|
||||
},
|
||||
},
|
||||
}
|
||||
sm.compact(tt.compacti, tt.nodes, tt.snapd)
|
||||
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
|
||||
if sm.raftLog.offset != tt.compacti {
|
||||
t.Errorf("%d: log.offset = %d, want %d", i, sm.raftLog.offset, tt.compacti)
|
||||
if sm.raftLog.firstIndex() != tt.compacti {
|
||||
t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti)
|
||||
}
|
||||
if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
|
||||
t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
|
||||
@ -564,7 +594,11 @@ func TestCommit(t *testing.T) {
|
||||
for j := 0; j < len(tt.matches); j++ {
|
||||
prs[uint64(j)] = &progress{tt.matches[j], tt.matches[j] + 1}
|
||||
}
|
||||
sm := &raft{raftLog: &raftLog{ents: tt.logs}, prs: prs, HardState: pb.HardState{Term: tt.smTerm}}
|
||||
sm := &raft{
|
||||
raftLog: &raftLog{storage: &MemoryStorage{ents: tt.logs}, unstable: uint64(len(tt.logs))},
|
||||
prs: prs,
|
||||
HardState: pb.HardState{Term: tt.smTerm},
|
||||
}
|
||||
sm.maybeCommit()
|
||||
if g := sm.raftLog.committed; g != tt.w {
|
||||
t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
|
||||
@ -586,7 +620,7 @@ func TestIsElectionTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
sm.elapsed = tt.elapse
|
||||
c := 0
|
||||
for j := 0; j < 10000; j++ {
|
||||
@ -611,7 +645,7 @@ func TestStepIgnoreOldTermMsg(t *testing.T) {
|
||||
fakeStep := func(r *raft, m pb.Message) {
|
||||
called = true
|
||||
}
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
sm.step = fakeStep
|
||||
sm.Term = 2
|
||||
sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
|
||||
@ -654,7 +688,11 @@ func TestHandleMsgApp(t *testing.T) {
|
||||
sm := &raft{
|
||||
state: StateFollower,
|
||||
HardState: pb.HardState{Term: 2},
|
||||
raftLog: &raftLog{committed: 0, ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
|
||||
raftLog: &raftLog{
|
||||
committed: 0,
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 1}, {Term: 2}}},
|
||||
unstable: 3,
|
||||
},
|
||||
}
|
||||
|
||||
sm.handleAppendEntries(tt.m)
|
||||
@ -709,7 +747,7 @@ func TestRecvMsgVote(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
sm.state = tt.state
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
@ -720,7 +758,10 @@ func TestRecvMsgVote(t *testing.T) {
|
||||
sm.step = stepLeader
|
||||
}
|
||||
sm.HardState = pb.HardState{Vote: tt.voteFor}
|
||||
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}}
|
||||
sm.raftLog = &raftLog{
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 2}, {Term: 2}}},
|
||||
unstable: 3,
|
||||
}
|
||||
|
||||
sm.Step(pb.Message{Type: pb.MsgVote, From: 2, Index: tt.i, LogTerm: tt.term})
|
||||
|
||||
@ -766,7 +807,7 @@ func TestStateTransition(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
sm.state = tt.from
|
||||
|
||||
switch tt.to {
|
||||
@ -805,7 +846,7 @@ func TestAllServerStepdown(t *testing.T) {
|
||||
tterm := uint64(3)
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
switch tt.state {
|
||||
case StateFollower:
|
||||
sm.becomeFollower(1, None)
|
||||
@ -825,8 +866,8 @@ func TestAllServerStepdown(t *testing.T) {
|
||||
if sm.Term != tt.wterm {
|
||||
t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
|
||||
}
|
||||
if uint64(len(sm.raftLog.ents)) != tt.windex {
|
||||
t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.ents), tt.windex)
|
||||
if uint64(len(sm.raftLog.allEntries())) != tt.windex {
|
||||
t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
|
||||
}
|
||||
wlead := uint64(2)
|
||||
if msgType == pb.MsgVote {
|
||||
@ -861,8 +902,11 @@ func TestLeaderAppResp(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
// sm term is 1 after it becomes the leader.
|
||||
// thus the last log term must be 1 to be committed.
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
sm.raftLog = &raftLog{
|
||||
storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}},
|
||||
unstable: 3,
|
||||
}
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
sm.readMessages()
|
||||
@ -902,7 +946,7 @@ func TestBcastBeat(t *testing.T) {
|
||||
Term: 1,
|
||||
Nodes: []uint64{1, 2, 3},
|
||||
}
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
sm.Term = 1
|
||||
sm.restore(s)
|
||||
|
||||
@ -952,8 +996,8 @@ func TestRecvMsgBeat(t *testing.T) {
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1)
|
||||
sm.raftLog = &raftLog{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}
|
||||
sm := newRaft(1, []uint64{1, 2, 3}, 10, 1, nil)
|
||||
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Term: 0}, {Term: 1}}}}
|
||||
sm.Term = 1
|
||||
sm.state = tt.state
|
||||
switch tt.state {
|
||||
@ -985,7 +1029,7 @@ func TestRestore(t *testing.T) {
|
||||
Nodes: []uint64{1, 2, 3},
|
||||
}
|
||||
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
if ok := sm.restore(s); !ok {
|
||||
t.Fatal("restore fail, want succeed")
|
||||
}
|
||||
@ -1016,7 +1060,7 @@ func TestProvideSnap(t *testing.T) {
|
||||
Term: 11, // magic number
|
||||
Nodes: []uint64{1, 2},
|
||||
}
|
||||
sm := newRaft(1, []uint64{1}, 10, 1)
|
||||
sm := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
// restore the statemachin from a snapshot
|
||||
// so it has a compacted log and a snapshot
|
||||
sm.restore(s)
|
||||
@ -1026,7 +1070,7 @@ func TestProvideSnap(t *testing.T) {
|
||||
|
||||
// force set the next of node 1, so that
|
||||
// node 1 needs a snapshot
|
||||
sm.prs[2].next = sm.raftLog.offset
|
||||
sm.prs[2].next = sm.raftLog.firstIndex()
|
||||
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].next - 1, Reject: true})
|
||||
msgs := sm.readMessages()
|
||||
@ -1047,7 +1091,7 @@ func TestRestoreFromSnapMsg(t *testing.T) {
|
||||
}
|
||||
m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
|
||||
|
||||
sm := newRaft(2, []uint64{1, 2}, 10, 1)
|
||||
sm := newRaft(2, []uint64{1, 2}, 10, 1, nil)
|
||||
sm.Step(m)
|
||||
|
||||
if !reflect.DeepEqual(sm.raftLog.snapshot, s) {
|
||||
@ -1086,7 +1130,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
||||
// it appends the entry to log and sets pendingConf to be true.
|
||||
func TestStepConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
index := r.raftLog.lastIndex()
|
||||
@ -1104,7 +1148,7 @@ func TestStepConfig(t *testing.T) {
|
||||
// the proposal and keep its original state.
|
||||
func TestStepIgnoreConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
|
||||
@ -1130,7 +1174,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
||||
{pb.EntryConfChange, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.appendEntry(pb.Entry{Type: tt.entType})
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
@ -1149,7 +1193,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||
t.Errorf("expect panic, but nothing happens")
|
||||
}
|
||||
}()
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
|
||||
r.becomeCandidate()
|
||||
@ -1159,7 +1203,7 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||
|
||||
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
|
||||
func TestAddNode(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1}, 10, 1)
|
||||
r := newRaft(1, []uint64{1}, 10, 1, nil)
|
||||
r.pendingConf = true
|
||||
r.addNode(2)
|
||||
if r.pendingConf != false {
|
||||
@ -1176,7 +1220,7 @@ func TestAddNode(t *testing.T) {
|
||||
// TestRemoveNode tests that removeNode could update pendingConf, nodes and
|
||||
// and removed list correctly.
|
||||
func TestRemoveNode(t *testing.T) {
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1)
|
||||
r := newRaft(1, []uint64{1, 2}, 10, 1, nil)
|
||||
r.pendingConf = true
|
||||
r.removeNode(2)
|
||||
if r.pendingConf != false {
|
||||
@ -1216,7 +1260,12 @@ func ents(terms ...uint64) *raft {
|
||||
ents = append(ents, pb.Entry{Term: term})
|
||||
}
|
||||
|
||||
sm := &raft{raftLog: &raftLog{ents: ents}}
|
||||
sm := &raft{
|
||||
raftLog: &raftLog{
|
||||
storage: &MemoryStorage{ents: ents},
|
||||
unstable: uint64(len(ents)),
|
||||
},
|
||||
}
|
||||
sm.reset(0)
|
||||
return sm
|
||||
}
|
||||
@ -1241,7 +1290,7 @@ func newNetwork(peers ...Interface) *network {
|
||||
id := peerAddrs[i]
|
||||
switch v := p.(type) {
|
||||
case nil:
|
||||
sm := newRaft(id, peerAddrs, 10, 1)
|
||||
sm := newRaft(id, peerAddrs, 10, 1, nil)
|
||||
npeers[id] = sm
|
||||
case *raft:
|
||||
v.id = id
|
||||
|
108
raft/storage.go
Normal file
108
raft/storage.go
Normal file
@ -0,0 +1,108 @@
|
||||
/*
|
||||
Copyright 2014 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package raft
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
pb "github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
// Storage is an interface that may be implemented by the application
|
||||
// to retrieve log entries from storage. If no storage implementation
|
||||
// is supplied by the application, a MemoryStorage will be used, which
|
||||
// retains all log entries in memory.
|
||||
//
|
||||
// If any Storage method returns an error, the raft instance will
|
||||
// become inoperable and refuse to participate in elections; the
|
||||
// application is responsible for cleanup and recovery in this case.
|
||||
type Storage interface {
|
||||
// GetEntries returns a slice of log entries in the range [lo,hi).
|
||||
GetEntries(lo, hi uint64) ([]pb.Entry, error)
|
||||
// GetLastIndex returns the index of the last entry in the log.
|
||||
GetLastIndex() (uint64, error)
|
||||
// GetFirstIndex returns the index of the first log entry that is
|
||||
// available via GetEntries (older entries have been incorporated
|
||||
// into the latest Snapshot).
|
||||
GetFirstIndex() (uint64, error)
|
||||
// Compact discards all log entries prior to i, creating a snapshot
|
||||
// which can be used to reconstruct the state at that point.
|
||||
Compact(i uint64) error
|
||||
}
|
||||
|
||||
// MemoryStorage implements the Storage interface backed by an
|
||||
// in-memory array.
|
||||
type MemoryStorage struct {
|
||||
// Protects access to all fields. Most methods of MemoryStorage are
|
||||
// run on the raft goroutine, but Append() is run on an application
|
||||
// goroutine.
|
||||
sync.Mutex
|
||||
|
||||
ents []pb.Entry
|
||||
// offset is the position of the last compaction.
|
||||
// ents[i] has raft log position i+offset.
|
||||
offset uint64
|
||||
}
|
||||
|
||||
// NewMemoryStorage creates an empty MemoryStorage.
|
||||
func NewMemoryStorage() *MemoryStorage {
|
||||
return &MemoryStorage{
|
||||
// Populate the list with a dummy entry at term zero.
|
||||
ents: make([]pb.Entry, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// GetEntries implements the Storage interface.
|
||||
func (ms *MemoryStorage) GetEntries(lo, hi uint64) ([]pb.Entry, error) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
return ms.ents[lo-ms.offset : hi-ms.offset], nil
|
||||
}
|
||||
|
||||
// GetLastIndex implements the Storage interface.
|
||||
func (ms *MemoryStorage) GetLastIndex() (uint64, error) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
return ms.offset + uint64(len(ms.ents)) - 1, nil
|
||||
}
|
||||
|
||||
// GetFirstIndex implements the Storage interface.
|
||||
func (ms *MemoryStorage) GetFirstIndex() (uint64, error) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
return ms.offset, nil
|
||||
}
|
||||
|
||||
// Compact implements the Storage interface.
|
||||
func (ms *MemoryStorage) Compact(i uint64) error {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
i -= ms.offset
|
||||
ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
|
||||
ents[0].Term = ms.ents[i].Term
|
||||
ents = append(ents, ms.ents[i+1:]...)
|
||||
ms.ents = ents
|
||||
ms.offset += i
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append the new entries to storage.
|
||||
func (ms *MemoryStorage) Append(entries []pb.Entry) {
|
||||
ms.Lock()
|
||||
defer ms.Unlock()
|
||||
ms.ents = append(ms.ents, entries...)
|
||||
}
|
54
raft/util.go
Normal file
54
raft/util.go
Normal file
@ -0,0 +1,54 @@
|
||||
/*
|
||||
Copyright 2014 CoreOS, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
|
||||
pb "github.com/coreos/etcd/raft/raftpb"
|
||||
)
|
||||
|
||||
// DescribeMessage returns a concise human-readable description of a
|
||||
// Message for debugging.
|
||||
func DescribeMessage(m pb.Message) string {
|
||||
var buf bytes.Buffer
|
||||
fmt.Fprintf(&buf, "%d->%d %s Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)
|
||||
if m.Reject {
|
||||
fmt.Fprintf(&buf, " Rejected")
|
||||
}
|
||||
if m.Commit != 0 {
|
||||
fmt.Fprintf(&buf, " Commit:%d", m.Commit)
|
||||
}
|
||||
if len(m.Entries) > 0 {
|
||||
fmt.Fprintf(&buf, " Entries:[")
|
||||
for _, e := range m.Entries {
|
||||
buf.WriteString(DescribeEntry(e))
|
||||
}
|
||||
fmt.Fprintf(&buf, "]")
|
||||
}
|
||||
if !IsEmptySnap(m.Snapshot) {
|
||||
fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot)
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// DescribeEntry returns a concise human-readable description of an
|
||||
// Entry for debugging.
|
||||
func DescribeEntry(e pb.Entry) string {
|
||||
return fmt.Sprintf("%d/%d %s %q", e.Term, e.Index, e.Type, string(e.Data))
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user