Mirror of https://github.com/etcd-io/etcd.git, synced 2024-09-27 06:25:44 +00:00

Move the checker logic from the tester to the cluster so that stressers and checkers can be initialized at the same time. This is useful because some checkers depend on stressers.
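To illustrate the dependency mentioned above, here is a minimal sketch of a checker that can only be constructed once its stresser exists, which is why bootstrap() creates stressers and the checker in one place. The leaseStresser/leaseChecker names, and the assumption that the tester's Checker interface is satisfied by a single Check() error method, are illustrative only; this file itself only wires up newHashChecker and newNoChecker.

// Hypothetical sketch (not part of cluster.go): a checker built on top of a
// stresser, assuming Checker is satisfied by a single Check() error method.
type leaseStresser struct {
	lastErr error // recorded by the stress loop, for example
}

func (ls *leaseStresser) errs() []error {
	if ls.lastErr != nil {
		return []error{ls.lastErr}
	}
	return nil
}

// leaseChecker holds a handle on the stresser it validates, so the stresser
// must exist before the checker can be built.
type leaseChecker struct {
	ls *leaseStresser
}

func (lc *leaseChecker) Check() error {
	for _, err := range lc.ls.errs() {
		if err != nil {
			return err
		}
	}
	return nil
}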
279 lines
6.4 KiB
Go
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"fmt"
	"math/rand"
	"net"
	"strings"
	"time"

	"golang.org/x/net/context"

	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/tools/functional-tester/etcd-agent/client"
	"google.golang.org/grpc"
)

// agentConfig holds information needed to interact/configure an agent and its etcd process
type agentConfig struct {
	endpoint      string
	clientPort    int
	peerPort      int
	failpointPort int

	datadir string
}

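// cluster wraps the agents, members, stressers, and consistency checker
// that make up one functional-test etcd cluster.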
type cluster struct {
	agents []agentConfig

	v2Only           bool // to be deprecated
	consistencyCheck bool
	Size             int

	Stressers     []Stresser
	stressBuilder stressBuilder
	Checker       Checker

	Members []*member
}

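// ClusterStatus reports the status of every agent in the cluster, keyed
// by agent endpoint.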
type ClusterStatus struct {
	AgentStatuses map[string]client.Status
}

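// bootstrap connects to every agent, starts one etcd member per agent,
// launches a stresser for each member, and installs the cluster's
// consistency checker.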
func (c *cluster) bootstrap() error {
	size := len(c.agents)

	members := make([]*member, size)
	memberNameURLs := make([]string, size)
	for i, a := range c.agents {
		agent, err := client.NewAgent(a.endpoint)
		if err != nil {
			return err
		}
		host, _, err := net.SplitHostPort(a.endpoint)
		if err != nil {
			return err
		}
		members[i] = &member{
			Agent:        agent,
			Endpoint:     a.endpoint,
			Name:         fmt.Sprintf("etcd-%d", i),
			ClientURL:    fmt.Sprintf("http://%s:%d", host, a.clientPort),
			PeerURL:      fmt.Sprintf("http://%s:%d", host, a.peerPort),
			FailpointURL: fmt.Sprintf("http://%s:%d", host, a.failpointPort),
		}
		memberNameURLs[i] = members[i].ClusterEntry()
	}
	clusterStr := strings.Join(memberNameURLs, ",")
	token := fmt.Sprint(rand.Int())

	for i, m := range members {
		flags := append(
			m.Flags(),
			"--data-dir", c.agents[i].datadir,
			"--initial-cluster-token", token,
			"--initial-cluster", clusterStr)

		if _, err := m.Agent.Start(flags...); err != nil {
			// cleanup
			for _, m := range members[:i] {
				m.Agent.Terminate()
			}
			return err
		}
	}

	c.Stressers = make([]Stresser, len(members))
	for i, m := range members {
		c.Stressers[i] = c.stressBuilder(m)
		go c.Stressers[i].Stress()
	}

	if c.consistencyCheck && !c.v2Only {
		c.Checker = newHashChecker(hashAndRevGetter(c))
	} else {
		c.Checker = newNoChecker()
	}

	c.Size = size
	c.Members = members
	return nil
}

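// Reset re-bootstraps the cluster from its agent configuration.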
func (c *cluster) Reset() error { return c.bootstrap() }

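// WaitHealth blocks until every member accepts a health-check write,
// retrying once per second for up to 60 seconds.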
func (c *cluster) WaitHealth() error {
	var err error
	// wait 60s to check cluster health.
	// TODO: set it to a reasonable value. It is set that high because a
	// follower may take a long time to catch up with the leader when it
	// reboots under a reasonable workload (https://github.com/coreos/etcd/issues/2698)
	healthFunc := func(m *member) error { return m.SetHealthKeyV3() }
	if c.v2Only {
		healthFunc = func(m *member) error { return m.SetHealthKeyV2() }
	}
	for i := 0; i < 60; i++ {
		for _, m := range c.Members {
			if err = healthFunc(m); err != nil {
				break
			}
		}
		if err == nil {
			return nil
		}
		plog.Warningf("#%d setHealthKey error (%v)", i, err)
		time.Sleep(time.Second)
	}
	return err
}

// GetLeader returns the index of the leader member and an error, if any.
func (c *cluster) GetLeader() (int, error) {
	if c.v2Only {
		return 0, nil
	}
	for i, m := range c.Members {
		isLeader, err := m.IsLeader()
		if isLeader || err != nil {
			return i, err
		}
	}
	return 0, fmt.Errorf("no leader found")
}

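// Report aggregates the success and failure counts from all stressers.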
func (c *cluster) Report() (success, failure int) {
	for _, stress := range c.Stressers {
		s, f := stress.Report()
		success += s
		failure += f
	}
	return
}

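// Cleanup asks every agent to clean up its etcd member, cancels all
// stressers, and returns the last agent error encountered.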
func (c *cluster) Cleanup() error {
	var lasterr error
	for _, m := range c.Members {
		if err := m.Agent.Cleanup(); err != nil {
			lasterr = err
		}
	}
	for _, s := range c.Stressers {
		s.Cancel()
	}
	return lasterr
}

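// Terminate stops every member via its agent and cancels all stressers.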
func (c *cluster) Terminate() {
	for _, m := range c.Members {
		m.Agent.Terminate()
	}
	for _, s := range c.Stressers {
		s.Cancel()
	}
}

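// Status collects the agent status of every member; agents that fail to
// respond are reported as "unknown".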
func (c *cluster) Status() ClusterStatus {
	cs := ClusterStatus{
		AgentStatuses: make(map[string]client.Status),
	}

	for _, m := range c.Members {
		s, err := m.Agent.Status()
		// TODO: add a.Desc() as a key of the map
		desc := m.Endpoint
		if err != nil {
			cs.AgentStatuses[desc] = client.Status{State: "unknown"}
			plog.Printf("failed to get the status of agent [%s]", desc)
			continue
		}
		cs.AgentStatuses[desc] = s
	}
	return cs
}

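// getRevisionHash fetches the current revision and KV hash from every
// member, keyed by client URL.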
func (c *cluster) getRevisionHash() (map[string]int64, map[string]int64, error) {
	revs := make(map[string]int64)
	hashes := make(map[string]int64)
	for _, m := range c.Members {
		rev, hash, err := m.RevHash()
		if err != nil {
			return nil, nil, err
		}
		revs[m.ClientURL] = rev
		hashes[m.ClientURL] = hash
	}
	return revs, hashes, nil
}

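// compactKV issues a physical compaction at rev against every member.
// A "required revision has been compacted" error is tolerated on all but
// the first member, since the compaction may already have been applied.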
func (c *cluster) compactKV(rev int64, timeout time.Duration) (err error) {
	if rev <= 0 {
		return nil
	}

	for i, m := range c.Members {
		u := m.ClientURL
		conn, derr := m.dialGRPC()
		if derr != nil {
			plog.Printf("[compact kv #%d] dial error %v (endpoint %s)", i, derr, u)
			err = derr
			continue
		}
		kvc := pb.NewKVClient(conn)
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		plog.Printf("[compact kv #%d] starting (endpoint %s)", i, u)
		_, cerr := kvc.Compact(ctx, &pb.CompactionRequest{Revision: rev, Physical: true}, grpc.FailFast(false))
		cancel()
		conn.Close()
		succeed := true
		if cerr != nil {
			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
				plog.Printf("[compact kv #%d] already compacted (endpoint %s)", i, u)
			} else {
				plog.Warningf("[compact kv #%d] error %v (endpoint %s)", i, cerr, u)
				err = cerr
				succeed = false
			}
		}
		if succeed {
			plog.Printf("[compact kv #%d] done (endpoint %s)", i, u)
		}
	}
	return err
}

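// checkCompact verifies that every member has compacted up to the given revision.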
func (c *cluster) checkCompact(rev int64) error {
	if rev == 0 {
		return nil
	}
	for _, m := range c.Members {
		if err := m.CheckCompact(rev); err != nil {
			return err
		}
	}
	return nil
}

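// defrag defragments the backend of every member, stopping at the first error.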
func (c *cluster) defrag() error {
	for _, m := range c.Members {
		if err := m.Defrag(); err != nil {
			return err
		}
	}
	return nil
}