mirror of
https://github.com/kaspanet/kaspad.git
synced 2025-06-07 06:36:46 +00:00
Lazy wallet utxo sync after broadcasting a tx (#2258)
* Lazy wallet utxo sync after broadcasting a tx * Make a more granular lock for refreshUTXOs * Don't push to forceSyncChan if it has an element * Better policy for used outpoints and wait for first sync when creating an unsigned tx * fix expire condition * lock address reading * fix small memory leak * add an rpc client dedicated for background ops * rename to follow conventions * one more rename * Compare wallet addresses by value * small fixes * Add comment --------- Co-authored-by: Michael Sutton <msutton@cs.huji.ac.il>
This commit is contained in:
parent
629faa8436
commit
d2453f8e7b
@ -2,6 +2,7 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
||||||
@ -14,13 +15,15 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
|
|||||||
s.lock.RLock()
|
s.lock.RLock()
|
||||||
defer s.lock.RUnlock()
|
defer s.lock.RUnlock()
|
||||||
|
|
||||||
|
if !s.isSynced() {
|
||||||
|
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
|
||||||
|
}
|
||||||
|
|
||||||
dagInfo, err := s.rpcClient.GetBlockDAGInfo()
|
dagInfo, err := s.rpcClient.GetBlockDAGInfo()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
daaScore := dagInfo.VirtualDAAScore
|
daaScore := dagInfo.VirtualDAAScore
|
||||||
maturity := s.params.BlockCoinbaseMaturity
|
|
||||||
|
|
||||||
balancesMap := make(balancesMapType, 0)
|
balancesMap := make(balancesMapType, 0)
|
||||||
for _, entry := range s.utxosSortedByAmount {
|
for _, entry := range s.utxosSortedByAmount {
|
||||||
amount := entry.UTXOEntry.Amount()
|
amount := entry.UTXOEntry.Amount()
|
||||||
@ -30,7 +33,7 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
|
|||||||
balances = new(balancesType)
|
balances = new(balancesType)
|
||||||
balancesMap[address] = balances
|
balancesMap[address] = balances
|
||||||
}
|
}
|
||||||
if isUTXOSpendable(entry, daaScore, maturity) {
|
if s.isUTXOSpendable(entry, daaScore) {
|
||||||
balances.available += amount
|
balances.available += amount
|
||||||
} else {
|
} else {
|
||||||
balances.pending += amount
|
balances.pending += amount
|
||||||
@ -64,9 +67,9 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64, coinbaseMaturity uint64) bool {
|
func (s *server) isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64) bool {
|
||||||
if !entry.UTXOEntry.IsCoinbase() {
|
if !entry.UTXOEntry.IsCoinbase() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return entry.UTXOEntry.BlockDAAScore()+coinbaseMaturity < virtualDAAScore
|
return entry.UTXOEntry.BlockDAAScore()+s.coinbaseMaturity < virtualDAAScore
|
||||||
}
|
}
|
||||||
|
@ -54,11 +54,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.refreshUTXOs()
|
s.forceSync()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return txIDs, nil
|
return txIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,14 +3,12 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
|
||||||
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
|
||||||
"github.com/kaspanet/kaspad/domain/consensus/utils/constants"
|
"github.com/kaspanet/kaspad/domain/consensus/utils/constants"
|
||||||
"github.com/kaspanet/kaspad/util"
|
"github.com/kaspanet/kaspad/util"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/exp/slices"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: Implement a better fee estimation mechanism
|
// TODO: Implement a better fee estimation mechanism
|
||||||
@ -35,7 +33,6 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
|
|||||||
if !s.isSynced() {
|
if !s.isSynced() {
|
||||||
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
|
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure address string is correct before proceeding to a
|
// make sure address string is correct before proceeding to a
|
||||||
// potentially long UTXO refreshment operation
|
// potentially long UTXO refreshment operation
|
||||||
toAddress, err := util.DecodeAddress(address, s.params.Prefix)
|
toAddress, err := util.DecodeAddress(address, s.params.Prefix)
|
||||||
@ -43,16 +40,11 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.refreshUTXOs()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var fromAddresses []*walletAddress
|
var fromAddresses []*walletAddress
|
||||||
for _, from := range fromAddressesString {
|
for _, from := range fromAddressesString {
|
||||||
fromAddress, exists := s.addressSet[from]
|
fromAddress, exists := s.addressSet[from]
|
||||||
if !exists {
|
if !exists {
|
||||||
return nil, fmt.Errorf("Specified from address %s does not exists", from)
|
return nil, fmt.Errorf("specified from address %s does not exists", from)
|
||||||
}
|
}
|
||||||
fromAddresses = append(fromAddresses, fromAddress)
|
fromAddresses = append(fromAddresses, fromAddress)
|
||||||
}
|
}
|
||||||
@ -106,19 +98,14 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
|
|||||||
return nil, 0, 0, err
|
return nil, 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
coinbaseMaturity := s.params.BlockCoinbaseMaturity
|
|
||||||
if dagInfo.NetworkName == "kaspa-testnet-11" {
|
|
||||||
coinbaseMaturity = 1000
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, utxo := range s.utxosSortedByAmount {
|
for _, utxo := range s.utxosSortedByAmount {
|
||||||
if (fromAddresses != nil && !slices.Contains(fromAddresses, utxo.address)) ||
|
if (fromAddresses != nil && !walletAddressesContain(fromAddresses, utxo.address)) ||
|
||||||
!isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, coinbaseMaturity) {
|
!s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
|
if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
|
||||||
if time.Since(broadcastTime) > time.Minute {
|
if s.usedOutpointHasExpired(broadcastTime) {
|
||||||
delete(s.usedOutpoints, *utxo.Outpoint)
|
delete(s.usedOutpoints, *utxo.Outpoint)
|
||||||
} else {
|
} else {
|
||||||
continue
|
continue
|
||||||
@ -160,3 +147,13 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
|
|||||||
|
|
||||||
return selectedUTXOs, totalReceived, totalValue - totalSpend, nil
|
return selectedUTXOs, totalReceived, totalValue - totalSpend, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func walletAddressesContain(addresses []*walletAddress, contain *walletAddress) bool {
|
||||||
|
for _, address := range addresses {
|
||||||
|
if *address == *contain {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -2,12 +2,14 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/kaspanet/kaspad/version"
|
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/kaspanet/kaspad/version"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
|
||||||
|
|
||||||
"github.com/kaspanet/kaspad/util/txmass"
|
"github.com/kaspanet/kaspad/util/txmass"
|
||||||
@ -28,17 +30,22 @@ import (
|
|||||||
type server struct {
|
type server struct {
|
||||||
pb.UnimplementedKaspawalletdServer
|
pb.UnimplementedKaspawalletdServer
|
||||||
|
|
||||||
rpcClient *rpcclient.RPCClient
|
rpcClient *rpcclient.RPCClient // RPC client for ongoing user requests
|
||||||
params *dagconfig.Params
|
backgroundRPCClient *rpcclient.RPCClient // RPC client dedicated for address and UTXO background fetching
|
||||||
|
params *dagconfig.Params
|
||||||
|
coinbaseMaturity uint64 // Is different from default if we use testnet-11
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
utxosSortedByAmount []*walletUTXO
|
utxosSortedByAmount []*walletUTXO
|
||||||
nextSyncStartIndex uint32
|
nextSyncStartIndex uint32
|
||||||
keysFile *keys.File
|
keysFile *keys.File
|
||||||
shutdown chan struct{}
|
shutdown chan struct{}
|
||||||
addressSet walletAddressSet
|
forceSyncChan chan struct{}
|
||||||
txMassCalculator *txmass.Calculator
|
startTimeOfLastCompletedRefresh time.Time
|
||||||
usedOutpoints map[externalapi.DomainOutpoint]time.Time
|
addressSet walletAddressSet
|
||||||
|
txMassCalculator *txmass.Calculator
|
||||||
|
usedOutpoints map[externalapi.DomainOutpoint]time.Time
|
||||||
|
firstSyncDone atomic.Bool
|
||||||
|
|
||||||
isLogFinalProgressLineShown bool
|
isLogFinalProgressLineShown bool
|
||||||
maxUsedAddressesForLog uint32
|
maxUsedAddressesForLog uint32
|
||||||
@ -72,6 +79,10 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return (errors.Wrapf(err, "Error connecting to RPC server %s", rpcServer))
|
return (errors.Wrapf(err, "Error connecting to RPC server %s", rpcServer))
|
||||||
}
|
}
|
||||||
|
backgroundRPCClient, err := connectToRPC(params, rpcServer, timeout)
|
||||||
|
if err != nil {
|
||||||
|
return (errors.Wrapf(err, "Error making a second connection to RPC server %s", rpcServer))
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("Connected, reading keys file %s...", keysFilePath)
|
log.Infof("Connected, reading keys file %s...", keysFilePath)
|
||||||
keysFile, err := keys.ReadKeysFile(params, keysFilePath)
|
keysFile, err := keys.ReadKeysFile(params, keysFilePath)
|
||||||
@ -84,13 +95,26 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dagInfo, err := rpcClient.GetBlockDAGInfo()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
coinbaseMaturity := params.BlockCoinbaseMaturity
|
||||||
|
if dagInfo.NetworkName == "kaspa-testnet-11" {
|
||||||
|
coinbaseMaturity = 1000
|
||||||
|
}
|
||||||
|
|
||||||
serverInstance := &server{
|
serverInstance := &server{
|
||||||
rpcClient: rpcClient,
|
rpcClient: rpcClient,
|
||||||
|
backgroundRPCClient: backgroundRPCClient,
|
||||||
params: params,
|
params: params,
|
||||||
|
coinbaseMaturity: coinbaseMaturity,
|
||||||
utxosSortedByAmount: []*walletUTXO{},
|
utxosSortedByAmount: []*walletUTXO{},
|
||||||
nextSyncStartIndex: 0,
|
nextSyncStartIndex: 0,
|
||||||
keysFile: keysFile,
|
keysFile: keysFile,
|
||||||
shutdown: make(chan struct{}),
|
shutdown: make(chan struct{}),
|
||||||
|
forceSyncChan: make(chan struct{}),
|
||||||
addressSet: make(walletAddressSet),
|
addressSet: make(walletAddressSet),
|
||||||
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
|
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
|
||||||
usedOutpoints: map[externalapi.DomainOutpoint]time.Time{},
|
usedOutpoints: map[externalapi.DomainOutpoint]time.Time{},
|
||||||
@ -100,8 +124,8 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Read, syncing the wallet...")
|
log.Infof("Read, syncing the wallet...")
|
||||||
spawn("serverInstance.sync", func() {
|
spawn("serverInstance.syncLoop", func() {
|
||||||
err := serverInstance.sync()
|
err := serverInstance.syncLoop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
printErrorAndExit(errors.Wrap(err, "error syncing the wallet"))
|
printErrorAndExit(errors.Wrap(err, "error syncing the wallet"))
|
||||||
}
|
}
|
||||||
|
@ -264,7 +264,7 @@ func (s *server) moreUTXOsForMergeTransaction(alreadySelectedUTXOs []*libkaspawa
|
|||||||
if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok {
|
if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, s.params.BlockCoinbaseMaturity) {
|
if !s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
additionalUTXOs = append(additionalUTXOs, &libkaspawallet.UTXO{
|
additionalUTXOs = append(additionalUTXOs, &libkaspawallet.UTXO{
|
||||||
|
@ -23,7 +23,7 @@ func (was walletAddressSet) strings() []string {
|
|||||||
return addresses
|
return addresses
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) sync() error {
|
func (s *server) syncLoop() error {
|
||||||
ticker := time.NewTicker(time.Second)
|
ticker := time.NewTicker(time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@ -32,29 +32,39 @@ func (s *server) sync() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.refreshExistingUTXOsWithLock()
|
err = s.refreshUTXOs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for range ticker.C {
|
s.firstSyncDone.Store(true)
|
||||||
err = s.collectFarAddresses()
|
log.Infof("Wallet is synced and ready for operation")
|
||||||
if err != nil {
|
|
||||||
return err
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
case <-s.forceSyncChan:
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.collectRecentAddresses()
|
err := s.sync()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.refreshExistingUTXOsWithLock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
func (s *server) sync() error {
|
||||||
|
err := s.collectFarAddresses()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.collectRecentAddresses()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.refreshUTXOs()
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -158,7 +168,7 @@ func (s *server) collectAddresses(start, end uint32) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
getBalancesByAddressesResponse, err := s.rpcClient.GetBalancesByAddresses(addressSet.strings())
|
getBalancesByAddressesResponse, err := s.backgroundRPCClient.GetBalancesByAddresses(addressSet.strings())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -208,15 +218,17 @@ func (s *server) updateAddressesAndLastUsedIndexes(requestedAddressSet walletAdd
|
|||||||
return s.keysFile.SetLastUsedInternalIndex(lastUsedInternalIndex)
|
return s.keysFile.SetLastUsedInternalIndex(lastUsedInternalIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) refreshExistingUTXOsWithLock() error {
|
func (s *server) usedOutpointHasExpired(outpointBroadcastTime time.Time) bool {
|
||||||
s.lock.Lock()
|
// If the node returns a UTXO we previously attempted to spend and enough time has passed, we assume
|
||||||
defer s.lock.Unlock()
|
// that the network rejected or lost the previous transaction and allow a reuse. We set this time
|
||||||
|
// interval to a minute.
|
||||||
return s.refreshUTXOs()
|
// We also verify that a full refresh UTXO operation started after this time point and has already
|
||||||
|
// completed, in order to make sure that indeed this state reflects a state obtained following the required wait time.
|
||||||
|
return s.startTimeOfLastCompletedRefresh.After(outpointBroadcastTime.Add(time.Minute))
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateUTXOSet clears the current UTXO set, and re-fills it with the given entries
|
// updateUTXOSet clears the current UTXO set, and re-fills it with the given entries
|
||||||
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress) error {
|
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress, refreshStart time.Time) error {
|
||||||
utxos := make([]*walletUTXO, 0, len(entries))
|
utxos := make([]*walletUTXO, 0, len(entries))
|
||||||
|
|
||||||
exclude := make(map[appmessage.RPCOutpoint]struct{})
|
exclude := make(map[appmessage.RPCOutpoint]struct{})
|
||||||
@ -243,6 +255,7 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No need to lock for reading since the only writer of this set is on `syncLoop` on the same goroutine.
|
||||||
address, ok := s.addressSet[entry.Address]
|
address, ok := s.addressSet[entry.Address]
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.Errorf("Got result from address %s even though it wasn't requested", entry.Address)
|
return errors.Errorf("Got result from address %s even though it wasn't requested", entry.Address)
|
||||||
@ -256,32 +269,56 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp
|
|||||||
|
|
||||||
sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })
|
sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })
|
||||||
|
|
||||||
|
s.lock.Lock()
|
||||||
|
s.startTimeOfLastCompletedRefresh = refreshStart
|
||||||
s.utxosSortedByAmount = utxos
|
s.utxosSortedByAmount = utxos
|
||||||
|
|
||||||
|
// Cleanup expired used outpoints to avoid a memory leak
|
||||||
|
for outpoint, broadcastTime := range s.usedOutpoints {
|
||||||
|
if s.usedOutpointHasExpired(broadcastTime) {
|
||||||
|
delete(s.usedOutpoints, outpoint)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.lock.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) refreshUTXOs() error {
|
func (s *server) refreshUTXOs() error {
|
||||||
|
refreshStart := time.Now()
|
||||||
|
|
||||||
|
// No need to lock for reading since the only writer of this set is on `syncLoop` on the same goroutine.
|
||||||
|
addresses := s.addressSet.strings()
|
||||||
// It's important to check the mempool before calling `GetUTXOsByAddresses`:
|
// It's important to check the mempool before calling `GetUTXOsByAddresses`:
|
||||||
// If we would do it the other way around an output can be spent in the mempool
|
// If we would do it the other way around an output can be spent in the mempool
|
||||||
// and not in consensus, and between the calls its spending transaction will be
|
// and not in consensus, and between the calls its spending transaction will be
|
||||||
// added to consensus and removed from the mempool, so `getUTXOsByAddressesResponse`
|
// added to consensus and removed from the mempool, so `getUTXOsByAddressesResponse`
|
||||||
// will include an obsolete output.
|
// will include an obsolete output.
|
||||||
mempoolEntriesByAddresses, err := s.rpcClient.GetMempoolEntriesByAddresses(s.addressSet.strings(), true, true)
|
mempoolEntriesByAddresses, err := s.backgroundRPCClient.GetMempoolEntriesByAddresses(addresses, true, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
getUTXOsByAddressesResponse, err := s.rpcClient.GetUTXOsByAddresses(s.addressSet.strings())
|
getUTXOsByAddressesResponse, err := s.backgroundRPCClient.GetUTXOsByAddresses(addresses)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries)
|
return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries, refreshStart)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *server) forceSync() {
|
||||||
|
// Technically if two callers check the `if` simultaneously they will both spawn a
|
||||||
|
// goroutine, but we don't care about the small redundancy in such a rare case.
|
||||||
|
if len(s.forceSyncChan) == 0 {
|
||||||
|
go func() {
|
||||||
|
s.forceSyncChan <- struct{}{}
|
||||||
|
}()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) isSynced() bool {
|
func (s *server) isSynced() bool {
|
||||||
return s.nextSyncStartIndex > s.maxUsedIndex()
|
return s.nextSyncStartIndex > s.maxUsedIndex() && s.firstSyncDone.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) formatSyncStateReport() string {
|
func (s *server) formatSyncStateReport() string {
|
||||||
@ -291,8 +328,11 @@ func (s *server) formatSyncStateReport() string {
|
|||||||
maxUsedIndex = s.nextSyncStartIndex
|
maxUsedIndex = s.nextSyncStartIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)",
|
if s.nextSyncStartIndex < s.maxUsedIndex() {
|
||||||
s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex))
|
return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)",
|
||||||
|
s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex))
|
||||||
|
}
|
||||||
|
return "loading the wallet UTXO set"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) {
|
func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) {
|
||||||
@ -311,7 +351,7 @@ func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAdd
|
|||||||
|
|
||||||
if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog {
|
if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog {
|
||||||
if !s.isLogFinalProgressLineShown {
|
if !s.isLogFinalProgressLineShown {
|
||||||
log.Infof("Wallet is synced, ready for queries")
|
log.Infof("Finished scanning recent addresses")
|
||||||
s.isLogFinalProgressLineShown = true
|
s.isLogFinalProgressLineShown = true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user