mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: recover cluster when receiving newer snapshot
This commit is contained in:
parent
dfaa7290c4
commit
77433ff6da
@ -267,6 +267,10 @@ func (c *Cluster) SetID(id types.ID) { c.id = id }
|
|||||||
|
|
||||||
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
func (c *Cluster) SetStore(st store.Store) { c.store = st }
|
||||||
|
|
||||||
|
func (c *Cluster) Recover() {
|
||||||
|
c.members, c.removed = membersFromStore(c.store)
|
||||||
|
}
|
||||||
|
|
||||||
// ValidateConfigurationChange takes a proposed ConfChange and
|
// ValidateConfigurationChange takes a proposed ConfChange and
|
||||||
// ensures that it is still valid.
|
// ensures that it is still valid.
|
||||||
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||||
|
@ -354,6 +354,7 @@ func (s *EtcdServer) run() {
|
|||||||
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
|
||||||
log.Panicf("recovery store error: %v", err)
|
log.Panicf("recovery store error: %v", err)
|
||||||
}
|
}
|
||||||
|
s.Cluster.Recover()
|
||||||
appliedi = rd.Snapshot.Index
|
appliedi = rd.Snapshot.Index
|
||||||
}
|
}
|
||||||
// TODO(bmizerany): do this in the background, but take
|
// TODO(bmizerany): do this in the background, but take
|
||||||
|
@ -901,11 +901,14 @@ func TestRecvSnapshot(t *testing.T) {
|
|||||||
n := newReadyNode()
|
n := newReadyNode()
|
||||||
st := &storeRecorder{}
|
st := &storeRecorder{}
|
||||||
p := &storageRecorder{}
|
p := &storageRecorder{}
|
||||||
|
cl := newCluster("abc")
|
||||||
|
cl.SetStore(store.New())
|
||||||
s := &EtcdServer{
|
s := &EtcdServer{
|
||||||
store: st,
|
store: st,
|
||||||
sender: &nopSender{},
|
sender: &nopSender{},
|
||||||
storage: p,
|
storage: p,
|
||||||
node: n,
|
node: n,
|
||||||
|
Cluster: cl,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.start()
|
s.start()
|
||||||
@ -930,11 +933,14 @@ func TestRecvSnapshot(t *testing.T) {
|
|||||||
func TestRecvSlowSnapshot(t *testing.T) {
|
func TestRecvSlowSnapshot(t *testing.T) {
|
||||||
n := newReadyNode()
|
n := newReadyNode()
|
||||||
st := &storeRecorder{}
|
st := &storeRecorder{}
|
||||||
|
cl := newCluster("abc")
|
||||||
|
cl.SetStore(store.New())
|
||||||
s := &EtcdServer{
|
s := &EtcdServer{
|
||||||
store: st,
|
store: st,
|
||||||
sender: &nopSender{},
|
sender: &nopSender{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
node: n,
|
node: n,
|
||||||
|
Cluster: cl,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.start()
|
s.start()
|
||||||
@ -958,11 +964,14 @@ func TestRecvSlowSnapshot(t *testing.T) {
|
|||||||
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
|
||||||
n := newReadyNode()
|
n := newReadyNode()
|
||||||
st := &storeRecorder{}
|
st := &storeRecorder{}
|
||||||
|
cl := newCluster("abc")
|
||||||
|
cl.SetStore(store.New())
|
||||||
s := &EtcdServer{
|
s := &EtcdServer{
|
||||||
store: st,
|
store: st,
|
||||||
sender: &nopSender{},
|
sender: &nopSender{},
|
||||||
storage: &storageRecorder{},
|
storage: &storageRecorder{},
|
||||||
node: n,
|
node: n,
|
||||||
|
Cluster: cl,
|
||||||
}
|
}
|
||||||
|
|
||||||
s.start()
|
s.start()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user