[NOD-270] Implement NotifyChainUpdates api call (#368)

* [NOD-270] Added notifyChainChanges and related commands.

* [NOD-270] Added NTChainChanged to blockdag.

* [NOD-270] Implemented collection and sending of ChainChanged notifications.

* [NOD-270] Fixed an improperly named test.

* [NOD-270] Added a test: TestChainChangedNotification.

* [NOD-270] Fixed a couple copy+paste errors.

* [NOD-270] Added a couple of comments for TestChainChangedNotification.

* [NOD-270] Fixed formatting error.

* [NOD-270] Fixed a comment.

* [NOD-270] Uncoupled chain updates inside blockdag from the concept of a notification.

* [NOD-270] Removed intermediary ChainUpdates object from ChainChangedNotificationData.
This commit is contained in:
stasatdaglabs 2019-08-21 12:58:32 +03:00 committed by Svarog
parent 6bb53eaae3
commit 1a569c7bd7
13 changed files with 376 additions and 35 deletions

View File

@ -75,7 +75,7 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
// Connect the passed block to the DAG. This also handles validation of the
// transaction scripts.
err = dag.addBlock(newNode, parents, block, flags)
chainUpdates, err := dag.addBlock(newNode, parents, block, flags)
if err != nil {
return err
}
@ -88,6 +88,10 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
Block: block,
WasUnorphaned: flags&BFWasUnorphaned != 0,
})
dag.sendNotification(NTChainChanged, &ChainChangedNotificationData{
RemovedChainBlockHashes: chainUpdates.removedChainBlockHashes,
AddedChainBlockHashes: chainUpdates.addedChainBlockHashes,
})
dag.dagLock.Lock()
return nil

View File

@ -39,6 +39,13 @@ type orphanBlock struct {
expiration time.Time
}
// chainUpdates represents the updates made to the selected parent chain after
// a block had been added to the DAG.
type chainUpdates struct {
removedChainBlockHashes []*daghash.Hash
addedChainBlockHashes []*daghash.Hash
}
// BlockDAG provides functions for working with the bitcoin block chain.
// It includes functionality such as rejecting duplicate blocks, ensuring blocks
// follow all rules, orphan handling, checkpoint handling, and best chain
@ -467,17 +474,18 @@ func LockTimeToSequence(isSeconds bool, locktime uint64) uint64 {
// This is useful when using checkpoints.
//
// This function MUST be called with the DAG state lock held (for writes).
func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util.Block, flags BehaviorFlags) error {
func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet,
block *util.Block, flags BehaviorFlags) (*chainUpdates, error) {
// Skip checks if node has already been fully validated.
fastAdd := flags&BFFastAdd == BFFastAdd || dag.index.NodeStatus(node).KnownValid()
// Connect the block to the DAG.
err := dag.connectBlock(node, block, fastAdd)
chainUpdates, err := dag.connectBlock(node, block, fastAdd)
if err != nil {
if _, ok := err.(RuleError); ok {
dag.index.SetStatusFlags(node, statusValidateFailed)
} else {
return err
return nil, err
}
} else {
dag.blockCount++
@ -491,9 +499,11 @@ func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util
log.Warnf("Error flushing block index changes to disk: %s",
writeErr)
}
// If dag.connectBlock returned a rule error, return it here after updating DB
return err
if err != nil {
return nil, err
}
return chainUpdates, nil
}
func calculateAcceptedIDMerkleRoot(txsAcceptanceData MultiBlockTxsAcceptanceData) *daghash.Hash {
@ -533,47 +543,48 @@ func (node *blockNode) validateAcceptedIDMerkleRoot(dag *BlockDAG, txsAcceptance
// connectBlock handles connecting the passed node/block to the DAG.
//
// This function MUST be called with the DAG state lock held (for writes).
func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block, fastAdd bool) error {
func (dag *BlockDAG) connectBlock(node *blockNode,
block *util.Block, fastAdd bool) (*chainUpdates, error) {
// No warnings about unknown rules or versions until the DAG is
// current.
if dag.isCurrent() {
// Warn if any unknown new rules are either about to activate or
// have already been activated.
if err := dag.warnUnknownRuleActivations(node); err != nil {
return err
return nil, err
}
// Warn if a high enough percentage of the last blocks have
// unexpected versions.
if err := dag.warnUnknownVersions(node); err != nil {
return err
return nil, err
}
}
if err := dag.checkFinalityRules(node); err != nil {
return err
return nil, err
}
if err := dag.validateGasLimit(block); err != nil {
return err
return nil, err
}
newBlockUTXO, txsAcceptanceData, newBlockFeeData, err := node.verifyAndBuildUTXO(dag, block.Transactions(), fastAdd)
if err != nil {
newErrString := fmt.Sprintf("error verifying UTXO for %s: %s", node, err)
if err, ok := err.(RuleError); ok {
return ruleError(err.ErrorCode, newErrString)
return nil, ruleError(err.ErrorCode, newErrString)
}
return errors.New(newErrString)
return nil, errors.New(newErrString)
}
err = node.validateCoinbaseTransaction(dag, block, txsAcceptanceData)
if err != nil {
return err
return nil, err
}
// Apply all changes to the DAG.
virtualUTXODiff, virtualTxsAcceptanceData, err := dag.applyDAGChanges(node, block, newBlockUTXO, fastAdd)
virtualUTXODiff, virtualTxsAcceptanceData, chainUpdates, err := dag.applyDAGChanges(node, block, newBlockUTXO, fastAdd)
if err != nil {
// Since all validation logic has already ran, if applyDAGChanges errors out,
// this means we have a problem in the internal structure of the DAG - a problem which is
@ -584,10 +595,10 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block, fastAdd bo
err = dag.saveChangesFromBlock(node, block, virtualUTXODiff, txsAcceptanceData, virtualTxsAcceptanceData, newBlockFeeData)
if err != nil {
return err
return nil, err
}
return nil
return chainUpdates, nil
}
func (dag *BlockDAG) saveChangesFromBlock(node *blockNode, block *util.Block, virtualUTXODiff *UTXODiff,
@ -842,35 +853,36 @@ func (dag *BlockDAG) TxsAcceptedByVirtual() (MultiBlockTxsAcceptanceData, error)
//
// This function MUST be called with the DAG state lock held (for writes).
func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBlockUTXO UTXOSet, fastAdd bool) (
virtualUTXODiff *UTXODiff, virtualTxsAcceptanceData MultiBlockTxsAcceptanceData, err error) {
virtualUTXODiff *UTXODiff, virtualTxsAcceptanceData MultiBlockTxsAcceptanceData,
chainUpdates *chainUpdates, err error) {
if err = node.updateParents(dag, newBlockUTXO); err != nil {
return nil, nil, fmt.Errorf("failed updating parents of %s: %s", node, err)
return nil, nil, nil, fmt.Errorf("failed updating parents of %s: %s", node, err)
}
// Update the virtual block's parents (the DAG tips) to include the new block.
dag.virtual.AddTip(node)
chainUpdates = dag.virtual.AddTip(node)
// Build a UTXO set for the new virtual block
newVirtualPastUTXO, virtualTxsAcceptanceData, err := dag.pastUTXO(&dag.virtual.blockNode)
if err != nil {
return nil, nil, fmt.Errorf("could not restore past UTXO for virtual %s: %s", dag.virtual, err)
return nil, nil, nil, fmt.Errorf("could not restore past UTXO for virtual %s: %s", dag.virtual, err)
}
// Apply the new virtual's blue score to all the unaccepted UTXOs
diffFromAcceptanceData, err := dag.virtual.diffFromAcceptanceData(newVirtualPastUTXO, virtualTxsAcceptanceData)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
newVirtualUTXO, err := newVirtualPastUTXO.WithDiff(diffFromAcceptanceData)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
// Apply new utxoDiffs to all the tips
err = updateTipsUTXO(dag, newVirtualUTXO)
if err != nil {
return nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err)
return nil, nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err)
}
// It is now safe to meld the UTXO set to base.
@ -878,7 +890,7 @@ func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBloc
virtualUTXODiff = diffSet.UTXODiff
err = dag.meldVirtualUTXO(diffSet)
if err != nil {
return nil, nil, fmt.Errorf("failed melding the virtual UTXO: %s", err)
return nil, nil, nil, fmt.Errorf("failed melding the virtual UTXO: %s", err)
}
dag.index.SetStatusFlags(node, statusValid)
@ -886,7 +898,7 @@ func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBloc
// And now we can update the finality point of the DAG (if required)
dag.updateFinalityPoint()
return virtualUTXODiff, virtualTxsAcceptanceData, nil
return virtualUTXODiff, virtualTxsAcceptanceData, chainUpdates, nil
}
func (dag *BlockDAG) meldVirtualUTXO(newVirtualUTXODiffSet *DiffUTXOSet) error {

View File

@ -7,6 +7,7 @@ package blockdag
import (
"fmt"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/util/daghash"
)
// NotificationType represents the type of a notification message.
@ -21,12 +22,17 @@ const (
// NTBlockAdded indicates the associated block was added into
// the blockDAG.
NTBlockAdded NotificationType = iota
// NTChainChanged indicates that selected parent
// chain had changed.
NTChainChanged
)
// notificationTypeStrings is a map of notification types back to their constant
// names for pretty printing.
var notificationTypeStrings = map[NotificationType]string{
NTBlockAdded: "NTBlockAdded",
NTBlockAdded: "NTBlockAdded",
NTChainChanged: "NTChainChanged",
}
// String returns the NotificationType in human-readable form.
@ -74,3 +80,10 @@ type BlockAddedNotificationData struct {
Block *util.Block
WasUnorphaned bool
}
// ChainChangedNotificationData defines data to be sent along with a ChainChanged
// notification
type ChainChangedNotificationData struct {
RemovedChainBlockHashes []*daghash.Hash
AddedChainBlockHashes []*daghash.Hash
}

View File

@ -5,6 +5,7 @@
package blockdag
import (
"github.com/daglabs/btcd/util/daghash"
"sync"
)
@ -53,10 +54,10 @@ func (v *virtualBlock) clone() *virtualBlock {
// is up to the caller to ensure the lock is held.
//
// This function MUST be called with the view mutex locked (for writes).
func (v *virtualBlock) setTips(tips blockSet) {
func (v *virtualBlock) setTips(tips blockSet) *chainUpdates {
oldSelectedParent := v.selectedParent
v.blockNode = *newBlockNode(nil, tips, v.phantomK)
v.updateSelectedParentSet(oldSelectedParent)
return v.updateSelectedParentSet(oldSelectedParent)
}
// updateSelectedParentSet updates the selectedParentSet to match the
@ -67,7 +68,7 @@ func (v *virtualBlock) setTips(tips blockSet) {
// parent and are not selected ancestors of the new one, and adding
// blocks that are selected ancestors of the new selected parent
// and aren't selected ancestors of the old one.
func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) {
func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) *chainUpdates {
var intersectionNode *blockNode
nodesToAdd := make([]*blockNode, 0)
for node := v.blockNode.selectedParent; intersectionNode == nil && node != nil; node = node.selectedParent {
@ -83,10 +84,13 @@ func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) {
}
// Remove the nodes in the set from the oldSelectedParent down to the intersectionNode
// Also, save the hashes of the removed blocks to removedChainBlockHashes
removeCount := 0
var removedChainBlockHashes []*daghash.Hash
if intersectionNode != nil {
for node := oldSelectedParent; !node.hash.IsEqual(intersectionNode.hash); node = node.selectedParent {
v.selectedParentChainSet.remove(node)
removedChainBlockHashes = append(removedChainBlockHashes, node.hash)
removeCount++
}
}
@ -98,10 +102,18 @@ func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) {
nodesToAdd[left], nodesToAdd[right] = nodesToAdd[right], nodesToAdd[left]
}
// Add the nodes to the set and to the slice
// Also, save the hashes of the added blocks to addedChainBlockHashes
var addedChainBlockHashes []*daghash.Hash
for _, node := range nodesToAdd {
v.selectedParentChainSet.add(node)
addedChainBlockHashes = append(addedChainBlockHashes, node.hash)
}
v.selectedParentChainSlice = append(v.selectedParentChainSlice, nodesToAdd...)
return &chainUpdates{
removedChainBlockHashes: removedChainBlockHashes,
addedChainBlockHashes: addedChainBlockHashes,
}
}
// SetTips replaces the tips of the virtual block with the blocks in the
@ -120,14 +132,14 @@ func (v *virtualBlock) SetTips(tips blockSet) {
// is up to the caller to ensure the lock is held.
//
// This function MUST be called with the view mutex locked (for writes).
func (v *virtualBlock) addTip(newTip *blockNode) {
func (v *virtualBlock) addTip(newTip *blockNode) *chainUpdates {
updatedTips := v.tips().clone()
for _, parent := range newTip.parents {
updatedTips.remove(parent)
}
updatedTips.add(newTip)
v.setTips(updatedTips)
return v.setTips(updatedTips)
}
// AddTip adds the given tip to the set of tips in the virtual block.
@ -135,10 +147,10 @@ func (v *virtualBlock) addTip(newTip *blockNode) {
// from the set.
//
// This function is safe for concurrent access.
func (v *virtualBlock) AddTip(newTip *blockNode) {
func (v *virtualBlock) AddTip(newTip *blockNode) *chainUpdates {
v.mtx.Lock()
v.addTip(newTip)
v.mtx.Unlock()
defer v.mtx.Unlock()
return v.addTip(newTip)
}
// tips returns the current tip block nodes for the DAG. It will return

View File

@ -178,3 +178,57 @@ func TestSelectedPath(t *testing.T) {
}()
virtual2.updateSelectedParentSet(buildNode(setFromSlice()))
}
func TestChainUpdates(t *testing.T) {
phantomK := uint32(1)
buildNode := buildNodeGenerator(phantomK, false)
genesis := buildNode(setFromSlice())
// Create a chain to be removed
var toBeRemovedNodes []*blockNode
toBeRemovedTip := genesis
for i := 0; i < 5; i++ {
toBeRemovedTip = buildNode(setFromSlice(toBeRemovedTip))
toBeRemovedNodes = append(toBeRemovedNodes, toBeRemovedTip)
}
// Create a VirtualBlock with the toBeRemoved chain
virtual := newVirtualBlock(setFromSlice(toBeRemovedNodes...), phantomK)
// Create a chain to be added
var toBeAddedNodes []*blockNode
toBeAddedTip := genesis
for i := 0; i < 8; i++ {
toBeAddedTip = buildNode(setFromSlice(toBeAddedTip))
toBeAddedNodes = append(toBeAddedNodes, toBeAddedTip)
}
// Set the virtual tip to be the tip of the toBeAdded chain
chainUpdates := virtual.setTips(setFromSlice(toBeAddedTip))
// Make sure that the removed blocks are as expected (in reverse order)
if len(chainUpdates.removedChainBlockHashes) != len(toBeRemovedNodes) {
t.Fatalf("TestChainUpdates: wrong removed amount. "+
"Got: %d, want: %d", len(chainUpdates.removedChainBlockHashes), len(toBeRemovedNodes))
}
for i, removedHash := range chainUpdates.removedChainBlockHashes {
correspondingRemovedNode := toBeRemovedNodes[len(toBeRemovedNodes)-1-i]
if !removedHash.IsEqual(correspondingRemovedNode.hash) {
t.Fatalf("TestChainUpdates: wrong removed hash. "+
"Got: %s, want: %s", removedHash, correspondingRemovedNode.hash)
}
}
// Make sure that the added blocks are as expected (in forward order)
if len(chainUpdates.addedChainBlockHashes) != len(toBeAddedNodes) {
t.Fatalf("TestChainUpdates: wrong added amount. "+
"Got: %d, want: %d", len(chainUpdates.removedChainBlockHashes), len(toBeAddedNodes))
}
for i, addedHash := range chainUpdates.addedChainBlockHashes {
correspondingAddedNode := toBeAddedNodes[i]
if !addedHash.IsEqual(correspondingAddedNode.hash) {
t.Fatalf("TestChainUpdates: wrong added hash. "+
"Got: %s, want: %s", addedHash, correspondingAddedNode.hash)
}
}
}

View File

@ -41,6 +41,24 @@ func NewStopNotifyBlocksCmd() *StopNotifyBlocksCmd {
return &StopNotifyBlocksCmd{}
}
// NotifyChainChangesCmd defines the notifyChainChanges JSON-RPC command.
type NotifyChainChangesCmd struct{}
// NewNotifyChainChangesCmd returns a new instance which can be used to issue a
// notifyChainChanges JSON-RPC command.
func NewNotifyChainChangesCmd() *NotifyChainChangesCmd {
return &NotifyChainChangesCmd{}
}
// StopNotifyChainChangesCmd defines the stopNotifyChainChanges JSON-RPC command.
type StopNotifyChainChangesCmd struct{}
// NewStopNotifyChainChangesCmd returns a new instance which can be used to issue a
// stopNotifyChainChanges JSON-RPC command.
func NewStopNotifyChainChangesCmd() *StopNotifyChainChangesCmd {
return &StopNotifyChainChangesCmd{}
}
// NotifyNewTransactionsCmd defines the notifyNewTransactions JSON-RPC command.
type NotifyNewTransactionsCmd struct {
Verbose *bool `jsonrpcdefault:"false"`
@ -136,9 +154,11 @@ func init() {
MustRegisterCmd("authenticate", (*AuthenticateCmd)(nil), flags)
MustRegisterCmd("loadTxFilter", (*LoadTxFilterCmd)(nil), flags)
MustRegisterCmd("notifyBlocks", (*NotifyBlocksCmd)(nil), flags)
MustRegisterCmd("notifyChainChanges", (*NotifyChainChangesCmd)(nil), flags)
MustRegisterCmd("notifyNewTransactions", (*NotifyNewTransactionsCmd)(nil), flags)
MustRegisterCmd("session", (*SessionCmd)(nil), flags)
MustRegisterCmd("stopNotifyBlocks", (*StopNotifyBlocksCmd)(nil), flags)
MustRegisterCmd("stopNotifyChainChanges", (*StopNotifyChainChangesCmd)(nil), flags)
MustRegisterCmd("stopNotifyNewTransactions", (*StopNotifyNewTransactionsCmd)(nil), flags)
MustRegisterCmd("rescanBlocks", (*RescanBlocksCmd)(nil), flags)
}

View File

@ -63,6 +63,28 @@ func TestDAGSvrWsCmds(t *testing.T) {
marshalled: `{"jsonrpc":"1.0","method":"stopNotifyBlocks","params":[],"id":1}`,
unmarshalled: &btcjson.StopNotifyBlocksCmd{},
},
{
name: "notifyChainChanges",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("notifyChainChanges")
},
staticCmd: func() interface{} {
return btcjson.NewNotifyChainChangesCmd()
},
marshalled: `{"jsonrpc":"1.0","method":"notifyChainChanges","params":[],"id":1}`,
unmarshalled: &btcjson.NotifyChainChangesCmd{},
},
{
name: "stopNotifyChainChanges",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("stopNotifyChainChanges")
},
staticCmd: func() interface{} {
return btcjson.NewStopNotifyChainChangesCmd()
},
marshalled: `{"jsonrpc":"1.0","method":"stopNotifyChainChanges","params":[],"id":1}`,
unmarshalled: &btcjson.StopNotifyChainChangesCmd{},
},
{
name: "notifyNewTransactions",
newCmd: func() (interface{}, error) {

View File

@ -8,6 +8,8 @@
package btcjson
import "github.com/daglabs/btcd/util/daghash"
const (
// FilteredBlockAddedNtfnMethod is the new method used for
// notifications from the dag server that a block has been connected.
@ -47,6 +49,23 @@ func NewFilteredBlockAddedNtfn(chainHeight uint64, header string, subscribedTxs
}
}
// ChainChangedNtfn defines the chainChanged JSON-RPC
// notification.
type ChainChangedNtfn struct {
RemovedChainBlockHashes []daghash.Hash
AddedChainBlocks []ChainBlock
}
// NewChainChangedNtfn returns a new instance which can be used to
// issue a chainChanged JSON-RPC notification.
func NewChainChangedNtfn(removedChainBlockHashes []daghash.Hash,
addedChainBlocks []ChainBlock) *ChainChangedNtfn {
return &ChainChangedNtfn{
RemovedChainBlockHashes: removedChainBlockHashes,
AddedChainBlocks: addedChainBlocks,
}
}
// BlockDetails describes details of a tx in a block.
type BlockDetails struct {
Height uint64 `json:"height"`

View File

@ -254,6 +254,9 @@ func (c *Client) trackRegisteredNtfns(cmd interface{}) {
case *btcjson.NotifyBlocksCmd:
c.ntfnState.notifyBlocks = true
case *btcjson.NotifyChainChangesCmd:
c.ntfnState.notifyChainChanges = true
case *btcjson.NotifyNewTransactionsCmd:
if bcmd.Verbose != nil && *bcmd.Verbose {
c.ntfnState.notifyNewTxVerbose = true
@ -516,6 +519,14 @@ func (c *Client) reregisterNtfns() error {
}
}
// Reregister notifychainchanges if needed.
if stateCopy.notifyChainChanges {
log.Debugf("Reregistering [notifychainchanges]")
if err := c.NotifyChainChanges(); err != nil {
return err
}
}
// Reregister notifynewtransactions if needed.
if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
log.Debugf("Reregistering [notifynewtransactions] (verbose=%t)",

View File

@ -33,6 +33,7 @@ var (
// reconnect.
type notificationState struct {
notifyBlocks bool
notifyChainChanges bool
notifyNewTx bool
notifyNewTxVerbose bool
notifyNewTxSubnetworkID *string
@ -42,6 +43,7 @@ type notificationState struct {
func (s *notificationState) Copy() *notificationState {
var stateCopy notificationState
stateCopy.notifyBlocks = s.notifyBlocks
stateCopy.notifyChainChanges = s.notifyChainChanges
stateCopy.notifyNewTx = s.notifyNewTx
stateCopy.notifyNewTxVerbose = s.notifyNewTxVerbose
stateCopy.notifyNewTxSubnetworkID = s.notifyNewTxSubnetworkID
@ -89,6 +91,13 @@ type NotificationHandlers struct {
OnFilteredBlockAdded func(height uint64, header *wire.BlockHeader,
txs []*util.Tx)
// OnChainChanged is invoked when the selected parent chain of the
// DAG had changed. It will only be invoked if a preceding call to
// NotifyChainChanges has been made to register for the notification and the
// function is non-nil.
OnChainChanged func(removedChainBlockHashes []*daghash.Hash,
addedChainBlocks []*btcjson.ChainBlock)
// OnRelevantTxAccepted is invoked when an unmined transaction passes
// the client's transaction filter.
//
@ -488,6 +497,53 @@ func (c *Client) NotifyBlocks() error {
return c.NotifyBlocksAsync().Receive()
}
// FutureNotifyChainChangesResult is a future promise to deliver the result of a
// NotifyChainChangesAsync RPC invocation (or an applicable error).
type FutureNotifyChainChangesResult chan *response
// Receive waits for the response promised by the future and returns an error
// if the registration was not successful.
func (r FutureNotifyChainChangesResult) Receive() error {
_, err := receiveFuture(r)
return err
}
// NotifyChainChangesAsync returns an instance of a type that can be used to get the
// result of the RPC at some future time by invoking the Receive function on
// the returned instance.
//
// See NotifyChainChanges for the blocking version and more details.
//
// NOTE: This is a btcd extension and requires a websocket connection.
func (c *Client) NotifyChainChangesAsync() FutureNotifyBlocksResult {
// Not supported in HTTP POST mode.
if c.config.HTTPPostMode {
return newFutureError(ErrWebsocketsRequired)
}
// Ignore the notification if the client is not interested in
// notifications.
if c.ntfnHandlers == nil {
return newNilFutureResult()
}
cmd := btcjson.NewNotifyChainChangesCmd()
return c.sendCmd(cmd)
}
// NotifyChainChanges registers the client to receive notifications when the
// selected parent chain changes. The notifications are delivered to the
// notification handlers associated with the client. Calling this function has
// no effect if there are no notification handlers and will result in an error
// if the client is configured to run in HTTP POST mode.
//
// The notifications delivered as a result of this call will be via OnBlockAdded
//
// NOTE: This is a btcd extension and requires a websocket connection.
func (c *Client) NotifyChainChanges() error {
return c.NotifyChainChangesAsync().Receive()
}
// newOutpointFromWire constructs the btcjson representation of a transaction
// outpoint from the wire type.
func newOutpointFromWire(op *wire.Outpoint) btcjson.Outpoint {

View File

@ -198,6 +198,7 @@ var rpcLimited = map[string]struct{}{
// Websockets commands
"loadTxFilter": {},
"notifyBlocks": {},
"notifyChainChanges": {},
"notifyNewTransactions": {},
"notifyReceived": {},
"notifySpent": {},
@ -4416,6 +4417,16 @@ func (s *Server) handleBlockDAGNotification(notification *blockdag.Notification)
// Notify registered websocket clients of incoming block.
s.ntfnMgr.NotifyBlockAdded(block)
case blockdag.NTChainChanged:
data, ok := notification.Data.(*blockdag.ChainChangedNotificationData)
if !ok {
log.Warnf("Chain changed notification data is of wrong type.")
break
}
// Notify registered websocket clients of chain changes.
s.ntfnMgr.NotifyChainChanged(data.RemovedChainBlockHashes,
data.AddedChainBlockHashes)
}
}

View File

@ -607,6 +607,12 @@ var helpDescsEnUS = map[string]string{
// StopNotifyBlocksCmd help.
"stopNotifyBlocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.",
// NotifyChainChangesCmd help.
"notifyChainChanges--synopsis": "Request notifications for whenever the selected parent chain changes.",
// StopNotifyChainChangesCmd help.
"stopNotifyChainChanges--synopsis": "Cancel registered notifications for whenever the selected parent chain changes.",
// NotifyNewTransactionsCmd help.
"notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.",
"notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted",
@ -719,6 +725,8 @@ var rpcResultTypes = map[string][]interface{}{
"session": {(*btcjson.SessionResult)(nil)},
"notifyBlocks": nil,
"stopNotifyBlocks": nil,
"notifyChainChanges": nil,
"stopNotifyChainChanges": nil,
"notifyNewTransactions": nil,
"stopNotifyNewTransactions": nil,
"rescanBlocks": {(*[]btcjson.RescannedBlock)(nil)},

View File

@ -68,9 +68,11 @@ var wsHandlersBeforeInit = map[string]wsCommandHandler{
"loadTxFilter": handleLoadTxFilter,
"help": handleWebsocketHelp,
"notifyBlocks": handleNotifyBlocks,
"notifyChainChanges": handleNotifyChainChanges,
"notifyNewTransactions": handleNotifyNewTransactions,
"session": handleSession,
"stopNotifyBlocks": handleStopNotifyBlocks,
"stopNotifyChainChanges": handleStopNotifyChainChanges,
"stopNotifyNewTransactions": handleStopNotifyNewTransactions,
"rescanBlocks": handleRescanBlocks,
}
@ -210,6 +212,24 @@ func (m *wsNotificationManager) NotifyBlockAdded(block *util.Block) {
}
}
// NotifyChainChanged passes changes to the selected parent chain of
// the blockDAG to the notification manager for processing.
func (m *wsNotificationManager) NotifyChainChanged(removedChainBlockHashes []*daghash.Hash,
addedChainBlockHashes []*daghash.Hash) {
n := &notificationChainChanged{
removedChainBlockHashes: removedChainBlockHashes,
addedChainBlocksHashes: addedChainBlockHashes,
}
// As NotifyChainChanged will be called by the DAG manager
// and the RPC server may no longer be running, use a select
// statement to unblock enqueuing the notification once the RPC
// server has begun shutting down.
select {
case m.queueNotification <- n:
case <-m.quit:
}
}
// NotifyMempoolTx passes a transaction accepted by mempool to the
// notification manager for transaction notification processing. If
// isNew is true, the tx is is a new transaction, rather than one
@ -382,6 +402,10 @@ func (f *wsClientFilter) removeUnspentOutpoint(op *wire.Outpoint) {
// Notification types
type notificationBlockAdded util.Block
type notificationChainChanged struct {
removedChainBlockHashes []*daghash.Hash
addedChainBlocksHashes []*daghash.Hash
}
type notificationTxAcceptedByMempool struct {
isNew bool
tx *util.Tx
@ -392,6 +416,8 @@ type notificationRegisterClient wsClient
type notificationUnregisterClient wsClient
type notificationRegisterBlocks wsClient
type notificationUnregisterBlocks wsClient
type notificationRegisterChainChanges wsClient
type notificationUnregisterChainChanges wsClient
type notificationRegisterNewMempoolTxs wsClient
type notificationUnregisterNewMempoolTxs wsClient
@ -409,6 +435,7 @@ func (m *wsNotificationManager) notificationHandler() {
// Where possible, the quit channel is used as the unique id for a client
// since it is quite a bit more efficient than using the entire struct.
blockNotifications := make(map[chan struct{}]*wsClient)
chainChangeNotifications := make(map[chan struct{}]*wsClient)
txNotifications := make(map[chan struct{}]*wsClient)
out:
@ -428,6 +455,10 @@ out:
block)
}
case *notificationChainChanged:
m.notifyChainChanged(chainChangeNotifications,
n.removedChainBlockHashes, n.addedChainBlocksHashes)
case *notificationTxAcceptedByMempool:
if n.isNew && len(txNotifications) != 0 {
m.notifyForNewTx(txNotifications, n.tx)
@ -442,6 +473,14 @@ out:
wsc := (*wsClient)(n)
delete(blockNotifications, wsc.quit)
case *notificationRegisterChainChanges:
wsc := (*wsClient)(n)
chainChangeNotifications[wsc.quit] = wsc
case *notificationUnregisterChainChanges:
wsc := (*wsClient)(n)
delete(chainChangeNotifications, wsc.quit)
case *notificationRegisterClient:
wsc := (*wsClient)(n)
clients[wsc.quit] = wsc
@ -451,6 +490,7 @@ out:
// Remove any requests made by the client as well as
// the client itself.
delete(blockNotifications, wsc.quit)
delete(chainChangeNotifications, wsc.quit)
delete(txNotifications, wsc.quit)
delete(clients, wsc.quit)
@ -501,6 +541,51 @@ func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterBlocks)(wsc)
}
// RegisterChainChanges requests chain change notifications to the passed
// websocket client.
func (m *wsNotificationManager) RegisterChainChanges(wsc *wsClient) {
m.queueNotification <- (*notificationRegisterChainChanges)(wsc)
}
// UnregisterChainChanges removes chain change notifications for the passed
// websocket client.
func (m *wsNotificationManager) UnregisterChainChanges(wsc *wsClient) {
m.queueNotification <- (*notificationUnregisterChainChanges)(wsc)
}
// notifyChainChanged notifies websocket clients that have registered for
// chain changes.
func (m *wsNotificationManager) notifyChainChanged(clients map[chan struct{}]*wsClient,
removedChainBlockHashes []*daghash.Hash, addedChainBlockHashes []*daghash.Hash) {
// Collect removed chain hashes.
removedChainHashes := make([]daghash.Hash, 0, len(removedChainBlockHashes))
for i, hash := range removedChainBlockHashes {
removedChainHashes[i] = *hash
}
// Collect added chain blocks.
addedChainBlocks, err := collectChainBlocks(m.server, addedChainBlockHashes)
if err != nil {
log.Errorf("Failed to collect chain blocks: %s", err)
return
}
// Create the notification.
ntfn := btcjson.NewChainChangedNtfn(removedChainHashes, addedChainBlocks)
for _, wsc := range clients {
// Marshal and queue notification.
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
log.Errorf("Failed to marshal chain changed "+
"notification: %s", err)
return
}
wsc.QueueNotification(marshalledJSON)
}
}
// subscribedClients returns the set of all websocket client quit channels that
// are registered to receive notifications regarding tx, either due to tx
// spending a watched output or outputting to a watched address. Matching
@ -1380,6 +1465,13 @@ func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
return nil, nil
}
// handleNotifyChainChanges implements the notifyChainChanges command extension for
// websocket connections.
func handleNotifyChainChanges(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterChainChanges(wsc)
return nil, nil
}
// handleSession implements the session command extension for websocket
// connections.
func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) {
@ -1393,6 +1485,13 @@ func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error
return nil, nil
}
// handleStopNotifyChainChanges implements the stopNotifyChainChanges command extension for
// websocket connections.
func handleStopNotifyChainChanges(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.UnregisterChainChanges(wsc)
return nil, nil
}
// handleNotifyNewTransations implements the notifyNewTransactions command
// extension for websocket connections.
func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {