Compare commits

...

1 Commits

Author SHA1 Message Date
Svarog
ecc7a3ba0e [NOD-1420] Restructure main (#942)
* [NOD-1420] Moved setting limits to executor

* [NOD-1420] Moved all code dealing with windows service to separate package

* [NOD-1420] Move practically all main to restructured app package

* [NOD-1420] Check for running as interactive only after checking if we are doing any service operation

* [NOD-1420] Add comments

* [NOD-1420] Add a comment
2020-09-30 17:07:40 +03:00
19 changed files with 875 additions and 809 deletions

View File

@@ -2,259 +2,194 @@ package app
import ( import (
"fmt" "fmt"
"sync/atomic" "os"
"path/filepath"
"runtime"
"time"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol"
"github.com/kaspanet/kaspad/app/rpc"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess" "github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/dnsseed" "github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter" "github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/util/profiling"
"github.com/kaspanet/kaspad/version"
"github.com/kaspanet/kaspad/util/panics" "github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/execenv"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/infrastructure/os/winservice"
) )
// App is a wrapper for all the kaspad services var desiredLimits = &limits.DesiredLimits{
type App struct { FileLimitWant: 2048,
cfg *config.Config FileLimitMin: 1024,
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
rpcManager *rpc.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter
started, shutdown int32
} }
// Start launches all the kaspad services. var serviceDescription = &winservice.ServiceDescription{
func (a *App) Start() { Name: "kaspadsvc",
// Already started? DisplayName: "Kaspad Service",
if atomic.AddInt32(&a.started, 1) != 1 { Description: "Downloads and stays synchronized with the Kaspa blockDAG and " +
return "provides DAG services to applications.",
}
log.Trace("Starting kaspad")
err := a.netAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err))
}
a.maybeSeedFromDNS()
a.connectionManager.Start()
} }
// Stop gracefully shuts down all the kaspad services. type kaspadApp struct {
func (a *App) Stop() { cfg *config.Config
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}
log.Warnf("Kaspad shutting down")
a.connectionManager.Stop()
err := a.netAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the net adapter: %+v", err)
}
err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}
return
} }
// New returns a new App instance configured to listen on addr for the // StartApp starts the kaspad app, and blocks until it finishes running
// kaspa network type specified by dagParams. Use start to begin accepting func StartApp() error {
// connections from peers. execenv.Initialize(desiredLimits)
func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*App, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize) // Load configuration and parse command line. This function also
// initializes logging and configures it accordingly.
// Create a new block DAG instance with the appropriate configuration. cfg, err := config.LoadConfig()
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
if err != nil { if err != nil {
return nil, err fmt.Fprint(os.Stderr, err)
return err
} }
defer panics.HandlePanic(log, "MAIN", nil)
txMempool := setupMempool(cfg, dag, sigCache) app := &kaspadApp{cfg: cfg}
netAdapter, err := netadapter.NewNetAdapter(cfg) // Call serviceMain on Windows to handle running as a service. When
if err != nil { // the return isService flag is true, exit now since we ran as a
return nil, err // service. Otherwise, just fall through to normal operation.
} if runtime.GOOS == "windows" {
addressManager, err := addressmanager.New(cfg, databaseContext) isService, err := winservice.WinServiceMain(app.main, serviceDescription, cfg)
if err != nil {
return nil, err
}
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)
return &App{
cfg: cfg,
protocolManager: protocolManager,
rpcManager: rpcManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func setupRPC(
cfg *config.Config,
txMempool *mempool.TxPool,
dag *blockdag.BlockDAG,
sigCache *txscript.SigCache,
netAdapter *netadapter.NetAdapter,
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {
blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager)
if err != nil { if err != nil {
panic(err) return err
} }
}) if isService {
return rpcManager
}
func handleBlockDAGNotifications(notification *blockdag.Notification,
acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error {
switch notification.Type {
case blockdag.NTChainChanged:
if acceptanceIndex == nil {
return nil return nil
} }
chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData) }
err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes,
chainChangedNotificationData.AddedChainBlockHashes) return app.main(nil)
if err != nil { }
return err
} func (app *kaspadApp) main(startedChan chan<- struct{}) error {
case blockdag.NTFinalityConflict: // Get a channel that will be closed when a shutdown signal has been
finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData) // triggered either from an OS signal such as SIGINT (Ctrl+C) or from
err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String()) // another subsystem such as the RPC server.
if err != nil { interrupt := signal.InterruptListener()
return err defer log.Info("Shutdown complete")
}
case blockdag.NTFinalityConflictResolved: // Show version at startup.
finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData) log.Infof("Version %s", version.Version())
err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String())
// Enable http profiling server if requested.
if app.cfg.Profile != "" {
profiling.Start(app.cfg.Profile, log)
}
// Perform upgrades to kaspad as new versions require it.
if err := doUpgrades(); err != nil {
log.Error(err)
return err
}
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
if app.cfg.ResetDatabase {
err := removeDatabase(app.cfg)
if err != nil { if err != nil {
log.Error(err)
return err return err
} }
} }
// Open the database
databaseContext, err := openDB(app.cfg)
if err != nil {
log.Error(err)
return err
}
defer func() {
log.Infof("Gracefully shutting down the database...")
err := databaseContext.Close()
if err != nil {
log.Errorf("Failed to close the database: %s", err)
}
}()
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
// Drop indexes and exit if requested.
if app.cfg.DropAcceptanceIndex {
if err := indexers.DropAcceptanceIndex(databaseContext); err != nil {
log.Errorf("%s", err)
return err
}
return nil
}
// Create componentManager and start it.
componentManager, err := NewComponentManager(app.cfg, databaseContext, interrupt)
if err != nil {
log.Errorf("Unable to start kaspad: %+v", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down kaspad...")
shutdownDone := make(chan struct{})
go func() {
componentManager.Stop()
shutdownDone <- struct{}{}
}()
const shutdownTimeout = 2 * time.Minute
select {
case <-shutdownDone:
case <-time.After(shutdownTimeout):
log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout)
}
log.Infof("Kaspad shutdown complete")
}()
componentManager.Start()
if startedChan != nil {
startedChan <- struct{}{}
}
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems such as the RPC
// server.
<-interrupt
return nil return nil
} }
func (a *App) maybeSeedFromDNS() { // doUpgrades performs upgrades to kaspad as new versions require it.
if !a.cfg.DisableDNSSeed { // currently it's a placeholder we got from kaspad upstream, that does nothing
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil, func doUpgrades() error {
a.cfg.Lookup, func(addresses []*appmessage.NetAddress) { return nil
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
if a.cfg.GRPCSeed != "" {
dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil,
func(addresses []*appmessage.NetAddress) {
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
MaxUTXOCacheSize: cfg.MaxUTXOCacheSize,
})
return dag, err
} }
func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) { // dbPath returns the path to the block database given a database type.
// Create indexes if needed. func databasePath(cfg *config.Config) string {
var indexes []indexers.Indexer return filepath.Join(cfg.DataDir, "db")
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}
// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
} }
func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool { func removeDatabase(cfg *config.Config) error {
mempoolConfig := mempool.Config{ dbPath := databasePath(cfg)
Policy: mempool.Policy{ return os.RemoveAll(dbPath)
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries,
SigCache: sigCache,
DAG: dag,
}
return mempool.New(&mempoolConfig)
} }
// P2PNodeID returns the network ID associated with this App func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) {
func (a *App) P2PNodeID() *id.ID { dbPath := databasePath(cfg)
return a.netAdapter.ID() log.Infof("Loading database from '%s'", dbPath)
} return dbaccess.New(dbPath)
// AddressManager returns the AddressManager associated with this App
func (a *App) AddressManager() *addressmanager.AddressManager {
return a.addressManager
} }

260
app/component_manager.go Normal file
View File

@@ -0,0 +1,260 @@
package app
import (
"fmt"
"sync/atomic"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol"
"github.com/kaspanet/kaspad/app/rpc"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/dnsseed"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter"
"github.com/kaspanet/kaspad/util/panics"
)
// ComponentManager is a wrapper for all the kaspad services
type ComponentManager struct {
cfg *config.Config
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
rpcManager *rpc.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter
started, shutdown int32
}
// Start launches all the kaspad services.
func (a *ComponentManager) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}
log.Trace("Starting kaspad")
err := a.netAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err))
}
a.maybeSeedFromDNS()
a.connectionManager.Start()
}
// Stop gracefully shuts down all the kaspad services.
func (a *ComponentManager) Stop() {
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}
log.Warnf("Kaspad shutting down")
a.connectionManager.Stop()
err := a.netAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the net adapter: %+v", err)
}
err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}
return
}
// NewComponentManager returns a new ComponentManager instance.
// Use Start() to begin all services within this ComponentManager
func NewComponentManager(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*ComponentManager, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
if err != nil {
return nil, err
}
txMempool := setupMempool(cfg, dag, sigCache)
netAdapter, err := netadapter.NewNetAdapter(cfg)
if err != nil {
return nil, err
}
addressManager, err := addressmanager.New(cfg, databaseContext)
if err != nil {
return nil, err
}
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)
return &ComponentManager{
cfg: cfg,
protocolManager: protocolManager,
rpcManager: rpcManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func setupRPC(
cfg *config.Config,
txMempool *mempool.TxPool,
dag *blockdag.BlockDAG,
sigCache *txscript.SigCache,
netAdapter *netadapter.NetAdapter,
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {
blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager)
if err != nil {
panic(err)
}
})
return rpcManager
}
func handleBlockDAGNotifications(notification *blockdag.Notification,
acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error {
switch notification.Type {
case blockdag.NTChainChanged:
if acceptanceIndex == nil {
return nil
}
chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData)
err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes,
chainChangedNotificationData.AddedChainBlockHashes)
if err != nil {
return err
}
case blockdag.NTFinalityConflict:
finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData)
err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String())
if err != nil {
return err
}
case blockdag.NTFinalityConflictResolved:
finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData)
err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String())
if err != nil {
return err
}
}
return nil
}
func (a *ComponentManager) maybeSeedFromDNS() {
if !a.cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil,
a.cfg.Lookup, func(addresses []*appmessage.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
if a.cfg.GRPCSeed != "" {
dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil,
func(addresses []*appmessage.NetAddress) {
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
MaxUTXOCacheSize: cfg.MaxUTXOCacheSize,
})
return dag, err
}
func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) {
// Create indexes if needed.
var indexes []indexers.Indexer
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}
// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
}
func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool {
mempoolConfig := mempool.Config{
Policy: mempool.Policy{
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries,
SigCache: sigCache,
DAG: dag,
}
return mempool.New(&mempoolConfig)
}
// P2PNodeID returns the network ID associated with this ComponentManager
func (a *ComponentManager) P2PNodeID() *id.ID {
return a.netAdapter.ID()
}
// AddressManager returns the AddressManager associated with this ComponentManager
func (a *ComponentManager) AddressManager() *addressmanager.AddressManager {
return a.addressManager
}

View File

@@ -5,10 +5,11 @@
package main package main
import ( import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"os" "os"
"runtime" "runtime"
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/os/limits" "github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/util/panics" "github.com/kaspanet/kaspad/util/panics"
) )
@@ -77,7 +78,7 @@ func realMain() error {
func main() { func main() {
// Use all processor cores and up some limits. // Use all processor cores and up some limits.
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
if err := limits.SetLimits(); err != nil { if err := limits.SetLimits(nil); err != nil {
os.Exit(1) os.Exit(1)
} }

View File

@@ -107,7 +107,6 @@ type Flags struct {
ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"`
DbType string `long:"dbtype" description:"Database backend to use for the Block DAG"` DbType string `long:"dbtype" description:"Database backend to use for the Block DAG"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"` Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"` DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"` Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"`
MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in KAS/kB to be considered a non-zero fee."` MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in KAS/kB to be considered a non-zero fee."`
@@ -124,6 +123,7 @@ type Flags struct {
ResetDatabase bool `long:"reset-db" description:"Reset database before starting node. It's needed when switching between subnetworks."` ResetDatabase bool `long:"reset-db" description:"Reset database before starting node. It's needed when switching between subnetworks."`
MaxUTXOCacheSize uint64 `long:"maxutxocachesize" description:"Max size of loaded UTXO into ram from the disk in bytes"` MaxUTXOCacheSize uint64 `long:"maxutxocachesize" description:"Max size of loaded UTXO into ram from the disk in bytes"`
NetworkFlags NetworkFlags
ServiceOptions *ServiceOptions
} }
// Config defines the configuration options for kaspad. // Config defines the configuration options for kaspad.
@@ -139,9 +139,9 @@ type Config struct {
SubnetworkID *subnetworkid.SubnetworkID // nil in full nodes SubnetworkID *subnetworkid.SubnetworkID // nil in full nodes
} }
// serviceOptions defines the configuration options for the daemon as a service on // ServiceOptions defines the configuration options for the daemon as a service on
// Windows. // Windows.
type serviceOptions struct { type ServiceOptions struct {
ServiceCommand string `short:"s" long:"service" description:"Service command {install, remove, start, stop}"` ServiceCommand string `short:"s" long:"service" description:"Service command {install, remove, start, stop}"`
} }
@@ -160,10 +160,10 @@ func cleanAndExpandPath(path string) string {
} }
// newConfigParser returns a new command line flags parser. // newConfigParser returns a new command line flags parser.
func newConfigParser(cfgFlags *Flags, so *serviceOptions, options flags.Options) *flags.Parser { func newConfigParser(cfgFlags *Flags, options flags.Options) *flags.Parser {
parser := flags.NewParser(cfgFlags, options) parser := flags.NewParser(cfgFlags, options)
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
parser.AddGroup("Service Options", "Service Options", so) parser.AddGroup("Service Options", "Service Options", cfgFlags.ServiceOptions)
} }
return parser return parser
} }
@@ -189,6 +189,7 @@ func defaultFlags() *Flags {
MinRelayTxFee: defaultMinRelayTxFee, MinRelayTxFee: defaultMinRelayTxFee,
AcceptanceIndex: defaultAcceptanceIndex, AcceptanceIndex: defaultAcceptanceIndex,
MaxUTXOCacheSize: defaultMaxUTXOCacheSize, MaxUTXOCacheSize: defaultMaxUTXOCacheSize,
ServiceOptions: &ServiceOptions{},
} }
} }
@@ -211,24 +212,20 @@ func DefaultConfig() *Config {
// The above results in kaspad functioning properly without any config settings // The above results in kaspad functioning properly without any config settings
// while still allowing the user to override settings with config files and // while still allowing the user to override settings with config files and
// command line options. Command line options always take precedence. // command line options. Command line options always take precedence.
func LoadConfig() (cfg *Config, remainingArgs []string, err error) { func LoadConfig() (*Config, error) {
cfgFlags := defaultFlags() cfgFlags := defaultFlags()
// Service options which are only added on Windows.
serviceOpts := serviceOptions{}
// Pre-parse the command line options to see if an alternative config // Pre-parse the command line options to see if an alternative config
// file or the version flag was specified. Any errors aside from the // file or the version flag was specified. Any errors aside from the
// help message error can be ignored here since they will be caught by // help message error can be ignored here since they will be caught by
// the final parse below. // the final parse below.
preCfg := cfgFlags preCfg := cfgFlags
preParser := newConfigParser(preCfg, &serviceOpts, flags.HelpFlag) preParser := newConfigParser(preCfg, flags.HelpFlag)
_, err = preParser.Parse() _, err := preParser.Parse()
if err != nil { if err != nil {
var flagsErr *flags.Error var flagsErr *flags.Error
if ok := errors.As(err, &flagsErr); ok && flagsErr.Type == flags.ErrHelp { if ok := errors.As(err, &flagsErr); ok && flagsErr.Type == flags.ErrHelp {
fmt.Fprintln(os.Stderr, err) return nil, err
return nil, nil, err
} }
} }
@@ -242,21 +239,10 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
os.Exit(0) os.Exit(0)
} }
// Perform service command and exit if specified. Invalid service
// commands show an appropriate error. Only runs on Windows since
// the RunServiceCommand function will be nil when not on Windows.
if serviceOpts.ServiceCommand != "" && RunServiceCommand != nil {
err := RunServiceCommand(serviceOpts.ServiceCommand)
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
os.Exit(0)
}
// Load additional config from file. // Load additional config from file.
var configFileError error var configFileError error
parser := newConfigParser(cfgFlags, &serviceOpts, flags.Default) parser := newConfigParser(cfgFlags, flags.Default)
cfg = &Config{ cfg := &Config{
Flags: cfgFlags, Flags: cfgFlags,
} }
if !preCfg.Simnet || preCfg.ConfigFile != if !preCfg.Simnet || preCfg.ConfigFile !=
@@ -265,31 +251,27 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) { if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
err := createDefaultConfigFile(preCfg.ConfigFile) err := createDefaultConfigFile(preCfg.ConfigFile)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a "+ return nil, errors.Wrap(err, "Error creating a default config file")
"default config file: %s\n", err)
} }
} }
err := flags.NewIniParser(parser).ParseFile(preCfg.ConfigFile) err := flags.NewIniParser(parser).ParseFile(preCfg.ConfigFile)
if err != nil { if err != nil {
if pErr := &(os.PathError{}); !errors.As(err, &pErr) { if pErr := &(os.PathError{}); !errors.As(err, &pErr) {
fmt.Fprintf(os.Stderr, "Error parsing config "+ return nil, errors.Wrapf(err, "Error parsing config file: %s\n\n%s", err, usageMessage)
"file: %s\n", err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
} }
configFileError = err configFileError = err
} }
} }
// Parse command line options again to ensure they take precedence. // Parse command line options again to ensure they take precedence.
remainingArgs, err = parser.Parse() _, err = parser.Parse()
if err != nil { if err != nil {
var flagsErr *flags.Error var flagsErr *flags.Error
if ok := errors.As(err, &flagsErr); !ok || flagsErr.Type != flags.ErrHelp { if ok := errors.As(err, &flagsErr); !ok || flagsErr.Type != flags.ErrHelp {
fmt.Fprintln(os.Stderr, usageMessage) return nil, errors.Wrapf(err, "Error parsing command line arguments: %s\n\n%s", err, usageMessage)
} }
return nil, nil, err return nil, err
} }
// Create the home directory if it doesn't already exist. // Create the home directory if it doesn't already exist.
@@ -309,13 +291,12 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
str := "%s: Failed to create home directory: %s" str := "%s: Failed to create home directory: %s"
err := errors.Errorf(str, funcName, err) err := errors.Errorf(str, funcName, err)
fmt.Fprintln(os.Stderr, err) return nil, err
return nil, nil, err
} }
err = cfg.ResolveNetwork(parser) err = cfg.ResolveNetwork(parser)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// Set the default policy for relaying non-standard transactions // Set the default policy for relaying non-standard transactions
@@ -330,7 +311,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName) err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
case cfg.RejectNonStd: case cfg.RejectNonStd:
relayNonStd = false relayNonStd = false
case cfg.RelayNonStd: case cfg.RelayNonStd:
@@ -367,7 +348,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf("%s: %s", funcName, err.Error()) err := errors.Errorf("%s: %s", funcName, err.Error())
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Validate profile port number // Validate profile port number
@@ -378,7 +359,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName) err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
} }
@@ -388,7 +369,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.BanDuration) err := errors.Errorf(str, funcName, cfg.BanDuration)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Validate any given whitelisted IP addresses and networks. // Validate any given whitelisted IP addresses and networks.
@@ -405,7 +386,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err = errors.Errorf(str, funcName, addr) err = errors.Errorf(str, funcName, addr)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
var bits int var bits int
if ip.To4() == nil { if ip.To4() == nil {
@@ -430,7 +411,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName) err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// --proxy or --connect without --listen disables listening. // --proxy or --connect without --listen disables listening.
@@ -473,7 +454,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs) err := errors.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Validate the the minrelaytxfee. // Validate the the minrelaytxfee.
@@ -483,7 +464,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, err) err := errors.Errorf(str, funcName, err)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Disallow 0 and negative min tx fees. // Disallow 0 and negative min tx fees.
@@ -492,7 +473,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.MinRelayTxFee) err := errors.Errorf(str, funcName, cfg.MinRelayTxFee)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Limit the max block mass to a sane value. // Limit the max block mass to a sane value.
@@ -505,7 +486,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
blockMaxMassMax, cfg.BlockMaxMass) blockMaxMassMax, cfg.BlockMaxMass)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Limit the max orphan count to a sane value. // Limit the max orphan count to a sane value.
@@ -515,7 +496,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.MaxOrphanTxs) err := errors.Errorf(str, funcName, cfg.MaxOrphanTxs)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Look for illegal characters in the user agent comments. // Look for illegal characters in the user agent comments.
@@ -526,7 +507,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
funcName) funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
} }
@@ -537,7 +518,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
funcName) funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Add default port to all listener addresses if needed and remove // Add default port to all listener addresses if needed and remove
@@ -545,7 +526,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.Listeners, err = network.NormalizeAddresses(cfg.Listeners, cfg.Listeners, err = network.NormalizeAddresses(cfg.Listeners,
cfg.NetParams().DefaultPort) cfg.NetParams().DefaultPort)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// Add default port to all rpc listener addresses if needed and remove // Add default port to all rpc listener addresses if needed and remove
@@ -553,7 +534,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.RPCListeners, err = network.NormalizeAddresses(cfg.RPCListeners, cfg.RPCListeners, err = network.NormalizeAddresses(cfg.RPCListeners,
cfg.NetParams().RPCPort) cfg.NetParams().RPCPort)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// Disallow --addpeer and --connect used together // Disallow --addpeer and --connect used together
@@ -562,7 +543,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName) err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
// Add default port to all added peer addresses if needed and remove // Add default port to all added peer addresses if needed and remove
@@ -570,13 +551,13 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.AddPeers, err = network.NormalizeAddresses(cfg.AddPeers, cfg.AddPeers, err = network.NormalizeAddresses(cfg.AddPeers,
cfg.NetParams().DefaultPort) cfg.NetParams().DefaultPort)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
cfg.ConnectPeers, err = network.NormalizeAddresses(cfg.ConnectPeers, cfg.ConnectPeers, err = network.NormalizeAddresses(cfg.ConnectPeers,
cfg.NetParams().DefaultPort) cfg.NetParams().DefaultPort)
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// Setup dial and DNS resolution (lookup) functions depending on the // Setup dial and DNS resolution (lookup) functions depending on the
@@ -593,7 +574,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.Proxy, err) err := errors.Errorf(str, funcName, cfg.Proxy, err)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage) fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, err
} }
proxy := &socks.Proxy{ proxy := &socks.Proxy{
@@ -611,7 +592,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
log.Warnf("%s", configFileError) log.Warnf("%s", configFileError)
} }
return cfg, remainingArgs, nil return cfg, nil
} }
// createDefaultConfig copies the file sample-kaspad.conf to the given destination path, // createDefaultConfig copies the file sample-kaspad.conf to the given destination path,

View File

@@ -53,6 +53,7 @@ var (
dnssLog = BackendLog.Logger("DNSS") dnssLog = BackendLog.Logger("DNSS")
snvrLog = BackendLog.Logger("SNVR") snvrLog = BackendLog.Logger("SNVR")
ibdsLog = BackendLog.Logger("IBDS") ibdsLog = BackendLog.Logger("IBDS")
wsvcLog = BackendLog.Logger("WSVC")
) )
// SubsystemTags is an enum of all sub system tags // SubsystemTags is an enum of all sub system tags
@@ -83,7 +84,8 @@ var SubsystemTags = struct {
NTAR, NTAR,
DNSS, DNSS,
SNVR, SNVR,
IBDS string IBDS,
WSVC string
}{ }{
ADXR: "ADXR", ADXR: "ADXR",
AMGR: "AMGR", AMGR: "AMGR",
@@ -112,6 +114,7 @@ var SubsystemTags = struct {
DNSS: "DNSS", DNSS: "DNSS",
SNVR: "SNVR", SNVR: "SNVR",
IBDS: "IBDS", IBDS: "IBDS",
WSVC: "WSVC",
} }
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.
@@ -143,6 +146,7 @@ var subsystemLoggers = map[string]*Logger{
SubsystemTags.DNSS: dnssLog, SubsystemTags.DNSS: dnssLog,
SubsystemTags.SNVR: snvrLog, SubsystemTags.SNVR: snvrLog,
SubsystemTags.IBDS: ibdsLog, SubsystemTags.IBDS: ibdsLog,
SubsystemTags.WSVC: wsvcLog,
} }
// InitLog attaches log file and error log file to the backend log. // InitLog attaches log file and error log file to the backend log.

View File

@@ -0,0 +1,22 @@
package execenv
import (
"fmt"
"os"
"runtime"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
)
// Initialize initializes the execution environment required to run kaspad
func Initialize(desiredLimits *limits.DesiredLimits) {
// Use all processor cores.
runtime.GOMAXPROCS(runtime.NumCPU())
// Up some limits.
if err := limits.SetLimits(desiredLimits); err != nil {
fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err)
os.Exit(1)
}
}

View File

@@ -0,0 +1,7 @@
package limits
// DesiredLimits is a structure that specifies the limits desired by a running application
type DesiredLimits struct {
FileLimitWant uint64
FileLimitMin uint64
}

View File

@@ -5,6 +5,6 @@
package limits package limits
// SetLimits is a no-op on Plan 9 due to the lack of process accounting. // SetLimits is a no-op on Plan 9 due to the lack of process accounting.
func SetLimits() error { func SetLimits(*DesiredLimits) error {
return nil return nil
} }

View File

@@ -7,41 +7,39 @@
package limits package limits
import ( import (
"github.com/pkg/errors"
"syscall" "syscall"
"github.com/pkg/errors"
) )
const ( const ()
fileLimitWant = 2048
fileLimitMin = 1024
)
// SetLimits raises some process limits to values which allow kaspad and // SetLimits raises some process limits to values which allow kaspad and
// associated utilities to run. // associated utilities to run.
func SetLimits() error { func SetLimits(desiredLimits *DesiredLimits) error {
var rLimit syscall.Rlimit var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil { if err != nil {
return err return err
} }
if rLimit.Cur > fileLimitWant { if rLimit.Cur > desiredLimits.FileLimitWant {
return nil return nil
} }
if rLimit.Max < fileLimitMin { if rLimit.Max < desiredLimits.FileLimitMin {
err = errors.Errorf("need at least %d file descriptors", err = errors.Errorf("need at least %d file descriptors",
fileLimitMin) desiredLimits.FileLimitMin)
return err return err
} }
if rLimit.Max < fileLimitWant { if rLimit.Max < desiredLimits.FileLimitWant {
rLimit.Cur = rLimit.Max rLimit.Cur = rLimit.Max
} else { } else {
rLimit.Cur = fileLimitWant rLimit.Cur = desiredLimits.FileLimitWant
} }
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil { if err != nil {
// try min value // try min value
rLimit.Cur = fileLimitMin rLimit.Cur = desiredLimits.FileLimitMin
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil { if err != nil {
return err return err

View File

@@ -5,6 +5,6 @@
package limits package limits
// SetLimits is a no-op on Windows since it's not required there. // SetLimits is a no-op on Windows since it's not required there.
func SetLimits() error { func SetLimits(*DesiredLimits) error {
return nil return nil
} }

View File

@@ -0,0 +1,17 @@
package winservice
import "github.com/kaspanet/kaspad/infrastructure/config"
// ServiceDescription contains information about a service, needed to administer it
type ServiceDescription struct {
Name string
DisplayName string
Description string
}
// MainFunc specifies the signature of an application's main function to be able to run as a windows service
type MainFunc func(startedChan chan<- struct{}) error
// WinServiceMain is only invoked on Windows. It detects when kaspad is running
// as a service and reacts accordingly.
var WinServiceMain = func(MainFunc, *ServiceDescription, *config.Config) (bool, error) { return false, nil }

View File

@@ -0,0 +1,13 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package winservice
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.CNFG)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@@ -0,0 +1,178 @@
package winservice
import (
"os"
"path/filepath"
"time"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/mgr"
"github.com/btcsuite/winsvc/svc"
"github.com/pkg/errors"
)
// performServiceCommand attempts to run one of the supported service commands
// provided on the command line via the service command flag. An appropriate
// error is returned if an invalid command is specified.
func (s *Service) performServiceCommand() error {
var err error
command := s.cfg.ServiceOptions.ServiceCommand
switch command {
case "install":
err = s.installService()
case "remove":
err = s.removeService()
case "start":
err = s.startService()
case "stop":
err = s.controlService(svc.Stop, svc.Stopped)
default:
err = errors.Errorf("invalid service command [%s]", command)
}
return err
}
// installService attempts to install the kaspad service. Typically this should
// be done by the msi installer, but it is provided here since it can be useful
// for development.
func (s *Service) installService() error {
// Get the path of the current executable. This is needed because
// os.Args[0] can vary depending on how the application was launched.
// For example, under cmd.exe it will only be the name of the app
// without the path or extension, but under mingw it will be the full
// path including the extension.
exePath, err := filepath.Abs(os.Args[0])
if err != nil {
return err
}
if filepath.Ext(exePath) == "" {
exePath += ".exe"
}
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service doesn't already exist.
service, err := serviceManager.OpenService(s.description.Name)
if err == nil {
service.Close()
return errors.Errorf("service %s already exists", s.description.Name)
}
// Install the service.
service, err = serviceManager.CreateService(s.description.Name, exePath, mgr.Config{
DisplayName: s.description.DisplayName,
Description: s.description.Description,
})
if err != nil {
return err
}
defer service.Close()
// Support events to the event log using the standard "standard" Windows
// EventCreate.exe message file. This allows easy logging of custom
// messges instead of needing to create our own message catalog.
err = eventlog.Remove(s.description.Name)
if err != nil {
return err
}
eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info)
return eventlog.InstallAsEventCreate(s.description.Name, eventsSupported)
}
// removeService attempts to uninstall the kaspad service. Typically this should
// be done by the msi uninstaller, but it is provided here since it can be
// useful for development. Not the eventlog entry is intentionally not removed
// since it would invalidate any existing event log messages.
func (s *Service) removeService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service exists.
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("service %s is not installed", s.description.Name)
}
defer service.Close()
// Remove the service.
return service.Delete()
}
// startService attempts to Start the kaspad service.
func (s *Service) startService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
err = service.Start(os.Args)
if err != nil {
return errors.Errorf("could not start service: %s", err)
}
return nil
}
// controlService allows commands which change the status of the service. It
// also waits for up to 10 seconds for the service to change to the passed
// state.
func (s *Service) controlService(c svc.Cmd, to svc.State) error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
status, err := service.Control(c)
if err != nil {
return errors.Errorf("could not send control=%d: %s", c, err)
}
// Send the control message.
timeout := time.Now().Add(10 * time.Second)
for status.State != to {
if timeout.Before(time.Now()) {
return errors.Errorf("timeout waiting for service to go "+
"to state=%d", to)
}
time.Sleep(300 * time.Millisecond)
status, err = service.Query()
if err != nil {
return errors.Errorf("could not retrieve service "+
"status: %s", err)
}
}
return nil
}

View File

@@ -0,0 +1,40 @@
package winservice
import (
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
)
// serviceMain checks whether we're being invoked as a service, and if so uses
// the service control manager to start the long-running server. A flag is
// returned to the caller so the application can determine whether to exit (when
// running as a service) or launch in normal interactive mode.
func serviceMain(main MainFunc, description *ServiceDescription, cfg *config.Config) (bool, error) {
service := newService(main, description, cfg)
if cfg.ServiceOptions.ServiceCommand != "" {
return true, service.performServiceCommand()
}
// Don't run as a service if we're running interactively (or that can't
// be determined due to an error).
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return false, err
}
if isInteractive {
return false, nil
}
err = service.Start()
if err != nil {
return true, err
}
return true, nil
}
// Set windows specific functions to real functions.
func init() {
WinServiceMain = serviceMain
}

View File

@@ -0,0 +1,119 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package winservice
import (
"fmt"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/version"
)
// Service houses the main service handler which handles all service
// updates and launching the application's main.
type Service struct {
main MainFunc
description *ServiceDescription
cfg *config.Config
eventLog *eventlog.Log
}
func newService(main MainFunc, description *ServiceDescription, cfg *config.Config) *Service {
return &Service{
main: main,
description: description,
cfg: cfg,
}
}
// Start starts the srevice
func (s *Service) Start() error {
elog, err := eventlog.Open(s.description.Name)
if err != nil {
return err
}
s.eventLog = elog
defer s.eventLog.Close()
err = svc.Run(s.description.Name, &Service{})
if err != nil {
s.eventLog.Error(1, fmt.Sprintf("Service start failed: %s", err))
return err
}
return nil
}
// Execute is the main entry point the winsvc package calls when receiving
// information from the Windows service control manager. It launches the
// long-running kaspadMain (which is the real meat of kaspad), handles service
// change requests, and notifies the service control manager of changes.
func (s *Service) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
// Service start is pending.
const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown
changes <- svc.Status{State: svc.StartPending}
// Start kaspadMain in a separate goroutine so the service can start
// quickly. Shutdown (along with a potential error) is reported via
// doneChan. startedChan is notified once kaspad is started so this can
// be properly logged
doneChan := make(chan error)
startedChan := make(chan struct{})
spawn("kaspadMain-windows", func() {
err := s.main(startedChan)
doneChan <- err
})
// Service is now started.
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
loop:
for {
select {
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
// Service stop is pending. Don't accept any
// more commands while pending.
changes <- svc.Status{State: svc.StopPending}
// Signal the main function to exit.
signal.ShutdownRequestChannel <- struct{}{}
default:
s.eventLog.Error(1, fmt.Sprintf("Unexpected control "+
"request #%d.", c))
}
case <-startedChan:
s.logServiceStart()
case err := <-doneChan:
if err != nil {
s.eventLog.Error(1, err.Error())
}
break loop
}
}
// Service is now stopped.
changes <- svc.Status{State: svc.Stopped}
return false, 0
}
// logServiceStart logs information about kaspad when the main server has
// been started to the Windows event log.
func (s *Service) logServiceStart() {
var message string
message += fmt.Sprintf("%s version %s\n", s.description.DisplayName, version.Version())
message += fmt.Sprintf("Configuration file: %s\n", s.cfg.ConfigFile)
message += fmt.Sprintf("Data directory: %s\n", s.cfg.DataDir)
message += fmt.Sprintf("Logs directory: %s\n", s.cfg.LogDir)
}

2
log.go
View File

@@ -7,8 +7,6 @@ package main
import ( import (
"github.com/kaspanet/kaspad/infrastructure/logger" "github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
) )
var log, _ = logger.Get(logger.SubsystemTags.KASD) var log, _ = logger.Get(logger.SubsystemTags.KASD)
var spawn = panics.GoroutineWrapperFunc(log)

200
main.go
View File

@@ -5,212 +5,14 @@
package main package main
import ( import (
"fmt"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"
"github.com/kaspanet/kaspad/app" "github.com/kaspanet/kaspad/app"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/util/profiling"
"github.com/kaspanet/kaspad/version"
) )
const (
// blockDbNamePrefix is the prefix for the block database name. The
// database type is appended to this value to form the full block
// database name.
blockDbNamePrefix = "blocks"
)
// winServiceMain is only invoked on Windows. It detects when kaspad is running
// as a service and reacts accordingly.
var winServiceMain func() (bool, error)
// kaspadMain is the real main function for kaspad. It is necessary to work
// around the fact that deferred functions do not run when os.Exit() is called.
// The optional startedChan writes once all services has started.
func kaspadMain(startedChan chan<- struct{}) error {
interrupt := signal.InterruptListener()
// Load configuration and parse command line. This function also
// initializes logging and configures it accordingly.
cfg, _, err := config.LoadConfig()
if err != nil {
return err
}
defer panics.HandlePanic(log, "MAIN", nil)
// Get a channel that will be closed when a shutdown signal has been
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
// another subsystem such as the RPC server.
defer log.Info("Shutdown complete")
// Show version at startup.
log.Infof("Version %s", version.Version())
// Enable http profiling server if requested.
if cfg.Profile != "" {
profiling.Start(cfg.Profile, log)
}
// Write cpu profile if requested.
if cfg.CPUProfile != "" {
f, err := os.Create(cfg.CPUProfile)
if err != nil {
log.Errorf("Unable to create cpu profile: %s", err)
return err
}
pprof.StartCPUProfile(f)
defer f.Close()
defer pprof.StopCPUProfile()
}
// Perform upgrades to kaspad as new versions require it.
if err := doUpgrades(); err != nil {
log.Errorf("%s", err)
return err
}
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
if cfg.ResetDatabase {
err := removeDatabase(cfg)
if err != nil {
log.Errorf("%s", err)
return err
}
}
// Open the database
databaseContext, err := openDB(cfg)
if err != nil {
log.Errorf("%s", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down the database...")
err := databaseContext.Close()
if err != nil {
log.Errorf("Failed to close the database: %s", err)
}
}()
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
// Drop indexes and exit if requested.
if cfg.DropAcceptanceIndex {
if err := indexers.DropAcceptanceIndex(databaseContext); err != nil {
log.Errorf("%s", err)
return err
}
return nil
}
// Create app and start it.
app, err := app.New(cfg, databaseContext, interrupt)
if err != nil {
log.Errorf("Unable to start kaspad: %+v", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down kaspad...")
shutdownDone := make(chan struct{})
go func() {
app.Stop()
shutdownDone <- struct{}{}
}()
const shutdownTimeout = 2 * time.Minute
select {
case <-shutdownDone:
case <-time.After(shutdownTimeout):
log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout)
}
log.Infof("Kaspad shutdown complete")
}()
app.Start()
if startedChan != nil {
startedChan <- struct{}{}
}
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems such as the RPC
// server.
<-interrupt
return nil
}
func removeDatabase(cfg *config.Config) error {
dbPath := blockDbPath(cfg)
return os.RemoveAll(dbPath)
}
// dbPath returns the path to the block database given a database type.
func blockDbPath(cfg *config.Config) string {
// The database name is based on the database type.
dbName := blockDbNamePrefix + "_" + cfg.DbType
dbPath := filepath.Join(cfg.DataDir, dbName)
return dbPath
}
func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) {
dbPath := filepath.Join(cfg.DataDir, "db")
log.Infof("Loading database from '%s'", dbPath)
return dbaccess.New(dbPath)
}
func main() { func main() {
// Use all processor cores. if err := app.StartApp(); err != nil {
runtime.GOMAXPROCS(runtime.NumCPU())
// Up some limits.
if err := limits.SetLimits(); err != nil {
fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err)
os.Exit(1)
}
// Call serviceMain on Windows to handle running as a service. When
// the return isService flag is true, exit now since we ran as a
// service. Otherwise, just fall through to normal operation.
if runtime.GOOS == "windows" {
isService, err := winServiceMain()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if isService {
os.Exit(0)
}
}
// Work around defer not working after os.Exit()
if err := kaspadMain(nil); err != nil {
os.Exit(1) os.Exit(1)
} }
} }
// doUpgrades performs upgrades to kaspad as new versions require it.
// currently it's a placeholder we got from kaspad upstream, that does nothing
func doUpgrades() error {
return nil
}

View File

@@ -1,309 +0,0 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/mgr"
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/version"
)
const (
// svcName is the name of kaspad service.
svcName = "kaspadsvc"
// svcDisplayName is the service name that will be shown in the windows
// services list. Not the svcName is the "real" name which is used
// to control the service. This is only for display purposes.
svcDisplayName = "Kaspad Service"
// svcDesc is the description of the service.
svcDesc = "Downloads and stays synchronized with the Kaspa block " +
"DAG and provides DAG services to applications."
)
// elog is used to send messages to the Windows event log.
var elog *eventlog.Log
// logServiceStart logs information about kaspad when the main server has
// been started to the Windows event log.
func logServiceStart() {
var message string
message += fmt.Sprintf("Version %s\n", version.Version())
message += fmt.Sprintf("Configuration directory: %s\n", config.DefaultHomeDir)
message += fmt.Sprintf("Configuration file: %s\n", cfg.ConfigFile)
message += fmt.Sprintf("Data directory: %s\n", cfg.DataDir)
elog.Info(1, message)
}
// kaspadService houses the main service handler which handles all service
// updates and launching kaspadMain.
type kaspadService struct{}
// Execute is the main entry point the winsvc package calls when receiving
// information from the Windows service control manager. It launches the
// long-running kaspadMain (which is the real meat of kaspad), handles service
// change requests, and notifies the service control manager of changes.
func (s *kaspadService) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
// Service start is pending.
const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown
changes <- svc.Status{State: svc.StartPending}
// Start kaspadMain in a separate goroutine so the service can start
// quickly. Shutdown (along with a potential error) is reported via
// doneChan. startedChan is notified once kaspad is started so this can
// be properly logged
doneChan := make(chan error)
startedChan := make(chan struct{})
spawn("kaspadMain-windows", func() {
err := kaspadMain(startedChan)
doneChan <- err
})
// Service is now started.
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
loop:
for {
select {
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
// Service stop is pending. Don't accept any
// more commands while pending.
changes <- svc.Status{State: svc.StopPending}
// Signal the main function to exit.
signal.ShutdownRequestChannel <- struct{}{}
default:
elog.Error(1, fmt.Sprintf("Unexpected control "+
"request #%d.", c))
}
case <-startedChan:
logServiceStart()
case err := <-doneChan:
if err != nil {
elog.Error(1, err.Error())
}
break loop
}
}
// Service is now stopped.
changes <- svc.Status{State: svc.Stopped}
return false, 0
}
// installService attempts to install the kaspad service. Typically this should
// be done by the msi installer, but it is provided here since it can be useful
// for development.
func installService() error {
// Get the path of the current executable. This is needed because
// os.Args[0] can vary depending on how the application was launched.
// For example, under cmd.exe it will only be the name of the app
// without the path or extension, but under mingw it will be the full
// path including the extension.
exePath, err := filepath.Abs(os.Args[0])
if err != nil {
return err
}
if filepath.Ext(exePath) == "" {
exePath += ".exe"
}
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service doesn't already exist.
service, err := serviceManager.OpenService(svcName)
if err == nil {
service.Close()
return errors.Errorf("service %s already exists", svcName)
}
// Install the service.
service, err = serviceManager.CreateService(svcName, exePath, mgr.Config{
DisplayName: svcDisplayName,
Description: svcDesc,
})
if err != nil {
return err
}
defer service.Close()
// Support events to the event log using the standard "standard" Windows
// EventCreate.exe message file. This allows easy logging of custom
// messges instead of needing to create our own message catalog.
eventlog.Remove(svcName)
eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info)
return eventlog.InstallAsEventCreate(svcName, eventsSupported)
}
// removeService attempts to uninstall the kaspad service. Typically this should
// be done by the msi uninstaller, but it is provided here since it can be
// useful for development. Not the eventlog entry is intentionally not removed
// since it would invalidate any existing event log messages.
func removeService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service exists.
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("service %s is not installed", svcName)
}
defer service.Close()
// Remove the service.
return service.Delete()
}
// startService attempts to Start the kaspad service.
func startService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
err = service.Start(os.Args)
if err != nil {
return errors.Errorf("could not start service: %s", err)
}
return nil
}
// controlService allows commands which change the status of the service. It
// also waits for up to 10 seconds for the service to change to the passed
// state.
func controlService(c svc.Cmd, to svc.State) error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
status, err := service.Control(c)
if err != nil {
return errors.Errorf("could not send control=%d: %s", c, err)
}
// Send the control message.
timeout := time.Now().Add(10 * time.Second)
for status.State != to {
if timeout.Before(time.Now()) {
return errors.Errorf("timeout waiting for service to go "+
"to state=%d", to)
}
time.Sleep(300 * time.Millisecond)
status, err = service.Query()
if err != nil {
return errors.Errorf("could not retrieve service "+
"status: %s", err)
}
}
return nil
}
// performServiceCommand attempts to run one of the supported service commands
// provided on the command line via the service command flag. An appropriate
// error is returned if an invalid command is specified.
func performServiceCommand(command string) error {
var err error
switch command {
case "install":
err = installService()
case "remove":
err = removeService()
case "start":
err = startService()
case "stop":
err = controlService(svc.Stop, svc.Stopped)
default:
err = errors.Errorf("invalid service command [%s]", command)
}
return err
}
// serviceMain checks whether we're being invoked as a service, and if so uses
// the service control manager to start the long-running server. A flag is
// returned to the caller so the application can determine whether to exit (when
// running as a service) or launch in normal interactive mode.
func serviceMain() (bool, error) {
// Don't run as a service if we're running interactively (or that can't
// be determined due to an error).
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return false, err
}
if isInteractive {
return false, nil
}
elog, err = eventlog.Open(svcName)
if err != nil {
return false, err
}
defer elog.Close()
err = svc.Run(svcName, &kaspadService{})
if err != nil {
elog.Error(1, fmt.Sprintf("Service start failed: %s", err))
return true, err
}
return true, nil
}
// Set windows specific functions to real functions.
func init() {
config.RunServiceCommand = performServiceCommand
winServiceMain = serviceMain
}

View File

@@ -10,7 +10,7 @@ import (
) )
type appHarness struct { type appHarness struct {
app *app.App app *app.ComponentManager
rpcClient *testRPCClient rpcClient *testRPCClient
p2pAddress string p2pAddress string
rpcAddress string rpcAddress string
@@ -108,7 +108,7 @@ func teardownHarness(t *testing.T, harness *appHarness) {
func setApp(t *testing.T, harness *appHarness) { func setApp(t *testing.T, harness *appHarness) {
var err error var err error
harness.app, err = app.New(harness.config, harness.databaseContext, make(chan struct{})) harness.app, err = app.NewComponentManager(harness.config, harness.databaseContext, make(chan struct{}))
if err != nil { if err != nil {
t.Fatalf("Error creating app: %+v", err) t.Fatalf("Error creating app: %+v", err)
} }