Better policy for used outpoints and wait for first sync when creating an unsigned tx

This commit is contained in:
Ori Newman 2023-12-26 18:51:33 +02:00
parent c28eaa6998
commit eabbdad738
3 changed files with 37 additions and 26 deletions

View File

@ -35,7 +35,6 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
if !s.isSynced() { if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport()) return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
} }
// make sure address string is correct before proceeding to a // make sure address string is correct before proceeding to a
// potentially long UTXO refreshment operation // potentially long UTXO refreshment operation
toAddress, err := util.DecodeAddress(address, s.params.Prefix) toAddress, err := util.DecodeAddress(address, s.params.Prefix)
@ -113,7 +112,9 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
} }
if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok { if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
if time.Since(broadcastTime) > time.Minute { // We want to free an outpoint from the used outpoints set if we refresh the UTXOs since it was
// marked as used, and it least one minute has passed.
if time.Since(broadcastTime) > time.Minute && s.startTimeOfLastCompletedRefresh.After(broadcastTime) {
delete(s.usedOutpoints, *utxo.Outpoint) delete(s.usedOutpoints, *utxo.Outpoint)
} else { } else {
continue continue

View File

@ -6,6 +6,7 @@ import (
"net" "net"
"os" "os"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
@ -37,9 +38,11 @@ type server struct {
keysFile *keys.File keysFile *keys.File
shutdown chan struct{} shutdown chan struct{}
forceSyncChan chan struct{} forceSyncChan chan struct{}
startTimeOfLastCompletedRefresh time.Time
addressSet walletAddressSet addressSet walletAddressSet
txMassCalculator *txmass.Calculator txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time usedOutpoints map[externalapi.DomainOutpoint]time.Time
firstSyncDone atomic.Bool
isLogFinalProgressLineShown bool isLogFinalProgressLineShown bool
maxUsedAddressesForLog uint32 maxUsedAddressesForLog uint32

View File

@ -37,20 +37,20 @@ func (s *server) syncLoop() error {
return err return err
} }
s.firstSyncDone.Store(true)
log.Infof("Wallet is synced and ready for operation")
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
err := s.sync()
if err != nil {
return err
}
case <-s.forceSyncChan: case <-s.forceSyncChan:
}
err := s.sync() err := s.sync()
if err != nil { if err != nil {
return err return err
} }
} }
}
} }
func (s *server) sync() error { func (s *server) sync() error {
@ -219,7 +219,7 @@ func (s *server) updateAddressesAndLastUsedIndexes(requestedAddressSet walletAdd
} }
// updateUTXOSet clears the current UTXO set, and re-fills it with the given entries // updateUTXOSet clears the current UTXO set, and re-fills it with the given entries
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress) error { func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress, refreshStart time.Time) error {
utxos := make([]*walletUTXO, 0, len(entries)) utxos := make([]*walletUTXO, 0, len(entries))
exclude := make(map[appmessage.RPCOutpoint]struct{}) exclude := make(map[appmessage.RPCOutpoint]struct{})
@ -260,6 +260,7 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp
sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() }) sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })
s.lock.Lock() s.lock.Lock()
s.startTimeOfLastCompletedRefresh = refreshStart
s.utxosSortedByAmount = utxos s.utxosSortedByAmount = utxos
s.lock.Unlock() s.lock.Unlock()
@ -267,6 +268,7 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp
} }
func (s *server) refreshUTXOs() error { func (s *server) refreshUTXOs() error {
refreshStart := time.Now()
// It's important to check the mempool before calling `GetUTXOsByAddresses`: // It's important to check the mempool before calling `GetUTXOsByAddresses`:
// If we would do it the other way around an output can be spent in the mempool // If we would do it the other way around an output can be spent in the mempool
// and not in consensus, and between the calls its spending transaction will be // and not in consensus, and between the calls its spending transaction will be
@ -282,11 +284,13 @@ func (s *server) refreshUTXOs() error {
return err return err
} }
return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries) return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries, refreshStart)
} }
func (s *server) forceSync() { func (s *server) forceSync() {
if len(s.forceSyncChan) != 0 { // Technically if two callers check the `if` simultaneously they will both spawn a
// goroutine, but we don't care about the small redundancy in such a rare case.
if len(s.forceSyncChan) == 0 {
go func() { go func() {
s.forceSyncChan <- struct{}{} s.forceSyncChan <- struct{}{}
}() }()
@ -294,7 +298,7 @@ func (s *server) forceSync() {
} }
func (s *server) isSynced() bool { func (s *server) isSynced() bool {
return s.nextSyncStartIndex > s.maxUsedIndex() return s.nextSyncStartIndex > s.maxUsedIndex() && s.firstSyncDone.Load()
} }
func (s *server) formatSyncStateReport() string { func (s *server) formatSyncStateReport() string {
@ -304,8 +308,11 @@ func (s *server) formatSyncStateReport() string {
maxUsedIndex = s.nextSyncStartIndex maxUsedIndex = s.nextSyncStartIndex
} }
if s.nextSyncStartIndex < s.maxUsedIndex() {
return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)", return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)",
s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex)) s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex))
}
return "loading the wallet UTXO set"
} }
func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) { func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) {
@ -324,7 +331,7 @@ func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAdd
if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog { if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog {
if !s.isLogFinalProgressLineShown { if !s.isLogFinalProgressLineShown {
log.Infof("Wallet is synced, ready for queries") log.Infof("Finished scanning recent addresses")
s.isLogFinalProgressLineShown = true s.isLogFinalProgressLineShown = true
} }
} else { } else {