diff --git a/etcdserver/cluster_store.go b/etcdserver/cluster_store.go index 93f650c90..f0c8e278b 100644 --- a/etcdserver/cluster_store.go +++ b/etcdserver/cluster_store.go @@ -59,6 +59,7 @@ func (s *clusterStore) Get() Cluster { if err := json.Unmarshal([]byte(*n.Value), &m); err != nil { log.Panicf("unmarshal peer error: %v", err) } + log.Printf("Found member in cluster: %#v", m) err := c.Add(m) if err != nil { log.Panicf("add member to cluster should never fail: %v", err) diff --git a/etcdserver/member.go b/etcdserver/member.go index 843886d74..58583a012 100644 --- a/etcdserver/member.go +++ b/etcdserver/member.go @@ -47,7 +47,7 @@ type Member struct { Attributes } -// newMember creates a Member without an ID and generates one based on the +// NewMember creates a Member without an ID and generates one based on the // name, peer URLs. This is used for bootstrapping/adding new member. func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member { m := &Member{ diff --git a/migrate/cmd/etcd-dump-logs/main.go b/migrate/cmd/etcd-dump-logs/main.go new file mode 100644 index 000000000..17759e2a3 --- /dev/null +++ b/migrate/cmd/etcd-dump-logs/main.go @@ -0,0 +1,90 @@ +package main + +import ( + "errors" + "flag" + "fmt" + "log" + "path" + + etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/migrate" + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal" +) + +func walDir5(dataDir string) string { + return path.Join(dataDir, "wal") +} + +func logFile4(dataDir string) string { + return path.Join(dataDir, "log") +} + +func main() { + version := flag.Int("version", 5, "4 or 5") + from := flag.String("data-dir", "", "") + flag.Parse() + + if *from == "" { + log.Fatal("Must provide -from flag") + } + + var ents []raftpb.Entry + var err error + switch *version { + case 4: + ents, err = dump4(*from) + case 5: + ents, err = dump5(*from) + default: + err = errors.New("value of -version flag must be 4 or 5") + } + + if err != nil { + log.Fatalf("Failed decoding log: %v", err) + } + + for _, e := range ents { + msg := fmt.Sprintf("%2d %5d: ", e.Term, e.Index) + switch e.Type { + case raftpb.EntryNormal: + msg = fmt.Sprintf("%s norm", msg) + var r etcdserverpb.Request + if err := r.Unmarshal(e.Data); err != nil { + msg = fmt.Sprintf("%s ???", msg) + } else { + msg = fmt.Sprintf("%s %s %s %s", msg, r.Method, r.Path, r.Val) + } + case raftpb.EntryConfChange: + msg = fmt.Sprintf("%s conf", msg) + } + fmt.Println(msg) + } +} + +func dump4(dataDir string) ([]raftpb.Entry, error) { + lf4 := logFile4(dataDir) + ents, err := migrate.DecodeLog4FromFile(lf4) + if err != nil { + return nil, err + } + + return migrate.Entries4To5(0, ents) +} + +func dump5(dataDir string) ([]raftpb.Entry, error) { + wd5 := walDir5(dataDir) + if !wal.Exist(wd5) { + return nil, fmt.Errorf("No wal exists at %s", wd5) + } + + w, err := wal.OpenAtIndex(wd5, 0) + if err != nil { + return nil, err + } + defer w.Close() + + _, _, ents, err := w.ReadAll() + return ents, err +} diff --git a/migrate/cmd/etcd-migrate/main.go b/migrate/cmd/etcd-migrate/main.go new file mode 100644 index 000000000..ead6cea4f --- /dev/null +++ b/migrate/cmd/etcd-migrate/main.go @@ -0,0 +1,22 @@ +package main + +import ( + "flag" + "log" + + "github.com/coreos/etcd/migrate" +) + +func main() { + from := flag.String("data-dir", "", "etcd v0.4 data-dir") + flag.Parse() + + if *from == "" { + log.Fatal("Must provide -from flag") + } + + err := migrate.Migrate4To5(*from) + if err != nil { + log.Fatalf("Failed migrating data-dir: %v", err) + } +} diff --git a/migrate/config.go b/migrate/config.go new file mode 100644 index 000000000..a77884317 --- /dev/null +++ b/migrate/config.go @@ -0,0 +1,40 @@ +package migrate + +import ( + "encoding/json" + "io/ioutil" + + raftpb "github.com/coreos/etcd/raft/raftpb" +) + +type Config4 struct { + CommitIndex uint64 `json:"commitIndex"` + + //TODO(bcwaldon): is this needed? + //Peers []struct{ + // Name string `json:"name"` + // ConnectionString string `json:"connectionString"` + //} `json:"peers"` +} + +func (c *Config4) HardState5() raftpb.HardState { + return raftpb.HardState{ + Commit: int64(c.CommitIndex), + Term: 0, + Vote: 0, + } +} + +func DecodeConfig4FromFile(cfgPath string) (*Config4, error) { + b, err := ioutil.ReadFile(cfgPath) + if err != nil { + return nil, err + } + + conf := &Config4{} + if err = json.Unmarshal(b, conf); err != nil { + return nil, err + } + + return conf, nil +} diff --git a/migrate/etcd4.go b/migrate/etcd4.go new file mode 100644 index 000000000..436521f49 --- /dev/null +++ b/migrate/etcd4.go @@ -0,0 +1,98 @@ +package migrate + +import ( + "fmt" + "log" + "os" + "path" + + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/snap" + "github.com/coreos/etcd/wal" +) + +func snapDir4(dataDir string) string { + return path.Join(dataDir, "snapshot") +} + +func logFile4(dataDir string) string { + return path.Join(dataDir, "log") +} + +func cfgFile4(dataDir string) string { + return path.Join(dataDir, "conf") +} + +func snapDir5(dataDir string) string { + return path.Join(dataDir, "snap") +} + +func walDir5(dataDir string) string { + return path.Join(dataDir, "wal") +} + +func Migrate4To5(dataDir string) error { + // prep new directories + sd5 := snapDir5(dataDir) + if err := os.MkdirAll(sd5, 0700); err != nil { + return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err) + } + + wd5 := walDir5(dataDir) + w, err := wal.Create(wd5) + if err != nil { + return fmt.Errorf("failed initializing wal at %s: %v", wd5, err) + } + defer w.Close() + + // read v0.4 data + snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir)) + if err != nil { + return err + } + + cfg4, err := DecodeConfig4FromFile(cfgFile4(dataDir)) + if err != nil { + return err + } + + ents4, err := DecodeLog4FromFile(logFile4(dataDir)) + if err != nil { + return err + } + + // transform v0.4 data + var snap5 *raftpb.Snapshot + if snap4 == nil { + log.Printf("No snapshot found") + } else { + log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex) + + snap5 = snap4.Snapshot5() + } + + st5 := cfg4.HardState5() + + ents5, err := Entries4To5(uint64(st5.Commit), ents4) + if err != nil { + return err + } + + ents5Len := len(ents5) + log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index) + + // migrate snapshot (if necessary) and logs + if snap5 != nil { + ss := snap.New(sd5) + ss.SaveSnap(*snap5) + log.Printf("Snapshot migration successful") + } + + // explicitly prepend an empty entry as the WAL code expects it + ents5 = append(make([]raftpb.Entry, 1), ents5...) + + w.Save(st5, ents5) + log.Printf("Log migration successful") + + return nil +} diff --git a/migrate/etcd4pb/log_entry.pb.go b/migrate/etcd4pb/log_entry.pb.go new file mode 100644 index 000000000..adab85b9b --- /dev/null +++ b/migrate/etcd4pb/log_entry.pb.go @@ -0,0 +1,552 @@ +// Code generated by protoc-gen-gogo. +// source: log_entry.proto +// DO NOT EDIT! + +package protobuf + +import proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import json "encoding/json" +import math "math" + +// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb" + +import io "io" +import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" + +import fmt "fmt" +import strings "strings" +import reflect "reflect" + +import fmt1 "fmt" +import strings1 "strings" +import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/third_party/code.google.com/p/gogoprotobuf/proto" +import sort "sort" +import strconv "strconv" +import reflect1 "reflect" + +import fmt2 "fmt" +import bytes "bytes" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type LogEntry struct { + Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"` + CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"` + Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *LogEntry) Reset() { *m = LogEntry{} } +func (*LogEntry) ProtoMessage() {} + +func (m *LogEntry) GetIndex() uint64 { + if m != nil && m.Index != nil { + return *m.Index + } + return 0 +} + +func (m *LogEntry) GetTerm() uint64 { + if m != nil && m.Term != nil { + return *m.Term + } + return 0 +} + +func (m *LogEntry) GetCommandName() string { + if m != nil && m.CommandName != nil { + return *m.CommandName + } + return "" +} + +func (m *LogEntry) GetCommand() []byte { + if m != nil { + return m.Command + } + return nil +} + +func init() { +} +func (m *LogEntry) Unmarshal(data []byte) error { + l := len(data) + index := 0 + for index < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + switch fieldNum { + case 1: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Index = &v + case 2: + if wireType != 0 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var v uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + v |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Term = &v + case 3: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + int(stringLen) + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[index:postIndex]) + m.CommandName = &s + index = postIndex + case 4: + if wireType != 2 { + return code_google_com_p_gogoprotobuf_proto.ErrWrongType + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if index >= l { + return io.ErrUnexpectedEOF + } + b := data[index] + index++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + postIndex := index + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Command = append(m.Command, data[index:postIndex]...) + index = postIndex + default: + var sizeOfWire int + for { + sizeOfWire++ + wire >>= 7 + if wire == 0 { + break + } + } + index -= sizeOfWire + skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:]) + if err != nil { + return err + } + if (index + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...) + index += skippy + } + } + return nil +} +func (this *LogEntry) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LogEntry{`, + `Index:` + valueToStringLogEntry(this.Index) + `,`, + `Term:` + valueToStringLogEntry(this.Term) + `,`, + `CommandName:` + valueToStringLogEntry(this.CommandName) + `,`, + `Command:` + valueToStringLogEntry(this.Command) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func valueToStringLogEntry(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *LogEntry) Size() (n int) { + var l int + _ = l + if m.Index != nil { + n += 1 + sovLogEntry(uint64(*m.Index)) + } + if m.Term != nil { + n += 1 + sovLogEntry(uint64(*m.Term)) + } + if m.CommandName != nil { + l = len(*m.CommandName) + n += 1 + l + sovLogEntry(uint64(l)) + } + if m.Command != nil { + l = len(m.Command) + n += 1 + l + sovLogEntry(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovLogEntry(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozLogEntry(x uint64) (n int) { + return sovLogEntry(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func NewPopulatedLogEntry(r randyLogEntry, easy bool) *LogEntry { + this := &LogEntry{} + v1 := uint64(r.Uint32()) + this.Index = &v1 + v2 := uint64(r.Uint32()) + this.Term = &v2 + v3 := randStringLogEntry(r) + this.CommandName = &v3 + if r.Intn(10) != 0 { + v4 := r.Intn(100) + this.Command = make([]byte, v4) + for i := 0; i < v4; i++ { + this.Command[i] = byte(r.Intn(256)) + } + } + if !easy && r.Intn(10) != 0 { + this.XXX_unrecognized = randUnrecognizedLogEntry(r, 5) + } + return this +} + +type randyLogEntry interface { + Float32() float32 + Float64() float64 + Int63() int64 + Int31() int32 + Uint32() uint32 + Intn(n int) int +} + +func randUTF8RuneLogEntry(r randyLogEntry) rune { + res := rune(r.Uint32() % 1112064) + if 55296 <= res { + res += 2047 + } + return res +} +func randStringLogEntry(r randyLogEntry) string { + v5 := r.Intn(100) + tmps := make([]rune, v5) + for i := 0; i < v5; i++ { + tmps[i] = randUTF8RuneLogEntry(r) + } + return string(tmps) +} +func randUnrecognizedLogEntry(r randyLogEntry, maxFieldNumber int) (data []byte) { + l := r.Intn(5) + for i := 0; i < l; i++ { + wire := r.Intn(4) + if wire == 3 { + wire = 5 + } + fieldNumber := maxFieldNumber + r.Intn(100) + data = randFieldLogEntry(data, r, fieldNumber, wire) + } + return data +} +func randFieldLogEntry(data []byte, r randyLogEntry, fieldNumber int, wire int) []byte { + key := uint32(fieldNumber)<<3 | uint32(wire) + switch wire { + case 0: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = encodeVarintPopulateLogEntry(data, uint64(r.Int63())) + case 1: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + case 2: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + ll := r.Intn(100) + data = encodeVarintPopulateLogEntry(data, uint64(ll)) + for j := 0; j < ll; j++ { + data = append(data, byte(r.Intn(256))) + } + default: + data = encodeVarintPopulateLogEntry(data, uint64(key)) + data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256))) + } + return data +} +func encodeVarintPopulateLogEntry(data []byte, v uint64) []byte { + for v >= 1<<7 { + data = append(data, uint8(uint64(v)&0x7f|0x80)) + v >>= 7 + } + data = append(data, uint8(v)) + return data +} +func (m *LogEntry) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *LogEntry) MarshalTo(data []byte) (n int, err error) { + var i int + _ = i + var l int + _ = l + if m.Index != nil { + data[i] = 0x8 + i++ + i = encodeVarintLogEntry(data, i, uint64(*m.Index)) + } + if m.Term != nil { + data[i] = 0x10 + i++ + i = encodeVarintLogEntry(data, i, uint64(*m.Term)) + } + if m.CommandName != nil { + data[i] = 0x1a + i++ + i = encodeVarintLogEntry(data, i, uint64(len(*m.CommandName))) + i += copy(data[i:], *m.CommandName) + } + if m.Command != nil { + data[i] = 0x22 + i++ + i = encodeVarintLogEntry(data, i, uint64(len(m.Command))) + i += copy(data[i:], m.Command) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} +func encodeFixed64LogEntry(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32LogEntry(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintLogEntry(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (this *LogEntry) GoString() string { + if this == nil { + return "nil" + } + s := strings1.Join([]string{`&protobuf.LogEntry{` + `Index:` + valueToGoStringLogEntry(this.Index, "uint64"), `Term:` + valueToGoStringLogEntry(this.Term, "uint64"), `CommandName:` + valueToGoStringLogEntry(this.CommandName, "string"), `Command:` + valueToGoStringLogEntry(this.Command, "byte"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ") + return s +} +func valueToGoStringLogEntry(v interface{}, typ string) string { + rv := reflect1.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect1.Indirect(rv).Interface() + return fmt1.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func extensionToGoStringLogEntry(e map[int32]code_google_com_p_gogoprotobuf_proto1.Extension) string { + if e == nil { + return "nil" + } + s := "map[int32]proto.Extension{" + keys := make([]int, 0, len(e)) + for k := range e { + keys = append(keys, int(k)) + } + sort.Ints(keys) + ss := []string{} + for _, k := range keys { + ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString()) + } + s += strings1.Join(ss, ",") + "}" + return s +} +func (this *LogEntry) VerboseEqual(that interface{}) error { + if that == nil { + if this == nil { + return nil + } + return fmt2.Errorf("that == nil && this != nil") + } + + that1, ok := that.(*LogEntry) + if !ok { + return fmt2.Errorf("that is not of type *LogEntry") + } + if that1 == nil { + if this == nil { + return nil + } + return fmt2.Errorf("that is type *LogEntry but is nil && this != nil") + } else if this == nil { + return fmt2.Errorf("that is type *LogEntrybut is not nil && this == nil") + } + if this.Index != nil && that1.Index != nil { + if *this.Index != *that1.Index { + return fmt2.Errorf("Index this(%v) Not Equal that(%v)", *this.Index, *that1.Index) + } + } else if this.Index != nil { + return fmt2.Errorf("this.Index == nil && that.Index != nil") + } else if that1.Index != nil { + return fmt2.Errorf("Index this(%v) Not Equal that(%v)", this.Index, that1.Index) + } + if this.Term != nil && that1.Term != nil { + if *this.Term != *that1.Term { + return fmt2.Errorf("Term this(%v) Not Equal that(%v)", *this.Term, *that1.Term) + } + } else if this.Term != nil { + return fmt2.Errorf("this.Term == nil && that.Term != nil") + } else if that1.Term != nil { + return fmt2.Errorf("Term this(%v) Not Equal that(%v)", this.Term, that1.Term) + } + if this.CommandName != nil && that1.CommandName != nil { + if *this.CommandName != *that1.CommandName { + return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", *this.CommandName, *that1.CommandName) + } + } else if this.CommandName != nil { + return fmt2.Errorf("this.CommandName == nil && that.CommandName != nil") + } else if that1.CommandName != nil { + return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", this.CommandName, that1.CommandName) + } + if !bytes.Equal(this.Command, that1.Command) { + return fmt2.Errorf("Command this(%v) Not Equal that(%v)", this.Command, that1.Command) + } + if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) { + return fmt2.Errorf("XXX_unrecognized this(%v) Not Equal that(%v)", this.XXX_unrecognized, that1.XXX_unrecognized) + } + return nil +} +func (this *LogEntry) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*LogEntry) + if !ok { + return false + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if this.Index != nil && that1.Index != nil { + if *this.Index != *that1.Index { + return false + } + } else if this.Index != nil { + return false + } else if that1.Index != nil { + return false + } + if this.Term != nil && that1.Term != nil { + if *this.Term != *that1.Term { + return false + } + } else if this.Term != nil { + return false + } else if that1.Term != nil { + return false + } + if this.CommandName != nil && that1.CommandName != nil { + if *this.CommandName != *that1.CommandName { + return false + } + } else if this.CommandName != nil { + return false + } else if that1.CommandName != nil { + return false + } + if !bytes.Equal(this.Command, that1.Command) { + return false + } + if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) { + return false + } + return true +} diff --git a/migrate/etcd4pb/log_entry.proto b/migrate/etcd4pb/log_entry.proto new file mode 100644 index 000000000..39fa826c0 --- /dev/null +++ b/migrate/etcd4pb/log_entry.proto @@ -0,0 +1,22 @@ +package protobuf; + +import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto"; + +option (gogoproto.gostring_all) = true; +option (gogoproto.equal_all) = true; +option (gogoproto.verbose_equal_all) = true; +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.stringer_all) = true; +option (gogoproto.populate_all) = true; +option (gogoproto.testgen_all) = true; +option (gogoproto.benchgen_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message LogEntry { + required uint64 Index=1; + required uint64 Term=2; + required string CommandName=3; + optional bytes Command=4; // for nop-command +} diff --git a/migrate/fixtures/cmdlog b/migrate/fixtures/cmdlog new file mode 100644 index 000000000..89691b678 Binary files /dev/null and b/migrate/fixtures/cmdlog differ diff --git a/migrate/log.go b/migrate/log.go new file mode 100644 index 000000000..977fdcf20 --- /dev/null +++ b/migrate/log.go @@ -0,0 +1,475 @@ +package migrate + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "os" + "time" + + "github.com/coreos/etcd/etcdserver" + etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" + etcd4pb "github.com/coreos/etcd/migrate/etcd4pb" + "github.com/coreos/etcd/pkg/types" + raftpb "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/store" +) + +func DecodeLog4FromFile(logpath string) ([]*etcd4pb.LogEntry, error) { + file, err := os.OpenFile(logpath, os.O_RDWR, 0600) + if err != nil { + return nil, err + } + defer file.Close() + + return DecodeLog4(file) +} + +func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) { + var readBytes int64 + entries := make([]*etcd4pb.LogEntry, 0) + + for { + entry, n, err := DecodeNextEntry4(file) + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("failed decoding next log entry: ", err) + } + + if entry != nil { + entries = append(entries, entry) + } + + readBytes += int64(n) + } + + return entries, nil +} + +// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the +// number of bytes read and any error that occurs. +func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) { + var length int + _, err := fmt.Fscanf(r, "%8x\n", &length) + if err != nil { + return nil, -1, err + } + + data := make([]byte, length) + if _, err = io.ReadFull(r, data); err != nil { + return nil, -1, err + } + + ent4 := new(etcd4pb.LogEntry) + if err = ent4.Unmarshal(data); err != nil { + return nil, -1, err + } + + // add width of scanner token to length + length = length + 8 + 1 + + return ent4, length, nil +} + +func hashName(name string) int64 { + var sum int64 + for _, ch := range name { + sum = 131*sum + int64(ch) + } + return sum +} + +type Command4 interface { + Type5() raftpb.EntryType + Data5() ([]byte, error) +} + +func NewCommand4(name string, data []byte) (Command4, error) { + var cmd Command4 + + switch name { + case "etcd:remove": + cmd = &RemoveCommand{} + case "etcd:join": + cmd = &JoinCommand{} + case "etcd:setClusterConfig": + //TODO(bcwaldon): can this safely be discarded? + cmd = &NOPCommand{} + case "etcd:compareAndDelete": + cmd = &CompareAndDeleteCommand{} + case "etcd:compareAndSwap": + cmd = &CompareAndSwapCommand{} + case "etcd:create": + cmd = &CreateCommand{} + case "etcd:delete": + cmd = &DeleteCommand{} + case "etcd:set": + cmd = &SetCommand{} + case "etcd:sync": + cmd = &SyncCommand{} + case "etcd:update": + cmd = &UpdateCommand{} + case "raft:join": + cmd = &DefaultJoinCommand{} + case "raft:leave": + cmd = &DefaultLeaveCommand{} + case "raft:nop": + cmd = &NOPCommand{} + default: + return nil, fmt.Errorf("unregistered command type %s", name) + } + + // If data for the command was passed in the decode it. + if data != nil { + if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil { + return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err) + } + } + + return cmd, nil +} + +type RemoveCommand struct { + Name string `json:"name"` +} + +func (c *RemoveCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *RemoveCommand) Data5() ([]byte, error) { + m := etcdserver.Member{ + ID: hashName(c.Name), + } + + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: m.StoreKey(), + } + + return req5.Marshal() +} + +type JoinCommand struct { + Name string `json:"name"` + RaftURL string `json:"raftURL"` + EtcdURL string `json:"etcdURL"` + + //TODO(bcwaldon): Should these be converted? + //MinVersion int `json:"minVersion"` + //MaxVersion int `json:"maxVersion"` +} + +func (c *JoinCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *JoinCommand) Data5() ([]byte, error) { + pURLs, err := types.NewURLs([]string{c.RaftURL}) + if err != nil { + return nil, err + } + + m := etcdserver.GenerateMember(c.Name, pURLs, nil) + + //TODO(bcwaldon): why doesn't this go through GenerateMember? + m.ClientURLs = []string{c.EtcdURL} + + b, err := json.Marshal(*m) + if err != nil { + return nil, err + } + + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: m.StoreKey(), + Val: string(b), + + // TODO(bcwaldon): Is this correct? + Time: store.Permanent.Unix(), + + //TODO(bcwaldon): What is the new equivalent of Unique? + //Unique: c.Unique, + } + + return req5.Marshal() + +} + +type SetClusterConfigCommand struct { + Config *struct { + ActiveSize int `json:"activeSize"` + RemoveDelay float64 `json:"removeDelay"` + SyncInterval float64 `json:"syncInterval"` + } `json:"config"` +} + +func (c *SetClusterConfigCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SetClusterConfigCommand) Data5() ([]byte, error) { + b, err := json.Marshal(c.Config) + if err != nil { + return nil, err + } + + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: "/v2/admin/config", + Dir: false, + Val: string(b), + + // TODO(bcwaldon): Is this correct? + Time: store.Permanent.Unix(), + } + + return req5.Marshal() +} + +type CompareAndDeleteCommand struct { + Key string `json:"key"` + PrevValue string `json:"prevValue"` + PrevIndex uint64 `json:"prevIndex"` +} + +func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CompareAndDeleteCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: c.Key, + PrevValue: c.PrevValue, + PrevIndex: c.PrevIndex, + } + return req5.Marshal() +} + +type CompareAndSwapCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + PrevValue string `json:"prevValue"` + PrevIndex uint64 `json:"prevIndex"` +} + +func (c *CompareAndSwapCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CompareAndSwapCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Val: c.Value, + PrevValue: c.PrevValue, + PrevIndex: c.PrevIndex, + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type CreateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + Unique bool `json:"unique"` + Dir bool `json:"dir"` +} + +func (c *CreateCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *CreateCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Dir: c.Dir, + Val: c.Value, + + // TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + + //TODO(bcwaldon): What is the new equivalent of Unique? + //Unique: c.Unique, + } + return req5.Marshal() +} + +type DeleteCommand struct { + Key string `json:"key"` + Recursive bool `json:"recursive"` + Dir bool `json:"dir"` +} + +func (c *DeleteCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *DeleteCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "DELETE", + Path: c.Key, + Dir: c.Dir, + Recursive: c.Recursive, + } + return req5.Marshal() +} + +type SetCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` + Dir bool `json:"dir"` +} + +func (c *SetCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SetCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Dir: c.Dir, + Val: c.Value, + + //TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type UpdateCommand struct { + Key string `json:"key"` + Value string `json:"value"` + ExpireTime time.Time `json:"expireTime"` +} + +func (c *UpdateCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *UpdateCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "PUT", + Path: c.Key, + Val: c.Value, + + //TODO(bcwaldon): Is this correct? + Time: c.ExpireTime.Unix(), + } + return req5.Marshal() +} + +type SyncCommand struct { + Time time.Time `json:"time"` +} + +func (c *SyncCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *SyncCommand) Data5() ([]byte, error) { + req5 := &etcdserverpb.Request{ + Method: "SYNC", + //TODO(bcwaldon): Is this correct? + Time: c.Time.UnixNano(), + } + return req5.Marshal() +} + +type DefaultJoinCommand struct { + //TODO(bcwaldon): implement Type5, Data5 + Command4 + + Name string `json:"name"` + ConnectionString string `json:"connectionString"` +} + +type DefaultLeaveCommand struct { + //TODO(bcwaldon): implement Type5, Data5 + Command4 + + Name string `json:"name"` +} + +//TODO(bcwaldon): Why is CommandName here? +func (c *DefaultLeaveCommand) CommandName() string { + return "raft:leave" +} + +type NOPCommand struct{} + +//TODO(bcwaldon): Why is CommandName here? +func (c NOPCommand) CommandName() string { + return "raft:nop" +} + +func (c *NOPCommand) Type5() raftpb.EntryType { + return raftpb.EntryNormal +} + +func (c *NOPCommand) Data5() ([]byte, error) { + return nil, nil +} + +func Entries4To5(commitIndex uint64, ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) { + ents4Len := len(ents4) + + if ents4Len == 0 { + return nil, nil + } + + startIndex := ents4[0].GetIndex() + for i, e := range ents4[1:] { + eIndex := e.GetIndex() + // ensure indexes are monotonically increasing + wantIndex := startIndex + uint64(i+1) + if wantIndex != eIndex { + return nil, fmt.Errorf("skipped log index %d", wantIndex) + } + } + + ents5 := make([]raftpb.Entry, 0) + for i, e := range ents4 { + ent, err := toEntry5(e) + if err != nil { + log.Printf("Ignoring invalid log data in entry %d: %v", i, err) + } else { + ents5 = append(ents5, *ent) + } + } + + return ents5, nil +} + +func toEntry5(ent4 *etcd4pb.LogEntry) (*raftpb.Entry, error) { + cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand()) + if err != nil { + return nil, err + } + + data, err := cmd4.Data5() + if err != nil { + return nil, err + } + + ent5 := raftpb.Entry{ + Term: int64(ent4.GetTerm()), + Index: int64(ent4.GetIndex()), + Type: cmd4.Type5(), + Data: data, + } + + log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type) + + return &ent5, nil +} diff --git a/migrate/log_test.go b/migrate/log_test.go new file mode 100644 index 000000000..366d2cfa3 --- /dev/null +++ b/migrate/log_test.go @@ -0,0 +1,42 @@ +package migrate + +import ( + "reflect" + "testing" + "time" +) + +func TestNewCommand(t *testing.T) { + entries, err := ReadLogFile("fixtures/cmdlog") + if err != nil { + t.Errorf("read log file error: %v", err) + } + + tests := []interface{}{ + &JoinCommand{2, 2, "1.local", "http://127.0.0.1:7001", "http://127.0.0.1:4001"}, + &SetClusterConfigCommand{&ClusterConfig{9, 1800.0, 5.0}}, + &NOPCommand{}, + &RemoveCommand{"alice"}, + &CompareAndDeleteCommand{"foo", "baz", 9}, + &CompareAndSwapCommand{"foo", "bar", time.Unix(0, 0), "baz", 9}, + &CreateCommand{"foo", "bar", time.Unix(0, 0), true, true}, + &DeleteCommand{"foo", true, true}, + &SetCommand{"foo", "bar", time.Unix(0, 0), true}, + &SyncCommand{time.Unix(0, 0)}, + &UpdateCommand{"foo", "bar", time.Unix(0, 0)}, + &DefaultLeaveCommand{"alice"}, + &DefaultJoinCommand{"alice", ""}, + } + + for i, e := range entries { + cmd, err := NewCommand(e.GetCommandName(), e.GetCommand()) + if err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + + if !reflect.DeepEqual(cmd, tests[i]) { + t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, tests[i]) + } + } +} diff --git a/migrate/snapshot.go b/migrate/snapshot.go new file mode 100644 index 000000000..5b4a2fe79 --- /dev/null +++ b/migrate/snapshot.go @@ -0,0 +1,187 @@ +package migrate + +import ( + "encoding/json" + "errors" + "fmt" + "hash/crc32" + "io/ioutil" + "log" + "os" + "path" + "sort" + "strconv" + "strings" + + raftpb "github.com/coreos/etcd/raft/raftpb" +) + +type Snapshot4 struct { + State []byte `json:"state"` + LastIndex uint64 `json:"lastIndex"` + LastTerm uint64 `json:"lastTerm"` + + Peers []struct { + Name string `json:"name"` + ConnectionString string `json:"connectionString"` + } `json:"peers"` + + //TODO(bcwaldon): is this needed? + //Path string `json:"path"` +} + +func (s *Snapshot4) Snapshot5() *raftpb.Snapshot { + snap5 := raftpb.Snapshot{ + Data: s.State, + Index: int64(s.LastIndex), + Term: int64(s.LastTerm), + Nodes: make([]int64, len(s.Peers)), + } + + for i, p := range s.Peers { + snap5.Nodes[i] = hashName(p.Name) + } + + return &snap5 +} + +func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) { + fname, err := FindLatestFile(snapdir) + if err != nil { + return nil, err + } + + if fname == "" { + return nil, nil + } + + snappath := path.Join(snapdir, fname) + log.Printf("Decoding snapshot from %s", snappath) + + return DecodeSnapshot4FromFile(snappath) +} + +// FindLatestFile identifies the "latest" filename in a given directory +// by sorting all the files and choosing the highest value. +func FindLatestFile(dirpath string) (string, error) { + dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0) + if err != nil { + if os.IsNotExist(err) { + err = nil + } + return "", err + } + defer dir.Close() + + fnames, err := dir.Readdirnames(-1) + if err != nil { + return "", err + } + + if len(fnames) == 0 { + return "", nil + } + + names, err := NewSnapshotFileNames(fnames) + if err != nil { + return "", err + } + + return names[len(names)-1].FileName, nil +} + +func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) { + // Read snapshot data. + f, err := os.OpenFile(path, os.O_RDONLY, 0) + if err != nil { + return nil, err + } + defer f.Close() + + return DecodeSnapshot4(f) +} + +func DecodeSnapshot4(f *os.File) (*Snapshot4, error) { + // Verify checksum + var checksum uint32 + n, err := fmt.Fscanf(f, "%08x\n", &checksum) + if err != nil { + return nil, err + } else if n != 1 { + return nil, errors.New("miss heading checksum") + } + + // Load remaining snapshot contents. + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + // Generate checksum. + byteChecksum := crc32.ChecksumIEEE(b) + if uint32(checksum) != byteChecksum { + return nil, errors.New("bad checksum") + } + + // Decode snapshot. + snapshot := new(Snapshot4) + if err = json.Unmarshal(b, snapshot); err != nil { + return nil, err + } + return snapshot, nil +} + +func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) { + s := make([]SnapshotFileName, 0) + for _, n := range names { + trimmed := strings.TrimSuffix(n, ".ss") + if trimmed == n { + return nil, fmt.Errorf("file %q does not have .ss extension", n) + } + + parts := strings.SplitN(trimmed, "_", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("unrecognized file name format %q", n) + } + + fn := SnapshotFileName{FileName: n} + + var err error + fn.Term, err = strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to parse term from filename %q: %v", err) + } + + fn.Index, err = strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to parse index from filename %q: %v", err) + } + + s = append(s, fn) + } + + sortable := SnapshotFileNames(s) + sort.Sort(&sortable) + return s, nil +} + +type SnapshotFileNames []SnapshotFileName +type SnapshotFileName struct { + FileName string + Term uint64 + Index uint64 +} + +func (n *SnapshotFileNames) Less(i, j int) bool { + iTerm, iIndex := (*n)[i].Term, (*n)[i].Index + jTerm, jIndex := (*n)[j].Term, (*n)[j].Index + return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex) +} + +func (n *SnapshotFileNames) Swap(i, j int) { + (*n)[i], (*n)[j] = (*n)[j], (*n)[i] +} + +func (n *SnapshotFileNames) Len() int { + return len([]SnapshotFileName(*n)) +}