diff --git a/etcdserver/raft.go b/etcdserver/raft.go index bda306a43..eb3f2b324 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -40,6 +40,13 @@ const ( // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. numberOfCatchUpEntries = 5000 + + // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value). + // Assuming the RTT is around 10ms, 1MB max size is large enough. + maxSizePerMsg = 1 * 1024 * 1024 + // Never overflow the rafthttp buffer, which is 4096. + // TODO: a better const? + maxInflightMsgs = 4096 / 8 ) var ( @@ -204,7 +211,15 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s * id = member.ID log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) s = raft.NewMemoryStorage() - n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n = raft.StartNode(c, peers) raftStatus = n.Status return } @@ -224,7 +239,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n := raft.RestartNode(c) raftStatus = n.Status return id, n, s, w } @@ -266,7 +289,15 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type } s.SetHardState(st) s.Append(ents) - n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) + c := &raft.Config{ + ID: uint64(id), + ElectionTick: cfg.ElectionTicks, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: maxSizePerMsg, + MaxInflightMsgs: maxInflightMsgs, + } + n := raft.RestartNode(c) raftStatus = n.Status return id, n, s, w } diff --git a/raft/example_test.go b/raft/example_test.go index 00bbb973a..ded3b8a3f 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -24,7 +24,8 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0, nil) + c := &Config{} + n := StartNode(c, nil) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index a32dbb401..bdb4fcd24 100644 --- a/raft/node.go +++ b/raft/node.go @@ -140,20 +140,10 @@ type Peer struct { Context []byte } -// StartNode returns a new Node given a unique raft id, a list of raft peers, and -// the election and heartbeat timeouts in units of ticks. +// StartNode returns a new Node given configuration and a list of raft peers. +// It ignores the given peer list in the given Config. // It appends a ConfChangeAddNode entry for each given peer to the initial log. -func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { - c := &Config{ - ID: id, - Peers: nil, - ElectionTick: election, - HeartbeatTick: heartbeat, - Storage: storage, - // TODO(xiangli): make this configurable - MaxSizePerMsg: noLimit, - MaxInflightMsgs: 256, - } +func StartNode(c *Config, peers []Peer) Node { r := newRaft(c) // become the follower at term 1 and apply initial configuration // entires of term 1 @@ -194,18 +184,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage // The current membership of the cluster will be restored from the Storage. // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. -func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node { - c := &Config{ - ID: id, - Peers: nil, - ElectionTick: election, - HeartbeatTick: heartbeat, - Storage: storage, - Applied: applied, - // TODO(xiangli): make this configurable - MaxSizePerMsg: noLimit, - MaxInflightMsgs: 256, - } +func RestartNode(c *Config) Node { r := newRaft(c) n := newNode() diff --git a/raft/node_test.go b/raft/node_test.go index 7e610e8b9..dea95c692 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -321,7 +321,15 @@ func TestNodeStart(t *testing.T) { }, } storage := NewMemoryStorage() - n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := StartNode(c, []Peer{{ID: 1}}) n.Campaign(ctx) g := <-n.Ready() if !reflect.DeepEqual(g, wants[0]) { @@ -362,7 +370,15 @@ func TestNodeRestart(t *testing.T) { storage := NewMemoryStorage() storage.SetHardState(st) storage.Append(entries) - n := RestartNode(1, 10, 1, storage, 0) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := RestartNode(c) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } @@ -398,7 +414,15 @@ func TestNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - n := RestartNode(1, 10, 1, s, 0) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: s, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := RestartNode(c) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } else { @@ -417,7 +441,15 @@ func TestNodeAdvance(t *testing.T) { defer cancel() storage := NewMemoryStorage() - n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) + c := &Config{ + ID: 1, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: noLimit, + MaxInflightMsgs: 256, + } + n := StartNode(c, []Peer{{ID: 1}}) n.Campaign(ctx) <-n.Ready() n.Propose(ctx, []byte("foo"))