kaspad/app/app.go
stasatdaglabs 8a4ece1101
[NOD-1223] Reorganize project (#868)
* [NOD-1223] Move all network stuff into a new network package.

* [NOD-1223] Delete the unused package testutil.

* [NOD-1223] Move infrastructure stuff into a new instrastructure package.

* [NOD-1223] Move domain stuff into a new domain package.
2020-08-13 17:27:25 +03:00

248 lines
7.1 KiB
Go

package app
import (
"fmt"
"sync/atomic"
"github.com/kaspanet/kaspad/network/addressmanager"
"github.com/kaspanet/kaspad/network/netadapter/id"
"github.com/kaspanet/kaspad/domain/blockdag"
"github.com/kaspanet/kaspad/domain/blockdag/indexers"
"github.com/kaspanet/kaspad/domain/mempool"
"github.com/kaspanet/kaspad/domain/mining"
"github.com/kaspanet/kaspad/domain/txscript"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/dbaccess"
"github.com/kaspanet/kaspad/infrastructure/signal"
"github.com/kaspanet/kaspad/network/connmanager"
"github.com/kaspanet/kaspad/network/dnsseed"
"github.com/kaspanet/kaspad/network/domainmessage"
"github.com/kaspanet/kaspad/network/netadapter"
"github.com/kaspanet/kaspad/network/protocol"
"github.com/kaspanet/kaspad/network/rpc"
"github.com/kaspanet/kaspad/util"
"github.com/kaspanet/kaspad/util/panics"
)
// App is a wrapper for all the kaspad services
type App struct {
cfg *config.Config
rpcServer *rpc.Server
addressManager *addressmanager.AddressManager
protocolManager *protocol.Manager
connectionManager *connmanager.ConnectionManager
netAdapter *netadapter.NetAdapter
started, shutdown int32
}
// Start launches all the kaspad services.
func (a *App) Start() {
// Already started?
if atomic.AddInt32(&a.started, 1) != 1 {
return
}
log.Trace("Starting kaspad")
err := a.protocolManager.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the p2p protocol: %+v", err))
}
a.maybeSeedFromDNS()
a.connectionManager.Start()
if !a.cfg.DisableRPC {
a.rpcServer.Start()
}
}
// Stop gracefully shuts down all the kaspad services.
func (a *App) Stop() {
// Make sure this only happens once.
if atomic.AddInt32(&a.shutdown, 1) != 1 {
log.Infof("Kaspad is already in the process of shutting down")
return
}
log.Warnf("Kaspad shutting down")
a.connectionManager.Stop()
err := a.protocolManager.Stop()
if err != nil {
log.Errorf("Error stopping the p2p protocol: %+v", err)
}
// Shutdown the RPC server if it's not disabled.
if !a.cfg.DisableRPC {
err := a.rpcServer.Stop()
if err != nil {
log.Errorf("Error stopping rpcServer: %+v", err)
}
}
err = a.addressManager.Stop()
if err != nil {
log.Errorf("Error stopping address manager: %s", err)
}
return
}
// New returns a new App instance configured to listen on addr for the
// kaspa network type specified by dagParams. Use start to begin accepting
// connections from peers.
func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{}) (*App, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, interrupt, sigCache, indexManager)
if err != nil {
return nil, err
}
txMempool := setupMempool(cfg, dag, sigCache)
netAdapter, err := netadapter.NewNetAdapter(cfg)
if err != nil {
return nil, err
}
addressManager := addressmanager.New(cfg, databaseContext)
connectionManager, err := connmanager.New(cfg, netAdapter, addressManager)
if err != nil {
return nil, err
}
protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager)
if err != nil {
return nil, err
}
rpcServer, err := setupRPC(
cfg, dag, txMempool, sigCache, acceptanceIndex, connectionManager, addressManager, protocolManager)
if err != nil {
return nil, err
}
return &App{
cfg: cfg,
rpcServer: rpcServer,
protocolManager: protocolManager,
connectionManager: connectionManager,
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func (a *App) maybeSeedFromDNS() {
if !a.cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, domainmessage.SFNodeNetwork, false, nil,
a.cfg.Lookup, func(addresses []*domainmessage.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
a.addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{},
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
Interrupt: interrupt,
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),
SigCache: sigCache,
IndexManager: indexManager,
SubnetworkID: cfg.SubnetworkID,
})
return dag, err
}
func setupIndexes(cfg *config.Config) (blockdag.IndexManager, *indexers.AcceptanceIndex) {
// Create indexes if needed.
var indexes []indexers.Indexer
var acceptanceIndex *indexers.AcceptanceIndex
if cfg.AcceptanceIndex {
log.Info("acceptance index is enabled")
acceptanceIndex = indexers.NewAcceptanceIndex()
indexes = append(indexes, acceptanceIndex)
}
// Create an index manager if any of the optional indexes are enabled.
if len(indexes) < 0 {
return nil, nil
}
indexManager := indexers.NewManager(indexes)
return indexManager, acceptanceIndex
}
func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript.SigCache) *mempool.TxPool {
mempoolConfig := mempool.Config{
Policy: mempool.Policy{
AcceptNonStd: cfg.RelayNonStd,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MaxOrphanTxSize: config.DefaultMaxOrphanTxSize,
MinRelayTxFee: cfg.MinRelayTxFee,
MaxTxVersion: 1,
},
CalcSequenceLockNoLock: func(tx *util.Tx, utxoSet blockdag.UTXOSet) (*blockdag.SequenceLock, error) {
return dag.CalcSequenceLockNoLock(tx, utxoSet)
},
SigCache: sigCache,
DAG: dag,
}
return mempool.New(&mempoolConfig)
}
func setupRPC(cfg *config.Config,
dag *blockdag.BlockDAG,
txMempool *mempool.TxPool,
sigCache *txscript.SigCache,
acceptanceIndex *indexers.AcceptanceIndex,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
protocolManager *protocol.Manager) (*rpc.Server, error) {
if !cfg.DisableRPC {
policy := mining.Policy{
BlockMaxMass: cfg.BlockMaxMass,
}
blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy, txMempool, dag, sigCache)
rpcServer, err := rpc.NewRPCServer(cfg, dag, txMempool, acceptanceIndex, blockTemplateGenerator,
connectionManager, addressManager, protocolManager)
if err != nil {
return nil, err
}
// Signal process shutdown when the RPC server requests it.
spawn("setupRPC-handleShutdownRequest", func() {
<-rpcServer.RequestedProcessShutdown()
signal.ShutdownRequestChannel <- struct{}{}
})
return rpcServer, nil
}
return nil, nil
}
// P2PNodeID returns the network ID associated with this App
func (a *App) P2PNodeID() *id.ID {
return a.netAdapter.ID()
}
// AddressManager returns the AddressManager associated with this App
func (a *App) AddressManager() *addressmanager.AddressManager {
return a.addressManager
}