Split etcdctl into etcdctl (public API access) & etcdutl (direct surgery on files)

Motivation is as follows:

  - In etcdctl we depend only on the clientv3 APIs, with no dependencies on bolt, backend, mvcc, or the file layout.
  - etcdctl can be officially supported across a wide range of versions, while etcdutl is specific to the file format of a particular version.
  - It's a step towards the desired modules layout, documented in: https://etcd.io/docs/next/dev-internal/modules/
Piotr Tabor
2021-05-14 13:50:31 +02:00
parent 1675101293
commit c09aca1ba4
44 changed files with 1788 additions and 963 deletions


@@ -17,12 +17,10 @@ package command
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/spf13/cobra"
"go.etcd.io/etcd/etcdutl/v3/etcdutl"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
var (
@@ -43,12 +41,11 @@ func NewDefragCommand() *cobra.Command {
func defragCommandFunc(cmd *cobra.Command, args []string) {
if len(defragDataDir) > 0 {
err := defragData(defragDataDir)
fmt.Fprintf(os.Stderr, "Use `etcdutl defrag` instead. The --data-dir is going to be decomissioned in v3.6.\n\n")
err := etcdutl.DefragData(defragDataDir)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to defragment etcd data[%s] (%v)\n", defragDataDir, err)
os.Exit(cobrautl.ExitError)
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
return
}
failures := 0
@@ -69,23 +66,3 @@ func defragCommandFunc(cmd *cobra.Command, args []string) {
os.Exit(cobrautl.ExitError)
}
}
func defragData(dataDir string) error {
var be backend.Backend
bch := make(chan struct{})
dbDir := filepath.Join(dataDir, "member", "snap", "db")
go func() {
defer close(bch)
be = backend.NewDefaultBackend(dbDir)
}()
select {
case <-bch:
case <-time.After(time.Second):
fmt.Fprintf(os.Stderr, "waiting for etcd to close and release its lock on %q. "+
"To defrag a running etcd instance, omit --data-dir.\n", dbDir)
<-bch
}
return be.Defrag()
}
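For reference, the defragData helper removed above now lives behind etcdutl.DefragData, which the deprecated --data-dir path delegates to. A minimal sketch of that function, assuming it simply mirrors the deleted code (the actual etcdutl implementation may resolve the db path and construct the backend differently):

package etcdutl

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

// DefragData opens the bbolt backend under dataDir and defragments it in place.
// If a running etcd still holds the file lock, it prints a hint and waits.
func DefragData(dataDir string) error {
	var be backend.Backend
	bch := make(chan struct{})
	dbDir := filepath.Join(dataDir, "member", "snap", "db")
	go func() {
		defer close(bch)
		be = backend.NewDefaultBackend(dbDir)
	}()
	select {
	case <-bch:
	case <-time.After(time.Second):
		fmt.Fprintf(os.Stderr, "waiting for etcd to close and release its lock on %q. "+
			"To defrag a running etcd instance, omit --data-dir.\n", dbDir)
		<-bch
	}
	return be.Defrag()
}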


@@ -1,407 +0,0 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v2"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"
"github.com/gogo/protobuf/proto"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
var (
migrateExcludeTTLKey bool
migrateDatadir string
migrateWALdir string
migrateTransformer string
)
// NewMigrateCommand returns the cobra command for "migrate".
func NewMigrateCommand() *cobra.Command {
mc := &cobra.Command{
Use: "migrate",
Short: "Migrates keys in a v2 store to a mvcc store",
Run: migrateCommandFunc,
}
mc.Flags().BoolVar(&migrateExcludeTTLKey, "no-ttl", false, "Do not convert TTL keys")
mc.Flags().StringVar(&migrateDatadir, "data-dir", "", "Path to the data directory")
mc.Flags().StringVar(&migrateWALdir, "wal-dir", "", "Path to the WAL directory")
mc.Flags().StringVar(&migrateTransformer, "transformer", "", "Path to the user-provided transformer program")
return mc
}
func migrateCommandFunc(cmd *cobra.Command, args []string) {
var (
writer io.WriteCloser
reader io.ReadCloser
errc chan error
)
if migrateTransformer != "" {
writer, reader, errc = startTransformer()
} else {
fmt.Println("using default transformer")
writer, reader, errc = defaultTransformer()
}
st, index, term := rebuildStoreV2()
be := prepareBackend()
defer be.Close()
go func() {
writeStore(writer, st)
writer.Close()
}()
readKeys(reader, be)
cindex.UpdateConsistentIndex(be.BatchTx(), index, term, true)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
fmt.Println("finished transforming keys")
}
func prepareBackend() backend.Backend {
var be backend.Backend
bch := make(chan struct{})
dbpath := filepath.Join(migrateDatadir, "member", "snap", "db")
go func() {
defer close(bch)
be = backend.NewDefaultBackend(dbpath)
}()
select {
case <-bch:
case <-time.After(time.Second):
fmt.Fprintf(os.Stderr, "waiting for etcd to close and release its lock on %q\n", dbpath)
<-bch
}
tx := be.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket([]byte("key"))
tx.UnsafeCreateBucket([]byte("meta"))
tx.Unlock()
return be
}
func rebuildStoreV2() (st v2store.Store, index uint64, term uint64) {
cl := membership.NewCluster(zap.NewExample())
waldir := migrateWALdir
if len(waldir) == 0 {
waldir = filepath.Join(migrateDatadir, "member", "wal")
}
snapdir := filepath.Join(migrateDatadir, "member", "snap")
ss := snap.New(zap.NewExample(), snapdir)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
term = snapshot.Metadata.Term
}
w, err := wal.OpenForRead(zap.NewExample(), waldir, walsnap)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
defer w.Close()
_, _, ents, err := w.ReadAll()
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
st = v2store.New()
if snapshot != nil {
err := st.Recovery(snapshot.Data)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
}
cl.SetStore(st)
cl.Recover(api.UpdateCapability)
applier := etcdserver.NewApplierV2(zap.NewExample(), st, cl)
for _, ent := range ents {
if ent.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
pbutil.MustUnmarshal(&cc, ent.Data)
applyConf(cc, cl)
continue
}
var raftReq pb.InternalRaftRequest
if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
var r pb.Request
pbutil.MustUnmarshal(&r, ent.Data)
applyRequest(&r, applier)
} else {
if raftReq.V2 != nil {
req := raftReq.V2
applyRequest(req, applier)
}
}
if ent.Index >= index {
index = ent.Index
term = ent.Term
}
}
return st, index, term
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {
if err := cl.ValidateConfigurationChange(cc); err != nil {
return
}
switch cc.Type {
case raftpb.ConfChangeAddNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.AddMember(m, true)
case raftpb.ConfChangeRemoveNode:
cl.RemoveMember(types.ID(cc.NodeID), true)
case raftpb.ConfChangeUpdateNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
panic(err)
}
cl.UpdateRaftAttributes(m.ID, m.RaftAttributes, true)
}
}
func applyRequest(req *pb.Request, applyV2 etcdserver.ApplierV2) {
r := (*etcdserver.RequestV2)(req)
r.TTLOptions()
switch r.Method {
case "POST":
applyV2.Post(r)
case "PUT":
applyV2.Put(r)
case "DELETE":
applyV2.Delete(r)
case "QGET":
applyV2.QGet(r)
case "SYNC":
applyV2.Sync(r)
default:
panic("unknown command")
}
}
func writeStore(w io.Writer, st v2store.Store) uint64 {
all, err := st.Get("/1", true, true)
if err != nil {
if eerr, ok := err.(*v2error.Error); ok && eerr.ErrorCode == v2error.EcodeKeyNotFound {
fmt.Println("no v2 keys to migrate")
os.Exit(0)
}
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
return writeKeys(w, all.Node)
}
func writeKeys(w io.Writer, n *v2store.NodeExtern) uint64 {
maxIndex := n.ModifiedIndex
nodes := n.Nodes
// remove store v2 bucket prefix
n.Key = n.Key[2:]
if n.Key == "" {
n.Key = "/"
}
if n.Dir {
n.Nodes = nil
}
if !migrateExcludeTTLKey || n.TTL == 0 {
b, err := json.Marshal(n)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
fmt.Fprint(w, string(b))
}
for _, nn := range nodes {
max := writeKeys(w, nn)
if max > maxIndex {
maxIndex = max
}
}
return maxIndex
}
func readKeys(r io.Reader, be backend.Backend) error {
for {
length64, err := readInt64(r)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
buf := make([]byte, int(length64))
if _, err = io.ReadFull(r, buf); err != nil {
return err
}
var kv mvccpb.KeyValue
err = proto.Unmarshal(buf, &kv)
if err != nil {
return err
}
mvcc.WriteKV(be, kv)
}
}
func readInt64(r io.Reader) (int64, error) {
var n int64
err := binary.Read(r, binary.LittleEndian, &n)
return n, err
}
func startTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
cmd := exec.Command(migrateTransformer)
cmd.Stderr = os.Stderr
writer, err := cmd.StdinPipe()
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
reader, rerr := cmd.StdoutPipe()
if rerr != nil {
cobrautl.ExitWithError(cobrautl.ExitError, rerr)
}
if err := cmd.Start(); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
errc := make(chan error, 1)
go func() {
errc <- cmd.Wait()
}()
return writer, reader, errc
}
func defaultTransformer() (io.WriteCloser, io.ReadCloser, chan error) {
// transformer decodes v2 keys from sr
sr, sw := io.Pipe()
// transformer encodes v3 keys into dw
dr, dw := io.Pipe()
decoder := json.NewDecoder(sr)
errc := make(chan error, 1)
go func() {
defer func() {
sr.Close()
dw.Close()
}()
for decoder.More() {
node := &client.Node{}
if err := decoder.Decode(node); err != nil {
errc <- err
return
}
kv := transform(node)
if kv == nil {
continue
}
data, err := proto.Marshal(kv)
if err != nil {
errc <- err
return
}
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, uint64(len(data)))
if _, err := dw.Write(buf); err != nil {
errc <- err
return
}
if _, err := dw.Write(data); err != nil {
errc <- err
return
}
}
errc <- nil
}()
return sw, dr, errc
}
func transform(n *client.Node) *mvccpb.KeyValue {
const unKnownVersion = 1
if n.Dir {
return nil
}
kv := &mvccpb.KeyValue{
Key: []byte(n.Key),
Value: []byte(n.Value),
CreateRevision: int64(n.CreatedIndex),
ModRevision: int64(n.ModifiedIndex),
Version: unKnownVersion,
}
return kv
}
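The --transformer contract implied by the deleted code above: migrate streamed the v2 nodes as concatenated JSON objects to the transformer's stdin and expected 8-byte little-endian length-prefixed mvccpb.KeyValue protobufs back on stdout (see writeKeys, readKeys, and readInt64). A minimal sketch of such an external transformer program, assuming the standard v2 JSON field names (key, value, dir, createdIndex, modifiedIndex); illustrative only, not an official tool:

package main

import (
	"encoding/binary"
	"encoding/json"
	"os"

	"github.com/gogo/protobuf/proto"
	"go.etcd.io/etcd/api/v3/mvccpb"
)

// node mirrors the subset of the v2 JSON encoding this sketch needs.
type node struct {
	Key           string `json:"key"`
	Value         string `json:"value"`
	Dir           bool   `json:"dir"`
	CreatedIndex  uint64 `json:"createdIndex"`
	ModifiedIndex uint64 `json:"modifiedIndex"`
}

func main() {
	dec := json.NewDecoder(os.Stdin)
	for dec.More() {
		n := &node{}
		if err := dec.Decode(n); err != nil {
			panic(err)
		}
		if n.Dir {
			// Directories have no value in the v3 keyspace; skip them.
			continue
		}
		kv := &mvccpb.KeyValue{
			Key:            []byte(n.Key),
			Value:          []byte(n.Value),
			CreateRevision: int64(n.CreatedIndex),
			ModRevision:    int64(n.ModifiedIndex),
			Version:        1, // version is unknown after migration
		}
		data, err := proto.Marshal(kv)
		if err != nil {
			panic(err)
		}
		// Frame each record as readKeys expects: 8-byte LE length, then the protobuf bytes.
		buf := make([]byte, 8)
		binary.LittleEndian.PutUint64(buf, uint64(len(data)))
		os.Stdout.Write(buf)
		os.Stdout.Write(data)
	}
}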


@@ -21,7 +21,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"github.com/dustin/go-humanize"
@@ -52,7 +51,6 @@ type printer interface {
MoveLeader(leader, target uint64, r v3.MoveLeaderResponse)
Alarm(v3.AlarmResponse)
DBStatus(snapshot.Status)
RoleAdd(role string, r v3.AuthRoleAddResponse)
RoleGet(role string, r v3.AuthRoleGetResponse)
@@ -160,7 +158,6 @@ func newPrinterUnsupported(n string) printer {
func (p *printerUnsupported) EndpointHealth([]epHealth) { p.p(nil) }
func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) }
func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }
func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
@@ -230,14 +227,3 @@ func makeEndpointHashKVTable(hashList []epHashKV) (hdr []string, rows [][]string
}
return hdr, rows
}
func makeDBStatusTable(ds snapshot.Status) (hdr []string, rows [][]string) {
hdr = []string{"hash", "revision", "total keys", "total size"}
rows = append(rows, []string{
fmt.Sprintf("%x", ds.Hash),
fmt.Sprint(ds.Revision),
fmt.Sprint(ds.TotalKey),
humanize.Bytes(uint64(ds.TotalSize)),
})
return hdr, rows
}


@@ -20,7 +20,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
spb "go.etcd.io/etcd/api/v3/mvccpb"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
)
type fieldsPrinter struct{ printer }
@@ -186,13 +185,6 @@ func (p *fieldsPrinter) Alarm(r v3.AlarmResponse) {
}
}
func (p *fieldsPrinter) DBStatus(r snapshot.Status) {
fmt.Println(`"Hash" :`, r.Hash)
fmt.Println(`"Revision" :`, r.Revision)
fmt.Println(`"Keys" :`, r.TotalKey)
fmt.Println(`"Size" :`, r.TotalSize)
}
func (p *fieldsPrinter) RoleAdd(role string, r v3.AuthRoleAddResponse) { p.hdr(r.Header) }
func (p *fieldsPrinter) RoleGet(role string, r v3.AuthRoleGetResponse) {
p.hdr(r.Header)


@@ -22,7 +22,6 @@ import (
"strconv"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
)
type jsonPrinter struct {
@@ -40,7 +39,6 @@ func newJSONPrinter(isHex bool) printer {
func (p *jsonPrinter) EndpointHealth(r []epHealth) { printJSON(r) }
func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) }
func (p *jsonPrinter) DBStatus(r snapshot.Status) { printJSON(r) }
func (p *jsonPrinter) MemberList(r clientv3.MemberListResponse) {
if p.isHex {


@@ -22,7 +22,6 @@ import (
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/types"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
)
type simplePrinter struct {
@@ -171,13 +170,6 @@ func (s *simplePrinter) EndpointHashKV(hashList []epHashKV) {
}
}
func (s *simplePrinter) DBStatus(ds snapshot.Status) {
_, rows := makeDBStatusTable(ds)
for _, row := range rows {
fmt.Println(strings.Join(row, ", "))
}
}
func (s *simplePrinter) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) {
fmt.Printf("Leadership transferred from %s to %s\n", types.ID(leader), types.ID(target))
}


@@ -18,7 +18,6 @@ import (
"os"
v3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
"github.com/olekukonko/tablewriter"
)
@@ -65,13 +64,3 @@ func (tp *tablePrinter) EndpointHashKV(r []epHashKV) {
table.SetAlignment(tablewriter.ALIGN_RIGHT)
table.Render()
}
func (tp *tablePrinter) DBStatus(r snapshot.Status) {
hdr, rows := makeDBStatusTable(r)
table := tablewriter.NewWriter(os.Stdout)
table.SetHeader(hdr)
for _, row := range rows {
table.Append(row)
}
table.SetAlignment(tablewriter.ALIGN_RIGHT)
table.Render()
}


@@ -17,13 +17,12 @@ package command
import (
"context"
"fmt"
"path/filepath"
"strings"
"go.etcd.io/etcd/etcdctl/v3/snapshot"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"os"
"github.com/spf13/cobra"
snapshot "go.etcd.io/etcd/client/v3/snapshot"
"go.etcd.io/etcd/etcdutl/v3/etcdutl"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.uber.org/zap"
)
@@ -65,9 +64,11 @@ func NewSnapshotSaveCommand() *cobra.Command {
func newSnapshotStatusCommand() *cobra.Command {
return &cobra.Command{
Use: "status <filename>",
Short: "Gets backend snapshot status of a given file",
Short: "[deprecated] Gets backend snapshot status of a given file",
Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
The items in the lists are hash, revision, total keys, total size.
Moved to 'etcdutl snapshot status ...'
`,
Run: snapshotStatusCommandFunc,
}
@@ -78,6 +79,7 @@ func NewSnapshotRestoreCommand() *cobra.Command {
Use: "restore <filename> [options]",
Short: "Restores an etcd member snapshot to an etcd directory",
Run: snapshotRestoreCommandFunc,
Long: "Moved to `etcdutl snapshot restore ...`\n",
}
cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
@@ -100,7 +102,6 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
sp := snapshot.NewV3(lg)
cfg := mustClientCfgFromCmd(cmd)
// if user does not specify "--command-timeout" flag, there will be no timeout for snapshot save command
@@ -111,65 +112,21 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
defer cancel()
path := args[0]
if err := sp.Save(ctx, *cfg, path); err != nil {
if err := snapshot.Save(ctx, lg, *cfg, path); err != nil {
cobrautl.ExitWithError(cobrautl.ExitInterrupted, err)
}
fmt.Printf("Snapshot saved at %s\n", path)
}
func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
err := fmt.Errorf("snapshot status requires exactly one argument")
cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
}
initDisplayFromCmd(cmd)
lg, err := zap.NewProduction()
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
sp := snapshot.NewV3(lg)
ds, err := sp.Status(args[0])
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
display.DBStatus(ds)
fmt.Fprintf(os.Stderr, "Deprecated: Use `etcdutl snapshot status` instead.\n\n")
etcdutl.SnapshotStatusCommandFunc(cmd, args)
}
func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
err := fmt.Errorf("snapshot restore requires exactly one argument")
cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
}
dataDir := restoreDataDir
if dataDir == "" {
dataDir = restoreName + ".etcd"
}
walDir := restoreWalDir
if walDir == "" {
walDir = filepath.Join(dataDir, "member", "wal")
}
lg, err := zap.NewProduction()
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
sp := snapshot.NewV3(lg)
if err := sp.Restore(snapshot.RestoreConfig{
SnapshotPath: args[0],
Name: restoreName,
OutputDataDir: dataDir,
OutputWALDir: walDir,
PeerURLs: strings.Split(restorePeerURLs, ","),
InitialCluster: restoreCluster,
InitialClusterToken: restoreClusterToken,
SkipHashCheck: skipHashCheck,
}); err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
fmt.Fprintf(os.Stderr, "Deprecated: Use `etcdutl snapshot restore` instead.\n\n")
etcdutl.SnapshotRestoreCommandFunc(restoreCluster, restoreClusterToken, restoreDataDir, restoreWalDir,
restorePeerURLs, restoreName, skipHashCheck, args)
}
func initialClusterFromName(name string) string {
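Note that `etcdctl snapshot save` now goes through the function-based API in go.etcd.io/etcd/client/v3/snapshot rather than the old etcdctl-local snapshot package, as the hunk above shows. A minimal programmatic sketch of that call path (the endpoint and output path are placeholders):

package main

import (
	"context"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/snapshot"
	"go.uber.org/zap"
)

func main() {
	lg, err := zap.NewProduction()
	if err != nil {
		log.Fatal(err)
	}
	// snapshot.Save streams the backend snapshot from a single endpoint into the given file.
	cfg := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}}
	if err := snapshot.Save(context.Background(), lg, cfg, "snapshot.db"); err != nil {
		lg.Fatal("snapshot save failed", zap.Error(err))
	}
}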


@@ -87,7 +87,6 @@ func init() {
command.NewMemberCommand(),
command.NewSnapshotCommand(),
command.NewMakeMirrorCommand(),
command.NewMigrateCommand(),
command.NewLockCommand(),
command.NewElectCommand(),
command.NewAuthCommand(),