diff --git a/blockdag/accept.go b/blockdag/accept.go index 2ee3fa966..20c9920ac 100644 --- a/blockdag/accept.go +++ b/blockdag/accept.go @@ -75,7 +75,7 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er // Connect the passed block to the DAG. This also handles validation of the // transaction scripts. - err = dag.addBlock(newNode, parents, block, flags) + chainUpdates, err := dag.addBlock(newNode, parents, block, flags) if err != nil { return err } @@ -88,6 +88,10 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er Block: block, WasUnorphaned: flags&BFWasUnorphaned != 0, }) + dag.sendNotification(NTChainChanged, &ChainChangedNotificationData{ + RemovedChainBlockHashes: chainUpdates.removedChainBlockHashes, + AddedChainBlockHashes: chainUpdates.addedChainBlockHashes, + }) dag.dagLock.Lock() return nil diff --git a/blockdag/dag.go b/blockdag/dag.go index 0b2e4cff5..6477c82f4 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -39,6 +39,13 @@ type orphanBlock struct { expiration time.Time } +// chainUpdates represents the updates made to the selected parent chain after +// a block had been added to the DAG. +type chainUpdates struct { + removedChainBlockHashes []*daghash.Hash + addedChainBlockHashes []*daghash.Hash +} + // BlockDAG provides functions for working with the bitcoin block chain. // It includes functionality such as rejecting duplicate blocks, ensuring blocks // follow all rules, orphan handling, checkpoint handling, and best chain @@ -467,17 +474,18 @@ func LockTimeToSequence(isSeconds bool, locktime uint64) uint64 { // This is useful when using checkpoints. // // This function MUST be called with the DAG state lock held (for writes). -func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util.Block, flags BehaviorFlags) error { +func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, + block *util.Block, flags BehaviorFlags) (*chainUpdates, error) { // Skip checks if node has already been fully validated. fastAdd := flags&BFFastAdd == BFFastAdd || dag.index.NodeStatus(node).KnownValid() // Connect the block to the DAG. - err := dag.connectBlock(node, block, fastAdd) + chainUpdates, err := dag.connectBlock(node, block, fastAdd) if err != nil { if _, ok := err.(RuleError); ok { dag.index.SetStatusFlags(node, statusValidateFailed) } else { - return err + return nil, err } } else { dag.blockCount++ @@ -491,9 +499,11 @@ func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util log.Warnf("Error flushing block index changes to disk: %s", writeErr) } - // If dag.connectBlock returned a rule error, return it here after updating DB - return err + if err != nil { + return nil, err + } + return chainUpdates, nil } func calculateAcceptedIDMerkleRoot(txsAcceptanceData MultiBlockTxsAcceptanceData) *daghash.Hash { @@ -533,47 +543,48 @@ func (node *blockNode) validateAcceptedIDMerkleRoot(dag *BlockDAG, txsAcceptance // connectBlock handles connecting the passed node/block to the DAG. // // This function MUST be called with the DAG state lock held (for writes). -func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block, fastAdd bool) error { +func (dag *BlockDAG) connectBlock(node *blockNode, + block *util.Block, fastAdd bool) (*chainUpdates, error) { // No warnings about unknown rules or versions until the DAG is // current. if dag.isCurrent() { // Warn if any unknown new rules are either about to activate or // have already been activated. if err := dag.warnUnknownRuleActivations(node); err != nil { - return err + return nil, err } // Warn if a high enough percentage of the last blocks have // unexpected versions. if err := dag.warnUnknownVersions(node); err != nil { - return err + return nil, err } } if err := dag.checkFinalityRules(node); err != nil { - return err + return nil, err } if err := dag.validateGasLimit(block); err != nil { - return err + return nil, err } newBlockUTXO, txsAcceptanceData, newBlockFeeData, err := node.verifyAndBuildUTXO(dag, block.Transactions(), fastAdd) if err != nil { newErrString := fmt.Sprintf("error verifying UTXO for %s: %s", node, err) if err, ok := err.(RuleError); ok { - return ruleError(err.ErrorCode, newErrString) + return nil, ruleError(err.ErrorCode, newErrString) } - return errors.New(newErrString) + return nil, errors.New(newErrString) } err = node.validateCoinbaseTransaction(dag, block, txsAcceptanceData) if err != nil { - return err + return nil, err } // Apply all changes to the DAG. - virtualUTXODiff, virtualTxsAcceptanceData, err := dag.applyDAGChanges(node, block, newBlockUTXO, fastAdd) + virtualUTXODiff, virtualTxsAcceptanceData, chainUpdates, err := dag.applyDAGChanges(node, block, newBlockUTXO, fastAdd) if err != nil { // Since all validation logic has already ran, if applyDAGChanges errors out, // this means we have a problem in the internal structure of the DAG - a problem which is @@ -584,10 +595,10 @@ func (dag *BlockDAG) connectBlock(node *blockNode, block *util.Block, fastAdd bo err = dag.saveChangesFromBlock(node, block, virtualUTXODiff, txsAcceptanceData, virtualTxsAcceptanceData, newBlockFeeData) if err != nil { - return err + return nil, err } - return nil + return chainUpdates, nil } func (dag *BlockDAG) saveChangesFromBlock(node *blockNode, block *util.Block, virtualUTXODiff *UTXODiff, @@ -842,35 +853,36 @@ func (dag *BlockDAG) TxsAcceptedByVirtual() (MultiBlockTxsAcceptanceData, error) // // This function MUST be called with the DAG state lock held (for writes). func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBlockUTXO UTXOSet, fastAdd bool) ( - virtualUTXODiff *UTXODiff, virtualTxsAcceptanceData MultiBlockTxsAcceptanceData, err error) { + virtualUTXODiff *UTXODiff, virtualTxsAcceptanceData MultiBlockTxsAcceptanceData, + chainUpdates *chainUpdates, err error) { if err = node.updateParents(dag, newBlockUTXO); err != nil { - return nil, nil, fmt.Errorf("failed updating parents of %s: %s", node, err) + return nil, nil, nil, fmt.Errorf("failed updating parents of %s: %s", node, err) } // Update the virtual block's parents (the DAG tips) to include the new block. - dag.virtual.AddTip(node) + chainUpdates = dag.virtual.AddTip(node) // Build a UTXO set for the new virtual block newVirtualPastUTXO, virtualTxsAcceptanceData, err := dag.pastUTXO(&dag.virtual.blockNode) if err != nil { - return nil, nil, fmt.Errorf("could not restore past UTXO for virtual %s: %s", dag.virtual, err) + return nil, nil, nil, fmt.Errorf("could not restore past UTXO for virtual %s: %s", dag.virtual, err) } // Apply the new virtual's blue score to all the unaccepted UTXOs diffFromAcceptanceData, err := dag.virtual.diffFromAcceptanceData(newVirtualPastUTXO, virtualTxsAcceptanceData) if err != nil { - return nil, nil, err + return nil, nil, nil, err } newVirtualUTXO, err := newVirtualPastUTXO.WithDiff(diffFromAcceptanceData) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // Apply new utxoDiffs to all the tips err = updateTipsUTXO(dag, newVirtualUTXO) if err != nil { - return nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err) + return nil, nil, nil, fmt.Errorf("failed updating the tips' UTXO: %s", err) } // It is now safe to meld the UTXO set to base. @@ -878,7 +890,7 @@ func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBloc virtualUTXODiff = diffSet.UTXODiff err = dag.meldVirtualUTXO(diffSet) if err != nil { - return nil, nil, fmt.Errorf("failed melding the virtual UTXO: %s", err) + return nil, nil, nil, fmt.Errorf("failed melding the virtual UTXO: %s", err) } dag.index.SetStatusFlags(node, statusValid) @@ -886,7 +898,7 @@ func (dag *BlockDAG) applyDAGChanges(node *blockNode, block *util.Block, newBloc // And now we can update the finality point of the DAG (if required) dag.updateFinalityPoint() - return virtualUTXODiff, virtualTxsAcceptanceData, nil + return virtualUTXODiff, virtualTxsAcceptanceData, chainUpdates, nil } func (dag *BlockDAG) meldVirtualUTXO(newVirtualUTXODiffSet *DiffUTXOSet) error { diff --git a/blockdag/notifications.go b/blockdag/notifications.go index 089d0780b..801ef07f0 100644 --- a/blockdag/notifications.go +++ b/blockdag/notifications.go @@ -7,6 +7,7 @@ package blockdag import ( "fmt" "github.com/daglabs/btcd/util" + "github.com/daglabs/btcd/util/daghash" ) // NotificationType represents the type of a notification message. @@ -21,12 +22,17 @@ const ( // NTBlockAdded indicates the associated block was added into // the blockDAG. NTBlockAdded NotificationType = iota + + // NTChainChanged indicates that selected parent + // chain had changed. + NTChainChanged ) // notificationTypeStrings is a map of notification types back to their constant // names for pretty printing. var notificationTypeStrings = map[NotificationType]string{ - NTBlockAdded: "NTBlockAdded", + NTBlockAdded: "NTBlockAdded", + NTChainChanged: "NTChainChanged", } // String returns the NotificationType in human-readable form. @@ -74,3 +80,10 @@ type BlockAddedNotificationData struct { Block *util.Block WasUnorphaned bool } + +// ChainChangedNotificationData defines data to be sent along with a ChainChanged +// notification +type ChainChangedNotificationData struct { + RemovedChainBlockHashes []*daghash.Hash + AddedChainBlockHashes []*daghash.Hash +} diff --git a/blockdag/virtualblock.go b/blockdag/virtualblock.go index c852d0803..06fbff5fe 100644 --- a/blockdag/virtualblock.go +++ b/blockdag/virtualblock.go @@ -5,6 +5,7 @@ package blockdag import ( + "github.com/daglabs/btcd/util/daghash" "sync" ) @@ -53,10 +54,10 @@ func (v *virtualBlock) clone() *virtualBlock { // is up to the caller to ensure the lock is held. // // This function MUST be called with the view mutex locked (for writes). -func (v *virtualBlock) setTips(tips blockSet) { +func (v *virtualBlock) setTips(tips blockSet) *chainUpdates { oldSelectedParent := v.selectedParent v.blockNode = *newBlockNode(nil, tips, v.phantomK) - v.updateSelectedParentSet(oldSelectedParent) + return v.updateSelectedParentSet(oldSelectedParent) } // updateSelectedParentSet updates the selectedParentSet to match the @@ -67,7 +68,7 @@ func (v *virtualBlock) setTips(tips blockSet) { // parent and are not selected ancestors of the new one, and adding // blocks that are selected ancestors of the new selected parent // and aren't selected ancestors of the old one. -func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) { +func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) *chainUpdates { var intersectionNode *blockNode nodesToAdd := make([]*blockNode, 0) for node := v.blockNode.selectedParent; intersectionNode == nil && node != nil; node = node.selectedParent { @@ -83,10 +84,13 @@ func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) { } // Remove the nodes in the set from the oldSelectedParent down to the intersectionNode + // Also, save the hashes of the removed blocks to removedChainBlockHashes removeCount := 0 + var removedChainBlockHashes []*daghash.Hash if intersectionNode != nil { for node := oldSelectedParent; !node.hash.IsEqual(intersectionNode.hash); node = node.selectedParent { v.selectedParentChainSet.remove(node) + removedChainBlockHashes = append(removedChainBlockHashes, node.hash) removeCount++ } } @@ -98,10 +102,18 @@ func (v *virtualBlock) updateSelectedParentSet(oldSelectedParent *blockNode) { nodesToAdd[left], nodesToAdd[right] = nodesToAdd[right], nodesToAdd[left] } // Add the nodes to the set and to the slice + // Also, save the hashes of the added blocks to addedChainBlockHashes + var addedChainBlockHashes []*daghash.Hash for _, node := range nodesToAdd { v.selectedParentChainSet.add(node) + addedChainBlockHashes = append(addedChainBlockHashes, node.hash) } v.selectedParentChainSlice = append(v.selectedParentChainSlice, nodesToAdd...) + + return &chainUpdates{ + removedChainBlockHashes: removedChainBlockHashes, + addedChainBlockHashes: addedChainBlockHashes, + } } // SetTips replaces the tips of the virtual block with the blocks in the @@ -120,14 +132,14 @@ func (v *virtualBlock) SetTips(tips blockSet) { // is up to the caller to ensure the lock is held. // // This function MUST be called with the view mutex locked (for writes). -func (v *virtualBlock) addTip(newTip *blockNode) { +func (v *virtualBlock) addTip(newTip *blockNode) *chainUpdates { updatedTips := v.tips().clone() for _, parent := range newTip.parents { updatedTips.remove(parent) } updatedTips.add(newTip) - v.setTips(updatedTips) + return v.setTips(updatedTips) } // AddTip adds the given tip to the set of tips in the virtual block. @@ -135,10 +147,10 @@ func (v *virtualBlock) addTip(newTip *blockNode) { // from the set. // // This function is safe for concurrent access. -func (v *virtualBlock) AddTip(newTip *blockNode) { +func (v *virtualBlock) AddTip(newTip *blockNode) *chainUpdates { v.mtx.Lock() - v.addTip(newTip) - v.mtx.Unlock() + defer v.mtx.Unlock() + return v.addTip(newTip) } // tips returns the current tip block nodes for the DAG. It will return diff --git a/blockdag/virtualblock_test.go b/blockdag/virtualblock_test.go index 1044191c5..ed8cf1fd3 100644 --- a/blockdag/virtualblock_test.go +++ b/blockdag/virtualblock_test.go @@ -178,3 +178,57 @@ func TestSelectedPath(t *testing.T) { }() virtual2.updateSelectedParentSet(buildNode(setFromSlice())) } + +func TestChainUpdates(t *testing.T) { + phantomK := uint32(1) + buildNode := buildNodeGenerator(phantomK, false) + genesis := buildNode(setFromSlice()) + + // Create a chain to be removed + var toBeRemovedNodes []*blockNode + toBeRemovedTip := genesis + for i := 0; i < 5; i++ { + toBeRemovedTip = buildNode(setFromSlice(toBeRemovedTip)) + toBeRemovedNodes = append(toBeRemovedNodes, toBeRemovedTip) + } + + // Create a VirtualBlock with the toBeRemoved chain + virtual := newVirtualBlock(setFromSlice(toBeRemovedNodes...), phantomK) + + // Create a chain to be added + var toBeAddedNodes []*blockNode + toBeAddedTip := genesis + for i := 0; i < 8; i++ { + toBeAddedTip = buildNode(setFromSlice(toBeAddedTip)) + toBeAddedNodes = append(toBeAddedNodes, toBeAddedTip) + } + + // Set the virtual tip to be the tip of the toBeAdded chain + chainUpdates := virtual.setTips(setFromSlice(toBeAddedTip)) + + // Make sure that the removed blocks are as expected (in reverse order) + if len(chainUpdates.removedChainBlockHashes) != len(toBeRemovedNodes) { + t.Fatalf("TestChainUpdates: wrong removed amount. "+ + "Got: %d, want: %d", len(chainUpdates.removedChainBlockHashes), len(toBeRemovedNodes)) + } + for i, removedHash := range chainUpdates.removedChainBlockHashes { + correspondingRemovedNode := toBeRemovedNodes[len(toBeRemovedNodes)-1-i] + if !removedHash.IsEqual(correspondingRemovedNode.hash) { + t.Fatalf("TestChainUpdates: wrong removed hash. "+ + "Got: %s, want: %s", removedHash, correspondingRemovedNode.hash) + } + } + + // Make sure that the added blocks are as expected (in forward order) + if len(chainUpdates.addedChainBlockHashes) != len(toBeAddedNodes) { + t.Fatalf("TestChainUpdates: wrong added amount. "+ + "Got: %d, want: %d", len(chainUpdates.removedChainBlockHashes), len(toBeAddedNodes)) + } + for i, addedHash := range chainUpdates.addedChainBlockHashes { + correspondingAddedNode := toBeAddedNodes[i] + if !addedHash.IsEqual(correspondingAddedNode.hash) { + t.Fatalf("TestChainUpdates: wrong added hash. "+ + "Got: %s, want: %s", addedHash, correspondingAddedNode.hash) + } + } +} diff --git a/btcjson/dagsvrwscmds.go b/btcjson/dagsvrwscmds.go index bdf74eed6..1081169a5 100644 --- a/btcjson/dagsvrwscmds.go +++ b/btcjson/dagsvrwscmds.go @@ -41,6 +41,24 @@ func NewStopNotifyBlocksCmd() *StopNotifyBlocksCmd { return &StopNotifyBlocksCmd{} } +// NotifyChainChangesCmd defines the notifyChainChanges JSON-RPC command. +type NotifyChainChangesCmd struct{} + +// NewNotifyChainChangesCmd returns a new instance which can be used to issue a +// notifyChainChanges JSON-RPC command. +func NewNotifyChainChangesCmd() *NotifyChainChangesCmd { + return &NotifyChainChangesCmd{} +} + +// StopNotifyChainChangesCmd defines the stopNotifyChainChanges JSON-RPC command. +type StopNotifyChainChangesCmd struct{} + +// NewStopNotifyChainChangesCmd returns a new instance which can be used to issue a +// stopNotifyChainChanges JSON-RPC command. +func NewStopNotifyChainChangesCmd() *StopNotifyChainChangesCmd { + return &StopNotifyChainChangesCmd{} +} + // NotifyNewTransactionsCmd defines the notifyNewTransactions JSON-RPC command. type NotifyNewTransactionsCmd struct { Verbose *bool `jsonrpcdefault:"false"` @@ -136,9 +154,11 @@ func init() { MustRegisterCmd("authenticate", (*AuthenticateCmd)(nil), flags) MustRegisterCmd("loadTxFilter", (*LoadTxFilterCmd)(nil), flags) MustRegisterCmd("notifyBlocks", (*NotifyBlocksCmd)(nil), flags) + MustRegisterCmd("notifyChainChanges", (*NotifyChainChangesCmd)(nil), flags) MustRegisterCmd("notifyNewTransactions", (*NotifyNewTransactionsCmd)(nil), flags) MustRegisterCmd("session", (*SessionCmd)(nil), flags) MustRegisterCmd("stopNotifyBlocks", (*StopNotifyBlocksCmd)(nil), flags) + MustRegisterCmd("stopNotifyChainChanges", (*StopNotifyChainChangesCmd)(nil), flags) MustRegisterCmd("stopNotifyNewTransactions", (*StopNotifyNewTransactionsCmd)(nil), flags) MustRegisterCmd("rescanBlocks", (*RescanBlocksCmd)(nil), flags) } diff --git a/btcjson/dagsvrwscmds_test.go b/btcjson/dagsvrwscmds_test.go index 01770ee58..e8ab5468e 100644 --- a/btcjson/dagsvrwscmds_test.go +++ b/btcjson/dagsvrwscmds_test.go @@ -63,6 +63,28 @@ func TestDAGSvrWsCmds(t *testing.T) { marshalled: `{"jsonrpc":"1.0","method":"stopNotifyBlocks","params":[],"id":1}`, unmarshalled: &btcjson.StopNotifyBlocksCmd{}, }, + { + name: "notifyChainChanges", + newCmd: func() (interface{}, error) { + return btcjson.NewCmd("notifyChainChanges") + }, + staticCmd: func() interface{} { + return btcjson.NewNotifyChainChangesCmd() + }, + marshalled: `{"jsonrpc":"1.0","method":"notifyChainChanges","params":[],"id":1}`, + unmarshalled: &btcjson.NotifyChainChangesCmd{}, + }, + { + name: "stopNotifyChainChanges", + newCmd: func() (interface{}, error) { + return btcjson.NewCmd("stopNotifyChainChanges") + }, + staticCmd: func() interface{} { + return btcjson.NewStopNotifyChainChangesCmd() + }, + marshalled: `{"jsonrpc":"1.0","method":"stopNotifyChainChanges","params":[],"id":1}`, + unmarshalled: &btcjson.StopNotifyChainChangesCmd{}, + }, { name: "notifyNewTransactions", newCmd: func() (interface{}, error) { diff --git a/btcjson/dagsvrwsntfns.go b/btcjson/dagsvrwsntfns.go index ad20c3c33..b21031099 100644 --- a/btcjson/dagsvrwsntfns.go +++ b/btcjson/dagsvrwsntfns.go @@ -8,6 +8,8 @@ package btcjson +import "github.com/daglabs/btcd/util/daghash" + const ( // FilteredBlockAddedNtfnMethod is the new method used for // notifications from the dag server that a block has been connected. @@ -47,6 +49,23 @@ func NewFilteredBlockAddedNtfn(chainHeight uint64, header string, subscribedTxs } } +// ChainChangedNtfn defines the chainChanged JSON-RPC +// notification. +type ChainChangedNtfn struct { + RemovedChainBlockHashes []daghash.Hash + AddedChainBlocks []ChainBlock +} + +// NewChainChangedNtfn returns a new instance which can be used to +// issue a chainChanged JSON-RPC notification. +func NewChainChangedNtfn(removedChainBlockHashes []daghash.Hash, + addedChainBlocks []ChainBlock) *ChainChangedNtfn { + return &ChainChangedNtfn{ + RemovedChainBlockHashes: removedChainBlockHashes, + AddedChainBlocks: addedChainBlocks, + } +} + // BlockDetails describes details of a tx in a block. type BlockDetails struct { Height uint64 `json:"height"` diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 8f64306a7..490420bc5 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -254,6 +254,9 @@ func (c *Client) trackRegisteredNtfns(cmd interface{}) { case *btcjson.NotifyBlocksCmd: c.ntfnState.notifyBlocks = true + case *btcjson.NotifyChainChangesCmd: + c.ntfnState.notifyChainChanges = true + case *btcjson.NotifyNewTransactionsCmd: if bcmd.Verbose != nil && *bcmd.Verbose { c.ntfnState.notifyNewTxVerbose = true @@ -516,6 +519,14 @@ func (c *Client) reregisterNtfns() error { } } + // Reregister notifychainchanges if needed. + if stateCopy.notifyChainChanges { + log.Debugf("Reregistering [notifychainchanges]") + if err := c.NotifyChainChanges(); err != nil { + return err + } + } + // Reregister notifynewtransactions if needed. if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose { log.Debugf("Reregistering [notifynewtransactions] (verbose=%t)", diff --git a/rpcclient/notify.go b/rpcclient/notify.go index f41483fb5..cb2e7ff27 100644 --- a/rpcclient/notify.go +++ b/rpcclient/notify.go @@ -33,6 +33,7 @@ var ( // reconnect. type notificationState struct { notifyBlocks bool + notifyChainChanges bool notifyNewTx bool notifyNewTxVerbose bool notifyNewTxSubnetworkID *string @@ -42,6 +43,7 @@ type notificationState struct { func (s *notificationState) Copy() *notificationState { var stateCopy notificationState stateCopy.notifyBlocks = s.notifyBlocks + stateCopy.notifyChainChanges = s.notifyChainChanges stateCopy.notifyNewTx = s.notifyNewTx stateCopy.notifyNewTxVerbose = s.notifyNewTxVerbose stateCopy.notifyNewTxSubnetworkID = s.notifyNewTxSubnetworkID @@ -89,6 +91,13 @@ type NotificationHandlers struct { OnFilteredBlockAdded func(height uint64, header *wire.BlockHeader, txs []*util.Tx) + // OnChainChanged is invoked when the selected parent chain of the + // DAG had changed. It will only be invoked if a preceding call to + // NotifyChainChanges has been made to register for the notification and the + // function is non-nil. + OnChainChanged func(removedChainBlockHashes []*daghash.Hash, + addedChainBlocks []*btcjson.ChainBlock) + // OnRelevantTxAccepted is invoked when an unmined transaction passes // the client's transaction filter. // @@ -488,6 +497,53 @@ func (c *Client) NotifyBlocks() error { return c.NotifyBlocksAsync().Receive() } +// FutureNotifyChainChangesResult is a future promise to deliver the result of a +// NotifyChainChangesAsync RPC invocation (or an applicable error). +type FutureNotifyChainChangesResult chan *response + +// Receive waits for the response promised by the future and returns an error +// if the registration was not successful. +func (r FutureNotifyChainChangesResult) Receive() error { + _, err := receiveFuture(r) + return err +} + +// NotifyChainChangesAsync returns an instance of a type that can be used to get the +// result of the RPC at some future time by invoking the Receive function on +// the returned instance. +// +// See NotifyChainChanges for the blocking version and more details. +// +// NOTE: This is a btcd extension and requires a websocket connection. +func (c *Client) NotifyChainChangesAsync() FutureNotifyBlocksResult { + // Not supported in HTTP POST mode. + if c.config.HTTPPostMode { + return newFutureError(ErrWebsocketsRequired) + } + + // Ignore the notification if the client is not interested in + // notifications. + if c.ntfnHandlers == nil { + return newNilFutureResult() + } + + cmd := btcjson.NewNotifyChainChangesCmd() + return c.sendCmd(cmd) +} + +// NotifyChainChanges registers the client to receive notifications when the +// selected parent chain changes. The notifications are delivered to the +// notification handlers associated with the client. Calling this function has +// no effect if there are no notification handlers and will result in an error +// if the client is configured to run in HTTP POST mode. +// +// The notifications delivered as a result of this call will be via OnBlockAdded +// +// NOTE: This is a btcd extension and requires a websocket connection. +func (c *Client) NotifyChainChanges() error { + return c.NotifyChainChangesAsync().Receive() +} + // newOutpointFromWire constructs the btcjson representation of a transaction // outpoint from the wire type. func newOutpointFromWire(op *wire.Outpoint) btcjson.Outpoint { diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index 9197de867..2152eec38 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -198,6 +198,7 @@ var rpcLimited = map[string]struct{}{ // Websockets commands "loadTxFilter": {}, "notifyBlocks": {}, + "notifyChainChanges": {}, "notifyNewTransactions": {}, "notifyReceived": {}, "notifySpent": {}, @@ -4416,6 +4417,16 @@ func (s *Server) handleBlockDAGNotification(notification *blockdag.Notification) // Notify registered websocket clients of incoming block. s.ntfnMgr.NotifyBlockAdded(block) + case blockdag.NTChainChanged: + data, ok := notification.Data.(*blockdag.ChainChangedNotificationData) + if !ok { + log.Warnf("Chain changed notification data is of wrong type.") + break + } + + // Notify registered websocket clients of chain changes. + s.ntfnMgr.NotifyChainChanged(data.RemovedChainBlockHashes, + data.AddedChainBlockHashes) } } diff --git a/server/rpc/rpcserverhelp.go b/server/rpc/rpcserverhelp.go index 9517900d6..a7d19496c 100644 --- a/server/rpc/rpcserverhelp.go +++ b/server/rpc/rpcserverhelp.go @@ -607,6 +607,12 @@ var helpDescsEnUS = map[string]string{ // StopNotifyBlocksCmd help. "stopNotifyBlocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.", + // NotifyChainChangesCmd help. + "notifyChainChanges--synopsis": "Request notifications for whenever the selected parent chain changes.", + + // StopNotifyChainChangesCmd help. + "stopNotifyChainChanges--synopsis": "Cancel registered notifications for whenever the selected parent chain changes.", + // NotifyNewTransactionsCmd help. "notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", "notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted", @@ -719,6 +725,8 @@ var rpcResultTypes = map[string][]interface{}{ "session": {(*btcjson.SessionResult)(nil)}, "notifyBlocks": nil, "stopNotifyBlocks": nil, + "notifyChainChanges": nil, + "stopNotifyChainChanges": nil, "notifyNewTransactions": nil, "stopNotifyNewTransactions": nil, "rescanBlocks": {(*[]btcjson.RescannedBlock)(nil)}, diff --git a/server/rpc/rpcwebsocket.go b/server/rpc/rpcwebsocket.go index f3584cb0e..6481f9f73 100644 --- a/server/rpc/rpcwebsocket.go +++ b/server/rpc/rpcwebsocket.go @@ -68,9 +68,11 @@ var wsHandlersBeforeInit = map[string]wsCommandHandler{ "loadTxFilter": handleLoadTxFilter, "help": handleWebsocketHelp, "notifyBlocks": handleNotifyBlocks, + "notifyChainChanges": handleNotifyChainChanges, "notifyNewTransactions": handleNotifyNewTransactions, "session": handleSession, "stopNotifyBlocks": handleStopNotifyBlocks, + "stopNotifyChainChanges": handleStopNotifyChainChanges, "stopNotifyNewTransactions": handleStopNotifyNewTransactions, "rescanBlocks": handleRescanBlocks, } @@ -210,6 +212,24 @@ func (m *wsNotificationManager) NotifyBlockAdded(block *util.Block) { } } +// NotifyChainChanged passes changes to the selected parent chain of +// the blockDAG to the notification manager for processing. +func (m *wsNotificationManager) NotifyChainChanged(removedChainBlockHashes []*daghash.Hash, + addedChainBlockHashes []*daghash.Hash) { + n := ¬ificationChainChanged{ + removedChainBlockHashes: removedChainBlockHashes, + addedChainBlocksHashes: addedChainBlockHashes, + } + // As NotifyChainChanged will be called by the DAG manager + // and the RPC server may no longer be running, use a select + // statement to unblock enqueuing the notification once the RPC + // server has begun shutting down. + select { + case m.queueNotification <- n: + case <-m.quit: + } +} + // NotifyMempoolTx passes a transaction accepted by mempool to the // notification manager for transaction notification processing. If // isNew is true, the tx is is a new transaction, rather than one @@ -382,6 +402,10 @@ func (f *wsClientFilter) removeUnspentOutpoint(op *wire.Outpoint) { // Notification types type notificationBlockAdded util.Block +type notificationChainChanged struct { + removedChainBlockHashes []*daghash.Hash + addedChainBlocksHashes []*daghash.Hash +} type notificationTxAcceptedByMempool struct { isNew bool tx *util.Tx @@ -392,6 +416,8 @@ type notificationRegisterClient wsClient type notificationUnregisterClient wsClient type notificationRegisterBlocks wsClient type notificationUnregisterBlocks wsClient +type notificationRegisterChainChanges wsClient +type notificationUnregisterChainChanges wsClient type notificationRegisterNewMempoolTxs wsClient type notificationUnregisterNewMempoolTxs wsClient @@ -409,6 +435,7 @@ func (m *wsNotificationManager) notificationHandler() { // Where possible, the quit channel is used as the unique id for a client // since it is quite a bit more efficient than using the entire struct. blockNotifications := make(map[chan struct{}]*wsClient) + chainChangeNotifications := make(map[chan struct{}]*wsClient) txNotifications := make(map[chan struct{}]*wsClient) out: @@ -428,6 +455,10 @@ out: block) } + case *notificationChainChanged: + m.notifyChainChanged(chainChangeNotifications, + n.removedChainBlockHashes, n.addedChainBlocksHashes) + case *notificationTxAcceptedByMempool: if n.isNew && len(txNotifications) != 0 { m.notifyForNewTx(txNotifications, n.tx) @@ -442,6 +473,14 @@ out: wsc := (*wsClient)(n) delete(blockNotifications, wsc.quit) + case *notificationRegisterChainChanges: + wsc := (*wsClient)(n) + chainChangeNotifications[wsc.quit] = wsc + + case *notificationUnregisterChainChanges: + wsc := (*wsClient)(n) + delete(chainChangeNotifications, wsc.quit) + case *notificationRegisterClient: wsc := (*wsClient)(n) clients[wsc.quit] = wsc @@ -451,6 +490,7 @@ out: // Remove any requests made by the client as well as // the client itself. delete(blockNotifications, wsc.quit) + delete(chainChangeNotifications, wsc.quit) delete(txNotifications, wsc.quit) delete(clients, wsc.quit) @@ -501,6 +541,51 @@ func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { m.queueNotification <- (*notificationUnregisterBlocks)(wsc) } +// RegisterChainChanges requests chain change notifications to the passed +// websocket client. +func (m *wsNotificationManager) RegisterChainChanges(wsc *wsClient) { + m.queueNotification <- (*notificationRegisterChainChanges)(wsc) +} + +// UnregisterChainChanges removes chain change notifications for the passed +// websocket client. +func (m *wsNotificationManager) UnregisterChainChanges(wsc *wsClient) { + m.queueNotification <- (*notificationUnregisterChainChanges)(wsc) +} + +// notifyChainChanged notifies websocket clients that have registered for +// chain changes. +func (m *wsNotificationManager) notifyChainChanged(clients map[chan struct{}]*wsClient, + removedChainBlockHashes []*daghash.Hash, addedChainBlockHashes []*daghash.Hash) { + + // Collect removed chain hashes. + removedChainHashes := make([]daghash.Hash, 0, len(removedChainBlockHashes)) + for i, hash := range removedChainBlockHashes { + removedChainHashes[i] = *hash + } + + // Collect added chain blocks. + addedChainBlocks, err := collectChainBlocks(m.server, addedChainBlockHashes) + if err != nil { + log.Errorf("Failed to collect chain blocks: %s", err) + return + } + + // Create the notification. + ntfn := btcjson.NewChainChangedNtfn(removedChainHashes, addedChainBlocks) + + for _, wsc := range clients { + // Marshal and queue notification. + marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) + if err != nil { + log.Errorf("Failed to marshal chain changed "+ + "notification: %s", err) + return + } + wsc.QueueNotification(marshalledJSON) + } +} + // subscribedClients returns the set of all websocket client quit channels that // are registered to receive notifications regarding tx, either due to tx // spending a watched output or outputting to a watched address. Matching @@ -1380,6 +1465,13 @@ func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { return nil, nil } +// handleNotifyChainChanges implements the notifyChainChanges command extension for +// websocket connections. +func handleNotifyChainChanges(wsc *wsClient, icmd interface{}) (interface{}, error) { + wsc.server.ntfnMgr.RegisterChainChanges(wsc) + return nil, nil +} + // handleSession implements the session command extension for websocket // connections. func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) { @@ -1393,6 +1485,13 @@ func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error return nil, nil } +// handleStopNotifyChainChanges implements the stopNotifyChainChanges command extension for +// websocket connections. +func handleStopNotifyChainChanges(wsc *wsClient, icmd interface{}) (interface{}, error) { + wsc.server.ntfnMgr.UnregisterChainChanges(wsc) + return nil, nil +} + // handleNotifyNewTransations implements the notifyNewTransactions command // extension for websocket connections. func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {