etcd/etcdctl/ctlv3/command/ep_command.go
Benjamin Wang 74085136b3 etcdctl: connect to the same endpoint as the target to be maintained
The client may connect to a different endpoint than the target to be
maintained. When auth is enabled, the target endpoint might not have
finished applying the authentication request yet, so it might reject the
corresponding maintenance request with a permission-denied error.

Signed-off-by: Benjamin Wang <wachao@vmware.com>
2022-11-03 04:55:27 +08:00
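The fix below restricts each maintenance client to the single endpoint it is about to query. A minimal sketch of that per-endpoint dialing pattern, assuming a reachable member at 127.0.0.1:2379; the helper name maintenanceStatus and the timeout values are illustrative assumptions, not part of this file:

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// maintenanceStatus dials only the target endpoint, so the Status RPC is
// answered by the same member that is being maintained.
func maintenanceStatus(ep string) (*clientv3.StatusResponse, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{ep}, // the target endpoint only, never the full cluster list
		DialTimeout: 2 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return cli.Status(ctx, ep)
}

func main() {
	resp, err := maintenanceStatus("127.0.0.1:2379")
	if err != nil {
		fmt.Println("status failed:", err)
		return
	}
	fmt.Printf("version: %s, db size: %d bytes\n", resp.Version, resp.DbSize)
}

This mirrors how epStatusCommandFunc and epHashKVCommandFunc in the file below set cfg.Endpoints = []string{ep} before creating each client.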

303 lines
7.9 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package command

import (
	"fmt"
	"os"
	"sync"
	"time"

	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.etcd.io/etcd/client/pkg/v3/logutil"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/pkg/v3/cobrautl"
	"go.etcd.io/etcd/pkg/v3/flags"

	"github.com/spf13/cobra"
	"go.uber.org/zap"
)
var epClusterEndpoints bool
var epHashKVRev int64

// NewEndpointCommand returns the cobra command for "endpoint".
func NewEndpointCommand() *cobra.Command {
	ec := &cobra.Command{
		Use:   "endpoint <subcommand>",
		Short: "Endpoint related commands",
	}

	ec.PersistentFlags().BoolVar(&epClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")
	ec.AddCommand(newEpHealthCommand())
	ec.AddCommand(newEpStatusCommand())
	ec.AddCommand(newEpHashKVCommand())

	return ec
}
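// newEpHealthCommand returns the cobra command for "endpoint health".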
func newEpHealthCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "health",
		Short: "Checks the healthiness of endpoints specified in `--endpoints` flag",
		Run:   epHealthCommandFunc,
	}
	return cmd
}
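// newEpStatusCommand returns the cobra command for "endpoint status".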
func newEpStatusCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "status",
		Short: "Prints out the status of endpoints specified in `--endpoints` flag",
		Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
The items in the lists are endpoint, ID, version, db size, is leader, is learner, raft term, raft index, raft applied index, errors.
`,
		Run: epStatusCommandFunc,
	}
}
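// newEpHashKVCommand returns the cobra command for "endpoint hashkv".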
func newEpHashKVCommand() *cobra.Command {
	hc := &cobra.Command{
		Use:   "hashkv",
		Short: "Prints the KV history hash for each endpoint in --endpoints",
		Run:   epHashKVCommandFunc,
	}
	hc.PersistentFlags().Int64Var(&epHashKVRev, "rev", 0, "maximum revision to hash (default: all revisions)")
	return hc
}
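// epHealth is the health report produced for a single endpoint.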
type epHealth struct {
	Ep     string `json:"endpoint"`
	Health bool   `json:"health"`
	Took   string `json:"took"`
	Error  string `json:"error,omitempty"`
}
// epHealthCommandFunc executes the "endpoint-health" command.
func epHealthCommandFunc(cmd *cobra.Command, args []string) {
	lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
	if err != nil {
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}

	flags.SetPflagsFromEnv(lg, "ETCDCTL", cmd.InheritedFlags())
	initDisplayFromCmd(cmd)

	sec := secureCfgFromCmd(cmd)
	dt := dialTimeoutFromCmd(cmd)
	ka := keepAliveTimeFromCmd(cmd)
	kat := keepAliveTimeoutFromCmd(cmd)
	auth := authCfgFromCmd(cmd)

	var cfgs []*clientv3.Config
	for _, ep := range endpointsFromCluster(cmd) {
		cfg, err := clientv3.NewClientConfig(&clientv3.ConfigSpec{
			Endpoints:        []string{ep},
			DialTimeout:      dt,
			KeepAliveTime:    ka,
			KeepAliveTimeout: kat,
			Secure:           sec,
			Auth:             auth,
		}, lg)
		if err != nil {
			cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
		}
		cfgs = append(cfgs, cfg)
	}

	var wg sync.WaitGroup
	hch := make(chan epHealth, len(cfgs))
	for _, cfg := range cfgs {
		wg.Add(1)
		go func(cfg *clientv3.Config) {
			defer wg.Done()
			ep := cfg.Endpoints[0]
			cfg.Logger = lg.Named("client")
			cli, err := clientv3.New(*cfg)
			if err != nil {
				hch <- epHealth{Ep: ep, Health: false, Error: err.Error()}
				return
			}
			st := time.Now()
			// Get a random key. As long as we can get the response without an error, the
			// endpoint is healthy.
			ctx, cancel := commandCtx(cmd)
			_, err = cli.Get(ctx, "health")
			eh := epHealth{Ep: ep, Health: false, Took: time.Since(st).String()}
			// permission denied is OK since proposal goes through consensus to get it
			if err == nil || err == rpctypes.ErrPermissionDenied {
				eh.Health = true
			} else {
				eh.Error = err.Error()
			}

			if eh.Health {
				resp, err := cli.AlarmList(ctx)
				if err == nil && len(resp.Alarms) > 0 {
					eh.Health = false
					eh.Error = "Active Alarm(s): "
					for _, v := range resp.Alarms {
						switch v.Alarm {
						case etcdserverpb.AlarmType_NOSPACE:
							eh.Error = eh.Error + "NOSPACE "
						case etcdserverpb.AlarmType_CORRUPT:
							eh.Error = eh.Error + "CORRUPT "
						default:
							eh.Error = eh.Error + "UNKNOWN "
						}
					}
				} else if err != nil {
					eh.Health = false
					eh.Error = "Unable to fetch the alarm list"
				}
			}
			cancel()
			hch <- eh
		}(cfg)
	}

	wg.Wait()
	close(hch)

	errs := false
	var healthList []epHealth
	for h := range hch {
		healthList = append(healthList, h)
		if h.Error != "" {
			errs = true
		}
	}
	display.EndpointHealth(healthList)
	if errs {
		cobrautl.ExitWithError(cobrautl.ExitError, fmt.Errorf("unhealthy cluster"))
	}
}
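// epStatus pairs an endpoint with its Status response for display.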
type epStatus struct {
	Ep   string                   `json:"Endpoint"`
	Resp *clientv3.StatusResponse `json:"Status"`
}
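// epStatusCommandFunc executes the "endpoint status" command, dialing each
// endpoint individually so the status request is served by that endpoint.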
func epStatusCommandFunc(cmd *cobra.Command, args []string) {
	cfg := clientConfigFromCmd(cmd)

	var statusList []epStatus
	var err error
	for _, ep := range endpointsFromCluster(cmd) {
		cfg.Endpoints = []string{ep}
		c := mustClient(cfg)
		ctx, cancel := commandCtx(cmd)
		resp, serr := c.Status(ctx, ep)
		cancel()
		c.Close()
		if serr != nil {
			err = serr
			fmt.Fprintf(os.Stderr, "Failed to get the status of endpoint %s (%v)\n", ep, serr)
			continue
		}
		statusList = append(statusList, epStatus{Ep: ep, Resp: resp})
	}

	display.EndpointStatus(statusList)

	if err != nil {
		os.Exit(cobrautl.ExitError)
	}
}
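// epHashKV pairs an endpoint with its HashKV response for display.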
type epHashKV struct {
	Ep   string                   `json:"Endpoint"`
	Resp *clientv3.HashKVResponse `json:"HashKV"`
}
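// epHashKVCommandFunc executes the "endpoint hashkv" command, dialing each
// endpoint individually so the hash is computed by that endpoint.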
func epHashKVCommandFunc(cmd *cobra.Command, args []string) {
	cfg := clientConfigFromCmd(cmd)

	var hashList []epHashKV
	var err error
	for _, ep := range endpointsFromCluster(cmd) {
		cfg.Endpoints = []string{ep}
		c := mustClient(cfg)
		ctx, cancel := commandCtx(cmd)
		resp, serr := c.HashKV(ctx, ep, epHashKVRev)
		cancel()
		c.Close()
		if serr != nil {
			err = serr
			fmt.Fprintf(os.Stderr, "Failed to get the hash of endpoint %s (%v)\n", ep, serr)
			continue
		}
		hashList = append(hashList, epHashKV{Ep: ep, Resp: resp})
	}

	display.EndpointHashKV(hashList)

	if err != nil {
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}
}
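// endpointsFromCluster returns the endpoints from the --endpoints flag, or,
// when --cluster is set, the client URLs of every member in the cluster.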
func endpointsFromCluster(cmd *cobra.Command) []string {
	if !epClusterEndpoints {
		endpoints, err := cmd.Flags().GetStringSlice("endpoints")
		if err != nil {
			cobrautl.ExitWithError(cobrautl.ExitError, err)
		}
		return endpoints
	}

	sec := secureCfgFromCmd(cmd)
	dt := dialTimeoutFromCmd(cmd)
	ka := keepAliveTimeFromCmd(cmd)
	kat := keepAliveTimeoutFromCmd(cmd)
	eps, err := endpointsFromCmd(cmd)
	if err != nil {
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}
	// Exclude auth so the user isn't prompted for a needless password; MemberList() doesn't require authentication.
	lg, _ := logutil.CreateDefaultZapLogger(zap.InfoLevel)
	cfg, err := clientv3.NewClientConfig(&clientv3.ConfigSpec{
		Endpoints:        eps,
		DialTimeout:      dt,
		KeepAliveTime:    ka,
		KeepAliveTimeout: kat,
		Secure:           sec,
	}, lg)
	if err != nil {
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}

	c, err := clientv3.New(*cfg)
	if err != nil {
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}

	ctx, cancel := commandCtx(cmd)
	defer func() {
		c.Close()
		cancel()
	}()
	membs, err := c.MemberList(ctx)
	if err != nil {
		err = fmt.Errorf("failed to fetch endpoints from etcd cluster member list: %v", err)
		cobrautl.ExitWithError(cobrautl.ExitError, err)
	}

	var ret []string
	for _, m := range membs.Members {
		ret = append(ret, m.ClientURLs...)
	}
	return ret
}