Merge pull request #4087 from gyuho/delete_discovery_check

etcdserver: always remove member directory when bootstrap fails
This commit is contained in:
Gyu-Ho Lee 2015-12-29 00:04:37 -07:00
commit cd42c9139e

View File

@ -194,7 +194,7 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The // NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer. // configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (*EtcdServer, error) { func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix) st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var ( var (
w *wal.WAL w *wal.WAL
@ -213,7 +213,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil { if err = upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil {
return nil, err return nil, err
} }
@ -223,27 +223,36 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
} }
haveWAL := wal.Exist(cfg.WALDir()) haveWAL := wal.Exist(cfg.WALDir())
if !haveWAL {
defer func() {
if err != nil {
// cleans up member directory if bootstrap fails (including forming or joining a new cluster)
os.RemoveAll(cfg.MemberDir())
}
}()
}
ss := snap.New(cfg.SnapDir()) ss := snap.New(cfg.SnapDir())
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) prt, rterr := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
if err != nil { if rterr != nil {
return nil, err return nil, rterr
} }
var remotes []*Member var remotes []*Member
switch { switch {
case !haveWAL && !cfg.NewCluster: case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil { if err = cfg.VerifyJoinExisting(); err != nil {
return nil, err return nil, err
} }
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt) existingCluster, gcerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
if err != nil { if gcerr != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gcerr)
} }
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { if err = ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
} }
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) { if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
@ -256,7 +265,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
cfg.Print() cfg.Print()
id, n, s, w = startNode(cfg, cl, nil) id, n, s, w = startNode(cfg, cl, nil)
case !haveWAL && cfg.NewCluster: case !haveWAL && cfg.NewCluster:
if err := cfg.VerifyBootstrap(); err != nil { if err = cfg.VerifyBootstrap(); err != nil {
return nil, err return nil, err
} }
cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap) cl, err = newClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
@ -268,15 +277,13 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
} }
if cfg.ShouldDiscover() { if cfg.ShouldDiscover() {
var str string str, jerr := discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
var err error if jerr != nil {
str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String()) return nil, &DiscoveryError{Op: "join", Err: jerr}
if err != nil {
return nil, &DiscoveryError{Op: "join", Err: err}
} }
urlsmap, err := types.NewURLsMap(str) urlsmap, uerr := types.NewURLsMap(str)
if err != nil { if uerr != nil {
return nil, err return nil, uerr
} }
if checkDuplicateURL(urlsmap) { if checkDuplicateURL(urlsmap) {
return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap) return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
@ -289,29 +296,27 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
cfg.PrintWithInitial() cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cl, cl.MemberIDs()) id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL: case haveWAL:
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil { if err = fileutil.IsDirWriteable(cfg.DataDir); err != nil {
return nil, fmt.Errorf("cannot write to data directory: %v", err) return nil, fmt.Errorf("cannot write to data directory: %v", err)
} }
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err) return nil, fmt.Errorf("cannot write to member directory: %v", err)
} }
if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err) return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
} }
if cfg.ShouldDiscover() { if cfg.ShouldDiscover() {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
} }
var snapshot *raftpb.Snapshot snapshot, lerr := ss.Load()
var err error if lerr != nil && lerr != snap.ErrNoSnapshot {
snapshot, err = ss.Load() return nil, lerr
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
} }
if snapshot != nil { if snapshot != nil {
if err := st.Recovery(snapshot.Data); err != nil { if err = st.Recovery(snapshot.Data); err != nil {
plog.Panicf("recovered store from snapshot error: %v", err) plog.Panicf("recovered store from snapshot error: %v", err)
} }
plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
@ -335,7 +340,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
sstats.Initialize() sstats.Initialize()
lstats := stats.NewLeaderStats(id.String()) lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{ srv = &EtcdServer{
cfg: cfg, cfg: cfg,
snapCount: cfg.SnapCount, snapCount: cfg.SnapCount,
errorc: make(chan error, 1), errorc: make(chan error, 1),
@ -378,7 +383,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
ErrorC: srv.errorc, ErrorC: srv.errorc,
V3demo: cfg.V3demo, V3demo: cfg.V3demo,
} }
if err := tr.Start(); err != nil { if err = tr.Start(); err != nil {
return nil, err return nil, err
} }
// add all remotes into transport // add all remotes into transport