diff --git a/blockmanager.go b/blockmanager.go index 9f182f82a..2907e440a 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -995,12 +995,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { b.server.txMemPool.RemoveDoubleSpends(tx) } - // Notify frontends + // Notify registered websocket clients if r := b.server.rpcServer; r != nil { - go func() { - r.ntfnMgr.NotifyBlockTXs(block) - r.ntfnMgr.NotifyBlockConnected(block) - }() + r.ntfnMgr.NotifyBlockConnected(block) } // A block has been disconnected from the main block chain. @@ -1023,9 +1020,9 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { } } - // Notify frontends + // Notify registered websocket clients if r := b.server.rpcServer; r != nil { - go r.ntfnMgr.NotifyBlockDisconnected(block) + r.ntfnMgr.NotifyBlockDisconnected(block) } } } diff --git a/mempool.go b/mempool.go index 299d030db..20c32450c 100644 --- a/mempool.go +++ b/mempool.go @@ -911,13 +911,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isOrphan *bool, isNe // Notify websocket clients about mempool transactions. if mp.server.rpcServer != nil { - go func() { - mp.server.rpcServer.ntfnMgr.NotifyForTx(tx, nil) - - if isNew { - mp.server.rpcServer.ntfnMgr.NotifyForNewTx(tx) - } - }() + mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew) } return nil diff --git a/rpcserver.go b/rpcserver.go index c8f002afc..fefbc297d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -210,6 +210,8 @@ func (s *rpcServer) Start() { s.wg.Done() }(listener) } + + s.ntfnMgr.Start() } // limitConnections responds with a 503 service unavailable and returns true if @@ -297,6 +299,7 @@ func (s *rpcServer) Stop() error { } } s.ntfnMgr.Shutdown() + s.ntfnMgr.WaitForShutdown() close(s.quit) s.wg.Wait() rpcsLog.Infof("RPC server shutdown complete") diff --git a/rpcwebsocket.go b/rpcwebsocket.go index 48aa17621..137e01412 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -104,75 +104,263 @@ func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string, // have registered for and notifies them accordingly. It is also used to keep // track of all connected websocket clients. type wsNotificationManager struct { - sync.Mutex - // server is the RPC server the notification manager is associated with. server *rpcServer + // queueNotification queues a notification for handling. + queueNotification chan interface{} + + // notificationMsgs feeds notificationHandler with notifications + // and client (un)registeration requests from a queue as well as + // registeration and unregisteration requests from clients. + notificationMsgs chan interface{} + + // Access channel for current number of connected clients. + numClients chan int + + // Shutdown handling + wg sync.WaitGroup + quit chan struct{} +} + +// queueHandler manages a queue of empty interfaces, reading from in and +// sending the oldest unsent to out. This handler stops when either of the +// in or quit channels are closed, and closes out before returning, without +// waiting to send any variables still remaining in the queue. +func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) { + var q []interface{} + var dequeue chan<- interface{} + skipQueue := out + var next interface{} +out: + for { + select { + case n, ok := <-in: + if !ok { + // Sender closed input channel. + break out + } + + // Either send to out immediately if skipQueue is + // non-nil (queue is empty) and reader is ready, + // or append to the queue and send later. + select { + case skipQueue <- n: + default: + q = append(q, n) + dequeue = out + skipQueue = nil + next = q[0] + } + + case dequeue <- next: + copy(q, q[1:]) + q[len(q)-1] = nil // avoid leak + q = q[:len(q)-1] + if len(q) == 0 { + dequeue = nil + skipQueue = out + } + + case <-quit: + break out + } + } + close(out) +} + +// queueHandler maintains a queue of notifications and notification handler +// control messages. +func (m *wsNotificationManager) queueHandler() { + queueHandler(m.queueNotification, m.notificationMsgs, m.quit) + m.wg.Done() +} + +// NotifyBlockConnected passes a block newly-connected to the best chain +// to the notification manager for block and transaction notification +// processing. +func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { + m.queueNotification <- (*notificationBlockConnected)(block) +} + +// NotifyBlockDisconnected passes a block disconnected from the best chain +// to the notification manager for block notification processing. +func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { + m.queueNotification <- (*notificationBlockDisconnected)(block) +} + +// NotifyMempoolTx passes a transaction accepted by mempool to the +// notification manager for transaction notification processing. If +// isNew is true, the tx is is a new transaction, rather than one +// added to the mempool during a reorg. +func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) { + m.queueNotification <- ¬ificationTxAcceptedByMempool{ + isNew: isNew, + tx: tx, + } +} + +// Notification types +type notificationBlockConnected btcutil.Block +type notificationBlockDisconnected btcutil.Block +type notificationTxAcceptedByMempool struct { + isNew bool + tx *btcutil.Tx +} + +// Notification control requests +type notificationRegisterClient wsClient +type notificationUnregisterClient wsClient +type notificationRegisterBlocks wsClient +type notificationUnregisterBlocks wsClient +type notificationRegisterNewMempoolTxs wsClient +type notificationUnregisterNewMempoolTxs wsClient +type notificationRegisterSpent struct { + wsc *wsClient + op *btcwire.OutPoint +} +type notificationUnregisterSpent struct { + wsc *wsClient + op *btcwire.OutPoint +} +type notificationRegisterAddr struct { + wsc *wsClient + addr string +} +type notificationUnregisterAddr struct { + wsc *wsClient + addr string +} + +// notificationHandler reads notifications and control messages from the queue +// handler and processes one at a time. +func (m *wsNotificationManager) notificationHandler() { // clients is a map of all currently connected websocket clients. - clients map[chan bool]*wsClient + clients := make(map[chan bool]*wsClient) // Maps used to hold lists of websocket clients to be notified on // certain events. Each websocket client also keeps maps for the events // which have multiple triggers to make removal from these lists on // connection close less horrendously expensive. - blockNotifications map[chan bool]*wsClient - txNotifications map[chan bool]*wsClient - spentNotifications map[btcwire.OutPoint]map[chan bool]*wsClient - addrNotifications map[string]map[chan bool]*wsClient + // + // Where possible, the quit channel is used as the unique id for a client + // since it is quite a bit more efficient than using the entire struct. + blockNotifications := make(map[chan bool]*wsClient) + txNotifications := make(map[chan bool]*wsClient) + watchedOutPoints := make(map[btcwire.OutPoint]map[chan bool]*wsClient) + watchedAddrs := make(map[string]map[chan bool]*wsClient) + +out: + for { + select { + case n, ok := <-m.notificationMsgs: + if !ok { + // queueHandler quit. + break out + } + switch n := n.(type) { + case *notificationBlockConnected: + block := (*btcutil.Block)(n) + if len(blockNotifications) != 0 { + m.notifyBlockConnected(blockNotifications, + block) + } + + // Skip iterating through all txs if no + // tx notification requests exist. + if len(watchedOutPoints) == 0 && len(watchedAddrs) == 0 { + continue + } + + for _, tx := range block.Transactions() { + m.notifyForTx(watchedOutPoints, + watchedAddrs, tx, block) + } + + case *notificationBlockDisconnected: + m.notifyBlockDisconnected(blockNotifications, + (*btcutil.Block)(n)) + + case *notificationTxAcceptedByMempool: + if n.isNew && len(txNotifications) != 0 { + m.notifyForNewTx(txNotifications, n.tx) + } + m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil) + + case *notificationRegisterBlocks: + wsc := (*wsClient)(n) + blockNotifications[wsc.quit] = wsc + + case *notificationRegisterClient: + wsc := (*wsClient)(n) + clients[wsc.quit] = wsc + + case *notificationUnregisterClient: + wsc := (*wsClient)(n) + // Remove any requests made by the client as well as + // the client itself. + delete(blockNotifications, wsc.quit) + delete(txNotifications, wsc.quit) + for k := range wsc.spentRequests { + op := k + m.removeSpentRequest(watchedOutPoints, wsc, &op) + } + for addr := range wsc.addrRequests { + m.removeAddrRequest(watchedAddrs, wsc, addr) + } + delete(clients, wsc.quit) + + case *notificationRegisterSpent: + m.addSpentRequest(watchedOutPoints, n.wsc, n.op) + + case *notificationUnregisterSpent: + m.removeSpentRequest(watchedOutPoints, n.wsc, n.op) + + case *notificationRegisterAddr: + m.addAddrRequest(watchedAddrs, n.wsc, n.addr) + + case *notificationUnregisterAddr: + m.removeAddrRequest(watchedAddrs, n.wsc, n.addr) + + default: + rpcsLog.Warn("Unhandled notification type") + } + + case m.numClients <- len(clients): + + case <-m.quit: + // RPC server shutting down. + break out + } + } + + for _, c := range clients { + c.Disconnect() + } + m.wg.Done() } // NumClients returns the number of clients actively being served. -// -// This function is safe for concurrent access. func (m *wsNotificationManager) NumClients() int { - m.Lock() - defer m.Unlock() - - return len(m.clients) + return <-m.numClients } -// AddBlockUpdateRequest requests block update notifications to the passed +// RegisterBlockUpdates requests block update notifications to the passed // websocket client. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) AddBlockUpdateRequest(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - // Add the client to the map to notify when block updates are seen. - // Use the quit channel as a unique id for the client since it is quite - // a bit more efficient than using the entire struct. - m.blockNotifications[wsc.quit] = wsc +func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) { + m.queueNotification <- (*notificationRegisterBlocks)(wsc) } -// RemoveBlockUpdateRequest removes block update notifications for the passed +// UnregisterBlockUpdates removes block update notifications for the passed // websocket client. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) RemoveBlockUpdateRequest(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - // Delete the client from the map to notify when block updates are seen. - // Use the quit channel as a unique id for the client since it is quite - // a bit more efficient than using the entire struct. - delete(m.blockNotifications, wsc.quit) +func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { + m.queueNotification <- (*notificationUnregisterBlocks)(wsc) } -// NotifyBlockConnected notifies websocket clients that have registered for +// notifyBlockConnected notifies websocket clients that have registered for // block updates when a block is connected to the main chain. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { - m.Lock() - defer m.Unlock() - - // Nothing to do if there are no websocket clients registered to - // receive notifications that result from a newly connected block. - if len(m.blockNotifications) == 0 { - return - } +func (*wsNotificationManager) notifyBlockConnected(clients map[chan bool]*wsClient, + block *btcutil.Block) { hash, err := block.Sha() if err != nil { @@ -188,23 +376,18 @@ func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { "%v", err) return } - for _, wsc := range m.blockNotifications { + for _, wsc := range clients { wsc.QueueNotification(marshalledJSON) } } -// NotifyBlockDisconnected notifies websocket clients that have registered for +// notifyBlockDisconnected notifies websocket clients that have registered for // block updates when a block is disconnected from the main chain (due to a // reorganize). -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { - m.Lock() - defer m.Unlock() - - // Nothing to do if there are no websocket clients registered to - // receive notifications that result from a newly connected block. - if len(m.blockNotifications) == 0 { +func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan bool]*wsClient, block *btcutil.Block) { + // Skip notification creation if no clients have requested block + // connected/disconnected notifications. + if len(clients) == 0 { return } @@ -224,56 +407,27 @@ func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { "notification: %v", err) return } - for _, wsc := range m.blockNotifications { + for _, wsc := range clients { wsc.QueueNotification(marshalledJSON) } } -// AddNewTxRequest requests notifications to the passed websocket client when -// new transactions are added to the memory pool. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) AddNewTxRequest(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - // Add the client to the map to notify when a new transaction is added - // to the memory pool. Use the quit channel as a unique id for the - // client since it is quite a bit more efficient than using the entire - // struct. - m.txNotifications[wsc.quit] = wsc +// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket +// client when new transactions are added to the memory pool. +func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) { + m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc) } -// RemoveNewTxRequest removes notifications to the passed websocket client when -// new transaction are added to the memory pool. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) RemoveNewTxRequest(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - // Delete the client from the map to notify when a new transaction is - // seen in the memory pool. Use the quit channel as a unique id for the - // client since it is quite a bit more efficient than using the entire - // struct. - delete(m.txNotifications, wsc.quit) +// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket +// client when new transaction are added to the memory pool. +func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) { + m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc) } -// NotifyForNewTx notifies websocket clients that have registerd for updates +// notifyForNewTx notifies websocket clients that have registerd for updates // when a new transaction is added to the memory pool. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { - m.Lock() - defer m.Unlock() - - // Nothing to do if there are no websocket clients registered to - // receive notifications about transactions added to the memory pool. - if len(m.txNotifications) == 0 { - return - } - - txID := tx.Sha().String() +func (m *wsNotificationManager) notifyForNewTx(clients map[chan bool]*wsClient, tx *btcutil.Tx) { + txShaStr := tx.Sha().String() mtx := tx.MsgTx() var amount int64 @@ -281,7 +435,7 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { amount += txOut.Value } - ntfn := btcws.NewAllTxNtfn(txID, amount) + ntfn := btcws.NewAllTxNtfn(txShaStr, amount) marshalledJSON, err := json.Marshal(ntfn) if err != nil { rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error()) @@ -290,10 +444,12 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { var verboseNtfn *btcws.AllVerboseTxNtfn var marshalledJSONVerbose []byte - for _, wsc := range m.txNotifications { + for _, wsc := range clients { if wsc.verboseTxUpdates { if verboseNtfn == nil { - rawTx, err := createTxRawResult(m.server.server.btcnet, txID, mtx, nil, 0, nil) + net := m.server.server.btcnet + rawTx, err := createTxRawResult(net, txShaStr, + mtx, nil, 0, nil) if err != nil { return } @@ -312,48 +468,59 @@ func (m *wsNotificationManager) NotifyForNewTx(tx *btcutil.Tx) { } } -// addSpentRequest is the internal function which implements the public -// AddSpentRequest. See the comment for AddSpentRequest for more details. -// -// This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) addSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { +// RegisterSpentRequest requests an notification when the passed outpoint is +// confirmed spent (contained in a block connected to the main chain) for the +// passed websocket client. The request is automatically removed once the +// notification has been sent. +func (m *wsNotificationManager) RegisterSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + m.queueNotification <- ¬ificationRegisterSpent{ + wsc: wsc, + op: op, + } +} + +// addSpentRequest modifies a map of watched outpoints to sets of websocket +// clients to add a new request watch the outpoint op and create and send +// a notification when spent to the websocket client wsc. +func (*wsNotificationManager) addSpentRequest(ops map[btcwire.OutPoint]map[chan bool]*wsClient, + wsc *wsClient, op *btcwire.OutPoint) { + // Track the request in the client as well so it can be quickly be // removed on disconnect. wsc.spentRequests[*op] = struct{}{} // Add the client to the list to notify when the outpoint is seen. // Create the list as needed. - cmap, ok := m.spentNotifications[*op] + cmap, ok := ops[*op] if !ok { cmap = make(map[chan bool]*wsClient) - m.spentNotifications[*op] = cmap + ops[*op] = cmap } cmap[wsc.quit] = wsc } -// AddSpentRequest requests an notification when the passed outpoint is -// confirmed spent (contained in a block connected to the main chain) for the -// passed websocket client. The request is automatically removed once the -// notification has been sent. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) AddSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { - m.Lock() - defer m.Unlock() - - m.addSpentRequest(wsc, op) +// UnregisterSpentRequest removes a request from the passed websocket client +// to be notified when the passed outpoint is confirmed spent (contained in a +// block connected to the main chain). +func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { + m.queueNotification <- ¬ificationUnregisterSpent{ + wsc: wsc, + op: op, + } } -// removeSpentRequest is the internal function which implements the public -// RemoveSpentRequest. See the comment for RemoveSpentRequest for more details. -// -// This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) removeSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { +// removeSpentRequest modifies a map of watched outpoints to remove the +// websocket client wsc from the set of clients to be notified when a +// watched outpoint is spent. If wsc is the last client, the outpoint +// key is removed from the map. +func (*wsNotificationManager) removeSpentRequest(ops map[btcwire.OutPoint]map[chan bool]*wsClient, + wsc *wsClient, op *btcwire.OutPoint) { + // Remove the request tracking from the client. delete(wsc.spentRequests, *op) // Remove the client from the list to notify. - notifyMap, ok := m.spentNotifications[*op] + notifyMap, ok := ops[*op] if !ok { rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ "for websocket client %s", wsc.addr) @@ -361,25 +528,13 @@ func (m *wsNotificationManager) removeSpentRequest(wsc *wsClient, op *btcwire.Ou } delete(notifyMap, wsc.quit) - // Remove the map entry altogether if there are no more clients - // interested in it. + // Remove the map entry altogether if there are + // no more clients interested in it. if len(notifyMap) == 0 { - delete(m.spentNotifications, *op) + delete(ops, *op) } } -// RemoveSpentRequest removes a request from the passed websocket client to be -// notified when the passed outpoint is confirmed spent (contained in a block -// connected to the main chain). -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) RemoveSpentRequest(wsc *wsClient, op *btcwire.OutPoint) { - m.Lock() - defer m.Unlock() - - m.removeSpentRequest(wsc, op) -} - // txHexString returns the serialized transaction encoded in hexadecimal. func txHexString(tx *btcutil.Tx) string { var buf bytes.Buffer @@ -388,30 +543,52 @@ func txHexString(tx *btcutil.Tx) string { return hex.EncodeToString(buf.Bytes()) } +// blockDetails creates a BlockDetails struct to include in btcws notifications +// from a block and a transaction's block index. +func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails { + if block == nil { + return nil + } + blockSha, _ := block.Sha() // never errors + return &btcws.BlockDetails{ + Height: int32(block.Height()), + Hash: blockSha.String(), + Index: txIndex, + Time: block.MsgBlock().Header.Timestamp.Unix(), + } +} + +// newRedeemingTxNotification returns a new marshalled redeemingtx notification +// with the passed parameters. +func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) { + // Create and marshal the notification. + ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index)) + return json.Marshal(ntfn) +} + // notifyForTxOuts examines each transaction output, notifying interested // websocket clients of the transaction if an output spends to a watched // address. A spent notification request is automatically registered for // the client for each matching output. -// -// This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.Block) { +func (m *wsNotificationManager) notifyForTxOuts(ops map[btcwire.OutPoint]map[chan bool]*wsClient, + addrs map[string]map[chan bool]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { + // Nothing to do if nobody is listening for address notifications. - if len(m.addrNotifications) == 0 { + if len(addrs) == 0 { return } txHex := "" wscNotified := make(map[chan bool]bool) for i, txOut := range tx.MsgTx().TxOut { - _, addrs, _, err := btcscript.ExtractPkScriptAddrs( + _, txAddrs, _, err := btcscript.ExtractPkScriptAddrs( txOut.PkScript, m.server.server.btcnet) if err != nil { continue } - for _, addr := range addrs { - encodedAddr := addr.EncodeAddress() - cmap, ok := m.addrNotifications[encodedAddr] + for _, txAddr := range txAddrs { + cmap, ok := addrs[txAddr.EncodeAddress()] if !ok { continue } @@ -429,7 +606,7 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B op := btcwire.NewOutPoint(tx.Sha(), uint32(i)) for wscQuit, wsc := range cmap { - m.addSpentRequest(wsc, op) + m.addSpentRequest(ops, wsc, op) if !wscNotified[wscQuit] { wscNotified[wscQuit] = true @@ -440,36 +617,29 @@ func (m *wsNotificationManager) notifyForTxOuts(tx *btcutil.Tx, block *btcutil.B } } -// NotifyForTx examines the inputs and outputs of the passed transaction, +// notifyForTx examines the inputs and outputs of the passed transaction, // notifying websocket clients of outputs spending to a watched address // and inputs spending a watched outpoint. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyForTx(tx *btcutil.Tx, block *btcutil.Block) { - m.Lock() - defer m.Unlock() +func (m *wsNotificationManager) notifyForTx(ops map[btcwire.OutPoint]map[chan bool]*wsClient, + addrs map[string]map[chan bool]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { - m.notifyForTxIns(tx, block) - m.notifyForTxOuts(tx, block) -} - -// newRedeemingTxNotification returns a new marshalled redeemingtx notification -// with the passed parameters. -func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) { - // Create and marshal the notification. - ntfn := btcws.NewRedeemingTxNtfn(txHex, blockDetails(block, index)) - return json.Marshal(ntfn) + if len(ops) != 0 { + m.notifyForTxIns(ops, tx, block) + } + if len(addrs) != 0 { + m.notifyForTxOuts(ops, addrs, tx, block) + } } // notifyForTxIns examines the inputs of the passed transaction and sends // interested websocket clients a redeemingtx notification if any inputs // spend a watched output. If block is non-nil, any matching spent // requests are removed. -// -// This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Block) { - // Nothing to do if nobody is listening for spent notifications. - if len(m.spentNotifications) == 0 { +func (m *wsNotificationManager) notifyForTxIns(ops map[btcwire.OutPoint]map[chan bool]*wsClient, + tx *btcutil.Tx, block *btcutil.Block) { + + // Nothing to do if nobody is watching outpoints. + if len(ops) == 0 { return } @@ -477,7 +647,7 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl wscNotified := make(map[chan bool]bool) for _, txIn := range tx.MsgTx().TxIn { prevOut := &txIn.PreviousOutpoint - if cmap, ok := m.spentNotifications[*prevOut]; ok { + if cmap, ok := ops[*prevOut]; ok { if txHex == "" { txHex = txHexString(tx) } @@ -488,7 +658,7 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl } for wscQuit, wsc := range cmap { if block != nil { - m.removeSpentRequest(wsc, prevOut) + m.removeSpentRequest(ops, wsc, prevOut) } if !wscNotified[wscQuit] { @@ -500,144 +670,109 @@ func (m *wsNotificationManager) notifyForTxIns(tx *btcutil.Tx, block *btcutil.Bl } } -// NotifyBlockTXs examines the input and outputs of the passed transaction -// and sends websocket clients notifications they are interested in. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) NotifyBlockTXs(block *btcutil.Block) { - m.Lock() - defer m.Unlock() - - // Nothing to do if there are no websocket clients registered to receive - // notifications about spent outpoints or payments to addresses. - if len(m.spentNotifications) == 0 && len(m.addrNotifications) == 0 { - return - } - - for _, tx := range block.Transactions() { - m.notifyForTxIns(tx, block) - m.notifyForTxOuts(tx, block) +// RegisterTxOutAddressRequest requests notifications to the passed websocket +// client when a transaction output spends to the passed address. +func (m *wsNotificationManager) RegisterTxOutAddressRequest(wsc *wsClient, addr string) { + m.queueNotification <- ¬ificationRegisterAddr{ + wsc: wsc, + addr: addr, } } -func blockDetails(block *btcutil.Block, txIndex int) *btcws.BlockDetails { - if block == nil { - return nil - } - blockSha, _ := block.Sha() // never errors - return &btcws.BlockDetails{ - Height: int32(block.Height()), - Hash: blockSha.String(), - Index: txIndex, - Time: block.MsgBlock().Header.Timestamp.Unix(), - } -} - -// AddAddrRequest requests notifications to the passed websocket client when -// a transaction pays to the passed address. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) AddAddrRequest(wsc *wsClient, addr string) { - m.Lock() - defer m.Unlock() +// addAddrRequest adds the websocket client wsc to the address to client set +// addrs so wsc will be notified for any mempool or block transaction outputs +// spending to addr. +func (*wsNotificationManager) addAddrRequest(addrs map[string]map[chan bool]*wsClient, + wsc *wsClient, addr string) { // Track the request in the client as well so it can be quickly be // removed on disconnect. wsc.addrRequests[addr] = struct{}{} - // Add the client to the list to notify when the outpoint is seen. - // Create the list as needed. - cmap, ok := m.addrNotifications[addr] + // Add the client to the set of clients to notify when the outpoint is + // seen. Create map as needed. + cmap, ok := addrs[addr] if !ok { cmap = make(map[chan bool]*wsClient) - m.addrNotifications[addr] = cmap + addrs[addr] = cmap } cmap[wsc.quit] = wsc } -// removeAddrRequest is the internal function which implements the public -// RemoveAddrRequest. See the comment for RemoveAddrRequest for more details. -// -// This function MUST be called with the notification manager lock held. -func (m *wsNotificationManager) removeAddrRequest(wsc *wsClient, addr string) { +// UnregisterTxOutAddressRequest removes a request from the passed websocket +// client to be notified when a transaction spends to the passed address. +func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, addr string) { + m.queueNotification <- ¬ificationUnregisterAddr{ + wsc: wsc, + addr: addr, + } +} + +// removeAddrRequest removes the websocket client wsc from the address to +// client set addrs so it will no longer receive notification updates for +// any transaction outputs send to addr. +func (*wsNotificationManager) removeAddrRequest(addrs map[string]map[chan bool]*wsClient, + wsc *wsClient, addr string) { + // Remove the request tracking from the client. delete(wsc.addrRequests, addr) // Remove the client from the list to notify. - notifyMap, ok := m.addrNotifications[addr] + cmap, ok := addrs[addr] if !ok { rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ "<%s> for websocket client %s", addr, wsc.addr) return } - delete(notifyMap, wsc.quit) + delete(cmap, wsc.quit) // Remove the map entry altogether if there are no more clients // interested in it. - if len(notifyMap) == 0 { - delete(m.addrNotifications, addr) + if len(cmap) == 0 { + delete(addrs, addr) } } -// RemoveAddrRequest removes a request from the passed websocket client to be -// notified when a transaction pays to the passed address. -// -// This function is safe for concurrent access. -func (m *wsNotificationManager) RemoveAddrRequest(wsc *wsClient, addr string) { - m.Lock() - defer m.Unlock() - - m.removeAddrRequest(wsc, addr) -} - // AddClient adds the passed websocket client to the notification manager. -// -// This function is safe for concurrent access. func (m *wsNotificationManager) AddClient(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - m.clients[wsc.quit] = wsc + m.queueNotification <- (*notificationRegisterClient)(wsc) } // RemoveClient removes the passed websocket client and all notifications // registered for it. -// -// This function is safe for concurrent access. func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { - m.Lock() - defer m.Unlock() - - // Remove any requests made by the client as well as the client itself. - delete(m.blockNotifications, wsc.quit) - delete(m.txNotifications, wsc.quit) - for k := range wsc.spentRequests { - op := k - m.removeSpentRequest(wsc, &op) - } - for addr := range wsc.addrRequests { - m.removeAddrRequest(wsc, addr) - } - delete(m.clients, wsc.quit) + m.queueNotification <- (*notificationUnregisterClient)(wsc) } -// Shutdown disconnects all websocket clients the manager knows about. +// Start starts the goroutines required for the manager to queue and process +// websocket client notifications. +func (m *wsNotificationManager) Start() { + m.wg.Add(2) + go m.queueHandler() + go m.notificationHandler() +} + +// WaitForShutdown blocks until all notification manager goroutines have +// finished. +func (m *wsNotificationManager) WaitForShutdown() { + m.wg.Wait() +} + +// Shutdown shuts down the manager, stopping the notification queue and +// notification handler goroutines. func (m *wsNotificationManager) Shutdown() { - for _, wsc := range m.clients { - wsc.Disconnect() - } + close(m.quit) } // newWsNotificationManager returns a new notification manager ready for use. // See wsNotificationManager for more details. func newWsNotificationManager(server *rpcServer) *wsNotificationManager { return &wsNotificationManager{ - server: server, - clients: make(map[chan bool]*wsClient), - blockNotifications: make(map[chan bool]*wsClient), - txNotifications: make(map[chan bool]*wsClient), - spentNotifications: make(map[btcwire.OutPoint]map[chan bool]*wsClient), - addrNotifications: make(map[string]map[chan bool]*wsClient), + server: server, + queueNotification: make(chan interface{}), + notificationMsgs: make(chan interface{}), + numClients: make(chan int), + quit: make(chan struct{}), } } @@ -1228,7 +1363,7 @@ func handleGetCurrentNet(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson // handleNotifyBlocks implements the notifyblocks command extension for // websocket connections. func handleNotifyBlocks(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { - wsc.server.ntfnMgr.AddBlockUpdateRequest(wsc) + wsc.server.ntfnMgr.RegisterBlockUpdates(wsc) return nil, nil } @@ -1240,7 +1375,7 @@ func handleNotifySpent(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.E return nil, &btcjson.ErrInternal } - wsc.server.ntfnMgr.AddSpentRequest(wsc, cmd.OutPoint) + wsc.server.ntfnMgr.RegisterSpentRequest(wsc, cmd.OutPoint) return nil, nil } @@ -1253,7 +1388,7 @@ func handleNotifyAllNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjs } wsc.verboseTxUpdates = cmd.Verbose - wsc.server.ntfnMgr.AddNewTxRequest(wsc) + wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) return nil, nil } @@ -1275,7 +1410,7 @@ func handleNotifyNewTXs(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson. return nil, &e } - wsc.server.ntfnMgr.AddAddrRequest(wsc, addr.EncodeAddress()) + wsc.server.ntfnMgr.RegisterTxOutAddressRequest(wsc, addr.EncodeAddress()) } return nil, nil