Mirror of https://github.com/kaspanet/kaspad.git, synced 2025-03-30 15:08:33 +00:00
[NOD-827] Get rid of dbtools insecureimport.go and loadheaders.go (#655)
* [NOD-827] Get rid of dbtools insecureimport.go and loadheaders.go
* [NOD-827] Remove commands from realMain().
Parent: 3d8dd8724d
Commit: 299826f392
insecureimport.go (deleted)
@@ -1,394 +0,0 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
	"encoding/binary"
	"io"
	"os"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/kaspanet/kaspad/database"
	"github.com/kaspanet/kaspad/util"
	"github.com/kaspanet/kaspad/wire"
)

// importCmd defines the configuration options for the insecureimport command.
type importCmd struct {
	InFile   string `short:"i" long:"infile" description:"File containing the block(s)"`
	Progress int    `short:"p" long:"progress" description:"Show a progress message each time this number of seconds have passed -- Use 0 to disable progress announcements"`
}

var (
	// importCfg defines the configuration options for the command.
	importCfg = importCmd{
		InFile:   "bootstrap.dat",
		Progress: 10,
	}
)
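
For reference, go-flags turns the struct tags above into CLI flags, so an invocation would have looked roughly like the following (hypothetical session; the dbtools binary name is an assumption):

	dbtools insecureimport --infile=bootstrap.dat --progress=10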

// importResults houses the stats and result of an import operation.
type importResults struct {
	blocksProcessed int64
	blocksImported  int64
	err             error
}

// blockImporter houses information about an ongoing import from a block data
// file to the block database.
type blockImporter struct {
	db                database.DB
	r                 io.ReadSeeker
	processQueue      chan []byte
	doneChan          chan bool
	errChan           chan error
	quit              chan struct{}
	wg                sync.WaitGroup
	blocksProcessed   int64
	blocksImported    int64
	receivedLogBlocks int64
	receivedLogTx     int64
	lastHeight        int64
	lastBlockTime     time.Time
	lastLogTime       time.Time
}

// readBlock reads the next block from the input file.
func (bi *blockImporter) readBlock() ([]byte, error) {
	// The block file format is:
	//  <network> <block length> <serialized block>
	var net uint32
	err := binary.Read(bi.r, binary.LittleEndian, &net)
	if err != nil {
		if err != io.EOF {
			return nil, err
		}

		// No block and no error means there are no more blocks to read.
		return nil, nil
	}
	if net != uint32(activeNetParams.Net) {
		return nil, errors.Errorf("network mismatch -- got %x, want %x",
			net, uint32(activeNetParams.Net))
	}

	// Read the block length and ensure it is sane.
	var blockLen uint32
	if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
		return nil, err
	}
	if blockLen > wire.MaxMessagePayload {
		return nil, errors.Errorf("block payload of %d bytes is larger "+
			"than the max allowed %d bytes", blockLen,
			wire.MaxMessagePayload)
	}

	serializedBlock := make([]byte, blockLen)
	if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
		return nil, err
	}

	return serializedBlock, nil
}
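
For clarity, here is the writer side of that framing as a minimal standalone sketch. writeBlockRecord and the example magic value are hypothetical, not part of kaspad; the real network value would be uint32(activeNetParams.Net).

	package main

	import (
		"bytes"
		"encoding/binary"
		"fmt"
		"io"
	)

	// writeBlockRecord emits one record in the framing readBlock parses:
	// <network uint32> <block length uint32> <serialized block>, with both
	// integers little-endian.
	func writeBlockRecord(w io.Writer, net uint32, serializedBlock []byte) error {
		if err := binary.Write(w, binary.LittleEndian, net); err != nil {
			return err
		}
		if err := binary.Write(w, binary.LittleEndian, uint32(len(serializedBlock))); err != nil {
			return err
		}
		_, err := w.Write(serializedBlock)
		return err
	}

	func main() {
		var buf bytes.Buffer
		// 0x12345678 is a placeholder network magic, not a real kaspad value.
		if err := writeBlockRecord(&buf, 0x12345678, []byte{0x01, 0x02, 0x03}); err != nil {
			panic(err)
		}
		fmt.Printf("record bytes: % x\n", buf.Bytes())
	}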

// processBlock potentially imports the block into the database. It first
// deserializes the raw block while checking for errors. Already known blocks
// are skipped and orphan blocks are considered errors. Returns whether the
// block was imported along with any potential errors.
//
// NOTE: This is not a safe import as it does not verify DAG rules.
func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
	// Deserialize the block which includes checks for malformed blocks.
	block, err := util.NewBlockFromBytes(serializedBlock)
	if err != nil {
		return false, err
	}

	// Update progress statistics.
	bi.lastBlockTime = block.MsgBlock().Header.Timestamp
	bi.receivedLogTx += int64(len(block.MsgBlock().Transactions))

	// Skip blocks that already exist.
	var exists bool
	err = bi.db.View(func(dbTx database.Tx) error {
		exists, err = dbTx.HasBlock(block.Hash())
		return err
	})
	if err != nil {
		return false, err
	}
	if exists {
		return false, nil
	}

	// Don't bother trying to process orphans.
	parentHashes := block.MsgBlock().Header.ParentHashes
	for _, parentHash := range parentHashes {
		var exists bool
		err := bi.db.View(func(dbTx database.Tx) error {
			exists, err = dbTx.HasBlock(parentHash)
			return err
		})
		if err != nil {
			return false, err
		}
		if !exists {
			return false, errors.Errorf("import file contains block "+
				"%s which does not link to the available "+
				"block DAG", parentHash)
		}
	}

	// Put the blocks into the database with no checking of DAG rules.
	err = bi.db.Update(func(dbTx database.Tx) error {
		return dbTx.StoreBlock(block)
	})
	if err != nil {
		return false, err
	}

	return true, nil
}

// readHandler is the main handler for reading blocks from the import file.
// This allows block processing to take place in parallel with block reads.
// It must be run as a goroutine.
func (bi *blockImporter) readHandler() {
out:
	for {
		// Read the next block from the file and if anything goes wrong
		// notify the status handler with the error and bail.
		serializedBlock, err := bi.readBlock()
		if err != nil {
			bi.errChan <- errors.Errorf("Error reading from input "+
				"file: %s", err.Error())
			break out
		}

		// A nil block with no error means we're done.
		if serializedBlock == nil {
			break out
		}

		// Send the block or quit if we've been signalled to exit by
		// the status handler due to an error elsewhere.
		select {
		case bi.processQueue <- serializedBlock:
		case <-bi.quit:
			break out
		}
	}

	// Close the processing channel to signal no more blocks are coming.
	close(bi.processQueue)
	bi.wg.Done()
}

// logProgress logs block progress as an information message. In order to
// prevent spam, it limits logging to one message every importCfg.Progress
// seconds with duration and totals included.
func (bi *blockImporter) logProgress() {
	bi.receivedLogBlocks++

	now := time.Now()
	duration := now.Sub(bi.lastLogTime)
	if duration < time.Second*time.Duration(importCfg.Progress) {
		return
	}

	// Truncate the duration to 10s of milliseconds.
	durationMillis := int64(duration / time.Millisecond)
	tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)

	// Log information about new block height.
	blockStr := "blocks"
	if bi.receivedLogBlocks == 1 {
		blockStr = "block"
	}
	txStr := "transactions"
	if bi.receivedLogTx == 1 {
		txStr = "transaction"
	}
	log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
		bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx,
		txStr, bi.lastHeight, bi.lastBlockTime)

	bi.receivedLogBlocks = 0
	bi.receivedLogTx = 0
	bi.lastLogTime = now
}
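
To make the truncation concrete: a duration of 12.3456s gives durationMillis = 12345, so tDuration = 10ms * 1234 = 12.34s.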

// processHandler is the main handler for processing blocks. This allows block
// processing to take place in parallel with block reads from the import file.
// It must be run as a goroutine.
func (bi *blockImporter) processHandler() {
out:
	for {
		select {
		case serializedBlock, ok := <-bi.processQueue:
			// We're done when the channel is closed.
			if !ok {
				break out
			}

			bi.blocksProcessed++
			bi.lastHeight++
			imported, err := bi.processBlock(serializedBlock)
			if err != nil {
				bi.errChan <- err
				break out
			}

			if imported {
				bi.blocksImported++
			}

			bi.logProgress()

		case <-bi.quit:
			break out
		}
	}
	bi.wg.Done()
}

// statusHandler waits for updates from the import operation and notifies
// the passed results channel with the results of the import. It also causes
// all goroutines to exit if an error is reported from any of them.
func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
	select {
	// An error from either of the goroutines means we're done so signal
	// caller with the error and signal all goroutines to quit.
	case err := <-bi.errChan:
		resultsChan <- &importResults{
			blocksProcessed: bi.blocksProcessed,
			blocksImported:  bi.blocksImported,
			err:             err,
		}
		close(bi.quit)

	// The import finished normally.
	case <-bi.doneChan:
		resultsChan <- &importResults{
			blocksProcessed: bi.blocksProcessed,
			blocksImported:  bi.blocksImported,
			err:             nil,
		}
	}
}

// Import is the core function which handles importing the blocks from the file
// associated with the block importer to the database. It returns a channel
// on which the results will be returned when the operation has completed.
func (bi *blockImporter) Import() chan *importResults {
	// Start up the read and process handling goroutines. This setup allows
	// blocks to be read from disk in parallel while being processed.
	bi.wg.Add(2)
	spawn(bi.readHandler)
	spawn(bi.processHandler)

	// Wait for the import to finish in a separate goroutine and signal
	// the status handler when done.
	spawn(func() {
		bi.wg.Wait()
		bi.doneChan <- true
	})

	// Start the status handler and return the result channel that it will
	// send the results on when the import is done.
	resultChan := make(chan *importResults)
	spawn(func() {
		bi.statusHandler(resultChan)
	})
	return resultChan
}

// newBlockImporter returns a new importer for the provided file reader seeker
// and database.
func newBlockImporter(db database.DB, r io.ReadSeeker) *blockImporter {
	return &blockImporter{
		db:           db,
		r:            r,
		processQueue: make(chan []byte, 2),
		doneChan:     make(chan bool),
		errChan:      make(chan error),
		quit:         make(chan struct{}),
		lastLogTime:  time.Now(),
	}
}
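
A note on the buffer size: processQueue is created with capacity 2, so the read handler can stay a couple of blocks ahead of the process handler without letting an unbounded backlog of serialized blocks accumulate in memory.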

// Execute is the main entry point for the command. It's invoked by the parser.
func (cmd *importCmd) Execute(args []string) error {
	// Setup the global config options and ensure they are valid.
	if err := setupGlobalConfig(); err != nil {
		return err
	}

	// Ensure the specified block file exists.
	if !fileExists(cmd.InFile) {
		str := "The specified block file [%s] does not exist"
		return errors.Errorf(str, cmd.InFile)
	}

	// Load the block database.
	db, err := loadBlockDB()
	if err != nil {
		return err
	}
	defer db.Close()

	// Ensure the database is sync'd and closed on Ctrl+C.
	addInterruptHandler(func() {
		log.Infof("Gracefully shutting down the database...")
		db.Close()
	})

	fi, err := os.Open(importCfg.InFile)
	if err != nil {
		return err
	}
	defer fi.Close()

	// Create a block importer for the database and input file and start it.
	// The results channel returned from start will contain an error if
	// anything went wrong.
	importer := newBlockImporter(db, fi)

	// Perform the import asynchronously and signal the main goroutine when
	// done. This allows blocks to be processed and read in parallel. The
	// results channel returned from Import contains the statistics about
	// the import including an error if something went wrong. This is done
	// in a separate goroutine rather than waiting directly so the main
	// goroutine can be signaled for shutdown by either completion, error,
	// or from the main interrupt handler. This is necessary since the main
	// goroutine must be kept running long enough for the interrupt handler
	// goroutine to finish.
	spawn(func() {
		log.Info("Starting import")
		resultsChan := importer.Import()
		results := <-resultsChan
		if results.err != nil {
			var dbErr database.Error
			ok := errors.As(results.err, &dbErr)
			if !ok || dbErr.ErrorCode != database.ErrDbNotOpen {
				shutdownChannel <- results.err
				return
			}
		}

		log.Infof("Processed a total of %d blocks (%d imported, %d "+
			"already known)", results.blocksProcessed,
			results.blocksImported,
			results.blocksProcessed-results.blocksImported)
		shutdownChannel <- nil
	})

	// Wait for shutdown signal from either a normal completion or from the
	// interrupt handler.
	err = <-shutdownChannel
	return err
}
loadheaders.go (deleted)
@@ -1,93 +0,0 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
	"time"

	"github.com/kaspanet/kaspad/database"
	"github.com/kaspanet/kaspad/util/daghash"
)

// headersCmd defines the configuration options for the loadheaders command.
type headersCmd struct {
	Bulk bool `long:"bulk" description:"Use bulk loading of headers instead of one at a time"`
}

var (
	// headersCfg defines the configuration options for the command.
	headersCfg = headersCmd{
		Bulk: false,
	}
)
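
As with insecureimport, the struct tag above maps to a CLI flag, giving an invocation roughly like this (hypothetical; the dbtools binary name is an assumption):

	dbtools loadheaders --bulk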

// Execute is the main entry point for the command. It's invoked by the parser.
func (cmd *headersCmd) Execute(args []string) error {
	// Setup the global config options and ensure they are valid.
	if err := setupGlobalConfig(); err != nil {
		return err
	}

	// Load the block database.
	db, err := loadBlockDB()
	if err != nil {
		return err
	}
	defer db.Close()

	// NOTE: This code will only work for ffldb. Ideally the package using
	// the database would keep a metadata index of its own.
	blockIdxName := []byte("ffldb-blockidx")
	if !headersCfg.Bulk {
		err = db.View(func(dbTx database.Tx) error {
			totalHdrs := 0
			blockIdxBucket := dbTx.Metadata().Bucket(blockIdxName)
			blockIdxBucket.ForEach(func(k, v []byte) error {
				totalHdrs++
				return nil
			})
			log.Infof("Loading headers for %d blocks...", totalHdrs)
			numLoaded := 0
			startTime := time.Now()
			blockIdxBucket.ForEach(func(k, v []byte) error {
				var hash daghash.Hash
				copy(hash[:], k)
				_, err := dbTx.FetchBlockHeader(&hash)
				if err != nil {
					return err
				}
				numLoaded++
				return nil
			})
			log.Infof("Loaded %d headers in %s", numLoaded,
				time.Since(startTime))
			return nil
		})
		return err
	}

	// Bulk load headers.
	err = db.View(func(dbTx database.Tx) error {
		blockIdxBucket := dbTx.Metadata().Bucket(blockIdxName)
		hashes := make([]*daghash.Hash, 0, 500000)
		blockIdxBucket.ForEach(func(k, v []byte) error {
			var hash daghash.Hash
			copy(hash[:], k)
			hashes = append(hashes, &hash)
			return nil
		})

		log.Infof("Loading headers for %d blocks...", len(hashes))
		startTime := time.Now()
		hdrs, err := dbTx.FetchBlockHeaders(hashes)
		if err != nil {
			return err
		}
		log.Infof("Loaded %d headers in %s", len(hdrs),
			time.Since(startTime))
		return nil
	})
	return err
}
@@ -79,15 +79,6 @@ func realMain() error {
 	parserFlags := flags.Options(flags.HelpFlag | flags.PassDoubleDash)
 	parser := flags.NewNamedParser(appName, parserFlags)
 	parser.AddGroup("Global Options", "", cfg)
-	parser.AddCommand("insecureimport",
-		"Insecurely import bulk block data from bootstrap.dat",
-		"Insecurely import bulk block data from bootstrap.dat. "+
-			"WARNING: This is NOT secure because it does NOT "+
-			"verify DAG rules. It is only provided for testing "+
-			"purposes.", &importCfg)
-	parser.AddCommand("loadheaders",
-		"Time how long to load headers for all blocks in the database",
-		"", &headersCfg)
 	parser.AddCommand("fetchblock",
 		"Fetch the specific block hash from the database", "",
 		&fetchBlockCfg)