diff --git a/blockmanager.go b/blockmanager.go index 677bc1696..e26a39d06 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1014,7 +1014,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { // Reinsert all of the transactions (except the coinbase) into // the transaction pool. for _, tx := range block.Transactions()[1:] { - err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil) + err := b.server.txMemPool.MaybeAcceptTransaction(tx, nil, false) if err != nil { // Remove the transaction and all transactions // that depend on it if it wasn't accepted into diff --git a/mempool.go b/mempool.go index df62ab4ae..ea5bbd81f 100644 --- a/mempool.go +++ b/mempool.go @@ -717,7 +717,7 @@ func (mp *txMemPool) FetchTransaction(txHash *btcwire.ShaHash) (*btcutil.Tx, err // more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { +func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error { if isOrphan != nil { *isOrphan = false } @@ -879,6 +879,10 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro // Notify wallets of mempool transactions to wallet addresses. if mp.server.rpcServer != nil { mp.server.rpcServer.NotifyForTxOuts(tx, nil) + + if isNew { + mp.server.rpcServer.NotifyForNewTx(tx) + } } return nil @@ -892,12 +896,12 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) erro // or not the transaction is an orphan. // // This function is safe for concurrent access. -func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool) error { +func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNew bool) error { // Protect concurrent access. mp.Lock() defer mp.Unlock() - return mp.maybeAcceptTransaction(tx, isOrphan) + return mp.maybeAcceptTransaction(tx, isOrphan, isNew) } // processOrphans determines if there are any orphans which depend on the passed @@ -937,7 +941,7 @@ func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error { // Potentially accept the transaction into the // transaction pool. var isOrphan bool - err := mp.maybeAcceptTransaction(tx, &isOrphan) + err := mp.maybeAcceptTransaction(tx, &isOrphan, true) if err != nil { return err } @@ -975,7 +979,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx) error { // Potentially accept the transaction to the memory pool. var isOrphan bool - err := mp.maybeAcceptTransaction(tx, &isOrphan) + err := mp.maybeAcceptTransaction(tx, &isOrphan, true) if err != nil { return err } diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 4d4a87894..39d4753bd 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -46,6 +46,7 @@ var wsHandlers = map[string]wsCommandHandler{ "getbestblock": handleGetBestBlock, "notifyblocks": handleNotifyBlocks, "notifynewtxs": handleNotifyNewTXs, + "notifyallnewtxs": handleNotifyAllNewTXs, "notifyspent": handleNotifySpent, "rescan": handleRescan, "sendrawtransaction": handleWalletSendRawTransaction, @@ -83,6 +84,15 @@ func (r *wsContext) AddBlockUpdateRequest(n ntfnChan) { rc.blockUpdates = true } +func (r *wsContext) AddAllNewTxRequest(n ntfnChan, verbose bool) { + r.Lock() + defer r.Unlock() + + rc := r.connections[n] + rc.allTxUpdates = true + rc.verboseTxUpdates = verbose +} + // AddTxRequest adds the request context for new transaction notifications. func (r *wsContext) AddTxRequest(n ntfnChan, addr string) { r.Lock() @@ -251,6 +261,14 @@ type requestContexts struct { // chain. blockUpdates bool + // allTxUpdates specifies whether a client has requested notifications + // for all new transactions. + allTxUpdates bool + + // verboseTxUpdates specifies whether a client has requested more verbose + // information about all new transactions + verboseTxUpdates bool + // txRequests is a set of addresses a wallet has requested transactions // updates for. It is maintained here so all requests can be removed // when a wallet disconnects. @@ -362,6 +380,18 @@ func handleNotifyNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interfa return nil, nil } +// handleNotifyAllNewTXs implements the notifyallnewtxs command extension for +// websocket connections. +func handleNotifyAllNewTXs(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { + cmd, ok := icmd.(*btcws.NotifyAllNewTXsCmd) + if !ok { + return nil, &btcjson.ErrInternal + } + + s.ws.AddAllNewTxRequest(c.n, cmd.Verbose) + return nil, nil +} + // handleNotifySpent implements the notifyspent command extension for // websocket connections. func handleNotifySpent(s *rpcServer, icmd btcjson.Cmd, c handlerChans) (interface{}, *btcjson.Error) { @@ -951,3 +981,35 @@ func (s *rpcServer) NotifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { } } } + +// NotifyForNewTx sends delivers the new tx to any client that has +// registered for all new TX. +func (s *rpcServer) NotifyForNewTx(tx *btcutil.Tx) { + txId := tx.Sha().String() + mtx := tx.MsgTx() + + var amount int64 + for _, txOut := range mtx.TxOut { + amount += txOut.Value + } + + ntfn := btcws.NewAllTxNtfn(txId, amount) + var verboseNtfn *btcws.AllVerboseTxNtfn + + for ntfnChan, rc := range s.ws.connections { + if rc.allTxUpdates { + if rc.verboseTxUpdates { + if verboseNtfn == nil { + rawTx, err := createTxRawResult(s.server.btcnet, txId, mtx, nil, 0, nil) + if err != nil { + return + } + verboseNtfn = btcws.NewAllVerboseTxNtfn(rawTx) + } + ntfnChan <- verboseNtfn + } else { + ntfnChan <- ntfn + } + } + } +}