[NOD-1420] Restructure main (#942)

* [NOD-1420] Moved setting limits to executor

* [NOD-1420] Moved all code dealing with windows service to separate package

* [NOD-1420] Move practically all main to restructured app package

* [NOD-1420] Check for running as interactive only after checking if we are doing any service operation

* [NOD-1420] Add comments

* [NOD-1420] Add a comment
This commit is contained in:
Svarog 2020-09-30 17:07:40 +03:00 committed by Mike Zak
parent ef6c46a231
commit 513ffa7e0c
19 changed files with 875 additions and 809 deletions

View File

@ -2,259 +2,194 @@ package app
import (
"fmt"
"sync/atomic"
"os"
"path/filepath"
"runtime"
"time"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol"
"github.com/kaspanet/kaspad/app/rpc"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/dnsseed"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/util/profiling"
"github.com/kaspanet/kaspad/version"
"github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/execenv"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/infrastructure/os/winservice"
)
// App is a wrapper for all the kaspad services
type App struct {
cfg *config.Config
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
rpcManager *rpc.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter
started, shutdown int32
var desiredLimits = &limits.DesiredLimits{
FileLimitWant: 2048,
FileLimitMin: 1024,
}
// Start launches all the kaspad services.
func (a *App) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}
log.Trace("Starting kaspad")
err := a.netAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err))
}
a.maybeSeedFromDNS()
a.connectionManager.Start()
var serviceDescription = &winservice.ServiceDescription{
Name: "kaspadsvc",
DisplayName: "Kaspad Service",
Description: "Downloads and stays synchronized with the Kaspa blockDAG and " +
"provides DAG services to applications.",
}
// Stop gracefully shuts down all the kaspad services.
func (a *App) Stop() {
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}
log.Warnf("Kaspad shutting down")
a.connectionManager.Stop()
err := a.netAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the net adapter: %+v", err)
}
err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}
return
type kaspadApp struct {
cfg *config.Config
}
// New returns a new App instance configured to listen on addr for the
// kaspa network type specified by dagParams. Use start to begin accepting
// connections from peers.
func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*App, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
// StartApp starts the kaspad app, and blocks until it finishes running
func StartApp() error {
execenv.Initialize(desiredLimits)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
// Load configuration and parse command line. This function also
// initializes logging and configures it accordingly.
cfg, err := config.LoadConfig()
if err != nil {
return nil, err
fmt.Fprint(os.Stderr, err)
return err
}
defer panics.HandlePanic(log, "MAIN", nil)
txMempool := setupMempool(cfg, dag, sigCache)
app := &kaspadApp{cfg: cfg}
netAdapter, err := netadapter.NewNetAdapter(cfg)
if err != nil {
return nil, err
}
addressManager, err := addressmanager.New(cfg, databaseContext)
if err != nil {
return nil, err
}
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)
return &App{
cfg: cfg,
protocolManager: protocolManager,
rpcManager: rpcManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func setupRPC(
cfg *config.Config,
txMempool *mempool.TxPool,
dag *blockdag.BlockDAG,
sigCache *txscript.SigCache,
netAdapter *netadapter.NetAdapter,
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {
blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager)
// Call serviceMain on Windows to handle running as a service. When
// the return isService flag is true, exit now since we ran as a
// service. Otherwise, just fall through to normal operation.
if runtime.GOOS == "windows" {
isService, err := winservice.WinServiceMain(app.main, serviceDescription, cfg)
if err != nil {
panic(err)
return err
}
})
return rpcManager
}
func handleBlockDAGNotifications(notification *blockdag.Notification,
acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error {
switch notification.Type {
case blockdag.NTChainChanged:
if acceptanceIndex == nil {
if isService {
return nil
}
chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData)
err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes,
chainChangedNotificationData.AddedChainBlockHashes)
if err != nil {
return err
}
case blockdag.NTFinalityConflict:
finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData)
err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String())
if err != nil {
return err
}
case blockdag.NTFinalityConflictResolved:
finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData)
err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String())
}
return app.main(nil)
}
func (app *kaspadApp) main(startedChan chan<- struct{}) error {
// Get a channel that will be closed when a shutdown signal has been
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
// another subsystem such as the RPC server.
interrupt := signal.InterruptListener()
defer log.Info("Shutdown complete")
// Show version at startup.
log.Infof("Version %s", version.Version())
// Enable http profiling server if requested.
if app.cfg.Profile != "" {
profiling.Start(app.cfg.Profile, log)
}
// Perform upgrades to kaspad as new versions require it.
if err := doUpgrades(); err != nil {
log.Error(err)
return err
}
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
if app.cfg.ResetDatabase {
err := removeDatabase(app.cfg)
if err != nil {
log.Error(err)
return err
}
}
// Open the database
databaseContext, err := openDB(app.cfg)
if err != nil {
log.Error(err)
return err
}
defer func() {
log.Infof("Gracefully shutting down the database...")
err := databaseContext.Close()
if err != nil {
log.Errorf("Failed to close the database: %s", err)
}
}()
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
// Drop indexes and exit if requested.
if app.cfg.DropAcceptanceIndex {
if err := indexers.DropAcceptanceIndex(databaseContext); err != nil {
log.Errorf("%s", err)
return err
}
return nil
}
// Create componentManager and start it.
componentManager, err := NewComponentManager(app.cfg, databaseContext, interrupt)
if err != nil {
log.Errorf("Unable to start kaspad: %+v", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down kaspad...")
shutdownDone := make(chan struct{})
go func() {
componentManager.Stop()
shutdownDone <- struct{}{}
}()
const shutdownTimeout = 2 * time.Minute
select {
case <-shutdownDone:
case <-time.After(shutdownTimeout):
log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout)
}
log.Infof("Kaspad shutdown complete")
}()
componentManager.Start()
if startedChan != nil {
startedChan <- struct{}{}
}
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems such as the RPC
// server.
<-interrupt
return nil
}
func (a *App) maybeSeedFromDNS() {
if !a.cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil,
a.cfg.Lookup, func(addresses []*appmessage.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
if a.cfg.GRPCSeed != "" {
dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil,
func(addresses []*appmessage.NetAddress) {
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
MaxUTXOCacheSize: cfg.MaxUTXOCacheSize,
})
return dag, err
// doUpgrades performs upgrades to kaspad as new versions require it.
// currently it's a placeholder we got from kaspad upstream, that does nothing
func doUpgrades() error {
return nil
}
func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) {
// Create indexes if needed.
var indexes []indexers.Indexer
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}
// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
// dbPath returns the path to the block database given a database type.
func databasePath(cfg *config.Config) string {
return filepath.Join(cfg.DataDir, "db")
}
func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool {
mempoolConfig := mempool.Config{
Policy: mempool.Policy{
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries,
SigCache: sigCache,
DAG: dag,
}
return mempool.New(&mempoolConfig)
func removeDatabase(cfg *config.Config) error {
dbPath := databasePath(cfg)
return os.RemoveAll(dbPath)
}
// P2PNodeID returns the network ID associated with this App
func (a *App) P2PNodeID() *id.ID {
return a.netAdapter.ID()
}
// AddressManager returns the AddressManager associated with this App
func (a *App) AddressManager() *addressmanager.AddressManager {
return a.addressManager
func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) {
dbPath := databasePath(cfg)
log.Infof("Loading database from '%s'", dbPath)
return dbaccess.New(dbPath)
}

260
app/component_manager.go Normal file
View File

@ -0,0 +1,260 @@
package app
import (
"fmt"
"sync/atomic"
"github.com/kaspanet/kaspad/infrastructure/network/addressmanager"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/id"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol"
"github.com/kaspanet/kaspad/app/rpc"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/network/connmanager"
"github.com/kaspanet/kaspad/infrastructure/network/dnsseed"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter"
"github.com/kaspanet/kaspad/util/panics"
)
// ComponentManager is a wrapper for all the kaspad services
type ComponentManager struct {
cfg *config.Config
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
rpcManager *rpc.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter
started, shutdown int32
}
// Start launches all the kaspad services.
func (a *ComponentManager) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}
log.Trace("Starting kaspad")
err := a.netAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the net adapter: %+v", err))
}
a.maybeSeedFromDNS()
a.connectionManager.Start()
}
// Stop gracefully shuts down all the kaspad services.
func (a *ComponentManager) Stop() {
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}
log.Warnf("Kaspad shutting down")
a.connectionManager.Stop()
err := a.netAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the net adapter: %+v", err)
}
err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}
return
}
// NewComponentManager returns a new ComponentManager instance.
// Use Start() to begin all services within this ComponentManager
func NewComponentManager(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*ComponentManager, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
if err != nil {
return nil, err
}
txMempool := setupMempool(cfg, dag, sigCache)
netAdapter, err := netadapter.NewNetAdapter(cfg)
if err != nil {
return nil, err
}
addressManager, err := addressmanager.New(cfg, databaseContext)
if err != nil {
return nil, err
}
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)
return &ComponentManager{
cfg: cfg,
protocolManager: protocolManager,
rpcManager: rpcManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func setupRPC(
cfg *config.Config,
txMempool *mempool.TxPool,
dag *blockdag.BlockDAG,
sigCache *txscript.SigCache,
netAdapter *netadapter.NetAdapter,
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {
blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
err := handleBlockDAGNotifications(notification, acceptanceIndex, rpcManager)
if err != nil {
panic(err)
}
})
return rpcManager
}
func handleBlockDAGNotifications(notification *blockdag.Notification,
acceptanceIndex *indexers.AcceptanceIndex, rpcManager *rpc.Manager) error {
switch notification.Type {
case blockdag.NTChainChanged:
if acceptanceIndex == nil {
return nil
}
chainChangedNotificationData := notification.Data.(*blockdag.ChainChangedNotificationData)
err := rpcManager.NotifyChainChanged(chainChangedNotificationData.RemovedChainBlockHashes,
chainChangedNotificationData.AddedChainBlockHashes)
if err != nil {
return err
}
case blockdag.NTFinalityConflict:
finalityConflictNotificationData := notification.Data.(*blockdag.FinalityConflictNotificationData)
err := rpcManager.NotifyFinalityConflict(finalityConflictNotificationData.ViolatingBlockHash.String())
if err != nil {
return err
}
case blockdag.NTFinalityConflictResolved:
finalityConflictResolvedNotificationData := notification.Data.(*blockdag.FinalityConflictResolvedNotificationData)
err := rpcManager.NotifyFinalityConflictResolved(finalityConflictResolvedNotificationData.FinalityBlockHash.String())
if err != nil {
return err
}
}
return nil
}
func (a *ComponentManager) maybeSeedFromDNS() {
if !a.cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, appmessage.SFNodeNetwork, false, nil,
a.cfg.Lookup, func(addresses []*appmessage.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
if a.cfg.GRPCSeed != "" {
dnsseed.SeedFromGRPC(a.cfg.NetParams(), a.cfg.GRPCSeed, appmessage.SFNodeNetwork, false, nil,
func(addresses []*appmessage.NetAddress) {
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
MaxUTXOCacheSize: cfg.MaxUTXOCacheSize,
})
return dag, err
}
func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) {
// Create indexes if needed.
var indexes []indexers.Indexer
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}
// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
}
func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool {
mempoolConfig := mempool.Config{
Policy: mempool.Policy{
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcTxSequenceLockFromReferencedUTXOEntries: dag.CalcTxSequenceLockFromReferencedUTXOEntries,
SigCache: sigCache,
DAG: dag,
}
return mempool.New(&mempoolConfig)
}
// P2PNodeID returns the network ID associated with this ComponentManager
func (a *ComponentManager) P2PNodeID() *id.ID {
return a.netAdapter.ID()
}
// AddressManager returns the AddressManager associated with this ComponentManager
func (a *ComponentManager) AddressManager() *addressmanager.AddressManager {
return a.addressManager
}

View File

@ -5,10 +5,11 @@
package main
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"os"
"runtime"
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/util/panics"
)
@ -77,7 +78,7 @@ func realMain() error {
func main() {
// Use all processor cores and up some limits.
runtime.GOMAXPROCS(runtime.NumCPU())
if err := limits.SetLimits(); err != nil {
if err := limits.SetLimits(nil); err != nil {
os.Exit(1)
}

View File

@ -107,7 +107,6 @@ type Flags struct {
ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"`
DbType string `long:"dbtype" description:"Database backend to use for the Block DAG"`
Profile string `long:"profile" description:"Enable HTTP profiling on given port -- NOTE port must be between 1024 and 65536"`
CPUProfile string `long:"cpuprofile" description:"Write CPU profile to the specified file"`
DebugLevel string `short:"d" long:"debuglevel" description:"Logging level for all subsystems {trace, debug, info, warn, error, critical} -- You may also specify <subsystem>=<level>,<subsystem2>=<level>,... to set the log level for individual subsystems -- Use show to list available subsystems"`
Upnp bool `long:"upnp" description:"Use UPnP to map our listening port outside of NAT"`
MinRelayTxFee float64 `long:"minrelaytxfee" description:"The minimum transaction fee in KAS/kB to be considered a non-zero fee."`
@ -124,6 +123,7 @@ type Flags struct {
ResetDatabase bool `long:"reset-db" description:"Reset database before starting node. It's needed when switching between subnetworks."`
MaxUTXOCacheSize uint64 `long:"maxutxocachesize" description:"Max size of loaded UTXO into ram from the disk in bytes"`
NetworkFlags
ServiceOptions *ServiceOptions
}
// Config defines the configuration options for kaspad.
@ -139,9 +139,9 @@ type Config struct {
SubnetworkID *subnetworkid.SubnetworkID // nil in full nodes
}
// serviceOptions defines the configuration options for the daemon as a service on
// ServiceOptions defines the configuration options for the daemon as a service on
// Windows.
type serviceOptions struct {
type ServiceOptions struct {
ServiceCommand string `short:"s" long:"service" description:"Service command {install, remove, start, stop}"`
}
@ -160,10 +160,10 @@ func cleanAndExpandPath(path string) string {
}
// newConfigParser returns a new command line flags parser.
func newConfigParser(cfgFlags *Flags, so *serviceOptions, options flags.Options) *flags.Parser {
func newConfigParser(cfgFlags *Flags, options flags.Options) *flags.Parser {
parser := flags.NewParser(cfgFlags, options)
if runtime.GOOS == "windows" {
parser.AddGroup("Service Options", "Service Options", so)
parser.AddGroup("Service Options", "Service Options", cfgFlags.ServiceOptions)
}
return parser
}
@ -189,6 +189,7 @@ func defaultFlags() *Flags {
MinRelayTxFee: defaultMinRelayTxFee,
AcceptanceIndex: defaultAcceptanceIndex,
MaxUTXOCacheSize: defaultMaxUTXOCacheSize,
ServiceOptions: &ServiceOptions{},
}
}
@ -211,24 +212,20 @@ func DefaultConfig() *Config {
// The above results in kaspad functioning properly without any config settings
// while still allowing the user to override settings with config files and
// command line options. Command line options always take precedence.
func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
func LoadConfig() (*Config, error) {
cfgFlags := defaultFlags()
// Service options which are only added on Windows.
serviceOpts := serviceOptions{}
// Pre-parse the command line options to see if an alternative config
// file or the version flag was specified. Any errors aside from the
// help message error can be ignored here since they will be caught by
// the final parse below.
preCfg := cfgFlags
preParser := newConfigParser(preCfg, &serviceOpts, flags.HelpFlag)
_, err = preParser.Parse()
preParser := newConfigParser(preCfg, flags.HelpFlag)
_, err := preParser.Parse()
if err != nil {
var flagsErr *flags.Error
if ok := errors.As(err, &flagsErr); ok && flagsErr.Type == flags.ErrHelp {
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
return nil, err
}
}
@ -242,21 +239,10 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
os.Exit(0)
}
// Perform service command and exit if specified. Invalid service
// commands show an appropriate error. Only runs on Windows since
// the RunServiceCommand function will be nil when not on Windows.
if serviceOpts.ServiceCommand != "" && RunServiceCommand != nil {
err := RunServiceCommand(serviceOpts.ServiceCommand)
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
os.Exit(0)
}
// Load additional config from file.
var configFileError error
parser := newConfigParser(cfgFlags, &serviceOpts, flags.Default)
cfg = &Config{
parser := newConfigParser(cfgFlags, flags.Default)
cfg := &Config{
Flags: cfgFlags,
}
if !preCfg.Simnet || preCfg.ConfigFile !=
@ -265,31 +251,27 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
if _, err := os.Stat(preCfg.ConfigFile); os.IsNotExist(err) {
err := createDefaultConfigFile(preCfg.ConfigFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating a "+
"default config file: %s\n", err)
return nil, errors.Wrap(err, "Error creating a default config file")
}
}
err := flags.NewIniParser(parser).ParseFile(preCfg.ConfigFile)
if err != nil {
if pErr := &(os.PathError{}); !errors.As(err, &pErr) {
fmt.Fprintf(os.Stderr, "Error parsing config "+
"file: %s\n", err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, errors.Wrapf(err, "Error parsing config file: %s\n\n%s", err, usageMessage)
}
configFileError = err
}
}
// Parse command line options again to ensure they take precedence.
remainingArgs, err = parser.Parse()
_, err = parser.Parse()
if err != nil {
var flagsErr *flags.Error
if ok := errors.As(err, &flagsErr); !ok || flagsErr.Type != flags.ErrHelp {
fmt.Fprintln(os.Stderr, usageMessage)
return nil, errors.Wrapf(err, "Error parsing command line arguments: %s\n\n%s", err, usageMessage)
}
return nil, nil, err
return nil, err
}
// Create the home directory if it doesn't already exist.
@ -309,13 +291,12 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
str := "%s: Failed to create home directory: %s"
err := errors.Errorf(str, funcName, err)
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
return nil, err
}
err = cfg.ResolveNetwork(parser)
if err != nil {
return nil, nil, err
return nil, err
}
// Set the default policy for relaying non-standard transactions
@ -330,7 +311,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
case cfg.RejectNonStd:
relayNonStd = false
case cfg.RelayNonStd:
@ -367,7 +348,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf("%s: %s", funcName, err.Error())
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Validate profile port number
@ -378,7 +359,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
}
@ -388,7 +369,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.BanDuration)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Validate any given whitelisted IP addresses and networks.
@ -405,7 +386,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err = errors.Errorf(str, funcName, addr)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
var bits int
if ip.To4() == nil {
@ -430,7 +411,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// --proxy or --connect without --listen disables listening.
@ -473,7 +454,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.RPCMaxConcurrentReqs)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Validate the the minrelaytxfee.
@ -483,7 +464,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, err)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Disallow 0 and negative min tx fees.
@ -492,7 +473,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.MinRelayTxFee)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Limit the max block mass to a sane value.
@ -505,7 +486,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
blockMaxMassMax, cfg.BlockMaxMass)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Limit the max orphan count to a sane value.
@ -515,7 +496,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.MaxOrphanTxs)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Look for illegal characters in the user agent comments.
@ -526,7 +507,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
}
@ -537,7 +518,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Add default port to all listener addresses if needed and remove
@ -545,7 +526,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.Listeners, err = network.NormalizeAddresses(cfg.Listeners,
cfg.NetParams().DefaultPort)
if err != nil {
return nil, nil, err
return nil, err
}
// Add default port to all rpc listener addresses if needed and remove
@ -553,7 +534,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.RPCListeners, err = network.NormalizeAddresses(cfg.RPCListeners,
cfg.NetParams().RPCPort)
if err != nil {
return nil, nil, err
return nil, err
}
// Disallow --addpeer and --connect used together
@ -562,7 +543,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
// Add default port to all added peer addresses if needed and remove
@ -570,13 +551,13 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
cfg.AddPeers, err = network.NormalizeAddresses(cfg.AddPeers,
cfg.NetParams().DefaultPort)
if err != nil {
return nil, nil, err
return nil, err
}
cfg.ConnectPeers, err = network.NormalizeAddresses(cfg.ConnectPeers,
cfg.NetParams().DefaultPort)
if err != nil {
return nil, nil, err
return nil, err
}
// Setup dial and DNS resolution (lookup) functions depending on the
@ -593,7 +574,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
err := errors.Errorf(str, funcName, cfg.Proxy, err)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
return nil, err
}
proxy := &socks.Proxy{
@ -611,7 +592,7 @@ func LoadConfig() (cfg *Config, remainingArgs []string, err error) {
log.Warnf("%s", configFileError)
}
return cfg, remainingArgs, nil
return cfg, nil
}
// createDefaultConfig copies the file sample-kaspad.conf to the given destination path,

View File

@ -53,6 +53,7 @@ var (
dnssLog = BackendLog.Logger("DNSS")
snvrLog = BackendLog.Logger("SNVR")
ibdsLog = BackendLog.Logger("IBDS")
wsvcLog = BackendLog.Logger("WSVC")
)
// SubsystemTags is an enum of all sub system tags
@ -83,7 +84,8 @@ var SubsystemTags = struct {
NTAR,
DNSS,
SNVR,
IBDS string
IBDS,
WSVC string
}{
ADXR: "ADXR",
AMGR: "AMGR",
@ -112,6 +114,7 @@ var SubsystemTags = struct {
DNSS: "DNSS",
SNVR: "SNVR",
IBDS: "IBDS",
WSVC: "WSVC",
}
// subsystemLoggers maps each subsystem identifier to its associated logger.
@ -143,6 +146,7 @@ var subsystemLoggers = map[string]*Logger{
SubsystemTags.DNSS: dnssLog,
SubsystemTags.SNVR: snvrLog,
SubsystemTags.IBDS: ibdsLog,
SubsystemTags.WSVC: wsvcLog,
}
// InitLog attaches log file and error log file to the backend log.

View File

@ -0,0 +1,22 @@
package execenv
import (
"fmt"
"os"
"runtime"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
)
// Initialize initializes the execution environment required to run kaspad
func Initialize(desiredLimits *limits.DesiredLimits) {
// Use all processor cores.
runtime.GOMAXPROCS(runtime.NumCPU())
// Up some limits.
if err := limits.SetLimits(desiredLimits); err != nil {
fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err)
os.Exit(1)
}
}

View File

@ -0,0 +1,7 @@
package limits
// DesiredLimits is a structure that specifies the limits desired by a running application
type DesiredLimits struct {
FileLimitWant uint64
FileLimitMin uint64
}

View File

@ -5,6 +5,6 @@
package limits
// SetLimits is a no-op on Plan 9 due to the lack of process accounting.
func SetLimits() error {
func SetLimits(*DesiredLimits) error {
return nil
}

View File

@ -7,41 +7,39 @@
package limits
import (
"github.com/pkg/errors"
"syscall"
"github.com/pkg/errors"
)
const (
fileLimitWant = 2048
fileLimitMin = 1024
)
const ()
// SetLimits raises some process limits to values which allow kaspad and
// associated utilities to run.
func SetLimits() error {
func SetLimits(desiredLimits *DesiredLimits) error {
var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err
}
if rLimit.Cur > fileLimitWant {
if rLimit.Cur > desiredLimits.FileLimitWant {
return nil
}
if rLimit.Max < fileLimitMin {
if rLimit.Max < desiredLimits.FileLimitMin {
err = errors.Errorf("need at least %d file descriptors",
fileLimitMin)
desiredLimits.FileLimitMin)
return err
}
if rLimit.Max < fileLimitWant {
if rLimit.Max < desiredLimits.FileLimitWant {
rLimit.Cur = rLimit.Max
} else {
rLimit.Cur = fileLimitWant
rLimit.Cur = desiredLimits.FileLimitWant
}
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
// try min value
rLimit.Cur = fileLimitMin
rLimit.Cur = desiredLimits.FileLimitMin
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err

View File

@ -5,6 +5,6 @@
package limits
// SetLimits is a no-op on Windows since it's not required there.
func SetLimits() error {
func SetLimits(*DesiredLimits) error {
return nil
}

View File

@ -0,0 +1,17 @@
package winservice
import "github.com/kaspanet/kaspad/infrastructure/config"
// ServiceDescription contains information about a service, needed to administer it
type ServiceDescription struct {
Name string
DisplayName string
Description string
}
// MainFunc specifies the signature of an application's main function to be able to run as a windows service
type MainFunc func(startedChan chan<- struct{}) error
// WinServiceMain is only invoked on Windows. It detects when kaspad is running
// as a service and reacts accordingly.
var WinServiceMain = func(MainFunc, *ServiceDescription, *config.Config) (bool, error) { return false, nil }

View File

@ -0,0 +1,13 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package winservice
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.CNFG)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,178 @@
package winservice
import (
"os"
"path/filepath"
"time"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/mgr"
"github.com/btcsuite/winsvc/svc"
"github.com/pkg/errors"
)
// performServiceCommand attempts to run one of the supported service commands
// provided on the command line via the service command flag. An appropriate
// error is returned if an invalid command is specified.
func (s *Service) performServiceCommand() error {
var err error
command := s.cfg.ServiceOptions.ServiceCommand
switch command {
case "install":
err = s.installService()
case "remove":
err = s.removeService()
case "start":
err = s.startService()
case "stop":
err = s.controlService(svc.Stop, svc.Stopped)
default:
err = errors.Errorf("invalid service command [%s]", command)
}
return err
}
// installService attempts to install the kaspad service. Typically this should
// be done by the msi installer, but it is provided here since it can be useful
// for development.
func (s *Service) installService() error {
// Get the path of the current executable. This is needed because
// os.Args[0] can vary depending on how the application was launched.
// For example, under cmd.exe it will only be the name of the app
// without the path or extension, but under mingw it will be the full
// path including the extension.
exePath, err := filepath.Abs(os.Args[0])
if err != nil {
return err
}
if filepath.Ext(exePath) == "" {
exePath += ".exe"
}
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service doesn't already exist.
service, err := serviceManager.OpenService(s.description.Name)
if err == nil {
service.Close()
return errors.Errorf("service %s already exists", s.description.Name)
}
// Install the service.
service, err = serviceManager.CreateService(s.description.Name, exePath, mgr.Config{
DisplayName: s.description.DisplayName,
Description: s.description.Description,
})
if err != nil {
return err
}
defer service.Close()
// Support events to the event log using the standard "standard" Windows
// EventCreate.exe message file. This allows easy logging of custom
// messges instead of needing to create our own message catalog.
err = eventlog.Remove(s.description.Name)
if err != nil {
return err
}
eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info)
return eventlog.InstallAsEventCreate(s.description.Name, eventsSupported)
}
// removeService attempts to uninstall the kaspad service. Typically this should
// be done by the msi uninstaller, but it is provided here since it can be
// useful for development. Not the eventlog entry is intentionally not removed
// since it would invalidate any existing event log messages.
func (s *Service) removeService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service exists.
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("service %s is not installed", s.description.Name)
}
defer service.Close()
// Remove the service.
return service.Delete()
}
// startService attempts to Start the kaspad service.
func (s *Service) startService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
err = service.Start(os.Args)
if err != nil {
return errors.Errorf("could not start service: %s", err)
}
return nil
}
// controlService allows commands which change the status of the service. It
// also waits for up to 10 seconds for the service to change to the passed
// state.
func (s *Service) controlService(c svc.Cmd, to svc.State) error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(s.description.Name)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
status, err := service.Control(c)
if err != nil {
return errors.Errorf("could not send control=%d: %s", c, err)
}
// Send the control message.
timeout := time.Now().Add(10 * time.Second)
for status.State != to {
if timeout.Before(time.Now()) {
return errors.Errorf("timeout waiting for service to go "+
"to state=%d", to)
}
time.Sleep(300 * time.Millisecond)
status, err = service.Query()
if err != nil {
return errors.Errorf("could not retrieve service "+
"status: %s", err)
}
}
return nil
}

View File

@ -0,0 +1,40 @@
package winservice
import (
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
)
// serviceMain checks whether we're being invoked as a service, and if so uses
// the service control manager to start the long-running server. A flag is
// returned to the caller so the application can determine whether to exit (when
// running as a service) or launch in normal interactive mode.
func serviceMain(main MainFunc, description *ServiceDescription, cfg *config.Config) (bool, error) {
service := newService(main, description, cfg)
if cfg.ServiceOptions.ServiceCommand != "" {
return true, service.performServiceCommand()
}
// Don't run as a service if we're running interactively (or that can't
// be determined due to an error).
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return false, err
}
if isInteractive {
return false, nil
}
err = service.Start()
if err != nil {
return true, err
}
return true, nil
}
// Set windows specific functions to real functions.
func init() {
WinServiceMain = serviceMain
}

View File

@ -0,0 +1,119 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package winservice
import (
"fmt"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/version"
)
// Service houses the main service handler which handles all service
// updates and launching the application's main.
type Service struct {
main MainFunc
description *ServiceDescription
cfg *config.Config
eventLog *eventlog.Log
}
func newService(main MainFunc, description *ServiceDescription, cfg *config.Config) *Service {
return &Service{
main: main,
description: description,
cfg: cfg,
}
}
// Start starts the srevice
func (s *Service) Start() error {
elog, err := eventlog.Open(s.description.Name)
if err != nil {
return err
}
s.eventLog = elog
defer s.eventLog.Close()
err = svc.Run(s.description.Name, &Service{})
if err != nil {
s.eventLog.Error(1, fmt.Sprintf("Service start failed: %s", err))
return err
}
return nil
}
// Execute is the main entry point the winsvc package calls when receiving
// information from the Windows service control manager. It launches the
// long-running kaspadMain (which is the real meat of kaspad), handles service
// change requests, and notifies the service control manager of changes.
func (s *Service) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
// Service start is pending.
const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown
changes <- svc.Status{State: svc.StartPending}
// Start kaspadMain in a separate goroutine so the service can start
// quickly. Shutdown (along with a potential error) is reported via
// doneChan. startedChan is notified once kaspad is started so this can
// be properly logged
doneChan := make(chan error)
startedChan := make(chan struct{})
spawn("kaspadMain-windows", func() {
err := s.main(startedChan)
doneChan <- err
})
// Service is now started.
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
loop:
for {
select {
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
// Service stop is pending. Don't accept any
// more commands while pending.
changes <- svc.Status{State: svc.StopPending}
// Signal the main function to exit.
signal.ShutdownRequestChannel <- struct{}{}
default:
s.eventLog.Error(1, fmt.Sprintf("Unexpected control "+
"request #%d.", c))
}
case <-startedChan:
s.logServiceStart()
case err := <-doneChan:
if err != nil {
s.eventLog.Error(1, err.Error())
}
break loop
}
}
// Service is now stopped.
changes <- svc.Status{State: svc.Stopped}
return false, 0
}
// logServiceStart logs information about kaspad when the main server has
// been started to the Windows event log.
func (s *Service) logServiceStart() {
var message string
message += fmt.Sprintf("%s version %s\n", s.description.DisplayName, version.Version())
message += fmt.Sprintf("Configuration file: %s\n", s.cfg.ConfigFile)
message += fmt.Sprintf("Data directory: %s\n", s.cfg.DataDir)
message += fmt.Sprintf("Logs directory: %s\n", s.cfg.LogDir)
}

2
log.go
View File

@ -7,8 +7,6 @@ package main
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.KASD)
var spawn = panics.GoroutineWrapperFunc(log)

200
main.go
View File

@ -5,212 +5,14 @@
package main
import (
"fmt"
_ "net/http/pprof"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"
"github.com/kaspanet/kaspad/app"
"github.com/kaspanet/kaspad/infrastructure/db/dbaccess"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/limits"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/util/profiling"
"github.com/kaspanet/kaspad/version"
)
const (
// blockDbNamePrefix is the prefix for the block database name. The
// database type is appended to this value to form the full block
// database name.
blockDbNamePrefix = "blocks"
)
// winServiceMain is only invoked on Windows. It detects when kaspad is running
// as a service and reacts accordingly.
var winServiceMain func() (bool, error)
// kaspadMain is the real main function for kaspad. It is necessary to work
// around the fact that deferred functions do not run when os.Exit() is called.
// The optional startedChan writes once all services has started.
func kaspadMain(startedChan chan<- struct{}) error {
interrupt := signal.InterruptListener()
// Load configuration and parse command line. This function also
// initializes logging and configures it accordingly.
cfg, _, err := config.LoadConfig()
if err != nil {
return err
}
defer panics.HandlePanic(log, "MAIN", nil)
// Get a channel that will be closed when a shutdown signal has been
// triggered either from an OS signal such as SIGINT (Ctrl+C) or from
// another subsystem such as the RPC server.
defer log.Info("Shutdown complete")
// Show version at startup.
log.Infof("Version %s", version.Version())
// Enable http profiling server if requested.
if cfg.Profile != "" {
profiling.Start(cfg.Profile, log)
}
// Write cpu profile if requested.
if cfg.CPUProfile != "" {
f, err := os.Create(cfg.CPUProfile)
if err != nil {
log.Errorf("Unable to create cpu profile: %s", err)
return err
}
pprof.StartCPUProfile(f)
defer f.Close()
defer pprof.StopCPUProfile()
}
// Perform upgrades to kaspad as new versions require it.
if err := doUpgrades(); err != nil {
log.Errorf("%s", err)
return err
}
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
if cfg.ResetDatabase {
err := removeDatabase(cfg)
if err != nil {
log.Errorf("%s", err)
return err
}
}
// Open the database
databaseContext, err := openDB(cfg)
if err != nil {
log.Errorf("%s", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down the database...")
err := databaseContext.Close()
if err != nil {
log.Errorf("Failed to close the database: %s", err)
}
}()
// Return now if an interrupt signal was triggered.
if signal.InterruptRequested(interrupt) {
return nil
}
// Drop indexes and exit if requested.
if cfg.DropAcceptanceIndex {
if err := indexers.DropAcceptanceIndex(databaseContext); err != nil {
log.Errorf("%s", err)
return err
}
return nil
}
// Create app and start it.
app, err := app.New(cfg, databaseContext, interrupt)
if err != nil {
log.Errorf("Unable to start kaspad: %+v", err)
return err
}
defer func() {
log.Infof("Gracefully shutting down kaspad...")
shutdownDone := make(chan struct{})
go func() {
app.Stop()
shutdownDone <- struct{}{}
}()
const shutdownTimeout = 2 * time.Minute
select {
case <-shutdownDone:
case <-time.After(shutdownTimeout):
log.Criticalf("Graceful shutdown timed out %s. Terminating...", shutdownTimeout)
}
log.Infof("Kaspad shutdown complete")
}()
app.Start()
if startedChan != nil {
startedChan <- struct{}{}
}
// Wait until the interrupt signal is received from an OS signal or
// shutdown is requested through one of the subsystems such as the RPC
// server.
<-interrupt
return nil
}
func removeDatabase(cfg *config.Config) error {
dbPath := blockDbPath(cfg)
return os.RemoveAll(dbPath)
}
// dbPath returns the path to the block database given a database type.
func blockDbPath(cfg *config.Config) string {
// The database name is based on the database type.
dbName := blockDbNamePrefix + "_" + cfg.DbType
dbPath := filepath.Join(cfg.DataDir, dbName)
return dbPath
}
func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) {
dbPath := filepath.Join(cfg.DataDir, "db")
log.Infof("Loading database from '%s'", dbPath)
return dbaccess.New(dbPath)
}
func main() {
// Use all processor cores.
runtime.GOMAXPROCS(runtime.NumCPU())
// Up some limits.
if err := limits.SetLimits(); err != nil {
fmt.Fprintf(os.Stderr, "failed to set limits: %s\n", err)
os.Exit(1)
}
// Call serviceMain on Windows to handle running as a service. When
// the return isService flag is true, exit now since we ran as a
// service. Otherwise, just fall through to normal operation.
if runtime.GOOS == "windows" {
isService, err := winServiceMain()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if isService {
os.Exit(0)
}
}
// Work around defer not working after os.Exit()
if err := kaspadMain(nil); err != nil {
if err := app.StartApp(); err != nil {
os.Exit(1)
}
}
// doUpgrades performs upgrades to kaspad as new versions require it.
// currently it's a placeholder we got from kaspad upstream, that does nothing
func doUpgrades() error {
return nil
}

View File

@ -1,309 +0,0 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"os"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/btcsuite/winsvc/eventlog"
"github.com/btcsuite/winsvc/mgr"
"github.com/btcsuite/winsvc/svc"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/os/signal"
"github.com/kaspanet/kaspad/version"
)
const (
// svcName is the name of kaspad service.
svcName = "kaspadsvc"
// svcDisplayName is the service name that will be shown in the windows
// services list. Not the svcName is the "real" name which is used
// to control the service. This is only for display purposes.
svcDisplayName = "Kaspad Service"
// svcDesc is the description of the service.
svcDesc = "Downloads and stays synchronized with the Kaspa block " +
"DAG and provides DAG services to applications."
)
// elog is used to send messages to the Windows event log.
var elog *eventlog.Log
// logServiceStart logs information about kaspad when the main server has
// been started to the Windows event log.
func logServiceStart() {
var message string
message += fmt.Sprintf("Version %s\n", version.Version())
message += fmt.Sprintf("Configuration directory: %s\n", config.DefaultHomeDir)
message += fmt.Sprintf("Configuration file: %s\n", cfg.ConfigFile)
message += fmt.Sprintf("Data directory: %s\n", cfg.DataDir)
elog.Info(1, message)
}
// kaspadService houses the main service handler which handles all service
// updates and launching kaspadMain.
type kaspadService struct{}
// Execute is the main entry point the winsvc package calls when receiving
// information from the Windows service control manager. It launches the
// long-running kaspadMain (which is the real meat of kaspad), handles service
// change requests, and notifies the service control manager of changes.
func (s *kaspadService) Execute(args []string, r <-chan svc.ChangeRequest, changes chan<- svc.Status) (bool, uint32) {
// Service start is pending.
const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown
changes <- svc.Status{State: svc.StartPending}
// Start kaspadMain in a separate goroutine so the service can start
// quickly. Shutdown (along with a potential error) is reported via
// doneChan. startedChan is notified once kaspad is started so this can
// be properly logged
doneChan := make(chan error)
startedChan := make(chan struct{})
spawn("kaspadMain-windows", func() {
err := kaspadMain(startedChan)
doneChan <- err
})
// Service is now started.
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}
loop:
for {
select {
case c := <-r:
switch c.Cmd {
case svc.Interrogate:
changes <- c.CurrentStatus
case svc.Stop, svc.Shutdown:
// Service stop is pending. Don't accept any
// more commands while pending.
changes <- svc.Status{State: svc.StopPending}
// Signal the main function to exit.
signal.ShutdownRequestChannel <- struct{}{}
default:
elog.Error(1, fmt.Sprintf("Unexpected control "+
"request #%d.", c))
}
case <-startedChan:
logServiceStart()
case err := <-doneChan:
if err != nil {
elog.Error(1, err.Error())
}
break loop
}
}
// Service is now stopped.
changes <- svc.Status{State: svc.Stopped}
return false, 0
}
// installService attempts to install the kaspad service. Typically this should
// be done by the msi installer, but it is provided here since it can be useful
// for development.
func installService() error {
// Get the path of the current executable. This is needed because
// os.Args[0] can vary depending on how the application was launched.
// For example, under cmd.exe it will only be the name of the app
// without the path or extension, but under mingw it will be the full
// path including the extension.
exePath, err := filepath.Abs(os.Args[0])
if err != nil {
return err
}
if filepath.Ext(exePath) == "" {
exePath += ".exe"
}
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service doesn't already exist.
service, err := serviceManager.OpenService(svcName)
if err == nil {
service.Close()
return errors.Errorf("service %s already exists", svcName)
}
// Install the service.
service, err = serviceManager.CreateService(svcName, exePath, mgr.Config{
DisplayName: svcDisplayName,
Description: svcDesc,
})
if err != nil {
return err
}
defer service.Close()
// Support events to the event log using the standard "standard" Windows
// EventCreate.exe message file. This allows easy logging of custom
// messges instead of needing to create our own message catalog.
eventlog.Remove(svcName)
eventsSupported := uint32(eventlog.Error | eventlog.Warning | eventlog.Info)
return eventlog.InstallAsEventCreate(svcName, eventsSupported)
}
// removeService attempts to uninstall the kaspad service. Typically this should
// be done by the msi uninstaller, but it is provided here since it can be
// useful for development. Not the eventlog entry is intentionally not removed
// since it would invalidate any existing event log messages.
func removeService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
// Ensure the service exists.
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("service %s is not installed", svcName)
}
defer service.Close()
// Remove the service.
return service.Delete()
}
// startService attempts to Start the kaspad service.
func startService() error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
err = service.Start(os.Args)
if err != nil {
return errors.Errorf("could not start service: %s", err)
}
return nil
}
// controlService allows commands which change the status of the service. It
// also waits for up to 10 seconds for the service to change to the passed
// state.
func controlService(c svc.Cmd, to svc.State) error {
// Connect to the windows service manager.
serviceManager, err := mgr.Connect()
if err != nil {
return err
}
defer serviceManager.Disconnect()
service, err := serviceManager.OpenService(svcName)
if err != nil {
return errors.Errorf("could not access service: %s", err)
}
defer service.Close()
status, err := service.Control(c)
if err != nil {
return errors.Errorf("could not send control=%d: %s", c, err)
}
// Send the control message.
timeout := time.Now().Add(10 * time.Second)
for status.State != to {
if timeout.Before(time.Now()) {
return errors.Errorf("timeout waiting for service to go "+
"to state=%d", to)
}
time.Sleep(300 * time.Millisecond)
status, err = service.Query()
if err != nil {
return errors.Errorf("could not retrieve service "+
"status: %s", err)
}
}
return nil
}
// performServiceCommand attempts to run one of the supported service commands
// provided on the command line via the service command flag. An appropriate
// error is returned if an invalid command is specified.
func performServiceCommand(command string) error {
var err error
switch command {
case "install":
err = installService()
case "remove":
err = removeService()
case "start":
err = startService()
case "stop":
err = controlService(svc.Stop, svc.Stopped)
default:
err = errors.Errorf("invalid service command [%s]", command)
}
return err
}
// serviceMain checks whether we're being invoked as a service, and if so uses
// the service control manager to start the long-running server. A flag is
// returned to the caller so the application can determine whether to exit (when
// running as a service) or launch in normal interactive mode.
func serviceMain() (bool, error) {
// Don't run as a service if we're running interactively (or that can't
// be determined due to an error).
isInteractive, err := svc.IsAnInteractiveSession()
if err != nil {
return false, err
}
if isInteractive {
return false, nil
}
elog, err = eventlog.Open(svcName)
if err != nil {
return false, err
}
defer elog.Close()
err = svc.Run(svcName, &kaspadService{})
if err != nil {
elog.Error(1, fmt.Sprintf("Service start failed: %s", err))
return true, err
}
return true, nil
}
// Set windows specific functions to real functions.
func init() {
config.RunServiceCommand = performServiceCommand
winServiceMain = serviceMain
}

View File

@ -10,7 +10,7 @@ import (
)
type appHarness struct {
app *app.App
app *app.ComponentManager
rpcClient *testRPCClient
p2pAddress string
rpcAddress string
@ -108,7 +108,7 @@ func teardownHarness(t *testing.T, harness *appHarness) {
func setApp(t *testing.T, harness *appHarness) {
var err error
harness.app, err = app.New(harness.config, harness.databaseContext, make(chan struct{}))
harness.app, err = app.NewComponentManager(harness.config, harness.databaseContext, make(chan struct{}))
if err != nil {
t.Fatalf("Error creating app: %+v", err)
}