From 92b329fdb93c6a31367e5d414c48ccada30ef995 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 2 Feb 2015 20:56:45 -0800 Subject: [PATCH 1/3] etcdmain: use symlink instead of link for v0.4 files link doesn't support directory. --- etcdmain/etcd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index b56d5f87e..6c170c225 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -360,7 +360,7 @@ func makeMemberDir(dir string) error { // Link it to the subdir and keep the v1 file at the original // location, so v0.4 etcd can still bootstrap if the upgrade // failed. - if err := os.Link(path.Join(dir, name), path.Join(membdir, name)); err != nil { + if err := os.Symlink(path.Join(dir, name), path.Join(membdir, name)); err != nil { return err } case v2Files.Contains(name): From f2f2adc663dd5ca4ae95328319e62f35ddc1b1c0 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 2 Feb 2015 21:02:10 -0800 Subject: [PATCH 2/3] migrate/functional: always run tests on CoreOS image --- migrate/functional/README.md | 2 +- migrate/functional/member.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/migrate/functional/README.md b/migrate/functional/README.md index 421fc3cff..1b0c5b013 100644 --- a/migrate/functional/README.md +++ b/migrate/functional/README.md @@ -7,7 +7,7 @@ This functional test suite deploys a etcd cluster using processes, and asserts e Dependencies ------------ -The test suite can only be run in linux system. It's recommended to run this in a virtual machine environment on CoreOS (e.g. using coreos-vagrant). The only dependency for the tests not provided on the CoreOS image is go. +The test suite can only be run in CoreOS system. It's recommended to run this in a virtual machine environment on CoreOS (e.g. using coreos-vagrant). The only dependency for the tests not provided on the CoreOS image is go. Usage ----- diff --git a/migrate/functional/member.go b/migrate/functional/member.go index 0b9545514..28dc6385d 100644 --- a/migrate/functional/member.go +++ b/migrate/functional/member.go @@ -54,7 +54,6 @@ func NewProcWithDefaultFlags(path string) *Proc { } // always expect to use start_desired_verson mode p.Env = append(p.Env, - "ETCD_ALLOW_LEGACY_MODE=true", "ETCD_BINARY_DIR="+binDir, ) return p From 2d081bd3b9d5c016ca0540f1f6657888012c5cf4 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sun, 1 Feb 2015 14:36:58 -0800 Subject: [PATCH 3/3] migrate: support standby mode upgrade --- migrate/functional/member.go | 5 + migrate/functional/upgrade_test.go | 79 +++++++++ migrate/standby.go | 70 ++++++++ migrate/starter/starter.go | 249 +++++++++++++++++++++-------- 4 files changed, 332 insertions(+), 71 deletions(-) create mode 100644 migrate/standby.go diff --git a/migrate/functional/member.go b/migrate/functional/member.go index 28dc6385d..d88e2fc08 100644 --- a/migrate/functional/member.go +++ b/migrate/functional/member.go @@ -177,6 +177,11 @@ func (p *Proc) Terminate() { type ProcGroup []*Proc +func NewProcInProcGroupWithV1Flags(path string, num int, idx int) *Proc { + pg := NewProcGroupWithV1Flags(path, num) + return pg[idx] +} + func NewProcGroupWithV1Flags(path string, num int) ProcGroup { pg := make([]*Proc, num) pg[0] = NewProcWithDefaultFlags(path) diff --git a/migrate/functional/upgrade_test.go b/migrate/functional/upgrade_test.go index bc67aa4bd..20b05dcfe 100644 --- a/migrate/functional/upgrade_test.go +++ b/migrate/functional/upgrade_test.go @@ -1,6 +1,7 @@ package functional import ( + "bytes" "encoding/json" "fmt" "io/ioutil" @@ -262,6 +263,84 @@ func TestJoinV1ClusterViaDiscovery(t *testing.T) { } } +func TestUpgradeV1Standby(t *testing.T) { + // get v1 standby data dir + pg := NewProcGroupWithV1Flags(v1BinPath, 3) + if err := pg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + req, err := http.NewRequest("PUT", pg[0].PeerURL+"/v2/admin/config", bytes.NewBufferString(`{"activeSize":3,"removeDelay":1800,"syncInterval":5}`)) + if err != nil { + t.Fatalf("NewRequest error: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("http Do error: %v", err) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want %d", resp.StatusCode, http.StatusOK) + } + + p := NewProcInProcGroupWithV1Flags(v2BinPath, 4, 3) + if err := p.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + fmt.Println("checking new member is in standby mode...") + mustExist(path.Join(p.DataDir, "standby_info")) + ver, err := checkInternalVersion(p.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "1" { + t.Errorf("internal version = %s, want %s", ver, "1") + } + + fmt.Println("upgrading the whole cluster...") + cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[0].PeerURL) + if err := cmd.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + fmt.Println("waiting until peer-mode etcd exits...") + if err := pg.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + fmt.Println("restarting the peer-mode etcd...") + npg := NewProcGroupWithV1Flags(v2BinPath, 3) + npg.InheritDataDir(pg) + npg.CleanUnsuppportedV1Flags() + if err := npg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer npg.Terminate() + fmt.Println("waiting until standby-mode etcd exits...") + if err := p.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + fmt.Println("restarting the standby-mode etcd...") + np := NewProcInProcGroupWithV1Flags(v2BinPath, 4, 3) + np.SetDataDir(p.DataDir) + np.CleanUnsuppportedV1Flags() + if err := np.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer np.Terminate() + + fmt.Println("checking the new member is in v2 proxy mode...") + ver, err = checkInternalVersion(np.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "2" { + t.Errorf("internal version = %s, want %s", ver, "1") + } + if _, err := os.Stat(path.Join(np.DataDir, "proxy")); err != nil { + t.Errorf("stat proxy dir error = %v, want nil", err) + } +} + func absPathFromEnv(name string) string { path, err := filepath.Abs(os.Getenv(name)) if err != nil { diff --git a/migrate/standby.go b/migrate/standby.go new file mode 100644 index 000000000..0432945d6 --- /dev/null +++ b/migrate/standby.go @@ -0,0 +1,70 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package migrate + +import ( + "bytes" + "encoding/json" + "fmt" + "os" +) + +type StandbyInfo4 struct { + Running bool + Cluster []*MachineMessage + SyncInterval float64 +} + +// MachineMessage represents information about a peer or standby in the registry. +type MachineMessage struct { + Name string `json:"name"` + State string `json:"state"` + ClientURL string `json:"clientURL"` + PeerURL string `json:"peerURL"` +} + +func (si *StandbyInfo4) ClientURLs() []string { + var urls []string + for _, m := range si.Cluster { + urls = append(urls, m.ClientURL) + } + return urls +} + +func (si *StandbyInfo4) InitialCluster() string { + b := &bytes.Buffer{} + first := true + for _, m := range si.Cluster { + if !first { + fmt.Fprintf(b, ",") + } + first = false + fmt.Fprintf(b, "%s=%s", m.Name, m.PeerURL) + } + return b.String() +} + +func DecodeStandbyInfo4FromFile(path string) (*StandbyInfo4, error) { + var info StandbyInfo4 + file, err := os.OpenFile(path, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + defer file.Close() + if err = json.NewDecoder(file).Decode(&info); err != nil { + return nil, err + } + return &info, nil +} diff --git a/migrate/starter/starter.go b/migrate/starter/starter.go index 952f52ae4..1bfde016f 100644 --- a/migrate/starter/starter.go +++ b/migrate/starter/starter.go @@ -30,8 +30,9 @@ import ( "github.com/coreos/etcd/client" "github.com/coreos/etcd/etcdmain" "github.com/coreos/etcd/migrate" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/flags" - "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" ) @@ -41,80 +42,147 @@ type version string const ( internalV1 version = "1" internalV2 version = "2" + internalV2Proxy version = "2.proxy" internalUnknown version = "unknown" + v0_4 version = "v0.4" + v2_0 version = "v2.0" + v2_0Proxy version = "v2.0 proxy" + empty version = "empty" + unknown version = "unknown" + defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/versions/" ) +var ( + v2SpecialFlags = []string{ + "initial-cluster", + "listen-peer-urls", + "listen-client-urls", + "proxy", + } +) + func StartDesiredVersion(args []string) { - switch checkStartVersion(args) { + fs, err := parseConfig(args) + if err != nil { + return + } + + ver := checkInternalVersion(fs) + log.Printf("starter: start etcd version %s", ver) + switch ver { case internalV1: startInternalV1() case internalV2: + case internalV2Proxy: + if _, err := os.Stat(standbyInfo4(fs.Lookup("data-dir").Value.String())); err != nil { + log.Printf("starter: Detect standby_info file exists, and add --proxy=on flag to ensure it runs in v2.0 proxy mode.") + log.Printf("starter: Before removing v0.4 data, --proxy=on flag MUST be added.") + } + // append proxy flag to args to trigger proxy mode + os.Args = append(os.Args, "-proxy=on") default: - log.Panicf("migrate: unhandled start version") + log.Panicf("starter: unhandled start version") } } -func checkStartVersion(args []string) version { - fs, err := parseConfig(args) - if err != nil { - return internalV2 - } +func checkInternalVersion(fs *flag.FlagSet) version { // If it uses 2.0 env var explicitly, start 2.0 - if fs.Lookup("initial-cluster").Value.String() != "" { - return internalV2 + for _, name := range v2SpecialFlags { + if fs.Lookup(name).Value.String() != "" { + return internalV2 + } } dataDir := fs.Lookup("data-dir").Value.String() if dataDir == "" { - log.Fatalf("migrate: please set ETCD_DATA_DIR for etcd") + log.Fatalf("starter: please set --data-dir or ETCD_DATA_DIR for etcd") } // check the data directory - walVersion, err := wal.DetectVersion(dataDir) + ver, err := checkVersion(dataDir) if err != nil { - log.Fatalf("migrate: failed to detect etcd version in %v: %v", dataDir, err) + log.Fatalf("starter: failed to detect etcd version in %v: %v", dataDir, err) } - log.Printf("migrate: detect etcd version %s in %s", walVersion, dataDir) - switch walVersion { - case wal.WALv0_5: + log.Printf("starter: detect etcd version %s in %s", ver, dataDir) + switch ver { + case v2_0: return internalV2 - case wal.WALv0_4: - // TODO: standby case - // if it is standby guy: - // print out detect standby mode - // go to WALNotExist case - // if want to start with 2.0: - // remove old data dir to avoid auto migration - // try to let it fallback? or use local proxy file? - ver, err := checkStartVersionByDataDir4(dataDir) + case v2_0Proxy: + return internalV2Proxy + case v0_4: + standbyInfo, err := migrate.DecodeStandbyInfo4FromFile(standbyInfo4(dataDir)) + if err != nil && !os.IsNotExist(err) { + log.Fatalf("starter: failed to decode standbyInfo in %v: %v", dataDir, err) + } + inStandbyMode := standbyInfo != nil && standbyInfo.Running + if inStandbyMode { + ver, err := checkInternalVersionByClientURLs(standbyInfo.ClientURLs(), clientTLSInfo(fs)) + if err != nil { + log.Printf("starter: failed to check start version through peers: %v", err) + return internalV1 + } + if ver == internalV2 { + os.Args = append(os.Args, "-initial-cluster", standbyInfo.InitialCluster()) + return internalV2Proxy + } + return ver + } + ver, err := checkInternalVersionByDataDir4(dataDir) if err != nil { - log.Fatalf("migrate: failed to check start version in %v: %v", dataDir, err) + log.Fatalf("starter: failed to check start version in %v: %v", dataDir, err) } return ver - case wal.WALUnknown: - log.Fatalf("migrate: unknown etcd version in %v", dataDir) - case wal.WALNotExist: + case empty: discovery := fs.Lookup("discovery").Value.String() - peers := trimSplit(fs.Lookup("peers").Value.String(), ",") - peerTLSInfo := &TLSInfo{ - CAFile: fs.Lookup("peer-ca-file").Value.String(), - CertFile: fs.Lookup("peer-cert-file").Value.String(), - KeyFile: fs.Lookup("peer-key-file").Value.String(), - } - ver, err := checkStartVersionByMembers(discovery, peers, peerTLSInfo) + dpeers, err := getPeersFromDiscoveryURL(discovery) if err != nil { - log.Printf("migrate: failed to check start version through peers: %v", err) - break + log.Printf("starter: failed to get peers from discovery %s: %v", discovery, err) + } + peerStr := fs.Lookup("peers").Value.String() + ppeers := getPeersFromPeersFlag(peerStr, peerTLSInfo(fs)) + + urls := getClientURLsByPeerURLs(append(dpeers, ppeers...), peerTLSInfo(fs)) + ver, err := checkInternalVersionByClientURLs(urls, clientTLSInfo(fs)) + if err != nil { + log.Printf("starter: failed to check start version through peers: %v", err) + return internalV2 } return ver - default: - log.Panicf("migrate: unhandled etcd version in %v", dataDir) } - return internalV2 + // never reach here + log.Panicf("starter: unhandled etcd version in %v", dataDir) + return internalUnknown } -func checkStartVersionByDataDir4(dataDir string) (version, error) { +func checkVersion(dataDir string) (version, error) { + names, err := fileutil.ReadDir(dataDir) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return empty, err + } + if len(names) == 0 { + return empty, nil + } + nameSet := types.NewUnsafeSet(names...) + if nameSet.ContainsAll([]string{"member"}) { + return v2_0, nil + } + if nameSet.ContainsAll([]string{"proxy"}) { + return v2_0Proxy, nil + } + if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) { + return v0_4, nil + } + if nameSet.ContainsAll([]string{"standby_info"}) { + return v0_4, nil + } + return unknown, fmt.Errorf("failed to check version") +} + +func checkInternalVersionByDataDir4(dataDir string) (version, error) { // check v0.4 snapshot snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir)) if err != nil { @@ -153,51 +221,50 @@ func checkStartVersionByDataDir4(dataDir string) (version, error) { return internalV1, nil } -func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo) (version, error) { - tr := &http.Transport{} - if tls.Scheme() == "https" { - tlsConfig, err := tls.ClientConfig() - if err != nil { - return internalUnknown, err - } - tr.TLSClientConfig = tlsConfig - } - c := &http.Client{Transport: tr} - - possiblePeers, err := getPeersFromDiscoveryURL(discoverURL) +func getClientURLsByPeerURLs(peers []string, tls *TLSInfo) []string { + c, err := newDefaultClient(tls) if err != nil { - return internalUnknown, err + log.Printf("starter: new client error: %v", err) + return nil } - for _, p := range peers { - possiblePeers = append(possiblePeers, tls.Scheme()+"://"+p) - } - - for _, p := range possiblePeers { - resp, err := c.Get(p + "/etcdURL") + var urls []string + for _, u := range peers { + resp, err := c.Get(u + "/etcdURL") if err != nil { - log.Printf("migrate: failed to get /etcdURL from %s", p) + log.Printf("starter: failed to get /etcdURL from %s", u) continue } b, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("migrate: failed to read body from %s", p) + log.Printf("starter: failed to read body from %s", u) continue } - resp, err = c.Get(string(b) + "/version") + urls = append(urls, string(b)) + } + return urls +} + +func checkInternalVersionByClientURLs(urls []string, tls *TLSInfo) (version, error) { + c, err := newDefaultClient(tls) + if err != nil { + return internalUnknown, err + } + for _, u := range urls { + resp, err := c.Get(u + "/version") if err != nil { - log.Printf("migrate: failed to get /version from %s", p) + log.Printf("starter: failed to get /version from %s", u) continue } - b, err = ioutil.ReadAll(resp.Body) + b, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Printf("migrate: failed to read body from %s", p) + log.Printf("starter: failed to read body from %s", u) continue } var m map[string]string err = json.Unmarshal(b, &m) if err != nil { - log.Printf("migrate: failed to unmarshal body %s from %s", b, p) + log.Printf("starter: failed to unmarshal body %s from %s", b, u) continue } switch m["internalVersion"] { @@ -206,10 +273,10 @@ func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo case "2": return internalV2, nil default: - log.Printf("migrate: unrecognized internal version %s from %s", m["internalVersion"], p) + log.Printf("starter: unrecognized internal version %s from %s", m["internalVersion"], u) } } - return internalUnknown, fmt.Errorf("failed to get version from peers %v", possiblePeers) + return internalUnknown, fmt.Errorf("failed to get version from urls %v", urls) } func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) { @@ -246,6 +313,14 @@ func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) { return peers, nil } +func getPeersFromPeersFlag(str string, tls *TLSInfo) []string { + peers := trimSplit(str, ",") + for i, p := range peers { + peers[i] = tls.Scheme() + "://" + p + } + return peers +} + func startInternalV1() { p := os.Getenv("ETCD_BINARY_DIR") if p == "" { @@ -254,10 +329,22 @@ func startInternalV1() { p = path.Join(p, "1") err := syscall.Exec(p, os.Args, syscall.Environ()) if err != nil { - log.Fatalf("migrate: failed to execute internal v1 etcd: %v", err) + log.Fatalf("starter: failed to execute internal v1 etcd: %v", err) } } +func newDefaultClient(tls *TLSInfo) (*http.Client, error) { + tr := &http.Transport{} + if tls.Scheme() == "https" { + tlsConfig, err := tls.ClientConfig() + if err != nil { + return nil, err + } + tr.TLSClientConfig = tlsConfig + } + return &http.Client{Transport: tr}, nil +} + type value struct { s string } @@ -285,6 +372,22 @@ func parseConfig(args []string) (*flag.FlagSet, error) { return fs, nil } +func clientTLSInfo(fs *flag.FlagSet) *TLSInfo { + return &TLSInfo{ + CAFile: fs.Lookup("ca-file").Value.String(), + CertFile: fs.Lookup("cert-file").Value.String(), + KeyFile: fs.Lookup("key-file").Value.String(), + } +} + +func peerTLSInfo(fs *flag.FlagSet) *TLSInfo { + return &TLSInfo{ + CAFile: fs.Lookup("peer-ca-file").Value.String(), + CertFile: fs.Lookup("peer-cert-file").Value.String(), + KeyFile: fs.Lookup("peer-key-file").Value.String(), + } +} + func snapDir4(dataDir string) string { return path.Join(dataDir, "snapshot") } @@ -293,6 +396,10 @@ func logFile4(dataDir string) string { return path.Join(dataDir, "log") } +func standbyInfo4(dataDir string) string { + return path.Join(dataDir, "standby_info") +} + func trimSplit(s, sep string) []string { trimmed := strings.Split(s, sep) for i := range trimmed {