mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdctl: add migrate command
Migrate command accepts a datadir and an optional user-provided transformer function that transform v2 keys to v2 keys. Migrate command then builds a v3 backend state based on the existing v2 keys and the output of the transformer function.
This commit is contained in:
parent
dc073e1aa7
commit
6f2e7875aa
350
etcdctl/ctlv3/command/migrate_command.go
Normal file
350
etcdctl/ctlv3/command/migrate_command.go
Normal file
@ -0,0 +1,350 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/client"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var (
|
||||
migrateDatadir string
|
||||
migrateWALdir string
|
||||
migrateTransformer string
|
||||
)
|
||||
|
||||
// NewMigrateCommand returns the cobra command for "migrate".
|
||||
func NewMigrateCommand() *cobra.Command {
|
||||
mc := &cobra.Command{
|
||||
Use: "migrate",
|
||||
Short: "migrate",
|
||||
Run: migrateCommandFunc,
|
||||
}
|
||||
|
||||
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory.")
|
||||
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory.")
|
||||
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program.")
|
||||
return mc
|
||||
}
|
||||
|
||||
func migrateCommandFunc(cmd *cobra.Command, args []string) {
|
||||
var (
|
||||
writer io.WriteCloser
|
||||
reader io.ReadCloser
|
||||
errc chan error
|
||||
)
|
||||
if migrateTransformer != "" {
|
||||
writer, reader, errc = startTransformer()
|
||||
} else {
|
||||
fmt.Println("using default transformer")
|
||||
writer, reader, errc = defaultTransformer()
|
||||
}
|
||||
|
||||
st := rebuildStoreV2()
|
||||
be := prepareBackend()
|
||||
defer be.Close()
|
||||
|
||||
maxIndexc := make(chan uint64, 1)
|
||||
go func() {
|
||||
maxIndexc <- writeStore(writer, st)
|
||||
writer.Close()
|
||||
}()
|
||||
|
||||
readKeys(reader, be)
|
||||
mvcc.UpdateConsistentIndex(be, <-maxIndexc)
|
||||
err := <-errc
|
||||
if err != nil {
|
||||
fmt.Println("failed to transform keys")
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
fmt.Println("finished transforming keys")
|
||||
}
|
||||
|
||||
func prepareBackend() backend.Backend {
|
||||
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
|
||||
be := backend.New(dbpath, time.Second, 10000)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafeCreateBucket([]byte("key"))
|
||||
tx.UnsafeCreateBucket([]byte("meta"))
|
||||
tx.Unlock()
|
||||
return be
|
||||
}
|
||||
|
||||
func rebuildStoreV2() store.Store {
|
||||
waldir := migrateWALdir
|
||||
if len(waldir) == 0 {
|
||||
waldir = path.Join(migrateDatadir, "member", "wal")
|
||||
}
|
||||
snapdir := path.Join(migrateDatadir, "member", "snap")
|
||||
|
||||
ss := snap.New(snapdir)
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
|
||||
w, err := wal.OpenForRead(waldir, walsnap)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
_, _, ents, err := w.ReadAll()
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
st := store.New()
|
||||
if snapshot != nil {
|
||||
err := st.Recovery(snapshot.Data)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
}
|
||||
|
||||
applier := etcdserver.NewApplierV2(st, nil)
|
||||
for _, ent := range ents {
|
||||
if ent.Type != raftpb.EntryNormal {
|
||||
continue
|
||||
}
|
||||
|
||||
var raftReq pb.InternalRaftRequest
|
||||
if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
|
||||
var r pb.Request
|
||||
pbutil.MustUnmarshal(&r, ent.Data)
|
||||
applyRequest(&r, applier)
|
||||
} else {
|
||||
if raftReq.V2 != nil {
|
||||
req := raftReq.V2
|
||||
applyRequest(req, applier)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return st
|
||||
}
|
||||
|
||||
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
|
||||
toTTLOptions(r)
|
||||
switch r.Method {
|
||||
case "POST":
|
||||
applyV2.Post(r)
|
||||
case "PUT":
|
||||
applyV2.Put(r)
|
||||
case "DELETE":
|
||||
applyV2.Delete(r)
|
||||
case "QGET":
|
||||
applyV2.QGet(r)
|
||||
case "SYNC":
|
||||
applyV2.Sync(r)
|
||||
default:
|
||||
panic("unknown command")
|
||||
}
|
||||
}
|
||||
|
||||
func toTTLOptions(r *pb.Request) store.TTLOptionSet {
|
||||
refresh, _ := pbutil.GetBool(r.Refresh)
|
||||
ttlOptions := store.TTLOptionSet{Refresh: refresh}
|
||||
if r.Expiration != 0 {
|
||||
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
|
||||
}
|
||||
return ttlOptions
|
||||
}
|
||||
|
||||
func writeStore(w io.Writer, st store.Store) uint64 {
|
||||
all, err := st.Get("/1", true, true)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
return writeKeys(w, all.Node)
|
||||
}
|
||||
|
||||
func writeKeys(w io.Writer, n *store.NodeExtern) uint64 {
|
||||
maxIndex := n.ModifiedIndex
|
||||
|
||||
nodes := n.Nodes
|
||||
// remove store v2 bucket prefix
|
||||
n.Key = n.Key[2:]
|
||||
if n.Key == "" {
|
||||
n.Key = "/"
|
||||
}
|
||||
if n.Dir {
|
||||
n.Nodes = nil
|
||||
}
|
||||
b, err := json.Marshal(n)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
fmt.Fprintf(w, string(b))
|
||||
for _, nn := range nodes {
|
||||
max := writeKeys(w, nn)
|
||||
if max > maxIndex {
|
||||
maxIndex = max
|
||||
}
|
||||
}
|
||||
return maxIndex
|
||||
}
|
||||
|
||||
func readKeys(r io.Reader, be backend.Backend) error {
|
||||
for {
|
||||
length64, err := readInt64(r)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
buf := make([]byte, int(length64))
|
||||
if _, err = io.ReadFull(r, buf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var kv mvccpb.KeyValue
|
||||
err = proto.Unmarshal(buf, &kv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mvcc.WriteKV(be, kv)
|
||||
}
|
||||
}
|
||||
|
||||
func readInt64(r io.Reader) (int64, error) {
|
||||
var n int64
|
||||
err := binary.Read(r, binary.LittleEndian, &n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
|
||||
cmd := exec.Command(migrateTransformer)
|
||||
cmd.Stderr = os.Stderr
|
||||
|
||||
writer, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
reader, rerr := cmd.StdoutPipe()
|
||||
if rerr != nil {
|
||||
ExitWithError(ExitError, rerr)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
errc := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
errc <- cmd.Wait()
|
||||
}()
|
||||
|
||||
return writer, reader, errc
|
||||
}
|
||||
|
||||
func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
|
||||
// transformer decodes v2 keys from sr
|
||||
sr, sw := io.Pipe()
|
||||
// transformer encodes v3 keys into dw
|
||||
dr, dw := io.Pipe()
|
||||
|
||||
decoder := json.NewDecoder(sr)
|
||||
|
||||
errc := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
sr.Close()
|
||||
dw.Close()
|
||||
}()
|
||||
|
||||
for decoder.More() {
|
||||
node := &client.Node{}
|
||||
if err := decoder.Decode(node); err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
|
||||
kv := transform(node)
|
||||
if kv == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(kv)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
buf := make([]byte, 8)
|
||||
binary.LittleEndian.PutUint64(buf, uint64(len(data)))
|
||||
if _, err := dw.Write(buf); err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
if _, err := dw.Write(data); err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
errc <- nil
|
||||
}()
|
||||
|
||||
return sw, dr, errc
|
||||
}
|
||||
|
||||
func transform(n *client.Node) *mvccpb.KeyValue {
|
||||
const unKnownVersion = 1
|
||||
if n.Dir {
|
||||
return nil
|
||||
}
|
||||
kv := &mvccpb.KeyValue{
|
||||
Key: []byte(n.Key),
|
||||
Value: []byte(n.Value),
|
||||
CreateRevision: int64(n.CreatedIndex),
|
||||
ModRevision: int64(n.ModifiedIndex),
|
||||
Version: unKnownVersion,
|
||||
}
|
||||
return kv
|
||||
}
|
@ -74,6 +74,7 @@ func init() {
|
||||
command.NewMemberCommand(),
|
||||
command.NewSnapshotCommand(),
|
||||
command.NewMakeMirrorCommand(),
|
||||
command.NewMigrateCommand(),
|
||||
command.NewLockCommand(),
|
||||
command.NewAuthCommand(),
|
||||
command.NewElectCommand(),
|
||||
|
@ -26,8 +26,8 @@ import (
|
||||
"github.com/coreos/go-semver/semver"
|
||||
)
|
||||
|
||||
// applierV2 is the interface for processing V2 raft messages
|
||||
type applierV2 interface {
|
||||
// ApplierV2 is the interface for processing V2 raft messages
|
||||
type ApplierV2 interface {
|
||||
Delete(r *pb.Request) Response
|
||||
Post(r *pb.Request) Response
|
||||
Put(r *pb.Request) Response
|
||||
@ -35,19 +35,26 @@ type applierV2 interface {
|
||||
Sync(r *pb.Request) Response
|
||||
}
|
||||
|
||||
type applierV2store struct{ s *EtcdServer }
|
||||
func NewApplierV2(s store.Store, c *membership.RaftCluster) ApplierV2 {
|
||||
return &applierV2store{store: s, cluster: c}
|
||||
}
|
||||
|
||||
type applierV2store struct {
|
||||
store store.Store
|
||||
cluster *membership.RaftCluster
|
||||
}
|
||||
|
||||
func (a *applierV2store) Delete(r *pb.Request) Response {
|
||||
switch {
|
||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||
return toResponse(a.s.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
|
||||
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
|
||||
default:
|
||||
return toResponse(a.s.store.Delete(r.Path, r.Dir, r.Recursive))
|
||||
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *applierV2store) Post(r *pb.Request) Response {
|
||||
return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
|
||||
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, toTTLOptions(r)))
|
||||
}
|
||||
|
||||
func (a *applierV2store) Put(r *pb.Request) Response {
|
||||
@ -57,14 +64,14 @@ func (a *applierV2store) Put(r *pb.Request) Response {
|
||||
case existsSet:
|
||||
if exists {
|
||||
if r.PrevIndex == 0 && r.PrevValue == "" {
|
||||
return toResponse(a.s.store.Update(r.Path, r.Val, ttlOptions))
|
||||
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
|
||||
} else {
|
||||
return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
}
|
||||
}
|
||||
return toResponse(a.s.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
|
||||
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
|
||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||
return toResponse(a.s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||
default:
|
||||
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
||||
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
|
||||
@ -72,25 +79,29 @@ func (a *applierV2store) Put(r *pb.Request) Response {
|
||||
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
||||
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
|
||||
}
|
||||
a.s.cluster.UpdateAttributes(id, attr)
|
||||
if a.cluster != nil {
|
||||
a.cluster.UpdateAttributes(id, attr)
|
||||
}
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
}
|
||||
if r.Path == membership.StoreClusterVersionKey() {
|
||||
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
||||
if a.cluster != nil {
|
||||
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
||||
}
|
||||
// return an empty response since there is no consumer.
|
||||
return Response{}
|
||||
}
|
||||
return toResponse(a.s.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
||||
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *applierV2store) QGet(r *pb.Request) Response {
|
||||
return toResponse(a.s.store.Get(r.Path, r.Recursive, r.Sorted))
|
||||
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
|
||||
}
|
||||
|
||||
func (a *applierV2store) Sync(r *pb.Request) Response {
|
||||
a.s.store.DeleteExpiredKeys(time.Unix(0, r.Time))
|
||||
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
|
||||
return Response{}
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ type EtcdServer struct {
|
||||
|
||||
store store.Store
|
||||
|
||||
applyV2 applierV2
|
||||
applyV2 ApplierV2
|
||||
|
||||
applyV3 applierV3
|
||||
kv mvcc.ConsistentWatchableKV
|
||||
@ -391,7 +391,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
|
||||
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
|
||||
}
|
||||
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
srv.be = be
|
||||
srv.lessor = lease.NewLessor(srv.be)
|
||||
|
@ -179,7 +179,7 @@ func TestApplyRepeat(t *testing.T) {
|
||||
cluster: cl,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
s.applyV2 = &applierV2store{s}
|
||||
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
||||
s.start()
|
||||
req := &pb.Request{Method: "QGET", ID: uint64(1)}
|
||||
ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}
|
||||
@ -445,7 +445,7 @@ func TestApplyRequest(t *testing.T) {
|
||||
for i, tt := range tests {
|
||||
st := mockstore.NewRecorder()
|
||||
srv := &EtcdServer{store: st}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
resp := srv.applyV2Request(&tt.req)
|
||||
|
||||
if !reflect.DeepEqual(resp, tt.wresp) {
|
||||
@ -464,7 +464,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
||||
store: mockstore.NewRecorder(),
|
||||
cluster: cl,
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
req := pb.Request{
|
||||
Method: "PUT",
|
||||
@ -639,7 +639,7 @@ func TestDoProposal(t *testing.T) {
|
||||
store: st,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
srv.start()
|
||||
resp, err := srv.Do(context.Background(), tt)
|
||||
srv.Stop()
|
||||
@ -666,7 +666,7 @@ func TestDoProposalCancelled(t *testing.T) {
|
||||
w: wt,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
@ -688,7 +688,7 @@ func TestDoProposalTimeout(t *testing.T) {
|
||||
w: mockwait.NewNop(),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 0)
|
||||
_, err := srv.Do(ctx, pb.Request{Method: "PUT"})
|
||||
@ -704,7 +704,7 @@ func TestDoProposalStopped(t *testing.T) {
|
||||
w: mockwait.NewNop(),
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
srv.done = make(chan struct{})
|
||||
close(srv.done)
|
||||
@ -721,7 +721,7 @@ func TestSync(t *testing.T) {
|
||||
r: raftNode{Node: n},
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{})
|
||||
@ -761,7 +761,7 @@ func TestSyncTimeout(t *testing.T) {
|
||||
r: raftNode{Node: n},
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
// check that sync is non-blocking
|
||||
done := make(chan struct{})
|
||||
@ -900,7 +900,7 @@ func TestTriggerSnap(t *testing.T) {
|
||||
store: st,
|
||||
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
||||
}
|
||||
srv.applyV2 = &applierV2store{srv}
|
||||
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
|
||||
|
||||
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
|
||||
srv.be = be
|
||||
@ -968,7 +968,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
|
||||
cluster: cl,
|
||||
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
|
||||
}
|
||||
s.applyV2 = &applierV2store{s}
|
||||
s.applyV2 = &applierV2store{store: s.store, cluster: s.cluster}
|
||||
|
||||
be, tmpPath := backend.NewDefaultTmpBackend()
|
||||
defer func() {
|
||||
|
57
mvcc/util.go
Normal file
57
mvcc/util.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mvcc
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"log"
|
||||
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
)
|
||||
|
||||
func UpdateConsistentIndex(be backend.Backend, index uint64) {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
|
||||
var oldi uint64
|
||||
_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
|
||||
if len(vs) != 0 {
|
||||
oldi = binary.BigEndian.Uint64(vs[0])
|
||||
}
|
||||
|
||||
if index <= oldi {
|
||||
return
|
||||
}
|
||||
|
||||
bs := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs, index)
|
||||
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
|
||||
}
|
||||
|
||||
func WriteKV(be backend.Backend, kv mvccpb.KeyValue) {
|
||||
ibytes := newRevBytes()
|
||||
revToBytes(revision{main: kv.ModRevision}, ibytes)
|
||||
|
||||
d, err := kv.Marshal()
|
||||
if err != nil {
|
||||
log.Fatalf("mvcc: cannot marshal event: %v", err)
|
||||
}
|
||||
|
||||
be.BatchTx().Lock()
|
||||
be.BatchTx().UnsafePut(keyBucketName, ibytes, d)
|
||||
be.BatchTx().Unlock()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user