Merge pull request #8351 from gyuho/hash
*: add 'endpoint hashkv' command
commit a9b9ef5640
@@ -6,7 +6,7 @@ Migrating an application from the API v2 to the API v3 involves two steps: 1) mi

## Migrate client library

API v3 is different from API v2, thus application developers need to use a new client library to send requests to etcd API v3. The documentation of the client v3 is available at https://godoc.org/github.com/coreos/etcd/clientv3.

There are some notable differences between API v2 and API v3:
@@ -38,13 +38,17 @@ Second, migrate the v2 keys into v3 with the [migrate][migrate_command] (`ETCDCT

Restart the etcd members and everything should just work.
For etcd v3.3+, run `ETCDCTL_API=3 etcdctl endpoint hashkv --cluster` to ensure key-value stores are consistent post migration.
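The same check can be run programmatically through the `Maintenance.HashKV` API this PR adds to clientv3. A minimal sketch, assuming placeholder endpoint URLs and a healthy three-member cluster:

```go
// Sketch only: programmatic version of `etcdctl endpoint hashkv --cluster`,
// comparing the KV hash reported by every endpoint.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// placeholder endpoints for a local 3-member cluster
	eps := []string{"http://127.0.0.1:12379", "http://127.0.0.1:22379", "http://127.0.0.1:32379"}
	cli, err := clientv3.New(clientv3.Config{Endpoints: eps, DialTimeout: 3 * time.Second})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	var first uint32
	for i, ep := range eps {
		// rev=0 hashes all revisions, matching the `endpoint hashkv` default
		resp, err := cli.HashKV(context.Background(), ep, 0)
		if err != nil {
			log.Fatal(err)
		}
		if i == 0 {
			first = resp.Hash
			continue
		}
		if resp.Hash != first {
			log.Fatalf("hash mismatch at %s: got %d, want %d", ep, resp.Hash, first)
		}
	}
	fmt.Println("all endpoints report hash", first)
}
```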
**Warning**: When the v2 store has expiring TTL keys and the migrate command intends to preserve TTLs, migration may be inconsistent with the last committed v2 state when run on any member with a raft index less than the last leader's raft index.
### Online migration

If the application cannot tolerate any downtime, then it must migrate online. The implementation of online migration will vary from application to application, but the overall idea is the same.
First, write application code using the v3 API. The application must support two modes: a migration mode and a normal mode. The application starts in migration mode. When running in migration mode, the application reads keys using the v3 API first, and, if it cannot find the key, it retries with the API v2. In normal mode, the application only reads keys using the v3 API. The application writes keys over the API v3 in both modes. To acknowledge a switch from migration mode to normal mode, the application watches a switch-mode key. When the switch key's value turns to `true`, the application switches over from migration mode to normal mode.
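A minimal sketch of this dual-read logic, assuming hypothetical names (`switchModeKey`, `readKey`, `watchSwitch`) on top of the v2 `client` and v3 `clientv3` packages; coordination details are application-specific:

```go
// Sketch only: dual-mode reads during online migration. switchModeKey and
// the helper names are illustrative, not part of etcd.
package migration

import (
	"context"

	clientv2 "github.com/coreos/etcd/client"
	"github.com/coreos/etcd/clientv3"
)

// hypothetical key the background job writes "true" to when done
const switchModeKey = "/_migration/switch-mode"

// readKey tries the v3 API first; in migration mode it falls back to v2.
func readKey(ctx context.Context, kv3 *clientv3.Client, kv2 clientv2.KeysAPI, key string, migrating bool) (string, error) {
	resp, err := kv3.Get(ctx, key)
	if err != nil {
		return "", err
	}
	if len(resp.Kvs) > 0 {
		return string(resp.Kvs[0].Value), nil
	}
	if !migrating {
		return "", nil // normal mode: v3 is authoritative
	}
	// migration mode: key may still live only in the v2 store
	v2resp, err := kv2.Get(ctx, key, nil)
	if err != nil {
		return "", err
	}
	return v2resp.Node.Value, nil
}

// watchSwitch signals when the switch-mode key turns "true", so the
// application can leave migration mode.
func watchSwitch(ctx context.Context, kv3 *clientv3.Client) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		for wresp := range kv3.Watch(ctx, switchModeKey) {
			for _, ev := range wresp.Events {
				if string(ev.Kv.Value) == "true" {
					close(done)
					return
				}
			}
		}
	}()
	return done
}
```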
Second, start a background job to migrate data from the store v2 to the mvcc store by reading keys from the API v2 and writing keys to the API v3.
After finishing data migration, the background job writes `true` into the switch mode key to notify the application that it may switch modes.
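A sketch of such a job under the same assumptions (the switch-mode key is hypothetical); it copies each v2 leaf key into the mvcc store only when the key was not already written through v3, then flips the switch key:

```go
// Sketch only: one-shot copy of the v2 store into the v3 (mvcc) store,
// then flip the hypothetical switch-mode key from the sketch above.
func migrateV2ToV3(ctx context.Context, kv2 clientv2.KeysAPI, kv3 *clientv3.Client) error {
	// read the entire v2 tree with one recursive GET
	resp, err := kv2.Get(ctx, "/", &clientv2.GetOptions{Recursive: true})
	if err != nil {
		return err
	}
	var copyNode func(n *clientv2.Node) error
	copyNode = func(n *clientv2.Node) error {
		if n.Dir {
			for _, c := range n.Nodes {
				if err := copyNode(c); err != nil {
					return err
				}
			}
			return nil
		}
		// write the leaf key only if the application has not already
		// created it through v3 (CreateRevision == 0 means "absent")
		_, err := kv3.Txn(ctx).
			If(clientv3.Compare(clientv3.CreateRevision(n.Key), "=", 0)).
			Then(clientv3.OpPut(n.Key, n.Value)).
			Commit()
		return err
	}
	if err := copyNode(resp.Node); err != nil {
		return err
	}
	// notify watchers that they may switch to normal mode
	_, err = kv3.Put(ctx, switchModeKey, "true")
	return err
}
```

Guarding each write with a `CreateRevision == 0` transaction keeps the copy from clobbering keys the application has already written through the v3 API while the job runs.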
@@ -23,6 +23,39 @@ import (
	"github.com/coreos/etcd/pkg/testutil"
)

func TestMaintenanceHashKV(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	for i := 0; i < 3; i++ {
		if _, err := clus.RandClient().Put(context.Background(), "foo", "bar"); err != nil {
			t.Fatal(err)
		}
	}

	var hv uint32
	for i := 0; i < 3; i++ {
		cli := clus.Client(i)
		// ensure writes are replicated
		if _, err := cli.Get(context.TODO(), "foo"); err != nil {
			t.Fatal(err)
		}
		hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0)
		if err != nil {
			t.Fatal(err)
		}
		if hv == 0 {
			hv = hresp.Hash
			continue
		}
		if hv != hresp.Hash {
			t.Fatalf("#%d: hash expected %d, got %d", i, hv, hresp.Hash)
		}
	}
}

func TestMaintenanceMoveLeader(t *testing.T) {
	defer testutil.AfterTest(t)
@@ -28,6 +28,7 @@ type (
	AlarmResponse      pb.AlarmResponse
	AlarmMember        pb.AlarmMember
	StatusResponse     pb.StatusResponse
	HashKVResponse     pb.HashKVResponse
	MoveLeaderResponse pb.MoveLeaderResponse
)
@@ -50,6 +51,11 @@ type Maintenance interface {
	// Status gets the status of the endpoint.
	Status(ctx context.Context, endpoint string) (*StatusResponse, error)

	// HashKV returns a hash of the KV state at the time of the RPC.
	// If revision is zero, the hash is computed on all keys. If the revision
	// is non-zero, the hash is computed on all keys at or below the given revision.
	HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)

	// Snapshot provides a reader for a snapshot of a backend.
	Snapshot(ctx context.Context) (io.ReadCloser, error)
@@ -159,6 +165,19 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
	return (*StatusResponse)(resp), nil
}

func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
	remote, cancel, err := m.dial(endpoint)
	if err != nil {
		return nil, toErr(ctx, err)
	}
	defer cancel()
	resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false))
	if err != nil {
		return nil, toErr(ctx, err)
	}
	return (*HashKVResponse)(resp), nil
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
	ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
	if err != nil {
@@ -15,12 +15,18 @@
package e2e

import (
	"context"
	"fmt"
	"net/url"
	"testing"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) }
func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) }
func TestCtlV3EndpointHashKV(t *testing.T) { testCtl(t, endpointHashKVTest, withQuorum()) }

func endpointHealthTest(cx ctlCtx) {
	if err := ctlV3EndpointHealth(cx); err != nil {
@@ -52,3 +58,35 @@ func ctlV3EndpointStatus(cx ctlCtx) error {
	}
	return spawnWithExpects(cmdArgs, eps...)
}

func endpointHashKVTest(cx ctlCtx) {
	if err := ctlV3EndpointHashKV(cx); err != nil {
		cx.t.Fatalf("endpointHashKVTest ctlV3EndpointHashKV error (%v)", err)
	}
}

func ctlV3EndpointHashKV(cx ctlCtx) error {
	eps := cx.epc.EndpointsV3()

	// get latest hash to compare
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   eps,
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		cx.t.Fatal(err)
	}
	defer cli.Close()
	hresp, err := cli.HashKV(context.TODO(), eps[0], 0)
	if err != nil {
		cx.t.Fatal(err)
	}

	cmdArgs := append(cx.PrefixArgs(), "endpoint", "hashkv")
	var ss []string
	for _, ep := range cx.epc.EndpointsV3() {
		u, _ := url.Parse(ep)
		ss = append(ss, fmt.Sprintf("%s, %d", u.Host, hresp.Hash))
	}
	return spawnWithExpects(cmdArgs, ss...)
}
@@ -641,6 +641,49 @@ Get the status for all endpoints in the cluster associated with the default endp
+------------------------+------------------+----------------+---------+-----------+-----------+------------+
```

### ENDPOINT HASHKV

ENDPOINT HASHKV fetches the hash of the key-value store of an endpoint.

#### Output

##### Simple format

Prints a humanized table of each endpoint URL and KV history hash.

##### JSON format

Prints a line of JSON encoding each endpoint URL and KV history hash.

#### Examples

Get the hash for the default endpoint:

```bash
./etcdctl endpoint hashkv
# 127.0.0.1:2379, 1084519789
```

Get the hash for the default endpoint as JSON:

```bash
./etcdctl -w json endpoint hashkv
# [{"Endpoint":"127.0.0.1:2379","Hash":{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":1,"raft_term":3},"hash":1084519789,"compact_revision":-1}}]
```

Get the hash for all endpoints in the cluster associated with the default endpoint:

```bash
./etcdctl -w table endpoint --cluster hashkv
+------------------------+------------+
|        ENDPOINT        |    HASH    |
+------------------------+------------+
| http://127.0.0.1:12379 | 1084519789 |
| http://127.0.0.1:22379 | 1084519789 |
| http://127.0.0.1:32379 | 1084519789 |
+------------------------+------------+
```

### ALARM \<subcommand\>

Provides alarm related commands
@@ -28,6 +28,7 @@ import (
)

var epClusterEndpoints bool
var epHashKVRev int64

// NewEndpointCommand returns the cobra command for "endpoint".
func NewEndpointCommand() *cobra.Command {
@@ -39,6 +40,7 @@ func NewEndpointCommand() *cobra.Command {
	ec.PersistentFlags().BoolVar(&epClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")
	ec.AddCommand(newEpHealthCommand())
	ec.AddCommand(newEpStatusCommand())
	ec.AddCommand(newEpHashKVCommand())

	return ec
}
@@ -64,6 +66,16 @@ The items in the lists are endpoint, ID, version, db size, is leader, raft term,
	}
}

func newEpHashKVCommand() *cobra.Command {
	hc := &cobra.Command{
		Use:   "hashkv",
		Short: "Prints the KV history hash for each endpoint in --endpoints",
		Run:   epHashKVCommandFunc,
	}
	hc.PersistentFlags().Int64Var(&epHashKVRev, "rev", 0, "maximum revision to hash (default: all revisions)")
	return hc
}

// epHealthCommandFunc executes the "endpoint-health" command.
func epHealthCommandFunc(cmd *cobra.Command, args []string) {
	flags.SetPflagsFromEnv("ETCDCTL", cmd.InheritedFlags())
@@ -151,6 +163,35 @@ func epStatusCommandFunc(cmd *cobra.Command, args []string) {
	}
}

type epHashKV struct {
	Ep   string             `json:"Endpoint"`
	Resp *v3.HashKVResponse `json:"HashKV"`
}

func epHashKVCommandFunc(cmd *cobra.Command, args []string) {
	c := mustClientFromCmd(cmd)

	hashList := []epHashKV{}
	var err error
	for _, ep := range endpointsFromCluster(cmd) {
		ctx, cancel := commandCtx(cmd)
		resp, serr := c.HashKV(ctx, ep, epHashKVRev)
		cancel()
		if serr != nil {
			err = serr
			fmt.Fprintf(os.Stderr, "Failed to get the hash of endpoint %s (%v)\n", ep, serr)
			continue
		}
		hashList = append(hashList, epHashKV{Ep: ep, Resp: resp})
	}

	display.EndpointHashKV(hashList)

	if err != nil {
		ExitWithError(ExitError, err)
	}
}

func endpointsFromCluster(cmd *cobra.Command) []string {
	if !epClusterEndpoints {
		endpoints, err := cmd.Flags().GetStringSlice("endpoints")
@@ -43,6 +43,7 @@ type printer interface {
	MemberList(v3.MemberListResponse)

	EndpointStatus([]epStatus)
	EndpointHashKV([]epHashKV)
	MoveLeader(leader, target uint64, r v3.MoveLeaderResponse)

	Alarm(v3.AlarmResponse)
@@ -146,6 +147,7 @@ func newPrinterUnsupported(n string) printer {
}

func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) }
func (p *printerUnsupported) DBStatus(dbstatus)         { p.p(nil) }

func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
@@ -184,6 +186,17 @@ func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]stri
	return
}

func makeEndpointHashKVTable(hashList []epHashKV) (hdr []string, rows [][]string) {
	hdr = []string{"endpoint", "hash"}
	for _, h := range hashList {
		rows = append(rows, []string{
			h.Ep,
			fmt.Sprint(h.Resp.Hash),
		})
	}
	return
}

func makeDBStatusTable(ds dbstatus) (hdr []string, rows [][]string) {
	hdr = []string{"hash", "revision", "total keys", "total size"}
	rows = append(rows, []string{
@@ -146,6 +146,15 @@ func (p *fieldsPrinter) EndpointStatus(eps []epStatus) {
	}
}

func (p *fieldsPrinter) EndpointHashKV(hs []epHashKV) {
	for _, h := range hs {
		p.hdr(h.Resp.Header)
		fmt.Printf("\"Endpoint\" : %q\n", h.Ep)
		fmt.Println(`"Hash" :`, h.Resp.Hash)
		fmt.Println()
	}
}

func (p *fieldsPrinter) Alarm(r v3.AlarmResponse) {
	p.hdr(r.Header)
	for _, a := range r.Alarms {
@@ -29,6 +29,7 @@ func newJSONPrinter() printer {
}

func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) }
func (p *jsonPrinter) DBStatus(r dbstatus)         { printJSON(r) }

func printJSON(v interface{}) {
@@ -136,6 +136,13 @@ func (s *simplePrinter) EndpointStatus(statusList []epStatus) {
	}
}

func (s *simplePrinter) EndpointHashKV(hashList []epHashKV) {
	_, rows := makeEndpointHashKVTable(hashList)
	for _, row := range rows {
		fmt.Println(strings.Join(row, ", "))
	}
}

func (s *simplePrinter) DBStatus(ds dbstatus) {
	_, rows := makeDBStatusTable(ds)
	for _, row := range rows {
@@ -44,6 +44,16 @@ func (tp *tablePrinter) EndpointStatus(r []epStatus) {
	table.SetAlignment(tablewriter.ALIGN_RIGHT)
	table.Render()
}
func (tp *tablePrinter) EndpointHashKV(r []epHashKV) {
	hdr, rows := makeEndpointHashKVTable(r)
	table := tablewriter.NewWriter(os.Stdout)
	table.SetHeader(hdr)
	for _, row := range rows {
		table.Append(row)
	}
	table.SetAlignment(tablewriter.ALIGN_RIGHT)
	table.Render()
}
func (tp *tablePrinter) DBStatus(r dbstatus) {
	hdr, rows := makeDBStatusTable(r)
	table := tablewriter.NewWriter(os.Stdout)