etcd/migrate/snapshot.go
Ben Darnell 0d680d0e6b Merge remote-tracking branch 'coreos/master' into merge
* coreos/master:
  rafthttp: fix import
  raft: should not decrease match and next when handling out of order msgAppResp
  Fix migration to allow snapshots to have the right IDs
  add snapshotted integration test
  fix test import loop
  fix import loop, add set to types, and fix comments
  etcdserver: autodetect v0.4 WALs and upgrade them to v0.5 automatically
  wal: add a bench for write entry
  rafthttp: add streaming server and client
  dep: use vendored imports in codegangsta/cli
  dep: bump golang.org/x/net/context

Conflicts:
	etcdserver/server.go
	etcdserver/server_test.go
	migrate/snapshot.go
2014-11-21 15:40:11 -05:00

335 lines
7.5 KiB
Go

package migrate
import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"log"
"net/url"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
raftpb "github.com/coreos/etcd/raft/raftpb"
)
// Snapshot4 is the structure that DecodeSnapshot4 decodes a snapshot file's
// JSON payload into.
type Snapshot4 struct {
// State is the serialized store; Snapshot5 and GetNodesFromStore unmarshal
// it as JSON into an sstore.
State []byte `json:"state"`
// LastIndex and LastTerm become the Index and Term of the snapshot
// metadata produced by Snapshot5.
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// Peers is decoded from the "peers" array. It is not read by the code
// shown in this file.
Peers []struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
} `json:"peers"`
}
// sstore is the shape Snapshot4.State is unmarshalled into. Snapshot5 replaces
// Root with the result of mangleRoot and marshals the value back to JSON; the
// other two fields pass through unchanged.
type sstore struct {
// Root is the top of the node tree.
Root *node
CurrentIndex uint64
CurrentVersion int
}
// node is one entry of the store tree held in sstore. The migration code in
// this file rewrites Path and Children; the index, expiry and ACL fields are
// only copied from old nodes to the nodes that replace them.
type node struct {
Path string
CreatedIndex uint64
ModifiedIndex uint64
Parent *node `json:"-"` // excluded from JSON to avoid a circular reference when encoding.
ExpireTime time.Time
ACL string
Value string // set for key-value nodes
Children map[string]*node // set for directory nodes
}
// replacePathNames rewrites the Path of n and of every node below it: the
// first occurrence of s1 in each path is replaced with s2, and the result is
// normalized with path.Clean.
func replacePathNames(n *node, s1, s2 string) {
	rewritten := strings.Replace(n.Path, s1, s2, 1)
	n.Path = path.Clean(rewritten)

	for key := range n.Children {
		replacePathNames(n.Children[key], s1, s2)
	}
}
// pullNodesFromEtcd builds a map from member name to member ID out of the
// "machines" child of n.
//
// Every entry under "machines" holds a URL-encoded query string in its Value.
// The entry's key, together with the "raft" and "etcd" values of that query
// string, is handed to generateNodeMember; the returned member's Name becomes
// the map key and its ID (converted to uint64) the map value.
//
// Malformed input terminates the process through log.Fatal, matching how the
// rest of this file treats snapshots it cannot interpret. That covers a nil n,
// a missing "machines" child, a nil entry, and a value that is not a valid
// query string. Each case used to surface as a nil-pointer panic or as a
// message without the underlying error.
func pullNodesFromEtcd(n *node) map[string]uint64 {
	if n == nil {
		log.Fatal("Couldn't find _etcd node in snapshot")
	}
	machines := n.Children["machines"]
	if machines == nil {
		log.Fatal("Couldn't find machines directory in snapshot")
	}

	out := make(map[string]uint64, len(machines.Children))
	for name, c := range machines.Children {
		if c == nil {
			log.Fatalf("Couldn't read machine entry %q: node is empty", name)
		}
		q, err := url.ParseQuery(c.Value)
		if err != nil {
			// Report which entry was bad and why, not just that one was.
			log.Fatalf("Couldn't parse old query string value for %q: %v", name, err)
		}
		m := generateNodeMember(name, q.Get("raft"), q.Get("etcd"))
		out[m.Name] = uint64(m.ID)
	}
	return out
}
// fixEtcd rewrites, in place, the subtree rooted at n:
//
//   - n.Path becomes "/0";
//   - a "members" directory at "/0/members" is added, inheriting the indexes,
//     expiry and ACL of the existing "machines" directory;
//   - each entry under "machines" is turned into a member via
//     generateNodeMember (fed the entry key and the "raft" and "etcd" values
//     of the entry's query-string Value) and stored under "members", keyed by
//     the member ID's string form, with two children, "attributes" and
//     "raftAttributes", holding the JSON encoding of the member's attributes
//     and raftAttributes fields;
//   - the "machines" directory is removed.
//
// Any parse or marshal failure terminates the process through log.Fatal.
func fixEtcd(n *node) {
	const membersDir = "/0/members"

	n.Path = "/0"
	machines := n.Children["machines"]
	members := &node{
		Path:          membersDir,
		CreatedIndex:  machines.CreatedIndex,
		ModifiedIndex: machines.ModifiedIndex,
		ExpireTime:    machines.ExpireTime,
		ACL:           machines.ACL,
		Children:      make(map[string]*node),
	}
	n.Children["members"] = members

	for name, old := range machines.Children {
		query, err := url.ParseQuery(old.Value)
		if err != nil {
			log.Fatal("Couldn't parse old query string value")
		}
		m := generateNodeMember(name, query.Get("raft"), query.Get("etcd"))

		attrJSON, err := json.Marshal(m.attributes)
		if err != nil {
			log.Fatal("Couldn't marshal attributes")
		}
		raftJSON, err := json.Marshal(m.raftAttributes)
		if err != nil {
			log.Fatal("Couldn't marshal raft attributes")
		}

		id := m.ID.String()

		// inherit creates a node at p carrying the old entry's metadata.
		inherit := func(p string) *node {
			return &node{
				Path:          p,
				CreatedIndex:  old.CreatedIndex,
				ModifiedIndex: old.ModifiedIndex,
				ExpireTime:    old.ExpireTime,
				ACL:           old.ACL,
			}
		}

		attrs := inherit(path.Join(membersDir, id, "attributes"))
		attrs.Value = string(attrJSON)

		raftAttrs := inherit(path.Join(membersDir, id, "raftAttributes"))
		raftAttrs.Value = string(raftJSON)

		member := inherit(path.Join(membersDir, id))
		member.Children = map[string]*node{
			"attributes":     attrs,
			"raftAttributes": raftAttrs,
		}
		members.Children[id] = member
	}

	delete(n.Children, "machines")
}
// mangleRoot returns a new "/" node that wraps the old root n.
//
// The old root, minus its "_etcd" child, is re-pathed under "/1" and stored as
// child "1". The detached "_etcd" subtree is converted by fixEtcd and stored
// as child "0". The new root copies the indexes, expiry and ACL of n. Both n
// and its "_etcd" subtree are modified in place.
func mangleRoot(n *node) *node {
	wrapper := &node{
		Path:          "/",
		CreatedIndex:  n.CreatedIndex,
		ModifiedIndex: n.ModifiedIndex,
		ExpireTime:    n.ExpireTime,
		ACL:           n.ACL,
		Children:      make(map[string]*node, 2),
	}

	// Detach "_etcd" before re-pathing so the "/1" prefix is not applied to it.
	etcd := n.Children["_etcd"]
	delete(n.Children, "_etcd")

	replacePathNames(n, "/", "/1/")
	wrapper.Children["1"] = n

	fixEtcd(etcd)
	wrapper.Children["0"] = etcd

	return wrapper
}
// GetNodesFromStore decodes s.State and returns the member-name to member-ID
// map that pullNodesFromEtcd derives from the store root's "_etcd" child.
// If State cannot be unmarshalled, the process exits through log.Fatal.
func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
	var st sstore
	if err := json.Unmarshal(s.State, &st); err != nil {
		log.Fatal("Couldn't unmarshal snapshot")
	}
	return pullNodesFromEtcd(st.Root.Children["_etcd"])
}
// Snapshot5 converts s into a raftpb.Snapshot.
//
// The store in s.State is decoded, its root restructured by mangleRoot, and
// the result re-encoded as the snapshot Data. Index and Term are taken from
// LastIndex and LastTerm. ConfState.Nodes lists the member IDs reported by
// GetNodesFromStore; they are collected by ranging over a map, so their order
// is unspecified. Decode or encode failures exit through log.Fatal.
func (s *Snapshot4) Snapshot5() *raftpb.Snapshot {
	var st sstore
	if err := json.Unmarshal(s.State, &st); err != nil {
		log.Fatal("Couldn't unmarshal snapshot")
	}
	st.Root = mangleRoot(st.Root)

	data, err := json.Marshal(&st)
	if err != nil {
		log.Fatal("Couldn't re-marshal new snapshot")
	}

	members := s.GetNodesFromStore()
	ids := make([]uint64, 0, len(members))
	for _, id := range members {
		ids = append(ids, id)
	}

	return &raftpb.Snapshot{
		Data: data,
		Metadata: raftpb.SnapshotMetadata{
			Index:     s.LastIndex,
			Term:      s.LastTerm,
			ConfState: raftpb.ConfState{Nodes: ids},
		},
	}
}
// DecodeLatestSnapshot4FromDir decodes the file that FindLatestFile selects in
// snapdir. It returns (nil, nil) when FindLatestFile reports no file, and logs
// the path it is about to decode otherwise.
func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
	latest, err := FindLatestFile(snapdir)
	switch {
	case err != nil:
		return nil, err
	case latest == "":
		return nil, nil
	}

	full := path.Join(snapdir, latest)
	log.Printf("Decoding snapshot from %s", full)
	return DecodeSnapshot4FromFile(full)
}
// FindLatestFile returns the name of the "latest" file in dirpath: the entry
// that sorts last once every name has been parsed and ordered by
// NewSnapshotFileNames.
//
// It returns "" with a nil error when the directory does not exist or is
// empty. Any other failure to open or list the directory, and any name that
// NewSnapshotFileNames rejects, is returned as an error.
func FindLatestFile(dirpath string) (string, error) {
	dir, err := os.Open(dirpath)
	if os.IsNotExist(err) {
		// A missing directory simply means there is nothing to find.
		return "", nil
	}
	if err != nil {
		return "", err
	}
	defer dir.Close()

	entries, err := dir.Readdirnames(-1)
	if err != nil {
		return "", err
	}
	if len(entries) == 0 {
		return "", nil
	}

	sorted, err := NewSnapshotFileNames(entries)
	if err != nil {
		return "", err
	}
	last := sorted[len(sorted)-1]
	return last.FileName, nil
}
// DecodeSnapshot4FromFile opens the file at path read-only and decodes it with
// DecodeSnapshot4. The file is closed before returning.
func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	return DecodeSnapshot4(file)
}
// DecodeSnapshot4 reads a snapshot from f.
//
// The input must start with a checksum written as hexadecimal and followed by
// a newline (scanned with "%08x\n"). Everything after that line is the
// payload. The payload's CRC-32 (IEEE) must equal the leading checksum, after
// which the payload is decoded as JSON into a Snapshot4.
//
// An error is returned when the checksum line cannot be scanned, the rest of
// the file cannot be read, the checksums differ ("bad checksum"), or the JSON
// is invalid.
func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
	// Leading checksum line.
	var want uint32
	switch n, err := fmt.Fscanf(f, "%08x\n", &want); {
	case err != nil:
		return nil, err
	case n != 1:
		return nil, errors.New("miss heading checksum")
	}

	// Remaining bytes are the payload the checksum covers.
	payload, err := ioutil.ReadAll(f)
	if err != nil {
		return nil, err
	}
	if got := crc32.ChecksumIEEE(payload); got != want {
		return nil, errors.New("bad checksum")
	}

	var snap Snapshot4
	if err := json.Unmarshal(payload, &snap); err != nil {
		return nil, err
	}
	return &snap, nil
}
// NewSnapshotFileNames parses names of the form "<term>_<index>.ss", where
// both parts are base-10 unsigned integers, and returns them sorted by
// ascending term and then ascending index.
//
// The first name that lacks the ".ss" suffix, has no "_" separator, or has a
// non-numeric term or index aborts the whole call with an error.
func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
	parsed := make([]SnapshotFileName, 0, len(names))

	for _, name := range names {
		if !strings.HasSuffix(name, ".ss") {
			return nil, fmt.Errorf("file %q does not have .ss extension", name)
		}
		stem := strings.TrimSuffix(name, ".ss")

		fields := strings.SplitN(stem, "_", 2)
		if len(fields) != 2 {
			return nil, fmt.Errorf("unrecognized file name format %q", name)
		}

		term, err := strconv.ParseUint(fields[0], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("unable to parse term from filename %q: %v", name, err)
		}
		index, err := strconv.ParseUint(fields[1], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("unable to parse index from filename %q: %v", name, err)
		}

		parsed = append(parsed, SnapshotFileName{FileName: name, Term: term, Index: index})
	}

	// ordered shares its backing array with parsed, so sorting it sorts parsed.
	ordered := SnapshotFileNames(parsed)
	sort.Sort(&ordered)
	return parsed, nil
}
// SnapshotFileNames is a list of parsed snapshot file names. Its pointer type
// provides Len, Less and Swap, so *SnapshotFileNames can be passed to
// sort.Sort to order entries by Term and then by Index.
type SnapshotFileNames []SnapshotFileName

// SnapshotFileName is a snapshot file name together with the term and index
// parsed out of it.
type SnapshotFileName struct {
	FileName string
	Term     uint64
	Index    uint64
}

// Less reports whether element i orders before element j: a lower Term comes
// first, and for equal terms a lower Index comes first.
func (s *SnapshotFileNames) Less(i, j int) bool {
	a, b := (*s)[i], (*s)[j]
	if a.Term != b.Term {
		return a.Term < b.Term
	}
	return a.Index < b.Index
}

// Swap exchanges elements i and j.
func (s *SnapshotFileNames) Swap(i, j int) {
	list := *s
	list[i], list[j] = list[j], list[i]
}

// Len returns the number of elements.
func (s *SnapshotFileNames) Len() int {
	return len(*s)
}