etcd-tester: support per-agent client/peer/failpoint ports

This commit is contained in:
Anthony Romano 2016-09-08 16:09:50 -07:00
parent 2e25a772a5
commit 55ba3d95fb
4 changed files with 86 additions and 26 deletions

View File

@ -29,15 +29,21 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
const ( // agentConfig holds information needed to interact/configure an agent and its etcd process
peerURLPort = 2380 type agentConfig struct {
failpointPort = 2381 endpoint string
) clientPort int
peerPort int
failpointPort int
datadir string
}
type cluster struct { type cluster struct {
agents []agentConfig
v2Only bool // to be deprecated v2Only bool // to be deprecated
datadir string
stressQPS int stressQPS int
stressKeyLargeSize int stressKeyLargeSize int
stressKeySize int stressKeySize int
@ -53,27 +59,27 @@ type ClusterStatus struct {
AgentStatuses map[string]client.Status AgentStatuses map[string]client.Status
} }
func (c *cluster) bootstrap(agentEndpoints []string) error { func (c *cluster) bootstrap() error {
size := len(agentEndpoints) size := len(c.agents)
members := make([]*member, size) members := make([]*member, size)
memberNameURLs := make([]string, size) memberNameURLs := make([]string, size)
for i, u := range agentEndpoints { for i, a := range c.agents {
agent, err := client.NewAgent(u) agent, err := client.NewAgent(a.endpoint)
if err != nil { if err != nil {
return err return err
} }
host, _, err := net.SplitHostPort(u) host, _, err := net.SplitHostPort(a.endpoint)
if err != nil { if err != nil {
return err return err
} }
members[i] = &member{ members[i] = &member{
Agent: agent, Agent: agent,
Endpoint: u, Endpoint: a.endpoint,
Name: fmt.Sprintf("etcd-%d", i), Name: fmt.Sprintf("etcd-%d", i),
ClientURL: fmt.Sprintf("http://%s:2379", host), ClientURL: fmt.Sprintf("http://%s:%d", host, a.clientPort),
PeerURL: fmt.Sprintf("http://%s:%d", host, peerURLPort), PeerURL: fmt.Sprintf("http://%s:%d", host, a.peerPort),
FailpointURL: fmt.Sprintf("http://%s:%d", host, failpointPort), FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort),
} }
memberNameURLs[i] = members[i].ClusterEntry() memberNameURLs[i] = members[i].ClusterEntry()
} }
@ -83,7 +89,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
for i, m := range members { for i, m := range members {
flags := append( flags := append(
m.Flags(), m.Flags(),
"--data-dir", c.datadir, "--data-dir", c.agents[i].datadir,
"--initial-cluster-token", token, "--initial-cluster-token", token,
"--initial-cluster", clusterStr) "--initial-cluster", clusterStr)
@ -127,13 +133,7 @@ func (c *cluster) bootstrap(agentEndpoints []string) error {
return nil return nil
} }
func (c *cluster) Reset() error { func (c *cluster) Reset() error { return c.bootstrap() }
eps := make([]string, len(c.Members))
for i, m := range c.Members {
eps[i] = m.Endpoint
}
return c.bootstrap(eps)
}
func (c *cluster) WaitHealth() error { func (c *cluster) WaitHealth() error {
var err error var err error

View File

@ -78,8 +78,8 @@ func newFailureKillLeaderForLongTime() failure {
return &failureUntilSnapshot{newFailureKillLeader()} return &failureUntilSnapshot{newFailureKillLeader()}
} }
func injectDropPort(m *member) error { return m.Agent.DropPort(peerURLPort) } func injectDropPort(m *member) error { return m.Agent.DropPort(m.peerPort()) }
func recoverDropPort(m *member) error { return m.Agent.RecoverPort(peerURLPort) } func recoverDropPort(m *member) error { return m.Agent.RecoverPort(m.peerPort()) }
func newFailureIsolate() failure { func newFailureIsolate() failure {
return &failureOne{ return &failureOne{

View File

@ -18,6 +18,7 @@ import (
"flag" "flag"
"fmt" "fmt"
"net/http" "net/http"
"os"
"strings" "strings"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
@ -26,8 +27,18 @@ import (
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester") var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcd-tester")
const (
defaultClientPort = 2379
defaultPeerPort = 2380
defaultFailpointPort = 2381
)
func main() { func main() {
endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.") endpointStr := flag.String("agent-endpoints", "localhost:9027", "HTTP RPC endpoints of agents. Do not specify the schema.")
clientPorts := flag.String("client-ports", "", "etcd client port for each agent endpoint")
peerPorts := flag.String("peer-ports", "", "etcd peer port for each agent endpoint")
failpointPorts := flag.String("failpoint-ports", "", "etcd failpoint port for each agent endpoint")
datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.") datadir := flag.String("data-dir", "agent.etcd", "etcd data directory location on agent machine.")
stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.") stressKeyLargeSize := flag.Uint("stress-key-large-size", 32*1024+1, "the size of each large key written into etcd.")
stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.") stressKeySize := flag.Uint("stress-key-size", 100, "the size of each small key written into etcd.")
@ -39,15 +50,29 @@ func main() {
isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.") isV2Only := flag.Bool("v2-only", false, "'true' to run V2 only tester.")
flag.Parse() flag.Parse()
eps := strings.Split(*endpointStr, ",")
cports := portsFromArg(*clientPorts, len(eps), defaultClientPort)
pports := portsFromArg(*peerPorts, len(eps), defaultPeerPort)
fports := portsFromArg(*failpointPorts, len(eps), defaultFailpointPort)
agents := make([]agentConfig, len(eps))
for i := range eps {
agents[i].endpoint = eps[i]
agents[i].clientPort = cports[i]
agents[i].peerPort = pports[i]
agents[i].failpointPort = fports[i]
agents[i].datadir = *datadir
}
c := &cluster{ c := &cluster{
agents: agents,
v2Only: *isV2Only, v2Only: *isV2Only,
datadir: *datadir,
stressQPS: *stressQPS, stressQPS: *stressQPS,
stressKeyLargeSize: int(*stressKeyLargeSize), stressKeyLargeSize: int(*stressKeyLargeSize),
stressKeySize: int(*stressKeySize), stressKeySize: int(*stressKeySize),
stressKeySuffixRange: int(*stressKeySuffixRange), stressKeySuffixRange: int(*stressKeySuffixRange),
} }
if err := c.bootstrap(strings.Split(*endpointStr, ",")); err != nil {
if err := c.bootstrap(); err != nil {
plog.Fatal(err) plog.Fatal(err)
} }
defer c.Terminate() defer c.Terminate()
@ -102,3 +127,26 @@ func main() {
t.runLoop() t.runLoop()
} }
// portsFromArg converts a comma separated list into a slice of ints
func portsFromArg(arg string, n, defaultPort int) []int {
ret := make([]int, n)
if len(arg) == 0 {
for i := range ret {
ret[i] = defaultPort
}
return ret
}
s := strings.Split(arg, ",")
if len(s) != n {
fmt.Printf("expected %d ports, got %d (%s)\n", n, len(s), arg)
os.Exit(1)
}
for i := range s {
if _, err := fmt.Sscanf(s[i], "%d", &ret[i]); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
return ret
}

View File

@ -16,6 +16,7 @@ package main
import ( import (
"fmt" "fmt"
"net"
"net/url" "net/url"
"time" "time"
@ -165,3 +166,14 @@ func (m *member) grpcAddr() string {
} }
return u.Host return u.Host
} }
func (m *member) peerPort() (port int) {
_, portStr, err := net.SplitHostPort(m.PeerURL)
if err != nil {
panic(err)
}
if _, err = fmt.Sscanf(portStr, "%d", &port); err != nil {
panic(err)
}
return port
}