Merge pull request #5018 from xiang90/b

etcdserver: set backend to cluster
This commit is contained in:
Xiang Li
2016-04-11 13:02:57 -07:00
4 changed files with 42 additions and 20 deletions

View File

@@ -195,6 +195,11 @@ func (c *RaftCluster) SetID(id types.ID) { c.id = id }
func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
func (c *RaftCluster) SetBackend(be backend.Backend) {
c.be = be
mustCreateBackendMemberBucket(c.be)
}
func (c *RaftCluster) Recover() {
c.Lock()
defer c.Unlock()

View File

@@ -137,6 +137,14 @@ func backendMemberKey(id types.ID) []byte {
return []byte(id.String())
}
func mustCreateBackendMemberBucket(be backend.Backend) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeCreateBucket(membersBucketName)
tx.UnsafeCreateBucket(membersRemovedBuckedName)
}
func MemberStoreKey(id types.ID) string {
return path.Join(StoreMembersPrefix, id.String())
}

View File

@@ -17,7 +17,6 @@ package etcdserver
import (
"encoding/json"
"expvar"
"os"
"sort"
"sync"
"sync/atomic"
@@ -289,9 +288,6 @@ func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (i
ClusterID: uint64(cl.ID()),
},
)
if err = os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
plog.Fatalf("create snapshot directory error: %v", err)
}
if w, err = wal.Create(cfg.WALDir(), metadata); err != nil {
plog.Fatalf("create wal error: %v", err)
}

View File

@@ -210,8 +210,9 @@ type EtcdServer struct {
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
var (
w *wal.WAL
n raft.Node
@@ -229,12 +230,22 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil {
return nil, err
}
if err := upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil {
if err = upgradeDataDir(cfg.DataDir, cfg.Name, dataVer); err != nil {
return nil, err
}
haveWAL := wal.Exist(cfg.WALDir())
if err = os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil && !os.IsExist(err) {
plog.Fatalf("create snapshot directory error: %v", err)
}
ss := snap.New(cfg.SnapDir())
be := backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
defer func() {
if err != nil {
be.Close()
}
}()
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
if err != nil {
@@ -243,18 +254,18 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
var remotes []*membership.Member
switch {
case !haveWAL && !cfg.NewCluster:
if err := cfg.VerifyJoinExisting(); err != nil {
if err = cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
if gerr != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
}
if err := membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
@@ -264,10 +275,11 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
remotes = existingCluster.Members()
cl.SetID(existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(be)
cfg.Print()
id, n, s, w = startNode(cfg, cl, nil)
case !haveWAL && cfg.NewCluster:
if err := cfg.VerifyBootstrap(); err != nil {
if err = cfg.VerifyBootstrap(); err != nil {
return nil, err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
@@ -280,7 +292,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
if cfg.ShouldDiscover() {
var str string
var err error
str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
if err != nil {
return nil, &DiscoveryError{Op: "join", Err: err}
@@ -297,14 +308,15 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
}
}
cl.SetStore(st)
cl.SetBackend(be)
cfg.PrintWithInitial()
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
case haveWAL:
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
@@ -312,7 +324,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
}
var snapshot *raftpb.Snapshot
var err error
snapshot, err = ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
@@ -330,6 +341,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.SetBackend(be)
cl.Recover()
default:
return nil, fmt.Errorf("unsupported bootstrap config")
@@ -346,7 +358,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
sstats.Initialize()
lstats := stats.NewLeaderStats(id.String())
srv := &EtcdServer{
srv = &EtcdServer{
cfg: cfg,
snapCount: cfg.SnapCount,
errorc: make(chan error, 1),
@@ -369,7 +381,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
}
srv.be = backend.NewDefaultBackend(path.Join(cfg.SnapDir(), databaseFilename))
srv.be = be
srv.lessor = lease.NewLessor(srv.be)
srv.kv = dstorage.New(srv.be, srv.lessor, &srv.consistIndex)
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
@@ -379,7 +391,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
srv.compactor.Run()
}
if err := srv.restoreAlarms(); err != nil {
if err = srv.restoreAlarms(); err != nil {
return nil, err
}
@@ -396,7 +408,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
LeaderStats: lstats,
ErrorC: srv.errorc,
}
if err := tr.Start(); err != nil {
if err = tr.Start(); err != nil {
return nil, err
}
// add all remotes into transport
@@ -634,6 +646,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
if err := s.store.Recovery(apply.snapshot.Data); err != nil {
plog.Panicf("recovery store error: %v", err)
}
s.cluster.SetBackend(s.be)
s.cluster.Recover()
// recover raft transport