From 9614dc6e7139bca13122290d48d9b1f4679ad91f Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Mon, 27 Jun 2016 10:27:27 -0700
Subject: [PATCH] etcdserver: check index of the kv when restarting

---
 etcdserver/server.go | 24 +++++++++++++++++++-----
 mvcc/kvstore.go      | 11 ++++++++++-
 2 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/etcdserver/server.go b/etcdserver/server.go
index 11c58b33f..08b76a62e 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -261,7 +261,11 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	if err != nil {
 		return nil, err
 	}
-	var remotes []*membership.Member
+	var (
+		remotes  []*membership.Member
+		snapshot *raftpb.Snapshot
+	)
+
 	switch {
 	case !haveWAL && !cfg.NewCluster:
 		if err = cfg.VerifyJoinExisting(); err != nil {
@@ -334,7 +338,6 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 		if cfg.ShouldDiscover() {
 			plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
 		}
-		var snapshot *raftpb.Snapshot
 		snapshot, err = ss.Load()
 		if err != nil && err != snap.ErrNoSnapshot {
 			return nil, err
@@ -402,6 +405,12 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 	srv.be = be
 	srv.lessor = lease.NewLessor(srv.be)
 	srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
+	if beExist {
+		kvindex := srv.kv.ConsistentIndex()
+		if snapshot != nil && kvindex < snapshot.Metadata.Index {
+			return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
+		}
+	}
 	srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
 	srv.authStore = auth.NewAuthStore(srv.be)
 	if h := cfg.AutoCompactionRetention; h != 0 {
@@ -1033,6 +1042,13 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint
 
 // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
+	shouldApplyV3 := false
+	if e.Index > s.consistIndex.ConsistentIndex() {
+		// set the consistent index of current executing entry
+		s.consistIndex.setConsistentIndex(e.Index)
+		shouldApplyV3 = true
+	}
+
 	// raft state machine may generate noop entry when leader confirmation.
 	// skip it in advance to avoid some potential bug in the future
 	if len(e.Data) == 0 {
@@ -1057,7 +1073,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	}
 
 	// do not re-apply applied entries.
-	if e.Index <= s.consistIndex.ConsistentIndex() {
+	if !shouldApplyV3 {
 		return
 	}
 
@@ -1066,8 +1082,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		id = raftReq.Header.ID
 	}
 
-	// set the consistent index of current executing entry
-	s.consistIndex.setConsistentIndex(e.Index)
 	ar := s.applyV3.Apply(&raftReq)
 	s.setAppliedIndex(e.Index)
 	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go
index 6712910ed..4cc52d2b6 100644
--- a/mvcc/kvstore.go
+++ b/mvcc/kvstore.go
@@ -325,7 +325,16 @@ func (s *store) Hash() (uint32, int64, error) {
 	return h, rev, err
 }
 
-func (s *store) Commit() { s.b.ForceCommit() }
+func (s *store) Commit() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.tx = s.b.BatchTx()
+	s.tx.Lock()
+	s.saveIndex()
+	s.tx.Unlock()
+	s.b.ForceCommit()
+}
 
 func (s *store) Restore(b backend.Backend) error {
 	s.mu.Lock()
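
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the guard this change introduces in applyEntryNormal. The `consistentIndex` and `entry` types here are hypothetical stand-ins, not etcd's real `consistIndex` or `raftpb.Entry`; the point is only the pattern: bump the persisted consistent index first, and skip the v3 apply when the entry's raft index is not newer than what the backend already reflects, so replaying the WAL after a restart stays idempotent.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// consistentIndex mimics the role of the patch's consistIndex: the raft index
// of the last entry whose effect is already persisted with the v3 backend.
type consistentIndex struct{ i uint64 }

func (c *consistentIndex) ConsistentIndex() uint64     { return atomic.LoadUint64(&c.i) }
func (c *consistentIndex) setConsistentIndex(v uint64) { atomic.StoreUint64(&c.i, v) }

// entry stands in for a raft log entry: an index plus an opaque payload.
type entry struct {
	Index uint64
	Data  []byte
}

// applyEntry mirrors the shouldApplyV3 logic added by the patch: record the
// consistent index of the entry being executed, and re-apply only entries
// whose index is newer than the one already stored.
func applyEntry(ci *consistentIndex, e entry) (applied bool) {
	shouldApplyV3 := false
	if e.Index > ci.ConsistentIndex() {
		ci.setConsistentIndex(e.Index)
		shouldApplyV3 = true
	}
	if !shouldApplyV3 {
		return false // already reflected in the backend; do not re-apply
	}
	// ... apply e.Data to the v3 store here ...
	return true
}

func main() {
	ci := &consistentIndex{}
	ci.setConsistentIndex(5) // pretend the backend was restored at index 5

	for _, e := range []entry{{Index: 4}, {Index: 5}, {Index: 6}} {
		fmt.Printf("entry %d applied: %v\n", e.Index, applyEntry(ci, e))
	}
	// prints:
	// entry 4 applied: false
	// entry 5 applied: false
	// entry 6 applied: true
}
```

In the same spirit, the `Commit()` change in mvcc/kvstore.go persists that index together with every forced commit, which is what makes the startup comparison against the snapshot index in NewServer meaningful.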