diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index da9cc6da0..f1f74fea9 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -49,7 +49,6 @@ import (
 )
 
 func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
-	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
 
 	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
 		cfg.Logger.Warn(
@@ -65,60 +64,118 @@ func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
 		return nil, fmt.Errorf("cannot access data directory: %v", terr)
 	}
-	haveWAL := wal.Exist(cfg.WALDir())
-	ss := bootstrapSnapshot(cfg)
-
-	be, ci, beExist, beHooks, err := bootstrapBackend(cfg)
-	if err != nil {
-		return nil, err
+	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
+		return nil, fmt.Errorf("cannot access member directory: %v", terr)
 	}
-	defer func() {
-		if err != nil {
-			be.Close()
-		}
-	}()
-
+	ss := bootstrapSnapshot(cfg)
 	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
 	if err != nil {
 		return nil, err
 	}
-	switch {
-	case !haveWAL && !cfg.NewCluster:
-		b, err = bootstrapExistingClusterNoWAL(cfg, prt, st, be)
-	case !haveWAL && cfg.NewCluster:
-		b, err = bootstrapNewClusterNoWAL(cfg, prt, st, be)
-	case haveWAL:
-		b, err = bootstrapWithWAL(cfg, st, be, ss, beExist, beHooks, ci)
-	default:
-		be.Close()
-		return nil, fmt.Errorf("unsupported bootstrap config")
-	}
+	haveWAL := wal.Exist(cfg.WALDir())
+	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
+	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
 	if err != nil {
 		return nil, err
 	}
+	var (
+		bwal *bootstrappedWAL
+	)
 
-	if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
-		return nil, fmt.Errorf("cannot access member directory: %v", terr)
+	if haveWAL {
+		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
+			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
+		}
+		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
 	}
-	b.prt = prt
-	b.ci = ci
-	b.st = st
-	b.be = be
-	b.ss = ss
-	b.beHooks = beHooks
-	return b, nil
+
+	cluster, err := bootstrapCluster(cfg, bwal, prt)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+
+	s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+
+	err = cluster.Finalize(cfg, s)
+	if err != nil {
+		backend.Close()
+		return nil, err
+	}
+	raft := bootstrapRaft(cfg, cluster, s.wal)
+	return &bootstrappedServer{
+		prt:     prt,
+		ss:      ss,
+		storage: s,
+		cluster: cluster,
+		raft:    raft,
+	}, nil
 }
 
 type bootstrappedServer struct {
+	storage *bootstrappedStorage
+	cluster *bootstrapedCluster
 	raft    *bootstrappedRaft
-	remotes []*membership.Member
 	prt     http.RoundTripper
-	ci      cindex.ConsistentIndexer
-	st      v2store.Store
-	be      backend.Backend
 	ss      *snap.Snapshotter
-	beHooks *serverstorage.BackendHooks
+}
+
+func (s *bootstrappedServer) Close() {
+	s.storage.Close()
+}
+
+type bootstrappedStorage struct {
+	backend *bootstrappedBackend
+	wal     *bootstrappedWAL
+	st      v2store.Store
+}
+
+func (s *bootstrappedStorage) Close() {
+	s.backend.Close()
+}
+
+type bootstrappedBackend struct {
+	beHooks  *serverstorage.BackendHooks
+	be       backend.Backend
+	ci       cindex.ConsistentIndexer
+	beExist  bool
+	snapshot *raftpb.Snapshot
+}
+
+func (s *bootstrappedBackend) Close() {
+	s.be.Close()
+}
+
+type bootstrapedCluster struct {
+	remotes []*membership.Member
+	cl      *membership.RaftCluster
+	nodeID  types.ID
+}
+
+type bootstrappedRaft struct {
+	lg        *zap.Logger
+	heartbeat time.Duration
+
+	peers   []raft.Peer
+	config  *raft.Config
+	storage *raft.MemoryStorage
+}
+
+func bootstrapStorage(cfg config.ServerConfig, st v2store.Store, be *bootstrappedBackend, wal *bootstrappedWAL, cl *bootstrapedCluster) (b *bootstrappedStorage, err error) {
+	if wal == nil {
+		wal = bootstrapNewWAL(cfg, cl)
+	}
+
+	return &bootstrappedStorage{
+		backend: be,
+		st:      st,
+		wal:     wal,
+	}, nil
 }
 
 func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
@@ -142,11 +199,11 @@ func bootstrapSnapshot(cfg config.ServerConfig) *snap.Snapshotter {
 	return snap.New(cfg.Logger, cfg.SnapDir())
 }
 
-func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.ConsistentIndexer, beExist bool, beHooks *serverstorage.BackendHooks, err error) {
-	beExist = fileutil.Exist(cfg.BackendPath())
-	ci = cindex.NewConsistentIndex(nil)
-	beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
-	be = serverstorage.OpenBackend(cfg, beHooks)
+func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, ss *snap.Snapshotter) (backend *bootstrappedBackend, err error) {
+	beExist := fileutil.Exist(cfg.BackendPath())
+	ci := cindex.NewConsistentIndex(nil)
+	beHooks := serverstorage.NewBackendHooks(cfg.Logger, ci)
+	be := serverstorage.OpenBackend(cfg, beHooks)
 	defer func() {
 		if err != nil && be != nil {
 			be.Close()
@@ -157,20 +214,35 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
 	if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
 		err = maybeDefragBackend(cfg, be)
 		if err != nil {
-			return nil, nil, false, nil, err
+			return nil, err
 		}
 	}
 	cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
 
 	// TODO(serathius): Implement schema setup in fresh storage
+	var (
+		snapshot *raftpb.Snapshot
+	)
+	if haveWAL {
+		snapshot, be, err = recoverSnapshot(cfg, st, be, beExist, beHooks, ci, ss)
+		if err != nil {
+			return nil, err
+		}
+	}
 	if beExist {
 		err = schema.Validate(cfg.Logger, be.BatchTx())
 		if err != nil {
 			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
-			return nil, nil, false, nil, err
+			return nil, err
 		}
 	}
-	return be, ci, beExist, beHooks, nil
+	return &bootstrappedBackend{
+		beHooks:  beHooks,
+		be:       be,
+		ci:       ci,
+		beExist:  beExist,
+		snapshot: snapshot,
+	}, nil
 }
 
 func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
@@ -192,7 +264,24 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
 	return be.Defrag()
 }
 
-func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+func bootstrapCluster(cfg config.ServerConfig, bwal *bootstrappedWAL, prt http.RoundTripper) (c *bootstrapedCluster, err error) {
+	switch {
+	case bwal == nil && !cfg.NewCluster:
+		c, err = bootstrapExistingClusterNoWAL(cfg, prt)
+	case bwal == nil && cfg.NewCluster:
+		c, err = bootstrapNewClusterNoWAL(cfg, prt)
+	case bwal != nil && bwal.haveWAL:
+		c, err = bootstrapClusterWithWAL(cfg, bwal.meta)
+	default:
+		return nil, fmt.Errorf("unsupported bootstrap config")
+	}
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyJoinExisting(); err != nil {
 		return nil, err
 	}
@@ -213,17 +302,15 @@ func bootstrapExistingClusterNoWAL(cfg config.ServerConfig, prt http.RoundTrippe
 	remotes := existingCluster.Members()
 	cl.SetID(types.ID(0), existingCluster.ID())
-	cl.SetStore(st)
-	cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	br := bootstrapRaftFromCluster(cfg, cl, nil)
-	cl.SetID(br.wal.id, existingCluster.ID())
-	return &bootstrappedServer{
-		raft:    br,
+	member := cl.MemberByName(cfg.Name)
+	return &bootstrapedCluster{
 		remotes: remotes,
+		cl:      cl,
+		nodeID:  member.ID,
 	}, nil
 }
 
-func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st v2store.Store, be backend.Backend) (*bootstrappedServer, error) {
+func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (*bootstrapedCluster, error) {
 	if err := cfg.VerifyBootstrap(); err != nil {
 		return nil, err
 	}
@@ -253,42 +340,43 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper, st
 			return nil, err
 		}
 	}
-	cl.SetStore(st)
-	cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	br := bootstrapRaftFromCluster(cfg, cl, cl.MemberIDs())
-	cl.SetID(br.wal.id, cl.ID())
-	return &bootstrappedServer{
+	return &bootstrapedCluster{
 		remotes: nil,
-		raft:    br,
+		cl:      cl,
+		nodeID:  m.ID,
 	}, nil
 }
 
-func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Backend, ss *snap.Snapshotter, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer) (*bootstrappedServer, error) {
+func bootstrapClusterWithWAL(cfg config.ServerConfig, meta *snapshotMetadata) (*bootstrapedCluster, error) {
 	if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
 		return nil, fmt.Errorf("cannot write to member directory: %v", err)
 	}
-	if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
-		return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
-	}
-
 	if cfg.ShouldDiscover() {
 		cfg.Logger.Warn(
 			"discovery token is ignored since cluster already initialized; valid logs are found",
 			zap.String("wal-dir", cfg.WALDir()),
 		)
 	}
+	cl := membership.NewCluster(cfg.Logger)
+	cl.SetID(meta.nodeID, meta.clusterID)
+	return &bootstrapedCluster{
+		cl:     cl,
+		nodeID: meta.nodeID,
+	}, nil
+}
 
+func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
 	// Find a snapshot to start/restart a raft node
 	walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
 	if err != nil {
-		return nil, err
+		return nil, be, err
 	}
 	// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
-	// wal log entries
+	// bwal log entries
 	snapshot, err := ss.LoadNewestAvailable(walSnaps)
 	if err != nil && err != snap.ErrNoSnapshot {
-		return nil, err
+		return nil, be, err
 	}
 
 	if snapshot != nil {
@@ -298,7 +386,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 
 		if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
 			cfg.Logger.Error("illegal v2store content", zap.Error(err))
-			return nil, err
+			return nil, be, err
 		}
 
 		cfg.Logger.Info(
@@ -324,7 +412,7 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 		kvindex := ci.ConsistentIndex()
 		if kvindex < snapshot.Metadata.Index {
 			if kvindex != 0 {
-				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
+				return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
 			}
 			cfg.Logger.Warn(
 				"consistent index was never saved",
@@ -335,29 +423,47 @@ func bootstrapWithWAL(cfg config.ServerConfig, st v2store.Store, be backend.Back
 	} else {
 		cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
 	}
-
-	r := &bootstrappedServer{}
-	if !cfg.ForceNewCluster {
-		r.raft = bootstrapRaftFromWal(cfg, snapshot)
-	} else {
-		r.raft = bootstrapRaftFromWalStandalone(cfg, snapshot)
-	}
-
-	r.raft.cl.SetStore(st)
-	r.raft.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, be))
-	r.raft.cl.Recover(api.UpdateCapability)
-	if r.raft.cl.Version() != nil && !r.raft.cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
-		bepath := cfg.BackendPath()
-		os.RemoveAll(bepath)
-		return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
-	}
-	return r, nil
+	return snapshot, be, nil
 }
 
-func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) *bootstrappedRaft {
+func (c *bootstrapedCluster) Finalize(cfg config.ServerConfig, s *bootstrappedStorage) error {
+	if !s.wal.haveWAL {
+		c.cl.SetID(c.nodeID, c.cl.ID())
+	}
+	c.cl.SetStore(s.st)
+	c.cl.SetBackend(schema.NewMembershipBackend(cfg.Logger, s.backend.be))
+	if s.wal.haveWAL {
+		c.cl.Recover(api.UpdateCapability)
+		if c.databaseFileMissing(s) {
+			bepath := cfg.BackendPath()
+			os.RemoveAll(bepath)
+			return fmt.Errorf("database file (%v) of the backend is missing", bepath)
+		}
+	}
+	return nil
+}
+
+func (c *bootstrapedCluster) databaseFileMissing(s *bootstrappedStorage) bool {
+	v3Cluster := c.cl.Version() != nil && !c.cl.Version().LessThan(semver.Version{Major: 3})
+	return v3Cluster && !s.backend.beExist
+}
+
+func bootstrapRaft(cfg config.ServerConfig, cluster *bootstrapedCluster, bwal *bootstrappedWAL) *bootstrappedRaft {
+	switch {
+	case !bwal.haveWAL && !cfg.NewCluster:
+		return bootstrapRaftFromCluster(cfg, cluster.cl, nil, bwal)
+	case !bwal.haveWAL && cfg.NewCluster:
+		return bootstrapRaftFromCluster(cfg, cluster.cl, cluster.cl.MemberIDs(), bwal)
+	case bwal.haveWAL:
+		return bootstrapRaftFromWAL(cfg, bwal)
+	default:
+		cfg.Logger.Panic("unsupported bootstrap config")
+		return nil
+	}
+}
+
+func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID, bwal *bootstrappedWAL) *bootstrappedRaft {
 	member := cl.MemberByName(cfg.Name)
-	id := member.ID
-	wal := bootstrapNewWAL(cfg, id, cl.ID())
 	peers := make([]raft.Peer, len(ids))
 	for i, id := range ids {
 		var ctx []byte
@@ -369,69 +475,26 @@ func bootstrapRaftFromCluster(cfg config.ServerConfig, cl *membership.RaftCluste
 	}
 	cfg.Logger.Info(
 		"starting local member",
-		zap.String("local-member-id", id.String()),
+		zap.String("local-member-id", member.ID.String()),
 		zap.String("cluster-id", cl.ID().String()),
 	)
-	s := wal.MemoryStorage()
+	s := bwal.MemoryStorage()
 	return &bootstrappedRaft{
 		lg:        cfg.Logger,
 		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
+		config:    raftConfig(cfg, uint64(member.ID), s),
 		peers:     peers,
 		storage:   s,
-		wal:       wal,
 	}
 }
 
-func bootstrapRaftFromWal(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
-	wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
-
-	cfg.Logger.Info(
-		"restarting local member",
-		zap.String("cluster-id", wal.cid.String()),
-		zap.String("local-member-id", wal.id.String()),
-		zap.Uint64("commit-index", wal.st.Commit),
-	)
-	cl := membership.NewCluster(cfg.Logger)
-	cl.SetID(wal.id, wal.cid)
-	s := wal.MemoryStorage()
+func bootstrapRaftFromWAL(cfg config.ServerConfig, bwal *bootstrappedWAL) *bootstrappedRaft {
+	s := bwal.MemoryStorage()
 	return &bootstrappedRaft{
 		lg:        cfg.Logger,
 		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
+		config:    raftConfig(cfg, uint64(bwal.meta.nodeID), s),
 		storage:   s,
-		wal:       wal,
-	}
-}
-
-func bootstrapRaftFromWalStandalone(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedRaft {
-	wal := bootstrapWALFromSnapshot(cfg.Logger, cfg.WALDir(), snapshot, cfg.UnsafeNoFsync)
-
-	// discard the previously uncommitted entries
-	wal.ents = wal.CommitedEntries()
-	entries := wal.ConfigChangeEntries()
-	// force commit config change entries
-	wal.AppendAndCommitEntries(entries)
-
-	cfg.Logger.Info(
-		"forcing restart member",
-		zap.String("cluster-id", wal.cid.String()),
-		zap.String("local-member-id", wal.id.String()),
-		zap.Uint64("commit-index", wal.st.Commit),
-	)
-
-	cl := membership.NewCluster(cfg.Logger)
-	cl.SetID(wal.id, wal.cid)
-	s := wal.MemoryStorage()
-	return &bootstrappedRaft{
-		lg:        cfg.Logger,
-		heartbeat: time.Duration(cfg.TickMs) * time.Millisecond,
-		cl:        cl,
-		config:    raftConfig(cfg, uint64(wal.id), s),
-		storage:   s,
-		wal:       wal,
 	}
 }
 
@@ -449,18 +512,7 @@ func raftConfig(cfg config.ServerConfig, id uint64, s *raft.MemoryStorage) *raft
 	}
 }
 
-type bootstrappedRaft struct {
-	lg        *zap.Logger
-	heartbeat time.Duration
-
-	peers   []raft.Peer
-	config  *raft.Config
-	cl      *membership.RaftCluster
-	storage *raft.MemoryStorage
-	wal     *bootstrappedWAL
-}
-
-func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
+func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
 	var n raft.Node
 	if len(b.peers) == 0 {
 		n = raft.RestartNode(b.config)
@@ -473,30 +525,65 @@ func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter) *raftNode {
 	return newRaftNode(
 		raftNodeConfig{
 			lg:          b.lg,
-			isIDRemoved: func(id uint64) bool { return b.cl.IsIDRemoved(types.ID(id)) },
+			isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
 			Node:        n,
 			heartbeat:   b.heartbeat,
 			raftStorage: b.storage,
-			storage:     NewStorage(b.wal.w, ss),
+			storage:     NewStorage(wal, ss),
 		},
 	)
 }
 
-// bootstrapWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
+func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) *bootstrappedWAL {
+	wal, st, ents, snap, meta := openWALFromSnapshot(cfg, snapshot)
+	bwal := &bootstrappedWAL{
+		lg:       cfg.Logger,
+		w:        wal,
+		st:       st,
+		ents:     ents,
+		snapshot: snap,
+		meta:     meta,
+		haveWAL:  true,
+	}
+
+	if cfg.ForceNewCluster {
+		// discard the previously uncommitted entries
+		bwal.ents = bwal.CommitedEntries()
+		entries := bwal.NewConfigChangeEntries()
+		// force commit config change entries
+		bwal.AppendAndCommitEntries(entries)
+		cfg.Logger.Info(
+			"forcing restart member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	} else {
+		cfg.Logger.Info(
+			"restarting local member",
+			zap.String("cluster-id", meta.clusterID.String()),
+			zap.String("local-member-id", meta.nodeID.String()),
+			zap.Uint64("commit-index", bwal.st.Commit),
+		)
+	}
+	return bwal
+}
+
+// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
 // after the position of the given snap in the WAL.
 // The snap must have been previously saved to the WAL, or this call will panic.
-func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot, unsafeNoFsync bool) *bootstrappedWAL {
+func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
 	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
 	}
 	repaired := false
 	for {
-		w, err := wal.Open(lg, waldir, walsnap)
+		w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
 		if err != nil {
-			lg.Fatal("failed to open WAL", zap.Error(err))
+			cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
 		}
-		if unsafeNoFsync {
+		if cfg.UnsafeNoFsync {
 			w.SetUnsafeNoFsync()
 		}
 		wmetadata, st, ents, err := w.ReadAll()
@@ -504,12 +591,12 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 			w.Close()
 			// we can only repair ErrUnexpectedEOF and we never repair twice.
 			if repaired || err != io.ErrUnexpectedEOF {
-				lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
+				cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
 			}
-			if !wal.Repair(lg, waldir) {
-				lg.Fatal("failed to repair WAL", zap.Error(err))
+			if !wal.Repair(cfg.Logger, cfg.WALDir()) {
+				cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
 			} else {
-				lg.Info("repaired WAL", zap.Error(err))
+				cfg.Logger.Info("repaired WAL", zap.Error(err))
 				repaired = true
 			}
 			continue
@@ -518,23 +605,20 @@ func bootstrapWALFromSnapshot(lg *zap.Logger, waldir string, snapshot *raftpb.Sn
 		pbutil.MustUnmarshal(&metadata, wmetadata)
 		id := types.ID(metadata.NodeID)
 		cid := types.ID(metadata.ClusterID)
-		return &bootstrappedWAL{
-			lg:       lg,
-			w:        w,
-			id:       id,
-			cid:      cid,
-			st:       &st,
-			ents:     ents,
-			snapshot: snapshot,
-		}
+		meta := &snapshotMetadata{clusterID: cid, nodeID: id}
+		return w, &st, ents, snapshot, meta
 	}
 }
 
-func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *bootstrappedWAL {
+type snapshotMetadata struct {
+	nodeID, clusterID types.ID
+}
+
+func bootstrapNewWAL(cfg config.ServerConfig, cl *bootstrapedCluster) *bootstrappedWAL {
 	metadata := pbutil.MustMarshal(
 		&etcdserverpb.Metadata{
-			NodeID:    uint64(nodeID),
-			ClusterID: uint64(clusterID),
+			NodeID:    uint64(cl.nodeID),
+			ClusterID: uint64(cl.cl.ID()),
 		},
 	)
 	w, err := wal.Create(cfg.Logger, cfg.WALDir(), metadata)
@@ -545,21 +629,20 @@ func bootstrapNewWAL(cfg config.ServerConfig, nodeID, clusterID types.ID) *boots
 		w.SetUnsafeNoFsync()
 	}
 	return &bootstrappedWAL{
-		lg:       cfg.Logger,
-		w:        w,
-		id:       nodeID,
-		cid:      clusterID,
+		lg: cfg.Logger,
+		w:  w,
 	}
 }
 
 type bootstrappedWAL struct {
 	lg       *zap.Logger
+	haveWAL  bool
 	w        *wal.WAL
-	id, cid  types.ID
 	st       *raftpb.HardState
 	ents     []raftpb.Entry
 	snapshot *raftpb.Snapshot
+	meta     *snapshotMetadata
 }
 
 func (wal *bootstrappedWAL) MemoryStorage() *raft.MemoryStorage {
@@ -591,11 +674,11 @@ func (wal *bootstrappedWAL) CommitedEntries() []raftpb.Entry {
 	return wal.ents
 }
 
-func (wal *bootstrappedWAL) ConfigChangeEntries() []raftpb.Entry {
+func (wal *bootstrappedWAL) NewConfigChangeEntries() []raftpb.Entry {
 	return serverstorage.CreateConfigChangeEnts(
 		wal.lg,
-		serverstorage.GetIDs(wal.lg, wal.snapshot, wal.ents),
-		uint64(wal.id),
+		serverstorage.GetEffectiveNodeIDsFromWalEntries(wal.lg, wal.snapshot, wal.ents),
+		uint64(wal.meta.nodeID),
 		wal.st.Term,
 		wal.st.Commit,
 	)
diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go
index 49de844b5..f552f8180 100644
--- a/server/etcdserver/raft_test.go
+++ b/server/etcdserver/raft_test.go
@@ -67,7 +67,7 @@ func TestGetIDs(t *testing.T) {
 		if tt.confState != nil {
 			snap.Metadata.ConfState = *tt.confState
 		}
-		idSet := serverstorage.GetIDs(testLogger, &snap, tt.ents)
+		idSet := serverstorage.GetEffectiveNodeIDsFromWalEntries(testLogger, &snap, tt.ents)
 		if !reflect.DeepEqual(idSet, tt.widSet) {
 			t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
 		}
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 4a9d55efa..310f436e5 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -304,12 +304,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	defer func() {
 		if err != nil {
-			b.be.Close()
+			b.Close()
 		}
 	}()
 
-	sstats := stats.NewServerStats(cfg.Name, b.raft.wal.id.String())
-	lstats := stats.NewLeaderStats(cfg.Logger, b.raft.wal.id.String())
+	sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
+	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
 
 	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
 	srv = &EtcdServer{
@@ -318,28 +318,28 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		lgMu:        new(sync.RWMutex),
 		lg:          cfg.Logger,
 		errorc:      make(chan error, 1),
-		v2store:     b.st,
+		v2store:     b.storage.st,
 		snapshotter: b.ss,
-		r:           *b.raft.newRaftNode(b.ss),
-		id:          b.raft.wal.id,
+		r:           *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
+		id:          b.cluster.nodeID,
 		attributes:  membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:     b.raft.cl,
+		cluster:     b.cluster.cl,
 		stats:       sstats,
 		lstats:      lstats,
 		SyncTicker:  time.NewTicker(500 * time.Millisecond),
 		peerRt:      b.prt,
-		reqIDGen:    idutil.NewGenerator(uint16(b.raft.wal.id), time.Now()),
+		reqIDGen:    idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
 		AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
-		consistIndex:      b.ci,
+		consistIndex:      b.storage.backend.ci,
 		firstCommitInTerm: notify.NewNotifier(),
 		clusterVersionChanged: notify.NewNotifier(),
 	}
-	serverID.With(prometheus.Labels{"server_id": b.raft.wal.id.String()}).Set(1)
+	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
 
 	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
 	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
-	srv.be = b.be
-	srv.beHooks = b.beHooks
+	srv.be = b.storage.backend.be
+	srv.beHooks = b.storage.backend.beHooks
 	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
 
 	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
@@ -403,9 +403,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		Logger:      cfg.Logger,
 		TLSInfo:     cfg.PeerTLSInfo,
 		DialTimeout: cfg.PeerDialTimeout(),
-		ID:          b.raft.wal.id,
+		ID:          b.cluster.nodeID,
 		URLs:        cfg.PeerURLs,
-		ClusterID:   b.raft.cl.ID(),
+		ClusterID:   b.cluster.cl.ID(),
 		Raft:        srv,
 		Snapshotter: b.ss,
 		ServerStats: sstats,
@@ -416,13 +416,13 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 		return nil, err
 	}
 	// add all remotes into transport
-	for _, m := range b.remotes {
-		if m.ID != b.raft.wal.id {
+	for _, m := range b.cluster.remotes {
+		if m.ID != b.cluster.nodeID {
 			tr.AddRemote(m.ID, m.PeerURLs)
 		}
 	}
-	for _, m := range b.raft.cl.Members() {
-		if m.ID != b.raft.wal.id {
+	for _, m := range b.cluster.cl.Members() {
+		if m.ID != b.cluster.nodeID {
 			tr.AddPeer(m.ID, m.PeerURLs)
 		}
 	}
diff --git a/server/storage/util.go b/server/storage/util.go
index bdac72ec1..252e74f92 100644
--- a/server/storage/util.go
+++ b/server/storage/util.go
@@ -109,13 +109,13 @@ func CreateConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, ind
 	return ents
 }
 
-// GetIDs returns an ordered set of IDs included in the given snapshot and
+// GetEffectiveNodeIDsFromWalEntries returns an ordered set of IDs included in the given snapshot and
 // the entries. The given snapshot/entries can contain three kinds of
 // ID-related entry:
 // - ConfChangeAddNode, in which case the contained ID will Be added into the set.
 // - ConfChangeRemoveNode, in which case the contained ID will Be removed from the set.
 // - ConfChangeAddLearnerNode, in which the contained ID will Be added into the set.
-func GetIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
+func GetEffectiveNodeIDsFromWalEntries(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
 	ids := make(map[uint64]bool)
 	if snap != nil {
 		for _, id := range snap.Metadata.ConfState.Voters {
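
Illustrative usage sketch (not part of the patch): the rename of GetIDs to GetEffectiveNodeIDsFromWalEntries keeps the behavior documented above — start from the voters recorded in the snapshot's ConfState, then apply the conf-change entries read from the WAL. The snippet assumes the import paths of the etcd main branch at the time of this change (go.etcd.io/etcd/server/v3/storage, go.etcd.io/etcd/raft/v3/raftpb, go.etcd.io/etcd/pkg/v3/pbutil); the confChangeEntry helper and the zap example logger are illustrative, not code from the patch.

package main

import (
	"fmt"

	"go.etcd.io/etcd/pkg/v3/pbutil"
	"go.etcd.io/etcd/raft/v3/raftpb"
	serverstorage "go.etcd.io/etcd/server/v3/storage"
	"go.uber.org/zap"
)

// confChangeEntry is a small helper (assumed, not from the patch) that wraps a
// ConfChange into the EntryConfChange log-entry shape the function inspects.
func confChangeEntry(t raftpb.ConfChangeType, id uint64) raftpb.Entry {
	cc := raftpb.ConfChange{Type: t, NodeID: id}
	return raftpb.Entry{
		Type: raftpb.EntryConfChange,
		Data: pbutil.MustMarshal(&cc),
	}
}

func main() {
	lg := zap.NewExample()

	// Snapshot whose ConfState records voters 1 and 2.
	snap := &raftpb.Snapshot{}
	snap.Metadata.ConfState = raftpb.ConfState{Voters: []uint64{1, 2}}

	// WAL entries recorded after the snapshot: add member 3, then remove member 1.
	ents := []raftpb.Entry{
		confChangeEntry(raftpb.ConfChangeAddNode, 3),
		confChangeEntry(raftpb.ConfChangeRemoveNode, 1),
	}

	// Effective member set: snapshot voters plus/minus the conf changes, sorted.
	ids := serverstorage.GetEffectiveNodeIDsFromWalEntries(lg, snap, ents)
	fmt.Println(ids) // [2 3]
}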