Automatically register for tx notifications after a rescan.

This changes the behavior of the rescan RPC to automatically set the
client up for transaction notifications for transactions paying to any
rescanned address and spending outputs in the final rescan UTXO set
after a rescanned is performed through the best block in the chain.
This commit is contained in:
Josh Rickmar 2015-02-19 14:51:44 -05:00
parent 951f244f87
commit 63c1172aa8
3 changed files with 150 additions and 79 deletions

View File

@ -125,6 +125,14 @@ type isCurrentMsg struct {
reply chan bool reply chan bool
} }
// pauseMsg is a message type to be sent across the message channel for
// pausing the block manager. This effectively provides the caller with
// exclusive access over the manager until a receive is performed on the
// unpause channel.
type pauseMsg struct {
unpause <-chan struct{}
}
// headerNode is used as a node in a list of headers that are linked together // headerNode is used as a node in a list of headers that are linked together
// between checkpoints. // between checkpoints.
type headerNode struct { type headerNode struct {
@ -1056,6 +1064,10 @@ out:
case isCurrentMsg: case isCurrentMsg:
msg.reply <- b.current() msg.reply <- b.current()
case pauseMsg:
// Wait until the sender unpauses the manager.
<-msg.unpause
default: default:
bmgrLog.Warnf("Invalid message type in block "+ bmgrLog.Warnf("Invalid message type in block "+
"handler: %T", msg) "handler: %T", msg)
@ -1306,6 +1318,16 @@ func (b *blockManager) IsCurrent() bool {
return <-reply return <-reply
} }
// Pause pauses the block manager until the returned channel is closed.
//
// Note that while paused, all peer and block processing is halted. The
// message sender should avoid pausing the block manager for long durations.
func (b *blockManager) Pause() chan<- struct{} {
c := make(chan struct{})
b.msgChan <- pauseMsg{c}
return c
}
// newBlockManager returns a new bitcoin block manager. // newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates. // Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server) (*blockManager, error) { func newBlockManager(s *server) (*blockManager, error) {

View File

@ -682,7 +682,7 @@ The following is an overview of the RPC method requests available exclusively to
|Method|rescan| |Method|rescan|
|Notifications|[recvtx](#recvtx), [redeemingtx](#redeemingtx), [rescanprogress](#rescanprogress), and [rescanfinished](#rescanfinished)| |Notifications|[recvtx](#recvtx), [redeemingtx](#redeemingtx), [rescanprogress](#rescanprogress), and [rescanfinished](#rescanfinished)|
|Parameters|1. BeginBlock (string, required) block hash to begin rescanning from<br />2. Addresses (JSON array, required)<br />&nbsp;`[ (json array of strings)`<br />&nbsp;&nbsp;`"bitcoinaddress", (string) the bitcoin address`<br />&nbsp;&nbsp;`...` <br />&nbsp;`]`<br />3. Outpoints (JSON array, required)<br />&nbsp;`[ (JSON array)`<br />&nbsp;&nbsp;`{ (JSON object)`<br />&nbsp;&nbsp;&nbsp;`"hash":"data", (string) the hex-encoded bytes of the outpoint hash`<br />&nbsp;&nbsp;&nbsp;`"index":n (numeric) the txout index of the outpoint`<br />&nbsp;&nbsp;`},`<br />&nbsp;&nbsp;`...`<br />&nbsp;`]`<br />4. EndBlock (string, optional) hash of final block to rescan| |Parameters|1. BeginBlock (string, required) block hash to begin rescanning from<br />2. Addresses (JSON array, required)<br />&nbsp;`[ (json array of strings)`<br />&nbsp;&nbsp;`"bitcoinaddress", (string) the bitcoin address`<br />&nbsp;&nbsp;`...` <br />&nbsp;`]`<br />3. Outpoints (JSON array, required)<br />&nbsp;`[ (JSON array)`<br />&nbsp;&nbsp;`{ (JSON object)`<br />&nbsp;&nbsp;&nbsp;`"hash":"data", (string) the hex-encoded bytes of the outpoint hash`<br />&nbsp;&nbsp;&nbsp;`"index":n (numeric) the txout index of the outpoint`<br />&nbsp;&nbsp;`},`<br />&nbsp;&nbsp;`...`<br />&nbsp;`]`<br />4. EndBlock (string, optional) hash of final block to rescan|
|Description|Rescan block chain for transactions to addresses, starting at block BeginBlock and ending at EndBlock. If EndBlock is omitted, the rescan continues through the best block in the main chain. The current known UTXO set for all passed addresses at height BeginBlock should included in the Outpoints argument. Rescan results are sent as recvtx and redeemingtx notifications. This call returns once the rescan completes.| |Description|Rescan block chain for transactions to addresses, starting at block BeginBlock and ending at EndBlock. The current known UTXO set for all passed addresses at height BeginBlock should included in the Outpoints argument. If EndBlock is omitted, the rescan continues through the best block in the main chain. Additionally, if no EndBlock is provided, the client is automatically registered for transaction notifications for all rescanned addresses and the final UTXO set. Rescan results are sent as recvtx and redeemingtx notifications. This call returns once the rescan completes.|
|Returns|Nothing| |Returns|Nothing|
[Return to Overview](#ExtensionRequestOverview)<br /> [Return to Overview](#ExtensionRequestOverview)<br />

View File

@ -244,7 +244,7 @@ type notificationRegisterNewMempoolTxs wsClient
type notificationUnregisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient
type notificationRegisterSpent struct { type notificationRegisterSpent struct {
wsc *wsClient wsc *wsClient
op *wire.OutPoint ops []*wire.OutPoint
} }
type notificationUnregisterSpent struct { type notificationUnregisterSpent struct {
wsc *wsClient wsc *wsClient
@ -252,7 +252,7 @@ type notificationUnregisterSpent struct {
} }
type notificationRegisterAddr struct { type notificationRegisterAddr struct {
wsc *wsClient wsc *wsClient
addr string addrs []string
} }
type notificationUnregisterAddr struct { type notificationUnregisterAddr struct {
wsc *wsClient wsc *wsClient
@ -342,13 +342,13 @@ out:
delete(clients, wsc.quit) delete(clients, wsc.quit)
case *notificationRegisterSpent: case *notificationRegisterSpent:
m.addSpentRequest(watchedOutPoints, n.wsc, n.op) m.addSpentRequests(watchedOutPoints, n.wsc, n.ops)
case *notificationUnregisterSpent: case *notificationUnregisterSpent:
m.removeSpentRequest(watchedOutPoints, n.wsc, n.op) m.removeSpentRequest(watchedOutPoints, n.wsc, n.op)
case *notificationRegisterAddr: case *notificationRegisterAddr:
m.addAddrRequest(watchedAddrs, n.wsc, n.addr) m.addAddrRequests(watchedAddrs, n.wsc, n.addrs)
case *notificationUnregisterAddr: case *notificationUnregisterAddr:
m.removeAddrRequest(watchedAddrs, n.wsc, n.addr) m.removeAddrRequest(watchedAddrs, n.wsc, n.addr)
@ -511,36 +511,38 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie
} }
} }
// RegisterSpentRequest requests an notification when the passed outpoint is // RegisterSpentRequests requests a notification when each of the passed
// confirmed spent (contained in a block connected to the main chain) for the // outpoints is confirmed spent (contained in a block connected to the main
// passed websocket client. The request is automatically removed once the // chain) for the passed websocket client. The request is automatically
// notification has been sent. // removed once the notification has been sent.
func (m *wsNotificationManager) RegisterSpentRequest(wsc *wsClient, op *wire.OutPoint) { func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) {
m.queueNotification <- &notificationRegisterSpent{ m.queueNotification <- &notificationRegisterSpent{
wsc: wsc, wsc: wsc,
op: op, ops: ops,
} }
} }
// addSpentRequest modifies a map of watched outpoints to sets of websocket // addSpentRequests modifies a map of watched outpoints to sets of websocket
// clients to add a new request watch the outpoint op and create and send // clients to add a new request watch all of the outpoints in ops and create
// a notification when spent to the websocket client wsc. // and send a notification when spent to the websocket client wsc.
func (*wsNotificationManager) addSpentRequest(ops map[wire.OutPoint]map[chan struct{}]*wsClient, func (*wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[chan struct{}]*wsClient,
wsc *wsClient, op *wire.OutPoint) { wsc *wsClient, ops []*wire.OutPoint) {
// Track the request in the client as well so it can be quickly be for _, op := range ops {
// removed on disconnect. // Track the request in the client as well so it can be quickly
// be removed on disconnect.
wsc.spentRequests[*op] = struct{}{} wsc.spentRequests[*op] = struct{}{}
// Add the client to the list to notify when the outpoint is seen. // Add the client to the list to notify when the outpoint is seen.
// Create the list as needed. // Create the list as needed.
cmap, ok := ops[*op] cmap, ok := opMap[*op]
if !ok { if !ok {
cmap = make(map[chan struct{}]*wsClient) cmap = make(map[chan struct{}]*wsClient)
ops[*op] = cmap opMap[*op] = cmap
} }
cmap[wsc.quit] = wsc cmap[wsc.quit] = wsc
} }
}
// UnregisterSpentRequest removes a request from the passed websocket client // UnregisterSpentRequest removes a request from the passed websocket client
// to be notified when the passed outpoint is confirmed spent (contained in a // to be notified when the passed outpoint is confirmed spent (contained in a
@ -647,9 +649,9 @@ func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan s
continue continue
} }
op := wire.NewOutPoint(tx.Sha(), uint32(i)) op := []*wire.OutPoint{wire.NewOutPoint(tx.Sha(), uint32(i))}
for wscQuit, wsc := range cmap { for wscQuit, wsc := range cmap {
m.addSpentRequest(ops, wsc, op) m.addSpentRequests(ops, wsc, op)
if _, ok := wscNotified[wscQuit]; !ok { if _, ok := wscNotified[wscQuit]; !ok {
wscNotified[wscQuit] = struct{}{} wscNotified[wscQuit] = struct{}{}
@ -713,34 +715,36 @@ func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan st
} }
} }
// RegisterTxOutAddressRequest requests notifications to the passed websocket // RegisterTxOutAddressRequests requests notifications to the passed websocket
// client when a transaction output spends to the passed address. // client when a transaction output spends to the passed address.
func (m *wsNotificationManager) RegisterTxOutAddressRequest(wsc *wsClient, addr string) { func (m *wsNotificationManager) RegisterTxOutAddressRequests(wsc *wsClient, addrs []string) {
m.queueNotification <- &notificationRegisterAddr{ m.queueNotification <- &notificationRegisterAddr{
wsc: wsc, wsc: wsc,
addr: addr, addrs: addrs,
} }
} }
// addAddrRequest adds the websocket client wsc to the address to client set // addAddrRequests adds the websocket client wsc to the address to client set
// addrs so wsc will be notified for any mempool or block transaction outputs // addrMap so wsc will be notified for any mempool or block transaction outputs
// spending to addr. // spending to any of the addresses in addrs.
func (*wsNotificationManager) addAddrRequest(addrs map[string]map[chan struct{}]*wsClient, func (*wsNotificationManager) addAddrRequests(addrMap map[string]map[chan struct{}]*wsClient,
wsc *wsClient, addr string) { wsc *wsClient, addrs []string) {
for _, addr := range addrs {
// Track the request in the client as well so it can be quickly be // Track the request in the client as well so it can be quickly be
// removed on disconnect. // removed on disconnect.
wsc.addrRequests[addr] = struct{}{} wsc.addrRequests[addr] = struct{}{}
// Add the client to the set of clients to notify when the outpoint is // Add the client to the set of clients to notify when the
// seen. Create map as needed. // outpoint is seen. Create map as needed.
cmap, ok := addrs[addr] cmap, ok := addrMap[addr]
if !ok { if !ok {
cmap = make(map[chan struct{}]*wsClient) cmap = make(map[chan struct{}]*wsClient)
addrs[addr] = cmap addrMap[addr] = cmap
} }
cmap[wsc.quit] = wsc cmap[wsc.quit] = wsc
} }
}
// UnregisterTxOutAddressRequest removes a request from the passed websocket // UnregisterTxOutAddressRequest removes a request from the passed websocket
// client to be notified when a transaction spends to the passed address. // client to be notified when a transaction spends to the passed address.
@ -1410,9 +1414,7 @@ func handleNotifySpent(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.E
index := cmd.OutPoints[i].Index index := cmd.OutPoints[i].Index
outpoints = append(outpoints, wire.NewOutPoint(blockHash, index)) outpoints = append(outpoints, wire.NewOutPoint(blockHash, index))
} }
for _, outpoint := range outpoints { wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints)
wsc.server.ntfnMgr.RegisterSpentRequest(wsc, outpoint)
}
return nil, nil return nil, nil
} }
@ -1437,19 +1439,19 @@ func handleNotifyReceived(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjso
return nil, &btcjson.ErrInternal return nil, &btcjson.ErrInternal
} }
for _, addrStr := range cmd.Addresses { // Decode addresses to validate input, but the strings slice is used
addr, err := btcutil.DecodeAddress(addrStr, activeNetParams.Params) // directly if these are all ok.
for _, addr := range cmd.Addresses {
_, err := btcutil.DecodeAddress(addr, activeNetParams.Params)
if err != nil { if err != nil {
e := btcjson.Error{ e := btcjson.Error{
Code: btcjson.ErrInvalidAddressOrKey.Code, Code: btcjson.ErrInvalidAddressOrKey.Code,
Message: fmt.Sprintf("Invalid address or key: %v", addrStr), Message: fmt.Sprintf("Invalid address or key: %v", addr),
} }
return nil, &e return nil, &e
} }
wsc.server.ntfnMgr.RegisterTxOutAddressRequest(wsc, addr.EncodeAddress())
} }
wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
return nil, nil return nil, nil
} }
@ -1462,6 +1464,18 @@ type rescanKeys struct {
unspent map[wire.OutPoint]struct{} unspent map[wire.OutPoint]struct{}
} }
// unspentSlice returns a slice of currently-unspent outpoints for the rescan
// lookup keys. This is primarily intended to be used to register outpoints
// for continuous notifications after a rescan has completed.
func (r *rescanKeys) unspentSlice() []*wire.OutPoint {
ops := make([]*wire.OutPoint, 0, len(r.unspent))
for op := range r.unspent {
opCopy := op
ops = append(ops, &opCopy)
}
return ops
}
// ErrRescanReorg defines the error that is returned when an unrecoverable // ErrRescanReorg defines the error that is returned when an unrecoverable
// reorganize is detected during a rescan. // reorganize is detected during a rescan.
var ErrRescanReorg = btcjson.Error{ var ErrRescanReorg = btcjson.Error{
@ -1608,7 +1622,7 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
// range of blocks. If this condition does not hold true, the JSON-RPC error // range of blocks. If this condition does not hold true, the JSON-RPC error
// for an unrecoverable reorganize is returned. // for an unrecoverable reorganize is returned.
func recoverFromReorg(db database.Db, minBlock, maxBlock int64, func recoverFromReorg(db database.Db, minBlock, maxBlock int64,
lastBlock *btcutil.Block) ([]wire.ShaHash, *btcjson.Error) { lastBlock *wire.ShaHash) ([]wire.ShaHash, *btcjson.Error) {
hashList, err := db.FetchHeightRange(minBlock, maxBlock) hashList, err := db.FetchHeightRange(minBlock, maxBlock)
if err != nil { if err != nil {
@ -1632,18 +1646,12 @@ func recoverFromReorg(db database.Db, minBlock, maxBlock int64,
} }
// descendantBlock returns the appropiate JSON-RPC error if a current block // descendantBlock returns the appropiate JSON-RPC error if a current block
// 'cur' fetched during a reorganize is not a direct child of the parent block // fetched during a reorganize is not a direct child of the parent block hash.
// 'prev'. func descendantBlock(prevHash *wire.ShaHash, curBlock *btcutil.Block) *btcjson.Error {
func descendantBlock(prev, cur *btcutil.Block) *btcjson.Error { curHash := &curBlock.MsgBlock().Header.PrevBlock
curSha := &cur.MsgBlock().Header.PrevBlock if !prevHash.IsEqual(curHash) {
prevSha, err := prev.Sha()
if err != nil {
rpcsLog.Errorf("Unknown problem creating block sha: %v", err)
return &btcjson.ErrInternal
}
if !prevSha.IsEqual(curSha) {
rpcsLog.Errorf("Stopping rescan for reorged block %v "+ rpcsLog.Errorf("Stopping rescan for reorged block %v "+
"(replaced by block %v)", prevSha, curSha) "(replaced by block %v)", prevHash, curHash)
return &ErrRescanReorg return &ErrRescanReorg
} }
return nil return nil
@ -1765,7 +1773,10 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
} }
} }
// lastBlock and lastBlockHash track the previously-rescanned block.
// They equal nil when no previous blocks have been rescanned.
var lastBlock *btcutil.Block var lastBlock *btcutil.Block
var lastBlockHash *wire.ShaHash
// A ticker is created to wait at least 10 seconds before notifying the // A ticker is created to wait at least 10 seconds before notifying the
// websocket client of the current progress completed by the rescan. // websocket client of the current progress completed by the rescan.
@ -1782,6 +1793,43 @@ fetchRange:
return nil, &btcjson.ErrDatabase return nil, &btcjson.ErrDatabase
} }
if len(hashList) == 0 { if len(hashList) == 0 {
// The rescan is finished if no blocks hashes for this
// range were successfully fetched and a stop block
// was provided.
if maxBlock != database.AllShas {
break
}
// If the rescan is through the current block, set up
// the client to continue to receive notifications
// regarding all rescanned addresses and the current set
// of unspent outputs.
//
// This is done safely by temporarily grabbing exclusive
// access of the block manager. If no more blocks have
// been attached between this pause and the fetch above,
// then it is safe to register the websocket client for
// continuous notifications if necessary. Otherwise,
// continue the fetch loop again to rescan the new
// blocks (or error due to an irrecoverable reorganize).
pauseGuard := wsc.server.server.blockManager.Pause()
curHash, _, err := db.NewestSha()
again := true
if err == nil && (lastBlockHash == nil || *lastBlockHash == *curHash) {
again = false
n := wsc.server.ntfnMgr
n.RegisterSpentRequests(wsc, lookups.unspentSlice())
n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
}
close(pauseGuard)
if err != nil {
rpcsLog.Errorf("Error fetching best block "+
"hash: %v", err)
return nil, &btcjson.ErrDatabase
}
if again {
continue
}
break break
} }
@ -1819,7 +1867,7 @@ fetchRange:
minBlock += int64(i) minBlock += int64(i)
var jsonErr *btcjson.Error var jsonErr *btcjson.Error
hashList, jsonErr = recoverFromReorg(db, minBlock, hashList, jsonErr = recoverFromReorg(db, minBlock,
maxBlock, lastBlock) maxBlock, lastBlockHash)
if jsonErr != nil { if jsonErr != nil {
return nil, jsonErr return nil, jsonErr
} }
@ -1828,10 +1876,10 @@ fetchRange:
} }
goto loopHashList goto loopHashList
} }
if i == 0 && lastBlock != nil { if i == 0 && lastBlockHash != nil {
// Ensure the new hashList is on the same fork // Ensure the new hashList is on the same fork
// as the last block from the old hashList. // as the last block from the old hashList.
jsonErr := descendantBlock(lastBlock, blk) jsonErr := descendantBlock(lastBlockHash, blk)
if jsonErr != nil { if jsonErr != nil {
return nil, jsonErr return nil, jsonErr
} }
@ -1847,6 +1895,12 @@ fetchRange:
default: default:
rescanBlock(wsc, &lookups, blk) rescanBlock(wsc, &lookups, blk)
lastBlock = blk lastBlock = blk
lastBlockHash, err = blk.Sha()
if err != nil {
rpcsLog.Errorf("Unknown problem creating "+
"block sha: %v", err)
return nil, &btcjson.ErrInternal
}
} }
// Periodically notify the client of the progress // Periodically notify the client of the progress
@ -1884,14 +1938,9 @@ fetchRange:
// there is no guarantee that any of the notifications created during // there is no guarantee that any of the notifications created during
// rescan (such as rescanprogress, recvtx and redeemingtx) will be // rescan (such as rescanprogress, recvtx and redeemingtx) will be
// received before the rescan RPC returns. Therefore, another method // received before the rescan RPC returns. Therefore, another method
// is needed to safely inform clients that all rescan notifiations have // is needed to safely inform clients that all rescan notifications have
// been sent. // been sent.
blkSha, err := lastBlock.Sha() n := btcws.NewRescanFinishedNtfn(lastBlockHash.String(),
if err != nil {
rpcsLog.Errorf("Unknown problem creating block sha: %v", err)
return nil, &btcjson.ErrInternal
}
n := btcws.NewRescanFinishedNtfn(blkSha.String(),
int32(lastBlock.Height()), int32(lastBlock.Height()),
lastBlock.MsgBlock().Header.Timestamp.Unix()) lastBlock.MsgBlock().Header.Timestamp.Unix())
if mn, err := n.MarshalJSON(); err != nil { if mn, err := n.MarshalJSON(); err != nil {