mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #10725 from jingyih/learner_part1
*: support raft learner in etcd - part 1
This commit is contained in:
commit
919b93b742
@ -37,6 +37,7 @@ This is a generated documentation. Please read the proto files for more.
|
||||
| MemberRemove | MemberRemoveRequest | MemberRemoveResponse | MemberRemove removes an existing member from the cluster. |
|
||||
| MemberUpdate | MemberUpdateRequest | MemberUpdateResponse | MemberUpdate updates the member configuration. |
|
||||
| MemberList | MemberListRequest | MemberListResponse | MemberList lists all the members in the cluster. |
|
||||
| MemberPromote | MemberPromoteRequest | MemberPromoteResponse | MemberPromote promotes a member from raft learner (non-voting) to raft voting member. |
|
||||
|
||||
|
||||
|
||||
@ -609,6 +610,7 @@ Empty field.
|
||||
| name | name is the human-readable name of the member. If the member is not started, the name will be an empty string. | string |
|
||||
| peerURLs | peerURLs is the list of URLs the member exposes to the cluster for communication. | (slice of) string |
|
||||
| clientURLs | clientURLs is the list of URLs the member exposes to clients for communication. If the member is not started, clientURLs will be empty. | (slice of) string |
|
||||
| isLearner | isLearner indicates if the member is raft learner. | bool |
|
||||
|
||||
|
||||
|
||||
@ -617,6 +619,7 @@ Empty field.
|
||||
| Field | Description | Type |
|
||||
| ----- | ----------- | ---- |
|
||||
| peerURLs | peerURLs is the list of URLs the added member will use to communicate with the cluster. | (slice of) string |
|
||||
| isLearner | isLearner indicates if the added member is raft learner. | bool |
|
||||
|
||||
|
||||
|
||||
@ -645,6 +648,23 @@ Empty field.
|
||||
|
||||
|
||||
|
||||
##### message `MemberPromoteRequest` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
| ----- | ----------- | ---- |
|
||||
| ID | ID is the member ID of the member to promote. | uint64 |
|
||||
|
||||
|
||||
|
||||
##### message `MemberPromoteResponse` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
| ----- | ----------- | ---- |
|
||||
| header | | ResponseHeader |
|
||||
| members | members is a list of all members after promoting the member. | (slice of) Member |
|
||||
|
||||
|
||||
|
||||
##### message `MemberRemoveRequest` (etcdserver/etcdserverpb/rpc.proto)
|
||||
|
||||
| Field | Description | Type |
|
||||
@ -819,6 +839,7 @@ Empty field.
|
||||
| raftAppliedIndex | raftAppliedIndex is the current raft applied index of the responding member. | uint64 |
|
||||
| errors | errors contains alarm/health information and status. | (slice of) string |
|
||||
| dbSizeInUse | dbSizeInUse is the size of the backend database logically in use, in bytes, of the responding member. | int64 |
|
||||
| isLearner | isLearner indicates if the member is raft learner. | bool |
|
||||
|
||||
|
||||
|
||||
|
@ -501,6 +501,33 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v3/cluster/member/promote": {
|
||||
"post": {
|
||||
"tags": [
|
||||
"Cluster"
|
||||
],
|
||||
"summary": "MemberPromote promotes a member from raft learner (non-voting) to raft voting member.",
|
||||
"operationId": "MemberPromote",
|
||||
"parameters": [
|
||||
{
|
||||
"name": "body",
|
||||
"in": "body",
|
||||
"required": true,
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbMemberPromoteRequest"
|
||||
}
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "A successful response.",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbMemberPromoteResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/v3/cluster/member/remove": {
|
||||
"post": {
|
||||
"tags": [
|
||||
@ -820,7 +847,7 @@
|
||||
"200": {
|
||||
"description": "A successful response.(streaming responses)",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbLeaseKeepAliveResponse"
|
||||
"$ref": "#/x-stream-definitions/etcdserverpbLeaseKeepAliveResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1009,7 +1036,7 @@
|
||||
"200": {
|
||||
"description": "A successful response.(streaming responses)",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbSnapshotResponse"
|
||||
"$ref": "#/x-stream-definitions/etcdserverpbSnapshotResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1091,7 +1118,7 @@
|
||||
"200": {
|
||||
"description": "A successful response.(streaming responses)",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/etcdserverpbWatchResponse"
|
||||
"$ref": "#/x-stream-definitions/etcdserverpbWatchResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1882,6 +1909,11 @@
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"isLearner": {
|
||||
"description": "isLearner indicates if the member is raft learner.",
|
||||
"type": "boolean",
|
||||
"format": "boolean"
|
||||
},
|
||||
"name": {
|
||||
"description": "name is the human-readable name of the member. If the member is not started, the name will be an empty string.",
|
||||
"type": "string"
|
||||
@ -1898,6 +1930,11 @@
|
||||
"etcdserverpbMemberAddRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"isLearner": {
|
||||
"description": "isLearner indicates if the added member is raft learner.",
|
||||
"type": "boolean",
|
||||
"format": "boolean"
|
||||
},
|
||||
"peerURLs": {
|
||||
"description": "peerURLs is the list of URLs the added member will use to communicate with the cluster.",
|
||||
"type": "array",
|
||||
@ -1944,6 +1981,31 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbMemberPromoteRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"ID": {
|
||||
"description": "ID is the member ID of the member to promote.",
|
||||
"type": "string",
|
||||
"format": "uint64"
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbMemberPromoteResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"header": {
|
||||
"$ref": "#/definitions/etcdserverpbResponseHeader"
|
||||
},
|
||||
"members": {
|
||||
"description": "members is a list of all members after promoting the member.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/etcdserverpbMember"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"etcdserverpbMemberRemoveRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -2266,6 +2328,11 @@
|
||||
"header": {
|
||||
"$ref": "#/definitions/etcdserverpbResponseHeader"
|
||||
},
|
||||
"isLearner": {
|
||||
"description": "isLearner indicates if the member is raft learner.",
|
||||
"type": "boolean",
|
||||
"format": "boolean"
|
||||
},
|
||||
"leader": {
|
||||
"description": "leader is the member ID which the responding member believes is the current leader.",
|
||||
"type": "string",
|
||||
@ -2508,6 +2575,43 @@
|
||||
"format": "int64"
|
||||
}
|
||||
}
|
||||
},
|
||||
"protobufAny": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type_url": {
|
||||
"type": "string"
|
||||
},
|
||||
"value": {
|
||||
"type": "string",
|
||||
"format": "byte"
|
||||
}
|
||||
}
|
||||
},
|
||||
"runtimeStreamError": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"details": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/protobufAny"
|
||||
}
|
||||
},
|
||||
"grpc_code": {
|
||||
"type": "integer",
|
||||
"format": "int32"
|
||||
},
|
||||
"http_code": {
|
||||
"type": "integer",
|
||||
"format": "int32"
|
||||
},
|
||||
"http_status": {
|
||||
"type": "string"
|
||||
},
|
||||
"message": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"securityDefinitions": {
|
||||
@ -2521,5 +2625,43 @@
|
||||
{
|
||||
"ApiKey": []
|
||||
}
|
||||
]
|
||||
],
|
||||
"x-stream-definitions": {
|
||||
"etcdserverpbLeaseKeepAliveResponse": {
|
||||
"properties": {
|
||||
"error": {
|
||||
"$ref": "#/definitions/runtimeStreamError"
|
||||
},
|
||||
"result": {
|
||||
"$ref": "#/definitions/etcdserverpbLeaseKeepAliveResponse"
|
||||
}
|
||||
},
|
||||
"title": "Stream result of etcdserverpbLeaseKeepAliveResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"etcdserverpbSnapshotResponse": {
|
||||
"properties": {
|
||||
"error": {
|
||||
"$ref": "#/definitions/runtimeStreamError"
|
||||
},
|
||||
"result": {
|
||||
"$ref": "#/definitions/etcdserverpbSnapshotResponse"
|
||||
}
|
||||
},
|
||||
"title": "Stream result of etcdserverpbSnapshotResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"etcdserverpbWatchResponse": {
|
||||
"properties": {
|
||||
"error": {
|
||||
"$ref": "#/definitions/runtimeStreamError"
|
||||
},
|
||||
"result": {
|
||||
"$ref": "#/definitions/etcdserverpbWatchResponse"
|
||||
}
|
||||
},
|
||||
"title": "Stream result of etcdserverpbWatchResponse",
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
}
|
@ -77,7 +77,7 @@
|
||||
"200": {
|
||||
"description": "A successful response.(streaming responses)",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/v3electionpbLeaderResponse"
|
||||
"$ref": "#/x-stream-definitions/v3electionpbLeaderResponse"
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -212,6 +212,43 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"protobufAny": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type_url": {
|
||||
"type": "string"
|
||||
},
|
||||
"value": {
|
||||
"type": "string",
|
||||
"format": "byte"
|
||||
}
|
||||
}
|
||||
},
|
||||
"runtimeStreamError": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"grpc_code": {
|
||||
"type": "integer",
|
||||
"format": "int32"
|
||||
},
|
||||
"http_code": {
|
||||
"type": "integer",
|
||||
"format": "int32"
|
||||
},
|
||||
"message": {
|
||||
"type": "string"
|
||||
},
|
||||
"http_status": {
|
||||
"type": "string"
|
||||
},
|
||||
"details": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/protobufAny"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"v3electionpbCampaignRequest": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
@ -330,5 +367,19 @@
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"x-stream-definitions": {
|
||||
"v3electionpbLeaderResponse": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"result": {
|
||||
"$ref": "#/definitions/v3electionpbLeaderResponse"
|
||||
},
|
||||
"error": {
|
||||
"$ref": "#/definitions/runtimeStreamError"
|
||||
}
|
||||
},
|
||||
"title": "Stream result of v3electionpbLeaderResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package clientv3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
|
||||
"go.etcd.io/etcd/v3/pkg/types"
|
||||
@ -24,11 +25,12 @@ import (
|
||||
)
|
||||
|
||||
type (
|
||||
Member pb.Member
|
||||
MemberListResponse pb.MemberListResponse
|
||||
MemberAddResponse pb.MemberAddResponse
|
||||
MemberRemoveResponse pb.MemberRemoveResponse
|
||||
MemberUpdateResponse pb.MemberUpdateResponse
|
||||
Member pb.Member
|
||||
MemberListResponse pb.MemberListResponse
|
||||
MemberAddResponse pb.MemberAddResponse
|
||||
MemberRemoveResponse pb.MemberRemoveResponse
|
||||
MemberUpdateResponse pb.MemberUpdateResponse
|
||||
MemberPromoteResponse pb.MemberPromoteResponse
|
||||
)
|
||||
|
||||
type Cluster interface {
|
||||
@ -38,11 +40,17 @@ type Cluster interface {
|
||||
// MemberAdd adds a new member into the cluster.
|
||||
MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
|
||||
|
||||
// MemberAddAsLearner adds a new learner member into the cluster.
|
||||
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error)
|
||||
|
||||
// MemberRemove removes an existing member from the cluster.
|
||||
MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error)
|
||||
|
||||
// MemberUpdate updates the peer addresses of the member.
|
||||
MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error)
|
||||
|
||||
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
|
||||
MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error)
|
||||
}
|
||||
|
||||
type cluster struct {
|
||||
@ -67,12 +75,23 @@ func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster {
|
||||
}
|
||||
|
||||
func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
return c.memberAdd(ctx, peerAddrs, false)
|
||||
}
|
||||
|
||||
func (c *cluster) MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) {
|
||||
return c.memberAdd(ctx, peerAddrs, true)
|
||||
}
|
||||
|
||||
func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner bool) (*MemberAddResponse, error) {
|
||||
// fail-fast before panic in rafthttp
|
||||
if _, err := types.NewURLs(peerAddrs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := &pb.MemberAddRequest{PeerURLs: peerAddrs}
|
||||
r := &pb.MemberAddRequest{
|
||||
PeerURLs: peerAddrs,
|
||||
IsLearner: isLearner,
|
||||
}
|
||||
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
|
||||
if err != nil {
|
||||
return nil, toErr(ctx, err)
|
||||
@ -112,3 +131,8 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
|
||||
}
|
||||
return nil, toErr(ctx, err)
|
||||
}
|
||||
|
||||
func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
|
||||
// TODO: implement
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
@ -59,6 +59,27 @@ func ExampleCluster_memberAdd() {
|
||||
// added member.PeerURLs: [http://localhost:32380]
|
||||
}
|
||||
|
||||
func ExampleCluster_memberAddAsLearner() {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: endpoints[:2],
|
||||
DialTimeout: dialTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer cli.Close()
|
||||
|
||||
peerURLs := endpoints[2:]
|
||||
mresp, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
|
||||
fmt.Println("added member.IsLearner:", mresp.Member.IsLearner)
|
||||
// added member.PeerURLs: [http://localhost:32380]
|
||||
// added member.IsLearner: true
|
||||
}
|
||||
|
||||
func ExampleCluster_memberRemove() {
|
||||
cli, err := clientv3.New(clientv3.Config{
|
||||
Endpoints: endpoints[1:],
|
||||
|
@ -16,6 +16,7 @@ package integration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -184,3 +185,46 @@ func TestMemberAddUpdateWrongURLs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemberAddForLearner(t *testing.T) {
|
||||
defer testutil.AfterTest(t)
|
||||
|
||||
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
capi := clus.RandClient()
|
||||
|
||||
urls := []string{"http://127.0.0.1:1234"}
|
||||
resp, err := capi.MemberAddAsLearner(context.Background(), urls)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to add member %v", err)
|
||||
}
|
||||
|
||||
if !resp.Member.IsLearner {
|
||||
t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner)
|
||||
}
|
||||
|
||||
numOfLearners, err := getNumberOfLearners(clus)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get the number of learners in cluster: %v", err)
|
||||
}
|
||||
if numOfLearners != 1 {
|
||||
t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners)
|
||||
}
|
||||
}
|
||||
|
||||
// getNumberOfLearners return the number of learner nodes in cluster using MemberList API
|
||||
func getNumberOfLearners(clus *integration.ClusterV3) (int, error) {
|
||||
cli := clus.RandClient()
|
||||
resp, err := cli.MemberList(context.Background())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to list member %v", err)
|
||||
}
|
||||
numberOfLearners := 0
|
||||
for _, m := range resp.Members {
|
||||
if m.IsLearner {
|
||||
numberOfLearners++
|
||||
}
|
||||
}
|
||||
return numberOfLearners, nil
|
||||
}
|
||||
|
@ -183,6 +183,10 @@ func (rcc *retryClusterClient) MemberUpdate(ctx context.Context, in *pb.MemberUp
|
||||
return rcc.cc.MemberUpdate(ctx, in, opts...)
|
||||
}
|
||||
|
||||
func (rcc *retryClusterClient) MemberPromote(ctx context.Context, in *pb.MemberPromoteRequest, opts ...grpc.CallOption) (resp *pb.MemberPromoteResponse, err error) {
|
||||
return rcc.cc.MemberPromote(ctx, in, opts...)
|
||||
}
|
||||
|
||||
type retryMaintenanceClient struct {
|
||||
mc pb.MaintenanceClient
|
||||
}
|
||||
|
@ -21,9 +21,13 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"go.etcd.io/etcd/v3/clientv3"
|
||||
)
|
||||
|
||||
var memberPeerURLs string
|
||||
var (
|
||||
memberPeerURLs string
|
||||
isLearner bool
|
||||
)
|
||||
|
||||
// NewMemberCommand returns the cobra command for "member".
|
||||
func NewMemberCommand() *cobra.Command {
|
||||
@ -50,6 +54,7 @@ func NewMemberAddCommand() *cobra.Command {
|
||||
}
|
||||
|
||||
cc.Flags().StringVar(&memberPeerURLs, "peer-urls", "", "comma separated peer URLs for the new member.")
|
||||
cc.Flags().BoolVar(&isLearner, "learner", false, "indicates if the new member is raft learner")
|
||||
|
||||
return cc
|
||||
}
|
||||
@ -86,7 +91,7 @@ func NewMemberListCommand() *cobra.Command {
|
||||
Use: "list",
|
||||
Short: "Lists all members in the cluster",
|
||||
Long: `When --write-out is set to simple, this command prints out comma-separated member lists for each endpoint.
|
||||
The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs.
|
||||
The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs, Is Learner.
|
||||
`,
|
||||
|
||||
Run: memberListCommandFunc,
|
||||
@ -118,7 +123,15 @@ func memberAddCommandFunc(cmd *cobra.Command, args []string) {
|
||||
urls := strings.Split(memberPeerURLs, ",")
|
||||
ctx, cancel := commandCtx(cmd)
|
||||
cli := mustClientFromCmd(cmd)
|
||||
resp, err := cli.MemberAdd(ctx, urls)
|
||||
var (
|
||||
resp *clientv3.MemberAddResponse
|
||||
err error
|
||||
)
|
||||
if isLearner {
|
||||
resp, err = cli.MemberAddAsLearner(ctx, urls)
|
||||
} else {
|
||||
resp, err = cli.MemberAdd(ctx, urls)
|
||||
}
|
||||
cancel()
|
||||
if err != nil {
|
||||
ExitWithError(ExitError, err)
|
||||
|
@ -158,18 +158,23 @@ func (p *printerUnsupported) DBStatus(snapshot.Status) { p.p(nil) }
|
||||
func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
|
||||
|
||||
func makeMemberListTable(r v3.MemberListResponse) (hdr []string, rows [][]string) {
|
||||
hdr = []string{"ID", "Status", "Name", "Peer Addrs", "Client Addrs"}
|
||||
hdr = []string{"ID", "Status", "Name", "Peer Addrs", "Client Addrs", "Is Learner"}
|
||||
for _, m := range r.Members {
|
||||
status := "started"
|
||||
if len(m.Name) == 0 {
|
||||
status = "unstarted"
|
||||
}
|
||||
isLearner := "false"
|
||||
if m.IsLearner {
|
||||
isLearner = "true"
|
||||
}
|
||||
rows = append(rows, []string{
|
||||
fmt.Sprintf("%x", m.ID),
|
||||
status,
|
||||
m.Name,
|
||||
strings.Join(m.PeerURLs, ","),
|
||||
strings.Join(m.ClientURLs, ","),
|
||||
isLearner,
|
||||
})
|
||||
}
|
||||
return hdr, rows
|
||||
|
@ -137,6 +137,7 @@ func (p *fieldsPrinter) MemberList(r v3.MemberListResponse) {
|
||||
for _, u := range m.ClientURLs {
|
||||
fmt.Printf("\"ClientURL\" : %q\n", u)
|
||||
}
|
||||
fmt.Println(`"IsLearner" :`, m.IsLearner)
|
||||
fmt.Println()
|
||||
}
|
||||
}
|
||||
|
@ -59,6 +59,8 @@ type RaftCluster struct {
|
||||
removed map[types.ID]bool
|
||||
}
|
||||
|
||||
// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
|
||||
// cluster with raft learner member.
|
||||
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
|
||||
c := NewCluster(lg, token)
|
||||
for name, urls := range urlsmap {
|
||||
@ -259,7 +261,7 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
|
||||
return ErrIDRemoved
|
||||
}
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
|
||||
if members[id] != nil {
|
||||
return ErrIDExists
|
||||
}
|
||||
|
@ -472,6 +472,29 @@ func TestClusterAddMember(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterAddMemberAsLearner(t *testing.T) {
|
||||
st := mockstore.NewRecorder()
|
||||
c := newTestCluster(nil)
|
||||
c.SetStore(st)
|
||||
c.AddMember(newTestMemberAsLearner(1, nil, "node1", nil))
|
||||
|
||||
wactions := []testutil.Action{
|
||||
{
|
||||
Name: "Create",
|
||||
Params: []interface{}{
|
||||
path.Join(StoreMembersPrefix, "1", "raftAttributes"),
|
||||
false,
|
||||
`{"peerURLs":null,"isLearner":true}`,
|
||||
false,
|
||||
v2store.TTLOptionSet{ExpireTime: v2store.Permanent},
|
||||
},
|
||||
},
|
||||
}
|
||||
if g := st.Action(); !reflect.DeepEqual(g, wactions) {
|
||||
t.Errorf("actions = %v, want %v", g, wactions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestClusterMembers(t *testing.T) {
|
||||
cls := &RaftCluster{
|
||||
members: map[types.ID]*Member{
|
||||
|
@ -35,6 +35,8 @@ type RaftAttributes struct {
|
||||
// PeerURLs is the list of peers in the raft cluster.
|
||||
// TODO(philips): ensure these are URLs
|
||||
PeerURLs []string `json:"peerURLs"`
|
||||
// IsLearner indicates if the member is raft learner.
|
||||
IsLearner bool `json:"isLearner,omitempty"`
|
||||
}
|
||||
|
||||
// Attributes represents all the non-raft related attributes of an etcd member.
|
||||
@ -52,9 +54,22 @@ type Member struct {
|
||||
// NewMember creates a Member without an ID and generates one based on the
|
||||
// cluster name, peer URLs, and time. This is used for bootstrapping/adding new member.
|
||||
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
|
||||
return newMember(name, peerURLs, clusterName, now, false)
|
||||
}
|
||||
|
||||
// NewMemberAsLearner creates a learner Member without an ID and generates one based on the
|
||||
// cluster name, peer URLs, and time. This is used for adding new learner member.
|
||||
func NewMemberAsLearner(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
|
||||
return newMember(name, peerURLs, clusterName, now, true)
|
||||
}
|
||||
|
||||
func newMember(name string, peerURLs types.URLs, clusterName string, now *time.Time, isLearner bool) *Member {
|
||||
m := &Member{
|
||||
RaftAttributes: RaftAttributes{PeerURLs: peerURLs.StringSlice()},
|
||||
Attributes: Attributes{Name: name},
|
||||
RaftAttributes: RaftAttributes{
|
||||
PeerURLs: peerURLs.StringSlice(),
|
||||
IsLearner: isLearner,
|
||||
},
|
||||
Attributes: Attributes{Name: name},
|
||||
}
|
||||
|
||||
var b []byte
|
||||
@ -88,6 +103,9 @@ func (m *Member) Clone() *Member {
|
||||
}
|
||||
mm := &Member{
|
||||
ID: m.ID,
|
||||
RaftAttributes: RaftAttributes{
|
||||
IsLearner: m.IsLearner,
|
||||
},
|
||||
Attributes: Attributes{
|
||||
Name: m.Name,
|
||||
},
|
||||
|
@ -113,3 +113,11 @@ func newTestMember(id uint64, peerURLs []string, name string, clientURLs []strin
|
||||
Attributes: Attributes{Name: name, ClientURLs: clientURLs},
|
||||
}
|
||||
}
|
||||
|
||||
func newTestMemberAsLearner(id uint64, peerURLs []string, name string, clientURLs []string) *Member {
|
||||
return &Member{
|
||||
ID: types.ID(id),
|
||||
RaftAttributes: RaftAttributes{PeerURLs: peerURLs, IsLearner: true},
|
||||
Attributes: Attributes{Name: name, ClientURLs: clientURLs},
|
||||
}
|
||||
}
|
||||
|
@ -63,6 +63,7 @@ func (s *v2v3Server) Leader() types.ID {
|
||||
}
|
||||
|
||||
func (s *v2v3Server) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
|
||||
// adding member as learner is not supported by V2 Server.
|
||||
resp, err := s.c.MemberAdd(ctx, memb.PeerURLs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -92,7 +93,8 @@ func v3MembersToMembership(v3membs []*pb.Member) []*membership.Member {
|
||||
membs[i] = &membership.Member{
|
||||
ID: types.ID(m.ID),
|
||||
RaftAttributes: membership.RaftAttributes{
|
||||
PeerURLs: m.PeerURLs,
|
||||
PeerURLs: m.PeerURLs,
|
||||
IsLearner: m.IsLearner,
|
||||
},
|
||||
Attributes: membership.Attributes{
|
||||
Name: m.Name,
|
||||
|
@ -16,6 +16,7 @@ package v3rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/v3/etcdserver"
|
||||
@ -45,15 +46,24 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
m := membership.NewMember("", urls, "", &now)
|
||||
var m *membership.Member
|
||||
if r.IsLearner {
|
||||
m = membership.NewMemberAsLearner("", urls, "", &now)
|
||||
} else {
|
||||
m = membership.NewMember("", urls, "", &now)
|
||||
}
|
||||
membs, merr := cs.server.AddMember(ctx, *m)
|
||||
if merr != nil {
|
||||
return nil, togRPCError(merr)
|
||||
}
|
||||
|
||||
return &pb.MemberAddResponse{
|
||||
Header: cs.header(),
|
||||
Member: &pb.Member{ID: uint64(m.ID), PeerURLs: m.PeerURLs},
|
||||
Header: cs.header(),
|
||||
Member: &pb.Member{
|
||||
ID: uint64(m.ID),
|
||||
PeerURLs: m.PeerURLs,
|
||||
IsLearner: m.IsLearner,
|
||||
},
|
||||
Members: membersToProtoMembers(membs),
|
||||
}, nil
|
||||
}
|
||||
@ -83,6 +93,11 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest
|
||||
return &pb.MemberListResponse{Header: cs.header(), Members: membs}, nil
|
||||
}
|
||||
|
||||
func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
|
||||
// TODO: implement
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (cs *ClusterServer) header() *pb.ResponseHeader {
|
||||
return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()}
|
||||
}
|
||||
@ -95,6 +110,7 @@ func membersToProtoMembers(membs []*membership.Member) []*pb.Member {
|
||||
ID: uint64(membs[i].ID),
|
||||
PeerURLs: membs[i].PeerURLs,
|
||||
ClientURLs: membs[i].ClientURLs,
|
||||
IsLearner: membs[i].IsLearner,
|
||||
}
|
||||
}
|
||||
return protoMembs
|
||||
|
@ -64,6 +64,8 @@
|
||||
MemberUpdateResponse
|
||||
MemberListRequest
|
||||
MemberListResponse
|
||||
MemberPromoteRequest
|
||||
MemberPromoteResponse
|
||||
DefragmentRequest
|
||||
DefragmentResponse
|
||||
MoveLeaderRequest
|
||||
|
@ -341,6 +341,19 @@ func request_Cluster_MemberList_0(ctx context.Context, marshaler runtime.Marshal
|
||||
|
||||
}
|
||||
|
||||
func request_Cluster_MemberPromote_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.ClusterClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
|
||||
var protoReq etcdserverpb.MemberPromoteRequest
|
||||
var metadata runtime.ServerMetadata
|
||||
|
||||
if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && err != io.EOF {
|
||||
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
|
||||
}
|
||||
|
||||
msg, err := client.MemberPromote(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
|
||||
return msg, metadata, err
|
||||
|
||||
}
|
||||
|
||||
func request_Maintenance_Alarm_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.MaintenanceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
|
||||
var protoReq etcdserverpb.AlarmRequest
|
||||
var metadata runtime.ServerMetadata
|
||||
@ -1399,6 +1412,35 @@ func RegisterClusterHandlerClient(ctx context.Context, mux *runtime.ServeMux, cl
|
||||
|
||||
})
|
||||
|
||||
mux.Handle("POST", pattern_Cluster_MemberPromote_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
|
||||
ctx, cancel := context.WithCancel(req.Context())
|
||||
defer cancel()
|
||||
if cn, ok := w.(http.CloseNotifier); ok {
|
||||
go func(done <-chan struct{}, closed <-chan bool) {
|
||||
select {
|
||||
case <-done:
|
||||
case <-closed:
|
||||
cancel()
|
||||
}
|
||||
}(ctx.Done(), cn.CloseNotify())
|
||||
}
|
||||
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
|
||||
rctx, err := runtime.AnnotateContext(ctx, mux, req)
|
||||
if err != nil {
|
||||
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
|
||||
return
|
||||
}
|
||||
resp, md, err := request_Cluster_MemberPromote_0(rctx, inboundMarshaler, client, req, pathParams)
|
||||
ctx = runtime.NewServerMetadataContext(ctx, md)
|
||||
if err != nil {
|
||||
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
|
||||
return
|
||||
}
|
||||
|
||||
forward_Cluster_MemberPromote_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
|
||||
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1410,6 +1452,8 @@ var (
|
||||
pattern_Cluster_MemberUpdate_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3", "cluster", "member", "update"}, ""))
|
||||
|
||||
pattern_Cluster_MemberList_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3", "cluster", "member", "list"}, ""))
|
||||
|
||||
pattern_Cluster_MemberPromote_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v3", "cluster", "member", "promote"}, ""))
|
||||
)
|
||||
|
||||
var (
|
||||
@ -1420,6 +1464,8 @@ var (
|
||||
forward_Cluster_MemberUpdate_0 = runtime.ForwardResponseMessage
|
||||
|
||||
forward_Cluster_MemberList_0 = runtime.ForwardResponseMessage
|
||||
|
||||
forward_Cluster_MemberPromote_0 = runtime.ForwardResponseMessage
|
||||
)
|
||||
|
||||
// RegisterMaintenanceHandlerFromEndpoint is same as RegisterMaintenanceHandler but
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -165,6 +165,14 @@ service Cluster {
|
||||
body: "*"
|
||||
};
|
||||
}
|
||||
|
||||
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
|
||||
rpc MemberPromote(MemberPromoteRequest) returns (MemberPromoteResponse) {
|
||||
option (google.api.http) = {
|
||||
post: "/v3/cluster/member/promote"
|
||||
body: "*"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
service Maintenance {
|
||||
@ -846,11 +854,15 @@ message Member {
|
||||
repeated string peerURLs = 3;
|
||||
// clientURLs is the list of URLs the member exposes to clients for communication. If the member is not started, clientURLs will be empty.
|
||||
repeated string clientURLs = 4;
|
||||
// isLearner indicates if the member is raft learner.
|
||||
bool isLearner = 5;
|
||||
}
|
||||
|
||||
message MemberAddRequest {
|
||||
// peerURLs is the list of URLs the added member will use to communicate with the cluster.
|
||||
repeated string peerURLs = 1;
|
||||
// isLearner indicates if the added member is raft learner.
|
||||
bool isLearner = 2;
|
||||
}
|
||||
|
||||
message MemberAddResponse {
|
||||
@ -894,6 +906,17 @@ message MemberListResponse {
|
||||
repeated Member members = 2;
|
||||
}
|
||||
|
||||
message MemberPromoteRequest {
|
||||
// ID is the member ID of the member to promote.
|
||||
uint64 ID = 1;
|
||||
}
|
||||
|
||||
message MemberPromoteResponse {
|
||||
ResponseHeader header = 1;
|
||||
// members is a list of all members after promoting the member.
|
||||
repeated Member members = 2;
|
||||
}
|
||||
|
||||
message DefragmentRequest {
|
||||
}
|
||||
|
||||
@ -967,6 +990,8 @@ message StatusResponse {
|
||||
repeated string errors = 8;
|
||||
// dbSizeInUse is the size of the backend database logically in use, in bytes, of the responding member.
|
||||
int64 dbSizeInUse = 9;
|
||||
// isLearner indicates if the member is raft learner.
|
||||
bool isLearner = 10;
|
||||
}
|
||||
|
||||
message AuthEnableRequest {
|
||||
|
@ -1544,6 +1544,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: might switch to less strict check when adding raft learner
|
||||
if s.Cfg.StrictReconfigCheck {
|
||||
// by default StrictReconfigCheck is enabled; reject new members if unhealthy
|
||||
if !s.cluster.IsReadyToAddNewMember() {
|
||||
@ -1585,6 +1586,11 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*
|
||||
NodeID: uint64(memb.ID),
|
||||
Context: b,
|
||||
}
|
||||
|
||||
if memb.IsLearner {
|
||||
cc.Type = raftpb.ConfChangeAddLearnerNode
|
||||
}
|
||||
|
||||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
@ -2054,7 +2060,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
lg := s.getLogger()
|
||||
*confState = *s.r.ApplyConfChange(cc)
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
|
||||
m := new(membership.Member)
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
if lg != nil {
|
||||
|
@ -43,3 +43,7 @@ func (s *cls2clc) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest, o
|
||||
func (s *cls2clc) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest, opts ...grpc.CallOption) (*pb.MemberRemoveResponse, error) {
|
||||
return s.cls.MemberRemove(ctx, r)
|
||||
}
|
||||
|
||||
func (s *cls2clc) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest, opts ...grpc.CallOption) (*pb.MemberPromoteResponse, error) {
|
||||
return s.cls.MemberPromote(ctx, r)
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ package grpcproxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
@ -108,7 +109,23 @@ func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
|
||||
mresp, err := cp.clus.MemberAdd(ctx, r.PeerURLs)
|
||||
if r.IsLearner {
|
||||
return cp.memberAddAsLearner(ctx, r.PeerURLs)
|
||||
}
|
||||
return cp.memberAdd(ctx, r.PeerURLs)
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) memberAdd(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
|
||||
mresp, err := cp.clus.MemberAdd(ctx, peerURLs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp := (pb.MemberAddResponse)(*mresp)
|
||||
return &resp, err
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) memberAddAsLearner(ctx context.Context, peerURLs []string) (*pb.MemberAddResponse, error) {
|
||||
mresp, err := cp.clus.MemberAddAsLearner(ctx, peerURLs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -175,3 +192,8 @@ func (cp *clusterProxy) MemberList(ctx context.Context, r *pb.MemberListRequest)
|
||||
resp := (pb.MemberListResponse)(*mresp)
|
||||
return &resp, err
|
||||
}
|
||||
|
||||
func (cp *clusterProxy) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
|
||||
// TODO: implement
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
@ -510,13 +510,13 @@ func authTestMemberAdd(cx ctlCtx) {
|
||||
peerURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+11)
|
||||
// ordinary user cannot add a new member
|
||||
cx.user, cx.pass = "test-user", "pass"
|
||||
if err := ctlV3MemberAdd(cx, peerURL); err == nil {
|
||||
if err := ctlV3MemberAdd(cx, peerURL, false); err == nil {
|
||||
cx.t.Fatalf("ordinary user must not be allowed to add a member")
|
||||
}
|
||||
|
||||
// root can add a new member
|
||||
cx.user, cx.pass = "root", "root"
|
||||
if err := ctlV3MemberAdd(cx, peerURL); err != nil {
|
||||
if err := ctlV3MemberAdd(cx, peerURL, false); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -59,9 +59,10 @@ func TestCtlV3MemberAddClientTLS(t *testing.T) { testCtl(t, memberAddTest, withC
|
||||
func TestCtlV3MemberAddClientAutoTLS(t *testing.T) {
|
||||
testCtl(t, memberAddTest, withCfg(configClientAutoTLS))
|
||||
}
|
||||
func TestCtlV3MemberAddPeerTLS(t *testing.T) { testCtl(t, memberAddTest, withCfg(configPeerTLS)) }
|
||||
func TestCtlV3MemberUpdate(t *testing.T) { testCtl(t, memberUpdateTest) }
|
||||
func TestCtlV3MemberUpdateNoTLS(t *testing.T) { testCtl(t, memberUpdateTest, withCfg(configNoTLS)) }
|
||||
func TestCtlV3MemberAddPeerTLS(t *testing.T) { testCtl(t, memberAddTest, withCfg(configPeerTLS)) }
|
||||
func TestCtlV3MemberAddForLearner(t *testing.T) { testCtl(t, memberAddForLearnerTest) }
|
||||
func TestCtlV3MemberUpdate(t *testing.T) { testCtl(t, memberUpdateTest) }
|
||||
func TestCtlV3MemberUpdateNoTLS(t *testing.T) { testCtl(t, memberUpdateTest, withCfg(configNoTLS)) }
|
||||
func TestCtlV3MemberUpdateClientTLS(t *testing.T) {
|
||||
testCtl(t, memberUpdateTest, withCfg(configClientTLS))
|
||||
}
|
||||
@ -122,13 +123,22 @@ func ctlV3MemberRemove(cx ctlCtx, ep, memberID, clusterID string) error {
|
||||
}
|
||||
|
||||
func memberAddTest(cx ctlCtx) {
|
||||
if err := ctlV3MemberAdd(cx, fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+11)); err != nil {
|
||||
if err := ctlV3MemberAdd(cx, fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+11), false); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func ctlV3MemberAdd(cx ctlCtx, peerURL string) error {
|
||||
func memberAddForLearnerTest(cx ctlCtx) {
|
||||
if err := ctlV3MemberAdd(cx, fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+11), true); err != nil {
|
||||
cx.t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func ctlV3MemberAdd(cx ctlCtx, peerURL string, isLearner bool) error {
|
||||
cmdArgs := append(cx.PrefixArgs(), "member", "add", "newmember", fmt.Sprintf("--peer-urls=%s", peerURL))
|
||||
if isLearner {
|
||||
cmdArgs = append(cmdArgs, "--learner")
|
||||
}
|
||||
return spawnWithExpect(cmdArgs, " added to cluster ")
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user