diff --git a/etcdserver/etcdhttp/http.go b/etcdserver/etcdhttp/http.go index ebcbcc20a..353cdb16b 100644 --- a/etcdserver/etcdhttp/http.go +++ b/etcdserver/etcdhttp/http.go @@ -35,10 +35,10 @@ const ( var errClosed = errors.New("etcdhttp: client closed connection") // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests. -func NewClientHandler(server *etcdserver.EtcdServer, clusterStore etcdserver.ClusterStore, timeout time.Duration) http.Handler { +func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler { sh := &serverHandler{ server: server, - clusterStore: clusterStore, + clusterStore: server.ClusterStore, timer: server, timeout: timeout, } diff --git a/etcdserver/etcdhttp/http_test.go b/etcdserver/etcdhttp/http_test.go index 4c41dc448..9fc149669 100644 --- a/etcdserver/etcdhttp/http_test.go +++ b/etcdserver/etcdhttp/http_test.go @@ -591,7 +591,7 @@ func TestV2MachinesEndpoint(t *testing.T) { {"POST", http.StatusMethodNotAllowed}, } - m := NewClientHandler(nil, &fakeCluster{}, time.Hour) + m := NewClientHandler(&etcdserver.EtcdServer{ClusterStore: &fakeCluster{}}, time.Hour) s := httptest.NewServer(m) defer s.Close() diff --git a/etcdserver/server.go b/etcdserver/server.go index 7c043077f..953bd6f76 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -5,6 +5,7 @@ import ( "errors" "log" "math/rand" + "net/http" "sync/atomic" "time" @@ -12,9 +13,11 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/store" "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context" "github.com/coreos/etcd/wait" + "github.com/coreos/etcd/wal" ) const ( @@ -76,13 +79,89 @@ type RaftTimer interface { Term() int64 } -// EtcdServer is the production implementation of the Server interface -type EtcdServer struct { - w wait.Wait - done chan struct{} - +type ServerConfig struct { Name string ClientURLs types.URLs + SnapDir string + SnapCount int64 + WalDir string + Cluster *Cluster + Transport *http.Transport +} + +// NewServer creates a new EtcdServer from the supplied configuration. The +// configuration is considered static for the lifetime of the EtcdServer. +func NewServer(cfg *ServerConfig) *EtcdServer { + m := cfg.Cluster.FindName(cfg.Name) + if m == nil { + // Should never happen + log.Fatalf("could not find name %v in cluster!", cfg.Name) + } + st := store.New() + ss := snap.New(cfg.SnapDir) + var w *wal.WAL + var n raft.Node + var err error + if !wal.Exist(cfg.WalDir) { + if w, err = wal.Create(cfg.WalDir); err != nil { + log.Fatal(err) + } + n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1) + } else { + var index int64 + snapshot, err := ss.Load() + if err != nil && err != snap.ErrNoSnapshot { + log.Fatal(err) + } + if snapshot != nil { + log.Printf("etcdserver: restart from snapshot at index %d", snapshot.Index) + st.Recovery(snapshot.Data) + index = snapshot.Index + } + + // restart a node from previous wal + if w, err = wal.OpenAtIndex(cfg.WalDir, index); err != nil { + log.Fatal(err) + } + wid, st, ents, err := w.ReadAll() + if err != nil { + log.Fatal(err) + } + // TODO(xiangli): save/recovery nodeID? + if wid != 0 { + log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) + } + n = raft.RestartNode(m.ID, cfg.Cluster.IDs(), 10, 1, snapshot, st, ents) + } + + cls := NewClusterStore(st, *cfg.Cluster) + + s := &EtcdServer{ + Store: st, + Node: n, + name: cfg.Name, + Storage: struct { + *wal.WAL + *snap.Snapshotter + }{w, ss}, + Send: Sender(cfg.Transport, cls), + clientURLs: cfg.ClientURLs, + Ticker: time.Tick(100 * time.Millisecond), + SyncTicker: time.Tick(500 * time.Millisecond), + SnapCount: cfg.SnapCount, + ClusterStore: cls, + } + return s +} + +// EtcdServer is the production implementation of the Server interface +type EtcdServer struct { + w wait.Wait + done chan struct{} + name string + clientURLs types.URLs + + ClusterStore ClusterStore Node raft.Node Store store.Store @@ -101,9 +180,8 @@ type EtcdServer struct { SnapCount int64 // number of entries to trigger a snapshot // Cache of the latest raft index and raft term the server has seen - raftIndex int64 - raftTerm int64 - ClusterStore ClusterStore + raftIndex int64 + raftTerm int64 } // Start prepares and starts server in a new goroutine. It is no longer safe to @@ -338,14 +416,14 @@ func (s *EtcdServer) sync(timeout time.Duration) { } // publish registers server information into the cluster. The information -// is the json format of its self member struct, whose ClientURLs may be -// updated. +// is the JSON representation of this server's member struct, updated with the +// static clientURLs of the server. // The function keeps attempting to register until it succeeds, // or its server is stopped. // TODO: take care of info fetched from cluster store after having reconfig. func (s *EtcdServer) publish(retryInterval time.Duration) { - m := *s.ClusterStore.Get().FindName(s.Name) - m.ClientURLs = s.ClientURLs.StringSlice() + m := *s.ClusterStore.Get().FindName(s.name) + m.ClientURLs = s.clientURLs.StringSlice() b, err := json.Marshal(m) if err != nil { log.Printf("etcdserver: json marshal error: %v", err) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 3d38a1156..21331cbe2 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -856,8 +856,8 @@ func TestPublish(t *testing.T) { ch <- Response{} w := &waitWithResponse{ch: ch} srv := &EtcdServer{ - Name: "node1", - ClientURLs: []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}}, + name: "node1", + clientURLs: []url.URL{{Scheme: "http", Host: "a"}, {Scheme: "http", Host: "b"}}, Node: n, ClusterStore: cs, w: w, @@ -892,7 +892,7 @@ func TestPublish(t *testing.T) { func TestPublishStopped(t *testing.T) { cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Name: "node1", + name: "node1", Node: &nodeRecorder{}, ClusterStore: cs, w: &waitRecorder{}, @@ -907,7 +907,7 @@ func TestPublishRetry(t *testing.T) { n := &nodeRecorder{} cs := mustClusterStore(t, []Member{{ID: 1, Name: "node1"}}) srv := &EtcdServer{ - Name: "node1", + name: "node1", Node: n, ClusterStore: cs, w: &waitRecorder{}, diff --git a/main.go b/main.go index 44e5cf879..ef46547c5 100644 --- a/main.go +++ b/main.go @@ -17,9 +17,6 @@ import ( "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/proxy" "github.com/coreos/etcd/raft" - "github.com/coreos/etcd/snap" - "github.com/coreos/etcd/store" - "github.com/coreos/etcd/wal" ) const ( @@ -33,7 +30,7 @@ var ( name = flag.String("name", "default", "Unique human-readable name for this node") timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout") dir = flag.String("data-dir", "", "Path to the data directory") - snapCount = flag.Int64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") + snapCount = flag.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot") printVersion = flag.Bool("version", false, "Print the version and exit") cluster = &etcdserver.Cluster{} @@ -125,93 +122,42 @@ func startEtcd() { log.Fatalf("etcd: cannot use None(%d) as member id", raft.None) } - if *snapCount <= 0 { - log.Fatalf("etcd: snapshot-count must be greater than 0: snapshot-count=%d", *snapCount) - } - if *dir == "" { *dir = fmt.Sprintf("%v_etcd_data", self.ID) - log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir) + log.Printf("main: no data-dir provided, using default data-dir ./%s", *dir) } if err := os.MkdirAll(*dir, privateDirMode); err != nil { log.Fatalf("main: cannot create data directory: %v", err) } snapdir := path.Join(*dir, "snap") if err := os.MkdirAll(snapdir, privateDirMode); err != nil { - log.Fatalf("etcd: cannot create snapshot directory: %v", err) + log.Fatalf("main: cannot create snapshot directory: %v", err) } - snapshotter := snap.New(snapdir) - waldir := path.Join(*dir, "wal") - var w *wal.WAL - var n raft.Node - var err error - st := store.New() - - if !wal.Exist(waldir) { - w, err = wal.Create(waldir) - if err != nil { - log.Fatal(err) - } - n = raft.StartNode(self.ID, cluster.IDs(), 10, 1) - } else { - var index int64 - snapshot, err := snapshotter.Load() - if err != nil && err != snap.ErrNoSnapshot { - log.Fatal(err) - } - if snapshot != nil { - log.Printf("etcd: restart from snapshot at index %d", snapshot.Index) - st.Recovery(snapshot.Data) - index = snapshot.Index - } - - // restart a node from previous wal - if w, err = wal.OpenAtIndex(waldir, index); err != nil { - log.Fatal(err) - } - wid, st, ents, err := w.ReadAll() - if err != nil { - log.Fatal(err) - } - // TODO(xiangli): save/recovery nodeID? - if wid != 0 { - log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid) - } - n = raft.RestartNode(self.ID, cluster.IDs(), 10, 1, snapshot, st, ents) - } pt, err := transport.NewTransport(peerTLSInfo) if err != nil { log.Fatal(err) } - cls := etcdserver.NewClusterStore(st, *cluster) - acurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-client-urls", "addr", clientTLSInfo) if err != nil { log.Fatal(err.Error()) } - - s := &etcdserver.EtcdServer{ + cfg := &etcdserver.ServerConfig{ Name: *name, ClientURLs: acurls, - Store: st, - Node: n, - Storage: struct { - *wal.WAL - *snap.Snapshotter - }{w, snapshotter}, - Send: etcdserver.Sender(pt, cls), - Ticker: time.Tick(100 * time.Millisecond), - SyncTicker: time.Tick(500 * time.Millisecond), - SnapCount: *snapCount, - ClusterStore: cls, + SnapDir: snapdir, + SnapCount: int64(*snapCount), + WalDir: waldir, + Cluster: cluster, + Transport: pt, } + s := etcdserver.NewServer(cfg) s.Start() ch := &pkg.CORSHandler{ - Handler: etcdhttp.NewClientHandler(s, cls, *timeout), + Handler: etcdhttp.NewClientHandler(s, *timeout), Info: cors, } ph := etcdhttp.NewPeerHandler(s)