Merge pull request #9523 from gyuho/fff

functional-tester/tester: use "*rpcpb.Member" directly to dial
Gyuho Lee 2018-04-03 11:12:37 -07:00 committed by GitHub
commit f25188d35b
11 changed files with 244 additions and 249 deletions
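
The gist of the change, as a minimal sketch rather than the verbatim diff below: tester components stop holding raw endpoint strings and calling grpc.Dial themselves; they keep a *rpcpb.Member and dial through Member.DialEtcdGRPCServer, which now accepts extra grpc.DialOptions on top of the shared defaults (the zap logger field is also renamed from "logger" to "lg" throughout). The struct and method names below other than rpcpb.Member, DialEtcdGRPCServer, and EtcdClientEndpoint are illustrative, not the actual tester code.

package tester

import (
	"fmt"
	"time"

	"github.com/coreos/etcd/tools/functional-tester/rpcpb"

	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// exampleChecker is a hypothetical component following the new pattern:
// it keeps the member itself instead of a bare endpoint string.
type exampleChecker struct {
	lg *zap.Logger
	m  *rpcpb.Member
}

func (c *exampleChecker) dial() (*grpc.ClientConn, error) {
	// Per-call options are appended after the member's default dial options.
	conn, err := c.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
	if err != nil {
		return nil, fmt.Errorf("%v (%q)", err, c.m.EtcdClientEndpoint)
	}
	return conn, nil
}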

View File

@@ -36,7 +36,7 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
 	defer func() {
 		if err == nil {
 			srv.last = req.Operation
-			srv.logger.Info("handler success", zap.String("operation", req.Operation.String()))
+			srv.lg.Info("handler success", zap.String("operation", req.Operation.String()))
 		}
 	}()
@@ -78,24 +78,24 @@ func (srv *Server) handleInitialStartEtcd(req *rpcpb.Request) (*rpcpb.Response,
 	srv.Member = req.Member
 	srv.Tester = req.Tester
-	srv.logger.Info("creating base directory", zap.String("path", srv.Member.BaseDir))
+	srv.lg.Info("creating base directory", zap.String("path", srv.Member.BaseDir))
 	err := fileutil.TouchDirAll(srv.Member.BaseDir)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("created base directory", zap.String("path", srv.Member.BaseDir))
+	srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
 	if err = srv.createEtcdFile(); err != nil {
 		return nil, err
 	}
 	srv.creatEtcdCmd()
-	srv.logger.Info("starting etcd")
+	srv.lg.Info("starting etcd")
 	err = srv.startEtcdCmd()
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
+	srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
 	// wait some time for etcd listener start
 	// before setting up proxy
@@ -121,9 +121,9 @@ func (srv *Server) startProxy() error {
 			return err
 		}
-		srv.logger.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
+		srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		srv.advertiseClientPortToProxy[advertiseClientURLPort] = transport.NewProxy(transport.ProxyConfig{
-			Logger: srv.logger,
+			Logger: srv.lg,
 			From: *advertiseClientURL,
 			To: *listenClientURL,
 		})
@@ -131,7 +131,7 @@ func (srv *Server) startProxy() error {
 		case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
 			return err
 		case <-time.After(2 * time.Second):
-			srv.logger.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
+			srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
 		}
 	}
@@ -145,9 +145,9 @@ func (srv *Server) startProxy() error {
 			return err
 		}
-		srv.logger.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
+		srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		srv.advertisePeerPortToProxy[advertisePeerURLPort] = transport.NewProxy(transport.ProxyConfig{
-			Logger: srv.logger,
+			Logger: srv.lg,
 			From: *advertisePeerURL,
 			To: *listenPeerURL,
 		})
@@ -155,7 +155,7 @@ func (srv *Server) startProxy() error {
 		case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
 			return err
 		case <-time.After(2 * time.Second):
-			srv.logger.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
+			srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
 		}
 	}
 	return nil
@@ -164,13 +164,13 @@ func (srv *Server) startProxy() error {
 func (srv *Server) stopProxy() {
 	if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 {
 		for port, px := range srv.advertiseClientPortToProxy {
-			srv.logger.Info("closing proxy",
+			srv.lg.Info("closing proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
 			)
 			if err := px.Close(); err != nil {
-				srv.logger.Warn("failed to close proxy", zap.Int("port", port))
+				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
 				continue
 			}
 			select {
@@ -179,7 +179,7 @@ func (srv *Server) stopProxy() {
 				time.Sleep(time.Second)
 			case <-time.After(time.Second):
 			}
-			srv.logger.Info("closed proxy",
+			srv.lg.Info("closed proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
@@ -189,13 +189,13 @@ func (srv *Server) stopProxy() {
 	}
 	if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 {
 		for port, px := range srv.advertisePeerPortToProxy {
-			srv.logger.Info("closing proxy",
+			srv.lg.Info("closing proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
 			)
 			if err := px.Close(); err != nil {
-				srv.logger.Warn("failed to close proxy", zap.Int("port", port))
+				srv.lg.Warn("failed to close proxy", zap.Int("port", port))
 				continue
 			}
 			select {
@@ -204,7 +204,7 @@ func (srv *Server) stopProxy() {
 				time.Sleep(time.Second)
 			case <-time.After(time.Second):
 			}
-			srv.logger.Info("closed proxy",
+			srv.lg.Info("closed proxy",
 				zap.Int("port", port),
 				zap.String("from", px.From()),
 				zap.String("to", px.To()),
@@ -215,20 +215,20 @@ func (srv *Server) stopProxy() {
 }
 func (srv *Server) createEtcdFile() error {
-	srv.logger.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath))
+	srv.lg.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath))
 	var err error
 	srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
 	if err != nil {
 		return err
 	}
-	srv.logger.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
+	srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
 	return nil
 }
 func (srv *Server) creatEtcdCmd() {
 	etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags()
 	u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
-	srv.logger.Info("creating etcd command",
+	srv.lg.Info("creating etcd command",
 		zap.String("etcd-exec-path", etcdPath),
 		zap.Strings("etcd-flags", etcdFlags),
 		zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
@@ -248,12 +248,12 @@ func (srv *Server) startEtcdCmd() error {
 func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
 	srv.creatEtcdCmd()
-	srv.logger.Info("restarting etcd")
+	srv.lg.Info("restarting etcd")
 	err := srv.startEtcdCmd()
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
+	srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
 	// wait some time for etcd listener start
 	// before setting up proxy
@@ -273,12 +273,12 @@ func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
 func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
 	srv.stopProxy()
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
 	return &rpcpb.Response{
 		Success: true,
@@ -290,18 +290,18 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 	srv.stopProxy()
 	// exit with stackstrace
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
 	srv.etcdLogFile.Sync()
 	srv.etcdLogFile.Close()
 	// TODO: support separate WAL directory
-	srv.logger.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir))
+	srv.lg.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir))
 	if err = archive(
 		srv.Member.BaseDir,
 		srv.Member.EtcdLogPath,
@@ -309,17 +309,17 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 	); err != nil {
 		return nil, err
 	}
-	srv.logger.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
+	srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
 	if err = srv.createEtcdFile(); err != nil {
 		return nil, err
 	}
-	srv.logger.Info("cleaning up page cache")
+	srv.lg.Info("cleaning up page cache")
 	if err := cleanPageCache(); err != nil {
-		srv.logger.Warn("failed to clean up page cache", zap.String("error", err.Error()))
+		srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))
 	}
-	srv.logger.Info("cleaned up page cache")
+	srv.lg.Info("cleaned up page cache")
 	return &rpcpb.Response{
 		Success: true,
@@ -329,32 +329,32 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
 // stop proxy, etcd, delete data directory
 func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
-	srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
 	err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
-	srv.logger.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
+	srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
+	srv.lg.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
 	err = os.RemoveAll(srv.Member.BaseDir)
 	if err != nil {
 		return nil, err
 	}
-	srv.logger.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
+	srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
 	// stop agent server
 	srv.Stop()
 	for port, px := range srv.advertiseClientPortToProxy {
-		srv.logger.Info("closing proxy", zap.Int("client-port", port))
+		srv.lg.Info("closing proxy", zap.Int("client-port", port))
 		err := px.Close()
-		srv.logger.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
+		srv.lg.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
 	}
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("closing proxy", zap.Int("peer-port", port))
+		srv.lg.Info("closing proxy", zap.Int("peer-port", port))
 		err := px.Close()
-		srv.logger.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
+		srv.lg.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
 	}
 	return &rpcpb.Response{
@@ -365,10 +365,10 @@ func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
 func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("blackholing", zap.Int("peer-port", port))
+		srv.lg.Info("blackholing", zap.Int("peer-port", port))
 		px.BlackholeTx()
 		px.BlackholeRx()
-		srv.logger.Info("blackholed", zap.Int("peer-port", port))
+		srv.lg.Info("blackholed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,
@@ -378,10 +378,10 @@ func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
 func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("unblackholing", zap.Int("peer-port", port))
+		srv.lg.Info("unblackholing", zap.Int("peer-port", port))
 		px.UnblackholeTx()
 		px.UnblackholeRx()
-		srv.logger.Info("unblackholed", zap.Int("peer-port", port))
+		srv.lg.Info("unblackholed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,
@@ -394,14 +394,14 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
 	rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("delaying",
+		srv.lg.Info("delaying",
 			zap.Int("peer-port", port),
 			zap.Duration("latency", lat),
 			zap.Duration("random-variable", rv),
 		)
 		px.DelayTx(lat, rv)
 		px.DelayRx(lat, rv)
-		srv.logger.Info("delayed",
+		srv.lg.Info("delayed",
 			zap.Int("peer-port", port),
 			zap.Duration("latency", lat),
 			zap.Duration("random-variable", rv),
@@ -416,10 +416,10 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
 func (srv *Server) handleUndelayPeerPortTxRx() (*rpcpb.Response, error) {
 	for port, px := range srv.advertisePeerPortToProxy {
-		srv.logger.Info("undelaying", zap.Int("peer-port", port))
+		srv.lg.Info("undelaying", zap.Int("peer-port", port))
 		px.UndelayTx()
 		px.UndelayRx()
-		srv.logger.Info("undelayed", zap.Int("peer-port", port))
+		srv.lg.Info("undelayed", zap.Int("peer-port", port))
 	}
 	return &rpcpb.Response{
 		Success: true,

View File

@@ -34,7 +34,7 @@ import (
 // serialized in tester-side
 type Server struct {
 	grpcServer *grpc.Server
-	logger *zap.Logger
+	lg *zap.Logger
 	network string
 	address string
@@ -56,12 +56,12 @@ type Server struct {
 // NewServer returns a new agent server.
 func NewServer(
-	logger *zap.Logger,
+	lg *zap.Logger,
 	network string,
 	address string,
 ) *Server {
 	return &Server{
-		logger: logger,
+		lg: lg,
 		network: network,
 		address: address,
 		last: rpcpb.Operation_NotStarted,
@@ -93,34 +93,33 @@ func (srv *Server) StartServe() error {
 	rpcpb.RegisterTransportServer(srv.grpcServer, srv)
-	srv.logger.Info(
+	srv.lg.Info(
 		"gRPC server started",
 		zap.String("address", srv.address),
 		zap.String("listener-address", srv.ln.Addr().String()),
 	)
 	err = srv.grpcServer.Serve(srv.ln)
 	if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
-		srv.logger.Info(
+		srv.lg.Info(
 			"gRPC server is shut down",
 			zap.String("address", srv.address),
 			zap.Error(err),
 		)
 	} else {
-		srv.logger.Warn(
+		srv.lg.Warn(
 			"gRPC server returned with error",
 			zap.String("address", srv.address),
 			zap.Error(err),
 		)
 	}
 	return err
 }
 // Stop stops serving gRPC server.
 func (srv *Server) Stop() {
-	srv.logger.Info("gRPC server stopping", zap.String("address", srv.address))
+	srv.lg.Info("gRPC server stopping", zap.String("address", srv.address))
 	srv.grpcServer.Stop()
-	srv.logger.Info("gRPC server stopped", zap.String("address", srv.address))
+	srv.lg.Info("gRPC server stopped", zap.String("address", srv.address))
 }
 // Transport communicates with etcd tester.

View File

@@ -32,12 +32,12 @@ var dialOpts = []grpc.DialOption{
 }
 // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
-func (m *Member) DialEtcdGRPCServer() (*grpc.ClientConn, error) {
+func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
 	if m.EtcdClientTLS {
 		// TODO: support TLS
 		panic("client TLS not supported yet")
 	}
-	return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
+	return grpc.Dial(m.EtcdClientEndpoint, append(dialOpts, opts...)...)
 }
 // CreateEtcdClient creates a client from member.
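
A usage sketch of the widened signature (the helper name is illustrative, not part of this commit): options supplied by the caller are appended after the package-level dialOpts defaults, so existing zero-argument call sites keep working while callers such as the lease checker later in this diff can layer on a backoff cap.

package tester

import (
	"time"

	"github.com/coreos/etcd/tools/functional-tester/rpcpb"

	"google.golang.org/grpc"
)

// dialWithBackoff is an illustrative helper: it passes a per-call option
// that DialEtcdGRPCServer appends to the member's default dial options.
func dialWithBackoff(m *rpcpb.Member) (*grpc.ClientConn, error) {
	return m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
}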

View File

@@ -22,6 +22,7 @@ import (
 	"github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/tools/functional-tester/rpcpb"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -29,6 +30,7 @@ import (
 const retries = 7
+// Checker checks cluster consistency.
 type Checker interface {
 	// Check returns an error if the system fails a consistency check.
 	Check() error
@@ -39,14 +41,14 @@ type hashAndRevGetter interface {
 }
 type hashChecker struct {
-	logger *zap.Logger
+	lg *zap.Logger
 	hrg hashAndRevGetter
 }
-func newHashChecker(logger *zap.Logger, hrg hashAndRevGetter) Checker {
+func newHashChecker(lg *zap.Logger, hrg hashAndRevGetter) Checker {
 	return &hashChecker{
-		logger: logger,
+		lg: lg,
 		hrg: hrg,
 	}
 }
@@ -57,12 +59,11 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		revs map[string]int64
 		hashes map[string]int64
 	)
 	// retries in case of transient failure or etcd cluster has not stablized yet.
 	for i := 0; i < retries; i++ {
 		revs, hashes, err = hc.hrg.getRevisionHash()
 		if err != nil {
-			hc.logger.Warn(
+			hc.lg.Warn(
 				"failed to get revision and hash",
 				zap.Int("retries", i),
 				zap.Error(err),
@@ -73,7 +74,7 @@ func (hc *hashChecker) checkRevAndHashes() (err error) {
 		if sameRev && sameHashes {
 			return nil
 		}
-		hc.logger.Warn(
+		hc.lg.Warn(
 			"retrying; etcd cluster is not stable",
 			zap.Int("retries", i),
 			zap.Bool("same-revisions", sameRev),
@@ -97,19 +98,17 @@ func (hc *hashChecker) Check() error {
 }
 type leaseChecker struct {
-	logger *zap.Logger
-	endpoint string // TODO: use Member
-	ls *leaseStresser
-	leaseClient pb.LeaseClient
-	kvc pb.KVClient
+	lg *zap.Logger
+	m *rpcpb.Member
+	ls *leaseStresser
+	lsc pb.LeaseClient
+	kvc pb.KVClient
 }
 func (lc *leaseChecker) Check() error {
-	conn, err := grpc.Dial(lc.ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1))
+	conn, err := lc.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(time.Second))
 	if err != nil {
-		return fmt.Errorf("%v (%s)", err, lc.ls.endpoint)
+		return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
 	}
 	defer func() {
 		if conn != nil {
@@ -117,7 +116,7 @@ func (lc *leaseChecker) Check() error {
 		}
 	}()
 	lc.kvc = pb.NewKVClient(conn)
-	lc.leaseClient = pb.NewLeaseClient(conn)
+	lc.lsc = pb.NewLeaseClient(conn)
 	if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
 		return err
 	}
@@ -157,7 +156,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			return nil
 		}
 		if err != nil {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"retrying; Lease TimeToLive failed",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -167,7 +166,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 		}
 		if resp.TTL > 0 {
 			dur := time.Duration(resp.TTL) * time.Second
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease has not been expired, wait until expire",
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 				zap.Int64("ttl", resp.TTL),
@@ -175,7 +174,7 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 			)
 			time.Sleep(dur)
 		} else {
-			lc.logger.Debug(
+			lc.lg.Debug(
 				"lease expired but not yet revoked",
 				zap.Int("retries", i),
 				zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
@@ -195,18 +194,18 @@ func (lc *leaseChecker) checkShortLivedLease(ctx context.Context, leaseID int64)
 func (lc *leaseChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
 	keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
 	}
 	leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.Error(err),
 		)
 		return err
@@ -233,7 +232,7 @@ func (lc *leaseChecker) check(expired bool, leases map[int64]time.Time) error {
 func (lc *leaseChecker) getLeaseByID(ctx context.Context, leaseID int64) (*pb.LeaseTimeToLiveResponse, error) {
 	ltl := &pb.LeaseTimeToLiveRequest{ID: leaseID, Keys: true}
-	return lc.leaseClient.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
+	return lc.lsc.LeaseTimeToLive(ctx, ltl, grpc.FailFast(false))
 }
 func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
@@ -248,9 +247,9 @@ func (lc *leaseChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (boo
 		} else {
 			return resp.TTL == -1, nil
 		}
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasLeaseExpired getLeaseByID failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)
@@ -267,9 +266,9 @@ func (lc *leaseChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, lease
 		RangeEnd: []byte(clientv3.GetPrefixRangeEnd(fmt.Sprintf("%d", leaseID))),
 	}, grpc.FailFast(false))
 	if err != nil {
-		lc.logger.Warn(
+		lc.lg.Warn(
 			"hasKeysAttachedToLeaseExpired failed",
-			zap.String("endpoint", lc.endpoint),
+			zap.String("endpoint", lc.m.EtcdClientEndpoint),
 			zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
 			zap.Error(err),
 		)

View File

@@ -38,7 +38,7 @@ import (
 // Cluster defines tester cluster.
 type Cluster struct {
-	logger *zap.Logger
+	lg *zap.Logger
 	agentConns []*grpc.ClientConn
 	agentClients []rpcpb.TransportClient
@@ -61,15 +61,15 @@ type Cluster struct {
 	cs int
 }
-func newCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	logger.Info("reading configuration file", zap.String("path", fpath))
+func newCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	lg.Info("reading configuration file", zap.String("path", fpath))
 	bts, err := ioutil.ReadFile(fpath)
 	if err != nil {
 		return nil, err
 	}
-	logger.Info("opened configuration file", zap.String("path", fpath))
-	clus := &Cluster{logger: logger}
+	lg.Info("opened configuration file", zap.String("path", fpath))
+	clus := &Cluster{lg: lg}
 	if err = yaml.Unmarshal(bts, clus); err != nil {
 		return nil, err
 	}
@@ -192,8 +192,8 @@ var dialOpts = []grpc.DialOption{
 }
 // NewCluster creates a client from a tester configuration.
-func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
-	clus, err := newCluster(logger, fpath)
+func NewCluster(lg *zap.Logger, fpath string) (*Cluster, error) {
+	clus, err := newCluster(lg, fpath)
 	if err != nil {
 		return nil, err
 	}
@@ -205,21 +205,21 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 	clus.failures = make([]Failure, 0)
 	for i, ap := range clus.Members {
-		logger.Info("connecting", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connecting", zap.String("agent-address", ap.AgentAddr))
 		var err error
 		clus.agentConns[i], err = grpc.Dial(ap.AgentAddr, dialOpts...)
 		if err != nil {
 			return nil, err
 		}
 		clus.agentClients[i] = rpcpb.NewTransportClient(clus.agentConns[i])
-		logger.Info("connected", zap.String("agent-address", ap.AgentAddr))
-		logger.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("connected", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("creating stream", zap.String("agent-address", ap.AgentAddr))
 		clus.agentStreams[i], err = clus.agentClients[i].Transport(context.Background())
 		if err != nil {
 			return nil, err
 		}
-		logger.Info("created stream", zap.String("agent-address", ap.AgentAddr))
+		clus.lg.Info("created stream", zap.String("agent-address", ap.AgentAddr))
 	}
 	mux := http.NewServeMux()
@@ -246,18 +246,18 @@ func NewCluster(logger *zap.Logger, fpath string) (*Cluster, error) {
 }
 func (clus *Cluster) serveTesterServer() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"started tester HTTP server",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 	)
 	err := clus.testerHTTPServer.ListenAndServe()
-	clus.logger.Info(
+	clus.lg.Info(
 		"tester HTTP server returned",
 		zap.String("tester-address", clus.Tester.TesterAddr),
 		zap.Error(err),
 	)
 	if err != nil && err != http.ErrServerClosed {
-		clus.logger.Fatal("tester HTTP errored", zap.Error(err))
+		clus.lg.Fatal("tester HTTP errored", zap.Error(err))
 	}
 }
@@ -291,7 +291,7 @@ func (clus *Cluster) updateFailures() {
 		case "FAILPOINTS":
 			fpFailures, fperr := failpointFailures(clus)
 			if len(fpFailures) == 0 {
-				clus.logger.Info("no failpoints found!", zap.Error(fperr))
+				clus.lg.Info("no failpoints found!", zap.Error(fperr))
 			}
 			clus.failures = append(clus.failures, fpFailures...)
 		case "NO_FAIL":
@@ -316,13 +316,13 @@ func (clus *Cluster) shuffleFailures() {
 	n := len(clus.failures)
 	cp := coprime(n)
-	clus.logger.Info("shuffling test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffling test failure cases", zap.Int("total", n))
 	fs := make([]Failure, n)
 	for i := 0; i < n; i++ {
 		fs[i] = clus.failures[(cp*i+offset)%n]
 	}
 	clus.failures = fs
-	clus.logger.Info("shuffled test failure cases", zap.Int("total", n))
+	clus.lg.Info("shuffled test failure cases", zap.Int("total", n))
 }
 /*
@@ -354,7 +354,7 @@ func gcd(x, y int) int {
 }
 func (clus *Cluster) updateStresserChecker() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"updating stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -367,7 +367,7 @@ func (clus *Cluster) updateStresserChecker() {
 	clus.stresser = cs
 	if clus.Tester.ConsistencyCheck {
-		clus.checker = newHashChecker(clus.logger, hashAndRevGetter(clus))
+		clus.checker = newHashChecker(clus.lg, hashAndRevGetter(clus))
 		if schk := cs.Checker(); schk != nil {
 			clus.checker = newCompositeChecker([]Checker{clus.checker, schk})
 		}
@@ -375,7 +375,7 @@ func (clus *Cluster) updateStresserChecker() {
 		clus.checker = newNoChecker()
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"updated stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -383,13 +383,13 @@ func (clus *Cluster) updateStresserChecker() {
 }
 func (clus *Cluster) startStresser() (err error) {
-	clus.logger.Info(
+	clus.lg.Info(
 		"starting stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	err = clus.stresser.Stress()
-	clus.logger.Info(
+	clus.lg.Info(
 		"started stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -398,13 +398,13 @@ func (clus *Cluster) startStresser() (err error) {
 }
 func (clus *Cluster) closeStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"closing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Close()
-	clus.logger.Info(
+	clus.lg.Info(
 		"closed stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -412,13 +412,13 @@ func (clus *Cluster) closeStresser() {
 }
 func (clus *Cluster) pauseStresser() {
-	clus.logger.Info(
+	clus.lg.Info(
 		"pausing stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	clus.stresser.Pause()
-	clus.logger.Info(
+	clus.lg.Info(
 		"paused stressers",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -431,7 +431,7 @@ func (clus *Cluster) checkConsistency() (err error) {
 		return
 	}
 	if err = clus.updateRevision(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"updateRevision failed",
 			zap.Error(err),
 		)
@@ -440,20 +440,20 @@ func (clus *Cluster) checkConsistency() (err error) {
 		err = clus.startStresser()
 	}()
-	clus.logger.Info(
+	clus.lg.Info(
 		"checking consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 		zap.String("desc", clus.failures[clus.cs].Desc()),
 	)
 	if err = clus.checker.Check(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checker.Check failed",
 			zap.Error(err),
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"checked consistency and invariant of cluster",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -488,7 +488,7 @@ func (clus *Cluster) broadcastOperation(op rpcpb.Operation) error {
 			strings.Contains(err.Error(), "rpc error: code = Unavailable desc = transport is closing") {
 			// agent server has already closed;
 			// so this error is expected
-			clus.logger.Info(
+			clus.lg.Info(
 				"successfully destroyed",
 				zap.String("member", clus.Members[i].EtcdClientEndpoint),
 			)
@@ -511,13 +511,13 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		clus.agentRequests[idx].Operation = op
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"sending request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
 	)
 	err := clus.agentStreams[idx].Send(clus.agentRequests[idx])
-	clus.logger.Info(
+	clus.lg.Info(
 		"sent request",
 		zap.String("operation", op.String()),
 		zap.String("to", clus.Members[idx].EtcdClientEndpoint),
@@ -527,14 +527,14 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"receiving response",
 		zap.String("operation", op.String()),
 		zap.String("from", clus.Members[idx].EtcdClientEndpoint),
 	)
 	resp, err := clus.agentStreams[idx].Recv()
 	if resp != nil {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -543,7 +543,7 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 			zap.Error(err),
 		)
 	} else {
-		clus.logger.Info(
+		clus.lg.Info(
 			"received empty response",
 			zap.String("operation", op.String()),
 			zap.String("from", clus.Members[idx].EtcdClientEndpoint),
@@ -562,26 +562,26 @@ func (clus *Cluster) sendOperation(idx int, op rpcpb.Operation) error {
 // DestroyEtcdAgents terminates all tester connections to agents and etcd servers.
 func (clus *Cluster) DestroyEtcdAgents() {
-	clus.logger.Info("destroying etcd servers and agents")
+	clus.lg.Info("destroying etcd servers and agents")
 	err := clus.broadcastOperation(rpcpb.Operation_DestroyEtcdAgent)
 	if err != nil {
-		clus.logger.Warn("failed to destroy etcd servers and agents", zap.Error(err))
+		clus.lg.Warn("failed to destroy etcd servers and agents", zap.Error(err))
 	} else {
-		clus.logger.Info("destroyed etcd servers and agents")
+		clus.lg.Info("destroyed etcd servers and agents")
 	}
 	for i, conn := range clus.agentConns {
-		clus.logger.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
+		clus.lg.Info("closing connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr))
 		err := conn.Close()
-		clus.logger.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
+		clus.lg.Info("closed connection to agent", zap.String("agent-address", clus.Members[i].AgentAddr), zap.Error(err))
 	}
 	if clus.testerHTTPServer != nil {
-		clus.logger.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
+		clus.lg.Info("closing tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr))
 		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 		err := clus.testerHTTPServer.Shutdown(ctx)
 		cancel()
-		clus.logger.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
+		clus.lg.Info("closed tester HTTP server", zap.String("tester-address", clus.Tester.TesterAddr), zap.Error(err))
 	}
 }
@@ -595,13 +595,13 @@ func (clus *Cluster) WaitHealth() error {
 	// reasonable workload (https://github.com/coreos/etcd/issues/2698)
 	for i := 0; i < 60; i++ {
 		for _, m := range clus.Members {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 			if err = m.WriteHealthKey(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"writing health key failed",
 					zap.Int("retries", i),
 					zap.String("endpoint", m.EtcdClientEndpoint),
@@ -609,14 +609,14 @@ func (clus *Cluster) WaitHealth() error {
 				)
 				break
 			}
-			clus.logger.Info(
+			clus.lg.Info(
 				"wrote health key",
 				zap.Int("retries", i),
 				zap.String("endpoint", m.EtcdClientEndpoint),
 			)
 		}
 		if err == nil {
-			clus.logger.Info(
+			clus.lg.Info(
 				"writing health key success on all members",
 				zap.Int("retries", i),
 			)
@@ -683,7 +683,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 	for i, m := range clus.Members {
 		conn, derr := m.DialEtcdGRPCServer()
 		if derr != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"compactKV dial failed",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Error(derr),
@@ -693,7 +693,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}
 		kvc := pb.NewKVClient(conn)
-		clus.logger.Info(
+		clus.lg.Info(
 			"compacting",
 			zap.String("endpoint", m.EtcdClientEndpoint),
 			zap.Int64("compact-revision", rev),
@@ -709,14 +709,14 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		succeed := true
 		if cerr != nil {
 			if strings.Contains(cerr.Error(), "required revision has been compacted") && i > 0 {
-				clus.logger.Info(
+				clus.lg.Info(
 					"compact error is ignored",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
 					zap.Error(cerr),
 				)
 			} else {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.String("endpoint", m.EtcdClientEndpoint),
 					zap.Int64("compact-revision", rev),
@@ -728,7 +728,7 @@ func (clus *Cluster) compactKV(rev int64, timeout time.Duration) (err error) {
 		}
 		if succeed {
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacted",
 				zap.String("endpoint", m.EtcdClientEndpoint),
 				zap.Int64("compact-revision", rev),
@@ -753,14 +753,14 @@ func (clus *Cluster) checkCompact(rev int64) error {
 }
 func (clus *Cluster) defrag() error {
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmenting",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
 	)
 	for _, m := range clus.Members {
 		if err := m.Defrag(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"defrag failed",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -769,7 +769,7 @@ func (clus *Cluster) defrag() error {
 			return err
 		}
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"defragmented",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),

View File

@@ -156,13 +156,13 @@ func Test_newCluster(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	cfg.logger = nil
+	cfg.lg = nil
 	if !reflect.DeepEqual(exp, cfg) {
 		t.Fatalf("expected %+v, got %+v", exp, cfg)
 	}
-	cfg.logger = logger
+	cfg.lg = logger
 	cfg.updateFailures()
 	fs1 := cfg.failureStrings()

View File

@@ -37,7 +37,7 @@ func (clus *Cluster) StartTester() {
 		clus.rd = round
 		if err := clus.doRound(); err != nil {
-			clus.logger.Warn(
+			clus.lg.Warn(
 				"doRound failed; returning",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
@@ -60,21 +60,21 @@ func (clus *Cluster) StartTester() {
 			preModifiedKey = currentModifiedKey
 			timeout := 10 * time.Second
 			timeout += time.Duration(modifiedKey/compactQPS) * time.Second
-			clus.logger.Info(
+			clus.lg.Info(
 				"compacting",
 				zap.Int("round", clus.rd),
 				zap.Int("case", clus.cs),
 				zap.Duration("timeout", timeout),
 			)
 			if err := clus.compact(revToCompact, timeout); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"compact failed",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
 					zap.Error(err),
 				)
 				if err = clus.cleanup(); err != nil {
-					clus.logger.Warn(
+					clus.lg.Warn(
 						"cleanup failed",
 						zap.Int("round", clus.rd),
 						zap.Int("case", clus.cs),
@@ -87,7 +87,7 @@ func (clus *Cluster) StartTester() {
 		}
 		if round > 0 && round%500 == 0 { // every 500 rounds
 			if err := clus.defrag(); err != nil {
-				clus.logger.Warn(
+				clus.lg.Warn(
 					"defrag failed; returning",
 					zap.Int("round", clus.rd),
 					zap.Int("case", clus.cs),
@@ -99,7 +99,7 @@ func (clus *Cluster) StartTester() {
 		}
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"functional-tester passed",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -111,7 +111,7 @@ func (clus *Cluster) doRound() error {
 		clus.shuffleFailures()
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"starting round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -121,12 +121,12 @@ func (clus *Cluster) doRound() error {
 		caseTotalCounter.WithLabelValues(f.Desc()).Inc()
-		clus.logger.Info("wait health before injecting failures")
+		clus.lg.Info("wait health before injecting failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"injecting failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -135,7 +135,7 @@ func (clus *Cluster) doRound() error {
 		if err := f.Inject(clus); err != nil {
 			return fmt.Errorf("injection error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"injected failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -145,7 +145,7 @@ func (clus *Cluster) doRound() error {
 		// if run local, recovering server may conflict
 		// with stressing client ports
 		// TODO: use unix for local tests
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovering failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -154,26 +154,26 @@ func (clus *Cluster) doRound() error {
 		if err := f.Recover(clus); err != nil {
 			return fmt.Errorf("recovery error: %v", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"recovered failure",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
 			zap.String("desc", f.Desc()),
 		)
-		clus.logger.Info("pausing stresser after failure recovery, before wait health")
+		clus.lg.Info("pausing stresser after failure recovery, before wait health")
 		clus.pauseStresser()
-		clus.logger.Info("wait health after recovering failures")
+		clus.lg.Info("wait health after recovering failures")
 		if err := clus.WaitHealth(); err != nil {
 			return fmt.Errorf("wait full health error: %v", err)
 		}
-		clus.logger.Info("check consistency after recovering failures")
+		clus.lg.Info("check consistency after recovering failures")
 		if err := clus.checkConsistency(); err != nil {
 			return fmt.Errorf("tt.checkConsistency error (%v)", err)
 		}
-		clus.logger.Info(
+		clus.lg.Info(
 			"failure case passed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -181,7 +181,7 @@ func (clus *Cluster) doRound() error {
 		)
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"finished round",
 		zap.Int("round", clus.rd),
 		zap.Strings("failures", clus.failureStrings()),
@@ -196,7 +196,7 @@ func (clus *Cluster) updateRevision() error {
 		break // just need get one of the current revisions
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"updated current revision",
 		zap.Int64("current-revision", clus.currentRevision),
 	)
@@ -204,7 +204,7 @@ func (clus *Cluster) updateRevision() error {
 }
 func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
-	clus.logger.Info("pausing stresser before compact")
+	clus.lg.Info("pausing stresser before compact")
 	clus.pauseStresser()
 	defer func() {
 		if err == nil {
@@ -212,7 +212,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		}
 	}()
-	clus.logger.Info(
+	clus.lg.Info(
 		"compacting storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -220,19 +220,19 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 	if err = clus.compactKV(rev, timeout); err != nil {
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"compacted storage",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
-	clus.logger.Info(
+	clus.lg.Info(
 		"checking compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
 	)
 	if err = clus.checkCompact(rev); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"checkCompact failed",
 			zap.Int64("current-revision", clus.currentRevision),
 			zap.Int64("compact-revision", rev),
@@ -240,7 +240,7 @@ func (clus *Cluster) compact(rev int64, timeout time.Duration) (err error) {
 		)
 		return err
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"confirmed compaction",
 		zap.Int64("current-revision", clus.currentRevision),
 		zap.Int64("compact-revision", rev),
@@ -254,7 +254,7 @@ func (clus *Cluster) failed() {
 		return
 	}
-	clus.logger.Info(
+	clus.lg.Info(
 		"exiting on failure",
 		zap.Int("round", clus.rd),
 		zap.Int("case", clus.cs),
@@ -275,7 +275,7 @@ func (clus *Cluster) cleanup() error {
 	clus.closeStresser()
 	if err := clus.FailArchive(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"cleanup failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),
@@ -284,7 +284,7 @@ func (clus *Cluster) cleanup() error {
 		return err
 	}
 	if err := clus.Restart(); err != nil {
-		clus.logger.Warn(
+		clus.lg.Warn(
 			"restart failed",
 			zap.Int("round", clus.rd),
 			zap.Int("case", clus.cs),

View File

@@ -30,7 +30,7 @@ func (f *failureDelay) Inject(clus *Cluster) error {
 		return err
 	}
 	if f.delayDuration > 0 {
-		clus.logger.Info(
+		clus.lg.Info(
 			"sleeping in failureDelay",
 			zap.Duration("delay", f.delayDuration),
 			zap.String("case", f.Failure.Desc()),

View File

@ -21,6 +21,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// Stresser defines stressing client operations.
type Stresser interface { type Stresser interface {
// Stress starts to stress the etcd cluster // Stress starts to stress the etcd cluster
Stress() error Stress() error
@ -38,7 +39,7 @@ type Stresser interface {
func newStresser(clus *Cluster, idx int) Stresser { func newStresser(clus *Cluster, idx int) Stresser {
stressers := make([]Stresser, len(clus.Tester.StressTypes)) stressers := make([]Stresser, len(clus.Tester.StressTypes))
for i, stype := range clus.Tester.StressTypes { for i, stype := range clus.Tester.StressTypes {
clus.logger.Info("creating stresser", zap.String("type", stype)) clus.lg.Info("creating stresser", zap.String("type", stype))
switch stype { switch stype {
case "NO_STRESS": case "NO_STRESS":
@ -48,8 +49,8 @@ func newStresser(clus *Cluster, idx int) Stresser {
// TODO: Stressing clients that are too intensive can panic the etcd member with // TODO: Stressing clients that are too intensive can panic the etcd member with
// an 'out of memory' error. Put rate limits on the server side. // an 'out of memory' error. Put rate limits on the server side.
stressers[i] = &keyStresser{ stressers[i] = &keyStresser{
logger: clus.logger, lg: clus.lg,
Endpoint: clus.Members[idx].EtcdClientEndpoint, m: clus.Members[idx],
keySize: int(clus.Tester.StressKeySize), keySize: int(clus.Tester.StressKeySize),
keyLargeSize: int(clus.Tester.StressKeySizeLarge), keyLargeSize: int(clus.Tester.StressKeySizeLarge),
keySuffixRange: int(clus.Tester.StressKeySuffixRange), keySuffixRange: int(clus.Tester.StressKeySuffixRange),
@ -61,8 +62,8 @@ func newStresser(clus *Cluster, idx int) Stresser {
case "LEASE": case "LEASE":
stressers[i] = &leaseStresser{ stressers[i] = &leaseStresser{
logger: clus.logger, lg: clus.lg,
endpoint: clus.Members[idx].EtcdClientEndpoint, m: clus.Members[idx],
numLeases: 10, // TODO: configurable numLeases: 10, // TODO: configurable
keysPerLease: 10, // TODO: configurable keysPerLease: 10, // TODO: configurable
rateLimiter: clus.rateLimiter, rateLimiter: clus.rateLimiter,
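newStresser(clus, idx) returns the combined stresser for one member index; the switch above picks the per-type implementations. A rough caller-side sketch inside the same package (the loop itself is illustrative, not taken from this diff):

    // Build and start one stresser per cluster member.
    stressers := make([]Stresser, len(clus.Members))
    for i := range clus.Members {
        stressers[i] = newStresser(clus, i)
    }
    for i, s := range stressers {
        if err := s.Stress(); err != nil {
            clus.lg.Warn(
                "stresser failed to start",
                zap.String("endpoint", clus.Members[i].EtcdClientEndpoint),
                zap.Error(err),
            )
        }
    }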

View File

@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -33,9 +34,9 @@ import (
) )
type keyStresser struct { type keyStresser struct {
logger *zap.Logger lg *zap.Logger
Endpoint string // TODO: use Member m *rpcpb.Member
keySize int keySize int
keyLargeSize int keyLargeSize int
@ -59,9 +60,9 @@ type keyStresser struct {
func (s *keyStresser) Stress() error { func (s *keyStresser) Stress() error {
// TODO: add backoff option // TODO: add backoff option
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure()) conn, err := s.m.DialEtcdGRPCServer()
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, s.Endpoint) return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
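DialEtcdGRPCServer is a method added on *rpcpb.Member elsewhere in this change; its body is not shown in this section. Given how it is called above, and with variadic dial options in the lease stresser further down, a plausible shape is the following assumed sketch only, not the actual implementation:

    // Assumed sketch of the member dial helper; the real method lives in rpcpb.
    func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
        dialOpts := []grpc.DialOption{grpc.WithInsecure()}
        dialOpts = append(dialOpts, opts...)
        return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
    }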
@ -96,9 +97,9 @@ func (s *keyStresser) Stress() error {
go s.run(ctx) go s.run(ctx)
} }
s.logger.Info( s.lg.Info(
"key stresser started in background", "key stresser started in background",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
) )
return nil return nil
} }
@ -150,9 +151,9 @@ func (s *keyStresser) run(ctx context.Context) {
// from stresser.Cancel method: // from stresser.Cancel method:
return return
default: default:
s.logger.Warn( s.lg.Warn(
"key stresser exited with error", "key stresser exited with error",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return return
@ -169,9 +170,9 @@ func (s *keyStresser) Close() {
s.conn.Close() s.conn.Close()
s.wg.Wait() s.wg.Wait()
s.logger.Info( s.lg.Info(
"key stresser is closed", "key stresser is closed",
zap.String("endpoint", s.Endpoint), zap.String("endpoint", s.m.EtcdClientEndpoint),
) )
} }

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb" pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/tools/functional-tester/rpcpb"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -37,14 +38,14 @@ const (
) )
type leaseStresser struct { type leaseStresser struct {
logger *zap.Logger lg *zap.Logger
endpoint string m *rpcpb.Member
cancel func() cancel func()
conn *grpc.ClientConn conn *grpc.ClientConn
kvc pb.KVClient kvc pb.KVClient
lc pb.LeaseClient lc pb.LeaseClient
ctx context.Context ctx context.Context
rateLimiter *rate.Limiter rateLimiter *rate.Limiter
// atomicModifiedKey records the number of keys created and deleted during a test case // atomicModifiedKey records the number of keys created and deleted during a test case
@ -122,18 +123,18 @@ func (ls *leaseStresser) setupOnce() error {
} }
func (ls *leaseStresser) Stress() error { func (ls *leaseStresser) Stress() error {
ls.logger.Info( ls.lg.Info(
"lease stresser is started", "lease stresser is started",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
if err := ls.setupOnce(); err != nil { if err := ls.setupOnce(); err != nil {
return err return err
} }
conn, err := grpc.Dial(ls.endpoint, grpc.WithInsecure(), grpc.WithBackoffMaxDelay(1*time.Second)) conn, err := ls.m.DialEtcdGRPCServer(grpc.WithBackoffMaxDelay(1 * time.Second))
if err != nil { if err != nil {
return fmt.Errorf("%v (%s)", err, ls.endpoint) return fmt.Errorf("%v (%s)", err, ls.m.EtcdClientEndpoint)
} }
ls.conn = conn ls.conn = conn
ls.kvc = pb.NewKVClient(conn) ls.kvc = pb.NewKVClient(conn)
@ -161,24 +162,24 @@ func (ls *leaseStresser) run() {
return return
} }
ls.logger.Debug( ls.lg.Debug(
"lease stresser is creating leases", "lease stresser is creating leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.createLeases() ls.createLeases()
ls.logger.Debug( ls.lg.Debug(
"lease stresser created leases", "lease stresser created leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.logger.Debug( ls.lg.Debug(
"lease stresser is dropped leases", "lease stresser is dropped leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.randomlyDropLeases() ls.randomlyDropLeases()
ls.logger.Debug( ls.lg.Debug(
"lease stresser dropped leases", "lease stresser dropped leases",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
} }
} }
@ -206,9 +207,9 @@ func (ls *leaseStresser) createAliveLeases() {
defer wg.Done() defer wg.Done()
leaseID, err := ls.createLeaseWithKeys(TTL) leaseID, err := ls.createLeaseWithKeys(TTL)
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"createLeaseWithKeys failed", "createLeaseWithKeys failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return return
@ -244,17 +245,17 @@ func (ls *leaseStresser) createShortLivedLeases() {
func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) { func (ls *leaseStresser) createLeaseWithKeys(ttl int64) (int64, error) {
leaseID, err := ls.createLease(ttl) leaseID, err := ls.createLease(ttl)
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"createLease failed", "createLease failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.Error(err), zap.Error(err),
) )
return -1, err return -1, err
} }
ls.logger.Debug( ls.lg.Debug(
"createLease created lease", "createLease created lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
if err := ls.attachKeysWithLease(leaseID); err != nil { if err := ls.attachKeysWithLease(leaseID); err != nil {
@ -273,9 +274,9 @@ func (ls *leaseStresser) randomlyDropLeases() {
// if randomlyDropLease encountered an error, such as a cancelled context, remove the lease from aliveLeases // if randomlyDropLease encountered an error, such as a cancelled context, remove the lease from aliveLeases
// because we can't tell whether the lease is dropped or not. // because we can't tell whether the lease is dropped or not.
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"randomlyDropLease failed", "randomlyDropLease failed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -285,9 +286,9 @@ func (ls *leaseStresser) randomlyDropLeases() {
if !dropped { if !dropped {
return return
} }
ls.logger.Debug( ls.lg.Debug(
"randomlyDropLease dropped a lease", "randomlyDropLease dropped a lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
ls.revokedLeases.add(leaseID, time.Now()) ls.revokedLeases.add(leaseID, time.Now())
@ -314,9 +315,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
select { select {
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
case <-ls.ctx.Done(): case <-ls.ctx.Done():
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive context canceled", "keepLeaseAlive context canceled",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()), zap.Error(ls.ctx.Err()),
) )
@ -328,9 +329,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
renewTime, ok := ls.aliveLeases.read(leaseID) renewTime, ok := ls.aliveLeases.read(leaseID)
if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) { if ok && renewTime.Add(TTL/2*time.Second).Before(time.Now()) {
ls.aliveLeases.remove(leaseID) ls.aliveLeases.remove(leaseID)
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive lease has not been renewed, dropped it", "keepLeaseAlive lease has not been renewed, dropped it",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
} }
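The staleness check above drops a lease once it has gone unrenewed for half its TTL. TTL is a constant declared outside this hunk (a count of seconds; its value is not shown), so the arithmetic below uses an assumed value purely for illustration:

    // TTL/2*time.Second is a Duration of TTL/2 seconds; with an assumed
    // TTL of 120, a lease unrenewed for 60s is removed from aliveLeases.
    const TTL = 120 // assumed value, for illustration only
    staleAfter := TTL / 2 * time.Second // 60s under the assumption above
    if renewTime.Add(staleAfter).Before(time.Now()) {
        ls.aliveLeases.remove(leaseID) // stop treating the lease as alive
    }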
@ -338,9 +339,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
} }
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive lease creates stream error", "keepLeaseAlive lease creates stream error",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -351,32 +352,32 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
continue continue
} }
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive stream sends lease keepalive request", "keepLeaseAlive stream sends lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID}) err = stream.Send(&pb.LeaseKeepAliveRequest{ID: leaseID})
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive stream failed to send lease keepalive request", "keepLeaseAlive stream failed to send lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
continue continue
} }
leaseRenewTime := time.Now() leaseRenewTime := time.Now()
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive stream sent lease keepalive request", "keepLeaseAlive stream sent lease keepalive request",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
respRC, err := stream.Recv() respRC, err := stream.Recv()
if err != nil { if err != nil {
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive stream failed to receive lease keepalive response", "keepLeaseAlive stream failed to receive lease keepalive response",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(err), zap.Error(err),
) )
@ -385,9 +386,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
// lease expires after TTL becomes 0 // lease expires after TTL becomes 0
// don't send keepalive if the lease has expired // don't send keepalive if the lease has expired
if respRC.TTL <= 0 { if respRC.TTL <= 0 {
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive stream received lease keepalive response TTL <= 0", "keepLeaseAlive stream received lease keepalive response TTL <= 0",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Int64("ttl", respRC.TTL), zap.Int64("ttl", respRC.TTL),
) )
@ -395,9 +396,9 @@ func (ls *leaseStresser) keepLeaseAlive(leaseID int64) {
return return
} }
// renew lease timestamp only if lease is present // renew lease timestamp only if lease is present
ls.logger.Debug( ls.lg.Debug(
"keepLeaseAlive renewed a lease", "keepLeaseAlive renewed a lease",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
) )
ls.aliveLeases.update(leaseID, leaseRenewTime) ls.aliveLeases.update(leaseID, leaseRenewTime)
@ -444,31 +445,29 @@ func (ls *leaseStresser) randomlyDropLease(leaseID int64) (bool, error) {
} }
} }
ls.logger.Debug( ls.lg.Debug(
"randomlyDropLease error", "randomlyDropLease error",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
zap.String("lease-id", fmt.Sprintf("%016x", leaseID)), zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
zap.Error(ls.ctx.Err()), zap.Error(ls.ctx.Err()),
) )
return false, ls.ctx.Err() return false, ls.ctx.Err()
} }
func (ls *leaseStresser) Pause() { func (ls *leaseStresser) Pause() { ls.Close() }
ls.Close()
}
func (ls *leaseStresser) Close() { func (ls *leaseStresser) Close() {
ls.logger.Info( ls.lg.Info(
"lease stresser is closing", "lease stresser is closing",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
ls.cancel() ls.cancel()
ls.runWg.Wait() ls.runWg.Wait()
ls.aliveWg.Wait() ls.aliveWg.Wait()
ls.conn.Close() ls.conn.Close()
ls.logger.Info( ls.lg.Info(
"lease stresser is closed", "lease stresser is closed",
zap.String("endpoint", ls.endpoint), zap.String("endpoint", ls.m.EtcdClientEndpoint),
) )
} }
@ -477,9 +476,5 @@ func (ls *leaseStresser) ModifiedKeys() int64 {
} }
func (ls *leaseStresser) Checker() Checker { func (ls *leaseStresser) Checker() Checker {
return &leaseChecker{ return &leaseChecker{lg: ls.lg, m: ls.m, ls: ls}
logger: ls.logger,
endpoint: ls.endpoint,
ls: ls,
}
} }