functional-tester/rpcpb: initial commit

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-03-27 17:01:18 -07:00
parent 3a8a150b0f
commit 1f77a6a9c0
5 changed files with 3345 additions and 0 deletions

View File

@ -0,0 +1,76 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcpb
import (
"fmt"
"reflect"
"strings"
)
var etcdFields = []string{
"Name",
"DataDir",
"WALDir",
"ListenClientURLs",
"AdvertiseClientURLs",
"ListenPeerURLs",
"InitialAdvertisePeerURLs",
"InitialCluster",
"InitialClusterState",
"InitialClusterToken",
"SnapshotCount",
"QuotaBackendBytes",
"PreVote",
"InitialCorruptCheck",
}
// Flags returns etcd flags in string slice.
func (cfg *Etcd) Flags() (fs []string) {
tp := reflect.TypeOf(*cfg)
vo := reflect.ValueOf(*cfg)
for _, name := range etcdFields {
field, ok := tp.FieldByName(name)
if !ok {
panic(fmt.Errorf("field %q not found", name))
}
fv := reflect.Indirect(vo).FieldByName(name)
var sv string
switch fv.Type().Kind() {
case reflect.String:
sv = fv.String()
case reflect.Slice:
n := fv.Len()
sl := make([]string, n)
for i := 0; i < n; i++ {
sl[i] = fv.Index(i).String()
}
sv = strings.Join(sl, ",")
case reflect.Int64:
sv = fmt.Sprintf("%d", fv.Int())
case reflect.Bool:
sv = fmt.Sprintf("%v", fv.Bool())
default:
panic(fmt.Errorf("field %q (%v) cannot be parsed", name, fv.Type().Kind()))
}
fname := field.Tag.Get("yaml")
// TODO: remove this
if fname == "initial-corrupt-check" {
fname = "experimental-" + fname
}
fs = append(fs, fmt.Sprintf("--%s=%s", fname, sv))
}
return fs
}

View File

@ -0,0 +1,59 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcpb
import (
"reflect"
"testing"
)
func TestEtcdFlags(t *testing.T) {
cfg := &Etcd{
Name: "s1",
DataDir: "/tmp/etcd-agent-data-1/etcd.data",
WALDir: "/tmp/etcd-agent-data-1/etcd.data/member/wal",
ListenClientURLs: []string{"127.0.0.1:1379"},
AdvertiseClientURLs: []string{"127.0.0.1:13790"},
ListenPeerURLs: []string{"127.0.0.1:1380"},
InitialAdvertisePeerURLs: []string{"127.0.0.1:13800"},
InitialCluster: "s1=127.0.0.1:13800,s2=127.0.0.1:23800,s3=127.0.0.1:33800",
InitialClusterState: "new",
InitialClusterToken: "tkn",
SnapshotCount: 10000,
QuotaBackendBytes: 10740000000,
PreVote: true,
InitialCorruptCheck: true,
}
exp := []string{
"--name=s1",
"--data-dir=/tmp/etcd-agent-data-1/etcd.data",
"--wal-dir=/tmp/etcd-agent-data-1/etcd.data/member/wal",
"--listen-client-urls=127.0.0.1:1379",
"--advertise-client-urls=127.0.0.1:13790",
"--listen-peer-urls=127.0.0.1:1380",
"--initial-advertise-peer-urls=127.0.0.1:13800",
"--initial-cluster=s1=127.0.0.1:13800,s2=127.0.0.1:23800,s3=127.0.0.1:33800",
"--initial-cluster-state=new",
"--initial-cluster-token=tkn",
"--snapshot-count=10000",
"--quota-backend-bytes=10740000000",
"--pre-vote=true",
"--experimental-initial-corrupt-check=true",
}
fs := cfg.Flags()
if !reflect.DeepEqual(exp, fs) {
t.Fatalf("expected %q, got %q", exp, fs)
}
}

View File

@ -0,0 +1,161 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcpb
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
grpc "google.golang.org/grpc"
)
var dialOpts = []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithTimeout(5 * time.Second),
grpc.WithBlock(),
}
// DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
func (m *Member) DialEtcdGRPCServer() (*grpc.ClientConn, error) {
if m.EtcdClientTLS {
// TODO: support TLS
panic("client TLS not supported yet")
}
return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
}
// CreateEtcdClient creates a client from member.
func (m *Member) CreateEtcdClient() (*clientv3.Client, error) {
if m.EtcdClientTLS {
// TODO: support TLS
panic("client TLS not supported yet")
}
return clientv3.New(clientv3.Config{
Endpoints: []string{m.EtcdClientEndpoint},
DialTimeout: 5 * time.Second,
})
}
// CheckCompact ensures that historical data before given revision has been compacted.
func (m *Member) CheckCompact(rev int64) error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
wr, ok := <-wch
cancel()
if !ok {
return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
}
if wr.CompactRevision != rev {
return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
}
return nil
}
// Defrag runs defragmentation on this member.
func (m *Member) Defrag() error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
_, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
cancel()
if err != nil {
return err
}
return nil
}
// RevHash fetches current revision and hash on this member.
func (m *Member) RevHash() (int64, int64, error) {
conn, err := m.DialEtcdGRPCServer()
if err != nil {
return 0, 0, err
}
defer conn.Close()
mt := pb.NewMaintenanceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
cancel()
if err != nil {
return 0, 0, err
}
return resp.Header.Revision, int64(resp.Hash), nil
}
// Rev fetches current revision on this member.
func (m *Member) Rev(ctx context.Context) (int64, error) {
cli, err := m.CreateEtcdClient()
if err != nil {
return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
if err != nil {
return 0, err
}
return resp.Header.Revision, nil
}
// IsLeader returns true if this member is the current cluster leader.
func (m *Member) IsLeader() (bool, error) {
cli, err := m.CreateEtcdClient()
if err != nil {
return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
if err != nil {
return false, err
}
return resp.Header.MemberId == resp.Leader, nil
}
// WriteHealthKey writes a health key to this member.
func (m *Member) WriteHealthKey() error {
cli, err := m.CreateEtcdClient()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
defer cli.Close()
// give enough time-out in case expensive requests (range/delete) are pending
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err = cli.Put(ctx, "health", "good")
cancel()
if err != nil {
return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,187 @@
syntax = "proto3";
package rpcpb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.sizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
service Transport {
rpc Transport(stream Request) returns (stream Response) {}
}
enum Operation {
NotStarted = 0;
// InitialStartEtcd is only called to start etcd very first time.
InitialStartEtcd = 1;
// RestartEtcd is sent to restart killed etcd.
RestartEtcd = 2;
// KillEtcd pauses etcd process while keeping data directories
// and previous etcd configurations.
KillEtcd = 3;
// FailArchive is sent when consistency check failed,
// thus need to archive etcd data directories.
FailArchive = 4;
// DestroyEtcdAgent destroys etcd process, etcd data, and agent server.
DestroyEtcdAgent = 5;
BlackholePeerPortTxRx = 100;
UnblackholePeerPortTxRx = 101;
DelayPeerPortTxRx = 102;
UndelayPeerPortTxRx = 103;
}
message Etcd {
string Name = 1 [(gogoproto.moretags) = "yaml:\"name\""];
string DataDir = 2 [(gogoproto.moretags) = "yaml:\"data-dir\""];
string WALDir = 3 [(gogoproto.moretags) = "yaml:\"wal-dir\""];
repeated string ListenClientURLs = 4 [(gogoproto.moretags) = "yaml:\"listen-client-urls\""];
repeated string AdvertiseClientURLs = 5 [(gogoproto.moretags) = "yaml:\"advertise-client-urls\""];
repeated string ListenPeerURLs = 6 [(gogoproto.moretags) = "yaml:\"listen-peer-urls\""];
repeated string InitialAdvertisePeerURLs = 7 [(gogoproto.moretags) = "yaml:\"initial-advertise-peer-urls\""];
string InitialCluster = 8 [(gogoproto.moretags) = "yaml:\"initial-cluster\""];
string InitialClusterState = 9 [(gogoproto.moretags) = "yaml:\"initial-cluster-state\""];
string InitialClusterToken = 10 [(gogoproto.moretags) = "yaml:\"initial-cluster-token\""];
int64 SnapshotCount = 11 [(gogoproto.moretags) = "yaml:\"snapshot-count\""];
int64 QuotaBackendBytes = 12 [(gogoproto.moretags) = "yaml:\"quota-backend-bytes\""];
bool PreVote = 13 [(gogoproto.moretags) = "yaml:\"pre-vote\""];
bool InitialCorruptCheck = 14 [(gogoproto.moretags) = "yaml:\"initial-corrupt-check\""];
// TODO: support TLS
}
message Member {
// EtcdExecPath is the executable etcd binary path in agent server.
string EtcdExecPath = 1 [(gogoproto.moretags) = "yaml:\"etcd-exec-path\""];
// TODO: support embedded etcd
// AgentAddr is the agent HTTP server address.
string AgentAddr = 11 [(gogoproto.moretags) = "yaml:\"agent-addr\""];
// FailpointHTTPAddr is the agent's failpoints HTTP server address.
string FailpointHTTPAddr = 12 [(gogoproto.moretags) = "yaml:\"failpoint-http-addr\""];
// BaseDir is the base directory where all logs and etcd data are stored.
string BaseDir = 101 [(gogoproto.moretags) = "yaml:\"base-dir\""];
// EtcdLogPath is the log file to store current etcd server logs.
string EtcdLogPath = 102 [(gogoproto.moretags) = "yaml:\"etcd-log-path\""];
// EtcdClientTLS is true when client traffic needs to be encrypted.
bool EtcdClientTLS = 201 [(gogoproto.moretags) = "yaml:\"etcd-client-tls\""];
// EtcdClientProxy is true when client traffic needs to be proxied.
// If true, listen client URL port must be different than advertise client URL port.
bool EtcdClientProxy = 202 [(gogoproto.moretags) = "yaml:\"etcd-client-proxy\""];
// EtcdPeerProxy is true when peer traffic needs to be proxied.
// If true, listen peer URL port must be different than advertise peer URL port.
bool EtcdPeerProxy = 203 [(gogoproto.moretags) = "yaml:\"etcd-peer-proxy\""];
// EtcdClientEndpoint is the etcd client endpoint.
string EtcdClientEndpoint = 204 [(gogoproto.moretags) = "yaml:\"etcd-client-endpoint\""];
// Etcd defines etcd binary configuration flags.
Etcd Etcd = 301 [(gogoproto.moretags) = "yaml:\"etcd-config\""];
}
enum FailureCase {
KILL_ONE_FOLLOWER = 0;
KILL_LEADER = 1;
KILL_ONE_FOLLOWER_FOR_LONG = 2;
KILL_LEADER_FOR_LONG = 3;
KILL_QUORUM = 4;
KILL_ALL = 5;
BLACKHOLE_PEER_PORT_TX_RX_ONE_FOLLOWER = 6;
BLACKHOLE_PEER_PORT_TX_RX_LEADER_ONE = 7;
BLACKHOLE_PEER_PORT_TX_RX_ALL = 8;
DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER = 9;
DELAY_PEER_PORT_TX_RX_LEADER = 10;
DELAY_PEER_PORT_TX_RX_ALL = 11;
FAILPOINTS = 100;
NO_FAIL = 200;
// TODO: support no-op of liveness duration
// NO_FAIL_LIVENESS = 201;
EXTERNAL = 300;
}
enum StressType {
KV = 0;
LEASE = 1;
NO_STRESS = 2;
ELECTION_RUNNER = 3;
WATCH_RUNNER = 4;
LOCK_RACER_RUNNER = 5;
LEASE_RUNNER = 6;
}
message Tester {
string TesterNetwork = 1 [(gogoproto.moretags) = "yaml:\"tester-network\""];
string TesterAddr = 2 [(gogoproto.moretags) = "yaml:\"tester-addr\""];
// DelayLatencyMsRv is the delay latency in milliseconds,
// to inject to simulated slow network.
uint32 DelayLatencyMs = 11 [(gogoproto.moretags) = "yaml:\"delay-latency-ms\""];
// DelayLatencyMsRv is the delay latency random variable in milliseconds.
uint32 DelayLatencyMsRv = 12 [(gogoproto.moretags) = "yaml:\"delay-latency-ms-rv\""];
// RoundLimit is the limit of rounds to run failure set (-1 to run without limits).
int32 RoundLimit = 21 [(gogoproto.moretags) = "yaml:\"round-limit\""];
// ExitOnFailure is true, then exit tester on first failure.
bool ExitOnFailure = 22 [(gogoproto.moretags) = "yaml:\"exit-on-failure\""];
// ConsistencyCheck is true to check consistency (revision, hash).
bool ConsistencyCheck = 23 [(gogoproto.moretags) = "yaml:\"consistency-check\""];
// EnablePprof is true to enable profiler.
bool EnablePprof = 24 [(gogoproto.moretags) = "yaml:\"enable-pprof\""];
// FailureCases is the selected test cases to schedule.
// If empty, run all failure cases.
// TODO: support no-op
repeated string FailureCases = 31 [(gogoproto.moretags) = "yaml:\"failure-cases\""];
// FailureShuffle is true to randomize failure injecting order.
// TODO: support shuffle
// bool FailureShuffle = 32 [(gogoproto.moretags) = "yaml:\"failure-shuffle\""];
// FailpointCommands is the list of "gofail" commands (e.g. panic("etcd-tester"),1*sleep(1000)).
repeated string FailpointCommands = 33 [(gogoproto.moretags) = "yaml:\"failpoint-commands\""];
// RunnerExecPath is a path of etcd-runner binary.
string RunnerExecPath = 41 [(gogoproto.moretags) = "yaml:\"runner-exec-path\""];
// ExternalExecPath is a path of script for enabling/disabling an external fault injector.
string ExternalExecPath = 42 [(gogoproto.moretags) = "yaml:\"external-exec-path\""];
// StressTypes is the list of stresser names:
// keys, lease, nop, election-runner, watch-runner, lock-racer-runner, lease-runner.
repeated string StressTypes = 101 [(gogoproto.moretags) = "yaml:\"stress-types\""];
// StressKeySize is the size of each small key written into etcd.
int32 StressKeySize = 102 [(gogoproto.moretags) = "yaml:\"stress-key-size\""];
// StressKeySizeLarge is the size of each large key written into etcd.
int32 StressKeySizeLarge = 103 [(gogoproto.moretags) = "yaml:\"stress-key-size-large\""];
// StressKeySuffixRange is the count of key range written into etcd.
// Stress keys are created with "fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)".
int32 StressKeySuffixRange = 104 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range\""];
// StressKeySuffixRangeTxn is the count of key range written into etcd txn (max 100).
// Stress keys are created with "fmt.Sprintf("/k%03d", i)".
int32 StressKeySuffixRangeTxn = 105 [(gogoproto.moretags) = "yaml:\"stress-key-suffix-range-txn\""];
// StressKeyTxnOps is the number of operations per a transaction (max 64).
int32 StressKeyTxnOps = 106 [(gogoproto.moretags) = "yaml:\"stress-key-txn-ops\""];
// StressQPS is the maximum number of stresser requests per second.
int32 StressQPS = 107 [(gogoproto.moretags) = "yaml:\"stress-qps\""];
}
message Request {
Operation Operation = 1;
Member Member = 2;
Tester Tester = 3;
}
message Response {
bool Success = 1;
string Status = 2;
// TODO: support TLS
}