mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #2286 from barakmich/fix_migrations
etcdserver: Canonicalize migrations
This commit is contained in:
commit
c6cc276ef0
@ -31,7 +31,6 @@ import (
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/cors"
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/proxy"
|
||||
@ -88,13 +87,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
|
||||
log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
|
||||
}
|
||||
if err := makeMemberDir(cfg.dir); err != nil {
|
||||
return nil, fmt.Errorf("cannot use /member sub-directory: %v", err)
|
||||
}
|
||||
membdir := path.Join(cfg.dir, "member")
|
||||
if err := fileutil.IsDirWriteable(membdir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
@ -149,7 +141,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
Name: cfg.name,
|
||||
ClientURLs: cfg.acurls,
|
||||
PeerURLs: cfg.apurls,
|
||||
DataDir: membdir,
|
||||
DataDir: cfg.dir,
|
||||
SnapCount: cfg.snapCount,
|
||||
MaxSnapFiles: cfg.maxSnapFiles,
|
||||
MaxWALFiles: cfg.maxWalFiles,
|
||||
@ -336,42 +328,6 @@ func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
|
||||
return cls, err
|
||||
}
|
||||
|
||||
func makeMemberDir(dir string) error {
|
||||
membdir := path.Join(dir, "member")
|
||||
_, err := os.Stat(membdir)
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case !os.IsNotExist(err):
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(membdir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
v1Files := types.NewUnsafeSet("conf", "log", "snapshot")
|
||||
v2Files := types.NewUnsafeSet("wal", "snap")
|
||||
names, err := fileutil.ReadDir(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, name := range names {
|
||||
switch {
|
||||
case v1Files.Contains(name):
|
||||
// Link it to the subdir and keep the v1 file at the original
|
||||
// location, so v0.4 etcd can still bootstrap if the upgrade
|
||||
// failed.
|
||||
if err := os.Symlink(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
case v2Files.Contains(name):
|
||||
if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func genClusterString(name string, urls types.URLs) string {
|
||||
addrs := make([]string, 0)
|
||||
for _, u := range urls {
|
||||
|
@ -83,9 +83,11 @@ func (c *ServerConfig) VerifyBootstrapConfig() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.DataDir, "wal") }
|
||||
func (c *ServerConfig) MemberDir() string { return path.Join(c.DataDir, "member") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.DataDir, "snap") }
|
||||
func (c *ServerConfig) WALDir() string { return path.Join(c.MemberDir(), "wal") }
|
||||
|
||||
func (c *ServerConfig) SnapDir() string { return path.Join(c.MemberDir(), "snap") }
|
||||
|
||||
func (c *ServerConfig) ShouldDiscover() bool { return c.DiscoveryURL != "" }
|
||||
|
||||
@ -99,6 +101,7 @@ func (c *ServerConfig) print(initial bool) {
|
||||
log.Println("etcdserver: force new cluster")
|
||||
}
|
||||
log.Printf("etcdserver: data dir = %s", c.DataDir)
|
||||
log.Printf("etcdserver: member dir = %s", c.MemberDir())
|
||||
log.Printf("etcdserver: heartbeat = %dms", c.TickMs)
|
||||
log.Printf("etcdserver: election = %dms", c.ElectionTicks*int(c.TickMs))
|
||||
log.Printf("etcdserver: snapshot count = %d", c.SnapCount)
|
||||
|
@ -129,8 +129,8 @@ func TestBootstrapConfigVerify(t *testing.T) {
|
||||
|
||||
func TestSnapDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/snap",
|
||||
"/": "/member/snap",
|
||||
"/var/lib/etc": "/var/lib/etc/member/snap",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
@ -144,8 +144,8 @@ func TestSnapDir(t *testing.T) {
|
||||
|
||||
func TestWALDir(t *testing.T) {
|
||||
tests := map[string]string{
|
||||
"/": "/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/wal",
|
||||
"/": "/member/wal",
|
||||
"/var/lib/etc": "/var/lib/etc/member/wal",
|
||||
}
|
||||
for dd, w := range tests {
|
||||
cfg := ServerConfig{
|
||||
|
@ -144,6 +144,8 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
var n raft.Node
|
||||
var s *raft.MemoryStorage
|
||||
var id types.ID
|
||||
var ss *snap.Snapshotter
|
||||
|
||||
walVersion, err := wal.DetectVersion(cfg.DataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -153,7 +155,6 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
}
|
||||
haveWAL := walVersion != wal.WALNotExist
|
||||
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
switch {
|
||||
case !haveWAL && !cfg.NewCluster:
|
||||
us := getOtherPeerURLs(cfg.Cluster, cfg.Name)
|
||||
@ -189,15 +190,23 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
|
||||
cfg.PrintWithInitial()
|
||||
id, n, s, w = startNode(cfg, cfg.Cluster.MemberIDs())
|
||||
case haveWAL:
|
||||
if walVersion != wal.WALv0_5 {
|
||||
if err := upgradeWAL(cfg, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Run the migrations.
|
||||
if err := upgradeWAL(cfg.DataDir, cfg.Name, walVersion); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.DataDir); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to data directory: %v", err)
|
||||
}
|
||||
|
||||
if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
|
||||
return nil, fmt.Errorf("cannot write to member directory: %v", err)
|
||||
}
|
||||
|
||||
if cfg.ShouldDiscover() {
|
||||
log.Printf("etcdserver: discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
|
||||
}
|
||||
ss := snap.New(cfg.SnapDir())
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
|
@ -16,6 +16,8 @@ package etcdserver
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/migrate"
|
||||
@ -91,14 +93,47 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
|
||||
|
||||
// upgradeWAL converts an older version of the etcdServer data to the newest version.
|
||||
// It must ensure that, after upgrading, the most recent version is present.
|
||||
func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
|
||||
if ver == wal.WALv0_4 {
|
||||
func upgradeWAL(baseDataDir string, name string, ver wal.WalVersion) error {
|
||||
switch ver {
|
||||
case wal.WALv0_4:
|
||||
log.Print("etcdserver: converting v0.4 log to v2.0")
|
||||
err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
|
||||
err := migrate.Migrate4To2(baseDataDir, name)
|
||||
if err != nil {
|
||||
log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0:
|
||||
err := makeMemberDir(baseDataDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fallthrough
|
||||
case wal.WALv2_0_1:
|
||||
fallthrough
|
||||
default:
|
||||
log.Printf("datadir is valid for the 2.0.1 format")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeMemberDir(dir string) error {
|
||||
membdir := path.Join(dir, "member")
|
||||
_, err := os.Stat(membdir)
|
||||
switch {
|
||||
case err == nil:
|
||||
return nil
|
||||
case !os.IsNotExist(err):
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(membdir, 0700); err != nil {
|
||||
return err
|
||||
}
|
||||
names := []string{"snap", "wal"}
|
||||
for _, name := range names {
|
||||
if err := os.Rename(path.Join(dir, name), path.Join(membdir, name)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
15
wal/util.go
15
wal/util.go
@ -31,7 +31,8 @@ const (
|
||||
WALUnknown WalVersion = "Unknown WAL"
|
||||
WALNotExist WalVersion = "No WAL"
|
||||
WALv0_4 WalVersion = "0.4.x"
|
||||
WALv0_5 WalVersion = "0.5.x"
|
||||
WALv2_0 WalVersion = "2.0.0"
|
||||
WALv2_0_1 WalVersion = "2.0.1"
|
||||
)
|
||||
|
||||
func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
@ -48,10 +49,20 @@ func DetectVersion(dirpath string) (WalVersion, error) {
|
||||
return WALNotExist, nil
|
||||
}
|
||||
nameSet := types.NewUnsafeSet(names...)
|
||||
if nameSet.Contains("member") {
|
||||
ver, err := DetectVersion(path.Join(dirpath, "member"))
|
||||
if ver == WALv2_0 {
|
||||
return WALv2_0_1, nil
|
||||
} else if ver == WALv0_4 {
|
||||
// How in the blazes did it get there?
|
||||
return WALUnknown, nil
|
||||
}
|
||||
return ver, err
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snap", "wal"}) {
|
||||
// .../wal cannot be empty to exist.
|
||||
if Exist(path.Join(dirpath, "wal")) {
|
||||
return WALv0_5, nil
|
||||
return WALv2_0, nil
|
||||
}
|
||||
}
|
||||
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
|
||||
|
@ -28,7 +28,8 @@ func TestDetectVersion(t *testing.T) {
|
||||
wver WalVersion
|
||||
}{
|
||||
{[]string{}, WALNotExist},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv0_5},
|
||||
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, WALv2_0_1},
|
||||
{[]string{"snap/", "wal/", "wal/1"}, WALv2_0},
|
||||
{[]string{"snapshot/", "conf", "log"}, WALv0_4},
|
||||
{[]string{"weird"}, WALUnknown},
|
||||
{[]string{"snap/", "wal/"}, WALUnknown},
|
||||
|
Loading…
x
Reference in New Issue
Block a user