From abddef0f286e10199e0dd20558121c67d818667b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 23 Mar 2015 21:19:03 -0700 Subject: [PATCH 1/2] raft: make node configurable --- etcdserver/raft.go | 37 ++++++++++++++++++++++++++++++++++--- raft/example_test.go | 3 ++- raft/node.go | 29 ++++------------------------- raft/node_test.go | 40 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 76 insertions(+), 33 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index bda306a43..eb3f2b324 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -40,6 +40,13 @@ const ( // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. numberOfCatchUpEntries = 5000 + + // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value). + // Assuming the RTT is around 10ms, 1MB max size is large enough. + maxSizePerMsg = 1 * 1024 * 1024 + // Never overflow the rafthttp buffer, which is 4096. + // TODO: a better const? + maxInflightMsgs = 4096 / 8 ) var ( @@ -204,7 +211,15 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * id = member.ID log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) s = raft.NewMemoryStorage() - n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n = raft.StartNode(c, peers) raftStatus = n.Status return } @@ -224,7 +239,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n := raft.RestartNode(c) raftStatus = n.Status return id, n, s, w } @@ -266,7 +289,15 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n := raft.RestartNode(c) raftStatus = n.Status return id, n, s, w } diff --git a/raft/example_test.go b/raft/example_test.go index 00bbb973a..ded3b8a3f 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -24,7 +24,8 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0, nil) + c := &Config{} + n := StartNode(c, nil) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index a32dbb401..bdb4fcd24 100644 --- a/raft/node.go +++ b/raft/node.go @@ -140,20 +140,10 @@ type Peer struct { Context []byte } -// StartNode returns a new Node given a unique raft id, a list of raft peers, and -// the election and heartbeat timeouts in units of ticks. +// StartNode returns a new Node given configuration and a list of raft peers. +// It ignores the given peer list in the given Config. // It appends a ConfChangeAddNode entry for each given peer to the initial log. -func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { - c := &Config{ - ID: id, - Peers: nil, - ElectionTick: election, - HeartbeatTick: heartbeat, - Storage: storage, - // TODO(xiangli): make this configurable - MaxSizePerMsg: noLimit, - MaxInflightMsgs: 256, - } +func StartNode(c *Config, peers []Peer) Node { r := newRaft(c) // become the follower at term 1 and apply initial configuration // entires of term 1 @@ -194,18 +184,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage // The current membership of the cluster will be restored from the Storage. // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. -func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node { - c := &Config{ - ID: id, - Peers: nil, - ElectionTick: election, - HeartbeatTick: heartbeat, - Storage: storage, - Applied: applied, - // TODO(xiangli): make this configurable - MaxSizePerMsg: noLimit, - MaxInflightMsgs: 256, - } +func RestartNode(c *Config) Node { r := newRaft(c) n := newNode() diff --git a/raft/node_test.go b/raft/node_test.go index 7e610e8b9..dea95c692 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -321,7 +321,15 @@ func TestNodeStart(t *testing.T) { }, } storage := NewMemoryStorage() - n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := StartNode(c, []Peer{{ID: 1}}) n.Campaign(ctx) g := <-n.Ready() if !reflect.DeepEqual(g, wants[0]) { @@ -362,7 +370,15 @@ func TestNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - n := RestartNode(1, 10, 1, storage, 0) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := RestartNode(c) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } @@ -398,7 +414,15 @@ func TestNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - n := RestartNode(1, 10, 1, s, 0) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := RestartNode(c) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } else { @@ -417,7 +441,15 @@ func TestNodeAdvance(t *testing.T) { defer cancel() storage := NewMemoryStorage() - n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := StartNode(c, []Peer{{ID: 1}}) n.Campaign(ctx) <-n.Ready() n.Propose(ctx, []byte("foo")) From b3fb052ad4038f490ced41294b50200acb5638a4 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 24 Mar 2015 11:10:07 -0700 Subject: [PATCH 2/2] raft: make peers a prviate field in raft.Config --- raft/node.go | 1 - raft/raft.go | 10 ++++++---- raft/raft_test.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/raft/node.go b/raft/node.go index bdb4fcd24..6dc717b51 100644 --- a/raft/node.go +++ b/raft/node.go @@ -141,7 +141,6 @@ type Peer struct { } // StartNode returns a new Node given configuration and a list of raft peers. -// It ignores the given peer list in the given Config. // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(c *Config, peers []Peer) Node { r := newRaft(c) diff --git a/raft/raft.go b/raft/raft.go index 6b41b4510..ee5b1cd3f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -55,12 +55,14 @@ func (st StateType) String() string { type Config struct { // ID is the identity of the local raft. ID cannot be 0. ID uint64 - // Peers contains the IDs of all nodes (including self) in + + // peers contains the IDs of all nodes (including self) in // the raft cluster. It should only be set when starting a new // raft cluster. // Restarting raft from previous configuration will panic if - // Peers is set. - Peers []uint64 + // peers is set. + // peer is private and only used for testing right now. + peers []uint64 // ElectionTick is the election timeout. If a follower does not // receive any message from the leader of current term during @@ -161,7 +163,7 @@ func newRaft(c *Config) *raft { if err != nil { panic(err) // TODO(bdarnell) } - peers := c.Peers + peers := c.peers if len(cs.Nodes) > 0 { if len(peers) > 0 { // TODO(bdarnell): the peers argument is always nil except in diff --git a/raft/raft_test.go b/raft/raft_test.go index 815cb1d08..22a27f4c8 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1884,7 +1884,7 @@ func idsBySize(size int) []uint64 { func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { c := &Config{ ID: id, - Peers: peers, + peers: peers, ElectionTick: election, HeartbeatTick: heartbeat, Storage: storage,