From 9a2eee78a4ca2f7c64b4b94b2b93197f70a7b3f3 Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Thu, 24 Jan 2019 18:18:03 +0200 Subject: [PATCH] [DEV-319] Update WS-API call notifyNewTransactions to allow filtering by SubNetworkID (#174) * [DEV-319] Implemented transaction data sending logic. * [DEV-319] Implemented NotifyNewTransactions command validation. * [DEV-319] Reduced some duplication in notifyForNewTx. * [DEV-319] Renamed a parameter for clarity. * [DEV-319] Added a test for marshalling/unmarshalling the new varient of notifyNewTransactions. * [DEV-319] Added a check in handleNotifyNewTransactions to avoid unnecessary validation. * [DEV-319] Added comments to explain the initialization of marshalledJSONVerboseFull and marshalledJSONVerbosePartial. --- btcjson/dagsvrwscmds.go | 8 ++- btcjson/dagsvrwscmds_test.go | 18 ++++- rpcclient/infrastructure.go | 4 +- rpcclient/notify.go | 20 +++--- server/rpc/rpcserverhelp.go | 5 +- server/rpc/rpcwebsocket.go | 123 +++++++++++++++++++++++++++++------ 6 files changed, 139 insertions(+), 39 deletions(-) diff --git a/btcjson/dagsvrwscmds.go b/btcjson/dagsvrwscmds.go index 88c186e56..d5dca180f 100644 --- a/btcjson/dagsvrwscmds.go +++ b/btcjson/dagsvrwscmds.go @@ -43,7 +43,8 @@ func NewStopNotifyBlocksCmd() *StopNotifyBlocksCmd { // NotifyNewTransactionsCmd defines the notifyNewTransactions JSON-RPC command. type NotifyNewTransactionsCmd struct { - Verbose *bool `jsonrpcdefault:"false"` + Verbose *bool `jsonrpcdefault:"false"` + Subnetwork *string } // NewNotifyNewTransactionsCmd returns a new instance which can be used to issue @@ -51,9 +52,10 @@ type NotifyNewTransactionsCmd struct { // // The parameters which are pointers indicate they are optional. Passing nil // for optional parameters will use the default value. -func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd { +func NewNotifyNewTransactionsCmd(verbose *bool, subnetworkID *string) *NotifyNewTransactionsCmd { return &NotifyNewTransactionsCmd{ - Verbose: verbose, + Verbose: verbose, + Subnetwork: subnetworkID, } } diff --git a/btcjson/dagsvrwscmds_test.go b/btcjson/dagsvrwscmds_test.go index 482407be8..5b7c9dae1 100644 --- a/btcjson/dagsvrwscmds_test.go +++ b/btcjson/dagsvrwscmds_test.go @@ -69,7 +69,7 @@ func TestDAGSvrWsCmds(t *testing.T) { return btcjson.NewCmd("notifyNewTransactions") }, staticCmd: func() interface{} { - return btcjson.NewNotifyNewTransactionsCmd(nil) + return btcjson.NewNotifyNewTransactionsCmd(nil, nil) }, marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[],"id":1}`, unmarshalled: &btcjson.NotifyNewTransactionsCmd{ @@ -82,13 +82,27 @@ func TestDAGSvrWsCmds(t *testing.T) { return btcjson.NewCmd("notifyNewTransactions", true) }, staticCmd: func() interface{} { - return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true)) + return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true), nil) }, marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[true],"id":1}`, unmarshalled: &btcjson.NotifyNewTransactionsCmd{ Verbose: btcjson.Bool(true), }, }, + { + name: "notifyNewTransactions optional 2", + newCmd: func() (interface{}, error) { + return btcjson.NewCmd("notifyNewTransactions", true, "0000000000000000000000000000000000000123") + }, + staticCmd: func() interface{} { + return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true), btcjson.String("0000000000000000000000000000000000000123")) + }, + marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[true,"0000000000000000000000000000000000000123"],"id":1}`, + unmarshalled: &btcjson.NotifyNewTransactionsCmd{ + Verbose: btcjson.Bool(true), + Subnetwork: btcjson.String("0000000000000000000000000000000000000123"), + }, + }, { name: "stopNotifyNewTransactions", newCmd: func() (interface{}, error) { diff --git a/rpcclient/infrastructure.go b/rpcclient/infrastructure.go index 951e0c239..5fe5e779c 100644 --- a/rpcclient/infrastructure.go +++ b/rpcclient/infrastructure.go @@ -246,8 +246,8 @@ func (c *Client) trackRegisteredNtfns(cmd interface{}) { c.ntfnState.notifyNewTxVerbose = true } else { c.ntfnState.notifyNewTx = true - } + c.ntfnState.notifyNewTxSubnetworkID = bcmd.Subnetwork case *btcjson.NotifySpentCmd: for _, op := range bcmd.OutPoints { @@ -517,7 +517,7 @@ func (c *Client) reregisterNtfns() error { if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose { log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)", stateCopy.notifyNewTxVerbose) - err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose) + err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose, stateCopy.notifyNewTxSubnetworkID) if err != nil { return err } diff --git a/rpcclient/notify.go b/rpcclient/notify.go index b8abb6156..858036c02 100644 --- a/rpcclient/notify.go +++ b/rpcclient/notify.go @@ -32,11 +32,12 @@ var ( // registered notification so the state can be automatically re-established on // reconnect. type notificationState struct { - notifyBlocks bool - notifyNewTx bool - notifyNewTxVerbose bool - notifyReceived map[string]struct{} - notifySpent map[btcjson.OutPoint]struct{} + notifyBlocks bool + notifyNewTx bool + notifyNewTxVerbose bool + notifyNewTxSubnetworkID *string + notifyReceived map[string]struct{} + notifySpent map[btcjson.OutPoint]struct{} } // Copy returns a deep copy of the receiver. @@ -45,6 +46,7 @@ func (s *notificationState) Copy() *notificationState { stateCopy.notifyBlocks = s.notifyBlocks stateCopy.notifyNewTx = s.notifyNewTx stateCopy.notifyNewTxVerbose = s.notifyNewTxVerbose + stateCopy.notifyNewTxSubnetworkID = s.notifyNewTxSubnetworkID stateCopy.notifyReceived = make(map[string]struct{}) for addr := range s.notifyReceived { stateCopy.notifyReceived[addr] = struct{}{} @@ -1006,7 +1008,7 @@ func (r FutureNotifyNewTransactionsResult) Receive() error { // See NotifyNewTransactionsAsync for the blocking version and more details. // // NOTE: This is a btcd extension and requires a websocket connection. -func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransactionsResult { +func (c *Client) NotifyNewTransactionsAsync(verbose bool, subnetworkID *string) FutureNotifyNewTransactionsResult { // Not supported in HTTP POST mode. if c.config.HTTPPostMode { return newFutureError(ErrWebsocketsRequired) @@ -1018,7 +1020,7 @@ func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransac return newNilFutureResult() } - cmd := btcjson.NewNotifyNewTransactionsCmd(&verbose) + cmd := btcjson.NewNotifyNewTransactionsCmd(&verbose, subnetworkID) return c.sendCmd(cmd) } @@ -1033,8 +1035,8 @@ func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransac // true). // // NOTE: This is a btcd extension and requires a websocket connection. -func (c *Client) NotifyNewTransactions(verbose bool) error { - return c.NotifyNewTransactionsAsync(verbose).Receive() +func (c *Client) NotifyNewTransactions(verbose bool, subnetworkID *string) error { + return c.NotifyNewTransactionsAsync(verbose, subnetworkID).Receive() } // FutureNotifyReceivedResult is a future promise to deliver the result of a diff --git a/server/rpc/rpcserverhelp.go b/server/rpc/rpcserverhelp.go index 4ae0ae5ac..6e268e7cd 100644 --- a/server/rpc/rpcserverhelp.go +++ b/server/rpc/rpcserverhelp.go @@ -601,8 +601,9 @@ var helpDescsEnUS = map[string]string{ "stopNotifyBlocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.", // NotifyNewTransactionsCmd help. - "notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", - "notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted", + "notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", + "notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted", + "notifyNewTransactions-subnetwork": "Specifies which subnetwork to receive full transactions of. Requires verbose=true. Not allowed when node subnetwork is Native. Must be equal to node subnetwork when node is partial.", // StopNotifyNewTransactionsCmd help. "stopNotifyNewTransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", diff --git a/server/rpc/rpcwebsocket.go b/server/rpc/rpcwebsocket.go index 295b09289..76744098b 100644 --- a/server/rpc/rpcwebsocket.go +++ b/server/rpc/rpcwebsocket.go @@ -15,6 +15,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/daglabs/btcd/util/subnetworkid" "io" "sync" "time" @@ -835,31 +836,63 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie return } - var verboseNtfn *btcjson.TxAcceptedVerboseNtfn - var marshalledJSONVerbose []byte + // To avoid unnecessary marshalling of verbose transactions, only initialize + // marshalledJSONVerboseFull and marshalledJSONVerbosePartial if required. + // Note: both are initialized at the same time + // Note: for simplicity's sake, this operation modifies mtx in place + var marshalledJSONVerboseFull []byte + var marshalledJSONVerbosePartial []byte + initializeMarshalledJSONVerbose := func() bool { + net := m.server.cfg.DAGParams + build := func() ([]byte, bool) { + rawTx, err := createTxRawResult(net, mtx, txIDStr, nil, "", 0, 0, nil) + if err != nil { + return nil, false + } + verboseNtfn := btcjson.NewTxAcceptedVerboseNtfn(*rawTx) + marshalledJSONVerbose, err := btcjson.MarshalCmd(nil, verboseNtfn) + if err != nil { + log.Errorf("Failed to marshal verbose tx notification: %s", err.Error()) + return nil, false + } + + return marshalledJSONVerbose, true + } + + // First, build the given mtx for a Full version of the transaction + var ok bool + marshalledJSONVerboseFull, ok = build() + if !ok { + return false + } + + // Second, modify the given mtx to make it partial + mtx.Payload = []byte{} + + // Third, build again, now with the modified mtx, for a Partial version + marshalledJSONVerbosePartial, ok = build() + if !ok { + return false + } + + return true + } + for _, wsc := range clients { if wsc.verboseTxUpdates { - if marshalledJSONVerbose != nil { - wsc.QueueNotification(marshalledJSONVerbose) - continue + if marshalledJSONVerboseFull == nil { + ok := initializeMarshalledJSONVerbose() + if !ok { + return + } } - net := m.server.cfg.DAGParams - rawTx, err := createTxRawResult(net, mtx, txIDStr, nil, - "", 0, 0, nil) - if err != nil { - return + nodeSubnetworkID := m.server.cfg.DAG.SubnetworkID() + if wsc.subnetworkIDForTxUpdates == nil || wsc.subnetworkIDForTxUpdates.IsEqual(nodeSubnetworkID) { + wsc.QueueNotification(marshalledJSONVerboseFull) + } else { + wsc.QueueNotification(marshalledJSONVerbosePartial) } - - verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx) - marshalledJSONVerbose, err = btcjson.MarshalCmd(nil, - verboseNtfn) - if err != nil { - log.Errorf("Failed to marshal verbose tx "+ - "notification: %s", err.Error()) - return - } - wsc.QueueNotification(marshalledJSONVerbose) } else { wsc.QueueNotification(marshalledJSON) } @@ -1275,6 +1308,10 @@ type wsClient struct { // information about all new transactions. verboseTxUpdates bool + // subnetworkIDForTxUpdates specifies whether a client has requested to receive + // new transaction information from a specific subnetwork. + subnetworkIDForTxUpdates *subnetworkid.SubnetworkID + // addrRequests is a set of addresses the caller has requested to be // notified about. It is maintained here so all requests can be removed // when a wallet disconnects. Owned by the notification manager. @@ -1868,7 +1905,51 @@ func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, return nil, btcjson.ErrRPCInternal } - wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose + isVerbose := cmd.Verbose != nil && *cmd.Verbose + if isVerbose == false && cmd.Subnetwork != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: "Subnetwork switch is only allowed if verbose=true", + } + } + + var subnetworkID *subnetworkid.SubnetworkID + if cmd.Subnetwork != nil { + var err error + subnetworkID, err = subnetworkid.NewFromStr(*cmd.Subnetwork) + if err != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: "Subnetwork is malformed", + } + } + } + + if isVerbose { + nodeSubnetworkID := wsc.server.cfg.DAG.SubnetworkID() + if nodeSubnetworkID.IsEqual(&wire.SubnetworkIDNative) && subnetworkID != nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: "Subnetwork switch is disabled when node is in Native subnetwork", + } + } else if !nodeSubnetworkID.IsEqual(&wire.SubnetworkIDSupportsAll) { + if subnetworkID == nil { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: "Subnetwork switch is required when node is partial", + } + } + if !nodeSubnetworkID.IsEqual(subnetworkID) { + return nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCInvalidParameter, + Message: "Subnetwork must equal the node's subnetwork when the node is partial", + } + } + } + } + + wsc.verboseTxUpdates = isVerbose + wsc.subnetworkIDForTxUpdates = subnetworkID wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) return nil, nil }