Merge pull request #1578 from barakmich/bcm_migrate

migrate: Add a migration tool to go from etcd v0.4 -> v0.5 data directories
This commit is contained in:
Barak Michener 2014-11-14 16:58:22 -05:00
commit 4e251f8624
15 changed files with 1849 additions and 3 deletions

View File

@ -0,0 +1,47 @@
## etcd 0.4.x -> 0.5.0 Data Migration Tool
### Upgrading from 0.4.x
Between 0.4.x and 0.5, the on-disk data formats have changed. In order to allow users to convert to 0.5, a migration tool is provided.
In the early 0.5.0-alpha series, we're providing this tool early to encourage adoption. However, before 0.5.0-release, etcd will autodetect the 0.4.x data dir upon upgrade and automatically update the data too (while leaving a backup, in case of emergency).
### Data Migration Tips
* Keep the environment variables and etcd instance flags the same (much as [the upgrade document](../upgrade.md) suggests), particularly `--name`/`ETCD_NAME`.
* Don't change the cluster configuration. If there's a plan to add or remove machines, it's probably best to arrange for that after the migration, rather than before or at the same time.
### Running the tool
The tool can be run via:
```sh
./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA>
```
It should autodetect everything and convert the data-dir to be 0.5 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 0.5 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
If, however, it complains about autodetecting the name (which can happen, depending on how the cluster was configured), you need to supply the name of this particular node. This is equivalent to the `--name` flag (or `ETCD_NAME` variable) that etcd was run with, which can also be found by accessing the self api, eg:
```sh
curl -L http://127.0.0.1:4001/v2/stats/self
```
Where the `"name"` field is the name of the local machine.
Then, run the migration tool with
```sh
./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA> --name=<NAME>
```
And the tool should migrate successfully. If it still has an error at this time, it's a failure or bug in the tool and it's worth reporting a bug.
### Recovering Disk Space
If the conversion has completed, the entire cluster is running on something 0.5-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
```sh
rm -ri snapshot conf log
```
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 0.5 data.

1
build
View File

@ -13,3 +13,4 @@ eval $(go env)
go build -o bin/etcd ${REPO_PATH}
go build -o bin/etcdctl ${REPO_PATH}/etcdctl
go build -o bin/etcd-migrate ${REPO_PATH}/migrate/cmd/etcd-migrate

View File

@ -47,7 +47,7 @@ type Member struct {
Attributes
}
// newMember creates a Member without an ID and generates one based on the
// NewMember creates a Member without an ID and generates one based on the
// name, peer URLs. This is used for bootstrapping/adding new member.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
m := &Member{
@ -105,6 +105,10 @@ func memberStoreKey(id types.ID) string {
return path.Join(storeMembersPrefix, id.String())
}
func MemberAttributesStorePath(id types.ID) string {
return path.Join(memberStoreKey(id), attributesSuffix)
}
func mustParseMemberIDFromKey(key string) types.ID {
id, err := types.IDFromString(path.Base(key))
if err != nil {

View File

@ -581,7 +581,7 @@ func (s *EtcdServer) publish(retryInterval time.Duration) {
req := pb.Request{
ID: GenID(),
Method: "PUT",
Path: path.Join(memberStoreKey(s.id), attributesSuffix),
Path: MemberAttributesStorePath(s.id),
Val: string(b),
}

View File

@ -0,0 +1,97 @@
package main
import (
"errors"
"flag"
"fmt"
"log"
"path"
etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/migrate"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
)
func walDir5(dataDir string) string {
return path.Join(dataDir, "wal")
}
func logFile4(dataDir string) string {
return path.Join(dataDir, "log")
}
func main() {
version := flag.Int("version", 5, "4 or 5")
from := flag.String("data-dir", "", "")
flag.Parse()
if *from == "" {
log.Fatal("Must provide -data-dir flag")
}
var ents []raftpb.Entry
var err error
switch *version {
case 4:
ents, err = dump4(*from)
case 5:
ents, err = dump5(*from)
default:
err = errors.New("value of -version flag must be 4 or 5")
}
if err != nil {
log.Fatalf("Failed decoding log: %v", err)
}
for _, e := range ents {
msg := fmt.Sprintf("%2d %5d: ", e.Term, e.Index)
switch e.Type {
case raftpb.EntryNormal:
msg = fmt.Sprintf("%s norm", msg)
var r etcdserverpb.Request
if err := r.Unmarshal(e.Data); err != nil {
msg = fmt.Sprintf("%s ???", msg)
} else {
msg = fmt.Sprintf("%s %s %s %s", msg, r.Method, r.Path, r.Val)
}
case raftpb.EntryConfChange:
msg = fmt.Sprintf("%s conf", msg)
var r raftpb.ConfChange
if err := r.Unmarshal(e.Data); err != nil {
msg = fmt.Sprintf("%s ???", msg)
} else {
msg = fmt.Sprintf("%s %s %s %s", msg, r.Type, types.ID(r.NodeID), r.Context)
}
}
fmt.Println(msg)
}
}
func dump4(dataDir string) ([]raftpb.Entry, error) {
lf4 := logFile4(dataDir)
ents, err := migrate.DecodeLog4FromFile(lf4)
if err != nil {
return nil, err
}
return migrate.Entries4To5(ents)
}
func dump5(dataDir string) ([]raftpb.Entry, error) {
wd5 := walDir5(dataDir)
if !wal.Exist(wd5) {
return nil, fmt.Errorf("No wal exists at %s", wd5)
}
w, err := wal.OpenAtIndex(wd5, 0)
if err != nil {
return nil, err
}
defer w.Close()
_, _, ents, err := w.ReadAll()
return ents, err
}

View File

@ -0,0 +1,23 @@
package main
import (
"flag"
"log"
"github.com/coreos/etcd/migrate"
)
func main() {
from := flag.String("data-dir", "", "etcd v0.4 data-dir")
name := flag.String("name", "", "etcd node name")
flag.Parse()
if *from == "" {
log.Fatal("Must provide -data-dir flag")
}
err := migrate.Migrate4To5(*from, *name)
if err != nil {
log.Fatalf("Failed migrating data-dir: %v", err)
}
}

39
migrate/config.go Normal file
View File

@ -0,0 +1,39 @@
package migrate
import (
"encoding/json"
"io/ioutil"
"github.com/coreos/etcd/raft/raftpb"
)
type Config4 struct {
CommitIndex uint64 `json:"commitIndex"`
Peers []struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
} `json:"peers"`
}
func (c *Config4) HardState5() raftpb.HardState {
return raftpb.HardState{
Commit: c.CommitIndex,
Term: 0,
Vote: 0,
}
}
func DecodeConfig4FromFile(cfgPath string) (*Config4, error) {
b, err := ioutil.ReadFile(cfgPath)
if err != nil {
return nil, err
}
conf := &Config4{}
if err = json.Unmarshal(b, conf); err != nil {
return nil, err
}
return conf, nil
}

168
migrate/etcd4.go Normal file
View File

@ -0,0 +1,168 @@
package migrate
import (
"fmt"
"log"
"os"
"path"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
)
func snapDir4(dataDir string) string {
return path.Join(dataDir, "snapshot")
}
func logFile4(dataDir string) string {
return path.Join(dataDir, "log")
}
func cfgFile4(dataDir string) string {
return path.Join(dataDir, "conf")
}
func snapDir5(dataDir string) string {
return path.Join(dataDir, "snap")
}
func walDir5(dataDir string) string {
return path.Join(dataDir, "wal")
}
func Migrate4To5(dataDir string, name string) error {
// prep new directories
sd5 := snapDir5(dataDir)
if err := os.MkdirAll(sd5, 0700); err != nil {
return fmt.Errorf("failed creating snapshot directory %s: %v", sd5, err)
}
// read v0.4 data
snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
if err != nil {
return err
}
cfg4, err := DecodeConfig4FromFile(cfgFile4(dataDir))
if err != nil {
return err
}
ents4, err := DecodeLog4FromFile(logFile4(dataDir))
if err != nil {
return err
}
nodeIDs := ents4.NodeIDs()
nodeID := GuessNodeID(nodeIDs, snap4, cfg4, name)
if nodeID == 0 {
return fmt.Errorf("Couldn't figure out the node ID from the log or flags, cannot convert")
}
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: nodeID, ClusterID: 0x04add5})
wd5 := walDir5(dataDir)
w, err := wal.Create(wd5, metadata)
if err != nil {
return fmt.Errorf("failed initializing wal at %s: %v", wd5, err)
}
defer w.Close()
// transform v0.4 data
var snap5 *raftpb.Snapshot
if snap4 == nil {
log.Printf("No snapshot found")
} else {
log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex)
snap5 = snap4.Snapshot5()
}
st5 := cfg4.HardState5()
// If we've got the most recent snapshot, we can use it's committed index. Still likely less than the current actual index, but worth it for the replay.
if snap5 != nil {
st5.Commit = snap5.Index
}
ents5, err := Entries4To5(ents4)
if err != nil {
return err
}
ents5Len := len(ents5)
log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents5Len, ents5[0].Index, ents5[ents5Len-1].Index)
// explicitly prepend an empty entry as the WAL code expects it
ents5 = append(make([]raftpb.Entry, 1), ents5...)
if err = w.Save(st5, ents5); err != nil {
return err
}
log.Printf("Log migration successful")
// migrate snapshot (if necessary) and logs
if snap5 != nil {
ss := snap.New(sd5)
if err := ss.SaveSnap(*snap5); err != nil {
return err
}
log.Printf("Snapshot migration successful")
}
return nil
}
func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name string) uint64 {
var snapNodes map[string]uint64
if snap4 != nil {
snapNodes = snap4.GetNodesFromStore()
}
// First, use the flag, if set.
if name != "" {
log.Printf("Using suggested name %s", name)
if val, ok := nodes[name]; ok {
log.Printf("Found ID %d", val)
return val
}
if snapNodes != nil {
if val, ok := snapNodes[name]; ok {
log.Printf("Found ID %d", val)
return val
}
}
log.Printf("Name not found, autodetecting...")
}
// Next, look at the snapshot peers, if that exists.
if snap4 != nil {
//snapNodes := make(map[string]uint64)
//for _, p := range snap4.Peers {
//m := generateNodeMember(p.Name, p.ConnectionString, "")
//snapNodes[p.Name] = uint64(m.ID)
//}
for _, p := range cfg.Peers {
log.Printf(p.Name)
delete(snapNodes, p.Name)
}
if len(snapNodes) == 1 {
for name, id := range nodes {
log.Printf("Autodetected from snapshot: name %s", name)
return id
}
}
}
// Then, try and deduce from the log.
for _, p := range cfg.Peers {
delete(nodes, p.Name)
}
if len(nodes) == 1 {
for name, id := range nodes {
log.Printf("Autodetected name %s", name)
return id
}
}
return 0
}

View File

@ -0,0 +1,552 @@
// Code generated by protoc-gen-gogo.
// source: log_entry.proto
// DO NOT EDIT!
package protobuf
import proto "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"
// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
import io "io"
import code_google_com_p_gogoprotobuf_proto "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import fmt "fmt"
import strings "strings"
import reflect "reflect"
import fmt1 "fmt"
import strings1 "strings"
import code_google_com_p_gogoprotobuf_proto1 "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import sort "sort"
import strconv "strconv"
import reflect1 "reflect"
import fmt2 "fmt"
import bytes "bytes"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
type LogEntry struct {
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *LogEntry) Reset() { *m = LogEntry{} }
func (*LogEntry) ProtoMessage() {}
func (m *LogEntry) GetIndex() uint64 {
if m != nil && m.Index != nil {
return *m.Index
}
return 0
}
func (m *LogEntry) GetTerm() uint64 {
if m != nil && m.Term != nil {
return *m.Term
}
return 0
}
func (m *LogEntry) GetCommandName() string {
if m != nil && m.CommandName != nil {
return *m.CommandName
}
return ""
}
func (m *LogEntry) GetCommand() []byte {
if m != nil {
return m.Command
}
return nil
}
func init() {
}
func (m *LogEntry) Unmarshal(data []byte) error {
l := len(data)
index := 0
for index < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var v uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Index = &v
case 2:
if wireType != 0 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var v uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Term = &v
case 3:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + int(stringLen)
if postIndex > l {
return io.ErrUnexpectedEOF
}
s := string(data[index:postIndex])
m.CommandName = &s
index = postIndex
case 4:
if wireType != 2 {
return code_google_com_p_gogoprotobuf_proto.ErrWrongType
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if index >= l {
return io.ErrUnexpectedEOF
}
b := data[index]
index++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := index + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Command = append(m.Command, data[index:postIndex]...)
index = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
index -= sizeOfWire
skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
if err != nil {
return err
}
if (index + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
index += skippy
}
}
return nil
}
func (this *LogEntry) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&LogEntry{`,
`Index:` + valueToStringLogEntry(this.Index) + `,`,
`Term:` + valueToStringLogEntry(this.Term) + `,`,
`CommandName:` + valueToStringLogEntry(this.CommandName) + `,`,
`Command:` + valueToStringLogEntry(this.Command) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func valueToStringLogEntry(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *LogEntry) Size() (n int) {
var l int
_ = l
if m.Index != nil {
n += 1 + sovLogEntry(uint64(*m.Index))
}
if m.Term != nil {
n += 1 + sovLogEntry(uint64(*m.Term))
}
if m.CommandName != nil {
l = len(*m.CommandName)
n += 1 + l + sovLogEntry(uint64(l))
}
if m.Command != nil {
l = len(m.Command)
n += 1 + l + sovLogEntry(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovLogEntry(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozLogEntry(x uint64) (n int) {
return sovLogEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func NewPopulatedLogEntry(r randyLogEntry, easy bool) *LogEntry {
this := &LogEntry{}
v1 := uint64(r.Uint32())
this.Index = &v1
v2 := uint64(r.Uint32())
this.Term = &v2
v3 := randStringLogEntry(r)
this.CommandName = &v3
if r.Intn(10) != 0 {
v4 := r.Intn(100)
this.Command = make([]byte, v4)
for i := 0; i < v4; i++ {
this.Command[i] = byte(r.Intn(256))
}
}
if !easy && r.Intn(10) != 0 {
this.XXX_unrecognized = randUnrecognizedLogEntry(r, 5)
}
return this
}
type randyLogEntry interface {
Float32() float32
Float64() float64
Int63() int64
Int31() int32
Uint32() uint32
Intn(n int) int
}
func randUTF8RuneLogEntry(r randyLogEntry) rune {
res := rune(r.Uint32() % 1112064)
if 55296 <= res {
res += 2047
}
return res
}
func randStringLogEntry(r randyLogEntry) string {
v5 := r.Intn(100)
tmps := make([]rune, v5)
for i := 0; i < v5; i++ {
tmps[i] = randUTF8RuneLogEntry(r)
}
return string(tmps)
}
func randUnrecognizedLogEntry(r randyLogEntry, maxFieldNumber int) (data []byte) {
l := r.Intn(5)
for i := 0; i < l; i++ {
wire := r.Intn(4)
if wire == 3 {
wire = 5
}
fieldNumber := maxFieldNumber + r.Intn(100)
data = randFieldLogEntry(data, r, fieldNumber, wire)
}
return data
}
func randFieldLogEntry(data []byte, r randyLogEntry, fieldNumber int, wire int) []byte {
key := uint32(fieldNumber)<<3 | uint32(wire)
switch wire {
case 0:
data = encodeVarintPopulateLogEntry(data, uint64(key))
data = encodeVarintPopulateLogEntry(data, uint64(r.Int63()))
case 1:
data = encodeVarintPopulateLogEntry(data, uint64(key))
data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
case 2:
data = encodeVarintPopulateLogEntry(data, uint64(key))
ll := r.Intn(100)
data = encodeVarintPopulateLogEntry(data, uint64(ll))
for j := 0; j < ll; j++ {
data = append(data, byte(r.Intn(256)))
}
default:
data = encodeVarintPopulateLogEntry(data, uint64(key))
data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
}
return data
}
func encodeVarintPopulateLogEntry(data []byte, v uint64) []byte {
for v >= 1<<7 {
data = append(data, uint8(uint64(v)&0x7f|0x80))
v >>= 7
}
data = append(data, uint8(v))
return data
}
func (m *LogEntry) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *LogEntry) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
if m.Index != nil {
data[i] = 0x8
i++
i = encodeVarintLogEntry(data, i, uint64(*m.Index))
}
if m.Term != nil {
data[i] = 0x10
i++
i = encodeVarintLogEntry(data, i, uint64(*m.Term))
}
if m.CommandName != nil {
data[i] = 0x1a
i++
i = encodeVarintLogEntry(data, i, uint64(len(*m.CommandName)))
i += copy(data[i:], *m.CommandName)
}
if m.Command != nil {
data[i] = 0x22
i++
i = encodeVarintLogEntry(data, i, uint64(len(m.Command)))
i += copy(data[i:], m.Command)
}
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64LogEntry(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32LogEntry(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintLogEntry(data []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return offset + 1
}
func (this *LogEntry) GoString() string {
if this == nil {
return "nil"
}
s := strings1.Join([]string{`&protobuf.LogEntry{` + `Index:` + valueToGoStringLogEntry(this.Index, "uint64"), `Term:` + valueToGoStringLogEntry(this.Term, "uint64"), `CommandName:` + valueToGoStringLogEntry(this.CommandName, "string"), `Command:` + valueToGoStringLogEntry(this.Command, "byte"), `XXX_unrecognized:` + fmt1.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ")
return s
}
func valueToGoStringLogEntry(v interface{}, typ string) string {
rv := reflect1.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect1.Indirect(rv).Interface()
return fmt1.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
}
func extensionToGoStringLogEntry(e map[int32]code_google_com_p_gogoprotobuf_proto1.Extension) string {
if e == nil {
return "nil"
}
s := "map[int32]proto.Extension{"
keys := make([]int, 0, len(e))
for k := range e {
keys = append(keys, int(k))
}
sort.Ints(keys)
ss := []string{}
for _, k := range keys {
ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString())
}
s += strings1.Join(ss, ",") + "}"
return s
}
func (this *LogEntry) VerboseEqual(that interface{}) error {
if that == nil {
if this == nil {
return nil
}
return fmt2.Errorf("that == nil && this != nil")
}
that1, ok := that.(*LogEntry)
if !ok {
return fmt2.Errorf("that is not of type *LogEntry")
}
if that1 == nil {
if this == nil {
return nil
}
return fmt2.Errorf("that is type *LogEntry but is nil && this != nil")
} else if this == nil {
return fmt2.Errorf("that is type *LogEntrybut is not nil && this == nil")
}
if this.Index != nil && that1.Index != nil {
if *this.Index != *that1.Index {
return fmt2.Errorf("Index this(%v) Not Equal that(%v)", *this.Index, *that1.Index)
}
} else if this.Index != nil {
return fmt2.Errorf("this.Index == nil && that.Index != nil")
} else if that1.Index != nil {
return fmt2.Errorf("Index this(%v) Not Equal that(%v)", this.Index, that1.Index)
}
if this.Term != nil && that1.Term != nil {
if *this.Term != *that1.Term {
return fmt2.Errorf("Term this(%v) Not Equal that(%v)", *this.Term, *that1.Term)
}
} else if this.Term != nil {
return fmt2.Errorf("this.Term == nil && that.Term != nil")
} else if that1.Term != nil {
return fmt2.Errorf("Term this(%v) Not Equal that(%v)", this.Term, that1.Term)
}
if this.CommandName != nil && that1.CommandName != nil {
if *this.CommandName != *that1.CommandName {
return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", *this.CommandName, *that1.CommandName)
}
} else if this.CommandName != nil {
return fmt2.Errorf("this.CommandName == nil && that.CommandName != nil")
} else if that1.CommandName != nil {
return fmt2.Errorf("CommandName this(%v) Not Equal that(%v)", this.CommandName, that1.CommandName)
}
if !bytes.Equal(this.Command, that1.Command) {
return fmt2.Errorf("Command this(%v) Not Equal that(%v)", this.Command, that1.Command)
}
if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
return fmt2.Errorf("XXX_unrecognized this(%v) Not Equal that(%v)", this.XXX_unrecognized, that1.XXX_unrecognized)
}
return nil
}
func (this *LogEntry) Equal(that interface{}) bool {
if that == nil {
if this == nil {
return true
}
return false
}
that1, ok := that.(*LogEntry)
if !ok {
return false
}
if that1 == nil {
if this == nil {
return true
}
return false
} else if this == nil {
return false
}
if this.Index != nil && that1.Index != nil {
if *this.Index != *that1.Index {
return false
}
} else if this.Index != nil {
return false
} else if that1.Index != nil {
return false
}
if this.Term != nil && that1.Term != nil {
if *this.Term != *that1.Term {
return false
}
} else if this.Term != nil {
return false
} else if that1.Term != nil {
return false
}
if this.CommandName != nil && that1.CommandName != nil {
if *this.CommandName != *that1.CommandName {
return false
}
} else if this.CommandName != nil {
return false
} else if that1.CommandName != nil {
return false
}
if !bytes.Equal(this.Command, that1.Command) {
return false
}
if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
return false
}
return true
}

View File

@ -0,0 +1,22 @@
package protobuf;
import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
message LogEntry {
required uint64 Index=1;
required uint64 Term=2;
required string CommandName=3;
optional bytes Command=4; // for nop-command
}

BIN
migrate/fixtures/cmdlog Normal file

Binary file not shown.

508
migrate/log.go Normal file
View File

@ -0,0 +1,508 @@
package migrate
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"time"
"github.com/coreos/etcd/etcdserver"
etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
const etcdDefaultClusterName = "etcd-cluster"
func UnixTimeOrPermanent(expireTime time.Time) int64 {
expire := expireTime.Unix()
if expireTime == store.Permanent {
expire = 0
}
return expire
}
type Log4 []*etcd4pb.LogEntry
func (l Log4) NodeIDs() map[string]uint64 {
out := make(map[string]uint64)
for _, e := range l {
if e.GetCommandName() == "etcd:join" {
cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
if err != nil {
log.Println("error converting an etcd:join to v0.5 format. Likely corrupt!")
return nil
}
join := cmd4.(*JoinCommand)
m := generateNodeMember(join.Name, join.RaftURL, "")
out[join.Name] = uint64(m.ID)
}
if e.GetCommandName() == "etcd:remove" {
cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
if err != nil {
return nil
}
name := cmd4.(*RemoveCommand).Name
delete(out, name)
}
}
return out
}
func StorePath(key string) string {
return path.Join(etcdserver.StoreKeysPrefix, key)
}
func DecodeLog4FromFile(logpath string) (Log4, error) {
file, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
defer file.Close()
return DecodeLog4(file)
}
func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
var readBytes int64
entries := make([]*etcd4pb.LogEntry, 0)
for {
entry, n, err := DecodeNextEntry4(file)
if err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("failed decoding next log entry: %v", err)
}
entries = append(entries, entry)
readBytes += int64(n)
}
return entries, nil
}
// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
// number of bytes read and any error that occurs.
func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
var length int
_, err := fmt.Fscanf(r, "%8x\n", &length)
if err != nil {
return nil, -1, err
}
data := make([]byte, length)
if _, err = io.ReadFull(r, data); err != nil {
return nil, -1, err
}
ent4 := new(etcd4pb.LogEntry)
if err = ent4.Unmarshal(data); err != nil {
return nil, -1, err
}
// add width of scanner token to length
length = length + 8 + 1
return ent4, length, nil
}
func hashName(name string) uint64 {
var sum uint64
for _, ch := range name {
sum = 131*sum + uint64(ch)
}
return sum
}
type Command4 interface {
Type5() raftpb.EntryType
Data5() ([]byte, error)
}
func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
var cmd Command4
switch name {
case "etcd:remove":
cmd = &RemoveCommand{}
case "etcd:join":
cmd = &JoinCommand{}
case "etcd:setClusterConfig":
cmd = &NOPCommand{}
case "etcd:compareAndDelete":
cmd = &CompareAndDeleteCommand{}
case "etcd:compareAndSwap":
cmd = &CompareAndSwapCommand{}
case "etcd:create":
cmd = &CreateCommand{}
case "etcd:delete":
cmd = &DeleteCommand{}
case "etcd:set":
cmd = &SetCommand{}
case "etcd:sync":
cmd = &SyncCommand{}
case "etcd:update":
cmd = &UpdateCommand{}
case "raft:join":
// These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
fallthrough
case "raft:leave":
return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
case "raft:nop":
cmd = &NOPCommand{}
default:
return nil, fmt.Errorf("unregistered command type %s", name)
}
// If data for the command was passed in the decode it.
if data != nil {
if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
}
}
switch name {
case "etcd:join":
c := cmd.(*JoinCommand)
m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
c.memb = *m
if raftMap != nil {
raftMap[c.Name] = uint64(m.ID)
}
case "etcd:remove":
c := cmd.(*RemoveCommand)
if raftMap != nil {
m, ok := raftMap[c.Name]
if !ok {
return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
}
c.id = m
delete(raftMap, c.Name)
}
}
return cmd, nil
}
type RemoveCommand struct {
Name string `json:"name"`
id uint64
}
func (c *RemoveCommand) Type5() raftpb.EntryType {
return raftpb.EntryConfChange
}
func (c *RemoveCommand) Data5() ([]byte, error) {
req5 := raftpb.ConfChange{
ID: 0,
Type: raftpb.ConfChangeRemoveNode,
NodeID: c.id,
}
return req5.Marshal()
}
type JoinCommand struct {
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
memb etcdserver.Member
}
func (c *JoinCommand) Type5() raftpb.EntryType {
return raftpb.EntryConfChange
}
func (c *JoinCommand) Data5() ([]byte, error) {
b, err := json.Marshal(c.memb)
if err != nil {
return nil, err
}
req5 := &raftpb.ConfChange{
ID: 0,
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(c.memb.ID),
Context: b,
}
return req5.Marshal()
}
type SetClusterConfigCommand struct {
Config *struct {
ActiveSize int `json:"activeSize"`
RemoveDelay float64 `json:"removeDelay"`
SyncInterval float64 `json:"syncInterval"`
} `json:"config"`
}
func (c *SetClusterConfigCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SetClusterConfigCommand) Data5() ([]byte, error) {
b, err := json.Marshal(c.Config)
if err != nil {
return nil, err
}
req5 := &etcdserverpb.Request{
Method: "PUT",
Path: "/v2/admin/config",
Dir: false,
Val: string(b),
}
return req5.Marshal()
}
type CompareAndDeleteCommand struct {
Key string `json:"key"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
}
func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CompareAndDeleteCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Method: "DELETE",
Path: StorePath(c.Key),
PrevValue: c.PrevValue,
PrevIndex: c.PrevIndex,
}
return req5.Marshal()
}
type CompareAndSwapCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
}
func (c *CompareAndSwapCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Val: c.Value,
PrevValue: c.PrevValue,
PrevIndex: c.PrevIndex,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req5.Marshal()
}
type CreateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Unique bool `json:"unique"`
Dir bool `json:"dir"`
}
func (c *CreateCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CreateCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Path: StorePath(c.Key),
Dir: c.Dir,
Val: c.Value,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
if c.Unique {
req5.Method = "POST"
} else {
var prevExist = true
req5.Method = "PUT"
req5.PrevExist = &prevExist
}
return req5.Marshal()
}
type DeleteCommand struct {
Key string `json:"key"`
Recursive bool `json:"recursive"`
Dir bool `json:"dir"`
}
func (c *DeleteCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *DeleteCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Method: "DELETE",
Path: StorePath(c.Key),
Dir: c.Dir,
Recursive: c.Recursive,
}
return req5.Marshal()
}
type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Dir bool `json:"dir"`
}
func (c *SetCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SetCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Dir: c.Dir,
Val: c.Value,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req5.Marshal()
}
type UpdateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
}
func (c *UpdateCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *UpdateCommand) Data5() ([]byte, error) {
exist := true
req5 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Val: c.Value,
PrevExist: &exist,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req5.Marshal()
}
type SyncCommand struct {
Time time.Time `json:"time"`
}
func (c *SyncCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SyncCommand) Data5() ([]byte, error) {
req5 := &etcdserverpb.Request{
Method: "SYNC",
Time: c.Time.UnixNano(),
}
return req5.Marshal()
}
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
type DefaultLeaveCommand struct {
Name string `json:"name"`
id uint64
}
type NOPCommand struct{}
//TODO(bcwaldon): Why is CommandName here?
func (c NOPCommand) CommandName() string {
return "raft:nop"
}
func (c *NOPCommand) Type5() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *NOPCommand) Data5() ([]byte, error) {
return nil, nil
}
func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
ents4Len := len(ents4)
if ents4Len == 0 {
return nil, nil
}
startIndex := ents4[0].GetIndex()
for i, e := range ents4[1:] {
eIndex := e.GetIndex()
// ensure indexes are monotonically increasing
wantIndex := startIndex + uint64(i+1)
if wantIndex != eIndex {
return nil, fmt.Errorf("skipped log index %d", wantIndex)
}
}
raftMap := make(map[string]uint64)
ents5 := make([]raftpb.Entry, 0)
for i, e := range ents4 {
ent, err := toEntry5(e, raftMap)
if err != nil {
log.Fatalf("Error converting entry %d, %s", i, err)
} else {
ents5 = append(ents5, *ent)
}
}
return ents5, nil
}
func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
if err != nil {
return nil, err
}
data, err := cmd4.Data5()
if err != nil {
return nil, err
}
ent5 := raftpb.Entry{
Term: ent4.GetTerm(),
Index: ent4.GetIndex(),
Type: cmd4.Type5(),
Data: data,
}
log.Printf("%d: %s -> %s", ent5.Index, ent4.GetCommandName(), ent5.Type)
return &ent5, nil
}
func generateNodeMember(name, rafturl, etcdurl string) *etcdserver.Member {
pURLs, err := types.NewURLs([]string{rafturl})
if err != nil {
log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
}
m := etcdserver.NewMember(name, pURLs, etcdDefaultClusterName, nil)
m.ClientURLs = []string{etcdurl}
return m
}

57
migrate/log_test.go Normal file
View File

@ -0,0 +1,57 @@
package migrate
import (
"fmt"
"net/url"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/etcdserver"
)
func TestNewCommand(t *testing.T) {
entries, err := DecodeLog4FromFile("fixtures/cmdlog")
if err != nil {
t.Errorf("read log file error: %v", err)
}
zeroTime, err := time.Parse(time.RFC3339, "1969-12-31T16:00:00-08:00")
if err != nil {
t.Errorf("couldn't create time: %v", err)
}
m := etcdserver.NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName, nil)
m.ClientURLs = []string{"http://127.0.0.1:4001"}
tests := []interface{}{
&JoinCommand{"alice", "http://127.0.0.1:7001", "http://127.0.0.1:4001", *m},
&NOPCommand{},
&NOPCommand{},
&RemoveCommand{"alice", 0xe52ada62956ff923},
&CompareAndDeleteCommand{"foo", "baz", 9},
&CompareAndSwapCommand{"foo", "bar", zeroTime, "baz", 9},
&CreateCommand{"foo", "bar", zeroTime, true, true},
&DeleteCommand{"foo", true, true},
&SetCommand{"foo", "bar", zeroTime, true},
&SyncCommand{zeroTime},
&UpdateCommand{"foo", "bar", zeroTime},
}
raftMap := make(map[string]uint64)
for i, test := range tests {
e := entries[i]
cmd, err := NewCommand4(e.GetCommandName(), e.GetCommand(), raftMap)
if err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
if !reflect.DeepEqual(cmd, test) {
if i == 5 {
fmt.Println(cmd.(*CompareAndSwapCommand).ExpireTime.Location())
}
t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, test)
}
}
}

328
migrate/snapshot.go Normal file
View File

@ -0,0 +1,328 @@
package migrate
import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"log"
"net/url"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
raftpb "github.com/coreos/etcd/raft/raftpb"
)
type Snapshot4 struct {
State []byte `json:"state"`
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
Peers []struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
} `json:"peers"`
}
type sstore struct {
Root *node
CurrentIndex uint64
CurrentVersion int
}
type node struct {
Path string
CreatedIndex uint64
ModifiedIndex uint64
Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
ExpireTime time.Time
ACL string
Value string // for key-value pair
Children map[string]*node // for directory
}
func replacePathNames(n *node, s1, s2 string) {
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
for _, c := range n.Children {
replacePathNames(c, s1, s2)
}
}
func pullNodesFromEtcd(n *node) map[string]uint64 {
out := make(map[string]uint64)
machines := n.Children["machines"]
for name, c := range machines.Children {
q, err := url.ParseQuery(c.Value)
if err != nil {
log.Fatal("Couldn't parse old query string value")
}
etcdurl := q.Get("etcd")
rafturl := q.Get("raft")
m := generateNodeMember(name, rafturl, etcdurl)
out[m.Name] = uint64(m.ID)
}
return out
}
func fixEtcd(n *node) {
n.Path = "/0"
machines := n.Children["machines"]
n.Children["members"] = &node{
Path: "/0/members",
CreatedIndex: machines.CreatedIndex,
ModifiedIndex: machines.ModifiedIndex,
ExpireTime: machines.ExpireTime,
ACL: machines.ACL,
Children: make(map[string]*node),
}
for name, c := range machines.Children {
q, err := url.ParseQuery(c.Value)
if err != nil {
log.Fatal("Couldn't parse old query string value")
}
etcdurl := q.Get("etcd")
rafturl := q.Get("raft")
m := generateNodeMember(name, rafturl, etcdurl)
attrBytes, err := json.Marshal(m.Attributes)
if err != nil {
log.Fatal("Couldn't marshal attributes")
}
raftBytes, err := json.Marshal(m.RaftAttributes)
if err != nil {
log.Fatal("Couldn't marshal raft attributes")
}
newNode := &node{
Path: path.Join("/0/members", m.ID.String()),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Children: map[string]*node{
"attributes": &node{
Path: path.Join("/0/members", m.ID.String(), "attributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(attrBytes),
},
"raftAttributes": &node{
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(raftBytes),
},
},
}
n.Children["members"].Children[m.ID.String()] = newNode
}
delete(n.Children, "machines")
}
func mangleRoot(n *node) *node {
newRoot := &node{
Path: "/",
CreatedIndex: n.CreatedIndex,
ModifiedIndex: n.ModifiedIndex,
ExpireTime: n.ExpireTime,
ACL: n.ACL,
Children: make(map[string]*node),
}
newRoot.Children["1"] = n
etcd := n.Children["_etcd"]
delete(n.Children, "_etcd")
replacePathNames(n, "/", "/1/")
fixEtcd(etcd)
newRoot.Children["0"] = etcd
return newRoot
}
func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
st := &sstore{}
if err := json.Unmarshal(s.State, st); err != nil {
log.Fatal("Couldn't unmarshal snapshot")
}
etcd := st.Root.Children["_etcd"]
return pullNodesFromEtcd(etcd)
}
func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
st := &sstore{}
if err := json.Unmarshal(s.State, st); err != nil {
log.Fatal("Couldn't unmarshal snapshot")
}
st.Root = mangleRoot(st.Root)
newState, err := json.Marshal(st)
if err != nil {
log.Fatal("Couldn't re-marshal new snapshot")
}
snap5 := raftpb.Snapshot{
Data: newState,
Index: s.LastIndex,
Term: s.LastTerm,
Nodes: make([]uint64, len(s.Peers)),
}
for i, p := range s.Peers {
snap5.Nodes[i] = hashName(p.Name)
}
return &snap5
}
func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
fname, err := FindLatestFile(snapdir)
if err != nil {
return nil, err
}
if fname == "" {
return nil, nil
}
snappath := path.Join(snapdir, fname)
log.Printf("Decoding snapshot from %s", snappath)
return DecodeSnapshot4FromFile(snappath)
}
// FindLatestFile identifies the "latest" filename in a given directory
// by sorting all the files and choosing the highest value.
func FindLatestFile(dirpath string) (string, error) {
dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return "", err
}
defer dir.Close()
fnames, err := dir.Readdirnames(-1)
if err != nil {
return "", err
}
if len(fnames) == 0 {
return "", nil
}
names, err := NewSnapshotFileNames(fnames)
if err != nil {
return "", err
}
return names[len(names)-1].FileName, nil
}
func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
// Read snapshot data.
f, err := os.OpenFile(path, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer f.Close()
return DecodeSnapshot4(f)
}
func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
// Verify checksum
var checksum uint32
n, err := fmt.Fscanf(f, "%08x\n", &checksum)
if err != nil {
return nil, err
} else if n != 1 {
return nil, errors.New("miss heading checksum")
}
// Load remaining snapshot contents.
b, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(b)
if uint32(checksum) != byteChecksum {
return nil, errors.New("bad checksum")
}
// Decode snapshot.
snapshot := new(Snapshot4)
if err = json.Unmarshal(b, snapshot); err != nil {
return nil, err
}
return snapshot, nil
}
func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
s := make([]SnapshotFileName, 0)
for _, n := range names {
trimmed := strings.TrimSuffix(n, ".ss")
if trimmed == n {
return nil, fmt.Errorf("file %q does not have .ss extension", n)
}
parts := strings.SplitN(trimmed, "_", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("unrecognized file name format %q", n)
}
fn := SnapshotFileName{FileName: n}
var err error
fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
}
fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
}
s = append(s, fn)
}
sortable := SnapshotFileNames(s)
sort.Sort(&sortable)
return s, nil
}
type SnapshotFileNames []SnapshotFileName
type SnapshotFileName struct {
FileName string
Term uint64
Index uint64
}
func (n *SnapshotFileNames) Less(i, j int) bool {
iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
}
func (n *SnapshotFileNames) Swap(i, j int) {
(*n)[i], (*n)[j] = (*n)[j], (*n)[i]
}
func (n *SnapshotFileNames) Len() int {
return len([]SnapshotFileName(*n))
}

2
test
View File

@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
source ./build
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal"
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/etcdhttp etcdserver/etcdhttp/httptypes etcdserver/etcdserverpb integration migrate pkg/flags pkg/types pkg/transport pkg/wait proxy raft snap store wal"
FORMATTABLE="$TESTABLE_AND_FORMATTABLE *.go etcdctl/"
# user has not provided PKG override