From ec5a6e8bebb1b358aed18a7527b0261fa7cd177a Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 22 Jan 2015 13:40:45 -0800 Subject: [PATCH] migrate: support start desired version --- etcdmain/config.go | 12 +- main.go | 14 ++ migrate/functional/README.md | 27 +++ migrate/functional/member.go | 256 ++++++++++++++++++++++++ migrate/functional/upgrade_test.go | 292 ++++++++++++++++++++++++++++ migrate/snapshot.go | 6 +- migrate/starter/starter.go | 302 +++++++++++++++++++++++++++++ migrate/starter/tls_info.go | 120 ++++++++++++ pkg/flags/flag.go | 10 + 9 files changed, 1034 insertions(+), 5 deletions(-) create mode 100644 migrate/functional/README.md create mode 100644 migrate/functional/member.go create mode 100644 migrate/functional/upgrade_test.go create mode 100644 migrate/starter/starter.go create mode 100644 migrate/starter/tls_info.go diff --git a/etcdmain/config.go b/etcdmain/config.go index 0fcd21d7a..93d4dbc1d 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -231,7 +231,11 @@ func (cfg *config) Parse(arguments []string) error { return ErrConflictBootstrapFlags } - cfg.lpurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-peer-urls", "peer-bind-addr", cfg.peerTLSInfo) + peerBindAddrFlag := "peer-bind-addr" + if !flags.IsSet(cfg.FlagSet, peerBindAddrFlag) { + peerBindAddrFlag = "peer-addr" + } + cfg.lpurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-peer-urls", peerBindAddrFlag, cfg.peerTLSInfo) if err != nil { return err } @@ -239,7 +243,11 @@ func (cfg *config) Parse(arguments []string) error { if err != nil { return err } - cfg.lcurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-client-urls", "bind-addr", cfg.clientTLSInfo) + bindAddrFlag := "bind-addr" + if !flags.IsSet(cfg.FlagSet, bindAddrFlag) { + bindAddrFlag = "addr" + } + cfg.lcurls, err = flags.URLsFromFlags(cfg.FlagSet, "listen-client-urls", bindAddrFlag, cfg.clientTLSInfo) if err != nil { return err } diff --git a/main.go b/main.go index 232ec5472..7983de712 100644 --- a/main.go +++ b/main.go @@ -24,9 +24,23 @@ package main import ( + "log" + "os" + "strconv" + "github.com/coreos/etcd/etcdmain" + "github.com/coreos/etcd/migrate/starter" ) func main() { + if str := os.Getenv("ETCD_ALLOW_LEGACY_MODE"); str != "" { + v, err := strconv.ParseBool(str) + if err != nil { + log.Fatalf("failed to parse ETCD_ALLOW_LEGACY_MODE=%s as bool", str) + } + if v { + starter.StartDesiredVersion(os.Args[1:]) + } + } etcdmain.Main() } diff --git a/migrate/functional/README.md b/migrate/functional/README.md new file mode 100644 index 000000000..421fc3cff --- /dev/null +++ b/migrate/functional/README.md @@ -0,0 +1,27 @@ + +etcd migration functional tests +===== + +This functional test suite deploys a etcd cluster using processes, and asserts etcd is functioning properly. + +Dependencies +------------ + +The test suite can only be run in linux system. It's recommended to run this in a virtual machine environment on CoreOS (e.g. using coreos-vagrant). The only dependency for the tests not provided on the CoreOS image is go. + +Usage +----- + +Set environment variables point to the respective binaries that are used to drive the actual tests: + +``` +$ export ETCD_V1_BIN=/path/to/v1_etcd +$ export ETCD_V2_BIN=/path/to/v2_etcd +$ export ETCDCTL_BIN=/path/to/etcdctl +``` + +Then the tests can be run: + +``` +$ go test github.com/coreos/etcd/migrate/functional +``` diff --git a/migrate/functional/member.go b/migrate/functional/member.go new file mode 100644 index 000000000..0b9545514 --- /dev/null +++ b/migrate/functional/member.go @@ -0,0 +1,256 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package functional + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + "time" +) + +type Proc struct { + *exec.Cmd + Name string + DataDir string + URL string + PeerURL string + + stderr io.ReadCloser +} + +func NewProcWithDefaultFlags(path string) *Proc { + var args []string + dir, err := ioutil.TempDir(os.TempDir(), "etcd") + if err != nil { + fmt.Printf("unexpected TempDir error: %v", err) + os.Exit(1) + } + args = append(args, "--data-dir="+dir) + args = append(args, "--name=default") + p := &Proc{ + Cmd: exec.Command(path, args...), + Name: "default", + DataDir: dir, + URL: "http://127.0.0.1:4001", + PeerURL: "http://127.0.0.1:7001", + } + // always expect to use start_desired_verson mode + p.Env = append(p.Env, + "ETCD_ALLOW_LEGACY_MODE=true", + "ETCD_BINARY_DIR="+binDir, + ) + return p +} + +func NewProcWithV1Flags(path string) *Proc { + p := NewProcWithDefaultFlags(path) + p.SetV1PeerAddr("127.0.0.1:7001") + return p +} + +func NewProcWithV2Flags(path string) *Proc { + p := NewProcWithDefaultFlags(path) + p.SetV2PeerURL("http://127.0.0.1:7001") + return p +} + +func (p *Proc) SetV2PeerURL(url string) { + p.Args = append(p.Args, + "-listen-peer-urls="+url, + "-initial-advertise-peer-urls="+url, + "-initial-cluster", + p.Name+"="+url, + ) + p.PeerURL = url +} + +func (p *Proc) SetV1PeerAddr(addr string) { + p.Args = append(p.Args, + "-peer-addr="+addr, + ) + p.PeerURL = "http://" + addr +} + +func (p *Proc) SetV1Addr(addr string) { + p.Args = append(p.Args, + "-addr="+addr, + ) + p.URL = "http://" + addr +} + +func (p *Proc) SetV1Peers(peers []string) { + p.Args = append(p.Args, + "-peers="+strings.Join(peers, ","), + ) +} + +func (p *Proc) SetName(name string) { + p.Args = append(p.Args, + "-name="+name, + ) + p.Name = name +} + +func (p *Proc) SetDataDir(dataDir string) { + p.Args = append(p.Args, + "-data-dir="+dataDir, + ) + p.DataDir = dataDir +} + +func (p *Proc) SetSnapCount(cnt int) { + p.Args = append(p.Args, + "-snapshot-count="+strconv.Itoa(cnt), + ) +} + +func (p *Proc) SetDiscovery(url string) { + p.Args = append(p.Args, + "-discovery="+url, + ) +} + +func (p *Proc) CleanUnsuppportedV1Flags() { + var args []string + for _, arg := range p.Args { + if !strings.HasPrefix(arg, "-peers=") { + args = append(args, arg) + } + } + p.Args = args +} + +func (p *Proc) Start() error { + var err error + if p.stderr, err = p.Cmd.StderrPipe(); err != nil { + return err + } + if err := p.Cmd.Start(); err != nil { + return err + } + for k := 0; k < 50; k++ { + _, err := http.Get(p.URL) + if err == nil { + return nil + } + time.Sleep(100 * time.Millisecond) + } + errMsg, _ := ioutil.ReadAll(p.stderr) + return fmt.Errorf("instance %s failed to be available after a long time: %s", p.Name, errMsg) +} + +func (p *Proc) Stop() { + if err := p.Cmd.Process.Kill(); err != nil { + fmt.Printf("Process Kill error: %v", err) + return + } + ioutil.ReadAll(p.stderr) + p.Cmd.Wait() +} + +func (p *Proc) Restart() error { + p.Stop() + return p.Start() +} + +func (p *Proc) Terminate() { + p.Stop() + os.RemoveAll(p.DataDir) +} + +type ProcGroup []*Proc + +func NewProcGroupWithV1Flags(path string, num int) ProcGroup { + pg := make([]*Proc, num) + pg[0] = NewProcWithDefaultFlags(path) + pg[0].SetName("etcd0") + for i := 1; i < num; i++ { + pg[i] = NewProcWithDefaultFlags(path) + pg[i].SetName(fmt.Sprintf("etcd%d", i)) + pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i)) + pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i)) + pg[i].SetV1Peers([]string{"127.0.0.1:7001"}) + } + return pg +} + +func NewProcGroupViaDiscoveryWithV1Flags(path string, num int, url string) ProcGroup { + pg := make([]*Proc, num) + for i := range pg { + pg[i] = NewProcWithDefaultFlags(path) + pg[i].SetName(fmt.Sprintf("etcd%d", i)) + pg[i].SetDiscovery(url) + pg[i].SetV1PeerAddr(fmt.Sprintf("127.0.0.1:%d", 7001+i)) + pg[i].SetV1Addr(fmt.Sprintf("127.0.0.1:%d", 4001+i)) + } + return pg +} + +func (pg ProcGroup) InheritDataDir(opg ProcGroup) { + for i := range pg { + pg[i].SetDataDir(opg[i].DataDir) + } +} + +func (pg ProcGroup) SetSnapCount(count int) { + for i := range pg { + pg[i].SetSnapCount(count) + } +} + +func (pg ProcGroup) CleanUnsuppportedV1Flags() { + for _, p := range pg { + p.CleanUnsuppportedV1Flags() + } +} + +func (pg ProcGroup) Start() error { + for _, p := range pg { + if err := p.Start(); err != nil { + return err + } + } + // leave time for instances to sync and write some entries into disk + // TODO: use more reliable method + time.Sleep(time.Second) + return nil +} + +func (pg ProcGroup) Wait() error { + for _, p := range pg { + if err := p.Wait(); err != nil { + return err + } + } + return nil +} + +func (pg ProcGroup) Stop() { + for _, p := range pg { + p.Stop() + } +} + +func (pg ProcGroup) Terminate() { + for _, p := range pg { + p.Terminate() + } +} diff --git a/migrate/functional/upgrade_test.go b/migrate/functional/upgrade_test.go new file mode 100644 index 000000000..bc67aa4bd --- /dev/null +++ b/migrate/functional/upgrade_test.go @@ -0,0 +1,292 @@ +package functional + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "os/exec" + "path" + "path/filepath" + "testing" +) + +var ( + binDir = ".versions" + v1BinPath = path.Join(binDir, "1") + v2BinPath = path.Join(binDir, "2") + etcdctlBinPath string +) + +func init() { + os.RemoveAll(binDir) + if err := os.Mkdir(binDir, 0700); err != nil { + fmt.Printf("unexpected Mkdir error: %v\n", err) + os.Exit(1) + } + if err := os.Symlink(absPathFromEnv("ETCD_V1_BIN"), v1BinPath); err != nil { + fmt.Printf("unexpected Symlink error: %v\n", err) + os.Exit(1) + } + if err := os.Symlink(absPathFromEnv("ETCD_V2_BIN"), v2BinPath); err != nil { + fmt.Printf("unexpected Symlink error: %v\n", err) + os.Exit(1) + } + etcdctlBinPath = os.Getenv("ETCDCTL_BIN") + + mustExist(v1BinPath) + mustExist(v2BinPath) + mustExist(etcdctlBinPath) +} + +func TestStartNewMember(t *testing.T) { + tests := []*Proc{ + NewProcWithDefaultFlags(v2BinPath), + NewProcWithV1Flags(v2BinPath), + NewProcWithV2Flags(v2BinPath), + } + for i, tt := range tests { + if err := tt.Start(); err != nil { + t.Fatalf("#%d: Start error: %v", i, err) + } + defer tt.Terminate() + + ver, err := checkInternalVersion(tt.URL) + if err != nil { + t.Fatalf("#%d: checkVersion error: %v", i, err) + } + if ver != "2" { + t.Errorf("#%d: internal version = %s, want %s", i, ver, "2") + } + } +} + +func TestStartV2Member(t *testing.T) { + tests := []*Proc{ + NewProcWithDefaultFlags(v2BinPath), + NewProcWithV1Flags(v2BinPath), + NewProcWithV2Flags(v2BinPath), + } + for i, tt := range tests { + // get v2 data dir + p := NewProcWithDefaultFlags(v2BinPath) + if err := p.Start(); err != nil { + t.Fatalf("#%d: Start error: %v", i, err) + } + p.Stop() + tt.SetDataDir(p.DataDir) + if err := tt.Start(); err != nil { + t.Fatalf("#%d: Start error: %v", i, err) + } + defer tt.Terminate() + + ver, err := checkInternalVersion(tt.URL) + if err != nil { + t.Fatalf("#%d: checkVersion error: %v", i, err) + } + if ver != "2" { + t.Errorf("#%d: internal version = %s, want %s", i, ver, "2") + } + } +} + +func TestStartV1Member(t *testing.T) { + tests := []*Proc{ + NewProcWithDefaultFlags(v2BinPath), + NewProcWithV1Flags(v2BinPath), + NewProcWithV2Flags(v2BinPath), + } + for i, tt := range tests { + // get v1 data dir + p := NewProcWithDefaultFlags(v1BinPath) + if err := p.Start(); err != nil { + t.Fatalf("#%d: Start error: %v", i, err) + } + p.Stop() + tt.SetDataDir(p.DataDir) + if err := tt.Start(); err != nil { + t.Fatalf("#%d: Start error: %v", i, err) + } + defer tt.Terminate() + + ver, err := checkInternalVersion(tt.URL) + if err != nil { + t.Fatalf("#%d: checkVersion error: %v", i, err) + } + if ver != "1" { + t.Errorf("#%d: internal version = %s, want %s", i, ver, "1") + } + } +} + +func TestUpgradeV1Cluster(t *testing.T) { + // get v2-desired v1 data dir + pg := NewProcGroupWithV1Flags(v1BinPath, 3) + if err := pg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL) + if err := cmd.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + t.Logf("wait until etcd exits...") + if err := pg.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + + npg := NewProcGroupWithV1Flags(v2BinPath, 3) + npg.InheritDataDir(pg) + npg.CleanUnsuppportedV1Flags() + if err := npg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer npg.Terminate() + + for _, p := range npg { + ver, err := checkInternalVersion(p.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "2" { + t.Errorf("internal version = %s, want %s", ver, "2") + } + } +} + +func TestUpgradeV1SnapshotedCluster(t *testing.T) { + // get v2-desired v1 data dir + pg := NewProcGroupWithV1Flags(v1BinPath, 3) + pg.SetSnapCount(10) + if err := pg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + cmd := exec.Command(etcdctlBinPath, "upgrade", "--peer-url", pg[1].PeerURL) + if err := cmd.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + if err := cmd.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + t.Logf("wait until etcd exits...") + if err := pg.Wait(); err != nil { + t.Fatalf("Wait error: %v", err) + } + for _, p := range pg { + // check it has taken snapshot + fis, err := ioutil.ReadDir(path.Join(p.DataDir, "snapshot")) + if err != nil { + t.Fatalf("unexpected ReadDir error: %v", err) + } + if len(fis) == 0 { + t.Fatalf("unexpected no-snapshot data dir") + } + } + + npg := NewProcGroupWithV1Flags(v2BinPath, 3) + npg.InheritDataDir(pg) + npg.CleanUnsuppportedV1Flags() + if err := npg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer npg.Terminate() + + for _, p := range npg { + ver, err := checkInternalVersion(p.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "2" { + t.Errorf("internal version = %s, want %s", ver, "2") + } + } +} + +func TestJoinV1Cluster(t *testing.T) { + pg := NewProcGroupWithV1Flags(v1BinPath, 1) + if err := pg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + pg.Stop() + npg := NewProcGroupWithV1Flags(v2BinPath, 3) + npg[0].SetDataDir(pg[0].DataDir) + if err := npg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer npg.Terminate() + + for _, p := range npg { + ver, err := checkInternalVersion(p.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "1" { + t.Errorf("internal version = %s, want %s", ver, "1") + } + } +} + +func TestJoinV1ClusterViaDiscovery(t *testing.T) { + dp := NewProcWithDefaultFlags(v1BinPath) + dp.SetV1Addr("127.0.0.1:5001") + dp.SetV1PeerAddr("127.0.0.1:8001") + if err := dp.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer dp.Terminate() + + durl := "http://127.0.0.1:5001/v2/keys/cluster/" + pg := NewProcGroupViaDiscoveryWithV1Flags(v1BinPath, 1, durl) + if err := pg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + pg.Stop() + npg := NewProcGroupViaDiscoveryWithV1Flags(v2BinPath, 3, durl) + npg[0].SetDataDir(pg[0].DataDir) + if err := npg.Start(); err != nil { + t.Fatalf("Start error: %v", err) + } + defer npg.Terminate() + + for _, p := range npg { + ver, err := checkInternalVersion(p.URL) + if err != nil { + t.Fatalf("checkVersion error: %v", err) + } + if ver != "1" { + t.Errorf("internal version = %s, want %s", ver, "1") + } + } +} + +func absPathFromEnv(name string) string { + path, err := filepath.Abs(os.Getenv(name)) + if err != nil { + fmt.Printf("unexpected Abs error: %v\n", err) + } + return path +} + +func mustExist(path string) { + if _, err := os.Stat(path); err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } +} + +func checkInternalVersion(url string) (string, error) { + resp, err := http.Get(url + "/version") + if err != nil { + return "", err + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + var m map[string]string + err = json.Unmarshal(b, &m) + return m["internalVersion"], err +} diff --git a/migrate/snapshot.go b/migrate/snapshot.go index cc071d643..cadb3f560 100644 --- a/migrate/snapshot.go +++ b/migrate/snapshot.go @@ -43,7 +43,7 @@ type Snapshot4 struct { } `json:"peers"` } -type sstore struct { +type Store4 struct { Root *node CurrentIndex uint64 CurrentVersion int @@ -165,7 +165,7 @@ func mangleRoot(n *node) *node { } func (s *Snapshot4) GetNodesFromStore() map[string]uint64 { - st := &sstore{} + st := &Store4{} if err := json.Unmarshal(s.State, st); err != nil { log.Fatal("Couldn't unmarshal snapshot") } @@ -174,7 +174,7 @@ func (s *Snapshot4) GetNodesFromStore() map[string]uint64 { } func (s *Snapshot4) Snapshot2() *raftpb.Snapshot { - st := &sstore{} + st := &Store4{} if err := json.Unmarshal(s.State, st); err != nil { log.Fatal("Couldn't unmarshal snapshot") } diff --git a/migrate/starter/starter.go b/migrate/starter/starter.go new file mode 100644 index 000000000..952f52ae4 --- /dev/null +++ b/migrate/starter/starter.go @@ -0,0 +1,302 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package starter + +import ( + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "path" + "strings" + "syscall" + + "github.com/coreos/etcd/client" + "github.com/coreos/etcd/etcdmain" + "github.com/coreos/etcd/migrate" + "github.com/coreos/etcd/pkg/flags" + "github.com/coreos/etcd/wal" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" +) + +type version string + +const ( + internalV1 version = "1" + internalV2 version = "2" + internalUnknown version = "unknown" + + defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/versions/" +) + +func StartDesiredVersion(args []string) { + switch checkStartVersion(args) { + case internalV1: + startInternalV1() + case internalV2: + default: + log.Panicf("migrate: unhandled start version") + } +} + +func checkStartVersion(args []string) version { + fs, err := parseConfig(args) + if err != nil { + return internalV2 + } + // If it uses 2.0 env var explicitly, start 2.0 + if fs.Lookup("initial-cluster").Value.String() != "" { + return internalV2 + } + + dataDir := fs.Lookup("data-dir").Value.String() + if dataDir == "" { + log.Fatalf("migrate: please set ETCD_DATA_DIR for etcd") + } + // check the data directory + walVersion, err := wal.DetectVersion(dataDir) + if err != nil { + log.Fatalf("migrate: failed to detect etcd version in %v: %v", dataDir, err) + } + log.Printf("migrate: detect etcd version %s in %s", walVersion, dataDir) + switch walVersion { + case wal.WALv0_5: + return internalV2 + case wal.WALv0_4: + // TODO: standby case + // if it is standby guy: + // print out detect standby mode + // go to WALNotExist case + // if want to start with 2.0: + // remove old data dir to avoid auto migration + // try to let it fallback? or use local proxy file? + ver, err := checkStartVersionByDataDir4(dataDir) + if err != nil { + log.Fatalf("migrate: failed to check start version in %v: %v", dataDir, err) + } + return ver + case wal.WALUnknown: + log.Fatalf("migrate: unknown etcd version in %v", dataDir) + case wal.WALNotExist: + discovery := fs.Lookup("discovery").Value.String() + peers := trimSplit(fs.Lookup("peers").Value.String(), ",") + peerTLSInfo := &TLSInfo{ + CAFile: fs.Lookup("peer-ca-file").Value.String(), + CertFile: fs.Lookup("peer-cert-file").Value.String(), + KeyFile: fs.Lookup("peer-key-file").Value.String(), + } + ver, err := checkStartVersionByMembers(discovery, peers, peerTLSInfo) + if err != nil { + log.Printf("migrate: failed to check start version through peers: %v", err) + break + } + return ver + default: + log.Panicf("migrate: unhandled etcd version in %v", dataDir) + } + return internalV2 +} + +func checkStartVersionByDataDir4(dataDir string) (version, error) { + // check v0.4 snapshot + snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir)) + if err != nil { + return internalUnknown, err + } + if snap4 != nil { + st := &migrate.Store4{} + if err := json.Unmarshal(snap4.State, st); err != nil { + return internalUnknown, err + } + dir := st.Root.Children["_etcd"] + n, ok := dir.Children["next-internal-version"] + if ok && n.Value == "2" { + return internalV2, nil + } + } + + // check v0.4 log + ents4, err := migrate.DecodeLog4FromFile(logFile4(dataDir)) + if err != nil { + return internalUnknown, err + } + for _, e := range ents4 { + cmd, err := migrate.NewCommand4(e.GetCommandName(), e.GetCommand(), nil) + if err != nil { + return internalUnknown, err + } + setcmd, ok := cmd.(*migrate.SetCommand) + if !ok { + continue + } + if setcmd.Key == "/_etcd/next-internal-version" && setcmd.Value == "2" { + return internalV2, nil + } + } + return internalV1, nil +} + +func checkStartVersionByMembers(discoverURL string, peers []string, tls *TLSInfo) (version, error) { + tr := &http.Transport{} + if tls.Scheme() == "https" { + tlsConfig, err := tls.ClientConfig() + if err != nil { + return internalUnknown, err + } + tr.TLSClientConfig = tlsConfig + } + c := &http.Client{Transport: tr} + + possiblePeers, err := getPeersFromDiscoveryURL(discoverURL) + if err != nil { + return internalUnknown, err + } + for _, p := range peers { + possiblePeers = append(possiblePeers, tls.Scheme()+"://"+p) + } + + for _, p := range possiblePeers { + resp, err := c.Get(p + "/etcdURL") + if err != nil { + log.Printf("migrate: failed to get /etcdURL from %s", p) + continue + } + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("migrate: failed to read body from %s", p) + continue + } + resp, err = c.Get(string(b) + "/version") + if err != nil { + log.Printf("migrate: failed to get /version from %s", p) + continue + } + b, err = ioutil.ReadAll(resp.Body) + if err != nil { + log.Printf("migrate: failed to read body from %s", p) + continue + } + + var m map[string]string + err = json.Unmarshal(b, &m) + if err != nil { + log.Printf("migrate: failed to unmarshal body %s from %s", b, p) + continue + } + switch m["internalVersion"] { + case "1": + return internalV1, nil + case "2": + return internalV2, nil + default: + log.Printf("migrate: unrecognized internal version %s from %s", m["internalVersion"], p) + } + } + return internalUnknown, fmt.Errorf("failed to get version from peers %v", possiblePeers) +} + +func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) { + if discoverURL == "" { + return nil, nil + } + + u, err := url.Parse(discoverURL) + if err != nil { + return nil, err + } + token := u.Path + u.Path = "" + c, err := client.NewHTTPClient(&http.Transport{}, []string{u.String()}) + if err != nil { + return nil, err + } + dc := client.NewDiscoveryKeysAPI(c) + + ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) + resp, err := dc.Get(ctx, token) + cancel() + if err != nil { + return nil, err + } + peers := make([]string, 0) + // append non-config keys to peers + for _, n := range resp.Node.Nodes { + if g := path.Base(n.Key); g == "_config" || g == "_state" { + continue + } + peers = append(peers, n.Value) + } + return peers, nil +} + +func startInternalV1() { + p := os.Getenv("ETCD_BINARY_DIR") + if p == "" { + p = defaultInternalV1etcdBinaryDir + } + p = path.Join(p, "1") + err := syscall.Exec(p, os.Args, syscall.Environ()) + if err != nil { + log.Fatalf("migrate: failed to execute internal v1 etcd: %v", err) + } +} + +type value struct { + s string +} + +func (v *value) String() string { return v.s } + +func (v *value) Set(s string) error { + v.s = s + return nil +} + +// parseConfig parses out the input config from cmdline arguments and +// environment variables. +func parseConfig(args []string) (*flag.FlagSet, error) { + fs := flag.NewFlagSet("full flagset", flag.ContinueOnError) + etcdmain.NewConfig().VisitAll(func(f *flag.Flag) { + fs.Var(&value{}, f.Name, "") + }) + if err := fs.Parse(args); err != nil { + return nil, err + } + if err := flags.SetFlagsFromEnv(fs); err != nil { + return nil, err + } + return fs, nil +} + +func snapDir4(dataDir string) string { + return path.Join(dataDir, "snapshot") +} + +func logFile4(dataDir string) string { + return path.Join(dataDir, "log") +} + +func trimSplit(s, sep string) []string { + trimmed := strings.Split(s, sep) + for i := range trimmed { + trimmed[i] = strings.TrimSpace(trimmed[i]) + } + return trimmed +} diff --git a/migrate/starter/tls_info.go b/migrate/starter/tls_info.go new file mode 100644 index 000000000..6c2525400 --- /dev/null +++ b/migrate/starter/tls_info.go @@ -0,0 +1,120 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package starter + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "fmt" + "io/ioutil" +) + +// TLSInfo holds the SSL certificates paths. +type TLSInfo struct { + CertFile string `json:"CertFile"` + KeyFile string `json:"KeyFile"` + CAFile string `json:"CAFile"` +} + +func (info TLSInfo) Scheme() string { + if info.KeyFile != "" && info.CertFile != "" { + return "https" + } else { + return "http" + } +} + +// Generates a tls.Config object for a server from the given files. +func (info TLSInfo) ServerConfig() (*tls.Config, error) { + // Both the key and cert must be present. + if info.KeyFile == "" || info.CertFile == "" { + return nil, fmt.Errorf("KeyFile and CertFile must both be present[key: %v, cert: %v]", info.KeyFile, info.CertFile) + } + + var cfg tls.Config + + tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) + if err != nil { + return nil, err + } + + cfg.Certificates = []tls.Certificate{tlsCert} + + if info.CAFile != "" { + cfg.ClientAuth = tls.RequireAndVerifyClientCert + cp, err := newCertPool(info.CAFile) + if err != nil { + return nil, err + } + + cfg.RootCAs = cp + cfg.ClientCAs = cp + } else { + cfg.ClientAuth = tls.NoClientCert + } + + return &cfg, nil +} + +// Generates a tls.Config object for a client from the given files. +func (info TLSInfo) ClientConfig() (*tls.Config, error) { + var cfg tls.Config + + if info.KeyFile == "" || info.CertFile == "" { + return &cfg, nil + } + + tlsCert, err := tls.LoadX509KeyPair(info.CertFile, info.KeyFile) + if err != nil { + return nil, err + } + + cfg.Certificates = []tls.Certificate{tlsCert} + + if info.CAFile != "" { + cp, err := newCertPool(info.CAFile) + if err != nil { + return nil, err + } + + cfg.RootCAs = cp + } + + return &cfg, nil +} + +// newCertPool creates x509 certPool with provided CA file +func newCertPool(CAFile string) (*x509.CertPool, error) { + certPool := x509.NewCertPool() + pemByte, err := ioutil.ReadFile(CAFile) + if err != nil { + return nil, err + } + + for { + var block *pem.Block + block, pemByte = pem.Decode(pemByte) + if block == nil { + return certPool, nil + } + cert, err := x509.ParseCertificate(block.Bytes) + if err != nil { + return nil, err + } + certPool.AddCert(cert) + } + +} diff --git a/pkg/flags/flag.go b/pkg/flags/flag.go index fb23c6cba..51e880bcc 100644 --- a/pkg/flags/flag.go +++ b/pkg/flags/flag.go @@ -119,3 +119,13 @@ func URLsFromFlags(fs *flag.FlagSet, urlsFlagName string, addrFlagName string, t return []url.URL(*fs.Lookup(urlsFlagName).Value.(*URLsValue)), nil } + +func IsSet(fs *flag.FlagSet, name string) bool { + set := false + fs.Visit(func(f *flag.Flag) { + if f.Name == name { + set = true + } + }) + return set +}