mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: clean NewServer
This commit is contained in:
parent
57ae19b500
commit
a85ec90d68
@ -3,6 +3,7 @@ package etcdserver
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
@ -40,3 +41,14 @@ func (c *ServerConfig) Verify() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.DataDir, "wal") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.DataDir, "snap") }
|
||||
|
||||
func (c *ServerConfig) ID() uint64 { return c.Cluster.FindName(c.Name).ID }
|
||||
|
||||
// IsBootstrap returns ture if a bootstrap method is provided.
|
||||
func (c *ServerConfig) IsBootstrap() bool {
|
||||
return c.DiscoveryURL != "" || c.ClusterState == ClusterStateValueNew
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -82,111 +81,6 @@ type RaftTimer interface {
|
||||
Term() uint64
|
||||
}
|
||||
|
||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||
// configuration is considered static for the lifetime of the EtcdServer.
|
||||
func NewServer(cfg *ServerConfig) *EtcdServer {
|
||||
err := cfg.Verify()
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
snapdir := path.Join(cfg.DataDir, "snap")
|
||||
if err := os.MkdirAll(snapdir, privateDirMode); err != nil {
|
||||
log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
|
||||
}
|
||||
ss := snap.New(snapdir)
|
||||
st := store.New()
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
m := cfg.Cluster.FindName(cfg.Name)
|
||||
waldir := path.Join(cfg.DataDir, "wal")
|
||||
if !wal.Exist(waldir) {
|
||||
if cfg.DiscoveryURL != "" {
|
||||
d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: cannot init discovery %v", err)
|
||||
}
|
||||
s, err := d.Discover()
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
if err = cfg.Cluster.Set(s); err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
} else if (cfg.ClusterState) != ClusterStateValueNew {
|
||||
log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
|
||||
}
|
||||
i := pb.Info{ID: m.ID}
|
||||
b, err := i.Marshal()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if w, err = wal.Create(waldir, b); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
ids := cfg.Cluster.IDs()
|
||||
peers := make([]raft.Peer, len(ids))
|
||||
for i, id := range ids {
|
||||
ctx, err := json.Marshal((*cfg.Cluster)[id])
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
peers[i] = raft.Peer{ID: id, Context: ctx}
|
||||
}
|
||||
n = raft.StartNode(m.ID, peers, 10, 1)
|
||||
} else {
|
||||
if cfg.DiscoveryURL != "" {
|
||||
log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", waldir)
|
||||
}
|
||||
var index uint64
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if snapshot != nil {
|
||||
log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index)
|
||||
st.Recovery(snapshot.Data)
|
||||
index = snapshot.Index
|
||||
}
|
||||
|
||||
// restart a node from previous wal
|
||||
if w, err = wal.OpenAtIndex(waldir, index); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
md, st, ents, err := w.ReadAll()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var info pb.Info
|
||||
if err := info.Unmarshal(md); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// TODO(xiangli): save/recovery nodeID?
|
||||
if info.ID != m.ID {
|
||||
log.Fatalf("unexpected nodeid %x, want %x: nodeid should always be the same until we support name/peerURLs update or dynamic configuration", info.ID, m.ID)
|
||||
}
|
||||
n = raft.RestartNode(m.ID, 10, 1, snapshot, st, ents)
|
||||
}
|
||||
|
||||
cls := &clusterStore{Store: st}
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
node: n,
|
||||
id: m.ID,
|
||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
storage: struct {
|
||||
*wal.WAL
|
||||
*snap.Snapshotter
|
||||
}{w, ss},
|
||||
send: Sender(cfg.Transport, cls),
|
||||
ticker: time.Tick(100 * time.Millisecond),
|
||||
syncTicker: time.Tick(500 * time.Millisecond),
|
||||
snapCount: cfg.SnapCount,
|
||||
ClusterStore: cls,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// EtcdServer is the production implementation of the Server interface
|
||||
type EtcdServer struct {
|
||||
w wait.Wait
|
||||
@ -217,6 +111,73 @@ type EtcdServer struct {
|
||||
raftTerm uint64
|
||||
}
|
||||
|
||||
// NewServer creates a new EtcdServer from the supplied configuration. The
|
||||
// configuration is considered static for the lifetime of the EtcdServer.
|
||||
func NewServer(cfg *ServerConfig) *EtcdServer {
|
||||
if err := cfg.Verify(); err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
if err := os.MkdirAll(cfg.SnapDir(), privateDirMode); err != nil {
|
||||
log.Fatalf("etcdserver: cannot create snapshot directory: %v", err)
|
||||
}
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
st := store.New()
|
||||
var w *wal.WAL
|
||||
var n raft.Node
|
||||
if !wal.Exist(cfg.WALDir()) {
|
||||
if !cfg.IsBootstrap() {
|
||||
log.Fatalf("etcd: initial cluster state unset and no wal or discovery URL found")
|
||||
}
|
||||
if cfg.DiscoveryURL != "" {
|
||||
d, err := discovery.New(cfg.DiscoveryURL, cfg.ID(), cfg.Cluster.String())
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: cannot init discovery %v", err)
|
||||
}
|
||||
s, err := d.Discover()
|
||||
if err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
if err = cfg.Cluster.Set(s); err != nil {
|
||||
log.Fatalf("etcd: %v", err)
|
||||
}
|
||||
}
|
||||
n, w = startNode(cfg)
|
||||
} else {
|
||||
if cfg.DiscoveryURL != "" {
|
||||
log.Printf("etcd: warn: ignoring discovery URL: etcd has already been initialized and has a valid log in %q", cfg.WALDir())
|
||||
}
|
||||
var index uint64
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if snapshot != nil {
|
||||
log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index)
|
||||
st.Recovery(snapshot.Data)
|
||||
index = snapshot.Index
|
||||
}
|
||||
n, w = restartNode(cfg, index, snapshot)
|
||||
}
|
||||
|
||||
cls := &clusterStore{Store: st}
|
||||
s := &EtcdServer{
|
||||
store: st,
|
||||
node: n,
|
||||
id: cfg.ID(),
|
||||
attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
|
||||
storage: struct {
|
||||
*wal.WAL
|
||||
*snap.Snapshotter
|
||||
}{w, ss},
|
||||
send: Sender(cfg.Transport, cls),
|
||||
ticker: time.Tick(100 * time.Millisecond),
|
||||
syncTicker: time.Tick(500 * time.Millisecond),
|
||||
snapCount: cfg.SnapCount,
|
||||
ClusterStore: cls,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||
// modify a server's fields after it has been sent to Start.
|
||||
// It also starts a goroutine to publish its server information.
|
||||
@ -570,6 +531,46 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
||||
s.storage.Cut()
|
||||
}
|
||||
|
||||
func startNode(cfg *ServerConfig) (n raft.Node, w *wal.WAL) {
|
||||
i := pb.Info{ID: cfg.ID()}
|
||||
b, err := i.Marshal()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if w, err = wal.Create(cfg.WALDir(), b); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
ids := cfg.Cluster.IDs()
|
||||
peers := make([]raft.Peer, len(ids))
|
||||
for i, id := range ids {
|
||||
ctx, err := json.Marshal((*cfg.Cluster)[id])
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
peers[i] = raft.Peer{ID: id, Context: ctx}
|
||||
}
|
||||
n = raft.StartNode(cfg.ID(), peers, 10, 1)
|
||||
return
|
||||
}
|
||||
|
||||
func restartNode(cfg *ServerConfig, index uint64, snapshot *raftpb.Snapshot) (n raft.Node, w *wal.WAL) {
|
||||
var err error
|
||||
// restart a node from previous wal
|
||||
if w, err = wal.OpenAtIndex(cfg.WALDir(), index); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
md, st, ents, err := w.ReadAll()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
var info pb.Info
|
||||
if err := info.Unmarshal(md); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
n = raft.RestartNode(info.ID, 10, 1, snapshot, st, ents)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: move the function to /id pkg maybe?
|
||||
// GenID generates a random id that is not equal to 0.
|
||||
func GenID() (n uint64) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user