From c36abeabd1eed63d7ab925bab8d238c691cf000a Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Tue, 7 Oct 2014 18:34:15 -0700 Subject: [PATCH 1/3] etcdserver: export Member.StoreKey --- etcdserver/cluster_store.go | 131 ++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 etcdserver/cluster_store.go diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go new file mode 100644 index 000000000..93f650c90 --- /dev/null +++ b/etcdserver/cluster_store.go @@ -0,0 +1,131 @@ +package etcdserver + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" +) + +const ( + raftPrefix = "/raft" +) + +type ClusterStore interface { + Add(m Member) + Get() Cluster + Remove(id int64) +} + +type clusterStore struct { + Store store.Store +} + +func NewClusterStore(st store.Store, c Cluster) ClusterStore { + cls := &clusterStore{Store: st} + for _, m := range c { + cls.Add(*m) + } + return cls +} + +// Add puts a new Member into the store. +// A Member with a matching id must not exist. +func (s *clusterStore) Add(m Member) { + b, err := json.Marshal(m) + if err != nil { + log.Panicf("marshal peer info error: %v", err) + } + + if _, err := s.Store.Create(m.StoreKey(), false, string(b), false, store.Permanent); err != nil { + log.Panicf("add member should never fail: %v", err) + } +} + +// TODO(philips): keep the latest copy without going to the store to avoid the +// lock here. +func (s *clusterStore) Get() Cluster { + c := &Cluster{} + e, err := s.Store.Get(machineKVPrefix, true, false) + if err != nil { + log.Panicf("get member should never fail: %v", err) + } + for _, n := range e.Node.Nodes { + m := Member{} + if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { + log.Panicf("unmarshal peer error: %v", err) + } + err := c.Add(m) + if err != nil { + log.Panicf("add member to cluster should never fail: %v", err) + } + } + return *c +} + +// Remove removes a member from the store. +// The given id MUST exist. +func (s *clusterStore) Remove(id int64) { + p := s.Get().FindID(id).StoreKey() + if _, err := s.Store.Delete(p, false, false); err != nil { + log.Panicf("delete peer should never fail: %v", err) + } +} + +func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) { + c := &http.Client{Transport: t} + + return func(msgs []raftpb.Message) { + for _, m := range msgs { + // TODO: reuse go routines + // limit the number of outgoing connections for the same receiver + go send(c, cls, m) + } + } +} + +func send(c *http.Client, cls ClusterStore, m raftpb.Message) { + // TODO (xiangli): reasonable retry logic + for i := 0; i < 3; i++ { + u := cls.Get().Pick(m.To) + if u == "" { + // TODO: unknown peer id.. what do we do? I + // don't think his should ever happen, need to + // look into this further. + log.Printf("etcdhttp: no addr for %d", m.To) + return + } + + u = fmt.Sprintf("%s%s", u, raftPrefix) + + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data, err := m.Marshal() + if err != nil { + log.Println("etcdhttp: dropping message:", err) + return // drop bad message + } + if httpPost(c, u, data) { + return // success + } + // TODO: backoff + } +} + +func httpPost(c *http.Client, url string, data []byte) bool { + resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data)) + if err != nil { + // TODO: log the error? + return false + } + resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + // TODO: log the error? + return false + } + return true +} From 5ea1f2d96fc3c4cce2de6f9d6e9cebce27d9ac9b Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Tue, 7 Oct 2014 15:06:51 -0700 Subject: [PATCH 2/3] etcd4: migration from v0.4 -> v0.5 --- etcdserver/cluster_store.go | 1 + etcdserver/member.go | 2 +- migrate/cmd/etcd-dump-logs/main.go | 90 +++++ migrate/cmd/etcd-migrate/main.go | 22 ++ migrate/config.go | 40 +++ migrate/etcd4.go | 98 +++++ migrate/etcd4pb/log_entry.pb.go | 552 +++++++++++++++++++++++++++++ migrate/etcd4pb/log_entry.proto | 22 ++ migrate/fixtures/cmdlog | Bin 0 -> 1100 bytes migrate/log.go | 475 +++++++++++++++++++++++++ migrate/log_test.go | 42 +++ migrate/snapshot.go | 187 ++++++++++ 12 files changed, 1530 insertions(+), 1 deletion(-) create mode 100644 migrate/cmd/etcd-dump-logs/main.go create mode 100644 migrate/cmd/etcd-migrate/main.go create mode 100644 migrate/config.go create mode 100644 migrate/etcd4.go create mode 100644 migrate/etcd4pb/log_entry.pb.go create mode 100644 migrate/etcd4pb/log_entry.proto create mode 100644 migrate/fixtures/cmdlog create mode 100644 migrate/log.go create mode 100644 migrate/log_test.go create mode 100644 migrate/snapshot.go diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index 93f650c90..f0c8e278b 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -59,6 +59,7 @@ func (s *clusterStore) Get() Cluster { if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { log.Panicf("unmarshal peer error: %v", err) } + log.Printf("Found member in cluster: %#v", m) err := c.Add(m) if err != nil { log.Panicf("add member to cluster should never fail: %v", err) diff --git a/etcdserver/member.go b/etcdserver/member.go index 843886d74..58583a012 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -47,7 +47,7 @@ type Member struct { Attributes } -// newMember creates a Member without an ID and generates one based on the +// NewMember creates a Member without an ID and generates one based on the // name, peer URLs. This is used for bootstrapping/adding new member. func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { m := &Member{ diff --git a/migrate/cmd/etcd-dump-logs/main.go b/migrate/cmd/etcd-dump-logs/main.go new file mode 100644 index 000000000..17759e2a3 --- /dev/null +++ b/migrate/cmd/etcd-dump-logs/main.go @@ -0,0 +1,90 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "log" + "path" + + etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/migrate" + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal" +) + +func walDir5(dataDir string) string { + return path.Join(dataDir, "wal") +} + +func logFile4(dataDir string) string { + return path.Join(dataDir, "log") +} + +func main() { + version := flag.Int("version", 5, "4 or 5") + from := flag.String("data-dir", "", "") + flag.Parse() + + if *from == "" { + log.Fatal("Must provide -from flag") + } + + var ents []raftpb.Entry + var err error + switch *version { + case 4: + ents, err = dump4(*from) + case 5: + ents, err = dump5(*from) + default: + err = errors.New("value of -version flag must be 4 or 5") + } + + if err != nil { + log.Fatalf("Failed decoding log: %v", err) + } + + for _, e := range ents { + msg := fmt.Sprintf("%2d %5d: ", e.Term, e.Index) + switch e.Type { + case raftpb.EntryNormal: + msg = fmt.Sprintf("%s norm", msg) + var r etcdserverpb.Request + if err := r.Unmarshal(e.Data); err != nil { + msg = fmt.Sprintf("%s ???", msg) + } else { + msg = fmt.Sprintf("%s %s %s %s", msg, r.Method, r.Path, r.Val) + } + case raftpb.EntryConfChange: + msg = fmt.Sprintf("%s conf", msg) + } + fmt.Println(msg) + } +} + +func dump4(dataDir string) ([]raftpb.Entry, error) { + lf4 := logFile4(dataDir) + ents, err := migrate.DecodeLog4FromFile(lf4) + if err != nil { + return nil, err + } + + return migrate.Entries4To5(0, ents) +} + +func dump5(dataDir string) ([]raftpb.Entry, error) { + wd5 := walDir5(dataDir) + if !wal.Exist(wd5) { + return nil, fmt.Errorf("No wal exists at %s", wd5) + } + + w, err := wal.OpenAtIndex(wd5, 0) + if err != nil { + return nil, err + } + defer w.Close() + + _, _, ents, err := w.ReadAll() + return ents, err +} diff --git a/migrate/cmd/etcd-migrate/main.go b/migrate/cmd/etcd-migrate/main.go new file mode 100644 index 000000000..ead6cea4f --- /dev/null +++ b/migrate/cmd/etcd-migrate/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + "log" + + "github.com/coreos/etcd/migrate" +) + +func main() { + from := flag.String("data-dir", "", "etcd v0.4 data-dir") + flag.Parse() + + if *from == "" { + log.Fatal("Must provide -from flag") + } + + err := migrate.Migrate4To5(*from) + if err != nil { + log.Fatalf("Failed migrating data-dir: %v", err) + } +} diff --git a/migrate/config.go b/migrate/config.go new file mode 100644 index 000000000..a77884317 --- /dev/null +++ b/migrate/config.go @@ -0,0 +1,40 @@ +package migrate + +import ( + "encoding/json" + "io/ioutil" + + raftpb "github.com/coreos/etcd/raft/raftpb" +) + +type Config4 struct { + CommitIndex uint64 `json:"commitIndex"` + + //TODO(bcwaldon): is this needed? + //Peers []struct{ + // Name string `json:"name"` + // ConnectionString string `json:"connectionString"` + //} `json:"peers"` +} + +func (c *Config4) HardState5() raftpb.HardState { + return raftpb.HardState{ + Commit: int64(c.CommitIndex), + Term: 0, + Vote: 0, + } +} + +func DecodeConfig4FromFile(cfgPath string) (*Config4, error) { + b, err := ioutil.ReadFile(cfgPath) + if err != nil { + return nil, err + } + + conf := &Config4{} + if err = json.Unmarshal(b, conf); err != nil { + return nil, err + } + + return conf, nil +} diff --git a/migrate/etcd4.go b/migrate/etcd4.go new file mode 100644 index 000000000..436521f49 --- /dev/null +++ b/migrate/etcd4.go @@ -0,0 +1,98 @@ +package migrate + +import ( + "fmt" + "log" + "os" + "path" + + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/wal" +) + +func snapDir4(dataDir string) string { + return path.Join(dataDir, "snapshot") +} + +func logFile4(dataDir string) string { + return path.Join(dataDir, "log") +} + +func cfgFile4(dataDir string) string { + return path.Join(dataDir, "conf") +} + +func snapDir5(dataDir string) string { + return path.Join(dataDir, "snap") +} + +func walDir5(dataDir string) string { + return path.Join(dataDir, "wal") +} + +func Migrate4To5(dataDir string) error { + // prep new directories + sd5 := snapDir5(dataDir) + if err := os.MkdirAll(sd5, 0700); err != nil { + return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err) + } + + wd5 := walDir5(dataDir) + w, err := wal.Create(wd5) + if err != nil { + return fmt.Errorf("failed initializing wal at %s: %v", wd5, err) + } + defer w.Close() + + // read v0.4 data + snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir)) + if err != nil { + return err + } + + cfg4, err := DecodeConfig4FromFile(cfgFile4(dataDir)) + if err != nil { + return err + } + + ents4, err := DecodeLog4FromFile(logFile4(dataDir)) + if err != nil { + return err + } + + // transform v0.4 data + var snap5 *raftpb.Snapshot + if snap4 == nil { + log.Printf("No snapshot found") + } else { + log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex) + + snap5 = snap4.Snapshot5() + } + + st5 := cfg4.HardState5() + + ents5, err := Entries4To5(uint64(st5.Commit), ents4) + if err != nil { + return err + } + + ents5Len := len(ents5) + log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index) + + // migrate snapshot (if necessary) and logs + if snap5 != nil { + ss := snap.New(sd5) + ss.SaveSnap(*snap5) + log.Printf("Snapshot migration successful") + } + + // explicitly prepend an empty entry as the WAL code expects it + ents5 = append(make([]raftpb.Entry, 1), ents5...) + + w.Save(st5, ents5) + log.Printf("Log migration successful") + + return nil +} diff --git a/migrate/etcd4pb/log_entry.pb.go b/migrate/etcd4pb/log_entry.pb.go new file mode 100644 index 000000000..adab85b9b --- /dev/null +++ b/migrate/etcd4pb/log_entry.pb.go @@ -0,0 +1,552 @@ +// Code generated by protoc-gen-gogo. +// source: log_entry.proto +// DO NOT EDIT! + +package protobuf + +import proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" +import math "math" + +// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" + +import io "io" +import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" + +import fmt "fmt" +import strings "strings" +import reflect "reflect" + +import fmt1 "fmt" +import strings1 "strings" +import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import sort "sort" +import strconv "strconv" +import reflect1 "reflect" + +import fmt2 "fmt" +import bytes "bytes" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type LogEntry struct { + Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"` + CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"` + Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *LogEntry) Reset() { *m = LogEntry{} } +func (*LogEntry) ProtoMessage() {} + +func (m *LogEntry) GetIndex() uint64 { + if m != nil && m.Index != nil { + return *m.Index + } + return 0 +} + +func (m *LogEntry) GetTerm() uint64 { + if m != nil && m.Term != nil { + return *m.Term + } + return 0 +} + +func (m *LogEntry) GetCommandName() string { + if m != nil && m.CommandName != nil { + return *m.CommandName + } + return "" +} + +func (m *LogEntry) GetCommand() []byte { + if m != nil { + return m.Command + } + return nil +} + +func init() { +} +func (m *LogEntry) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Index = &v + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Term = &v + case 3: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[index:postIndex]) + m.CommandName = &s + index = postIndex + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Command = append(m.Command, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} +func (this *LogEntry) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LogEntry{`, + `Index:` + valueToStringLogEntry(this.Index) + `,`, + `Term:` + valueToStringLogEntry(this.Term) + `,`, + `CommandName:` + valueToStringLogEntry(this.CommandName) + `,`, + `Command:` + valueToStringLogEntry(this.Command) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func valueToStringLogEntry(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *LogEntry) Size() (n int) { + var l int + _ = l + if m.Index != nil { + n += 1 + sovLogEntry(uint64(*m.Index)) + } + if m.Term != nil { + n += 1 + sovLogEntry(uint64(*m.Term)) + } + if m.CommandName != nil { + l = len(*m.CommandName) + n += 1 + l + sovLogEntry(uint64(l)) + } + if m.Command != nil { + l = len(m.Command) + n += 1 + l + sovLogEntry(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovLogEntry(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLogEntry(x uint64) (n int) { + return sovLogEntry(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func NewPopulatedLogEntry(r randyLogEntry, easy bool) *LogEntry { + this := &LogEntry{} + v1 := uint64(r.Uint32()) + this.Index = &v1 + v2 := uint64(r.Uint32()) + this.Term = &v2 + v3 := randStringLogEntry(r) + this.CommandName = &v3 + if r.Intn(10) != 0 { + v4 := r.Intn(100) + this.Command = make([]byte, v4) + for i := 0; i < v4; i++ { + this.Command[i] = byte(r.Intn(256)) + } + } + if !easy && r.Intn(10) != 0 { + this.XXX_unrecognized = randUnrecognizedLogEntry(r, 5) + } + return this +} + +type randyLogEntry interface { + Float32() float32 + Float64() float64 + Int63() int64 + Int31() int32 + Uint32() uint32 + Intn(n int) int +} + +func randUTF8RuneLogEntry(r randyLogEntry) rune { + res := rune(r.Uint32() % 1112064) + if 55296 <= res { + res += 2047 + } + return res +} +func randStringLogEntry(r randyLogEntry) string { + v5 := r.Intn(100) + tmps := make([]rune, v5) + for i := 0; i < v5; i++ { + tmps[i] = randUTF8RuneLogEntry(r) + } + return string(tmps) +} +func randUnrecognizedLogEntry(r randyLogEntry, maxFieldNumber int) (data []byte) { + l := r.Intn(5) + for i := 0; i < l; i++ { + wire := r.Intn(4) + if wire == 3 { + wire = 5 + } + fieldNumber := maxFieldNumber + r.Intn(100) + data = randFieldLogEntry(data, r, fieldNumber, wire) + } + return data +} +func randFieldLogEntry(data []byte, r randyLogEntry, fieldNumber int, wire int) []byte { + key := uint32(fieldNumber)<<3 | uint32(wire) + switch wire { + case 0: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = encodeVarintPopulateLogEntry(data, uint64(r.Int63())) + case 1: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + case 2: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + ll := r.Intn(100) + data = encodeVarintPopulateLogEntry(data, uint64(ll)) + for j := 0; j < ll; j++ { + data = append(data, byte(r.Intn(256))) + } + default: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + } + return data +} +func encodeVarintPopulateLogEntry(data []byte, v uint64) []byte { + for v >= 1<<7 { + data = append(data, uint8(uint64(v)&0x7f|0x80)) + v >>= 7 + } + data = append(data, uint8(v)) + return data +} +func (m *LogEntry) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LogEntry) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.Index != nil { + data[i] = 0x8 + i++ + i = encodeVarintLogEntry(data, i, uint64(*m.Index)) + } + if m.Term != nil { + data[i] = 0x10 + i++ + i = encodeVarintLogEntry(data, i, uint64(*m.Term)) + } + if m.CommandName != nil { + data[i] = 0x1a + i++ + i = encodeVarintLogEntry(data, i, uint64(len(*m.CommandName))) + i += copy(data[i:], *m.CommandName) + } + if m.Command != nil { + data[i] = 0x22 + i++ + i = encodeVarintLogEntry(data, i, uint64(len(m.Command))) + i += copy(data[i:], m.Command) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} +func encodeFixed64LogEntry(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32LogEntry(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLogEntry(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (this *LogEntry) GoString() string { + if this == nil { + return "nil" + } + s := strings1.Join([]string{`&protobuf.LogEntry{` + `Index:` + valueToGoStringLogEntry(this.Index, "uint64"), `Term:` + valueToGoStringLogEntry(this.Term, "uint64"), `CommandName:` + valueToGoStringLogEntry(this.CommandName, "string"), `Command:` + valueToGoStringLogEntry(this.Command, "byte"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ") + return s +} +func valueToGoStringLogEntry(v interface{}, typ string) string { + rv := reflect1.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect1.Indirect(rv).Interface() + return fmt1.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func extensionToGoStringLogEntry(e map[int32]code_google_com_p_gogoprotobuf_proto1.Extension) string { + if e == nil { + return "nil" + } + s := "map[int32]proto.Extension{" + keys := make([]int, 0, len(e)) + for k := range e { + keys = append(keys, int(k)) + } + sort.Ints(keys) + ss := []string{} + for _, k := range keys { + ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString()) + } + s += strings1.Join(ss, ",") + "}" + return s +} +func (this *LogEntry) VerboseEqual(that interface{}) error { + if that == nil { + if this == nil { + return nil + } + return fmt2.Errorf("that == nil && this != nil") + } + + that1, ok := that.(*LogEntry) + if !ok { + return fmt2.Errorf("that is not of type *LogEntry") + } + if that1 == nil { + if this == nil { + return nil + } + return fmt2.Errorf("that is type *LogEntry but is nil && this != nil") + } else if this == nil { + return fmt2.Errorf("that is type *LogEntrybut is not nil && this == nil") + } + if this.Index != nil && that1.Index != nil { + if *this.Index != *that1.Index { + return fmt2.Errorf("Index this(%v) Not Equal that(%v)", *this.Index, *that1.Index) + } + } else if this.Index != nil { + return fmt2.Errorf("this.Index == nil && that.Index != nil") + } else if that1.Index != nil { + return fmt2.Errorf("Index this(%v) Not Equal that(%v)", this.Index, that1.Index) + } + if this.Term != nil && that1.Term != nil { + if *this.Term != *that1.Term { + return fmt2.Errorf("Term this(%v) Not Equal that(%v)", *this.Term, *that1.Term) + } + } else if this.Term != nil { + return fmt2.Errorf("this.Term == nil && that.Term != nil") + } else if that1.Term != nil { + return fmt2.Errorf("Term this(%v) Not Equal that(%v)", this.Term, that1.Term) + } + if this.CommandName != nil && that1.CommandName != nil { + if *this.CommandName != *that1.CommandName { + return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", *this.CommandName, *that1.CommandName) + } + } else if this.CommandName != nil { + return fmt2.Errorf("this.CommandName == nil && that.CommandName != nil") + } else if that1.CommandName != nil { + return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", this.CommandName, that1.CommandName) + } + if !bytes.Equal(this.Command, that1.Command) { + return fmt2.Errorf("Command this(%v) Not Equal that(%v)", this.Command, that1.Command) + } + if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) { + return fmt2.Errorf("XXX_unrecognized this(%v) Not Equal that(%v)", this.XXX_unrecognized, that1.XXX_unrecognized) + } + return nil +} +func (this *LogEntry) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*LogEntry) + if !ok { + return false + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if this.Index != nil && that1.Index != nil { + if *this.Index != *that1.Index { + return false + } + } else if this.Index != nil { + return false + } else if that1.Index != nil { + return false + } + if this.Term != nil && that1.Term != nil { + if *this.Term != *that1.Term { + return false + } + } else if this.Term != nil { + return false + } else if that1.Term != nil { + return false + } + if this.CommandName != nil && that1.CommandName != nil { + if *this.CommandName != *that1.CommandName { + return false + } + } else if this.CommandName != nil { + return false + } else if that1.CommandName != nil { + return false + } + if !bytes.Equal(this.Command, that1.Command) { + return false + } + if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) { + return false + } + return true +} diff --git a/migrate/etcd4pb/log_entry.proto b/migrate/etcd4pb/log_entry.proto new file mode 100644 index 000000000..39fa826c0 --- /dev/null +++ b/migrate/etcd4pb/log_entry.proto @@ -0,0 +1,22 @@ +package protobuf; + +import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; + +option (gogoproto.gostring_all) = true; +option (gogoproto.equal_all) = true; +option (gogoproto.verbose_equal_all) = true; +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.populate_all) = true; +option (gogoproto.testgen_all) = true; +option (gogoproto.benchgen_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message LogEntry { + required uint64 Index=1; + required uint64 Term=2; + required string CommandName=3; + optional bytes Command=4; // for nop-command +} diff --git a/migrate/fixtures/cmdlog b/migrate/fixtures/cmdlog new file mode 100644 index 0000000000000000000000000000000000000000..89691b67892dad42303f49e25a5e193c178cd653 GIT binary patch literal 1100 zcmb`H&2Q5{5XJfKb_)^+^};C{2_YdTwe#ut1S%XVaX_T>h_tmQWx?Jxo3%-6^?zs9 zjtv1tAXF!cHmmh}zj>ZD4L!)j`q#fo zWC#%igH*>N1rLl^+K(UJ(cWik^XTX(7|sSheS&D_`vIMB>N`%1-HB^n)AS|@XLkta zu*O-TJx+_2MRTmxQeMET3gTuJRUl$3OI*k+f|(NpX1c^Tn2Jk?g3$LpSY4|4MA7Xf zVMded)}@D>-6a?1z$2ye#(X&D>>h0&)aH#IJgHz)uSjK#9qIU-v-=%stg~Dge611! zK#MS_;42acEVZWkbA#oXNDG(SxwzWwoTvoX1mALnK4%X|^Jxw6Q*-fMsTpAxvM$P!ReuL)~ z*uOo8w%SR^*+ExrktZ(aw>N;=8VpCAJtPBOcY2C~Lfh5RBZ7J4YN^lGuYwo9HNWS^ SPazGSRtsySx}bihk@x{qvMd+? literal 0 HcmV?d00001 diff --git a/migrate/log.go b/migrate/log.go new file mode 100644 index 000000000..977fdcf20 --- /dev/null +++ b/migrate/log.go @@ -0,0 +1,475 @@ +package migrate + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "os" + "time" + + "github.com/coreos/etcd/etcdserver" + etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" + etcd4pb "github.com/coreos/etcd/migrate/etcd4pb" + "github.com/coreos/etcd/pkg/types" + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" +) + +func DecodeLog4FromFile(logpath string) ([]*etcd4pb.LogEntry, error) { + file, err := os.OpenFile(logpath, os.O_RDWR, 0600) + if err != nil { + return nil, err + } + defer file.Close() + + return DecodeLog4(file) +} + +func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) { + var readBytes int64 + entries := make([]*etcd4pb.LogEntry, 0) + + for { + entry, n, err := DecodeNextEntry4(file) + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed decoding next log entry: ", err) + } + + if entry != nil { + entries = append(entries, entry) + } + + readBytes += int64(n) + } + + return entries, nil +} + +// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the +// number of bytes read and any error that occurs. +func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) { + var length int + _, err := fmt.Fscanf(r, "%8x\n", &length) + if err != nil { + return nil, -1, err + } + + data := make([]byte, length) + if _, err = io.ReadFull(r, data); err != nil { + return nil, -1, err + } + + ent4 := new(etcd4pb.LogEntry) + if err = ent4.Unmarshal(data); err != nil { + return nil, -1, err + } + + // add width of scanner token to length + length = length + 8 + 1 + + return ent4, length, nil +} + +func hashName(name string) int64 { + var sum int64 + for _, ch := range name { + sum = 131*sum + int64(ch) + } + return sum +} + +type Command4 interface { + Type5() raftpb.EntryType + Data5() ([]byte, error) +} + +func NewCommand4(name string, data []byte) (Command4, error) { + var cmd Command4 + + switch name { + case "etcd:remove": + cmd = &RemoveCommand{} + case "etcd:join": + cmd = &JoinCommand{} + case "etcd:setClusterConfig": + //TODO(bcwaldon): can this safely be discarded? + cmd = &NOPCommand{} + case "etcd:compareAndDelete": + cmd = &CompareAndDeleteCommand{} + case "etcd:compareAndSwap": + cmd = &CompareAndSwapCommand{} + case "etcd:create": + cmd = &CreateCommand{} + case "etcd:delete": + cmd = &DeleteCommand{} + case "etcd:set": + cmd = &SetCommand{} + case "etcd:sync": + cmd = &SyncCommand{} + case "etcd:update": + cmd = &UpdateCommand{} + case "raft:join": + cmd = &DefaultJoinCommand{} + case "raft:leave": + cmd = &DefaultLeaveCommand{} + case "raft:nop": + cmd = &NOPCommand{} + default: + return nil, fmt.Errorf("unregistered command type %s", name) + } + + // If data for the command was passed in the decode it. + if data != nil { + if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil { + return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err) + } + } + + return cmd, nil +} + +type RemoveCommand struct { + Name string `json:"name"` +} + +func (c *RemoveCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *RemoveCommand) Data5() ([]byte, error) { + m := etcdserver.Member{ + ID: hashName(c.Name), + } + + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: m.StoreKey(), + } + + return req5.Marshal() +} + +type JoinCommand struct { + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` + + //TODO(bcwaldon): Should these be converted? + //MinVersion int `json:"minVersion"` + //MaxVersion int `json:"maxVersion"` +} + +func (c *JoinCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *JoinCommand) Data5() ([]byte, error) { + pURLs, err := types.NewURLs([]string{c.RaftURL}) + if err != nil { + return nil, err + } + + m := etcdserver.GenerateMember(c.Name, pURLs, nil) + + //TODO(bcwaldon): why doesn't this go through GenerateMember? + m.ClientURLs = []string{c.EtcdURL} + + b, err := json.Marshal(*m) + if err != nil { + return nil, err + } + + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: m.StoreKey(), + Val: string(b), + + // TODO(bcwaldon): Is this correct? + Time: store.Permanent.Unix(), + + //TODO(bcwaldon): What is the new equivalent of Unique? + //Unique: c.Unique, + } + + return req5.Marshal() + +} + +type SetClusterConfigCommand struct { + Config *struct { + ActiveSize int `json:"activeSize"` + RemoveDelay float64 `json:"removeDelay"` + SyncInterval float64 `json:"syncInterval"` + } `json:"config"` +} + +func (c *SetClusterConfigCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SetClusterConfigCommand) Data5() ([]byte, error) { + b, err := json.Marshal(c.Config) + if err != nil { + return nil, err + } + + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: "/v2/admin/config", + Dir: false, + Val: string(b), + + // TODO(bcwaldon): Is this correct? + Time: store.Permanent.Unix(), + } + + return req5.Marshal() +} + +type CompareAndDeleteCommand struct { + Key string `json:"key"` + PrevValue string `json:"prevValue"` + PrevIndex uint64 `json:"prevIndex"` +} + +func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CompareAndDeleteCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: c.Key, + PrevValue: c.PrevValue, + PrevIndex: c.PrevIndex, + } + return req5.Marshal() +} + +type CompareAndSwapCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + PrevValue string `json:"prevValue"` + PrevIndex uint64 `json:"prevIndex"` +} + +func (c *CompareAndSwapCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CompareAndSwapCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Val: c.Value, + PrevValue: c.PrevValue, + PrevIndex: c.PrevIndex, + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type CreateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + Unique bool `json:"unique"` + Dir bool `json:"dir"` +} + +func (c *CreateCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CreateCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Dir: c.Dir, + Val: c.Value, + + // TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + + //TODO(bcwaldon): What is the new equivalent of Unique? + //Unique: c.Unique, + } + return req5.Marshal() +} + +type DeleteCommand struct { + Key string `json:"key"` + Recursive bool `json:"recursive"` + Dir bool `json:"dir"` +} + +func (c *DeleteCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *DeleteCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: c.Key, + Dir: c.Dir, + Recursive: c.Recursive, + } + return req5.Marshal() +} + +type SetCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + Dir bool `json:"dir"` +} + +func (c *SetCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SetCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Dir: c.Dir, + Val: c.Value, + + //TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type UpdateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +func (c *UpdateCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *UpdateCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Val: c.Value, + + //TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type SyncCommand struct { + Time time.Time `json:"time"` +} + +func (c *SyncCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SyncCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "SYNC", + //TODO(bcwaldon): Is this correct? + Time: c.Time.UnixNano(), + } + return req5.Marshal() +} + +type DefaultJoinCommand struct { + //TODO(bcwaldon): implement Type5, Data5 + Command4 + + Name string `json:"name"` + ConnectionString string `json:"connectionString"` +} + +type DefaultLeaveCommand struct { + //TODO(bcwaldon): implement Type5, Data5 + Command4 + + Name string `json:"name"` +} + +//TODO(bcwaldon): Why is CommandName here? +func (c *DefaultLeaveCommand) CommandName() string { + return "raft:leave" +} + +type NOPCommand struct{} + +//TODO(bcwaldon): Why is CommandName here? +func (c NOPCommand) CommandName() string { + return "raft:nop" +} + +func (c *NOPCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *NOPCommand) Data5() ([]byte, error) { + return nil, nil +} + +func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) { + ents4Len := len(ents4) + + if ents4Len == 0 { + return nil, nil + } + + startIndex := ents4[0].GetIndex() + for i, e := range ents4[1:] { + eIndex := e.GetIndex() + // ensure indexes are monotonically increasing + wantIndex := startIndex + uint64(i+1) + if wantIndex != eIndex { + return nil, fmt.Errorf("skipped log index %d", wantIndex) + } + } + + ents5 := make([]raftpb.Entry, 0) + for i, e := range ents4 { + ent, err := toEntry5(e) + if err != nil { + log.Printf("Ignoring invalid log data in entry %d: %v", i, err) + } else { + ents5 = append(ents5, *ent) + } + } + + return ents5, nil +} + +func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) { + cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand()) + if err != nil { + return nil, err + } + + data, err := cmd4.Data5() + if err != nil { + return nil, err + } + + ent5 := raftpb.Entry{ + Term: int64(ent4.GetTerm()), + Index: int64(ent4.GetIndex()), + Type: cmd4.Type5(), + Data: data, + } + + log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type) + + return &ent5, nil +} diff --git a/migrate/log_test.go b/migrate/log_test.go new file mode 100644 index 000000000..366d2cfa3 --- /dev/null +++ b/migrate/log_test.go @@ -0,0 +1,42 @@ +package migrate + +import ( + "reflect" + "testing" + "time" +) + +func TestNewCommand(t *testing.T) { + entries, err := ReadLogFile("fixtures/cmdlog") + if err != nil { + t.Errorf("read log file error: %v", err) + } + + tests := []interface{}{ + &JoinCommand{2, 2, "1.local", "http://127.0.0.1:7001", "http://127.0.0.1:4001"}, + &SetClusterConfigCommand{&ClusterConfig{9, 1800.0, 5.0}}, + &NOPCommand{}, + &RemoveCommand{"alice"}, + &CompareAndDeleteCommand{"foo", "baz", 9}, + &CompareAndSwapCommand{"foo", "bar", time.Unix(0, 0), "baz", 9}, + &CreateCommand{"foo", "bar", time.Unix(0, 0), true, true}, + &DeleteCommand{"foo", true, true}, + &SetCommand{"foo", "bar", time.Unix(0, 0), true}, + &SyncCommand{time.Unix(0, 0)}, + &UpdateCommand{"foo", "bar", time.Unix(0, 0)}, + &DefaultLeaveCommand{"alice"}, + &DefaultJoinCommand{"alice", ""}, + } + + for i, e := range entries { + cmd, err := NewCommand(e.GetCommandName(), e.GetCommand()) + if err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + + if !reflect.DeepEqual(cmd, tests[i]) { + t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, tests[i]) + } + } +} diff --git a/migrate/snapshot.go b/migrate/snapshot.go new file mode 100644 index 000000000..5b4a2fe79 --- /dev/null +++ b/migrate/snapshot.go @@ -0,0 +1,187 @@ +package migrate + +import ( + "encoding/json" + "errors" + "fmt" + "hash/crc32" + "io/ioutil" + "log" + "os" + "path" + "sort" + "strconv" + "strings" + + raftpb "github.com/coreos/etcd/raft/raftpb" +) + +type Snapshot4 struct { + State []byte `json:"state"` + LastIndex uint64 `json:"lastIndex"` + LastTerm uint64 `json:"lastTerm"` + + Peers []struct { + Name string `json:"name"` + ConnectionString string `json:"connectionString"` + } `json:"peers"` + + //TODO(bcwaldon): is this needed? + //Path string `json:"path"` +} + +func (s *Snapshot4) Snapshot5() *raftpb.Snapshot { + snap5 := raftpb.Snapshot{ + Data: s.State, + Index: int64(s.LastIndex), + Term: int64(s.LastTerm), + Nodes: make([]int64, len(s.Peers)), + } + + for i, p := range s.Peers { + snap5.Nodes[i] = hashName(p.Name) + } + + return &snap5 +} + +func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) { + fname, err := FindLatestFile(snapdir) + if err != nil { + return nil, err + } + + if fname == "" { + return nil, nil + } + + snappath := path.Join(snapdir, fname) + log.Printf("Decoding snapshot from %s", snappath) + + return DecodeSnapshot4FromFile(snappath) +} + +// FindLatestFile identifies the "latest" filename in a given directory +// by sorting all the files and choosing the highest value. +func FindLatestFile(dirpath string) (string, error) { + dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return "", err + } + defer dir.Close() + + fnames, err := dir.Readdirnames(-1) + if err != nil { + return "", err + } + + if len(fnames) == 0 { + return "", nil + } + + names, err := NewSnapshotFileNames(fnames) + if err != nil { + return "", err + } + + return names[len(names)-1].FileName, nil +} + +func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) { + // Read snapshot data. + f, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer f.Close() + + return DecodeSnapshot4(f) +} + +func DecodeSnapshot4(f *os.File) (*Snapshot4, error) { + // Verify checksum + var checksum uint32 + n, err := fmt.Fscanf(f, "%08x\n", &checksum) + if err != nil { + return nil, err + } else if n != 1 { + return nil, errors.New("miss heading checksum") + } + + // Load remaining snapshot contents. + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + // Generate checksum. + byteChecksum := crc32.ChecksumIEEE(b) + if uint32(checksum) != byteChecksum { + return nil, errors.New("bad checksum") + } + + // Decode snapshot. + snapshot := new(Snapshot4) + if err = json.Unmarshal(b, snapshot); err != nil { + return nil, err + } + return snapshot, nil +} + +func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) { + s := make([]SnapshotFileName, 0) + for _, n := range names { + trimmed := strings.TrimSuffix(n, ".ss") + if trimmed == n { + return nil, fmt.Errorf("file %q does not have .ss extension", n) + } + + parts := strings.SplitN(trimmed, "_", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("unrecognized file name format %q", n) + } + + fn := SnapshotFileName{FileName: n} + + var err error + fn.Term, err = strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to parse term from filename %q: %v", err) + } + + fn.Index, err = strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to parse index from filename %q: %v", err) + } + + s = append(s, fn) + } + + sortable := SnapshotFileNames(s) + sort.Sort(&sortable) + return s, nil +} + +type SnapshotFileNames []SnapshotFileName +type SnapshotFileName struct { + FileName string + Term uint64 + Index uint64 +} + +func (n *SnapshotFileNames) Less(i, j int) bool { + iTerm, iIndex := (*n)[i].Term, (*n)[i].Index + jTerm, jIndex := (*n)[j].Term, (*n)[j].Index + return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex) +} + +func (n *SnapshotFileNames) Swap(i, j int) { + (*n)[i], (*n)[j] = (*n)[j], (*n)[i] +} + +func (n *SnapshotFileNames) Len() int { + return len([]SnapshotFileName(*n)) +} From 192f200d9e5fa72894161477a7e68ea34cb0bc93 Mon Sep 17 00:00:00 2001 From: Barak Michener Date: Wed, 29 Oct 2014 16:27:07 -0400 Subject: [PATCH 3/3] Fix up migration tool, add snapshot migration Fixes all updates since bcwaldon sketched the original, with cleanup and into an acutal working state. The commit log follows: fix pb reference and remove unused file post rebase unbreak the migrate folder correctly detect node IDs fix snapshotting Fix previous broken snapshot Add raft log entries to the translation; fix test for all timezones. (Still in progress, but passing) Fix etcd:join and etcd:remove print more data when dumping the log Cleanup based on yichengq's comments more comments Fix the commited index based on the snapshot, if one exists detect nodeIDs from snapshot add initial tool documentation and match the semantics in the build script and main formalize migration doc rename function and clarify docs fix nil pointer fix the record conversion test add migration to test suite and fix govet --- Documentation/0.5/0_4_migration_tool.md | 47 +++++ build | 1 + etcdserver/cluster_store.go | 132 ------------- etcdserver/member.go | 4 + etcdserver/server.go | 2 +- migrate/cmd/etcd-dump-logs/main.go | 11 +- migrate/cmd/etcd-migrate/main.go | 5 +- migrate/config.go | 13 +- migrate/etcd4.go | 104 ++++++++-- migrate/etcd4pb/log_entry.pb.go | 6 +- migrate/fixtures/cmdlog | Bin 1100 -> 1098 bytes migrate/log.go | 241 ++++++++++++++---------- migrate/log_test.go | 53 ++++-- migrate/snapshot.go | 157 ++++++++++++++- test | 2 +- 15 files changed, 482 insertions(+), 296 deletions(-) create mode 100644 Documentation/0.5/0_4_migration_tool.md delete mode 100644 etcdserver/cluster_store.go diff --git a/Documentation/0.5/0_4_migration_tool.md b/Documentation/0.5/0_4_migration_tool.md new file mode 100644 index 000000000..93f02e862 --- /dev/null +++ b/Documentation/0.5/0_4_migration_tool.md @@ -0,0 +1,47 @@ +## etcd 0.4.x -> 0.5.0 Data Migration Tool + +### Upgrading from 0.4.x + +Between 0.4.x and 0.5, the on-disk data formats have changed. In order to allow users to convert to 0.5, a migration tool is provided. + +In the early 0.5.0-alpha series, we're providing this tool early to encourage adoption. However, before 0.5.0-release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency). + +### Data Migration Tips + +* Keep the environment variables and etcd instance flags the same (much as [the upgrade document](../upgrade.md) suggests), particularly `--name`/`ETCD_NAME`. +* Don't change the cluster configuration. If there's a plan to add or remove machines, it's probably best to arrange for that after the migration, rather than before or at the same time. + +### Running the tool + +The tool can be run via: +```sh +./bin/etcd-migrate --data-dir= +``` + +It should autodetect everything and convert the data-dir to be 0.5 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 0.5 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document. + +If, however, it complains about autodetecting the name (which can happen, depending on how the cluster was configured), you need to supply the name of this particular node. This is equivalent to the `--name` flag (or `ETCD_NAME` variable) that etcd was run with, which can also be found by accessing the self api, eg: + +```sh +curl -L http://127.0.0.1:4001/v2/stats/self +``` + +Where the `"name"` field is the name of the local machine. + +Then, run the migration tool with + +```sh +./bin/etcd-migrate --data-dir= --name= +``` + +And the tool should migrate successfully. If it still has an error at this time, it's a failure or bug in the tool and it's worth reporting a bug. + +### Recovering Disk Space + +If the conversion has completed, the entire cluster is running on something 0.5-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir: + +```sh +rm -ri snapshot conf log +``` + +It will ask before every deletion, but these are the 0.4.x files and will not affect the working 0.5 data. diff --git a/build b/build index 459ae8036..473c4af98 100755 --- a/build +++ b/build @@ -13,3 +13,4 @@ eval $(go env) go build -o bin/etcd ${REPO_PATH} go build -o bin/etcdctl ${REPO_PATH}/etcdctl +go build -o bin/etcd-migrate ${REPO_PATH}/migrate/cmd/etcd-migrate diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go deleted file mode 100644 index f0c8e278b..000000000 --- a/etcdserver/cluster_store.go +++ /dev/null @@ -1,132 +0,0 @@ -package etcdserver - -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "net/http" - - "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/store" -) - -const ( - raftPrefix = "/raft" -) - -type ClusterStore interface { - Add(m Member) - Get() Cluster - Remove(id int64) -} - -type clusterStore struct { - Store store.Store -} - -func NewClusterStore(st store.Store, c Cluster) ClusterStore { - cls := &clusterStore{Store: st} - for _, m := range c { - cls.Add(*m) - } - return cls -} - -// Add puts a new Member into the store. -// A Member with a matching id must not exist. -func (s *clusterStore) Add(m Member) { - b, err := json.Marshal(m) - if err != nil { - log.Panicf("marshal peer info error: %v", err) - } - - if _, err := s.Store.Create(m.StoreKey(), false, string(b), false, store.Permanent); err != nil { - log.Panicf("add member should never fail: %v", err) - } -} - -// TODO(philips): keep the latest copy without going to the store to avoid the -// lock here. -func (s *clusterStore) Get() Cluster { - c := &Cluster{} - e, err := s.Store.Get(machineKVPrefix, true, false) - if err != nil { - log.Panicf("get member should never fail: %v", err) - } - for _, n := range e.Node.Nodes { - m := Member{} - if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { - log.Panicf("unmarshal peer error: %v", err) - } - log.Printf("Found member in cluster: %#v", m) - err := c.Add(m) - if err != nil { - log.Panicf("add member to cluster should never fail: %v", err) - } - } - return *c -} - -// Remove removes a member from the store. -// The given id MUST exist. -func (s *clusterStore) Remove(id int64) { - p := s.Get().FindID(id).StoreKey() - if _, err := s.Store.Delete(p, false, false); err != nil { - log.Panicf("delete peer should never fail: %v", err) - } -} - -func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) { - c := &http.Client{Transport: t} - - return func(msgs []raftpb.Message) { - for _, m := range msgs { - // TODO: reuse go routines - // limit the number of outgoing connections for the same receiver - go send(c, cls, m) - } - } -} - -func send(c *http.Client, cls ClusterStore, m raftpb.Message) { - // TODO (xiangli): reasonable retry logic - for i := 0; i < 3; i++ { - u := cls.Get().Pick(m.To) - if u == "" { - // TODO: unknown peer id.. what do we do? I - // don't think his should ever happen, need to - // look into this further. - log.Printf("etcdhttp: no addr for %d", m.To) - return - } - - u = fmt.Sprintf("%s%s", u, raftPrefix) - - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("etcdhttp: dropping message:", err) - return // drop bad message - } - if httpPost(c, u, data) { - return // success - } - // TODO: backoff - } -} - -func httpPost(c *http.Client, url string, data []byte) bool { - resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data)) - if err != nil { - // TODO: log the error? - return false - } - resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - // TODO: log the error? - return false - } - return true -} diff --git a/etcdserver/member.go b/etcdserver/member.go index 58583a012..18949e2eb 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -105,6 +105,10 @@ func memberStoreKey(id types.ID) string { return path.Join(storeMembersPrefix, id.String()) } +func MemberAttributesStorePath(id types.ID) string { + return path.Join(memberStoreKey(id), attributesSuffix) +} + func mustParseMemberIDFromKey(key string) types.ID { id, err := types.IDFromString(path.Base(key)) if err != nil { diff --git a/etcdserver/server.go b/etcdserver/server.go index a3ce4bd5b..9b82967ef 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -581,7 +581,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) { req := pb.Request{ ID: GenID(), Method: "PUT", - Path: path.Join(memberStoreKey(s.id), attributesSuffix), + Path: MemberAttributesStorePath(s.id), Val: string(b), } diff --git a/migrate/cmd/etcd-dump-logs/main.go b/migrate/cmd/etcd-dump-logs/main.go index 17759e2a3..fba12ba93 100644 --- a/migrate/cmd/etcd-dump-logs/main.go +++ b/migrate/cmd/etcd-dump-logs/main.go @@ -9,6 +9,7 @@ import ( etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/migrate" + "github.com/coreos/etcd/pkg/types" raftpb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal" ) @@ -27,7 +28,7 @@ func main() { flag.Parse() if *from == "" { - log.Fatal("Must provide -from flag") + log.Fatal("Must provide -data-dir flag") } var ents []raftpb.Entry @@ -58,6 +59,12 @@ func main() { } case raftpb.EntryConfChange: msg = fmt.Sprintf("%s conf", msg) + var r raftpb.ConfChange + if err := r.Unmarshal(e.Data); err != nil { + msg = fmt.Sprintf("%s ???", msg) + } else { + msg = fmt.Sprintf("%s %s %s %s", msg, r.Type, types.ID(r.NodeID), r.Context) + } } fmt.Println(msg) } @@ -70,7 +77,7 @@ func dump4(dataDir string) ([]raftpb.Entry, error) { return nil, err } - return migrate.Entries4To5(0, ents) + return migrate.Entries4To5(ents) } func dump5(dataDir string) ([]raftpb.Entry, error) { diff --git a/migrate/cmd/etcd-migrate/main.go b/migrate/cmd/etcd-migrate/main.go index ead6cea4f..dc7c1a8b0 100644 --- a/migrate/cmd/etcd-migrate/main.go +++ b/migrate/cmd/etcd-migrate/main.go @@ -9,13 +9,14 @@ import ( func main() { from := flag.String("data-dir", "", "etcd v0.4 data-dir") + name := flag.String("name", "", "etcd node name") flag.Parse() if *from == "" { - log.Fatal("Must provide -from flag") + log.Fatal("Must provide -data-dir flag") } - err := migrate.Migrate4To5(*from) + err := migrate.Migrate4To5(*from, *name) if err != nil { log.Fatalf("Failed migrating data-dir: %v", err) } diff --git a/migrate/config.go b/migrate/config.go index a77884317..9d029ace1 100644 --- a/migrate/config.go +++ b/migrate/config.go @@ -4,22 +4,21 @@ import ( "encoding/json" "io/ioutil" - raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/raft/raftpb" ) type Config4 struct { CommitIndex uint64 `json:"commitIndex"` - //TODO(bcwaldon): is this needed? - //Peers []struct{ - // Name string `json:"name"` - // ConnectionString string `json:"connectionString"` - //} `json:"peers"` + Peers []struct { + Name string `json:"name"` + ConnectionString string `json:"connectionString"` + } `json:"peers"` } func (c *Config4) HardState5() raftpb.HardState { return raftpb.HardState{ - Commit: int64(c.CommitIndex), + Commit: c.CommitIndex, Term: 0, Vote: 0, } diff --git a/migrate/etcd4.go b/migrate/etcd4.go index 436521f49..cf0a2ee1e 100644 --- a/migrate/etcd4.go +++ b/migrate/etcd4.go @@ -6,6 +6,8 @@ import ( "os" "path" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" raftpb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" @@ -31,20 +33,13 @@ func walDir5(dataDir string) string { return path.Join(dataDir, "wal") } -func Migrate4To5(dataDir string) error { +func Migrate4To5(dataDir string, name string) error { // prep new directories sd5 := snapDir5(dataDir) if err := os.MkdirAll(sd5, 0700); err != nil { return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err) } - wd5 := walDir5(dataDir) - w, err := wal.Create(wd5) - if err != nil { - return fmt.Errorf("failed initializing wal at %s: %v", wd5, err) - } - defer w.Close() - // read v0.4 data snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir)) if err != nil { @@ -61,6 +56,21 @@ func Migrate4To5(dataDir string) error { return err } + nodeIDs := ents4.NodeIDs() + nodeID := GuessNodeID(nodeIDs, snap4, cfg4, name) + + if nodeID == 0 { + return fmt.Errorf("Couldn't figure out the node ID from the log or flags, cannot convert") + } + + metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: nodeID, ClusterID: 0x04add5}) + wd5 := walDir5(dataDir) + w, err := wal.Create(wd5, metadata) + if err != nil { + return fmt.Errorf("failed initializing wal at %s: %v", wd5, err) + } + defer w.Close() + // transform v0.4 data var snap5 *raftpb.Snapshot if snap4 == nil { @@ -73,7 +83,12 @@ func Migrate4To5(dataDir string) error { st5 := cfg4.HardState5() - ents5, err := Entries4To5(uint64(st5.Commit), ents4) + // If we've got the most recent snapshot, we can use it's committed index. Still likely less than the current actual index, but worth it for the replay. + if snap5 != nil { + st5.Commit = snap5.Index + } + + ents5, err := Entries4To5(ents4) if err != nil { return err } @@ -81,18 +96,73 @@ func Migrate4To5(dataDir string) error { ents5Len := len(ents5) log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index) - // migrate snapshot (if necessary) and logs - if snap5 != nil { - ss := snap.New(sd5) - ss.SaveSnap(*snap5) - log.Printf("Snapshot migration successful") - } - // explicitly prepend an empty entry as the WAL code expects it ents5 = append(make([]raftpb.Entry, 1), ents5...) - w.Save(st5, ents5) + if err = w.Save(st5, ents5); err != nil { + return err + } log.Printf("Log migration successful") + // migrate snapshot (if necessary) and logs + if snap5 != nil { + ss := snap.New(sd5) + if err := ss.SaveSnap(*snap5); err != nil { + return err + } + log.Printf("Snapshot migration successful") + } + return nil } + +func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name string) uint64 { + var snapNodes map[string]uint64 + if snap4 != nil { + snapNodes = snap4.GetNodesFromStore() + } + // First, use the flag, if set. + if name != "" { + log.Printf("Using suggested name %s", name) + if val, ok := nodes[name]; ok { + log.Printf("Found ID %d", val) + return val + } + if snapNodes != nil { + if val, ok := snapNodes[name]; ok { + log.Printf("Found ID %d", val) + return val + } + } + log.Printf("Name not found, autodetecting...") + } + // Next, look at the snapshot peers, if that exists. + if snap4 != nil { + //snapNodes := make(map[string]uint64) + //for _, p := range snap4.Peers { + //m := generateNodeMember(p.Name, p.ConnectionString, "") + //snapNodes[p.Name] = uint64(m.ID) + //} + for _, p := range cfg.Peers { + log.Printf(p.Name) + delete(snapNodes, p.Name) + } + if len(snapNodes) == 1 { + for name, id := range nodes { + log.Printf("Autodetected from snapshot: name %s", name) + return id + } + } + } + // Then, try and deduce from the log. + for _, p := range cfg.Peers { + delete(nodes, p.Name) + } + if len(nodes) == 1 { + for name, id := range nodes { + log.Printf("Autodetected name %s", name) + return id + } + } + return 0 +} diff --git a/migrate/etcd4pb/log_entry.pb.go b/migrate/etcd4pb/log_entry.pb.go index adab85b9b..787ffe730 100644 --- a/migrate/etcd4pb/log_entry.pb.go +++ b/migrate/etcd4pb/log_entry.pb.go @@ -4,14 +4,14 @@ package protobuf -import proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import proto "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" import json "encoding/json" import math "math" // discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" import io "io" -import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" import fmt "fmt" import strings "strings" @@ -19,7 +19,7 @@ import reflect "reflect" import fmt1 "fmt" import strings1 "strings" -import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto" import sort "sort" import strconv "strconv" import reflect1 "reflect" diff --git a/migrate/fixtures/cmdlog b/migrate/fixtures/cmdlog index 89691b67892dad42303f49e25a5e193c178cd653..28dc2e01ee16dbda8920c3b1e4ba3d96493b13bf 100644 GIT binary patch delta 26 hcmX@Zaf)MtBTr&ZW^$^Mj#5!#TFJyh@r`NEm;rz>3Ag|N delta 28 jcmX@bafV}pBcGvOPJVJ?j*^a2QDR!j#C-9MDbJVzjn)bx diff --git a/migrate/log.go b/migrate/log.go index 977fdcf20..84a0a9b0e 100644 --- a/migrate/log.go +++ b/migrate/log.go @@ -7,6 +7,7 @@ import ( "io" "log" "os" + "path" "time" "github.com/coreos/etcd/etcdserver" @@ -17,8 +18,49 @@ import ( "github.com/coreos/etcd/store" ) -func DecodeLog4FromFile(logpath string) ([]*etcd4pb.LogEntry, error) { - file, err := os.OpenFile(logpath, os.O_RDWR, 0600) +const etcdDefaultClusterName = "etcd-cluster" + +func UnixTimeOrPermanent(expireTime time.Time) int64 { + expire := expireTime.Unix() + if expireTime == store.Permanent { + expire = 0 + } + return expire +} + +type Log4 []*etcd4pb.LogEntry + +func (l Log4) NodeIDs() map[string]uint64 { + out := make(map[string]uint64) + for _, e := range l { + if e.GetCommandName() == "etcd:join" { + cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil) + if err != nil { + log.Println("error converting an etcd:join to v0.5 format. Likely corrupt!") + return nil + } + join := cmd4.(*JoinCommand) + m := generateNodeMember(join.Name, join.RaftURL, "") + out[join.Name] = uint64(m.ID) + } + if e.GetCommandName() == "etcd:remove" { + cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil) + if err != nil { + return nil + } + name := cmd4.(*RemoveCommand).Name + delete(out, name) + } + } + return out +} + +func StorePath(key string) string { + return path.Join(etcdserver.StoreKeysPrefix, key) +} + +func DecodeLog4FromFile(logpath string) (Log4, error) { + file, err := os.OpenFile(logpath, os.O_RDONLY, 0600) if err != nil { return nil, err } @@ -37,12 +79,10 @@ func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) { if err == io.EOF { break } - return nil, fmt.Errorf("failed decoding next log entry: ", err) + return nil, fmt.Errorf("failed decoding next log entry: %v", err) } - if entry != nil { - entries = append(entries, entry) - } + entries = append(entries, entry) readBytes += int64(n) } @@ -75,10 +115,10 @@ func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) { return ent4, length, nil } -func hashName(name string) int64 { - var sum int64 +func hashName(name string) uint64 { + var sum uint64 for _, ch := range name { - sum = 131*sum + int64(ch) + sum = 131*sum + uint64(ch) } return sum } @@ -88,7 +128,7 @@ type Command4 interface { Data5() ([]byte, error) } -func NewCommand4(name string, data []byte) (Command4, error) { +func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) { var cmd Command4 switch name { @@ -97,7 +137,6 @@ func NewCommand4(name string, data []byte) (Command4, error) { case "etcd:join": cmd = &JoinCommand{} case "etcd:setClusterConfig": - //TODO(bcwaldon): can this safely be discarded? cmd = &NOPCommand{} case "etcd:compareAndDelete": cmd = &CompareAndDeleteCommand{} @@ -114,9 +153,10 @@ func NewCommand4(name string, data []byte) (Command4, error) { case "etcd:update": cmd = &UpdateCommand{} case "raft:join": - cmd = &DefaultJoinCommand{} + // These are subsumed by etcd:remove and etcd:join; we shouldn't see them. + fallthrough case "raft:leave": - cmd = &DefaultLeaveCommand{} + return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log") case "raft:nop": cmd = &NOPCommand{} default: @@ -130,27 +170,43 @@ func NewCommand4(name string, data []byte) (Command4, error) { } } + switch name { + case "etcd:join": + c := cmd.(*JoinCommand) + m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL) + c.memb = *m + if raftMap != nil { + raftMap[c.Name] = uint64(m.ID) + } + case "etcd:remove": + c := cmd.(*RemoveCommand) + if raftMap != nil { + m, ok := raftMap[c.Name] + if !ok { + return nil, fmt.Errorf("removing a node named %s before it joined", c.Name) + } + c.id = m + delete(raftMap, c.Name) + } + } return cmd, nil } type RemoveCommand struct { Name string `json:"name"` + id uint64 } func (c *RemoveCommand) Type5() raftpb.EntryType { - return raftpb.EntryNormal + return raftpb.EntryConfChange } func (c *RemoveCommand) Data5() ([]byte, error) { - m := etcdserver.Member{ - ID: hashName(c.Name), + req5 := raftpb.ConfChange{ + ID: 0, + Type: raftpb.ConfChangeRemoveNode, + NodeID: c.id, } - - req5 := &etcdserverpb.Request{ - Method: "DELETE", - Path: m.StoreKey(), - } - return req5.Marshal() } @@ -158,46 +214,26 @@ type JoinCommand struct { Name string `json:"name"` RaftURL string `json:"raftURL"` EtcdURL string `json:"etcdURL"` - - //TODO(bcwaldon): Should these be converted? - //MinVersion int `json:"minVersion"` - //MaxVersion int `json:"maxVersion"` + memb etcdserver.Member } func (c *JoinCommand) Type5() raftpb.EntryType { - return raftpb.EntryNormal + return raftpb.EntryConfChange } func (c *JoinCommand) Data5() ([]byte, error) { - pURLs, err := types.NewURLs([]string{c.RaftURL}) + b, err := json.Marshal(c.memb) if err != nil { return nil, err } - m := etcdserver.GenerateMember(c.Name, pURLs, nil) - - //TODO(bcwaldon): why doesn't this go through GenerateMember? - m.ClientURLs = []string{c.EtcdURL} - - b, err := json.Marshal(*m) - if err != nil { - return nil, err + req5 := &raftpb.ConfChange{ + ID: 0, + Type: raftpb.ConfChangeAddNode, + NodeID: uint64(c.memb.ID), + Context: b, } - - req5 := &etcdserverpb.Request{ - Method: "PUT", - Path: m.StoreKey(), - Val: string(b), - - // TODO(bcwaldon): Is this correct? - Time: store.Permanent.Unix(), - - //TODO(bcwaldon): What is the new equivalent of Unique? - //Unique: c.Unique, - } - return req5.Marshal() - } type SetClusterConfigCommand struct { @@ -223,9 +259,6 @@ func (c *SetClusterConfigCommand) Data5() ([]byte, error) { Path: "/v2/admin/config", Dir: false, Val: string(b), - - // TODO(bcwaldon): Is this correct? - Time: store.Permanent.Unix(), } return req5.Marshal() @@ -244,7 +277,7 @@ func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType { func (c *CompareAndDeleteCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ Method: "DELETE", - Path: c.Key, + Path: StorePath(c.Key), PrevValue: c.PrevValue, PrevIndex: c.PrevIndex, } @@ -265,12 +298,12 @@ func (c *CompareAndSwapCommand) Type5() raftpb.EntryType { func (c *CompareAndSwapCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ - Method: "PUT", - Path: c.Key, - Val: c.Value, - PrevValue: c.PrevValue, - PrevIndex: c.PrevIndex, - Time: c.ExpireTime.Unix(), + Method: "PUT", + Path: StorePath(c.Key), + Val: c.Value, + PrevValue: c.PrevValue, + PrevIndex: c.PrevIndex, + Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req5.Marshal() } @@ -289,16 +322,17 @@ func (c *CreateCommand) Type5() raftpb.EntryType { func (c *CreateCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ - Method: "PUT", - Path: c.Key, - Dir: c.Dir, - Val: c.Value, - - // TODO(bcwaldon): Is this correct? - Time: c.ExpireTime.Unix(), - - //TODO(bcwaldon): What is the new equivalent of Unique? - //Unique: c.Unique, + Path: StorePath(c.Key), + Dir: c.Dir, + Val: c.Value, + Expiration: UnixTimeOrPermanent(c.ExpireTime), + } + if c.Unique { + req5.Method = "POST" + } else { + var prevExist = true + req5.Method = "PUT" + req5.PrevExist = &prevExist } return req5.Marshal() } @@ -316,7 +350,7 @@ func (c *DeleteCommand) Type5() raftpb.EntryType { func (c *DeleteCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ Method: "DELETE", - Path: c.Key, + Path: StorePath(c.Key), Dir: c.Dir, Recursive: c.Recursive, } @@ -336,13 +370,11 @@ func (c *SetCommand) Type5() raftpb.EntryType { func (c *SetCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ - Method: "PUT", - Path: c.Key, - Dir: c.Dir, - Val: c.Value, - - //TODO(bcwaldon): Is this correct? - Time: c.ExpireTime.Unix(), + Method: "PUT", + Path: StorePath(c.Key), + Dir: c.Dir, + Val: c.Value, + Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req5.Marshal() } @@ -358,13 +390,13 @@ func (c *UpdateCommand) Type5() raftpb.EntryType { } func (c *UpdateCommand) Data5() ([]byte, error) { + exist := true req5 := &etcdserverpb.Request{ - Method: "PUT", - Path: c.Key, - Val: c.Value, - - //TODO(bcwaldon): Is this correct? - Time: c.ExpireTime.Unix(), + Method: "PUT", + Path: StorePath(c.Key), + Val: c.Value, + PrevExist: &exist, + Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req5.Marshal() } @@ -380,30 +412,19 @@ func (c *SyncCommand) Type5() raftpb.EntryType { func (c *SyncCommand) Data5() ([]byte, error) { req5 := &etcdserverpb.Request{ Method: "SYNC", - //TODO(bcwaldon): Is this correct? - Time: c.Time.UnixNano(), + Time: c.Time.UnixNano(), } return req5.Marshal() } type DefaultJoinCommand struct { - //TODO(bcwaldon): implement Type5, Data5 - Command4 - Name string `json:"name"` ConnectionString string `json:"connectionString"` } type DefaultLeaveCommand struct { - //TODO(bcwaldon): implement Type5, Data5 - Command4 - Name string `json:"name"` -} - -//TODO(bcwaldon): Why is CommandName here? -func (c *DefaultLeaveCommand) CommandName() string { - return "raft:leave" + id uint64 } type NOPCommand struct{} @@ -421,7 +442,7 @@ func (c *NOPCommand) Data5() ([]byte, error) { return nil, nil } -func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) { +func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) { ents4Len := len(ents4) if ents4Len == 0 { @@ -438,11 +459,12 @@ func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, } } + raftMap := make(map[string]uint64) ents5 := make([]raftpb.Entry, 0) for i, e := range ents4 { - ent, err := toEntry5(e) + ent, err := toEntry5(e, raftMap) if err != nil { - log.Printf("Ignoring invalid log data in entry %d: %v", i, err) + log.Fatalf("Error converting entry %d, %s", i, err) } else { ents5 = append(ents5, *ent) } @@ -451,8 +473,8 @@ func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, return ents5, nil } -func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) { - cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand()) +func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) { + cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap) if err != nil { return nil, err } @@ -463,8 +485,8 @@ func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) { } ent5 := raftpb.Entry{ - Term: int64(ent4.GetTerm()), - Index: int64(ent4.GetIndex()), + Term: ent4.GetTerm(), + Index: ent4.GetIndex(), Type: cmd4.Type5(), Data: data, } @@ -473,3 +495,14 @@ func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) { return &ent5, nil } + +func generateNodeMember(name, rafturl, etcdurl string) *etcdserver.Member { + pURLs, err := types.NewURLs([]string{rafturl}) + if err != nil { + log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl) + } + + m := etcdserver.NewMember(name, pURLs, etcdDefaultClusterName, nil) + m.ClientURLs = []string{etcdurl} + return m +} diff --git a/migrate/log_test.go b/migrate/log_test.go index 366d2cfa3..b1db15079 100644 --- a/migrate/log_test.go +++ b/migrate/log_test.go @@ -1,42 +1,57 @@ package migrate import ( + "fmt" + "net/url" "reflect" "testing" "time" + + "github.com/coreos/etcd/etcdserver" ) func TestNewCommand(t *testing.T) { - entries, err := ReadLogFile("fixtures/cmdlog") + entries, err := DecodeLog4FromFile("fixtures/cmdlog") if err != nil { t.Errorf("read log file error: %v", err) } - tests := []interface{}{ - &JoinCommand{2, 2, "1.local", "http://127.0.0.1:7001", "http://127.0.0.1:4001"}, - &SetClusterConfigCommand{&ClusterConfig{9, 1800.0, 5.0}}, - &NOPCommand{}, - &RemoveCommand{"alice"}, - &CompareAndDeleteCommand{"foo", "baz", 9}, - &CompareAndSwapCommand{"foo", "bar", time.Unix(0, 0), "baz", 9}, - &CreateCommand{"foo", "bar", time.Unix(0, 0), true, true}, - &DeleteCommand{"foo", true, true}, - &SetCommand{"foo", "bar", time.Unix(0, 0), true}, - &SyncCommand{time.Unix(0, 0)}, - &UpdateCommand{"foo", "bar", time.Unix(0, 0)}, - &DefaultLeaveCommand{"alice"}, - &DefaultJoinCommand{"alice", ""}, + zeroTime, err := time.Parse(time.RFC3339, "1969-12-31T16:00:00-08:00") + if err != nil { + t.Errorf("couldn't create time: %v", err) } - for i, e := range entries { - cmd, err := NewCommand(e.GetCommandName(), e.GetCommand()) + m := etcdserver.NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName, nil) + m.ClientURLs = []string{"http://127.0.0.1:4001"} + + tests := []interface{}{ + &JoinCommand{"alice", "http://127.0.0.1:7001", "http://127.0.0.1:4001", *m}, + &NOPCommand{}, + &NOPCommand{}, + &RemoveCommand{"alice", 0xe52ada62956ff923}, + &CompareAndDeleteCommand{"foo", "baz", 9}, + &CompareAndSwapCommand{"foo", "bar", zeroTime, "baz", 9}, + &CreateCommand{"foo", "bar", zeroTime, true, true}, + &DeleteCommand{"foo", true, true}, + &SetCommand{"foo", "bar", zeroTime, true}, + &SyncCommand{zeroTime}, + &UpdateCommand{"foo", "bar", zeroTime}, + } + + raftMap := make(map[string]uint64) + for i, test := range tests { + e := entries[i] + cmd, err := NewCommand4(e.GetCommandName(), e.GetCommand(), raftMap) if err != nil { t.Errorf("#%d: %v", i, err) continue } - if !reflect.DeepEqual(cmd, tests[i]) { - t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, tests[i]) + if !reflect.DeepEqual(cmd, test) { + if i == 5 { + fmt.Println(cmd.(*CompareAndSwapCommand).ExpireTime.Location()) + } + t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, test) } } } diff --git a/migrate/snapshot.go b/migrate/snapshot.go index 5b4a2fe79..57dcf501f 100644 --- a/migrate/snapshot.go +++ b/migrate/snapshot.go @@ -7,11 +7,13 @@ import ( "hash/crc32" "io/ioutil" "log" + "net/url" "os" "path" "sort" "strconv" "strings" + "time" raftpb "github.com/coreos/etcd/raft/raftpb" ) @@ -25,17 +27,155 @@ type Snapshot4 struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` } `json:"peers"` +} - //TODO(bcwaldon): is this needed? - //Path string `json:"path"` +type sstore struct { + Root *node + CurrentIndex uint64 + CurrentVersion int +} + +type node struct { + Path string + + CreatedIndex uint64 + ModifiedIndex uint64 + + Parent *node `json:"-"` // should not encode this field! avoid circular dependency. + + ExpireTime time.Time + ACL string + Value string // for key-value pair + Children map[string]*node // for directory +} + +func replacePathNames(n *node, s1, s2 string) { + n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1)) + for _, c := range n.Children { + replacePathNames(c, s1, s2) + } +} + +func pullNodesFromEtcd(n *node) map[string]uint64 { + out := make(map[string]uint64) + machines := n.Children["machines"] + for name, c := range machines.Children { + q, err := url.ParseQuery(c.Value) + if err != nil { + log.Fatal("Couldn't parse old query string value") + } + etcdurl := q.Get("etcd") + rafturl := q.Get("raft") + + m := generateNodeMember(name, rafturl, etcdurl) + out[m.Name] = uint64(m.ID) + } + return out +} + +func fixEtcd(n *node) { + n.Path = "/0" + machines := n.Children["machines"] + n.Children["members"] = &node{ + Path: "/0/members", + CreatedIndex: machines.CreatedIndex, + ModifiedIndex: machines.ModifiedIndex, + ExpireTime: machines.ExpireTime, + ACL: machines.ACL, + Children: make(map[string]*node), + } + for name, c := range machines.Children { + q, err := url.ParseQuery(c.Value) + if err != nil { + log.Fatal("Couldn't parse old query string value") + } + etcdurl := q.Get("etcd") + rafturl := q.Get("raft") + + m := generateNodeMember(name, rafturl, etcdurl) + attrBytes, err := json.Marshal(m.Attributes) + if err != nil { + log.Fatal("Couldn't marshal attributes") + } + raftBytes, err := json.Marshal(m.RaftAttributes) + if err != nil { + log.Fatal("Couldn't marshal raft attributes") + } + newNode := &node{ + Path: path.Join("/0/members", m.ID.String()), + CreatedIndex: c.CreatedIndex, + ModifiedIndex: c.ModifiedIndex, + ExpireTime: c.ExpireTime, + ACL: c.ACL, + Children: map[string]*node{ + "attributes": &node{ + Path: path.Join("/0/members", m.ID.String(), "attributes"), + CreatedIndex: c.CreatedIndex, + ModifiedIndex: c.ModifiedIndex, + ExpireTime: c.ExpireTime, + ACL: c.ACL, + Value: string(attrBytes), + }, + "raftAttributes": &node{ + Path: path.Join("/0/members", m.ID.String(), "raftAttributes"), + CreatedIndex: c.CreatedIndex, + ModifiedIndex: c.ModifiedIndex, + ExpireTime: c.ExpireTime, + ACL: c.ACL, + Value: string(raftBytes), + }, + }, + } + n.Children["members"].Children[m.ID.String()] = newNode + } + delete(n.Children, "machines") + +} + +func mangleRoot(n *node) *node { + newRoot := &node{ + Path: "/", + CreatedIndex: n.CreatedIndex, + ModifiedIndex: n.ModifiedIndex, + ExpireTime: n.ExpireTime, + ACL: n.ACL, + Children: make(map[string]*node), + } + newRoot.Children["1"] = n + etcd := n.Children["_etcd"] + delete(n.Children, "_etcd") + replacePathNames(n, "/", "/1/") + fixEtcd(etcd) + newRoot.Children["0"] = etcd + return newRoot +} + +func (s *Snapshot4) GetNodesFromStore() map[string]uint64 { + st := &sstore{} + if err := json.Unmarshal(s.State, st); err != nil { + log.Fatal("Couldn't unmarshal snapshot") + } + etcd := st.Root.Children["_etcd"] + return pullNodesFromEtcd(etcd) } func (s *Snapshot4) Snapshot5() *raftpb.Snapshot { + st := &sstore{} + if err := json.Unmarshal(s.State, st); err != nil { + log.Fatal("Couldn't unmarshal snapshot") + } + st.Root = mangleRoot(st.Root) + + newState, err := json.Marshal(st) + if err != nil { + log.Fatal("Couldn't re-marshal new snapshot") + } + snap5 := raftpb.Snapshot{ - Data: s.State, - Index: int64(s.LastIndex), - Term: int64(s.LastTerm), - Nodes: make([]int64, len(s.Peers)), + Data: newState, + Index: s.LastIndex, + Term: s.LastTerm, + Nodes: make([]uint64, len(s.Peers)), } for i, p := range s.Peers { @@ -132,6 +272,7 @@ func DecodeSnapshot4(f *os.File) (*Snapshot4, error) { } func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) { + s := make([]SnapshotFileName, 0) for _, n := range names { trimmed := strings.TrimSuffix(n, ".ss") @@ -149,12 +290,12 @@ func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) { var err error fn.Term, err = strconv.ParseUint(parts[0], 10, 64) if err != nil { - return nil, fmt.Errorf("unable to parse term from filename %q: %v", err) + return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err) } fn.Index, err = strconv.ParseUint(parts[1], 10, 64) if err != nil { - return nil, fmt.Errorf("unable to parse index from filename %q: %v", err) + return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err) } s = append(s, fn) diff --git a/test b/test index 7fd900684..99d2048bc 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"} source ./build # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt. -TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" +TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal" FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/" # user has not provided PKG override