functional-tester/agent: rename "logger" field

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-04-03 09:43:03 -07:00
parent eb0c66f912
commit c2731cde54
2 changed files with 55 additions and 56 deletions

View File

@ -36,7 +36,7 @@ func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response
defer func() { defer func() {
if err == nil { if err == nil {
srv.last = req.Operation srv.last = req.Operation
srv.logger.Info("handler success", zap.String("operation", req.Operation.String())) srv.lg.Info("handler success", zap.String("operation", req.Operation.String()))
} }
}() }()
@ -78,24 +78,24 @@ func (srv *Server) handleInitialStartEtcd(req *rpcpb.Request) (*rpcpb.Response,
srv.Member = req.Member srv.Member = req.Member
srv.Tester = req.Tester srv.Tester = req.Tester
srv.logger.Info("creating base directory", zap.String("path", srv.Member.BaseDir)) srv.lg.Info("creating base directory", zap.String("path", srv.Member.BaseDir))
err := fileutil.TouchDirAll(srv.Member.BaseDir) err := fileutil.TouchDirAll(srv.Member.BaseDir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("created base directory", zap.String("path", srv.Member.BaseDir)) srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
if err = srv.createEtcdFile(); err != nil { if err = srv.createEtcdFile(); err != nil {
return nil, err return nil, err
} }
srv.creatEtcdCmd() srv.creatEtcdCmd()
srv.logger.Info("starting etcd") srv.lg.Info("starting etcd")
err = srv.startEtcdCmd() err = srv.startEtcdCmd()
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path)) srv.lg.Info("started etcd", zap.String("command-path", srv.etcdCmd.Path))
// wait some time for etcd listener start // wait some time for etcd listener start
// before setting up proxy // before setting up proxy
@ -121,9 +121,9 @@ func (srv *Server) startProxy() error {
return err return err
} }
srv.logger.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String())) srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String()))
srv.advertiseClientPortToProxy[advertiseClientURLPort] = transport.NewProxy(transport.ProxyConfig{ srv.advertiseClientPortToProxy[advertiseClientURLPort] = transport.NewProxy(transport.ProxyConfig{
Logger: srv.logger, Logger: srv.lg,
From: *advertiseClientURL, From: *advertiseClientURL,
To: *listenClientURL, To: *listenClientURL,
}) })
@ -131,7 +131,7 @@ func (srv *Server) startProxy() error {
case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error(): case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
return err return err
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
srv.logger.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String())) srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
} }
} }
@ -145,9 +145,9 @@ func (srv *Server) startProxy() error {
return err return err
} }
srv.logger.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String())) srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
srv.advertisePeerPortToProxy[advertisePeerURLPort] = transport.NewProxy(transport.ProxyConfig{ srv.advertisePeerPortToProxy[advertisePeerURLPort] = transport.NewProxy(transport.ProxyConfig{
Logger: srv.logger, Logger: srv.lg,
From: *advertisePeerURL, From: *advertisePeerURL,
To: *listenPeerURL, To: *listenPeerURL,
}) })
@ -155,7 +155,7 @@ func (srv *Server) startProxy() error {
case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error(): case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
return err return err
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
srv.logger.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String())) srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
} }
} }
return nil return nil
@ -164,13 +164,13 @@ func (srv *Server) startProxy() error {
func (srv *Server) stopProxy() { func (srv *Server) stopProxy() {
if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 { if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 {
for port, px := range srv.advertiseClientPortToProxy { for port, px := range srv.advertiseClientPortToProxy {
srv.logger.Info("closing proxy", srv.lg.Info("closing proxy",
zap.Int("port", port), zap.Int("port", port),
zap.String("from", px.From()), zap.String("from", px.From()),
zap.String("to", px.To()), zap.String("to", px.To()),
) )
if err := px.Close(); err != nil { if err := px.Close(); err != nil {
srv.logger.Warn("failed to close proxy", zap.Int("port", port)) srv.lg.Warn("failed to close proxy", zap.Int("port", port))
continue continue
} }
select { select {
@ -179,7 +179,7 @@ func (srv *Server) stopProxy() {
time.Sleep(time.Second) time.Sleep(time.Second)
case <-time.After(time.Second): case <-time.After(time.Second):
} }
srv.logger.Info("closed proxy", srv.lg.Info("closed proxy",
zap.Int("port", port), zap.Int("port", port),
zap.String("from", px.From()), zap.String("from", px.From()),
zap.String("to", px.To()), zap.String("to", px.To()),
@ -189,13 +189,13 @@ func (srv *Server) stopProxy() {
} }
if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 { if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 {
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("closing proxy", srv.lg.Info("closing proxy",
zap.Int("port", port), zap.Int("port", port),
zap.String("from", px.From()), zap.String("from", px.From()),
zap.String("to", px.To()), zap.String("to", px.To()),
) )
if err := px.Close(); err != nil { if err := px.Close(); err != nil {
srv.logger.Warn("failed to close proxy", zap.Int("port", port)) srv.lg.Warn("failed to close proxy", zap.Int("port", port))
continue continue
} }
select { select {
@ -204,7 +204,7 @@ func (srv *Server) stopProxy() {
time.Sleep(time.Second) time.Sleep(time.Second)
case <-time.After(time.Second): case <-time.After(time.Second):
} }
srv.logger.Info("closed proxy", srv.lg.Info("closed proxy",
zap.Int("port", port), zap.Int("port", port),
zap.String("from", px.From()), zap.String("from", px.From()),
zap.String("to", px.To()), zap.String("to", px.To()),
@ -215,20 +215,20 @@ func (srv *Server) stopProxy() {
} }
func (srv *Server) createEtcdFile() error { func (srv *Server) createEtcdFile() error {
srv.logger.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath)) srv.lg.Info("creating etcd log file", zap.String("path", srv.Member.EtcdLogPath))
var err error var err error
srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath) srv.etcdLogFile, err = os.Create(srv.Member.EtcdLogPath)
if err != nil { if err != nil {
return err return err
} }
srv.logger.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath)) srv.lg.Info("created etcd log file", zap.String("path", srv.Member.EtcdLogPath))
return nil return nil
} }
func (srv *Server) creatEtcdCmd() { func (srv *Server) creatEtcdCmd() {
etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags() etcdPath, etcdFlags := srv.Member.EtcdExecPath, srv.Member.Etcd.Flags()
u, _ := url.Parse(srv.Member.FailpointHTTPAddr) u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
srv.logger.Info("creating etcd command", srv.lg.Info("creating etcd command",
zap.String("etcd-exec-path", etcdPath), zap.String("etcd-exec-path", etcdPath),
zap.Strings("etcd-flags", etcdFlags), zap.Strings("etcd-flags", etcdFlags),
zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr), zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
@ -248,12 +248,12 @@ func (srv *Server) startEtcdCmd() error {
func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) { func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
srv.creatEtcdCmd() srv.creatEtcdCmd()
srv.logger.Info("restarting etcd") srv.lg.Info("restarting etcd")
err := srv.startEtcdCmd() err := srv.startEtcdCmd()
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path)) srv.lg.Info("restarted etcd", zap.String("command-path", srv.etcdCmd.Path))
// wait some time for etcd listener start // wait some time for etcd listener start
// before setting up proxy // before setting up proxy
@ -273,12 +273,12 @@ func (srv *Server) handleRestartEtcd() (*rpcpb.Response, error) {
func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) { func (srv *Server) handleKillEtcd() (*rpcpb.Response, error) {
srv.stopProxy() srv.stopProxy()
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String())) srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM) err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String())) srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
return &rpcpb.Response{ return &rpcpb.Response{
Success: true, Success: true,
@ -290,18 +290,18 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
srv.stopProxy() srv.stopProxy()
// exit with stackstrace // exit with stackstrace
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String())) srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGQUIT.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT) err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String())) srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
srv.etcdLogFile.Sync() srv.etcdLogFile.Sync()
srv.etcdLogFile.Close() srv.etcdLogFile.Close()
// TODO: support separate WAL directory // TODO: support separate WAL directory
srv.logger.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir)) srv.lg.Info("archiving data", zap.String("base-dir", srv.Member.BaseDir))
if err = archive( if err = archive(
srv.Member.BaseDir, srv.Member.BaseDir,
srv.Member.EtcdLogPath, srv.Member.EtcdLogPath,
@ -309,17 +309,17 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
); err != nil { ); err != nil {
return nil, err return nil, err
} }
srv.logger.Info("archived data", zap.String("base-dir", srv.Member.BaseDir)) srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
if err = srv.createEtcdFile(); err != nil { if err = srv.createEtcdFile(); err != nil {
return nil, err return nil, err
} }
srv.logger.Info("cleaning up page cache") srv.lg.Info("cleaning up page cache")
if err := cleanPageCache(); err != nil { if err := cleanPageCache(); err != nil {
srv.logger.Warn("failed to clean up page cache", zap.String("error", err.Error())) srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))
} }
srv.logger.Info("cleaned up page cache") srv.lg.Info("cleaned up page cache")
return &rpcpb.Response{ return &rpcpb.Response{
Success: true, Success: true,
@ -329,32 +329,32 @@ func (srv *Server) handleFailArchive() (*rpcpb.Response, error) {
// stop proxy, etcd, delete data directory // stop proxy, etcd, delete data directory
func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) { func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
srv.logger.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String())) srv.lg.Info("killing etcd", zap.String("signal", syscall.SIGTERM.String()))
err := stopWithSig(srv.etcdCmd, syscall.SIGTERM) err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String())) srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
srv.logger.Info("removing base directory", zap.String("dir", srv.Member.BaseDir)) srv.lg.Info("removing base directory", zap.String("dir", srv.Member.BaseDir))
err = os.RemoveAll(srv.Member.BaseDir) err = os.RemoveAll(srv.Member.BaseDir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
srv.logger.Info("removed base directory", zap.String("dir", srv.Member.BaseDir)) srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
// stop agent server // stop agent server
srv.Stop() srv.Stop()
for port, px := range srv.advertiseClientPortToProxy { for port, px := range srv.advertiseClientPortToProxy {
srv.logger.Info("closing proxy", zap.Int("client-port", port)) srv.lg.Info("closing proxy", zap.Int("client-port", port))
err := px.Close() err := px.Close()
srv.logger.Info("closed proxy", zap.Int("client-port", port), zap.Error(err)) srv.lg.Info("closed proxy", zap.Int("client-port", port), zap.Error(err))
} }
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("closing proxy", zap.Int("peer-port", port)) srv.lg.Info("closing proxy", zap.Int("peer-port", port))
err := px.Close() err := px.Close()
srv.logger.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err)) srv.lg.Info("closed proxy", zap.Int("peer-port", port), zap.Error(err))
} }
return &rpcpb.Response{ return &rpcpb.Response{
@ -365,10 +365,10 @@ func (srv *Server) handleDestroyEtcdAgent() (*rpcpb.Response, error) {
func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) { func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("blackholing", zap.Int("peer-port", port)) srv.lg.Info("blackholing", zap.Int("peer-port", port))
px.BlackholeTx() px.BlackholeTx()
px.BlackholeRx() px.BlackholeRx()
srv.logger.Info("blackholed", zap.Int("peer-port", port)) srv.lg.Info("blackholed", zap.Int("peer-port", port))
} }
return &rpcpb.Response{ return &rpcpb.Response{
Success: true, Success: true,
@ -378,10 +378,10 @@ func (srv *Server) handleBlackholePeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) { func (srv *Server) handleUnblackholePeerPortTxRx() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("unblackholing", zap.Int("peer-port", port)) srv.lg.Info("unblackholing", zap.Int("peer-port", port))
px.UnblackholeTx() px.UnblackholeTx()
px.UnblackholeRx() px.UnblackholeRx()
srv.logger.Info("unblackholed", zap.Int("peer-port", port)) srv.lg.Info("unblackholed", zap.Int("peer-port", port))
} }
return &rpcpb.Response{ return &rpcpb.Response{
Success: true, Success: true,
@ -394,14 +394,14 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("delaying", srv.lg.Info("delaying",
zap.Int("peer-port", port), zap.Int("peer-port", port),
zap.Duration("latency", lat), zap.Duration("latency", lat),
zap.Duration("random-variable", rv), zap.Duration("random-variable", rv),
) )
px.DelayTx(lat, rv) px.DelayTx(lat, rv)
px.DelayRx(lat, rv) px.DelayRx(lat, rv)
srv.logger.Info("delayed", srv.lg.Info("delayed",
zap.Int("peer-port", port), zap.Int("peer-port", port),
zap.Duration("latency", lat), zap.Duration("latency", lat),
zap.Duration("random-variable", rv), zap.Duration("random-variable", rv),
@ -416,10 +416,10 @@ func (srv *Server) handleDelayPeerPortTxRx() (*rpcpb.Response, error) {
func (srv *Server) handleUndelayPeerPortTxRx() (*rpcpb.Response, error) { func (srv *Server) handleUndelayPeerPortTxRx() (*rpcpb.Response, error) {
for port, px := range srv.advertisePeerPortToProxy { for port, px := range srv.advertisePeerPortToProxy {
srv.logger.Info("undelaying", zap.Int("peer-port", port)) srv.lg.Info("undelaying", zap.Int("peer-port", port))
px.UndelayTx() px.UndelayTx()
px.UndelayRx() px.UndelayRx()
srv.logger.Info("undelayed", zap.Int("peer-port", port)) srv.lg.Info("undelayed", zap.Int("peer-port", port))
} }
return &rpcpb.Response{ return &rpcpb.Response{
Success: true, Success: true,

View File

@ -34,7 +34,7 @@ import (
// serialized in tester-side // serialized in tester-side
type Server struct { type Server struct {
grpcServer *grpc.Server grpcServer *grpc.Server
logger *zap.Logger lg *zap.Logger
network string network string
address string address string
@ -56,12 +56,12 @@ type Server struct {
// NewServer returns a new agent server. // NewServer returns a new agent server.
func NewServer( func NewServer(
logger *zap.Logger, lg *zap.Logger,
network string, network string,
address string, address string,
) *Server { ) *Server {
return &Server{ return &Server{
logger: logger, lg: lg,
network: network, network: network,
address: address, address: address,
last: rpcpb.Operation_NotStarted, last: rpcpb.Operation_NotStarted,
@ -93,34 +93,33 @@ func (srv *Server) StartServe() error {
rpcpb.RegisterTransportServer(srv.grpcServer, srv) rpcpb.RegisterTransportServer(srv.grpcServer, srv)
srv.logger.Info( srv.lg.Info(
"gRPC server started", "gRPC server started",
zap.String("address", srv.address), zap.String("address", srv.address),
zap.String("listener-address", srv.ln.Addr().String()), zap.String("listener-address", srv.ln.Addr().String()),
) )
err = srv.grpcServer.Serve(srv.ln) err = srv.grpcServer.Serve(srv.ln)
if err != nil && strings.Contains(err.Error(), "use of closed network connection") { if err != nil && strings.Contains(err.Error(), "use of closed network connection") {
srv.logger.Info( srv.lg.Info(
"gRPC server is shut down", "gRPC server is shut down",
zap.String("address", srv.address), zap.String("address", srv.address),
zap.Error(err), zap.Error(err),
) )
} else { } else {
srv.logger.Warn( srv.lg.Warn(
"gRPC server returned with error", "gRPC server returned with error",
zap.String("address", srv.address), zap.String("address", srv.address),
zap.Error(err), zap.Error(err),
) )
} }
return err return err
} }
// Stop stops serving gRPC server. // Stop stops serving gRPC server.
func (srv *Server) Stop() { func (srv *Server) Stop() {
srv.logger.Info("gRPC server stopping", zap.String("address", srv.address)) srv.lg.Info("gRPC server stopping", zap.String("address", srv.address))
srv.grpcServer.Stop() srv.grpcServer.Stop()
srv.logger.Info("gRPC server stopped", zap.String("address", srv.address)) srv.lg.Info("gRPC server stopped", zap.String("address", srv.address))
} }
// Transport communicates with etcd tester. // Transport communicates with etcd tester.