From e93e60aa7452751a240cd69aa1cdb0b57f046329 Mon Sep 17 00:00:00 2001 From: Svarog Date: Wed, 20 Mar 2019 13:48:32 +0200 Subject: [PATCH] [NOD-63] Merge BlockAccepted and BlockConnected notifications into BlockAdded + remove BlockDisconnected notifications (#221) * [NOD-63] Merge BlockAccepted and BlockConnected notifications into BlockAdded + remove BlockDisconnected notifications * [NOD-63] Many instances of chain->DAG and similar * [NOD-63] Some more chian -> DAG --- blockdag/accept.go | 2 +- blockdag/dag.go | 9 -- blockdag/notifications.go | 18 +-- blockdag/notifications_test.go | 2 +- btcjson/dagsvrwsntfns.go | 105 ++++------------ btcjson/dagsvrwsntfns_test.go | 49 ++------ integration/rpctest/memwallet.go | 106 +++++----------- integration/rpctest/rpc_harness.go | 17 +-- netsync/manager.go | 19 +-- rpcclient/examples/btcdwebsockets/main.go | 10 +- rpcclient/notify.go | 147 +++++----------------- server/rpc/rpcserver.go | 27 ++-- server/rpc/rpcwebsocket.go | 124 ++++-------------- 13 files changed, 155 insertions(+), 480 deletions(-) diff --git a/blockdag/accept.go b/blockdag/accept.go index f790d0350..789c174b1 100644 --- a/blockdag/accept.go +++ b/blockdag/accept.go @@ -88,7 +88,7 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er // DAG. The caller would typically want to react by relaying the // inventory to other peers. dag.dagLock.Unlock() - dag.sendNotification(NTBlockAccepted, block) + dag.sendNotification(NTBlockAdded, block) dag.dagLock.Lock() isOk = true diff --git a/blockdag/dag.go b/blockdag/dag.go index 7f3bd65ae..79efb9eaa 100644 --- a/blockdag/dag.go +++ b/blockdag/dag.go @@ -474,15 +474,6 @@ func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util dag.blockCount++ } - if err != nil { - // Notify the caller that the block was connected to the DAG. - // The caller would typically want to react with actions such as - // updating wallets. - dag.dagLock.Unlock() - dag.sendNotification(NTBlockConnected, block) - dag.dagLock.Lock() - } - // Intentionally ignore errors writing updated node status to DB. If // it fails to write, it's not the end of the world. If the block is // invalid, the worst that can happen is we revalidate the block diff --git a/blockdag/notifications.go b/blockdag/notifications.go index 83e997da8..08573b39c 100644 --- a/blockdag/notifications.go +++ b/blockdag/notifications.go @@ -17,21 +17,15 @@ type NotificationCallback func(*Notification) // Constants for the type of a notification message. const ( - // NTBlockAccepted indicates the associated block was accepted into - // the block chain. Note that this does not necessarily mean it was - // added to the main chain. For that, use NTBlockConnected. - NTBlockAccepted NotificationType = iota - - // NTBlockConnected indicates the associated block was connected to the - // main chain. - NTBlockConnected + // NTBlockAdded indicates the associated block was added into + // the blockDAG. + NTBlockAdded NotificationType = iota ) // notificationTypeStrings is a map of notification types back to their constant // names for pretty printing. var notificationTypeStrings = map[NotificationType]string{ - NTBlockAccepted: "NTBlockAccepted", - NTBlockConnected: "NTBlockConnected", + NTBlockAdded: "NTBlockAdded", } // String returns the NotificationType in human-readable form. @@ -45,9 +39,7 @@ func (n NotificationType) String() string { // Notification defines notification that is sent to the caller via the callback // function provided during the call to New and consists of a notification type // as well as associated data that depends on the type as follows: -// - NTBlockAccepted: *util.Block -// - NTBlockConnected: *util.Block -// - NTBlockDisconnected: *util.Block +// - Added: *util.Block type Notification struct { Type NotificationType Data interface{} diff --git a/blockdag/notifications_test.go b/blockdag/notifications_test.go index 6108fb31f..279ebf094 100644 --- a/blockdag/notifications_test.go +++ b/blockdag/notifications_test.go @@ -30,7 +30,7 @@ func TestNotifications(t *testing.T) { notificationCount := 0 callback := func(notification *Notification) { - if notification.Type == NTBlockAccepted { + if notification.Type == NTBlockAdded { notificationCount++ } } diff --git a/btcjson/dagsvrwsntfns.go b/btcjson/dagsvrwsntfns.go index 972f30c1d..e50f9f8e6 100644 --- a/btcjson/dagsvrwsntfns.go +++ b/btcjson/dagsvrwsntfns.go @@ -9,34 +9,22 @@ package btcjson const ( - // BlockConnectedNtfnMethod is the legacy, deprecated method used for + // BlockAddedNtfnMethod is the legacy, deprecated method used for // notifications from the dag server that a block has been connected. // - // NOTE: Deprecated. Use FilteredBlockConnectedNtfnMethod instead. - BlockConnectedNtfnMethod = "blockConnected" + // NOTE: Deprecated. Use FilteredBlockAddedNtfnMethod instead. + BlockAddedNtfnMethod = "blockAdded" - // BlockDisconnectedNtfnMethod is the legacy, deprecated method used for - // notifications from the dag server that a block has been - // disconnected. - // - // NOTE: Deprecated. Use FilteredBlockDisconnectedNtfnMethod instead. - BlockDisconnectedNtfnMethod = "blockDisconnected" - - // FilteredBlockConnectedNtfnMethod is the new method used for + // FilteredBlockAddedNtfnMethod is the new method used for // notifications from the dag server that a block has been connected. - FilteredBlockConnectedNtfnMethod = "filteredBlockConnected" - - // FilteredBlockDisconnectedNtfnMethod is the new method used for - // notifications from the dag server that a block has been - // disconnected. - FilteredBlockDisconnectedNtfnMethod = "filteredBlockDisconnected" + FilteredBlockAddedNtfnMethod = "filteredBlockAdded" // RecvTxNtfnMethod is the legacy, deprecated method used for // notifications from the dag server that a transaction which pays to // a registered address has been processed. // // NOTE: Deprecated. Use RelevantTxAcceptedNtfnMethod and - // FilteredBlockConnectedNtfnMethod instead. + // FilteredBlockAddedNtfnMethod instead. RecvTxNtfnMethod = "recvTx" // RedeemingTxNtfnMethod is the legacy, deprecated method used for @@ -44,7 +32,7 @@ const ( // registered outpoint has been processed. // // NOTE: Deprecated. Use RelevantTxAcceptedNtfnMethod and - // FilteredBlockConnectedNtfnMethod instead. + // FilteredBlockAddedNtfnMethod instead. RedeemingTxNtfnMethod = "redeemingTx" // RescanFinishedNtfnMethod is the legacy, deprecated method used for @@ -77,82 +65,45 @@ const ( RelevantTxAcceptedNtfnMethod = "relevantTxAccepted" ) -// BlockConnectedNtfn defines the blockConnected JSON-RPC notification. +// BlockAddedNtfn defines the blockAdded JSON-RPC notification. // -// NOTE: Deprecated. Use FilteredBlockConnectedNtfn instead. -type BlockConnectedNtfn struct { +// NOTE: Deprecated. Use FilteredBlockAddedNtfn instead. +type BlockAddedNtfn struct { Hash string Height int32 Time int64 } -// NewBlockConnectedNtfn returns a new instance which can be used to issue a -// blockConnected JSON-RPC notification. +// NewBlockAddedNtfn returns a new instance which can be used to issue a +// blockAdded JSON-RPC notification. // -// NOTE: Deprecated. Use NewFilteredBlockConnectedNtfn instead. -func NewBlockConnectedNtfn(hash string, height int32, time int64) *BlockConnectedNtfn { - return &BlockConnectedNtfn{ +// NOTE: Deprecated. Use NewFilteredBlockAddedNtfn instead. +func NewBlockAddedNtfn(hash string, height int32, time int64) *BlockAddedNtfn { + return &BlockAddedNtfn{ Hash: hash, Height: height, Time: time, } } -// BlockDisconnectedNtfn defines the blockDisconnected JSON-RPC notification. -// -// NOTE: Deprecated. Use FilteredBlockDisconnectedNtfn instead. -type BlockDisconnectedNtfn struct { - Hash string - Height int32 - Time int64 -} - -// NewBlockDisconnectedNtfn returns a new instance which can be used to issue a -// blockDisconnected JSON-RPC notification. -// -// NOTE: Deprecated. Use NewFilteredBlockDisconnectedNtfn instead. -func NewBlockDisconnectedNtfn(hash string, height int32, time int64) *BlockDisconnectedNtfn { - return &BlockDisconnectedNtfn{ - Hash: hash, - Height: height, - Time: time, - } -} - -// FilteredBlockConnectedNtfn defines the filteredBlockConnected JSON-RPC +// FilteredBlockAddedNtfn defines the filteredBlockAdded JSON-RPC // notification. -type FilteredBlockConnectedNtfn struct { +type FilteredBlockAddedNtfn struct { Height int32 Header string SubscribedTxs []string } -// NewFilteredBlockConnectedNtfn returns a new instance which can be used to -// issue a filteredBlockConnected JSON-RPC notification. -func NewFilteredBlockConnectedNtfn(height int32, header string, subscribedTxs []string) *FilteredBlockConnectedNtfn { - return &FilteredBlockConnectedNtfn{ +// NewFilteredBlockAddedNtfn returns a new instance which can be used to +// issue a filteredBlockAdded JSON-RPC notification. +func NewFilteredBlockAddedNtfn(height int32, header string, subscribedTxs []string) *FilteredBlockAddedNtfn { + return &FilteredBlockAddedNtfn{ Height: height, Header: header, SubscribedTxs: subscribedTxs, } } -// FilteredBlockDisconnectedNtfn defines the filteredBlockDisconnected JSON-RPC -// notification. -type FilteredBlockDisconnectedNtfn struct { - Height int32 - Header string -} - -// NewFilteredBlockDisconnectedNtfn returns a new instance which can be used to -// issue a filteredBlockDisconnected JSON-RPC notification. -func NewFilteredBlockDisconnectedNtfn(height int32, header string) *FilteredBlockDisconnectedNtfn { - return &FilteredBlockDisconnectedNtfn{ - Height: height, - Header: header, - } -} - // BlockDetails describes details of a tx in a block. type BlockDetails struct { Height int32 `json:"height"` @@ -163,7 +114,7 @@ type BlockDetails struct { // RecvTxNtfn defines the recvTx JSON-RPC notification. // -// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockConnectedNtfn +// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockAddedNtfn // instead. type RecvTxNtfn struct { HexTx string @@ -174,7 +125,7 @@ type RecvTxNtfn struct { // JSON-RPC notification. // // NOTE: Deprecated. Use NewRelevantTxAcceptedNtfn and -// NewFilteredBlockConnectedNtfn instead. +// NewFilteredBlockAddedNtfn instead. func NewRecvTxNtfn(hexTx string, block *BlockDetails) *RecvTxNtfn { return &RecvTxNtfn{ HexTx: hexTx, @@ -184,7 +135,7 @@ func NewRecvTxNtfn(hexTx string, block *BlockDetails) *RecvTxNtfn { // RedeemingTxNtfn defines the redeemingTx JSON-RPC notification. // -// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockConnectedNtfn +// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockAddedNtfn // instead. type RedeemingTxNtfn struct { HexTx string @@ -195,7 +146,7 @@ type RedeemingTxNtfn struct { // redeemingTx JSON-RPC notification. // // NOTE: Deprecated. Use NewRelevantTxAcceptedNtfn and -// NewFilteredBlockConnectedNtfn instead. +// NewFilteredBlockAddedNtfn instead. func NewRedeemingTxNtfn(hexTx string, block *BlockDetails) *RedeemingTxNtfn { return &RedeemingTxNtfn{ HexTx: hexTx, @@ -290,10 +241,8 @@ func init() { // notifications. flags := UFWebsocketOnly | UFNotification - MustRegisterCmd(BlockConnectedNtfnMethod, (*BlockConnectedNtfn)(nil), flags) - MustRegisterCmd(BlockDisconnectedNtfnMethod, (*BlockDisconnectedNtfn)(nil), flags) - MustRegisterCmd(FilteredBlockConnectedNtfnMethod, (*FilteredBlockConnectedNtfn)(nil), flags) - MustRegisterCmd(FilteredBlockDisconnectedNtfnMethod, (*FilteredBlockDisconnectedNtfn)(nil), flags) + MustRegisterCmd(BlockAddedNtfnMethod, (*BlockAddedNtfn)(nil), flags) + MustRegisterCmd(FilteredBlockAddedNtfnMethod, (*FilteredBlockAddedNtfn)(nil), flags) MustRegisterCmd(RecvTxNtfnMethod, (*RecvTxNtfn)(nil), flags) MustRegisterCmd(RedeemingTxNtfnMethod, (*RedeemingTxNtfn)(nil), flags) MustRegisterCmd(RescanFinishedNtfnMethod, (*RescanFinishedNtfn)(nil), flags) diff --git a/btcjson/dagsvrwsntfns_test.go b/btcjson/dagsvrwsntfns_test.go index b9b252981..c4514f9e7 100644 --- a/btcjson/dagsvrwsntfns_test.go +++ b/btcjson/dagsvrwsntfns_test.go @@ -33,64 +33,35 @@ func TestDAGSvrWsNtfns(t *testing.T) { unmarshalled interface{} }{ { - name: "blockConnected", + name: "blockAdded", newNtfn: func() (interface{}, error) { - return btcjson.NewCmd("blockConnected", "123", 100000, 123456789) + return btcjson.NewCmd("blockAdded", "123", 100000, 123456789) }, staticNtfn: func() interface{} { - return btcjson.NewBlockConnectedNtfn("123", 100000, 123456789) + return btcjson.NewBlockAddedNtfn("123", 100000, 123456789) }, - marshalled: `{"jsonrpc":"1.0","method":"blockConnected","params":["123",100000,123456789],"id":null}`, - unmarshalled: &btcjson.BlockConnectedNtfn{ + marshalled: `{"jsonrpc":"1.0","method":"blockAdded","params":["123",100000,123456789],"id":null}`, + unmarshalled: &btcjson.BlockAddedNtfn{ Hash: "123", Height: 100000, Time: 123456789, }, }, { - name: "blockDisconnected", + name: "filteredBlockAdded", newNtfn: func() (interface{}, error) { - return btcjson.NewCmd("blockDisconnected", "123", 100000, 123456789) + return btcjson.NewCmd("filteredBlockAdded", 100000, "header", []string{"tx0", "tx1"}) }, staticNtfn: func() interface{} { - return btcjson.NewBlockDisconnectedNtfn("123", 100000, 123456789) + return btcjson.NewFilteredBlockAddedNtfn(100000, "header", []string{"tx0", "tx1"}) }, - marshalled: `{"jsonrpc":"1.0","method":"blockDisconnected","params":["123",100000,123456789],"id":null}`, - unmarshalled: &btcjson.BlockDisconnectedNtfn{ - Hash: "123", - Height: 100000, - Time: 123456789, - }, - }, - { - name: "filteredBlockConnected", - newNtfn: func() (interface{}, error) { - return btcjson.NewCmd("filteredBlockConnected", 100000, "header", []string{"tx0", "tx1"}) - }, - staticNtfn: func() interface{} { - return btcjson.NewFilteredBlockConnectedNtfn(100000, "header", []string{"tx0", "tx1"}) - }, - marshalled: `{"jsonrpc":"1.0","method":"filteredBlockConnected","params":[100000,"header",["tx0","tx1"]],"id":null}`, - unmarshalled: &btcjson.FilteredBlockConnectedNtfn{ + marshalled: `{"jsonrpc":"1.0","method":"filteredBlockAdded","params":[100000,"header",["tx0","tx1"]],"id":null}`, + unmarshalled: &btcjson.FilteredBlockAddedNtfn{ Height: 100000, Header: "header", SubscribedTxs: []string{"tx0", "tx1"}, }, }, - { - name: "filteredBlockDisconnected", - newNtfn: func() (interface{}, error) { - return btcjson.NewCmd("filteredBlockDisconnected", 100000, "header") - }, - staticNtfn: func() interface{} { - return btcjson.NewFilteredBlockDisconnectedNtfn(100000, "header") - }, - marshalled: `{"jsonrpc":"1.0","method":"filteredBlockDisconnected","params":[100000,"header"],"id":null}`, - unmarshalled: &btcjson.FilteredBlockDisconnectedNtfn{ - Height: 100000, - Header: "header", - }, - }, { name: "recvTx", newNtfn: func() (interface{}, error) { diff --git a/integration/rpctest/memwallet.go b/integration/rpctest/memwallet.go index 625433e74..1c5fe4407 100644 --- a/integration/rpctest/memwallet.go +++ b/integration/rpctest/memwallet.go @@ -49,16 +49,15 @@ func (u *utxo) isMature(height int32) bool { return height >= u.maturityHeight } -// chainUpdate encapsulates an update to the current main chain. This struct is -// used to sync up the memWallet each time a new block is connected to the main -// chain. -type chainUpdate struct { +// dagUpdate encapsulates an update to the current DAG. This struct is +// used to sync up the memWallet each time a new block is connected to the DAG. +type dagUpdate struct { blockHeight int32 filteredTxns []*util.Tx isConnect bool // True if connect, false if disconnect } -// undoEntry is functionally the opposite of a chainUpdate. An undoEntry is +// undoEntry is functionally the opposite of a dagUpdate. An undoEntry is // created for each new block received, then stored in a log in order to // properly handle block re-orgs. type undoEntry struct { @@ -96,9 +95,9 @@ type memWallet struct { // disconnected block on the wallet's set of spendable utxos. reorgJournal map[int32]*undoEntry - chainUpdates []*chainUpdate - chainUpdateSignal chan struct{} - chainMtx sync.Mutex + dagUpdates []*dagUpdate + dagUpdateSignal chan struct{} + dagMtx sync.Mutex net *dagconfig.Params @@ -143,21 +142,21 @@ func newMemWallet(net *dagconfig.Params, harnessID uint32) (*memWallet, error) { addrs[0] = coinbaseAddr return &memWallet{ - net: net, - coinbaseKey: coinbaseKey, - coinbaseAddr: coinbaseAddr, - hdIndex: 1, - hdRoot: hdRoot, - addrs: addrs, - utxos: make(map[wire.OutPoint]*utxo), - chainUpdateSignal: make(chan struct{}), - reorgJournal: make(map[int32]*undoEntry), + net: net, + coinbaseKey: coinbaseKey, + coinbaseAddr: coinbaseAddr, + hdIndex: 1, + hdRoot: hdRoot, + addrs: addrs, + utxos: make(map[wire.OutPoint]*utxo), + dagUpdateSignal: make(chan struct{}), + reorgJournal: make(map[int32]*undoEntry), }, nil } // Start launches all goroutines required for the wallet to function properly. func (m *memWallet) Start() { - go m.chainSyncer() + go m.dagSyncer() } // SyncedHeight returns the height the wallet is known to be synced to. @@ -176,27 +175,27 @@ func (m *memWallet) SetRPCClient(rpcClient *rpcclient.Client) { } // IngestBlock is a call-back which is to be triggered each time a new block is -// connected to the main chain. It queues the update for the chain syncer, +// connected to the blockDAG. It queues the update for the DAG syncer, // calling the private version in sequential order. func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) { - // Append this new chain update to the end of the queue of new chain + // Append this new DAG update to the end of the queue of new DAG // updates. - m.chainMtx.Lock() - m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, + m.dagMtx.Lock() + m.dagUpdates = append(m.dagUpdates, &dagUpdate{height, filteredTxns, true}) - m.chainMtx.Unlock() + m.dagMtx.Unlock() - // Launch a goroutine to signal the chainSyncer that a new update is + // Launch a goroutine to signal the dagSyncer that a new update is // available. We do this in a new goroutine in order to avoid blocking // the main loop of the rpc client. go func() { - m.chainUpdateSignal <- struct{}{} + m.dagUpdateSignal <- struct{}{} }() } // ingestBlock updates the wallet's internal utxo state based on the outputs // created and destroyed within each block. -func (m *memWallet) ingestBlock(update *chainUpdate) { +func (m *memWallet) ingestBlock(update *dagUpdate) { // Update the latest synced height, then process each filtered // transaction in the block creating and destroying utxos within // the wallet as a result. @@ -218,27 +217,25 @@ func (m *memWallet) ingestBlock(update *chainUpdate) { m.reorgJournal[update.blockHeight] = undo } -// chainSyncer is a goroutine dedicated to processing new blocks in order to +// dagSyncer is a goroutine dedicated to processing new blocks in order to // keep the wallet's utxo state up to date. // // NOTE: This MUST be run as a goroutine. -func (m *memWallet) chainSyncer() { - var update *chainUpdate +func (m *memWallet) dagSyncer() { + var update *dagUpdate - for range m.chainUpdateSignal { + for range m.dagUpdateSignal { // A new update is available, so pop the new chain update from // the front of the update queue. - m.chainMtx.Lock() - update = m.chainUpdates[0] - m.chainUpdates[0] = nil // Set to nil to prevent GC leak. - m.chainUpdates = m.chainUpdates[1:] - m.chainMtx.Unlock() + m.dagMtx.Lock() + update = m.dagUpdates[0] + m.dagUpdates[0] = nil // Set to nil to prevent GC leak. + m.dagUpdates = m.dagUpdates[1:] + m.dagMtx.Unlock() m.Lock() if update.isConnect { m.ingestBlock(update) - } else { - m.unwindBlock(update) } m.Unlock() } @@ -295,41 +292,6 @@ func (m *memWallet) evalInputs(inputs []*wire.TxIn, undo *undoEntry) { } } -// UnwindBlock is a call-back which is to be executed each time a block is -// disconnected from the main chain. It queues the update for the chain syncer, -// calling the private version in sequential order. -func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) { - // Append this new chain update to the end of the queue of new chain - // updates. - m.chainMtx.Lock() - m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, - nil, false}) - m.chainMtx.Unlock() - - // Launch a goroutine to signal the chainSyncer that a new update is - // available. We do this in a new goroutine in order to avoid blocking - // the main loop of the rpc client. - go func() { - m.chainUpdateSignal <- struct{}{} - }() -} - -// unwindBlock undoes the effect that a particular block had on the wallet's -// internal utxo state. -func (m *memWallet) unwindBlock(update *chainUpdate) { - undo := m.reorgJournal[update.blockHeight] - - for _, utxo := range undo.utxosCreated { - delete(m.utxos, utxo) - } - - for outPoint, utxo := range undo.utxosDestroyed { - m.utxos[outPoint] = utxo - } - - delete(m.reorgJournal, update.blockHeight) -} - // newAddress returns a new address from the wallet's hd key chain. It also // loads the address into the RPC client's transaction filter to ensure any // transactions that involve it are delivered via the notifications. diff --git a/integration/rpctest/rpc_harness.go b/integration/rpctest/rpc_harness.go index dd87d4019..0252af2d5 100644 --- a/integration/rpctest/rpc_harness.go +++ b/integration/rpctest/rpc_harness.go @@ -169,24 +169,15 @@ func New(activeNet *dagconfig.Params, handlers *rpcclient.NotificationHandlers, // callback has already been set, then create a wrapper callback which // executes both the currently registered callback and the mem wallet's // callback. - if handlers.OnFilteredBlockConnected != nil { - obc := handlers.OnFilteredBlockConnected - handlers.OnFilteredBlockConnected = func(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) { + if handlers.OnFilteredBlockAdded != nil { + obc := handlers.OnFilteredBlockAdded + handlers.OnFilteredBlockAdded = func(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) { wallet.IngestBlock(height, header, filteredTxns) obc(height, header, filteredTxns) } } else { // Otherwise, we can claim the callback ourselves. - handlers.OnFilteredBlockConnected = wallet.IngestBlock - } - if handlers.OnFilteredBlockDisconnected != nil { - obd := handlers.OnFilteredBlockDisconnected - handlers.OnFilteredBlockDisconnected = func(height int32, header *wire.BlockHeader) { - wallet.UnwindBlock(height, header) - obd(height, header) - } - } else { - handlers.OnFilteredBlockDisconnected = wallet.UnwindBlock + handlers.OnFilteredBlockAdded = wallet.IngestBlock } h := &Harness{ diff --git a/netsync/manager.go b/netsync/manager.go index b93a75f30..f1c27a9c4 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -1171,9 +1171,8 @@ out: // connected peers. func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notification) { switch notification.Type { - // A block has been accepted into the block chain. Relay it to other - // peers. - case blockdag.NTBlockAccepted: + // A block has been accepted into the blockDAG. Relay it to other peers. + case blockdag.NTBlockAdded: // Don't relay if we are not current. Other peers that are // current should already know about it. if !sm.current() { @@ -1182,7 +1181,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific block, ok := notification.Data.(*util.Block) if !ok { - log.Warnf("Chain accepted notification is not a block.") + log.Warnf("Block Added notification data is not a block.") break } @@ -1190,14 +1189,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) - // A block has been connected to the block DAG. - case blockdag.NTBlockConnected: - block, ok := notification.Data.(*util.Block) - if !ok { - log.Warnf("Chain connected notification is not a block.") - break - } - + // Update mempool ch := make(chan mempool.NewBlockMsg) go func() { err := sm.txMemPool.HandleNewBlock(block, ch) @@ -1329,8 +1321,7 @@ func (sm *SyncManager) SyncPeerID() int32 { return <-reply } -// ProcessBlock makes use of ProcessBlock on an internal instance of a block -// chain. +// ProcessBlock makes use of ProcessBlock on an internal instance of a blockDAG. func (sm *SyncManager) ProcessBlock(block *util.Block, flags blockdag.BehaviorFlags) (bool, error) { reply := make(chan processBlockResponse, 1) sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} diff --git a/rpcclient/examples/btcdwebsockets/main.go b/rpcclient/examples/btcdwebsockets/main.go index 4d957d5eb..b13a51216 100644 --- a/rpcclient/examples/btcdwebsockets/main.go +++ b/rpcclient/examples/btcdwebsockets/main.go @@ -21,12 +21,8 @@ func main() { // for notifications. See the documentation of the rpcclient // NotificationHandlers type for more details about each handler. ntfnHandlers := rpcclient.NotificationHandlers{ - OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txns []*util.Tx) { - log.Printf("Block connected: %s (%d) %s", - header.BlockHash(), height, header.Timestamp) - }, - OnFilteredBlockDisconnected: func(height int32, header *wire.BlockHeader) { - log.Printf("Block disconnected: %s (%d) %s", + OnFilteredBlockAdded: func(height int32, header *wire.BlockHeader, txns []*util.Tx) { + log.Printf("Block added: %s (%d) %s", header.BlockHash(), height, header.Timestamp) }, } @@ -49,7 +45,7 @@ func main() { log.Fatal(err) } - // Register for block connect and disconnect notifications. + // Register for block added notifications. if err := client.NotifyBlocks(); err != nil { log.Fatal(err) } diff --git a/rpcclient/notify.go b/rpcclient/notify.go index 75d54204a..9ecd0f704 100644 --- a/rpcclient/notify.go +++ b/rpcclient/notify.go @@ -92,40 +92,24 @@ type NotificationHandlers struct { // notification handlers, and is safe for blocking client requests. OnClientConnected func() - // OnBlockConnected is invoked when a block is connected to the longest - // (best) chain. It will only be invoked if a preceding call to - // NotifyBlocks has been made to register for the notification and the - // function is non-nil. + // OnBlockAdded is invoked when a block is connected to the DAG. + // It will only be invoked if a preceding call to NotifyBlocks has been made + // to register for the notification and the function is non-nil. // - // NOTE: Deprecated. Use OnFilteredBlockConnected instead. - OnBlockConnected func(hash *daghash.Hash, height int32, t time.Time) + // NOTE: Deprecated. Use OnFilteredBlockAdded instead. + OnBlockAdded func(hash *daghash.Hash, height int32, t time.Time) - // OnFilteredBlockConnected is invoked when a block is connected to the - // longest (best) chain. It will only be invoked if a preceding call to + // OnFilteredBlockAdded is invoked when a block is connected to the + // bloackDAG. It will only be invoked if a preceding call to // NotifyBlocks has been made to register for the notification and the - // function is non-nil. Its parameters differ from OnBlockConnected: it + // function is non-nil. Its parameters differ from OnBlockAdded: it // receives the block's height, header, and relevant transactions. - OnFilteredBlockConnected func(height int32, header *wire.BlockHeader, + OnFilteredBlockAdded func(height int32, header *wire.BlockHeader, txs []*util.Tx) - // OnBlockDisconnected is invoked when a block is disconnected from the - // longest (best) chain. It will only be invoked if a preceding call to - // NotifyBlocks has been made to register for the notification and the - // function is non-nil. - // - // NOTE: Deprecated. Use OnFilteredBlockDisconnected instead. - OnBlockDisconnected func(hash *daghash.Hash, height int32, t time.Time) - - // OnFilteredBlockDisconnected is invoked when a block is disconnected - // from the longest (best) chain. It will only be invoked if a - // preceding NotifyBlocks has been made to register for the notification - // and the call to function is non-nil. Its parameters differ from - // OnBlockDisconnected: it receives the block's height and header. - OnFilteredBlockDisconnected func(height int32, header *wire.BlockHeader) - // OnRecvTx is invoked when a transaction that receives funds to a // registered address is received into the memory pool and also - // connected to the longest (best) chain. It will only be invoked if a + // connected to the BlockDAG. It will only be invoked if a // preceding call to NotifyReceived, Rescan, or RescanEndHeight has been // made to register for the notification and the function is non-nil. // @@ -134,7 +118,7 @@ type NotificationHandlers struct { // OnRedeemingTx is invoked when a transaction that spends a registered // outpoint is received into the memory pool and also connected to the - // longest (best) chain. It will only be invoked if a preceding call to + // blockDAG. It will only be invoked if a preceding call to // NotifySpent, Rescan, or RescanEndHeight has been made to register for // the notification and the function is non-nil. // @@ -220,78 +204,42 @@ func (c *Client) handleNotification(ntfn *rawNotification) { } switch ntfn.Method { - // OnBlockConnected - case btcjson.BlockConnectedNtfnMethod: + // OnBlockAdded + case btcjson.BlockAddedNtfnMethod: // Ignore the notification if the client is not interested in // it. - if c.ntfnHandlers.OnBlockConnected == nil { + if c.ntfnHandlers.OnBlockAdded == nil { return } - blockHash, blockHeight, blockTime, err := parseChainNtfnParams(ntfn.Params) + blockHash, blockHeight, blockTime, err := parseDAGNtfnParams(ntfn.Params) if err != nil { - log.Warnf("Received invalid block connected "+ + log.Warnf("Received invalid block added "+ "notification: %s", err) return } - c.ntfnHandlers.OnBlockConnected(blockHash, blockHeight, blockTime) + c.ntfnHandlers.OnBlockAdded(blockHash, blockHeight, blockTime) - // OnFilteredBlockConnected - case btcjson.FilteredBlockConnectedNtfnMethod: + // OnFilteredBlockAdded + case btcjson.FilteredBlockAddedNtfnMethod: // Ignore the notification if the client is not interested in // it. - if c.ntfnHandlers.OnFilteredBlockConnected == nil { + if c.ntfnHandlers.OnFilteredBlockAdded == nil { return } blockHeight, blockHeader, transactions, err := - parseFilteredBlockConnectedParams(ntfn.Params) + parseFilteredBlockAddedParams(ntfn.Params) if err != nil { log.Warnf("Received invalid filtered block "+ "connected notification: %s", err) return } - c.ntfnHandlers.OnFilteredBlockConnected(blockHeight, + c.ntfnHandlers.OnFilteredBlockAdded(blockHeight, blockHeader, transactions) - // OnBlockDisconnected - case btcjson.BlockDisconnectedNtfnMethod: - // Ignore the notification if the client is not interested in - // it. - if c.ntfnHandlers.OnBlockDisconnected == nil { - return - } - - blockHash, blockHeight, blockTime, err := parseChainNtfnParams(ntfn.Params) - if err != nil { - log.Warnf("Received invalid block connected "+ - "notification: %s", err) - return - } - - c.ntfnHandlers.OnBlockDisconnected(blockHash, blockHeight, blockTime) - - // OnFilteredBlockDisconnected - case btcjson.FilteredBlockDisconnectedNtfnMethod: - // Ignore the notification if the client is not interested in - // it. - if c.ntfnHandlers.OnFilteredBlockDisconnected == nil { - return - } - - blockHeight, blockHeader, err := - parseFilteredBlockDisconnectedParams(ntfn.Params) - if err != nil { - log.Warnf("Received invalid filtered block "+ - "disconnected notification: %s", err) - return - } - - c.ntfnHandlers.OnFilteredBlockDisconnected(blockHeight, - blockHeader) - // OnRecvTx case btcjson.RecvTxNtfnMethod: // Ignore the notification if the client is not interested in @@ -485,9 +433,9 @@ func (e wrongNumParams) Error() string { return fmt.Sprintf("wrong number of parameters (%d)", e) } -// parseChainNtfnParams parses out the block hash and height from the parameters -// of blockconnected and blockdisconnected notifications. -func parseChainNtfnParams(params []json.RawMessage) (*daghash.Hash, +// parseDAGNtfnParams parses out the block hash and height from the parameters +// of blockadded. +func parseDAGNtfnParams(params []json.RawMessage) (*daghash.Hash, int32, time.Time, error) { if len(params) != 3 { @@ -527,12 +475,12 @@ func parseChainNtfnParams(params []json.RawMessage) (*daghash.Hash, return blockHash, blockHeight, blockTime, nil } -// parseFilteredBlockConnectedParams parses out the parameters included in a -// filteredblockconnected notification. +// parseFilteredBlockAddedParams parses out the parameters included in a +// filteredblockadded notification. // // NOTE: This is a btcd extension ported from github.com/decred/dcrrpcclient // and requires a websocket connection. -func parseFilteredBlockConnectedParams(params []json.RawMessage) (int32, +func parseFilteredBlockAddedParams(params []json.RawMessage) (int32, *wire.BlockHeader, []*util.Tx, error) { if len(params) < 3 { @@ -583,40 +531,6 @@ func parseFilteredBlockConnectedParams(params []json.RawMessage) (int32, return blockHeight, &blockHeader, transactions, nil } -// parseFilteredBlockDisconnectedParams parses out the parameters included in a -// filteredblockdisconnected notification. -// -// NOTE: This is a btcd extension ported from github.com/decred/dcrrpcclient -// and requires a websocket connection. -func parseFilteredBlockDisconnectedParams(params []json.RawMessage) (int32, - *wire.BlockHeader, error) { - if len(params) < 2 { - return 0, nil, wrongNumParams(len(params)) - } - - // Unmarshal first parameter as an integer. - var blockHeight int32 - err := json.Unmarshal(params[0], &blockHeight) - if err != nil { - return 0, nil, err - } - - // Unmarshal second parmeter as a slice of bytes. - blockHeaderBytes, err := parseHexParam(params[1]) - if err != nil { - return 0, nil, err - } - - // Deserialize block header from slice of bytes. - var blockHeader wire.BlockHeader - err = blockHeader.Deserialize(bytes.NewReader(blockHeaderBytes)) - if err != nil { - return 0, nil, err - } - - return blockHeight, &blockHeader, nil -} - func parseHexParam(param json.RawMessage) ([]byte, error) { var s string err := json.Unmarshal(param, &s) @@ -896,8 +810,7 @@ func (c *Client) NotifyBlocksAsync() FutureNotifyBlocksResult { // this function has no effect if there are no notification handlers and will // result in an error if the client is configured to run in HTTP POST mode. // -// The notifications delivered as a result of this call will be via one of -// OnBlockConnected or OnBlockDisconnected. +// The notifications delivered as a result of this call will be via OnBlockAdded // // NOTE: This is a btcd extension and requires a websocket connection. func (c *Client) NotifyBlocks() error { @@ -1104,7 +1017,7 @@ func (c *Client) NotifyReceivedAsync(addresses []util.Address) FutureNotifyRecei // NotifyReceived registers the client to receive notifications every time a // new transaction which pays to one of the passed addresses is accepted to -// memory pool or in a block connected to the block chain. In addition, when +// memory pool or in a block added to the block DAG. In addition, when // one of these transactions is detected, the client is also automatically // registered for notifications when the new transaction outpoints the address // now has available are spent (See NotifySpent). The notifications are diff --git a/server/rpc/rpcserver.go b/server/rpc/rpcserver.go index fa420d0ba..995eba357 100644 --- a/server/rpc/rpcserver.go +++ b/server/rpc/rpcserver.go @@ -1498,10 +1498,10 @@ func (state *gbtWorkState) notifyLongPollers(tipHashes []daghash.Hash, lastGener } } -// NotifyBlockConnected uses the newly-connected block to notify any long poll +// NotifyBlockAdded uses the newly-added block to notify any long poll // clients with a new block template when their existing block template is -// stale due to the newly connected block. -func (state *gbtWorkState) NotifyBlockConnected(tipHashes []daghash.Hash) { +// stale due to the newly added block. +func (state *gbtWorkState) NotifyBlockAdded(tipHashes []daghash.Hash) { go func() { state.Lock() defer state.Unlock() @@ -4325,27 +4325,26 @@ func NewRPCServer( return &rpc, nil } -// Callback for notifications from blockchain. It notifies clients that are +// Callback for notifications from blockdag. It notifies clients that are // long polling for changes or subscribed to websockets notifications. func (s *Server) handleBlockchainNotification(notification *blockdag.Notification) { switch notification.Type { - case blockdag.NTBlockAccepted: + case blockdag.NTBlockAdded: + block, ok := notification.Data.(*util.Block) + if !ok { + log.Warnf("Block added notification data is not a block.") + break + } + tipHashes := s.cfg.DAG.TipHashes() // Allow any clients performing long polling via the // getBlockTemplate RPC to be notified when the new block causes // their old block template to become stale. - s.gbtWorkState.NotifyBlockConnected(tipHashes) - - case blockdag.NTBlockConnected: - block, ok := notification.Data.(*util.Block) - if !ok { - log.Warnf("Chain connected notification is not a block.") - break - } + s.gbtWorkState.NotifyBlockAdded(tipHashes) // Notify registered websocket clients of incoming block. - s.ntfnMgr.NotifyBlockConnected(block) + s.ntfnMgr.NotifyBlockAdded(block) } } diff --git a/server/rpc/rpcwebsocket.go b/server/rpc/rpcwebsocket.go index 5dc99e928..97e844879 100644 --- a/server/rpc/rpcwebsocket.go +++ b/server/rpc/rpcwebsocket.go @@ -200,29 +200,16 @@ func (m *wsNotificationManager) queueHandler() { m.wg.Done() } -// NotifyBlockConnected passes a block newly-connected to the best chain +// NotifyBlockAdded passes a block newly-added to the blockDAG // to the notification manager for block and transaction notification // processing. -func (m *wsNotificationManager) NotifyBlockConnected(block *util.Block) { - // As NotifyBlockConnected will be called by the block manager +func (m *wsNotificationManager) NotifyBlockAdded(block *util.Block) { + // As NotifyBlockAdded will be called by the block manager // and the RPC server may no longer be running, use a select // statement to unblock enqueuing the notification once the RPC // server has begun shutting down. select { - case m.queueNotification <- (*notificationBlockConnected)(block): - case <-m.quit: - } -} - -// NotifyBlockDisconnected passes a block disconnected from the best chain -// to the notification manager for block notification processing. -func (m *wsNotificationManager) NotifyBlockDisconnected(block *util.Block) { - // As NotifyBlockDisconnected will be called by the block manager - // and the RPC server may no longer be running, use a select - // statement to unblock enqueuing the notification once the RPC - // server has begun shutting down. - select { - case m.queueNotification <- (*notificationBlockDisconnected)(block): + case m.queueNotification <- (*notificationBlockAdded)(block): case <-m.quit: } } @@ -446,8 +433,7 @@ func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) { } // Notification types -type notificationBlockConnected util.Block -type notificationBlockDisconnected util.Block +type notificationBlockAdded util.Block type notificationTxAcceptedByMempool struct { isNew bool tx *util.Tx @@ -504,7 +490,7 @@ out: break out } switch n := n.(type) { - case *notificationBlockConnected: + case *notificationBlockAdded: block := (*util.Block)(n) // Skip iterating through all txs if no @@ -517,19 +503,9 @@ out: } if len(blockNotifications) != 0 { - m.notifyBlockConnected(blockNotifications, + m.notifyBlockAdded(blockNotifications, block) - m.notifyFilteredBlockConnected(blockNotifications, - block) - } - - case *notificationBlockDisconnected: - block := (*util.Block)(n) - - if len(blockNotifications) != 0 { - m.notifyBlockDisconnected(blockNotifications, - block) - m.notifyFilteredBlockDisconnected(blockNotifications, + m.notifyFilteredBlockAdded(blockNotifications, block) } @@ -688,17 +664,17 @@ func (m *wsNotificationManager) subscribedClients(tx *util.Tx, return subscribed } -// notifyBlockConnected notifies websocket clients that have registered for -// block updates when a block is connected to the main chain. -func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, +// notifyBlockAdded notifies websocket clients that have registered for +// block updates when a block is added to the blockDAG. +func (*wsNotificationManager) notifyBlockAdded(clients map[chan struct{}]*wsClient, block *util.Block) { - // Notify interested websocket clients about the connected block. - ntfn := btcjson.NewBlockConnectedNtfn(block.Hash().String(), block.Height(), + // Notify interested websocket clients about the added block. + ntfn := btcjson.NewBlockAddedNtfn(block.Hash().String(), block.Height(), block.MsgBlock().Header.Timestamp.Unix()) marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) if err != nil { - log.Errorf("Failed to marshal block connected notification: "+ + log.Errorf("Failed to marshal block added notification: "+ "%s", err) return } @@ -707,33 +683,9 @@ func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*ws } } -// notifyBlockDisconnected notifies websocket clients that have registered for -// block updates when a block is disconnected from the main chain (due to a -// reorganize). -func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*wsClient, block *util.Block) { - // Skip notification creation if no clients have requested block - // connected/disconnected notifications. - if len(clients) == 0 { - return - } - - // Notify interested websocket clients about the disconnected block. - ntfn := btcjson.NewBlockDisconnectedNtfn(block.Hash().String(), - block.Height(), block.MsgBlock().Header.Timestamp.Unix()) - marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) - if err != nil { - log.Errorf("Failed to marshal block disconnected "+ - "notification: %s", err) - return - } - for _, wsc := range clients { - wsc.QueueNotification(marshalledJSON) - } -} - -// notifyFilteredBlockConnected notifies websocket clients that have registered for -// block updates when a block is connected to the main chain. -func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient, +// notifyFilteredBlockAdded notifies websocket clients that have registered for +// block updates when a block is added to the blockDAG. +func (m *wsNotificationManager) notifyFilteredBlockAdded(clients map[chan struct{}]*wsClient, block *util.Block) { // Create the common portion of the notification that is the same for @@ -742,10 +694,10 @@ func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan st err := block.MsgBlock().Header.Serialize(&w) if err != nil { log.Errorf("Failed to serialize header for filtered block "+ - "connected notification: %s", err) + "added notification: %s", err) return } - ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(), + ntfn := btcjson.NewFilteredBlockAddedNtfn(block.Height(), hex.EncodeToString(w.Bytes()), nil) // Search for relevant transactions for each client and save them @@ -776,38 +728,6 @@ func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan st } } -// notifyFilteredBlockDisconnected notifies websocket clients that have registered for -// block updates when a block is disconnected from the main chain (due to a -// reorganize). -func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient, - block *util.Block) { - // Skip notification creation if no clients have requested block - // connected/disconnected notifications. - if len(clients) == 0 { - return - } - - // Notify interested websocket clients about the disconnected block. - var w bytes.Buffer - err := block.MsgBlock().Header.Serialize(&w) - if err != nil { - log.Errorf("Failed to serialize header for filtered block "+ - "disconnected notification: %s", err) - return - } - ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(), - hex.EncodeToString(w.Bytes())) - marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) - if err != nil { - log.Errorf("Failed to marshal filtered block disconnected "+ - "notification: %s", err) - return - } - for _, wsc := range clients { - wsc.QueueNotification(marshalledJSON) - } -} - // RegisterNewMempoolTxsUpdates requests notifications to the passed websocket // client when new transactions are added to the memory pool. func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) { @@ -902,8 +822,8 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie } // RegisterSpentRequests requests a notification when each of the passed -// outpoints is confirmed spent (contained in a block connected to the main -// chain) for the passed websocket client. The request is automatically +// outpoints is confirmed spent (contained in a block added to the blockDAG) +// for the passed websocket client. The request is automatically // removed once the notification has been sent. func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) { m.queueNotification <- ¬ificationRegisterSpent{ @@ -952,7 +872,7 @@ func (m *wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[cha // UnregisterSpentRequest removes a request from the passed websocket client // to be notified when the passed outpoint is confirmed spent (contained in a -// block connected to the main chain). +// block added to the blockDAG). func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) { m.queueNotification <- ¬ificationUnregisterSpent{ wsc: wsc,