Merge pull request #2568 from xiang90/raftnode

raft: make node configurable
This commit is contained in:
Xiang Li 2015-03-24 11:18:22 -07:00
commit 866a9d4e41
6 changed files with 82 additions and 38 deletions

View File

@ -40,6 +40,13 @@ const (
// The max throughput is around 10K. Keep a 5K entries is enough for helping // The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up. // follower to catch up.
numberOfCatchUpEntries = 5000 numberOfCatchUpEntries = 5000
// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
// Assuming the RTT is around 10ms, 1MB max size is large enough.
maxSizePerMsg = 1 * 1024 * 1024
// Never overflow the rafthttp buffer, which is 4096.
// TODO: a better const?
maxInflightMsgs = 4096 / 8
) )
var ( var (
@ -204,7 +211,15 @@ func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *
id = member.ID id = member.ID
log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID()) log.Printf("etcdserver: start member %s in cluster %s", id, cfg.Cluster.ID())
s = raft.NewMemoryStorage() s = raft.NewMemoryStorage()
n = raft.StartNode(uint64(id), peers, cfg.ElectionTicks, 1, s) c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
}
n = raft.StartNode(c, peers)
raftStatus = n.Status raftStatus = n.Status
return return
} }
@ -224,7 +239,15 @@ func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, raft.N
} }
s.SetHardState(st) s.SetHardState(st)
s.Append(ents) s.Append(ents)
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
}
n := raft.RestartNode(c)
raftStatus = n.Status raftStatus = n.Status
return id, n, s, w return id, n, s, w
} }
@ -266,7 +289,15 @@ func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (type
} }
s.SetHardState(st) s.SetHardState(st)
s.Append(ents) s.Append(ents)
n := raft.RestartNode(uint64(id), cfg.ElectionTicks, 1, s, 0) c := &raft.Config{
ID: uint64(id),
ElectionTick: cfg.ElectionTicks,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: maxSizePerMsg,
MaxInflightMsgs: maxInflightMsgs,
}
n := raft.RestartNode(c)
raftStatus = n.Status raftStatus = n.Status
return id, n, s, w return id, n, s, w
} }

View File

@ -24,7 +24,8 @@ func saveStateToDisk(st pb.HardState) {}
func saveToDisk(ents []pb.Entry) {} func saveToDisk(ents []pb.Entry) {}
func Example_Node() { func Example_Node() {
n := StartNode(0, nil, 0, 0, nil) c := &Config{}
n := StartNode(c, nil)
// stuff to n happens in other goroutines // stuff to n happens in other goroutines

View File

@ -140,20 +140,9 @@ type Peer struct {
Context []byte Context []byte
} }
// StartNode returns a new Node given a unique raft id, a list of raft peers, and // StartNode returns a new Node given configuration and a list of raft peers.
// the election and heartbeat timeouts in units of ticks.
// It appends a ConfChangeAddNode entry for each given peer to the initial log. // It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage) Node { func StartNode(c *Config, peers []Peer) Node {
c := &Config{
ID: id,
Peers: nil,
ElectionTick: election,
HeartbeatTick: heartbeat,
Storage: storage,
// TODO(xiangli): make this configurable
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
r := newRaft(c) r := newRaft(c)
// become the follower at term 1 and apply initial configuration // become the follower at term 1 and apply initial configuration
// entires of term 1 // entires of term 1
@ -194,18 +183,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
// The current membership of the cluster will be restored from the Storage. // The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that // If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero. // has been applied to it; otherwise use zero.
func RestartNode(id uint64, election, heartbeat int, storage Storage, applied uint64) Node { func RestartNode(c *Config) Node {
c := &Config{
ID: id,
Peers: nil,
ElectionTick: election,
HeartbeatTick: heartbeat,
Storage: storage,
Applied: applied,
// TODO(xiangli): make this configurable
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
r := newRaft(c) r := newRaft(c)
n := newNode() n := newNode()

View File

@ -321,7 +321,15 @@ func TestNodeStart(t *testing.T) {
}, },
} }
storage := NewMemoryStorage() storage := NewMemoryStorage()
n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) c := &Config{
ID: 1,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
n := StartNode(c, []Peer{{ID: 1}})
n.Campaign(ctx) n.Campaign(ctx)
g := <-n.Ready() g := <-n.Ready()
if !reflect.DeepEqual(g, wants[0]) { if !reflect.DeepEqual(g, wants[0]) {
@ -362,7 +370,15 @@ func TestNodeRestart(t *testing.T) {
storage := NewMemoryStorage() storage := NewMemoryStorage()
storage.SetHardState(st) storage.SetHardState(st)
storage.Append(entries) storage.Append(entries)
n := RestartNode(1, 10, 1, storage, 0) c := &Config{
ID: 1,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
n := RestartNode(c)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} }
@ -398,7 +414,15 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
s.SetHardState(st) s.SetHardState(st)
s.ApplySnapshot(snap) s.ApplySnapshot(snap)
s.Append(entries) s.Append(entries)
n := RestartNode(1, 10, 1, s, 0) c := &Config{
ID: 1,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: s,
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
n := RestartNode(c)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} else { } else {
@ -417,7 +441,15 @@ func TestNodeAdvance(t *testing.T) {
defer cancel() defer cancel()
storage := NewMemoryStorage() storage := NewMemoryStorage()
n := StartNode(1, []Peer{{ID: 1}}, 10, 1, storage) c := &Config{
ID: 1,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
n := StartNode(c, []Peer{{ID: 1}})
n.Campaign(ctx) n.Campaign(ctx)
<-n.Ready() <-n.Ready()
n.Propose(ctx, []byte("foo")) n.Propose(ctx, []byte("foo"))

View File

@ -55,12 +55,14 @@ func (st StateType) String() string {
type Config struct { type Config struct {
// ID is the identity of the local raft. ID cannot be 0. // ID is the identity of the local raft. ID cannot be 0.
ID uint64 ID uint64
// Peers contains the IDs of all nodes (including self) in
// peers contains the IDs of all nodes (including self) in
// the raft cluster. It should only be set when starting a new // the raft cluster. It should only be set when starting a new
// raft cluster. // raft cluster.
// Restarting raft from previous configuration will panic if // Restarting raft from previous configuration will panic if
// Peers is set. // peers is set.
Peers []uint64 // peer is private and only used for testing right now.
peers []uint64
// ElectionTick is the election timeout. If a follower does not // ElectionTick is the election timeout. If a follower does not
// receive any message from the leader of current term during // receive any message from the leader of current term during
@ -161,7 +163,7 @@ func newRaft(c *Config) *raft {
if err != nil { if err != nil {
panic(err) // TODO(bdarnell) panic(err) // TODO(bdarnell)
} }
peers := c.Peers peers := c.peers
if len(cs.Nodes) > 0 { if len(cs.Nodes) > 0 {
if len(peers) > 0 { if len(peers) > 0 {
// TODO(bdarnell): the peers argument is always nil except in // TODO(bdarnell): the peers argument is always nil except in

View File

@ -1884,7 +1884,7 @@ func idsBySize(size int) []uint64 {
func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft { func newTestRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
c := &Config{ c := &Config{
ID: id, ID: id,
Peers: peers, peers: peers,
ElectionTick: election, ElectionTick: election,
HeartbeatTick: heartbeat, HeartbeatTick: heartbeat,
Storage: storage, Storage: storage,