diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index bf29c7360..6bb047235 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -195,6 +195,11 @@ func (c *RaftCluster) SetID(id types.ID) { c.id = id } func (c *RaftCluster) SetStore(st store.Store) { c.store = st } +func (c *RaftCluster) SetBackend(be backend.Backend) { + c.be = be + mustCreateBackendMemberBucket(c.be) +} + func (c *RaftCluster) Recover() { c.Lock() defer c.Unlock() diff --git a/etcdserver/membership/store.go b/etcdserver/membership/store.go index b153c2024..d253fb036 100644 --- a/etcdserver/membership/store.go +++ b/etcdserver/membership/store.go @@ -137,6 +137,14 @@ func backendMemberKey(id types.ID) []byte { return []byte(id.String()) } +func mustCreateBackendMemberBucket(be backend.Backend) { + tx := be.BatchTx() + tx.Lock() + defer tx.Unlock() + tx.UnsafeCreateBucket(membersBucketName) + tx.UnsafeCreateBucket(membersRemovedBuckedName) +} + func MemberStoreKey(id types.ID) string { return path.Join(StoreMembersPrefix, id.String()) } diff --git a/etcdserver/raft.go b/etcdserver/raft.go index a936afee7..11b80dc5a 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -17,7 +17,6 @@ package etcdserver import ( "encoding/json" "expvar" - "os" "sort" "sync" "sync/atomic" @@ -289,9 +288,6 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i ClusterID: uint64(cl.ID()), }, ) - if err = os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil { - plog.Fatalf("create snapshot directory error: %v", err) - } if w, err = wal.Create(cfg.WALDir(), metadata); err != nil { plog.Fatalf("create wal error: %v", err) } diff --git a/etcdserver/server.go b/etcdserver/server.go index 36292c387..fa176d65d 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -210,8 +210,9 @@ type EtcdServer struct { // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. -func NewServer(cfg *ServerConfig) (*EtcdServer, error) { +func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { st := store.New(StoreClusterPrefix, StoreKeysPrefix) + var ( w *wal.WAL n raft.Node @@ -229,12 +230,22 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - if err := upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil { + if err = upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil { return nil, err } haveWAL := wal.Exist(cfg.WALDir()) + + if err = os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil && !os.IsExist(err) { + plog.Fatalf("create snapshot directory error: %v", err) + } ss := snap.New(cfg.SnapDir()) + be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + defer func() { + if err != nil { + be.Close() + } + }() prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { @@ -243,18 +254,18 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { var remotes []*membership.Member switch { case !haveWAL && !cfg.NewCluster: - if err := cfg.VerifyJoinExisting(); err != nil { + if err = cfg.VerifyJoinExisting(); err != nil { return nil, err } cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) if err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) - if err != nil { - return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) + existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) + if gerr != nil { + return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr) } - if err := membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { + if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { @@ -264,10 +275,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { remotes = existingCluster.Members() cl.SetID(existingCluster.ID()) cl.SetStore(st) + cl.SetBackend(be) cfg.Print() id, n, s, w = startNode(cfg, cl, nil) case !haveWAL && cfg.NewCluster: - if err := cfg.VerifyBootstrap(); err != nil { + if err = cfg.VerifyBootstrap(); err != nil { return nil, err } cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) @@ -280,7 +292,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } if cfg.ShouldDiscover() { var str string - var err error str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) if err != nil { return nil, &DiscoveryError{Op: "join", Err: err} @@ -297,14 +308,15 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { } } cl.SetStore(st) + cl.SetBackend(be) cfg.PrintWithInitial() id, n, s, w = startNode(cfg, cl, cl.MemberIDs()) case haveWAL: - if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { + if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } - if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { + if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil { return nil, fmt.Errorf("cannot write to WAL directory: %v", err) } @@ -312,7 +324,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } var snapshot *raftpb.Snapshot - var err error snapshot, err = ss.Load() if err != nil && err != snap.ErrNoSnapshot { return nil, err @@ -330,6 +341,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) } cl.SetStore(st) + cl.SetBackend(be) cl.Recover() default: return nil, fmt.Errorf("unsupported bootstrap config") @@ -346,7 +358,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { sstats.Initialize() lstats := stats.NewLeaderStats(id.String()) - srv := &EtcdServer{ + srv = &EtcdServer{ cfg: cfg, snapCount: cfg.SnapCount, errorc: make(chan error, 1), @@ -369,7 +381,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } - srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename)) + srv.be = be srv.lessor = lease.NewLessor(srv.be) srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex) srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) @@ -379,7 +391,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { srv.compactor.Run() } - if err := srv.restoreAlarms(); err != nil { + if err = srv.restoreAlarms(); err != nil { return nil, err } @@ -396,7 +408,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { LeaderStats: lstats, ErrorC: srv.errorc, } - if err := tr.Start(); err != nil { + if err = tr.Start(); err != nil { return nil, err } // add all remotes into transport @@ -634,6 +646,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { if err := s.store.Recovery(apply.snapshot.Data); err != nil { plog.Panicf("recovery store error: %v", err) } + s.cluster.SetBackend(s.be) s.cluster.Recover() // recover raft transport