mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #8153 from gyuho/leadership-transfer
*: expose Leadership Transfer API to clients
This commit is contained in:
commit
a57405a958
@ -70,6 +70,7 @@ This is a generated documentation. Please read the proto files for more.
|
||||
| Defragment | DefragmentRequest | DefragmentResponse | Defragment defragments a member's backend database to recover storage space. |
|
||||
| Hash | HashRequest | HashResponse | Hash returns the hash of the local KV state for consistency checking purpose. This is designed for testing; do not use this in production when there are ongoing transactions. |
|
||||
| Snapshot | SnapshotRequest | SnapshotResponse | Snapshot sends a snapshot of the entire backend from a member over a stream to a client. |
|
||||
| MoveLeader | MoveLeaderRequest | MoveLeaderResponse | MoveLeader requests current leader node to transfer its leadership to transferee. |
|
||||
|
||||
|
||||
|
||||
@ -608,6 +609,22 @@ Empty field.
|
||||
|
||||
|
||||
|
||||
##### message `MoveLeaderRequest` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
| ----- | ----------- | ---- |
|
||||
| targetID | targetID is the node ID for the new leader. | uint64 |
|
||||
|
||||
|
||||
|
||||
##### message `MoveLeaderResponse` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
| ----- | ----------- | ---- |
|
||||
| header | | ResponseHeader |
|
||||
|
||||
|
||||
|
||||
##### message `PutRequest` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
|
@ -934,6 +934,33 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v3alpha/maintenance/transfer-leadership": {
|
||||
"post": {
|
||||
"tags": [
|
||||
"Maintenance"
|
||||
],
|
||||
"summary": "MoveLeader requests current leader node to transfer its leadership to transferee.",
|
||||
"operationId": "MoveLeader",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "body",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbMoveLeaderRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "(empty)",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbMoveLeaderResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v3alpha/watch": {
|
||||
"post": {
|
||||
"tags": [
|
||||
@ -1803,6 +1830,24 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbMoveLeaderRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"targetID": {
|
||||
"description": "targetID is the node ID for the new leader.",
|
||||
"type": "string",
|
||||
"format": "uint64"
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbMoveLeaderResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"header": {
|
||||
"$ref": "#/definitions/etcdserverpbResponseHeader"
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbPutRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -2177,7 +2222,7 @@
|
||||
"format": "boolean"
|
||||
},
|
||||
"compact_revision": {
|
||||
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store. \n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
|
||||
"description": "compact_revision is set to the minimum index if a watcher tries to watch\nat a compacted index.\n\nThis happens when creating a watcher at a compacted revision or the watcher cannot\ncatch up with the progress of the key-value store.\n\nThe client should treat the watcher as canceled and should not try to create any\nwatcher with the same start_revision again.",
|
||||
"type": "string",
|
||||
"format": "int64"
|
||||
},
|
||||
|
53
clientv3/integration/maintenance_test.go
Normal file
53
clientv3/integration/maintenance_test.go
Normal file
@ -0,0 +1,53 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
func TestMaintenanceMoveLeader(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
oldLeadIdx := clus.WaitLeader(t)
|
||||
targetIdx := (oldLeadIdx + 1) % 3
|
||||
target := uint64(clus.Members[targetIdx].ID())
|
||||
|
||||
cli := clus.Client(targetIdx)
|
||||
_, err := cli.MoveLeader(context.Background(), target)
|
||||
if err != rpctypes.ErrNotLeader {
|
||||
t.Fatalf("error expected %v, got %v", rpctypes.ErrNotLeader, err)
|
||||
}
|
||||
|
||||
cli = clus.Client(oldLeadIdx)
|
||||
_, err = cli.MoveLeader(context.Background(), target)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
leadIdx := clus.WaitLeader(t)
|
||||
lead := uint64(clus.Members[leadIdx].ID())
|
||||
if target != lead {
|
||||
t.Fatalf("new leader expected %d, got %d", target, lead)
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@ type (
|
||||
AlarmResponse pb.AlarmResponse
|
||||
AlarmMember pb.AlarmMember
|
||||
StatusResponse pb.StatusResponse
|
||||
MoveLeaderResponse pb.MoveLeaderResponse
|
||||
)
|
||||
|
||||
type Maintenance interface {
|
||||
@ -51,6 +52,10 @@ type Maintenance interface {
|
||||
|
||||
// Snapshot provides a reader for a snapshot of a backend.
|
||||
Snapshot(ctx context.Context) (io.ReadCloser, error)
|
||||
|
||||
// MoveLeader requests current leader to transfer its leadership to the transferee.
|
||||
// Request must be made to the leader.
|
||||
MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
|
||||
}
|
||||
|
||||
type maintenance struct {
|
||||
@ -180,3 +185,8 @@ func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
|
||||
}()
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
|
||||
resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, grpc.FailFast(false))
|
||||
return (*MoveLeaderResponse)(resp), toErr(ctx, err)
|
||||
}
|
||||
|
92
e2e/ctl_v3_move_leader_test.go
Normal file
92
e2e/ctl_v3_move_leader_test.go
Normal file
@ -0,0 +1,92 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package e2e
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
func TestCtlV3MoveLeader(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
epc := setupEtcdctlTest(t, &configNoTLS, true)
|
||||
defer func() {
|
||||
if errC := epc.Close(); errC != nil {
|
||||
t.Fatalf("error closing etcd processes (%v)", errC)
|
||||
}
|
||||
}()
|
||||
|
||||
var leadIdx int
|
||||
var leaderID uint64
|
||||
var transferee uint64
|
||||
for i, ep := range epc.grpcEndpoints() {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{ep},
|
||||
DialTimeout: 3 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := cli.Status(context.Background(), ep)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cli.Close()
|
||||
|
||||
if resp.Header.GetMemberId() == resp.Leader {
|
||||
leadIdx = i
|
||||
leaderID = resp.Leader
|
||||
} else {
|
||||
transferee = resp.Header.GetMemberId()
|
||||
}
|
||||
}
|
||||
|
||||
os.Setenv("ETCDCTL_API", "3")
|
||||
defer os.Unsetenv("ETCDCTL_API")
|
||||
cx := ctlCtx{
|
||||
t: t,
|
||||
cfg: configNoTLS,
|
||||
dialTimeout: 7 * time.Second,
|
||||
epc: epc,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
prefixes []string
|
||||
expect string
|
||||
}{
|
||||
{ // request to non-leader
|
||||
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}),
|
||||
"no leader endpoint given at ",
|
||||
},
|
||||
{ // request to leader
|
||||
cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}),
|
||||
fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)),
|
||||
},
|
||||
}
|
||||
for i, tc := range tests {
|
||||
cmdArgs := append(tc.prefixes, "move-leader", types.ID(transferee).String())
|
||||
if err := spawnWithExpect(cmdArgs, tc.expect); err != nil {
|
||||
t.Fatalf("#%d: %v", i, err)
|
||||
}
|
||||
}
|
||||
}
|
@ -805,6 +805,29 @@ Prints a line of JSON encoding the database hash, revision, total keys, and size
|
||||
+----------+----------+------------+------------+
|
||||
```
|
||||
|
||||
### MOVE-LEADER \<hexadecimal-transferee-id\>
|
||||
|
||||
MOVE-LEADER transfers leadership from the leader to another member in the cluster.
|
||||
|
||||
#### Example
|
||||
|
||||
```bash
|
||||
# to choose transferee
|
||||
transferee_id=$(./etcdctl \
|
||||
--endpoints localhost:12379,localhost:22379,localhost:32379 \
|
||||
endpoint status | grep -m 1 "false" | awk -F', ' '{print $2}')
|
||||
echo ${transferee_id}
|
||||
# c89feb932daef420
|
||||
|
||||
# the request fails when the given endpoints do not include the leader node
|
||||
./etcdctl --endpoints ${transferee_ep} move-leader ${transferee_id}
|
||||
# Error: no leader endpoint given at [localhost:22379 localhost:32379]
|
||||
|
||||
# request to leader with target node ID
|
||||
./etcdctl --endpoints ${leader_ep} move-leader ${transferee_id}
|
||||
# Leadership transferred from 45ddc0e800e20b93 to c89feb932daef420
|
||||
```
|
||||
|
||||
## Concurrency commands
|
||||
|
||||
### LOCK \<lockname\> [command arg1 arg2 ...]
|
||||
|
87
etcdctl/ctlv3/command/move_leader_command.go
Normal file
87
etcdctl/ctlv3/command/move_leader_command.go
Normal file
@ -0,0 +1,87 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package command
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// NewMoveLeaderCommand returns the cobra command for "move-leader".
|
||||
func NewMoveLeaderCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "move-leader <transferee-member-id>",
|
||||
Short: "Transfers leadership to another etcd cluster member.",
|
||||
Run: transferLeadershipCommandFunc,
|
||||
}
|
||||
return cmd
|
||||
}
|
||||
|
||||
// transferLeadershipCommandFunc executes the "move-leader" command.
|
||||
func transferLeadershipCommandFunc(cmd *cobra.Command, args []string) {
|
||||
if len(args) != 1 {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("move-leader command needs 1 argument"))
|
||||
}
|
||||
target, err := strconv.ParseUint(args[0], 16, 64)
|
||||
if err != nil {
|
||||
ExitWithError(ExitBadArgs, err)
|
||||
}
|
||||
|
||||
c := mustClientFromCmd(cmd)
|
||||
eps := c.Endpoints()
|
||||
c.Close()
|
||||
|
||||
ctx, cancel := commandCtx(cmd)
|
||||
|
||||
// find current leader
|
||||
var leaderCli *clientv3.Client
|
||||
var leaderID uint64
|
||||
for _, ep := range eps {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: []string{ep},
|
||||
DialTimeout: 3 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
resp, err := cli.Status(ctx, ep)
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
if resp.Header.GetMemberId() == resp.Leader {
|
||||
leaderCli = cli
|
||||
leaderID = resp.Leader
|
||||
break
|
||||
}
|
||||
cli.Close()
|
||||
}
|
||||
if leaderCli == nil {
|
||||
ExitWithError(ExitBadArgs, fmt.Errorf("no leader endpoint given at %v", eps))
|
||||
}
|
||||
|
||||
var resp *clientv3.MoveLeaderResponse
|
||||
resp, err = leaderCli.MoveLeader(ctx, target)
|
||||
cancel()
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
}
|
||||
|
||||
display.MoveLeader(leaderID, target, *resp)
|
||||
}
|
@ -43,6 +43,7 @@ type printer interface {
|
||||
MemberList(v3.MemberListResponse)
|
||||
|
||||
EndpointStatus([]epStatus)
|
||||
MoveLeader(leader, target uint64, r v3.MoveLeaderResponse)
|
||||
|
||||
Alarm(v3.AlarmResponse)
|
||||
DBStatus(dbstatus)
|
||||
@ -104,7 +105,9 @@ func (p *printerRPC) MemberUpdate(id uint64, r v3.MemberUpdateResponse) {
|
||||
}
|
||||
func (p *printerRPC) MemberList(r v3.MemberListResponse) { p.p((*pb.MemberListResponse)(&r)) }
|
||||
func (p *printerRPC) Alarm(r v3.AlarmResponse) { p.p((*pb.AlarmResponse)(&r)) }
|
||||
|
||||
func (p *printerRPC) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) {
|
||||
p.p((*pb.MoveLeaderResponse)(&r))
|
||||
}
|
||||
func (p *printerRPC) RoleAdd(_ string, r v3.AuthRoleAddResponse) { p.p((*pb.AuthRoleAddResponse)(&r)) }
|
||||
func (p *printerRPC) RoleGet(_ string, r v3.AuthRoleGetResponse) { p.p((*pb.AuthRoleGetResponse)(&r)) }
|
||||
func (p *printerRPC) RoleDelete(_ string, r v3.AuthRoleDeleteResponse) {
|
||||
@ -145,6 +148,8 @@ func newPrinterUnsupported(n string) printer {
|
||||
func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
|
||||
func (p *printerUnsupported) DBStatus(dbstatus) { p.p(nil) }
|
||||
|
||||
func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
|
||||
|
||||
func makeMemberListTable(r v3.MemberListResponse) (hdr []string, rows [][]string) {
|
||||
hdr = []string{"ID", "Status", "Name", "Peer Addrs", "Client Addrs"}
|
||||
for _, m := range r.Members {
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
|
||||
v3 "github.com/coreos/etcd/clientv3"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
)
|
||||
|
||||
type simplePrinter struct {
|
||||
@ -142,6 +143,10 @@ func (s *simplePrinter) DBStatus(ds dbstatus) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *simplePrinter) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) {
|
||||
fmt.Printf("Leadership transferred from %s to %s\n", types.ID(leader), types.ID(target))
|
||||
}
|
||||
|
||||
func (s *simplePrinter) RoleAdd(role string, r v3.AuthRoleAddResponse) {
|
||||
fmt.Printf("Role %s created\n", role)
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ func init() {
|
||||
command.NewAlarmCommand(),
|
||||
command.NewDefragCommand(),
|
||||
command.NewEndpointCommand(),
|
||||
command.NewMoveLeaderCommand(),
|
||||
command.NewWatchCommand(),
|
||||
command.NewVersionCommand(),
|
||||
command.NewLeaseCommand(),
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/coreos/etcd/auth"
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/mvcc"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
@ -40,9 +41,14 @@ type Alarmer interface {
|
||||
Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
|
||||
}
|
||||
|
||||
type LeaderTransferrer interface {
|
||||
MoveLeader(ctx context.Context, lead, target uint64) error
|
||||
}
|
||||
|
||||
type RaftStatusGetter interface {
|
||||
Index() uint64
|
||||
Term() uint64
|
||||
ID() types.ID
|
||||
Leader() types.ID
|
||||
}
|
||||
|
||||
@ -56,11 +62,12 @@ type maintenanceServer struct {
|
||||
kg KVGetter
|
||||
bg BackendGetter
|
||||
a Alarmer
|
||||
lt LeaderTransferrer
|
||||
hdr header
|
||||
}
|
||||
|
||||
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
|
||||
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, hdr: newHeader(s)}
|
||||
srv := &maintenanceServer{rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
|
||||
return &authMaintenanceServer{srv, s}
|
||||
}
|
||||
|
||||
@ -147,6 +154,17 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
|
||||
if ms.rg.ID() != ms.rg.Leader() {
|
||||
return nil, rpctypes.ErrGRPCNotLeader
|
||||
}
|
||||
|
||||
if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
|
||||
return nil, togRPCError(err)
|
||||
}
|
||||
return &pb.MoveLeaderResponse{}, nil
|
||||
}
|
||||
|
||||
type authMaintenanceServer struct {
|
||||
*maintenanceServer
|
||||
ag AuthGetter
|
||||
@ -188,3 +206,7 @@ func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (
|
||||
func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
|
||||
return ams.maintenanceServer.Status(ctx, ar)
|
||||
}
|
||||
|
||||
func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
|
||||
return ams.maintenanceServer.MoveLeader(ctx, tr)
|
||||
}
|
||||
|
@ -59,6 +59,7 @@ var (
|
||||
ErrGRPCInvalidAuthMgmt = grpc.Errorf(codes.InvalidArgument, "etcdserver: invalid auth management")
|
||||
|
||||
ErrGRPCNoLeader = grpc.Errorf(codes.Unavailable, "etcdserver: no leader")
|
||||
ErrGRPCNotLeader = grpc.Errorf(codes.Unavailable, "etcdserver: not leader")
|
||||
ErrGRPCNotCapable = grpc.Errorf(codes.Unavailable, "etcdserver: not capable")
|
||||
ErrGRPCStopped = grpc.Errorf(codes.Unavailable, "etcdserver: server stopped")
|
||||
ErrGRPCTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: request timed out")
|
||||
@ -106,6 +107,7 @@ var (
|
||||
grpc.ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
|
||||
|
||||
grpc.ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
|
||||
grpc.ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
|
||||
grpc.ErrorDesc(ErrGRPCNotCapable): ErrGRPCNotCapable,
|
||||
grpc.ErrorDesc(ErrGRPCStopped): ErrGRPCStopped,
|
||||
grpc.ErrorDesc(ErrGRPCTimeout): ErrGRPCTimeout,
|
||||
@ -153,6 +155,7 @@ var (
|
||||
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)
|
||||
|
||||
ErrNoLeader = Error(ErrGRPCNoLeader)
|
||||
ErrNotLeader = Error(ErrGRPCNotLeader)
|
||||
ErrNotCapable = Error(ErrGRPCNotCapable)
|
||||
ErrStopped = Error(ErrGRPCStopped)
|
||||
ErrTimeout = Error(ErrGRPCTimeout)
|
||||
|
@ -39,6 +39,7 @@ var toGRPCErrorMap = map[error]error{
|
||||
etcdserver.ErrTooManyRequests: rpctypes.ErrTooManyRequests,
|
||||
|
||||
etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader,
|
||||
etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader,
|
||||
etcdserver.ErrStopped: rpctypes.ErrGRPCStopped,
|
||||
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
|
||||
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
|
||||
|
@ -29,6 +29,7 @@ var (
|
||||
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
|
||||
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
|
||||
ErrNoLeader = errors.New("etcdserver: no leader")
|
||||
ErrNotLeader = errors.New("etcdserver: not leader")
|
||||
ErrRequestTooLarge = errors.New("etcdserver: request is too large")
|
||||
ErrNoSpace = errors.New("etcdserver: no space")
|
||||
ErrTooManyRequests = errors.New("etcdserver: too many requests")
|
||||
|
@ -58,6 +58,8 @@
|
||||
MemberListResponse
|
||||
DefragmentRequest
|
||||
DefragmentResponse
|
||||
MoveLeaderRequest
|
||||
MoveLeaderResponse
|
||||
AlarmRequest
|
||||
AlarmMember
|
||||
AlarmResponse
|
||||
|
@ -361,6 +361,19 @@ func request_Maintenance_Snapshot_0(ctx context.Context, marshaler runtime.Marsh
|
||||
|
||||
}
|
||||
|
||||
func request_Maintenance_MoveLeader_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.MaintenanceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
|
||||
var protoReq etcdserverpb.MoveLeaderRequest
|
||||
var metadata runtime.ServerMetadata
|
||||
|
||||
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil {
|
||||
return nil, metadata, grpc.Errorf(codes.InvalidArgument, "%v", err)
|
||||
}
|
||||
|
||||
msg, err := client.MoveLeader(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
|
||||
return msg, metadata, err
|
||||
|
||||
}
|
||||
|
||||
func request_Auth_AuthEnable_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.AuthClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
|
||||
var protoReq etcdserverpb.AuthEnableRequest
|
||||
var metadata runtime.ServerMetadata
|
||||
@ -1335,6 +1348,34 @@ func RegisterMaintenanceHandler(ctx context.Context, mux *runtime.ServeMux, conn
|
||||
|
||||
})
|
||||
|
||||
mux.Handle("POST", pattern_Maintenance_MoveLeader_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
if cn, ok := w.(http.CloseNotifier); ok {
|
||||
go func(done <-chan struct{}, closed <-chan bool) {
|
||||
select {
|
||||
case <-done:
|
||||
case <-closed:
|
||||
cancel()
|
||||
}
|
||||
}(ctx.Done(), cn.CloseNotify())
|
||||
}
|
||||
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
|
||||
rctx, err := runtime.AnnotateContext(ctx, req)
|
||||
if err != nil {
|
||||
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
|
||||
}
|
||||
resp, md, err := request_Maintenance_MoveLeader_0(rctx, inboundMarshaler, client, req, pathParams)
|
||||
ctx = runtime.NewServerMetadataContext(ctx, md)
|
||||
if err != nil {
|
||||
runtime.HTTPError(ctx, outboundMarshaler, w, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
forward_Maintenance_MoveLeader_0(ctx, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
|
||||
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1348,6 +1389,8 @@ var (
|
||||
pattern_Maintenance_Hash_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "hash"}, ""))
|
||||
|
||||
pattern_Maintenance_Snapshot_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "snapshot"}, ""))
|
||||
|
||||
pattern_Maintenance_MoveLeader_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3alpha", "maintenance", "transfer-leadership"}, ""))
|
||||
)
|
||||
|
||||
var (
|
||||
@ -1360,6 +1403,8 @@ var (
|
||||
forward_Maintenance_Hash_0 = runtime.ForwardResponseMessage
|
||||
|
||||
forward_Maintenance_Snapshot_0 = runtime.ForwardResponseStream
|
||||
|
||||
forward_Maintenance_MoveLeader_0 = runtime.ForwardResponseMessage
|
||||
)
|
||||
|
||||
// RegisterAuthHandlerFromEndpoint is same as RegisterAuthHandler but
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -191,6 +191,14 @@ service Maintenance {
|
||||
body: "*"
|
||||
};
|
||||
}
|
||||
|
||||
// MoveLeader requests current leader node to transfer its leadership to transferee.
|
||||
rpc MoveLeader(MoveLeaderRequest) returns (MoveLeaderResponse) {
|
||||
option (google.api.http) = {
|
||||
post: "/v3alpha/maintenance/transfer-leadership"
|
||||
body: "*"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
service Auth {
|
||||
@ -380,7 +388,7 @@ message RangeRequest {
|
||||
|
||||
// keys_only when set returns only the keys and not the values.
|
||||
bool keys_only = 8;
|
||||
|
||||
|
||||
// count_only when set returns only the count of the keys in the range.
|
||||
bool count_only = 9;
|
||||
|
||||
@ -558,7 +566,7 @@ message TxnResponse {
|
||||
// CompactionRequest compacts the key-value store up to a given revision. All superseded keys
|
||||
// with a revision less than the compaction revision will be removed.
|
||||
message CompactionRequest {
|
||||
// revision is the key-value store revision for the compaction operation.
|
||||
// revision is the key-value store revision for the compaction operation.
|
||||
int64 revision = 1;
|
||||
// physical is set so the RPC will wait until the compaction is physically
|
||||
// applied to the local database such that compacted entries are totally
|
||||
@ -654,7 +662,7 @@ message WatchResponse {
|
||||
// at a compacted index.
|
||||
//
|
||||
// This happens when creating a watcher at a compacted revision or the watcher cannot
|
||||
// catch up with the progress of the key-value store.
|
||||
// catch up with the progress of the key-value store.
|
||||
//
|
||||
// The client should treat the watcher as canceled and should not try to create any
|
||||
// watcher with the same start_revision again.
|
||||
@ -787,6 +795,15 @@ message DefragmentResponse {
|
||||
ResponseHeader header = 1;
|
||||
}
|
||||
|
||||
message MoveLeaderRequest {
|
||||
// targetID is the node ID for the new leader.
|
||||
uint64 targetID = 1;
|
||||
}
|
||||
|
||||
message MoveLeaderResponse {
|
||||
ResponseHeader header = 1;
|
||||
}
|
||||
|
||||
enum AlarmType {
|
||||
NONE = 0; // default, used to query if any alarm is active
|
||||
NOSPACE = 1; // space quota is exhausted
|
||||
|
@ -932,9 +932,8 @@ func (s *EtcdServer) isLeader() bool {
|
||||
return uint64(s.ID()) == s.Lead()
|
||||
}
|
||||
|
||||
// transferLeadership transfers the leader to the given transferee.
|
||||
// TODO: maybe expose to client?
|
||||
func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
|
||||
// MoveLeader transfers the leader to the given transferee.
|
||||
func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
|
||||
now := time.Now()
|
||||
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
|
||||
|
||||
@ -973,7 +972,7 @@ func (s *EtcdServer) TransferLeadership() error {
|
||||
|
||||
tm := s.Cfg.ReqTimeout()
|
||||
ctx, cancel := context.WithTimeout(s.ctx, tm)
|
||||
err := s.transferLeadership(ctx, s.Lead(), uint64(transferee))
|
||||
err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
@ -578,6 +578,8 @@ func (m *member) electionTimeout() time.Duration {
|
||||
return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
|
||||
}
|
||||
|
||||
func (m *member) ID() types.ID { return m.s.ID() }
|
||||
|
||||
func (m *member) DropConnections() { m.grpcBridge.Reset() }
|
||||
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
|
||||
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
|
||||
|
@ -526,51 +526,6 @@ func clusterMustProgress(t *testing.T, membs []*member) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransferLeader(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
oldLeadIdx := clus.WaitLeader(t)
|
||||
oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID())
|
||||
|
||||
// ensure followers go through leader transition while learship transfer
|
||||
idc := make(chan uint64)
|
||||
for i := range clus.Members {
|
||||
if oldLeadIdx != i {
|
||||
go func(m *member) {
|
||||
idc <- checkLeaderTransition(t, m, oldLeadID)
|
||||
}(clus.Members[i])
|
||||
}
|
||||
}
|
||||
|
||||
err := clus.Members[oldLeadIdx].s.TransferLeadership()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// wait until leader transitions have happened
|
||||
var newLeadIDs [2]uint64
|
||||
for i := range newLeadIDs {
|
||||
select {
|
||||
case newLeadIDs[i] = <-idc:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for leader transition")
|
||||
}
|
||||
}
|
||||
|
||||
// remaining members must agree on the same leader
|
||||
if newLeadIDs[0] != newLeadIDs[1] {
|
||||
t.Fatalf("expected same new leader %d == %d", newLeadIDs[0], newLeadIDs[1])
|
||||
}
|
||||
|
||||
// new leader must be different than the old leader
|
||||
if oldLeadID == newLeadIDs[0] {
|
||||
t.Fatalf("expected old leader %d != new leader %d", oldLeadID, newLeadIDs[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpeedyTerminate(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
|
108
integration/v3_leadership_test.go
Normal file
108
integration/v3_leadership_test.go
Normal file
@ -0,0 +1,108 @@
|
||||
// Copyright 2017 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/pkg/testutil"
|
||||
)
|
||||
|
||||
func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) }
|
||||
func TestMoveLeaderService(t *testing.T) { testMoveLeader(t, false) }
|
||||
|
||||
func testMoveLeader(t *testing.T, auto bool) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
oldLeadIdx := clus.WaitLeader(t)
|
||||
oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID())
|
||||
|
||||
// ensure followers go through leader transition during leadership transfer
|
||||
idc := make(chan uint64)
|
||||
for i := range clus.Members {
|
||||
if oldLeadIdx != i {
|
||||
go func(m *member) {
|
||||
idc <- checkLeaderTransition(t, m, oldLeadID)
|
||||
}(clus.Members[i])
|
||||
}
|
||||
}
|
||||
|
||||
target := uint64(clus.Members[(oldLeadIdx+1)%3].s.ID())
|
||||
if auto {
|
||||
err := clus.Members[oldLeadIdx].s.TransferLeadership()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
mvc := toGRPC(clus.Client(oldLeadIdx)).Maintenance
|
||||
_, err := mvc.MoveLeader(context.TODO(), &pb.MoveLeaderRequest{TargetID: target})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// wait until leader transitions have happened
|
||||
var newLeadIDs [2]uint64
|
||||
for i := range newLeadIDs {
|
||||
select {
|
||||
case newLeadIDs[i] = <-idc:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("timed out waiting for leader transition")
|
||||
}
|
||||
}
|
||||
|
||||
// remaining members must agree on the same leader
|
||||
if newLeadIDs[0] != newLeadIDs[1] {
|
||||
t.Fatalf("expected same new leader %d == %d", newLeadIDs[0], newLeadIDs[1])
|
||||
}
|
||||
|
||||
// new leader must be different than the old leader
|
||||
if oldLeadID == newLeadIDs[0] {
|
||||
t.Fatalf("expected old leader %d != new leader %d", oldLeadID, newLeadIDs[0])
|
||||
}
|
||||
|
||||
// if move-leader were used, new leader must match transferee
|
||||
if !auto {
|
||||
if newLeadIDs[0] != target {
|
||||
t.Fatalf("expected new leader %d != target %d", newLeadIDs[0], target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMoveLeaderError ensures that a request to a non-leader fails.
|
||||
func TestMoveLeaderError(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
oldLeadIdx := clus.WaitLeader(t)
|
||||
followerIdx := (oldLeadIdx + 1) % 3
|
||||
|
||||
target := uint64(clus.Members[(oldLeadIdx+2)%3].s.ID())
|
||||
|
||||
mvc := toGRPC(clus.Client(followerIdx)).Maintenance
|
||||
_, err := mvc.MoveLeader(context.TODO(), &pb.MoveLeaderRequest{TargetID: target})
|
||||
if !eqErrGRPC(err, rpctypes.ErrGRPCNotLeader) {
|
||||
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCNotLeader)
|
||||
}
|
||||
}
|
@ -43,6 +43,10 @@ func (s *mts2mtc) Hash(ctx context.Context, r *pb.HashRequest, opts ...grpc.Call
|
||||
return s.mts.Hash(ctx, r)
|
||||
}
|
||||
|
||||
func (s *mts2mtc) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest, opts ...grpc.CallOption) (*pb.MoveLeaderResponse, error) {
|
||||
return s.mts.MoveLeader(ctx, r)
|
||||
}
|
||||
|
||||
func (s *mts2mtc) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (pb.Maintenance_SnapshotClient, error) {
|
||||
cs := newPipeStream(ctx, func(ss chanServerStream) error {
|
||||
return s.mts.Snapshot(in, &ss2scServerStream{ss})
|
||||
|
@ -72,3 +72,8 @@ func (mp *maintenanceProxy) Status(ctx context.Context, r *pb.StatusRequest) (*p
|
||||
conn := mp.client.ActiveConnection()
|
||||
return pb.NewMaintenanceClient(conn).Status(ctx, r)
|
||||
}
|
||||
|
||||
func (mp *maintenanceProxy) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
|
||||
conn := mp.client.ActiveConnection()
|
||||
return pb.NewMaintenanceClient(conn).MoveLeader(ctx, r)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user