Compare commits

..

2 Commits

Author SHA1 Message Date
Kaspa Profiler
28c5e50a87 Parallel miner 2021-04-13 12:51:04 +03:00
Kaspa Profiler
635b793e14 Add --delay to kaspad 2021-04-13 12:51:02 +03:00
4 changed files with 61 additions and 42 deletions

View File

@@ -1,6 +1,8 @@
package blockrelay
import (
"time"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
@@ -11,6 +13,7 @@ import (
"github.com/kaspanet/kaspad/domain/consensus/utils/consensushashing"
"github.com/kaspanet/kaspad/infrastructure/config"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/pkg/errors"
)
@@ -109,6 +112,8 @@ func (flow *handleRelayInvsFlow) start() error {
if err != nil {
return err
}
msTime := mstime.UnixMilliseconds(block.Header.TimeInMilliseconds())
time.Sleep(time.Until(msTime.ToNativeTime().Add(config.DelayDuration)))
log.Debugf("Processing block %s", inv.Hash)
missingParents, blockInsertionResult, err := flow.processBlock(block)

View File

@@ -2,7 +2,9 @@ package main
import (
nativeerrors "errors"
"fmt"
"math/rand"
"runtime"
"sync/atomic"
"time"
@@ -42,39 +44,43 @@ func mineLoop(client *minerClient, numberOfBlocks uint64, targetBlocksPerSecond
templatesLoop(client, miningAddr, errChan)
})
spawn("blocksLoop", func() {
const windowSize = 10
var expectedDurationForWindow time.Duration
var windowExpectedEndTime time.Time
hasBlockRateTarget := targetBlocksPerSecond != 0
if hasBlockRateTarget {
expectedDurationForWindow = time.Duration(float64(windowSize)/targetBlocksPerSecond) * time.Second
windowExpectedEndTime = time.Now().Add(expectedDurationForWindow)
}
blockInWindowIndex := 0
sleepTime := 0 * time.Second
for {
foundBlockChan <- mineNextBlock(mineWhenNotSynced)
for c := 0; c < (runtime.NumCPU()/2)+1; c++ {
c := c
spawn(fmt.Sprintf("blocksLoop %d", c), func() {
const windowSize = 10
var expectedDurationForWindow time.Duration
var windowExpectedEndTime time.Time
hasBlockRateTarget := targetBlocksPerSecond != 0
if hasBlockRateTarget {
blockInWindowIndex++
if blockInWindowIndex == windowSize-1 {
deviation := windowExpectedEndTime.Sub(time.Now())
if deviation > 0 {
sleepTime = deviation / windowSize
log.Infof("Finished to mine %d blocks %s earlier than expected. Setting the miner "+
"to sleep %s between blocks to compensate",
windowSize, deviation, sleepTime)
}
blockInWindowIndex = 0
windowExpectedEndTime = time.Now().Add(expectedDurationForWindow)
}
time.Sleep(sleepTime)
expectedDurationForWindow = time.Duration(float64(windowSize)/targetBlocksPerSecond) * time.Second
windowExpectedEndTime = time.Now().Add(expectedDurationForWindow)
}
}
})
blockInWindowIndex := 0
sleepTime := 0 * time.Second
for {
foundBlockChan <- mineNextBlock(mineWhenNotSynced)
if hasBlockRateTarget {
blockInWindowIndex++
if blockInWindowIndex == windowSize-1 {
deviation := windowExpectedEndTime.Sub(time.Now())
if deviation > 0 {
sleepTime = deviation / windowSize
log.Infof("cpu: %d Finished to mine %d blocks %s earlier than expected. Setting the miner "+
"to sleep %s between blocks to compensate",
c, windowSize, deviation, sleepTime)
}
blockInWindowIndex = 0
windowExpectedEndTime = time.Now().Add(expectedDurationForWindow)
}
time.Sleep(sleepTime)
}
}
})
}
spawn("handleFoundBlock", func() {
for i := uint64(0); numberOfBlocks == 0 || i < numberOfBlocks; i++ {

View File

@@ -25,6 +25,9 @@ import (
"github.com/pkg/errors"
)
// DelayDuration a duration for the delay, global for testing
var DelayDuration time.Duration
const (
defaultConfigFilename = "kaspad.conf"
defaultDataDirname = "data"
@@ -117,6 +120,7 @@ type Flags struct {
MaxUTXOCacheSize uint64 `long:"maxutxocachesize" description:"Max size of loaded UTXO into ram from the disk in bytes"`
UTXOIndex bool `long:"utxoindex" description:"Enable the UTXO index"`
IsArchivalNode bool `long:"archival" description:"Run as an archival node: don't delete old block data when moving the pruning point (Warning: heavy disk usage)'"`
Delay float32 `long:"delay" description:"Provide a delay in seconds as a floating point"`
NetworkFlags
ServiceOptions *ServiceOptions
}
@@ -574,6 +578,7 @@ func LoadConfig() (*Config, error) {
log.Warnf("%s", configFileError)
}
DelayDuration = time.Duration(cfg.Delay * float32(time.Second))
return cfg, nil
}

View File

@@ -3,6 +3,7 @@ package netadapter
import (
"sync"
"sync/atomic"
"time"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/infrastructure/config"
@@ -175,18 +176,20 @@ func (na *NetAdapter) ID() *id.ID {
// P2PBroadcast sends the given `message` to every peer corresponding
// to each NetConnection in the given netConnections
func (na *NetAdapter) P2PBroadcast(netConnections []*NetConnection, message appmessage.Message) error {
na.p2pConnectionsLock.RLock()
defer na.p2pConnectionsLock.RUnlock()
for _, netConnection := range netConnections {
err := netConnection.router.OutgoingRoute().Enqueue(message)
if err != nil {
if errors.Is(err, routerpkg.ErrRouteClosed) {
log.Debugf("Cannot enqueue message to %s: router is closed", netConnection)
continue
go func() {
time.Sleep(config.DelayDuration)
na.p2pConnectionsLock.RLock()
defer na.p2pConnectionsLock.RUnlock()
for _, netConnection := range netConnections {
err := netConnection.router.OutgoingRoute().Enqueue(message)
if err != nil {
if errors.Is(err, routerpkg.ErrRouteClosed) {
log.Debugf("Cannot enqueue message to %s: router is closed", netConnection)
continue
}
log.Error(err)
}
return err
}
}
}()
return nil
}