Merge pull request #6794 from xiang90/fix_migration

ctlv3: fix migration
This commit is contained in:
Xiang Li 2016-11-03 08:43:30 -07:00 committed by GitHub
commit c5ac02164d
2 changed files with 47 additions and 10 deletions

View File

@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 { if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs) t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
} }
if resp.Kvs[0].CreateRevision != 4 { if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision) t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
} }
} }

View File

@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client" "github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error" etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap" "github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store" "github.com/coreos/etcd/store"
@ -74,18 +77,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer() writer, reader, errc = defaultTransformer()
} }
st := rebuildStoreV2() st, index := rebuildStoreV2()
be := prepareBackend() be := prepareBackend()
defer be.Close() defer be.Close()
maxIndexc := make(chan uint64, 1)
go func() { go func() {
maxIndexc <- writeStore(writer, st) writeStore(writer, st)
writer.Close() writer.Close()
}() }()
readKeys(reader, be) readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc) mvcc.UpdateConsistentIndex(be, index)
err := <-errc err := <-errc
if err != nil { if err != nil {
fmt.Println("failed to transform keys") fmt.Println("failed to transform keys")
@ -106,7 +108,10 @@ func prepareBackend() backend.Backend {
return be return be
} }
func rebuildStoreV2() store.Store { func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")
waldir := migrateWALdir waldir := migrateWALdir
if len(waldir) == 0 { if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal") waldir = path.Join(migrateDatadir, "member", "wal")
@ -122,6 +127,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot var walsnap walpb.Snapshot
if snapshot != nil { if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
} }
w, err := wal.OpenForRead(waldir, walsnap) w, err := wal.OpenForRead(waldir, walsnap)
@ -143,9 +149,15 @@ func rebuildStoreV2() store.Store {
} }
} }
applier := etcdserver.NewApplierV2(st, nil) cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents { for _, ent := range ents {
if ent.Type != raftpb.EntryNormal { if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue continue
} }
@ -160,9 +172,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier) applyRequest(req, applier)
} }
} }
if ent.Index > index {
index = ent.Index
}
} }
return st return st, index
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
} }
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) { func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {