*: remove migration related stuff from 2.2

This commit is contained in:
Xiang Li 2015-06-30 15:13:21 -07:00
parent eee1c8b8ee
commit 2b8abeb093
19 changed files with 2 additions and 2302 deletions

View File

@ -24,49 +24,6 @@ https://github.com/coreos/etcd/blob/master/Documentation/configuration.md.
The default data dir location has changed from {$hostname}.etcd to {name}.etcd.
## Data Directory Migration
The disk format within the data directory changed with etcd 2.0.
If you run etcd 2.0 on an etcd 0.4 data directory it will automatically migrate the data and start.
You will want to coordinate this upgrade by walking through each of your machines in the cluster, stopping etcd 0.4 and then starting etcd 2.0.
If you would rather manually do the migration, to test it out first in another environment, you can use the [migration tool doc][migrationtooldoc].
[migrationtooldoc]: https://github.com/coreos/etcd/blob/master/tools/etcd-migrate/README.md
## Snapshot Migration
If you are only interested in the data in etcd you can migrate a snapshot of your data from a v0.4.9+ cluster into a new etcd 2.0 cluster using a snapshot migration.
The advantage of this method is that you are directly dumping only the etcd data so you can run your old and new cluster side-by-side, snapshot the data, import it and then point your applications at this cluster.
The disadvantage is that the etcd indexes of your data will change which may confuse applications that use etcd.
To get started get the newest data snapshot from the 0.4.9+ cluster:
```
curl http://cluster.example.com:4001/v2/migration/snapshot > backup.snap
```
Now, import the snapshot into your new cluster:
```
etcdctl -C new_cluster.example.com import --snap backup.snap
```
If you have a large amount of data, you can specify more concurrent works to copy data in parallel by using `-c` flag.
If you have hidden keys to copy, you can use `--hidden` flag to specify.
And the data will quickly copy into the new cluster:
```
entering dir: /
entering dir: /foo
entering dir: /foo/bar
copying key: /foo/bar/1 1
entering dir: /
entering dir: /foo2
entering dir: /foo2/bar2
copying key: /foo2/bar2/2 2
```
## Key-Value API
### Read consistency flag

View File

@ -20,7 +20,6 @@ import (
"path"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/migrate"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
@ -109,14 +108,6 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID,
// It must ensure that, after upgrading, the most recent version is present.
func upgradeDataDir(baseDataDir string, name string, ver version.DataDirVersion) error {
switch ver {
case version.DataDir0_4:
plog.Infof("converting v0.4 log to v2.0")
err := migrate.Migrate4To2(baseDataDir, name)
if err != nil {
plog.Fatalf("failed to migrate data-dir (%v)", err)
return err
}
fallthrough
case version.DataDir2_0:
err := makeMemberDir(baseDataDir)
if err != nil {

View File

@ -1,112 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"errors"
"flag"
"fmt"
"log"
"path"
etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/migrate"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
func walDir5(dataDir string) string {
return path.Join(dataDir, "wal")
}
func logFile4(dataDir string) string {
return path.Join(dataDir, "log")
}
func main() {
version := flag.Int("version", 5, "4 or 5")
from := flag.String("data-dir", "", "")
flag.Parse()
if *from == "" {
log.Fatal("Must provide -data-dir flag")
}
var ents []raftpb.Entry
var err error
switch *version {
case 4:
ents, err = dump4(*from)
case 5:
ents, err = dump5(*from)
default:
err = errors.New("value of -version flag must be 4 or 5")
}
if err != nil {
log.Fatalf("Failed decoding log: %v", err)
}
for _, e := range ents {
msg := fmt.Sprintf("%2d %5d: ", e.Term, e.Index)
switch e.Type {
case raftpb.EntryNormal:
msg = fmt.Sprintf("%s norm", msg)
var r etcdserverpb.Request
if err := r.Unmarshal(e.Data); err != nil {
msg = fmt.Sprintf("%s ???", msg)
} else {
msg = fmt.Sprintf("%s %s %s %s", msg, r.Method, r.Path, r.Val)
}
case raftpb.EntryConfChange:
msg = fmt.Sprintf("%s conf", msg)
var r raftpb.ConfChange
if err := r.Unmarshal(e.Data); err != nil {
msg = fmt.Sprintf("%s ???", msg)
} else {
msg = fmt.Sprintf("%s %s %s %s", msg, r.Type, types.ID(r.NodeID), r.Context)
}
}
fmt.Println(msg)
}
}
func dump4(dataDir string) ([]raftpb.Entry, error) {
lf4 := logFile4(dataDir)
ents, err := migrate.DecodeLog4FromFile(lf4)
if err != nil {
return nil, err
}
return migrate.Entries4To2(ents)
}
func dump5(dataDir string) ([]raftpb.Entry, error) {
wd5 := walDir5(dataDir)
if !wal.Exist(wd5) {
return nil, fmt.Errorf("No wal exists at %s", wd5)
}
w, err := wal.Open(wd5, walpb.Snapshot{})
if err != nil {
return nil, err
}
defer w.Close()
_, _, ents, err := w.ReadAll()
return ents, err
}

View File

@ -1,53 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"encoding/json"
"io/ioutil"
"github.com/coreos/etcd/raft/raftpb"
)
type Config4 struct {
CommitIndex uint64 `json:"commitIndex"`
Peers []struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
} `json:"peers"`
}
func (c *Config4) HardState2() raftpb.HardState {
return raftpb.HardState{
Commit: c.CommitIndex,
Term: 0,
Vote: 0,
}
}
func DecodeConfig4FromFile(cfgPath string) (*Config4, error) {
b, err := ioutil.ReadFile(cfgPath)
if err != nil {
return nil, err
}
conf := &Config4{}
if err = json.Unmarshal(b, conf); err != nil {
return nil, err
}
return conf, nil
}

View File

@ -1,195 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"fmt"
"log"
"os"
"path"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
)
// We need an offset in leader election terms, because term 0 is special in 2.0.
const termOffset4to2 = 1
func snapDir4(dataDir string) string {
return path.Join(dataDir, "snapshot")
}
func logFile4(dataDir string) string {
return path.Join(dataDir, "log")
}
func cfgFile4(dataDir string) string {
return path.Join(dataDir, "conf")
}
func snapDir2(dataDir string) string {
return path.Join(dataDir, "snap")
}
func walDir2(dataDir string) string {
return path.Join(dataDir, "wal")
}
func Migrate4To2(dataDir string, name string) error {
// prep new directories
sd2 := snapDir2(dataDir)
if err := os.MkdirAll(sd2, 0700); err != nil {
return fmt.Errorf("failed creating snapshot directory %s: %v", sd2, err)
}
// read v0.4 data
snap4, err := DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
if err != nil {
return err
}
cfg4, err := DecodeConfig4FromFile(cfgFile4(dataDir))
if err != nil {
return err
}
ents4, err := DecodeLog4FromFile(logFile4(dataDir))
if err != nil {
return err
}
nodeIDs := ents4.NodeIDs()
nodeID := GuessNodeID(nodeIDs, snap4, cfg4, name)
if nodeID == 0 {
return fmt.Errorf("Couldn't figure out the node ID from the log or flags, cannot convert")
}
metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: nodeID, ClusterID: 0x04add5})
wd2 := walDir2(dataDir)
w, err := wal.Create(wd2, metadata)
if err != nil {
return fmt.Errorf("failed initializing wal at %s: %v", wd2, err)
}
defer w.Close()
// transform v0.4 data
var snap2 *raftpb.Snapshot
if snap4 == nil {
log.Printf("No snapshot found")
} else {
log.Printf("Found snapshot: lastIndex=%d", snap4.LastIndex)
snap2 = snap4.Snapshot2()
}
st2 := cfg4.HardState2()
// If we've got the most recent snapshot, we can use it's committed index. Still likely less than the current actual index, but worth it for the replay.
if snap2 != nil && st2.Commit < snap2.Metadata.Index {
st2.Commit = snap2.Metadata.Index
}
ents2, err := Entries4To2(ents4)
if err != nil {
return err
}
ents2Len := len(ents2)
log.Printf("Found %d log entries: firstIndex=%d lastIndex=%d", ents2Len, ents2[0].Index, ents2[ents2Len-1].Index)
// set the state term to the biggest term we have ever seen,
// so term of future entries will not be the same with term of old ones.
st2.Term = ents2[ents2Len-1].Term
// explicitly prepend an empty entry as the WAL code expects it
ents2 = append(make([]raftpb.Entry, 1), ents2...)
if err = w.Save(st2, ents2); err != nil {
return err
}
log.Printf("Log migration successful")
// migrate snapshot (if necessary) and logs
var walsnap walpb.Snapshot
if snap2 != nil {
walsnap.Index, walsnap.Term = snap2.Metadata.Index, snap2.Metadata.Term
ss := snap.New(sd2)
if err := ss.SaveSnap(*snap2); err != nil {
return err
}
log.Printf("Snapshot migration successful")
}
if err = w.SaveSnapshot(walsnap); err != nil {
return err
}
return nil
}
func GuessNodeID(nodes map[string]uint64, snap4 *Snapshot4, cfg *Config4, name string) uint64 {
var snapNodes map[string]uint64
if snap4 != nil {
snapNodes = snap4.GetNodesFromStore()
}
// First, use the flag, if set.
if name != "" {
log.Printf("Using suggested name %s", name)
if val, ok := nodes[name]; ok {
log.Printf("Assigning %s the ID %s", name, types.ID(val))
return val
}
if snapNodes != nil {
if val, ok := snapNodes[name]; ok {
log.Printf("Assigning %s the ID %s", name, types.ID(val))
return val
}
}
log.Printf("Name not found, autodetecting...")
}
// Next, look at the snapshot peers, if that exists.
if snap4 != nil {
//snapNodes := make(map[string]uint64)
//for _, p := range snap4.Peers {
//m := generateNodeMember(p.Name, p.ConnectionString, "")
//snapNodes[p.Name] = uint64(m.ID)
//}
for _, p := range cfg.Peers {
delete(snapNodes, p.Name)
}
if len(snapNodes) == 1 {
for nodename, id := range nodes {
log.Printf("Autodetected from snapshot: name %s", nodename)
return id
}
}
}
// Then, try and deduce from the log.
for _, p := range cfg.Peers {
delete(nodes, p.Name)
}
if len(nodes) == 1 {
for nodename, id := range nodes {
log.Printf("Autodetected name %s", nodename)
return id
}
}
return 0
}

View File

@ -1,670 +0,0 @@
// Code generated by protoc-gen-gogo.
// source: log_entry.proto
// DO NOT EDIT!
/*
Package protobuf is a generated protocol buffer package.
It is generated from these files:
log_entry.proto
It has these top-level messages:
LogEntry
*/
package protobuf
import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
import math "math"
// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb"
import io "io"
import fmt "fmt"
import github_com_gogo_protobuf_proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
import strings "strings"
import reflect "reflect"
import sort "sort"
import strconv "strconv"
import bytes "bytes"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type LogEntry struct {
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *LogEntry) Reset() { *m = LogEntry{} }
func (*LogEntry) ProtoMessage() {}
func (m *LogEntry) GetIndex() uint64 {
if m != nil && m.Index != nil {
return *m.Index
}
return 0
}
func (m *LogEntry) GetTerm() uint64 {
if m != nil && m.Term != nil {
return *m.Term
}
return 0
}
func (m *LogEntry) GetCommandName() string {
if m != nil && m.CommandName != nil {
return *m.CommandName
}
return ""
}
func (m *LogEntry) GetCommand() []byte {
if m != nil {
return m.Command
}
return nil
}
func init() {
}
func (m *LogEntry) Unmarshal(data []byte) error {
var hasFields [1]uint64
l := len(data)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
}
var v uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Index = &v
hasFields[0] |= uint64(0x00000001)
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
}
var v uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
v |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Term = &v
hasFields[0] |= uint64(0x00000002)
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field CommandName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := iNdEx + int(stringLen)
if postIndex > l {
return io.ErrUnexpectedEOF
}
s := string(data[iNdEx:postIndex])
m.CommandName = &s
iNdEx = postIndex
hasFields[0] |= uint64(0x00000004)
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Command", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
byteLen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
postIndex := iNdEx + byteLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Command = append([]byte{}, data[iNdEx:postIndex]...)
iNdEx = postIndex
default:
var sizeOfWire int
for {
sizeOfWire++
wire >>= 7
if wire == 0 {
break
}
}
iNdEx -= sizeOfWire
skippy, err := skipLogEntry(data[iNdEx:])
if err != nil {
return err
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if hasFields[0]&uint64(0x00000001) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Index")
}
if hasFields[0]&uint64(0x00000002) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("Term")
}
if hasFields[0]&uint64(0x00000004) == 0 {
return github_com_gogo_protobuf_proto.NewRequiredNotSetError("CommandName")
}
return nil
}
func skipLogEntry(data []byte) (n int, err error) {
l := len(data)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if data[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipLogEntry(data[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
func (this *LogEntry) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&LogEntry{`,
`Index:` + valueToStringLogEntry(this.Index) + `,`,
`Term:` + valueToStringLogEntry(this.Term) + `,`,
`CommandName:` + valueToStringLogEntry(this.CommandName) + `,`,
`Command:` + valueToStringLogEntry(this.Command) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func valueToStringLogEntry(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *LogEntry) Size() (n int) {
var l int
_ = l
if m.Index != nil {
n += 1 + sovLogEntry(uint64(*m.Index))
}
if m.Term != nil {
n += 1 + sovLogEntry(uint64(*m.Term))
}
if m.CommandName != nil {
l = len(*m.CommandName)
n += 1 + l + sovLogEntry(uint64(l))
}
if m.Command != nil {
l = len(m.Command)
n += 1 + l + sovLogEntry(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovLogEntry(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozLogEntry(x uint64) (n int) {
return sovLogEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func NewPopulatedLogEntry(r randyLogEntry, easy bool) *LogEntry {
this := &LogEntry{}
v1 := uint64(uint64(r.Uint32()))
this.Index = &v1
v2 := uint64(uint64(r.Uint32()))
this.Term = &v2
v3 := randStringLogEntry(r)
this.CommandName = &v3
if r.Intn(10) != 0 {
v4 := r.Intn(100)
this.Command = make([]byte, v4)
for i := 0; i < v4; i++ {
this.Command[i] = byte(r.Intn(256))
}
}
if !easy && r.Intn(10) != 0 {
this.XXX_unrecognized = randUnrecognizedLogEntry(r, 5)
}
return this
}
type randyLogEntry interface {
Float32() float32
Float64() float64
Int63() int64
Int31() int32
Uint32() uint32
Intn(n int) int
}
func randUTF8RuneLogEntry(r randyLogEntry) rune {
ru := r.Intn(62)
if ru < 10 {
return rune(ru + 48)
} else if ru < 36 {
return rune(ru + 55)
}
return rune(ru + 61)
}
func randStringLogEntry(r randyLogEntry) string {
v5 := r.Intn(100)
tmps := make([]rune, v5)
for i := 0; i < v5; i++ {
tmps[i] = randUTF8RuneLogEntry(r)
}
return string(tmps)
}
func randUnrecognizedLogEntry(r randyLogEntry, maxFieldNumber int) (data []byte) {
l := r.Intn(5)
for i := 0; i < l; i++ {
wire := r.Intn(4)
if wire == 3 {
wire = 5
}
fieldNumber := maxFieldNumber + r.Intn(100)
data = randFieldLogEntry(data, r, fieldNumber, wire)
}
return data
}
func randFieldLogEntry(data []byte, r randyLogEntry, fieldNumber int, wire int) []byte {
key := uint32(fieldNumber)<<3 | uint32(wire)
switch wire {
case 0:
data = encodeVarintPopulateLogEntry(data, uint64(key))
v6 := r.Int63()
if r.Intn(2) == 0 {
v6 *= -1
}
data = encodeVarintPopulateLogEntry(data, uint64(v6))
case 1:
data = encodeVarintPopulateLogEntry(data, uint64(key))
data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
case 2:
data = encodeVarintPopulateLogEntry(data, uint64(key))
ll := r.Intn(100)
data = encodeVarintPopulateLogEntry(data, uint64(ll))
for j := 0; j < ll; j++ {
data = append(data, byte(r.Intn(256)))
}
default:
data = encodeVarintPopulateLogEntry(data, uint64(key))
data = append(data, byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)), byte(r.Intn(256)))
}
return data
}
func encodeVarintPopulateLogEntry(data []byte, v uint64) []byte {
for v >= 1<<7 {
data = append(data, uint8(uint64(v)&0x7f|0x80))
v >>= 7
}
data = append(data, uint8(v))
return data
}
func (m *LogEntry) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *LogEntry) MarshalTo(data []byte) (n int, err error) {
var i int
_ = i
var l int
_ = l
if m.Index == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("Index")
} else {
data[i] = 0x8
i++
i = encodeVarintLogEntry(data, i, uint64(*m.Index))
}
if m.Term == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("Term")
} else {
data[i] = 0x10
i++
i = encodeVarintLogEntry(data, i, uint64(*m.Term))
}
if m.CommandName == nil {
return 0, github_com_gogo_protobuf_proto.NewRequiredNotSetError("CommandName")
} else {
data[i] = 0x1a
i++
i = encodeVarintLogEntry(data, i, uint64(len(*m.CommandName)))
i += copy(data[i:], *m.CommandName)
}
if m.Command != nil {
data[i] = 0x22
i++
i = encodeVarintLogEntry(data, i, uint64(len(m.Command)))
i += copy(data[i:], m.Command)
}
if m.XXX_unrecognized != nil {
i += copy(data[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeFixed64LogEntry(data []byte, offset int, v uint64) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
data[offset+4] = uint8(v >> 32)
data[offset+5] = uint8(v >> 40)
data[offset+6] = uint8(v >> 48)
data[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32LogEntry(data []byte, offset int, v uint32) int {
data[offset] = uint8(v)
data[offset+1] = uint8(v >> 8)
data[offset+2] = uint8(v >> 16)
data[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintLogEntry(data []byte, offset int, v uint64) int {
for v >= 1<<7 {
data[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
data[offset] = uint8(v)
return offset + 1
}
func (this *LogEntry) GoString() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&protobuf.LogEntry{` +
`Index:` + valueToGoStringLogEntry(this.Index, "uint64"),
`Term:` + valueToGoStringLogEntry(this.Term, "uint64"),
`CommandName:` + valueToGoStringLogEntry(this.CommandName, "string"),
`Command:` + valueToGoStringLogEntry(this.Command, "byte"),
`XXX_unrecognized:` + fmt.Sprintf("%#v", this.XXX_unrecognized) + `}`}, ", ")
return s
}
func valueToGoStringLogEntry(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
}
func extensionToGoStringLogEntry(e map[int32]github_com_gogo_protobuf_proto.Extension) string {
if e == nil {
return "nil"
}
s := "map[int32]proto.Extension{"
keys := make([]int, 0, len(e))
for k := range e {
keys = append(keys, int(k))
}
sort.Ints(keys)
ss := []string{}
for _, k := range keys {
ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString())
}
s += strings.Join(ss, ",") + "}"
return s
}
func (this *LogEntry) VerboseEqual(that interface{}) error {
if that == nil {
if this == nil {
return nil
}
return fmt.Errorf("that == nil && this != nil")
}
that1, ok := that.(*LogEntry)
if !ok {
return fmt.Errorf("that is not of type *LogEntry")
}
if that1 == nil {
if this == nil {
return nil
}
return fmt.Errorf("that is type *LogEntry but is nil && this != nil")
} else if this == nil {
return fmt.Errorf("that is type *LogEntrybut is not nil && this == nil")
}
if this.Index != nil && that1.Index != nil {
if *this.Index != *that1.Index {
return fmt.Errorf("Index this(%v) Not Equal that(%v)", *this.Index, *that1.Index)
}
} else if this.Index != nil {
return fmt.Errorf("this.Index == nil && that.Index != nil")
} else if that1.Index != nil {
return fmt.Errorf("Index this(%v) Not Equal that(%v)", this.Index, that1.Index)
}
if this.Term != nil && that1.Term != nil {
if *this.Term != *that1.Term {
return fmt.Errorf("Term this(%v) Not Equal that(%v)", *this.Term, *that1.Term)
}
} else if this.Term != nil {
return fmt.Errorf("this.Term == nil && that.Term != nil")
} else if that1.Term != nil {
return fmt.Errorf("Term this(%v) Not Equal that(%v)", this.Term, that1.Term)
}
if this.CommandName != nil && that1.CommandName != nil {
if *this.CommandName != *that1.CommandName {
return fmt.Errorf("CommandName this(%v) Not Equal that(%v)", *this.CommandName, *that1.CommandName)
}
} else if this.CommandName != nil {
return fmt.Errorf("this.CommandName == nil && that.CommandName != nil")
} else if that1.CommandName != nil {
return fmt.Errorf("CommandName this(%v) Not Equal that(%v)", this.CommandName, that1.CommandName)
}
if !bytes.Equal(this.Command, that1.Command) {
return fmt.Errorf("Command this(%v) Not Equal that(%v)", this.Command, that1.Command)
}
if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
return fmt.Errorf("XXX_unrecognized this(%v) Not Equal that(%v)", this.XXX_unrecognized, that1.XXX_unrecognized)
}
return nil
}
func (this *LogEntry) Equal(that interface{}) bool {
if that == nil {
if this == nil {
return true
}
return false
}
that1, ok := that.(*LogEntry)
if !ok {
return false
}
if that1 == nil {
if this == nil {
return true
}
return false
} else if this == nil {
return false
}
if this.Index != nil && that1.Index != nil {
if *this.Index != *that1.Index {
return false
}
} else if this.Index != nil {
return false
} else if that1.Index != nil {
return false
}
if this.Term != nil && that1.Term != nil {
if *this.Term != *that1.Term {
return false
}
} else if this.Term != nil {
return false
} else if that1.Term != nil {
return false
}
if this.CommandName != nil && that1.CommandName != nil {
if *this.CommandName != *that1.CommandName {
return false
}
} else if this.CommandName != nil {
return false
} else if that1.CommandName != nil {
return false
}
if !bytes.Equal(this.Command, that1.Command) {
return false
}
if !bytes.Equal(this.XXX_unrecognized, that1.XXX_unrecognized) {
return false
}
return true
}

View File

@ -1,22 +0,0 @@
package protobuf;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.gostring_all) = true;
option (gogoproto.equal_all) = true;
option (gogoproto.verbose_equal_all) = true;
option (gogoproto.goproto_stringer_all) = false;
option (gogoproto.stringer_all) = true;
option (gogoproto.populate_all) = true;
option (gogoproto.testgen_all) = true;
option (gogoproto.benchgen_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
message LogEntry {
required uint64 Index=1;
required uint64 Term=2;
required string CommandName=3;
optional bytes Command=4; // for nop-command
}

Binary file not shown.

View File

@ -1,519 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path"
"time"
etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
"github.com/coreos/etcd/pkg/types"
raftpb "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
const etcdDefaultClusterName = "etcd-cluster"
func UnixTimeOrPermanent(expireTime time.Time) int64 {
expire := expireTime.Unix()
if expireTime == store.Permanent {
expire = 0
}
return expire
}
type Log4 []*etcd4pb.LogEntry
func (l Log4) NodeIDs() map[string]uint64 {
out := make(map[string]uint64)
for _, e := range l {
if e.GetCommandName() == "etcd:join" {
cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
if err != nil {
log.Println("error converting an etcd:join to v2.0 format. Likely corrupt!")
return nil
}
join := cmd4.(*JoinCommand)
m := generateNodeMember(join.Name, join.RaftURL, "")
out[join.Name] = uint64(m.ID)
}
if e.GetCommandName() == "etcd:remove" {
cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
if err != nil {
return nil
}
name := cmd4.(*RemoveCommand).Name
delete(out, name)
}
}
return out
}
func StorePath(key string) string {
return path.Join("/1", key)
}
func DecodeLog4FromFile(logpath string) (Log4, error) {
file, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
defer file.Close()
return DecodeLog4(file)
}
func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
var readBytes int64
entries := make([]*etcd4pb.LogEntry, 0)
for {
entry, n, err := DecodeNextEntry4(file)
if err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("failed decoding next log entry: %v", err)
}
entries = append(entries, entry)
readBytes += int64(n)
}
return entries, nil
}
// DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
// number of bytes read and any error that occurs.
func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
var length int
_, err := fmt.Fscanf(r, "%8x\n", &length)
if err != nil {
return nil, -1, err
}
data := make([]byte, length)
if _, err = io.ReadFull(r, data); err != nil {
return nil, -1, err
}
ent4 := new(etcd4pb.LogEntry)
if err = ent4.Unmarshal(data); err != nil {
return nil, -1, err
}
// add width of scanner token to length
length = length + 8 + 1
return ent4, length, nil
}
func hashName(name string) uint64 {
var sum uint64
for _, ch := range name {
sum = 131*sum + uint64(ch)
}
return sum
}
type Command4 interface {
Type2() raftpb.EntryType
Data2() ([]byte, error)
}
func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
var cmd Command4
switch name {
case "etcd:remove":
cmd = &RemoveCommand{}
case "etcd:join":
cmd = &JoinCommand{}
case "etcd:setClusterConfig":
cmd = &NOPCommand{}
case "etcd:compareAndDelete":
cmd = &CompareAndDeleteCommand{}
case "etcd:compareAndSwap":
cmd = &CompareAndSwapCommand{}
case "etcd:create":
cmd = &CreateCommand{}
case "etcd:delete":
cmd = &DeleteCommand{}
case "etcd:set":
cmd = &SetCommand{}
case "etcd:sync":
cmd = &SyncCommand{}
case "etcd:update":
cmd = &UpdateCommand{}
case "raft:join":
// These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
fallthrough
case "raft:leave":
return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
case "raft:nop":
cmd = &NOPCommand{}
default:
return nil, fmt.Errorf("unregistered command type %s", name)
}
// If data for the command was passed in the decode it.
if data != nil {
if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
}
}
switch name {
case "etcd:join":
c := cmd.(*JoinCommand)
m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
c.memb = *m
if raftMap != nil {
raftMap[c.Name] = uint64(m.ID)
}
case "etcd:remove":
c := cmd.(*RemoveCommand)
if raftMap != nil {
m, ok := raftMap[c.Name]
if !ok {
return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
}
c.id = m
delete(raftMap, c.Name)
}
}
return cmd, nil
}
type RemoveCommand struct {
Name string `json:"name"`
id uint64
}
func (c *RemoveCommand) Type2() raftpb.EntryType {
return raftpb.EntryConfChange
}
func (c *RemoveCommand) Data2() ([]byte, error) {
req2 := raftpb.ConfChange{
ID: 0,
Type: raftpb.ConfChangeRemoveNode,
NodeID: c.id,
}
return req2.Marshal()
}
type JoinCommand struct {
Name string `json:"name"`
RaftURL string `json:"raftURL"`
EtcdURL string `json:"etcdURL"`
memb member
}
func (c *JoinCommand) Type2() raftpb.EntryType {
return raftpb.EntryConfChange
}
func (c *JoinCommand) Data2() ([]byte, error) {
b, err := json.Marshal(c.memb)
if err != nil {
return nil, err
}
req2 := &raftpb.ConfChange{
ID: 0,
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(c.memb.ID),
Context: b,
}
return req2.Marshal()
}
type SetClusterConfigCommand struct {
Config *struct {
ActiveSize int `json:"activeSize"`
RemoveDelay float64 `json:"removeDelay"`
SyncInterval float64 `json:"syncInterval"`
} `json:"config"`
}
func (c *SetClusterConfigCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SetClusterConfigCommand) Data2() ([]byte, error) {
b, err := json.Marshal(c.Config)
if err != nil {
return nil, err
}
req2 := &etcdserverpb.Request{
Method: "PUT",
Path: "/v2/admin/config",
Dir: false,
Val: string(b),
}
return req2.Marshal()
}
type CompareAndDeleteCommand struct {
Key string `json:"key"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
}
func (c *CompareAndDeleteCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CompareAndDeleteCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Method: "DELETE",
Path: StorePath(c.Key),
PrevValue: c.PrevValue,
PrevIndex: c.PrevIndex,
}
return req2.Marshal()
}
type CompareAndSwapCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
PrevValue string `json:"prevValue"`
PrevIndex uint64 `json:"prevIndex"`
}
func (c *CompareAndSwapCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CompareAndSwapCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Val: c.Value,
PrevValue: c.PrevValue,
PrevIndex: c.PrevIndex,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req2.Marshal()
}
type CreateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Unique bool `json:"unique"`
Dir bool `json:"dir"`
}
func (c *CreateCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *CreateCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Path: StorePath(c.Key),
Dir: c.Dir,
Val: c.Value,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
if c.Unique {
req2.Method = "POST"
} else {
var prevExist = true
req2.Method = "PUT"
req2.PrevExist = &prevExist
}
return req2.Marshal()
}
type DeleteCommand struct {
Key string `json:"key"`
Recursive bool `json:"recursive"`
Dir bool `json:"dir"`
}
func (c *DeleteCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *DeleteCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Method: "DELETE",
Path: StorePath(c.Key),
Dir: c.Dir,
Recursive: c.Recursive,
}
return req2.Marshal()
}
type SetCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
Dir bool `json:"dir"`
}
func (c *SetCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SetCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Dir: c.Dir,
Val: c.Value,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req2.Marshal()
}
type UpdateCommand struct {
Key string `json:"key"`
Value string `json:"value"`
ExpireTime time.Time `json:"expireTime"`
}
func (c *UpdateCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *UpdateCommand) Data2() ([]byte, error) {
exist := true
req2 := &etcdserverpb.Request{
Method: "PUT",
Path: StorePath(c.Key),
Val: c.Value,
PrevExist: &exist,
Expiration: UnixTimeOrPermanent(c.ExpireTime),
}
return req2.Marshal()
}
type SyncCommand struct {
Time time.Time `json:"time"`
}
func (c *SyncCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *SyncCommand) Data2() ([]byte, error) {
req2 := &etcdserverpb.Request{
Method: "SYNC",
Time: c.Time.UnixNano(),
}
return req2.Marshal()
}
type DefaultJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
type DefaultLeaveCommand struct {
Name string `json:"name"`
id uint64
}
type NOPCommand struct{}
//TODO(bcwaldon): Why is CommandName here?
func (c NOPCommand) CommandName() string {
return "raft:nop"
}
func (c *NOPCommand) Type2() raftpb.EntryType {
return raftpb.EntryNormal
}
func (c *NOPCommand) Data2() ([]byte, error) {
return nil, nil
}
func Entries4To2(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
ents4Len := len(ents4)
if ents4Len == 0 {
return nil, nil
}
startIndex := ents4[0].GetIndex()
for i, e := range ents4[1:] {
eIndex := e.GetIndex()
// ensure indexes are monotonically increasing
wantIndex := startIndex + uint64(i+1)
if wantIndex != eIndex {
return nil, fmt.Errorf("skipped log index %d", wantIndex)
}
}
raftMap := make(map[string]uint64)
ents2 := make([]raftpb.Entry, 0)
for i, e := range ents4 {
ent, err := toEntry2(e, raftMap)
if err != nil {
log.Fatalf("Error converting entry %d, %s", i, err)
} else {
ents2 = append(ents2, *ent)
}
}
return ents2, nil
}
func toEntry2(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
if err != nil {
return nil, err
}
data, err := cmd4.Data2()
if err != nil {
return nil, err
}
ent2 := raftpb.Entry{
Term: ent4.GetTerm() + termOffset4to2,
Index: ent4.GetIndex(),
Type: cmd4.Type2(),
Data: data,
}
return &ent2, nil
}
func generateNodeMember(name, rafturl, etcdurl string) *member {
pURLs, err := types.NewURLs([]string{rafturl})
if err != nil {
log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
}
m := NewMember(name, pURLs, etcdDefaultClusterName)
m.ClientURLs = []string{etcdurl}
return m
}

View File

@ -1,69 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"fmt"
"net/url"
"reflect"
"testing"
"time"
)
func TestNewCommand(t *testing.T) {
entries, err := DecodeLog4FromFile("fixtures/cmdlog")
if err != nil {
t.Errorf("read log file error: %v", err)
}
zeroTime, err := time.Parse(time.RFC3339, "1969-12-31T16:00:00-08:00")
if err != nil {
t.Errorf("couldn't create time: %v", err)
}
m := NewMember("alice", []url.URL{{Scheme: "http", Host: "127.0.0.1:7001"}}, etcdDefaultClusterName)
m.ClientURLs = []string{"http://127.0.0.1:4001"}
tests := []interface{}{
&JoinCommand{"alice", "http://127.0.0.1:7001", "http://127.0.0.1:4001", *m},
&NOPCommand{},
&NOPCommand{},
&RemoveCommand{"alice", 0xe52ada62956ff923},
&CompareAndDeleteCommand{"foo", "baz", 9},
&CompareAndSwapCommand{"foo", "bar", zeroTime, "baz", 9},
&CreateCommand{"foo", "bar", zeroTime, true, true},
&DeleteCommand{"foo", true, true},
&SetCommand{"foo", "bar", zeroTime, true},
&SyncCommand{zeroTime},
&UpdateCommand{"foo", "bar", zeroTime},
}
raftMap := make(map[string]uint64)
for i, test := range tests {
e := entries[i]
cmd, err := NewCommand4(e.GetCommandName(), e.GetCommand(), raftMap)
if err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
if !reflect.DeepEqual(cmd, test) {
if i == 5 {
fmt.Println(cmd.(*CompareAndSwapCommand).ExpireTime.Location())
}
t.Errorf("#%d: cmd = %+v, want %+v", i, cmd, test)
}
}
}

View File

@ -1,57 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"crypto/sha1"
"encoding/binary"
"sort"
"github.com/coreos/etcd/pkg/types"
)
type raftAttributes struct {
PeerURLs []string `json:"peerURLs"`
}
type attributes struct {
Name string `json:"name,omitempty"`
ClientURLs []string `json:"clientURLs,omitempty"`
}
type member struct {
ID types.ID `json:"id"`
raftAttributes
attributes
}
func NewMember(name string, peerURLs types.URLs, clusterName string) *member {
m := &member{
raftAttributes: raftAttributes{PeerURLs: peerURLs.StringSlice()},
attributes: attributes{Name: name},
}
var b []byte
sort.Strings(m.PeerURLs)
for _, p := range m.PeerURLs {
b = append(b, []byte(p)...)
}
b = append(b, []byte(clusterName)...)
hash := sha1.Sum(b)
m.ID = types.ID(binary.BigEndian.Uint64(hash[:8]))
return m
}

View File

@ -1,384 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io/ioutil"
"log"
"net/url"
"os"
"path"
"sort"
"strconv"
"strings"
"time"
raftpb "github.com/coreos/etcd/raft/raftpb"
)
type Snapshot4 struct {
State []byte `json:"state"`
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
Peers []struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
} `json:"peers"`
}
type Store4 struct {
Root *node
CurrentIndex uint64
CurrentVersion int
}
type node struct {
Path string
CreatedIndex uint64
ModifiedIndex uint64
Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
ExpireTime time.Time
ACL string
Value string // for key-value pair
Children map[string]*node // for directory
}
func deepCopyNode(n *node, parent *node) *node {
out := &node{
Path: n.Path,
CreatedIndex: n.CreatedIndex,
ModifiedIndex: n.ModifiedIndex,
Parent: parent,
ExpireTime: n.ExpireTime,
ACL: n.ACL,
Value: n.Value,
Children: make(map[string]*node),
}
for k, v := range n.Children {
out.Children[k] = deepCopyNode(v, out)
}
return out
}
func replacePathNames(n *node, s1, s2 string) {
n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
for _, c := range n.Children {
replacePathNames(c, s1, s2)
}
}
func pullNodesFromEtcd(n *node) map[string]uint64 {
out := make(map[string]uint64)
machines := n.Children["machines"]
for name, c := range machines.Children {
q, err := url.ParseQuery(c.Value)
if err != nil {
log.Fatal("Couldn't parse old query string value")
}
etcdurl := q.Get("etcd")
rafturl := q.Get("raft")
m := generateNodeMember(name, rafturl, etcdurl)
out[m.Name] = uint64(m.ID)
}
return out
}
func fixEtcd(etcdref *node) *node {
n := &node{
Path: "/0",
CreatedIndex: etcdref.CreatedIndex,
ModifiedIndex: etcdref.ModifiedIndex,
ExpireTime: etcdref.ExpireTime,
ACL: etcdref.ACL,
Children: make(map[string]*node),
}
var machines *node
if machineOrig, ok := etcdref.Children["machines"]; ok {
machines = deepCopyNode(machineOrig, n)
}
if machines == nil {
return n
}
n.Children["members"] = &node{
Path: "/0/members",
CreatedIndex: machines.CreatedIndex,
ModifiedIndex: machines.ModifiedIndex,
ExpireTime: machines.ExpireTime,
ACL: machines.ACL,
Children: make(map[string]*node),
Parent: n,
}
for name, c := range machines.Children {
q, err := url.ParseQuery(c.Value)
if err != nil {
log.Fatal("Couldn't parse old query string value")
}
etcdurl := q.Get("etcd")
rafturl := q.Get("raft")
m := generateNodeMember(name, rafturl, etcdurl)
attrBytes, err := json.Marshal(m.attributes)
if err != nil {
log.Fatal("Couldn't marshal attributes")
}
raftBytes, err := json.Marshal(m.raftAttributes)
if err != nil {
log.Fatal("Couldn't marshal raft attributes")
}
newNode := &node{
Path: path.Join("/0/members", m.ID.String()),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Children: make(map[string]*node),
Parent: n.Children["members"],
}
attrs := &node{
Path: path.Join("/0/members", m.ID.String(), "attributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(attrBytes),
Parent: newNode,
}
newNode.Children["attributes"] = attrs
raftAttrs := &node{
Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
CreatedIndex: c.CreatedIndex,
ModifiedIndex: c.ModifiedIndex,
ExpireTime: c.ExpireTime,
ACL: c.ACL,
Value: string(raftBytes),
Parent: newNode,
}
newNode.Children["raftAttributes"] = raftAttrs
n.Children["members"].Children[m.ID.String()] = newNode
}
return n
}
func mangleRoot(n *node) *node {
newRoot := &node{
Path: "/",
CreatedIndex: n.CreatedIndex,
ModifiedIndex: n.ModifiedIndex,
ExpireTime: n.ExpireTime,
ACL: n.ACL,
Children: make(map[string]*node),
}
newRoot.Children["1"] = n
etcd := n.Children["_etcd"]
replacePathNames(n, "/", "/1/")
newZero := fixEtcd(etcd)
newZero.Parent = newRoot
newRoot.Children["0"] = newZero
return newRoot
}
func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
st := &Store4{}
if err := json.Unmarshal(s.State, st); err != nil {
log.Fatal("Couldn't unmarshal snapshot")
}
etcd := st.Root.Children["_etcd"]
return pullNodesFromEtcd(etcd)
}
func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
st := &Store4{}
if err := json.Unmarshal(s.State, st); err != nil {
log.Fatal("Couldn't unmarshal snapshot")
}
st.Root = mangleRoot(st.Root)
newState, err := json.Marshal(st)
if err != nil {
log.Fatal("Couldn't re-marshal new snapshot")
}
nodes := s.GetNodesFromStore()
nodeList := make([]uint64, 0)
for _, v := range nodes {
nodeList = append(nodeList, v)
}
snap2 := raftpb.Snapshot{
Data: newState,
Metadata: raftpb.SnapshotMetadata{
Index: s.LastIndex,
Term: s.LastTerm + termOffset4to2,
ConfState: raftpb.ConfState{
Nodes: nodeList,
},
},
}
return &snap2
}
func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
fname, err := FindLatestFile(snapdir)
if err != nil {
return nil, err
}
if fname == "" {
return nil, nil
}
snappath := path.Join(snapdir, fname)
log.Printf("Decoding snapshot from %s", snappath)
return DecodeSnapshot4FromFile(snappath)
}
// FindLatestFile identifies the "latest" filename in a given directory
// by sorting all the files and choosing the highest value.
func FindLatestFile(dirpath string) (string, error) {
dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
if err != nil {
if os.IsNotExist(err) {
err = nil
}
return "", err
}
defer dir.Close()
fnames, err := dir.Readdirnames(-1)
if err != nil {
return "", err
}
if len(fnames) == 0 {
return "", nil
}
names, err := NewSnapshotFileNames(fnames)
if err != nil {
return "", err
}
return names[len(names)-1].FileName, nil
}
func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
// Read snapshot data.
f, err := os.OpenFile(path, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
defer f.Close()
return DecodeSnapshot4(f)
}
func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
// Verify checksum
var checksum uint32
n, err := fmt.Fscanf(f, "%08x\n", &checksum)
if err != nil {
return nil, err
} else if n != 1 {
return nil, errors.New("miss heading checksum")
}
// Load remaining snapshot contents.
b, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(b)
if uint32(checksum) != byteChecksum {
return nil, errors.New("bad checksum")
}
// Decode snapshot.
snapshot := new(Snapshot4)
if err = json.Unmarshal(b, snapshot); err != nil {
return nil, err
}
return snapshot, nil
}
func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
s := make([]SnapshotFileName, 0)
for _, n := range names {
trimmed := strings.TrimSuffix(n, ".ss")
if trimmed == n {
return nil, fmt.Errorf("file %q does not have .ss extension", n)
}
parts := strings.SplitN(trimmed, "_", 2)
if len(parts) != 2 {
return nil, fmt.Errorf("unrecognized file name format %q", n)
}
fn := SnapshotFileName{FileName: n}
var err error
fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
}
fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
}
s = append(s, fn)
}
sortable := SnapshotFileNames(s)
sort.Sort(&sortable)
return s, nil
}
type SnapshotFileNames []SnapshotFileName
type SnapshotFileName struct {
FileName string
Term uint64
Index uint64
}
func (n *SnapshotFileNames) Less(i, j int) bool {
iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
}
func (n *SnapshotFileNames) Swap(i, j int) {
(*n)[i], (*n)[j] = (*n)[j], (*n)[i]
}
func (n *SnapshotFileNames) Len() int {
return len([]SnapshotFileName(*n))
}

View File

@ -1,70 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"bytes"
"encoding/json"
"fmt"
"os"
)
type StandbyInfo4 struct {
Running bool
Cluster []*MachineMessage
SyncInterval float64
}
// MachineMessage represents information about a peer or standby in the registry.
type MachineMessage struct {
Name string `json:"name"`
State string `json:"state"`
ClientURL string `json:"clientURL"`
PeerURL string `json:"peerURL"`
}
func (si *StandbyInfo4) ClientURLs() []string {
var urls []string
for _, m := range si.Cluster {
urls = append(urls, m.ClientURL)
}
return urls
}
func (si *StandbyInfo4) InitialCluster() string {
b := &bytes.Buffer{}
first := true
for _, m := range si.Cluster {
if !first {
fmt.Fprintf(b, ",")
}
first = false
fmt.Fprintf(b, "%s=%s", m.Name, m.PeerURL)
}
return b.String()
}
func DecodeStandbyInfo4FromFile(path string) (*StandbyInfo4, error) {
var info StandbyInfo4
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
defer file.Close()
if err = json.NewDecoder(file).Decode(&info); err != nil {
return nil, err
}
return &info, nil
}

View File

@ -5,7 +5,7 @@
#
PREFIX="github.com/coreos/etcd/Godeps/_workspace/src"
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./migrate/etcd4pb ./storage/storagepb"
DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./storage/storagepb"
SHA="64f27bf06efee53589314a6e5a4af34cdd85adf6"

2
test
View File

@ -15,7 +15,7 @@ COVER=${COVER:-"-cover"}
source ./build
# Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes migrate pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap store version wal"
TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap store version wal"
# TODO: add it to race testing when the issue is resolved
# https://github.com/golang/go/issues/9946
NO_RACE_TESTABLE="rafthttp"

View File

@ -1,48 +0,0 @@
## etcd 0.4.x -> 2.0.0 Data Migration Tool
### Upgrading from 0.4.x
Between 0.4.x and 2.0, the on-disk data formats have changed. In order to allow users to convert to 2.0, a migration tool is provided.
etcd will detect 0.4.x data dir and update the data automatically (while leaving a backup, in case of emergency).
### Data Migration Tips
* Keep the environment variables and etcd instance flags the same, particularly `--name`/`ETCD_NAME`.
* Don't change the cluster configuration. If there's a plan to add or remove machines, it's probably best to arrange for that after the migration, rather than before or at the same time.
### Running the tool
The tool can be run via:
```sh
./go build
./etcd-migrate --data-dir=<PATH TO YOUR DATA>
```
It should autodetect everything and convert the data-dir to be 2.0 compatible. It does not remove the 0.4.x data, and is safe to convert multiple times; the 2.0 data will be overwritten. Recovering the disk space once everything is settled is covered later in the document.
If, however, it complains about autodetecting the name (which can happen, depending on how the cluster was configured), you need to supply the name of this particular node. This is equivalent to the `--name` flag (or `ETCD_NAME` variable) that etcd was run with, which can also be found by accessing the self api, eg:
```sh
curl -L http://127.0.0.1:4001/v2/stats/self
```
Where the `"name"` field is the name of the local machine.
Then, run the migration tool with
```sh
./bin/etcd-migrate --data-dir=<PATH TO YOUR DATA> --name=<NAME>
```
And the tool should migrate successfully. If it still has an error at this time, it's a failure or bug in the tool and it's worth reporting a bug.
### Recovering Disk Space
If the conversion has completed, the entire cluster is running on something 2.0-based, and the disk space is important, the following command will clear 0.4.x data from the data-dir:
```sh
rm -ri snapshot conf log
```
It will ask before every deletion, but these are the 0.4.x files and will not affect the working 2.0 data.

View File

@ -1,37 +0,0 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"log"
"github.com/coreos/etcd/migrate"
)
func main() {
from := flag.String("data-dir", "", "etcd v0.4 data-dir")
name := flag.String("name", "", "etcd node name")
flag.Parse()
if *from == "" {
log.Fatal("Must provide -data-dir flag")
}
err := migrate.Migrate4To2(*from, *name)
if err != nil {
log.Fatalf("Failed migrating data-dir: %v", err)
}
}

View File

@ -36,7 +36,6 @@ type DataDirVersion string
const (
DataDirUnknown DataDirVersion = "Unknown WAL"
DataDir0_4 DataDirVersion = "0.4.x"
DataDir2_0 DataDirVersion = "2.0.0"
DataDir2_0Proxy DataDirVersion = "2.0 proxy"
DataDir2_0_1 DataDirVersion = "2.0.1"
@ -62,9 +61,6 @@ func DetectDataDir(dirpath string) (DataDirVersion, error) {
ver, err := DetectDataDir(path.Join(dirpath, "member"))
if ver == DataDir2_0 {
return DataDir2_0_1, nil
} else if ver == DataDir0_4 {
// How in the blazes did it get there?
return DataDirUnknown, nil
}
return ver, err
}
@ -78,12 +74,5 @@ func DetectDataDir(dirpath string) (DataDirVersion, error) {
if nameSet.ContainsAll([]string{"proxy"}) {
return DataDir2_0Proxy, nil
}
if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
return DataDir0_4, nil
}
if nameSet.ContainsAll([]string{"standby_info"}) {
return DataDir0_4, nil
}
return DataDirUnknown, nil
}

View File

@ -29,7 +29,6 @@ func TestDetectDataDir(t *testing.T) {
}{
{[]string{"member/", "member/wal/", "member/wal/1", "member/snap/"}, DataDir2_0_1},
{[]string{"snap/", "wal/", "wal/1"}, DataDir2_0},
{[]string{"snapshot/", "conf", "log"}, DataDir0_4},
{[]string{"weird"}, DataDirUnknown},
{[]string{"snap/", "wal/"}, DataDirUnknown},
}