[DEV-319] Update WS-API call notifyNewTransactions to allow filtering by SubNetworkID (#174)

* [DEV-319] Implemented transaction data sending logic.

* [DEV-319] Implemented NotifyNewTransactions command validation.

* [DEV-319] Reduced some duplication in notifyForNewTx.

* [DEV-319] Renamed a parameter for clarity.

* [DEV-319] Added a test for marshalling/unmarshalling the new varient of notifyNewTransactions.

* [DEV-319] Added a check in handleNotifyNewTransactions to avoid unnecessary validation.

* [DEV-319] Added comments to explain the initialization of marshalledJSONVerboseFull and marshalledJSONVerbosePartial.
This commit is contained in:
stasatdaglabs 2019-01-24 18:18:03 +02:00 committed by Ori Newman
parent 9f93a1c50b
commit 9a2eee78a4
6 changed files with 139 additions and 39 deletions

View File

@ -43,7 +43,8 @@ func NewStopNotifyBlocksCmd() *StopNotifyBlocksCmd {
// NotifyNewTransactionsCmd defines the notifyNewTransactions JSON-RPC command. // NotifyNewTransactionsCmd defines the notifyNewTransactions JSON-RPC command.
type NotifyNewTransactionsCmd struct { type NotifyNewTransactionsCmd struct {
Verbose *bool `jsonrpcdefault:"false"` Verbose *bool `jsonrpcdefault:"false"`
Subnetwork *string
} }
// NewNotifyNewTransactionsCmd returns a new instance which can be used to issue // NewNotifyNewTransactionsCmd returns a new instance which can be used to issue
@ -51,9 +52,10 @@ type NotifyNewTransactionsCmd struct {
// //
// The parameters which are pointers indicate they are optional. Passing nil // The parameters which are pointers indicate they are optional. Passing nil
// for optional parameters will use the default value. // for optional parameters will use the default value.
func NewNotifyNewTransactionsCmd(verbose *bool) *NotifyNewTransactionsCmd { func NewNotifyNewTransactionsCmd(verbose *bool, subnetworkID *string) *NotifyNewTransactionsCmd {
return &NotifyNewTransactionsCmd{ return &NotifyNewTransactionsCmd{
Verbose: verbose, Verbose: verbose,
Subnetwork: subnetworkID,
} }
} }

View File

@ -69,7 +69,7 @@ func TestDAGSvrWsCmds(t *testing.T) {
return btcjson.NewCmd("notifyNewTransactions") return btcjson.NewCmd("notifyNewTransactions")
}, },
staticCmd: func() interface{} { staticCmd: func() interface{} {
return btcjson.NewNotifyNewTransactionsCmd(nil) return btcjson.NewNotifyNewTransactionsCmd(nil, nil)
}, },
marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[],"id":1}`, marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[],"id":1}`,
unmarshalled: &btcjson.NotifyNewTransactionsCmd{ unmarshalled: &btcjson.NotifyNewTransactionsCmd{
@ -82,13 +82,27 @@ func TestDAGSvrWsCmds(t *testing.T) {
return btcjson.NewCmd("notifyNewTransactions", true) return btcjson.NewCmd("notifyNewTransactions", true)
}, },
staticCmd: func() interface{} { staticCmd: func() interface{} {
return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true)) return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true), nil)
}, },
marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[true],"id":1}`, marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[true],"id":1}`,
unmarshalled: &btcjson.NotifyNewTransactionsCmd{ unmarshalled: &btcjson.NotifyNewTransactionsCmd{
Verbose: btcjson.Bool(true), Verbose: btcjson.Bool(true),
}, },
}, },
{
name: "notifyNewTransactions optional 2",
newCmd: func() (interface{}, error) {
return btcjson.NewCmd("notifyNewTransactions", true, "0000000000000000000000000000000000000123")
},
staticCmd: func() interface{} {
return btcjson.NewNotifyNewTransactionsCmd(btcjson.Bool(true), btcjson.String("0000000000000000000000000000000000000123"))
},
marshalled: `{"jsonrpc":"1.0","method":"notifyNewTransactions","params":[true,"0000000000000000000000000000000000000123"],"id":1}`,
unmarshalled: &btcjson.NotifyNewTransactionsCmd{
Verbose: btcjson.Bool(true),
Subnetwork: btcjson.String("0000000000000000000000000000000000000123"),
},
},
{ {
name: "stopNotifyNewTransactions", name: "stopNotifyNewTransactions",
newCmd: func() (interface{}, error) { newCmd: func() (interface{}, error) {

View File

@ -246,8 +246,8 @@ func (c *Client) trackRegisteredNtfns(cmd interface{}) {
c.ntfnState.notifyNewTxVerbose = true c.ntfnState.notifyNewTxVerbose = true
} else { } else {
c.ntfnState.notifyNewTx = true c.ntfnState.notifyNewTx = true
} }
c.ntfnState.notifyNewTxSubnetworkID = bcmd.Subnetwork
case *btcjson.NotifySpentCmd: case *btcjson.NotifySpentCmd:
for _, op := range bcmd.OutPoints { for _, op := range bcmd.OutPoints {
@ -517,7 +517,7 @@ func (c *Client) reregisterNtfns() error {
if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose { if stateCopy.notifyNewTx || stateCopy.notifyNewTxVerbose {
log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)", log.Debugf("Reregistering [notifynewtransactions] (verbose=%v)",
stateCopy.notifyNewTxVerbose) stateCopy.notifyNewTxVerbose)
err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose) err := c.NotifyNewTransactions(stateCopy.notifyNewTxVerbose, stateCopy.notifyNewTxSubnetworkID)
if err != nil { if err != nil {
return err return err
} }

View File

@ -32,11 +32,12 @@ var (
// registered notification so the state can be automatically re-established on // registered notification so the state can be automatically re-established on
// reconnect. // reconnect.
type notificationState struct { type notificationState struct {
notifyBlocks bool notifyBlocks bool
notifyNewTx bool notifyNewTx bool
notifyNewTxVerbose bool notifyNewTxVerbose bool
notifyReceived map[string]struct{} notifyNewTxSubnetworkID *string
notifySpent map[btcjson.OutPoint]struct{} notifyReceived map[string]struct{}
notifySpent map[btcjson.OutPoint]struct{}
} }
// Copy returns a deep copy of the receiver. // Copy returns a deep copy of the receiver.
@ -45,6 +46,7 @@ func (s *notificationState) Copy() *notificationState {
stateCopy.notifyBlocks = s.notifyBlocks stateCopy.notifyBlocks = s.notifyBlocks
stateCopy.notifyNewTx = s.notifyNewTx stateCopy.notifyNewTx = s.notifyNewTx
stateCopy.notifyNewTxVerbose = s.notifyNewTxVerbose stateCopy.notifyNewTxVerbose = s.notifyNewTxVerbose
stateCopy.notifyNewTxSubnetworkID = s.notifyNewTxSubnetworkID
stateCopy.notifyReceived = make(map[string]struct{}) stateCopy.notifyReceived = make(map[string]struct{})
for addr := range s.notifyReceived { for addr := range s.notifyReceived {
stateCopy.notifyReceived[addr] = struct{}{} stateCopy.notifyReceived[addr] = struct{}{}
@ -1006,7 +1008,7 @@ func (r FutureNotifyNewTransactionsResult) Receive() error {
// See NotifyNewTransactionsAsync for the blocking version and more details. // See NotifyNewTransactionsAsync for the blocking version and more details.
// //
// NOTE: This is a btcd extension and requires a websocket connection. // NOTE: This is a btcd extension and requires a websocket connection.
func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransactionsResult { func (c *Client) NotifyNewTransactionsAsync(verbose bool, subnetworkID *string) FutureNotifyNewTransactionsResult {
// Not supported in HTTP POST mode. // Not supported in HTTP POST mode.
if c.config.HTTPPostMode { if c.config.HTTPPostMode {
return newFutureError(ErrWebsocketsRequired) return newFutureError(ErrWebsocketsRequired)
@ -1018,7 +1020,7 @@ func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransac
return newNilFutureResult() return newNilFutureResult()
} }
cmd := btcjson.NewNotifyNewTransactionsCmd(&verbose) cmd := btcjson.NewNotifyNewTransactionsCmd(&verbose, subnetworkID)
return c.sendCmd(cmd) return c.sendCmd(cmd)
} }
@ -1033,8 +1035,8 @@ func (c *Client) NotifyNewTransactionsAsync(verbose bool) FutureNotifyNewTransac
// true). // true).
// //
// NOTE: This is a btcd extension and requires a websocket connection. // NOTE: This is a btcd extension and requires a websocket connection.
func (c *Client) NotifyNewTransactions(verbose bool) error { func (c *Client) NotifyNewTransactions(verbose bool, subnetworkID *string) error {
return c.NotifyNewTransactionsAsync(verbose).Receive() return c.NotifyNewTransactionsAsync(verbose, subnetworkID).Receive()
} }
// FutureNotifyReceivedResult is a future promise to deliver the result of a // FutureNotifyReceivedResult is a future promise to deliver the result of a

View File

@ -601,8 +601,9 @@ var helpDescsEnUS = map[string]string{
"stopNotifyBlocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.", "stopNotifyBlocks--synopsis": "Cancel registered notifications for whenever a block is connected or disconnected from the main (best) chain.",
// NotifyNewTransactionsCmd help. // NotifyNewTransactionsCmd help.
"notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", "notifyNewTransactions--synopsis": "Send either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.",
"notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted", "notifyNewTransactions-verbose": "Specifies which type of notification to receive. If verbose is true, then the caller receives txacceptedverbose, otherwise the caller receives txaccepted",
"notifyNewTransactions-subnetwork": "Specifies which subnetwork to receive full transactions of. Requires verbose=true. Not allowed when node subnetwork is Native. Must be equal to node subnetwork when node is partial.",
// StopNotifyNewTransactionsCmd help. // StopNotifyNewTransactionsCmd help.
"stopNotifyNewTransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.", "stopNotifyNewTransactions--synopsis": "Stop sending either a txaccepted or a txacceptedverbose notification when a new transaction is accepted into the mempool.",

View File

@ -15,6 +15,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/daglabs/btcd/util/subnetworkid"
"io" "io"
"sync" "sync"
"time" "time"
@ -835,31 +836,63 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie
return return
} }
var verboseNtfn *btcjson.TxAcceptedVerboseNtfn // To avoid unnecessary marshalling of verbose transactions, only initialize
var marshalledJSONVerbose []byte // marshalledJSONVerboseFull and marshalledJSONVerbosePartial if required.
// Note: both are initialized at the same time
// Note: for simplicity's sake, this operation modifies mtx in place
var marshalledJSONVerboseFull []byte
var marshalledJSONVerbosePartial []byte
initializeMarshalledJSONVerbose := func() bool {
net := m.server.cfg.DAGParams
build := func() ([]byte, bool) {
rawTx, err := createTxRawResult(net, mtx, txIDStr, nil, "", 0, 0, nil)
if err != nil {
return nil, false
}
verboseNtfn := btcjson.NewTxAcceptedVerboseNtfn(*rawTx)
marshalledJSONVerbose, err := btcjson.MarshalCmd(nil, verboseNtfn)
if err != nil {
log.Errorf("Failed to marshal verbose tx notification: %s", err.Error())
return nil, false
}
return marshalledJSONVerbose, true
}
// First, build the given mtx for a Full version of the transaction
var ok bool
marshalledJSONVerboseFull, ok = build()
if !ok {
return false
}
// Second, modify the given mtx to make it partial
mtx.Payload = []byte{}
// Third, build again, now with the modified mtx, for a Partial version
marshalledJSONVerbosePartial, ok = build()
if !ok {
return false
}
return true
}
for _, wsc := range clients { for _, wsc := range clients {
if wsc.verboseTxUpdates { if wsc.verboseTxUpdates {
if marshalledJSONVerbose != nil { if marshalledJSONVerboseFull == nil {
wsc.QueueNotification(marshalledJSONVerbose) ok := initializeMarshalledJSONVerbose()
continue if !ok {
return
}
} }
net := m.server.cfg.DAGParams nodeSubnetworkID := m.server.cfg.DAG.SubnetworkID()
rawTx, err := createTxRawResult(net, mtx, txIDStr, nil, if wsc.subnetworkIDForTxUpdates == nil || wsc.subnetworkIDForTxUpdates.IsEqual(nodeSubnetworkID) {
"", 0, 0, nil) wsc.QueueNotification(marshalledJSONVerboseFull)
if err != nil { } else {
return wsc.QueueNotification(marshalledJSONVerbosePartial)
} }
verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx)
marshalledJSONVerbose, err = btcjson.MarshalCmd(nil,
verboseNtfn)
if err != nil {
log.Errorf("Failed to marshal verbose tx "+
"notification: %s", err.Error())
return
}
wsc.QueueNotification(marshalledJSONVerbose)
} else { } else {
wsc.QueueNotification(marshalledJSON) wsc.QueueNotification(marshalledJSON)
} }
@ -1275,6 +1308,10 @@ type wsClient struct {
// information about all new transactions. // information about all new transactions.
verboseTxUpdates bool verboseTxUpdates bool
// subnetworkIDForTxUpdates specifies whether a client has requested to receive
// new transaction information from a specific subnetwork.
subnetworkIDForTxUpdates *subnetworkid.SubnetworkID
// addrRequests is a set of addresses the caller has requested to be // addrRequests is a set of addresses the caller has requested to be
// notified about. It is maintained here so all requests can be removed // notified about. It is maintained here so all requests can be removed
// when a wallet disconnects. Owned by the notification manager. // when a wallet disconnects. Owned by the notification manager.
@ -1868,7 +1905,51 @@ func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{},
return nil, btcjson.ErrRPCInternal return nil, btcjson.ErrRPCInternal
} }
wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose isVerbose := cmd.Verbose != nil && *cmd.Verbose
if isVerbose == false && cmd.Subnetwork != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Subnetwork switch is only allowed if verbose=true",
}
}
var subnetworkID *subnetworkid.SubnetworkID
if cmd.Subnetwork != nil {
var err error
subnetworkID, err = subnetworkid.NewFromStr(*cmd.Subnetwork)
if err != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Subnetwork is malformed",
}
}
}
if isVerbose {
nodeSubnetworkID := wsc.server.cfg.DAG.SubnetworkID()
if nodeSubnetworkID.IsEqual(&wire.SubnetworkIDNative) && subnetworkID != nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Subnetwork switch is disabled when node is in Native subnetwork",
}
} else if !nodeSubnetworkID.IsEqual(&wire.SubnetworkIDSupportsAll) {
if subnetworkID == nil {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Subnetwork switch is required when node is partial",
}
}
if !nodeSubnetworkID.IsEqual(subnetworkID) {
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCInvalidParameter,
Message: "Subnetwork must equal the node's subnetwork when the node is partial",
}
}
}
}
wsc.verboseTxUpdates = isVerbose
wsc.subnetworkIDForTxUpdates = subnetworkID
wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc)
return nil, nil return nil, nil
} }