// Copyright 2015 CoreOS, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package migrate import ( "bytes" "encoding/json" "fmt" "io" "log" "os" "path" "time" etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb" etcd4pb "github.com/coreos/etcd/migrate/etcd4pb" "github.com/coreos/etcd/pkg/types" raftpb "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/store" ) const etcdDefaultClusterName = "etcd-cluster" func UnixTimeOrPermanent(expireTime time.Time) int64 { expire := expireTime.Unix() if expireTime == store.Permanent { expire = 0 } return expire } type Log4 []*etcd4pb.LogEntry func (l Log4) NodeIDs() map[string]uint64 { out := make(map[string]uint64) for _, e := range l { if e.GetCommandName() == "etcd:join" { cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil) if err != nil { log.Println("error converting an etcd:join to v2.0 format. Likely corrupt!") return nil } join := cmd4.(*JoinCommand) m := generateNodeMember(join.Name, join.RaftURL, "") out[join.Name] = uint64(m.ID) } if e.GetCommandName() == "etcd:remove" { cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil) if err != nil { return nil } name := cmd4.(*RemoveCommand).Name delete(out, name) } } return out } func StorePath(key string) string { return path.Join("/1", key) } func DecodeLog4FromFile(logpath string) (Log4, error) { file, err := os.OpenFile(logpath, os.O_RDONLY, 0600) if err != nil { return nil, err } defer file.Close() return DecodeLog4(file) } func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) { var readBytes int64 entries := make([]*etcd4pb.LogEntry, 0) for { entry, n, err := DecodeNextEntry4(file) if err != nil { if err == io.EOF { break } return nil, fmt.Errorf("failed decoding next log entry: %v", err) } entries = append(entries, entry) readBytes += int64(n) } return entries, nil } // DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the // number of bytes read and any error that occurs. func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) { var length int _, err := fmt.Fscanf(r, "%8x\n", &length) if err != nil { return nil, -1, err } data := make([]byte, length) if _, err = io.ReadFull(r, data); err != nil { return nil, -1, err } ent4 := new(etcd4pb.LogEntry) if err = ent4.Unmarshal(data); err != nil { return nil, -1, err } // add width of scanner token to length length = length + 8 + 1 return ent4, length, nil } func hashName(name string) uint64 { var sum uint64 for _, ch := range name { sum = 131*sum + uint64(ch) } return sum } type Command4 interface { Type2() raftpb.EntryType Data2() ([]byte, error) } func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) { var cmd Command4 switch name { case "etcd:remove": cmd = &RemoveCommand{} case "etcd:join": cmd = &JoinCommand{} case "etcd:setClusterConfig": cmd = &NOPCommand{} case "etcd:compareAndDelete": cmd = &CompareAndDeleteCommand{} case "etcd:compareAndSwap": cmd = &CompareAndSwapCommand{} case "etcd:create": cmd = &CreateCommand{} case "etcd:delete": cmd = &DeleteCommand{} case "etcd:set": cmd = &SetCommand{} case "etcd:sync": cmd = &SyncCommand{} case "etcd:update": cmd = &UpdateCommand{} case "raft:join": // These are subsumed by etcd:remove and etcd:join; we shouldn't see them. fallthrough case "raft:leave": return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log") case "raft:nop": cmd = &NOPCommand{} default: return nil, fmt.Errorf("unregistered command type %s", name) } // If data for the command was passed in the decode it. if data != nil { if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil { return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err) } } switch name { case "etcd:join": c := cmd.(*JoinCommand) m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL) c.memb = *m if raftMap != nil { raftMap[c.Name] = uint64(m.ID) } case "etcd:remove": c := cmd.(*RemoveCommand) if raftMap != nil { m, ok := raftMap[c.Name] if !ok { return nil, fmt.Errorf("removing a node named %s before it joined", c.Name) } c.id = m delete(raftMap, c.Name) } } return cmd, nil } type RemoveCommand struct { Name string `json:"name"` id uint64 } func (c *RemoveCommand) Type2() raftpb.EntryType { return raftpb.EntryConfChange } func (c *RemoveCommand) Data2() ([]byte, error) { req2 := raftpb.ConfChange{ ID: 0, Type: raftpb.ConfChangeRemoveNode, NodeID: c.id, } return req2.Marshal() } type JoinCommand struct { Name string `json:"name"` RaftURL string `json:"raftURL"` EtcdURL string `json:"etcdURL"` memb member } func (c *JoinCommand) Type2() raftpb.EntryType { return raftpb.EntryConfChange } func (c *JoinCommand) Data2() ([]byte, error) { b, err := json.Marshal(c.memb) if err != nil { return nil, err } req2 := &raftpb.ConfChange{ ID: 0, Type: raftpb.ConfChangeAddNode, NodeID: uint64(c.memb.ID), Context: b, } return req2.Marshal() } type SetClusterConfigCommand struct { Config *struct { ActiveSize int `json:"activeSize"` RemoveDelay float64 `json:"removeDelay"` SyncInterval float64 `json:"syncInterval"` } `json:"config"` } func (c *SetClusterConfigCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *SetClusterConfigCommand) Data2() ([]byte, error) { b, err := json.Marshal(c.Config) if err != nil { return nil, err } req2 := &etcdserverpb.Request{ Method: "PUT", Path: "/v2/admin/config", Dir: false, Val: string(b), } return req2.Marshal() } type CompareAndDeleteCommand struct { Key string `json:"key"` PrevValue string `json:"prevValue"` PrevIndex uint64 `json:"prevIndex"` } func (c *CompareAndDeleteCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *CompareAndDeleteCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Method: "DELETE", Path: StorePath(c.Key), PrevValue: c.PrevValue, PrevIndex: c.PrevIndex, } return req2.Marshal() } type CompareAndSwapCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` PrevValue string `json:"prevValue"` PrevIndex uint64 `json:"prevIndex"` } func (c *CompareAndSwapCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *CompareAndSwapCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Method: "PUT", Path: StorePath(c.Key), Val: c.Value, PrevValue: c.PrevValue, PrevIndex: c.PrevIndex, Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req2.Marshal() } type CreateCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` Unique bool `json:"unique"` Dir bool `json:"dir"` } func (c *CreateCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *CreateCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Path: StorePath(c.Key), Dir: c.Dir, Val: c.Value, Expiration: UnixTimeOrPermanent(c.ExpireTime), } if c.Unique { req2.Method = "POST" } else { var prevExist = true req2.Method = "PUT" req2.PrevExist = &prevExist } return req2.Marshal() } type DeleteCommand struct { Key string `json:"key"` Recursive bool `json:"recursive"` Dir bool `json:"dir"` } func (c *DeleteCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *DeleteCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Method: "DELETE", Path: StorePath(c.Key), Dir: c.Dir, Recursive: c.Recursive, } return req2.Marshal() } type SetCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` Dir bool `json:"dir"` } func (c *SetCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *SetCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Method: "PUT", Path: StorePath(c.Key), Dir: c.Dir, Val: c.Value, Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req2.Marshal() } type UpdateCommand struct { Key string `json:"key"` Value string `json:"value"` ExpireTime time.Time `json:"expireTime"` } func (c *UpdateCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *UpdateCommand) Data2() ([]byte, error) { exist := true req2 := &etcdserverpb.Request{ Method: "PUT", Path: StorePath(c.Key), Val: c.Value, PrevExist: &exist, Expiration: UnixTimeOrPermanent(c.ExpireTime), } return req2.Marshal() } type SyncCommand struct { Time time.Time `json:"time"` } func (c *SyncCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *SyncCommand) Data2() ([]byte, error) { req2 := &etcdserverpb.Request{ Method: "SYNC", Time: c.Time.UnixNano(), } return req2.Marshal() } type DefaultJoinCommand struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` } type DefaultLeaveCommand struct { Name string `json:"name"` id uint64 } type NOPCommand struct{} //TODO(bcwaldon): Why is CommandName here? func (c NOPCommand) CommandName() string { return "raft:nop" } func (c *NOPCommand) Type2() raftpb.EntryType { return raftpb.EntryNormal } func (c *NOPCommand) Data2() ([]byte, error) { return nil, nil } func Entries4To2(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) { ents4Len := len(ents4) if ents4Len == 0 { return nil, nil } startIndex := ents4[0].GetIndex() for i, e := range ents4[1:] { eIndex := e.GetIndex() // ensure indexes are monotonically increasing wantIndex := startIndex + uint64(i+1) if wantIndex != eIndex { return nil, fmt.Errorf("skipped log index %d", wantIndex) } } raftMap := make(map[string]uint64) ents2 := make([]raftpb.Entry, 0) for i, e := range ents4 { ent, err := toEntry2(e, raftMap) if err != nil { log.Fatalf("Error converting entry %d, %s", i, err) } else { ents2 = append(ents2, *ent) } } return ents2, nil } func toEntry2(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) { cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap) if err != nil { return nil, err } data, err := cmd4.Data2() if err != nil { return nil, err } ent2 := raftpb.Entry{ Term: ent4.GetTerm() + termOffset4to2, Index: ent4.GetIndex(), Type: cmd4.Type2(), Data: data, } return &ent2, nil } func generateNodeMember(name, rafturl, etcdurl string) *member { pURLs, err := types.NewURLs([]string{rafturl}) if err != nil { log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl) } m := NewMember(name, pURLs, etcdDefaultClusterName) m.ClientURLs = []string{etcdurl} return m }