mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: Config -> ConfigChange
Configure -> ProposeConfigChange AddNode, RemoveNode -> ApplyConfigChange
This commit is contained in:
parent
ec8f493fde
commit
d92931853e
@ -127,16 +127,16 @@ func (s *EtcdServer) run() {
|
||||
if err := r.Unmarshal(e.Data); err != nil {
|
||||
panic("TODO: this is bad, what do we do about it?")
|
||||
}
|
||||
s.w.Trigger(r.Id, s.applyRequest(r))
|
||||
case raftpb.EntryConfig:
|
||||
var c raftpb.Config
|
||||
if err := c.Unmarshal(e.Data); err != nil {
|
||||
s.w.Trigger(r.Id, s.apply(r))
|
||||
case raftpb.EntryConfigChange:
|
||||
var cc raftpb.ConfigChange
|
||||
if err := cc.Unmarshal(e.Data); err != nil {
|
||||
panic("TODO: this is bad, what do we do about it?")
|
||||
}
|
||||
s.applyConfig(c)
|
||||
s.w.Trigger(c.ID, nil)
|
||||
s.Node.ApplyConfigChange(cc)
|
||||
s.w.Trigger(cc.ID, nil)
|
||||
default:
|
||||
panic("unsupported entry type")
|
||||
panic("unexpected entry type")
|
||||
}
|
||||
appliedi = e.Index
|
||||
}
|
||||
@ -231,38 +231,38 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
|
||||
}
|
||||
|
||||
func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
|
||||
req := raftpb.Config{
|
||||
cc := raftpb.ConfigChange{
|
||||
ID: GenID(),
|
||||
Type: raftpb.ConfigAddNode,
|
||||
Type: raftpb.ConfigChangeAddNode,
|
||||
NodeID: id,
|
||||
Context: context,
|
||||
}
|
||||
return s.configure(ctx, req)
|
||||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
|
||||
req := raftpb.Config{
|
||||
cc := raftpb.ConfigChange{
|
||||
ID: GenID(),
|
||||
Type: raftpb.ConfigRemoveNode,
|
||||
Type: raftpb.ConfigChangeRemoveNode,
|
||||
NodeID: id,
|
||||
}
|
||||
return s.configure(ctx, req)
|
||||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
// configure sends configuration change through consensus then performs it.
|
||||
// It will block until the change is performed or there is an error.
|
||||
func (s *EtcdServer) configure(ctx context.Context, r raftpb.Config) error {
|
||||
ch := s.w.Register(r.ID)
|
||||
if err := s.Node.Configure(ctx, r); err != nil {
|
||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfigChange) error {
|
||||
ch := s.w.Register(cc.ID)
|
||||
if err := s.Node.ProposeConfigChange(ctx, cc); err != nil {
|
||||
log.Printf("configure error: %v", err)
|
||||
s.w.Trigger(r.ID, nil)
|
||||
s.w.Trigger(cc.ID, nil)
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case <-ch:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
s.w.Trigger(r.ID, nil) // GC wait
|
||||
s.w.Trigger(cc.ID, nil) // GC wait
|
||||
return ctx.Err()
|
||||
case <-s.done:
|
||||
return ErrStopped
|
||||
@ -300,8 +300,8 @@ func getExpirationTime(r *pb.Request) time.Time {
|
||||
return t
|
||||
}
|
||||
|
||||
// applyRequest interprets r as a call to store.X and returns an Response interpreted from store.Event
|
||||
func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
// apply interprets r as a call to store.X and returns an Response interpreted from store.Event
|
||||
func (s *EtcdServer) apply(r pb.Request) Response {
|
||||
f := func(ev *store.Event, err error) Response {
|
||||
return Response{Event: ev, err: err}
|
||||
}
|
||||
@ -341,18 +341,6 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EtcdServer) applyConfig(r raftpb.Config) {
|
||||
switch r.Type {
|
||||
case raftpb.ConfigAddNode:
|
||||
s.Node.AddNode(r.NodeID)
|
||||
case raftpb.ConfigRemoveNode:
|
||||
s.Node.RemoveNode(r.NodeID)
|
||||
default:
|
||||
// This should never be reached
|
||||
panic("unexpected config type")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: non-blocking snapshot
|
||||
func (s *EtcdServer) snapshot() {
|
||||
d, err := s.Store.Save()
|
||||
|
@ -120,7 +120,7 @@ func TestDoBadLocalAction(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyRequest(t *testing.T) {
|
||||
func TestApply(t *testing.T) {
|
||||
tests := []struct {
|
||||
req pb.Request
|
||||
|
||||
@ -188,7 +188,7 @@ func TestApplyRequest(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
st := &storeRecorder{}
|
||||
srv := &EtcdServer{Store: st}
|
||||
resp := srv.applyRequest(tt.req)
|
||||
resp := srv.apply(tt.req)
|
||||
|
||||
if !reflect.DeepEqual(resp, tt.wresp) {
|
||||
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
|
||||
@ -594,9 +594,9 @@ func TestRecvSlowSnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddNode tests AddNode could propose configuration and add node to raft.
|
||||
// TestAddNode tests AddNode could propose and perform node addition.
|
||||
func TestAddNode(t *testing.T) {
|
||||
n := newNodeCommitterRecorder()
|
||||
n := newNodeConfigChangeCommitterRecorder()
|
||||
s := &EtcdServer{
|
||||
Node: n,
|
||||
Store: &storeRecorder{},
|
||||
@ -608,15 +608,15 @@ func TestAddNode(t *testing.T) {
|
||||
action := n.Action()
|
||||
s.Stop()
|
||||
|
||||
waction := []string{"Configure", "AddNode"}
|
||||
waction := []string{"ProposeConfigChange:ConfigChangeAddNode", "ApplyConfigChange:ConfigChangeAddNode"}
|
||||
if !reflect.DeepEqual(action, waction) {
|
||||
t.Errorf("action = %v, want %v", action, waction)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRemoveNode tests RemoveNode could propose configuration and remove node from raft.
|
||||
// TestRemoveNode tests RemoveNode could propose and perform node removal.
|
||||
func TestRemoveNode(t *testing.T) {
|
||||
n := newNodeCommitterRecorder()
|
||||
n := newNodeConfigChangeCommitterRecorder()
|
||||
s := &EtcdServer{
|
||||
Node: n,
|
||||
Store: &storeRecorder{},
|
||||
@ -628,7 +628,7 @@ func TestRemoveNode(t *testing.T) {
|
||||
action := n.Action()
|
||||
s.Stop()
|
||||
|
||||
waction := []string{"Configure", "RemoveNode"}
|
||||
waction := []string{"ProposeConfigChange:ConfigChangeRemoveNode", "ApplyConfigChange:ConfigChangeRemoveNode"}
|
||||
if !reflect.DeepEqual(action, waction) {
|
||||
t.Errorf("action = %v, want %v", action, waction)
|
||||
}
|
||||
@ -802,16 +802,17 @@ func newReadyNode() *readyNode {
|
||||
readyc := make(chan raft.Ready, 1)
|
||||
return &readyNode{readyc: readyc}
|
||||
}
|
||||
func (n *readyNode) Tick() {}
|
||||
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
|
||||
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
|
||||
func (n *readyNode) Configure(ctx context.Context, conf raftpb.Config) error { return nil }
|
||||
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
|
||||
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
||||
func (n *readyNode) Stop() {}
|
||||
func (n *readyNode) Compact(d []byte) {}
|
||||
func (n *readyNode) AddNode(id int64) {}
|
||||
func (n *readyNode) RemoveNode(id int64) {}
|
||||
func (n *readyNode) Tick() {}
|
||||
func (n *readyNode) Campaign(ctx context.Context) error { return nil }
|
||||
func (n *readyNode) Propose(ctx context.Context, data []byte) error { return nil }
|
||||
func (n *readyNode) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
|
||||
return nil
|
||||
}
|
||||
func (n *readyNode) Step(ctx context.Context, msg raftpb.Message) error { return nil }
|
||||
func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
|
||||
func (n *readyNode) ApplyConfigChange(conf raftpb.ConfigChange) {}
|
||||
func (n *readyNode) Stop() {}
|
||||
func (n *readyNode) Compact(d []byte) {}
|
||||
|
||||
type nodeRecorder struct {
|
||||
recorder
|
||||
@ -828,8 +829,8 @@ func (n *nodeRecorder) Propose(ctx context.Context, data []byte) error {
|
||||
n.record("Propose")
|
||||
return nil
|
||||
}
|
||||
func (n *nodeRecorder) Configure(ctx context.Context, conf raftpb.Config) error {
|
||||
n.record("Configure")
|
||||
func (n *nodeRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
|
||||
n.record("ProposeConfigChange")
|
||||
return nil
|
||||
}
|
||||
func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
|
||||
@ -837,18 +838,15 @@ func (n *nodeRecorder) Step(ctx context.Context, msg raftpb.Message) error {
|
||||
return nil
|
||||
}
|
||||
func (n *nodeRecorder) Ready() <-chan raft.Ready { return nil }
|
||||
func (n *nodeRecorder) ApplyConfigChange(conf raftpb.ConfigChange) {
|
||||
n.record("ApplyConfigChange")
|
||||
}
|
||||
func (n *nodeRecorder) Stop() {
|
||||
n.record("Stop")
|
||||
}
|
||||
func (n *nodeRecorder) Compact(d []byte) {
|
||||
n.record("Compact")
|
||||
}
|
||||
func (n *nodeRecorder) AddNode(id int64) {
|
||||
n.record("AddNode")
|
||||
}
|
||||
func (n *nodeRecorder) RemoveNode(id int64) {
|
||||
n.record("RemoveNode")
|
||||
}
|
||||
|
||||
type nodeProposeDataRecorder struct {
|
||||
nodeRecorder
|
||||
@ -880,28 +878,28 @@ func (n *nodeProposalBlockerRecorder) Propose(ctx context.Context, data []byte)
|
||||
return nil
|
||||
}
|
||||
|
||||
type nodeCommitterRecorder struct {
|
||||
type nodeConfigChangeCommitterRecorder struct {
|
||||
nodeRecorder
|
||||
readyc chan raft.Ready
|
||||
}
|
||||
|
||||
func newNodeCommitterRecorder() *nodeCommitterRecorder {
|
||||
func newNodeConfigChangeCommitterRecorder() *nodeConfigChangeCommitterRecorder {
|
||||
readyc := make(chan raft.Ready, 1)
|
||||
readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateLeader}}
|
||||
return &nodeCommitterRecorder{readyc: readyc}
|
||||
return &nodeConfigChangeCommitterRecorder{readyc: readyc}
|
||||
}
|
||||
func (n *nodeCommitterRecorder) Propose(ctx context.Context, data []byte) error {
|
||||
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Data: data}}}
|
||||
return n.nodeRecorder.Propose(ctx, data)
|
||||
}
|
||||
func (n *nodeCommitterRecorder) Configure(ctx context.Context, conf raftpb.Config) error {
|
||||
func (n *nodeConfigChangeCommitterRecorder) ProposeConfigChange(ctx context.Context, conf raftpb.ConfigChange) error {
|
||||
data, err := conf.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfig, Data: data}}}
|
||||
return n.nodeRecorder.Configure(ctx, conf)
|
||||
n.readyc <- raft.Ready{CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfigChange, Data: data}}}
|
||||
n.record("ProposeConfigChange:" + conf.Type.String())
|
||||
return nil
|
||||
}
|
||||
func (n *nodeCommitterRecorder) Ready() <-chan raft.Ready {
|
||||
func (n *nodeConfigChangeCommitterRecorder) Ready() <-chan raft.Ready {
|
||||
return n.readyc
|
||||
}
|
||||
func (n *nodeConfigChangeCommitterRecorder) ApplyConfigChange(conf raftpb.ConfigChange) {
|
||||
n.record("ApplyConfigChange:" + conf.Type.String())
|
||||
}
|
||||
|
21
raft/doc.go
21
raft/doc.go
@ -61,20 +61,19 @@ data, serialize it into a byte slice and call:
|
||||
|
||||
n.Propose(ctx, data)
|
||||
|
||||
To add or remove node in a cluster, build Config struct and call:
|
||||
If the proposal is committed, data will appear in committed entries with type
|
||||
raftpb.EntryNormal.
|
||||
|
||||
n.Configure(ctx, conf)
|
||||
To add or remove node in a cluster, build ConfigChange struct 'cc' and call:
|
||||
|
||||
After configuration is committed, you should apply it to node through:
|
||||
n.ProposeConfigChange(ctx, cc)
|
||||
|
||||
var conf raftpb.Config
|
||||
conf.Unmarshal(data)
|
||||
switch conf.Type {
|
||||
case raftpb.ConfigAddNode:
|
||||
n.AddNode(conf.ID)
|
||||
case raftpb.ConfigRemoveNode:
|
||||
n.RemoveNode(conf.ID)
|
||||
}
|
||||
After config change is committed, some committed entry with type
|
||||
raftpb.EntryConfigChange will be returned. You should apply it to node through:
|
||||
|
||||
var cc raftpb.ConfigChange
|
||||
cc.Unmarshal(data)
|
||||
n.ApplyConfigChange(cc)
|
||||
|
||||
*/
|
||||
package raft
|
||||
|
74
raft/node.go
74
raft/node.go
@ -80,23 +80,22 @@ type Node interface {
|
||||
Campaign(ctx context.Context) error
|
||||
// Propose proposes that data be appended to the log.
|
||||
Propose(ctx context.Context, data []byte) error
|
||||
// Configure proposes config change. At most one config can be in the process of going through consensus.
|
||||
// Application needs to call AddNode/RemoveNode when applying EntryConfig type entry.
|
||||
Configure(ctx context.Context, conf pb.Config) error
|
||||
// ProposeConfigChange proposes config change.
|
||||
// At most one ConfigChange can be in the process of going through consensus.
|
||||
// Application needs to call ApplyConfigChange when applying EntryConfigChange type entry.
|
||||
ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error
|
||||
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
|
||||
Step(ctx context.Context, msg pb.Message) error
|
||||
// Ready returns a channel that returns the current point-in-time state
|
||||
Ready() <-chan Ready
|
||||
// ApplyConfigChange applies config change to the local node.
|
||||
// TODO: reject existing node when add node
|
||||
// TODO: reject non-existant node when remove node
|
||||
ApplyConfigChange(cc pb.ConfigChange)
|
||||
// Stop performs any necessary termination of the Node
|
||||
Stop()
|
||||
// Compact
|
||||
Compact(d []byte)
|
||||
// AddNode adds a node with given id into peer list.
|
||||
// TODO: reject existed node
|
||||
AddNode(id int64)
|
||||
// RemoveNode removes a node with give id from peer list.
|
||||
// TODO: reject unexisted node
|
||||
RemoveNode(id int64)
|
||||
}
|
||||
|
||||
// StartNode returns a new Node given a unique raft id, a list of raft peers, and
|
||||
@ -123,22 +122,12 @@ func RestartNode(id int64, peers []int64, election, heartbeat int, snapshot *pb.
|
||||
return &n
|
||||
}
|
||||
|
||||
const (
|
||||
confAdd = iota
|
||||
confRemove
|
||||
)
|
||||
|
||||
type conf struct {
|
||||
typ int
|
||||
id int64
|
||||
}
|
||||
|
||||
// node is the canonical implementation of the Node interface
|
||||
type node struct {
|
||||
propc chan pb.Message
|
||||
recvc chan pb.Message
|
||||
compactc chan []byte
|
||||
confc chan conf
|
||||
confc chan pb.ConfigChange
|
||||
readyc chan Ready
|
||||
tickc chan struct{}
|
||||
done chan struct{}
|
||||
@ -149,7 +138,7 @@ func newNode() node {
|
||||
propc: make(chan pb.Message),
|
||||
recvc: make(chan pb.Message),
|
||||
compactc: make(chan []byte),
|
||||
confc: make(chan conf),
|
||||
confc: make(chan pb.ConfigChange),
|
||||
readyc: make(chan Ready),
|
||||
tickc: make(chan struct{}),
|
||||
done: make(chan struct{}),
|
||||
@ -188,7 +177,9 @@ func (n *node) run(r *raft) {
|
||||
}
|
||||
|
||||
select {
|
||||
// TODO: buffer the config propose if there exists one
|
||||
// TODO: maybe buffer the config propose if there exists one (the way
|
||||
// described in raft dissertation)
|
||||
// Currently it is dropped in Step silently.
|
||||
case m := <-propc:
|
||||
m.From = r.id
|
||||
r.Step(m)
|
||||
@ -196,12 +187,12 @@ func (n *node) run(r *raft) {
|
||||
r.Step(m) // raft never returns an error
|
||||
case d := <-n.compactc:
|
||||
r.compact(d)
|
||||
case c := <-n.confc:
|
||||
switch c.typ {
|
||||
case confAdd:
|
||||
r.addNode(c.id)
|
||||
case confRemove:
|
||||
r.removeNode(c.id)
|
||||
case cc := <-n.confc:
|
||||
switch cc.Type {
|
||||
case pb.ConfigChangeAddNode:
|
||||
r.addNode(cc.NodeID)
|
||||
case pb.ConfigChangeRemoveNode:
|
||||
r.removeNode(cc.NodeID)
|
||||
default:
|
||||
panic("unexpected conf type")
|
||||
}
|
||||
@ -247,12 +238,12 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
|
||||
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Data: data}}})
|
||||
}
|
||||
|
||||
func (n *node) Configure(ctx context.Context, conf pb.Config) error {
|
||||
data, err := conf.Marshal()
|
||||
func (n *node) ProposeConfigChange(ctx context.Context, cc pb.ConfigChange) error {
|
||||
data, err := cc.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig, Data: data}}})
|
||||
return n.Step(ctx, pb.Message{Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange, Data: data}}})
|
||||
}
|
||||
|
||||
// Step advances the state machine using msgs. The ctx.Err() will be returned,
|
||||
@ -277,6 +268,13 @@ func (n *node) Ready() <-chan Ready {
|
||||
return n.readyc
|
||||
}
|
||||
|
||||
func (n *node) ApplyConfigChange(cc pb.ConfigChange) {
|
||||
select {
|
||||
case n.confc <- cc:
|
||||
case <-n.done:
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) Compact(d []byte) {
|
||||
select {
|
||||
case n.compactc <- d:
|
||||
@ -284,20 +282,6 @@ func (n *node) Compact(d []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) AddNode(id int64) {
|
||||
select {
|
||||
case n.confc <- conf{typ: confAdd, id: id}:
|
||||
case <-n.done:
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) RemoveNode(id int64) {
|
||||
select {
|
||||
case n.confc <- conf{typ: confRemove, id: id}:
|
||||
case <-n.done:
|
||||
}
|
||||
}
|
||||
|
||||
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, prevSnapi int64) Ready {
|
||||
rd := Ready{
|
||||
Entries: r.raftLog.unstableEnts(),
|
||||
|
@ -313,7 +313,7 @@ func (r *raft) becomeLeader() {
|
||||
r.lead = r.id
|
||||
r.state = StateLeader
|
||||
for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
|
||||
if e.Type != pb.EntryConfig {
|
||||
if e.Type != pb.EntryConfigChange {
|
||||
continue
|
||||
}
|
||||
if r.pendingConf {
|
||||
@ -407,7 +407,7 @@ func stepLeader(r *raft, m pb.Message) {
|
||||
panic("unexpected length(entries) of a msgProp")
|
||||
}
|
||||
e := m.Entries[0]
|
||||
if e.Type == pb.EntryConfig {
|
||||
if e.Type == pb.EntryConfigChange {
|
||||
if r.pendingConf {
|
||||
return
|
||||
}
|
||||
|
@ -948,7 +948,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStepConfig tests that when raft step msgProp in ConfigEntry type,
|
||||
// TestStepConfig tests that when raft step msgProp in EntryConfigChange type,
|
||||
// it appends the entry to log and sets pendingConf to be true.
|
||||
func TestStepConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
@ -956,7 +956,7 @@ func TestStepConfig(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
index := r.raftLog.lastIndex()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
|
||||
if g := r.raftLog.lastIndex(); g != index+1 {
|
||||
t.Errorf("index = %d, want %d", g, index+1)
|
||||
}
|
||||
@ -966,17 +966,17 @@ func TestStepConfig(t *testing.T) {
|
||||
}
|
||||
|
||||
// TestStepIgnoreConfig tests that if raft step the second msgProp in
|
||||
// ConfigEntry type when the first one is uncommitted, the node will deny
|
||||
// EntryConfigChange type when the first one is uncommitted, the node will deny
|
||||
// the proposal and keep its original state.
|
||||
func TestStepIgnoreConfig(t *testing.T) {
|
||||
// a raft that cannot make progress
|
||||
r := newRaft(1, []int64{1, 2}, 0, 0)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
|
||||
index := r.raftLog.lastIndex()
|
||||
pendingConf := r.pendingConf
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfig}}})
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: msgProp, Entries: []pb.Entry{{Type: pb.EntryConfigChange}}})
|
||||
if g := r.raftLog.lastIndex(); g != index {
|
||||
t.Errorf("index = %d, want %d", g, index)
|
||||
}
|
||||
@ -993,7 +993,7 @@ func TestRecoverPendingConfig(t *testing.T) {
|
||||
wpending bool
|
||||
}{
|
||||
{pb.EntryNormal, false},
|
||||
{pb.EntryConfig, true},
|
||||
{pb.EntryConfigChange, true},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
r := newRaft(1, []int64{1, 2}, 0, 0)
|
||||
@ -1016,8 +1016,8 @@ func TestRecoverDoublePendingConfig(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
r := newRaft(1, []int64{1, 2}, 0, 0)
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfig})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfig})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfigChange})
|
||||
r.appendEntry(pb.Entry{Type: pb.EntryConfigChange})
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
}()
|
||||
|
@ -14,7 +14,7 @@
|
||||
Snapshot
|
||||
Message
|
||||
HardState
|
||||
Config
|
||||
ConfigChange
|
||||
*/
|
||||
package raftpb
|
||||
|
||||
@ -35,17 +35,17 @@ var _ = math.Inf
|
||||
type EntryType int32
|
||||
|
||||
const (
|
||||
EntryNormal EntryType = 0
|
||||
EntryConfig EntryType = 1
|
||||
EntryNormal EntryType = 0
|
||||
EntryConfigChange EntryType = 1
|
||||
)
|
||||
|
||||
var EntryType_name = map[int32]string{
|
||||
0: "EntryNormal",
|
||||
1: "EntryConfig",
|
||||
1: "EntryConfigChange",
|
||||
}
|
||||
var EntryType_value = map[string]int32{
|
||||
"EntryNormal": 0,
|
||||
"EntryConfig": 1,
|
||||
"EntryNormal": 0,
|
||||
"EntryConfigChange": 1,
|
||||
}
|
||||
|
||||
func (x EntryType) Enum() *EntryType {
|
||||
@ -65,36 +65,36 @@ func (x *EntryType) UnmarshalJSON(data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type ConfigType int32
|
||||
type ConfigChangeType int32
|
||||
|
||||
const (
|
||||
ConfigAddNode ConfigType = 0
|
||||
ConfigRemoveNode ConfigType = 1
|
||||
ConfigChangeAddNode ConfigChangeType = 0
|
||||
ConfigChangeRemoveNode ConfigChangeType = 1
|
||||
)
|
||||
|
||||
var ConfigType_name = map[int32]string{
|
||||
0: "ConfigAddNode",
|
||||
1: "ConfigRemoveNode",
|
||||
var ConfigChangeType_name = map[int32]string{
|
||||
0: "ConfigChangeAddNode",
|
||||
1: "ConfigChangeRemoveNode",
|
||||
}
|
||||
var ConfigType_value = map[string]int32{
|
||||
"ConfigAddNode": 0,
|
||||
"ConfigRemoveNode": 1,
|
||||
var ConfigChangeType_value = map[string]int32{
|
||||
"ConfigChangeAddNode": 0,
|
||||
"ConfigChangeRemoveNode": 1,
|
||||
}
|
||||
|
||||
func (x ConfigType) Enum() *ConfigType {
|
||||
p := new(ConfigType)
|
||||
func (x ConfigChangeType) Enum() *ConfigChangeType {
|
||||
p := new(ConfigChangeType)
|
||||
*p = x
|
||||
return p
|
||||
}
|
||||
func (x ConfigType) String() string {
|
||||
return proto.EnumName(ConfigType_name, int32(x))
|
||||
func (x ConfigChangeType) String() string {
|
||||
return proto.EnumName(ConfigChangeType_name, int32(x))
|
||||
}
|
||||
func (x *ConfigType) UnmarshalJSON(data []byte) error {
|
||||
value, err := proto.UnmarshalJSONEnum(ConfigType_value, data, "ConfigType")
|
||||
func (x *ConfigChangeType) UnmarshalJSON(data []byte) error {
|
||||
value, err := proto.UnmarshalJSONEnum(ConfigChangeType_value, data, "ConfigChangeType")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*x = ConfigType(value)
|
||||
*x = ConfigChangeType(value)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -159,21 +159,21 @@ func (m *HardState) Reset() { *m = HardState{} }
|
||||
func (m *HardState) String() string { return proto.CompactTextString(m) }
|
||||
func (*HardState) ProtoMessage() {}
|
||||
|
||||
type Config struct {
|
||||
ID int64 `protobuf:"varint,1,req" json:"ID"`
|
||||
Type ConfigType `protobuf:"varint,2,req,enum=raftpb.ConfigType" json:"Type"`
|
||||
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
|
||||
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
type ConfigChange struct {
|
||||
ID int64 `protobuf:"varint,1,req" json:"ID"`
|
||||
Type ConfigChangeType `protobuf:"varint,2,req,enum=raftpb.ConfigChangeType" json:"Type"`
|
||||
NodeID int64 `protobuf:"varint,3,req" json:"NodeID"`
|
||||
Context []byte `protobuf:"bytes,4,opt" json:"Context"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Config) Reset() { *m = Config{} }
|
||||
func (m *Config) String() string { return proto.CompactTextString(m) }
|
||||
func (*Config) ProtoMessage() {}
|
||||
func (m *ConfigChange) Reset() { *m = ConfigChange{} }
|
||||
func (m *ConfigChange) String() string { return proto.CompactTextString(m) }
|
||||
func (*ConfigChange) ProtoMessage() {}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
|
||||
proto.RegisterEnum("raftpb.ConfigType", ConfigType_name, ConfigType_value)
|
||||
proto.RegisterEnum("raftpb.ConfigChangeType", ConfigChangeType_name, ConfigChangeType_value)
|
||||
}
|
||||
func (m *Info) Unmarshal(data []byte) error {
|
||||
l := len(data)
|
||||
@ -733,7 +733,7 @@ func (m *HardState) Unmarshal(data []byte) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *Config) Unmarshal(data []byte) error {
|
||||
func (m *ConfigChange) Unmarshal(data []byte) error {
|
||||
l := len(data)
|
||||
index := 0
|
||||
for index < l {
|
||||
@ -777,7 +777,7 @@ func (m *Config) Unmarshal(data []byte) error {
|
||||
}
|
||||
b := data[index]
|
||||
index++
|
||||
m.Type |= (ConfigType(b) & 0x7F) << shift
|
||||
m.Type |= (ConfigChangeType(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
@ -915,7 +915,7 @@ func (m *HardState) Size() (n int) {
|
||||
}
|
||||
return n
|
||||
}
|
||||
func (m *Config) Size() (n int) {
|
||||
func (m *ConfigChange) Size() (n int) {
|
||||
var l int
|
||||
_ = l
|
||||
n += 1 + sovRaft(uint64(m.ID))
|
||||
@ -1131,7 +1131,7 @@ func (m *HardState) MarshalTo(data []byte) (n int, err error) {
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
func (m *Config) Marshal() (data []byte, err error) {
|
||||
func (m *ConfigChange) Marshal() (data []byte, err error) {
|
||||
size := m.Size()
|
||||
data = make([]byte, size)
|
||||
n, err := m.MarshalTo(data)
|
||||
@ -1141,7 +1141,7 @@ func (m *Config) Marshal() (data []byte, err error) {
|
||||
return data[:n], nil
|
||||
}
|
||||
|
||||
func (m *Config) MarshalTo(data []byte) (n int, err error) {
|
||||
func (m *ConfigChange) MarshalTo(data []byte) (n int, err error) {
|
||||
var i int
|
||||
_ = i
|
||||
var l int
|
||||
|
@ -13,8 +13,8 @@ message Info {
|
||||
}
|
||||
|
||||
enum EntryType {
|
||||
EntryNormal = 0;
|
||||
EntryConfig = 1;
|
||||
EntryNormal = 0;
|
||||
EntryConfigChange = 1;
|
||||
}
|
||||
|
||||
message Entry {
|
||||
@ -49,14 +49,14 @@ message HardState {
|
||||
required int64 commit = 3 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
enum ConfigType {
|
||||
ConfigAddNode = 0;
|
||||
ConfigRemoveNode = 1;
|
||||
enum ConfigChangeType {
|
||||
ConfigChangeAddNode = 0;
|
||||
ConfigChangeRemoveNode = 1;
|
||||
}
|
||||
|
||||
message Config {
|
||||
required int64 ID = 1 [(gogoproto.nullable) = false];
|
||||
required ConfigType Type = 2 [(gogoproto.nullable) = false];
|
||||
required int64 NodeID = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Context = 4 [(gogoproto.nullable) = false];
|
||||
message ConfigChange {
|
||||
required int64 ID = 1 [(gogoproto.nullable) = false];
|
||||
required ConfigChangeType Type = 2 [(gogoproto.nullable) = false];
|
||||
required int64 NodeID = 3 [(gogoproto.nullable) = false];
|
||||
optional bytes Context = 4 [(gogoproto.nullable) = false];
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user