ctlv3: fix migration

This commit is contained in:
Xiang Li 2016-11-02 19:36:07 -07:00
parent c33d04fb54
commit 2ba42990ec
2 changed files with 47 additions and 10 deletions

View File

@ -89,8 +89,8 @@ func TestCtlV3Migrate(t *testing.T) {
if len(resp.Kvs) != 1 {
t.Fatalf("len(resp.Kvs) expected 1, got %+v", resp.Kvs)
}
if resp.Kvs[0].CreateRevision != 4 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 4, got %d", resp.Kvs[0].CreateRevision)
if resp.Kvs[0].CreateRevision != 7 {
t.Fatalf("resp.Kvs[0].CreateRevision expected 7, got %d", resp.Kvs[0].CreateRevision)
}
}

View File

@ -27,11 +27,14 @@ import (
"github.com/coreos/etcd/client"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/store"
@ -74,18 +77,17 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}
st := rebuildStoreV2()
st, index := rebuildStoreV2()
be := prepareBackend()
defer be.Close()
maxIndexc := make(chan uint64, 1)
go func() {
maxIndexc <- writeStore(writer, st)
writeStore(writer, st)
writer.Close()
}()
readKeys(reader, be)
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
mvcc.UpdateConsistentIndex(be, index)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
@ -106,7 +108,10 @@ func prepareBackend() backend.Backend {
return be
}
func rebuildStoreV2() store.Store {
func rebuildStoreV2() (store.Store, uint64) {
var index uint64
cl := membership.NewCluster("")
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = path.Join(migrateDatadir, "member", "wal")
@ -122,6 +127,7 @@ func rebuildStoreV2() store.Store {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
}
w, err := wal.OpenForRead(waldir, walsnap)
@ -143,9 +149,15 @@ func rebuildStoreV2() store.Store {
}
}
applier := etcdserver.NewApplierV2(st, nil)
cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(st, cl)
for _, ent := range ents {
if ent.Type != raftpb.EntryNormal {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue
}
@ -160,9 +172,34 @@ func rebuildStoreV2() store.Store {
applyRequest(req, applier)
}
}
if ent.Index > index {
index = ent.Index
}
}
return st
return st, index
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID))
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes)
}
}
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {