From b0fecc9f871dc22ca45a415c629f93e2c3d4f393 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Mon, 10 Aug 2020 12:55:24 +0300 Subject: [PATCH] [NOD-1204] Add timestamp and message number to domain messages (#854) --- domainmessage/base_message.go | 24 +++++++++++++++++++ domainmessage/message.go | 5 ++++ domainmessage/msgaddresses.go | 1 + domainmessage/msgblock.go | 1 + domainmessage/msgblocklocator.go | 1 + domainmessage/msgdoneibdblocks.go | 4 +++- domainmessage/msgibdblock.go | 3 ++- domainmessage/msgibdblock_test.go | 4 ++-- domainmessage/msginvrelayblock.go | 1 + domainmessage/msginvtransaction.go | 1 + domainmessage/msgping.go | 1 + domainmessage/msgpong.go | 1 + domainmessage/msgrequestaddresses.go | 1 + domainmessage/msgrequestblocklocator.go | 1 + domainmessage/msgrequestibdblocks.go | 1 + domainmessage/msgrequestnextibdblocks.go | 4 +++- domainmessage/msgrequestrelayblocks.go | 1 + domainmessage/msgrequestselectedtip.go | 4 +++- domainmessage/msgrequesttransactions.go | 1 + domainmessage/msgselectedtip.go | 1 + domainmessage/msgtransactionnotfound.go | 1 + domainmessage/msgtx.go | 1 + domainmessage/msgverack.go | 4 +++- domainmessage/msgversion.go | 1 + .../server/grpcserver/connection_loops.go | 17 +++++++++---- 25 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 domainmessage/base_message.go diff --git a/domainmessage/base_message.go b/domainmessage/base_message.go new file mode 100644 index 000000000..7fec1e6f7 --- /dev/null +++ b/domainmessage/base_message.go @@ -0,0 +1,24 @@ +package domainmessage + +import "time" + +type baseMessage struct { + messageNumber uint64 + receivedAt time.Time +} + +func (b *baseMessage) MessageNumber() uint64 { + return b.messageNumber +} + +func (b *baseMessage) SetMessageNumber(messageNumber uint64) { + b.messageNumber = messageNumber +} + +func (b *baseMessage) ReceivedAt() time.Time { + return b.receivedAt +} + +func (b *baseMessage) SetReceivedAt(receivedAt time.Time) { + b.receivedAt = receivedAt +} diff --git a/domainmessage/message.go b/domainmessage/message.go index 06385e5ff..930d691b5 100644 --- a/domainmessage/message.go +++ b/domainmessage/message.go @@ -6,6 +6,7 @@ package domainmessage import ( "fmt" + "time" ) // MaxMessagePayload is the maximum bytes a message can be regardless of other @@ -79,4 +80,8 @@ var MessageCommandToString = map[MessageCommand]string{ // are used directly in the protocol encoded message. type Message interface { Command() MessageCommand + MessageNumber() uint64 + SetMessageNumber(index uint64) + ReceivedAt() time.Time + SetReceivedAt(receivedAt time.Time) } diff --git a/domainmessage/msgaddresses.go b/domainmessage/msgaddresses.go index 91e9acab4..cf813f1e6 100644 --- a/domainmessage/msgaddresses.go +++ b/domainmessage/msgaddresses.go @@ -24,6 +24,7 @@ const MaxAddressesPerMsg = 1000 // Use the AddAddress function to build up the list of known addresses when // sending an Addresses message to another peer. type MsgAddresses struct { + baseMessage IncludeAllSubnetworks bool SubnetworkID *subnetworkid.SubnetworkID AddrList []*NetAddress diff --git a/domainmessage/msgblock.go b/domainmessage/msgblock.go index 0d83fd3ab..2298d4dad 100644 --- a/domainmessage/msgblock.go +++ b/domainmessage/msgblock.go @@ -42,6 +42,7 @@ type TxLoc struct { // block message. It is used to deliver block and transaction information in // response to a getdata message (MsgGetData) for a given block hash. type MsgBlock struct { + baseMessage Header BlockHeader Transactions []*MsgTx } diff --git a/domainmessage/msgblocklocator.go b/domainmessage/msgblocklocator.go index b6a2b849d..71ecfc326 100644 --- a/domainmessage/msgblocklocator.go +++ b/domainmessage/msgblocklocator.go @@ -12,6 +12,7 @@ const MaxBlockLocatorsPerMsg = 500 // locator message. It is used to find the blockLocator of a peer that is // syncing with you. type MsgBlockLocator struct { + baseMessage BlockLocatorHashes []*daghash.Hash } diff --git a/domainmessage/msgdoneibdblocks.go b/domainmessage/msgdoneibdblocks.go index 9d7b9a67d..9ff008d77 100644 --- a/domainmessage/msgdoneibdblocks.go +++ b/domainmessage/msgdoneibdblocks.go @@ -5,7 +5,9 @@ package domainmessage // syncer sent all the requested blocks. // // This message has no payload. -type MsgDoneIBDBlocks struct{} +type MsgDoneIBDBlocks struct { + baseMessage +} // Command returns the protocol command string for the message. This is part // of the Message interface implementation. diff --git a/domainmessage/msgibdblock.go b/domainmessage/msgibdblock.go index 434ea105e..07daff084 100644 --- a/domainmessage/msgibdblock.go +++ b/domainmessage/msgibdblock.go @@ -8,6 +8,7 @@ package domainmessage // ibdblock message. It is used to deliver block and transaction information in // response to a RequestIBDBlocks message (MsgRequestIBDBlocks). type MsgIBDBlock struct { + baseMessage *MsgBlock } @@ -26,5 +27,5 @@ func (msg *MsgIBDBlock) MaxPayloadLength(pver uint32) uint32 { // NewMsgIBDBlock returns a new kaspa ibdblock message that conforms to the // Message interface. See MsgIBDBlock for details. func NewMsgIBDBlock(msgBlock *MsgBlock) *MsgIBDBlock { - return &MsgIBDBlock{msgBlock} + return &MsgIBDBlock{MsgBlock: msgBlock} } diff --git a/domainmessage/msgibdblock_test.go b/domainmessage/msgibdblock_test.go index 520cf6759..c01d9b487 100644 --- a/domainmessage/msgibdblock_test.go +++ b/domainmessage/msgibdblock_test.go @@ -77,8 +77,8 @@ func TestIBDBlockEncoding(t *testing.T) { }{ // Latest protocol version. { - &MsgIBDBlock{&blockOne}, - &MsgIBDBlock{&blockOne}, + &MsgIBDBlock{MsgBlock: &blockOne}, + &MsgIBDBlock{MsgBlock: &blockOne}, blockOneBytes, blockOneTxLocs, ProtocolVersion, diff --git a/domainmessage/msginvrelayblock.go b/domainmessage/msginvrelayblock.go index 72e792832..3134f9c40 100644 --- a/domainmessage/msginvrelayblock.go +++ b/domainmessage/msginvrelayblock.go @@ -8,6 +8,7 @@ import ( // block inventory message. It is used to notify the network about new block // by sending their hash, and let the receiving node decide if it needs it. type MsgInvRelayBlock struct { + baseMessage Hash *daghash.Hash } diff --git a/domainmessage/msginvtransaction.go b/domainmessage/msginvtransaction.go index 21c1d7a01..bee533a5f 100644 --- a/domainmessage/msginvtransaction.go +++ b/domainmessage/msginvtransaction.go @@ -12,6 +12,7 @@ const MaxInvPerTxInvMsg = MaxInvPerMsg // TxInv message. It is used to notify the network about new transactions // by sending their ID, and let the receiving node decide if it needs it. type MsgInvTransaction struct { + baseMessage TxIDs []*daghash.TxID } diff --git a/domainmessage/msgping.go b/domainmessage/msgping.go index 2d1977575..b3f3d31a4 100644 --- a/domainmessage/msgping.go +++ b/domainmessage/msgping.go @@ -16,6 +16,7 @@ package domainmessage // The payload for this message just consists of a nonce used for identifying // it later. type MsgPing struct { + baseMessage // Unique value associated with message that is used to identify // specific ping message. Nonce uint64 diff --git a/domainmessage/msgpong.go b/domainmessage/msgpong.go index 3a0aeb1bd..f8ad866bd 100644 --- a/domainmessage/msgpong.go +++ b/domainmessage/msgpong.go @@ -10,6 +10,7 @@ package domainmessage // // This message was not added until protocol versions AFTER BIP0031Version. type MsgPong struct { + baseMessage // Unique value associated with message that is used to identify // specific ping message. Nonce uint64 diff --git a/domainmessage/msgrequestaddresses.go b/domainmessage/msgrequestaddresses.go index cc123845a..dda1c4d2f 100644 --- a/domainmessage/msgrequestaddresses.go +++ b/domainmessage/msgrequestaddresses.go @@ -15,6 +15,7 @@ import ( // // This message has no payload. type MsgRequestAddresses struct { + baseMessage IncludeAllSubnetworks bool SubnetworkID *subnetworkid.SubnetworkID } diff --git a/domainmessage/msgrequestblocklocator.go b/domainmessage/msgrequestblocklocator.go index 97b3995b6..cc1e4c28f 100644 --- a/domainmessage/msgrequestblocklocator.go +++ b/domainmessage/msgrequestblocklocator.go @@ -9,6 +9,7 @@ import ( // and low hash. // The locator is returned via a locator message (MsgBlockLocator). type MsgRequestBlockLocator struct { + baseMessage HighHash *daghash.Hash LowHash *daghash.Hash } diff --git a/domainmessage/msgrequestibdblocks.go b/domainmessage/msgrequestibdblocks.go index afca41378..1301e6381 100644 --- a/domainmessage/msgrequestibdblocks.go +++ b/domainmessage/msgrequestibdblocks.go @@ -12,6 +12,7 @@ import ( // RequestIBDBlocks message. It is used to request a list of blocks starting after the // low hash and until the high hash. type MsgRequestIBDBlocks struct { + baseMessage LowHash *daghash.Hash HighHash *daghash.Hash } diff --git a/domainmessage/msgrequestnextibdblocks.go b/domainmessage/msgrequestnextibdblocks.go index 59be25439..8fb1ca0e1 100644 --- a/domainmessage/msgrequestnextibdblocks.go +++ b/domainmessage/msgrequestnextibdblocks.go @@ -5,7 +5,9 @@ package domainmessage // more blocks. // // This message has no payload. -type MsgRequestNextIBDBlocks struct{} +type MsgRequestNextIBDBlocks struct { + baseMessage +} // Command returns the protocol command string for the message. This is part // of the Message interface implementation. diff --git a/domainmessage/msgrequestrelayblocks.go b/domainmessage/msgrequestrelayblocks.go index a4d1c36e6..dc61a3534 100644 --- a/domainmessage/msgrequestrelayblocks.go +++ b/domainmessage/msgrequestrelayblocks.go @@ -12,6 +12,7 @@ const MsgRequestRelayBlocksHashes = MaxInvPerMsg // RequestRelayBlocks message. It is used to request blocks as part of the block // relay protocol. type MsgRequestRelayBlocks struct { + baseMessage Hashes []*daghash.Hash } diff --git a/domainmessage/msgrequestselectedtip.go b/domainmessage/msgrequestselectedtip.go index 3d9bee883..578efaf02 100644 --- a/domainmessage/msgrequestselectedtip.go +++ b/domainmessage/msgrequestselectedtip.go @@ -4,7 +4,9 @@ package domainmessage // RequestSelectedTip message. It is used to request the selected tip of another peer. // // This message has no payload. -type MsgRequestSelectedTip struct{} +type MsgRequestSelectedTip struct { + baseMessage +} // Command returns the protocol command string for the message. This is part // of the Message interface implementation. diff --git a/domainmessage/msgrequesttransactions.go b/domainmessage/msgrequesttransactions.go index 1929596f4..b6a0831b6 100644 --- a/domainmessage/msgrequesttransactions.go +++ b/domainmessage/msgrequesttransactions.go @@ -12,6 +12,7 @@ const MaxInvPerRequestTransactionsMsg = MaxInvPerMsg // RequestTransactions message. It is used to request transactions as part of the // transactions relay protocol. type MsgRequestTransactions struct { + baseMessage IDs []*daghash.TxID } diff --git a/domainmessage/msgselectedtip.go b/domainmessage/msgselectedtip.go index 837a30f02..36669616b 100644 --- a/domainmessage/msgselectedtip.go +++ b/domainmessage/msgselectedtip.go @@ -8,6 +8,7 @@ import ( // selectedtip message. It is used to answer getseltip messages and tell // the asking peer what is the selected tip of this peer. type MsgSelectedTip struct { + baseMessage // The selected tip hash of the generator of the message. SelectedTipHash *daghash.Hash } diff --git a/domainmessage/msgtransactionnotfound.go b/domainmessage/msgtransactionnotfound.go index 9d7b2a8ef..167d2b4e1 100644 --- a/domainmessage/msgtransactionnotfound.go +++ b/domainmessage/msgtransactionnotfound.go @@ -11,6 +11,7 @@ import ( // MsgTransactionNotFound defines a kaspa TransactionNotFound message which is sent in response to // a RequestTransactions message if any of the requested data in not available on the peer. type MsgTransactionNotFound struct { + baseMessage ID *daghash.TxID } diff --git a/domainmessage/msgtx.go b/domainmessage/msgtx.go index fbbeb5adb..57a0a784f 100644 --- a/domainmessage/msgtx.go +++ b/domainmessage/msgtx.go @@ -268,6 +268,7 @@ func NewTxOut(value uint64, scriptPubKey []byte) *TxOut { // Use the AddTxIn and AddTxOut functions to build up the list of transaction // inputs and outputs. type MsgTx struct { + baseMessage Version int32 TxIn []*TxIn TxOut []*TxOut diff --git a/domainmessage/msgverack.go b/domainmessage/msgverack.go index 5e2e45173..681a8e79c 100644 --- a/domainmessage/msgverack.go +++ b/domainmessage/msgverack.go @@ -9,7 +9,9 @@ package domainmessage // to negotiate parameters. It implements the Message interface. // // This message has no payload. -type MsgVerAck struct{} +type MsgVerAck struct { + baseMessage +} // Command returns the protocol command string for the message. This is part // of the Message interface implementation. diff --git a/domainmessage/msgversion.go b/domainmessage/msgversion.go index c1afc81bf..8f334f2fe 100644 --- a/domainmessage/msgversion.go +++ b/domainmessage/msgversion.go @@ -31,6 +31,7 @@ var DefaultUserAgent = fmt.Sprintf("/kaspad:%s/", version.Version()) // message (MsgVerAck). This exchange must take place before any further // communication is allowed to proceed. type MsgVersion struct { + baseMessage // Version of the protocol the node is using. ProtocolVersion uint32 diff --git a/netadapter/server/grpcserver/connection_loops.go b/netadapter/server/grpcserver/connection_loops.go index 23db318e6..a5b760120 100644 --- a/netadapter/server/grpcserver/connection_loops.go +++ b/netadapter/server/grpcserver/connection_loops.go @@ -2,6 +2,7 @@ package grpcserver import ( "io" + "time" routerpkg "github.com/kaspanet/kaspad/netadapter/router" "github.com/pkg/errors" @@ -61,6 +62,7 @@ func (c *gRPCConnection) sendLoop() error { } func (c *gRPCConnection) receiveLoop() error { + messageNumber := uint64(0) for c.IsConnected() { protoMessage, err := c.stream.Recv() if err != nil { @@ -75,10 +77,17 @@ func (c *gRPCConnection) receiveLoop() error { return err } - log.Debugf("incoming '%s' message from %s", message.Command(), c) - log.Tracef("incoming '%s' message from %s: %s", message.Command(), c, logger.NewLogClosure(func() string { - return spew.Sdump(message) - })) + messageNumber++ + message.SetMessageNumber(messageNumber) + message.SetReceivedAt(time.Now()) + + log.Debugf("incoming '%s' message from %s (message number %d)", message.Command(), c, + message.MessageNumber()) + + log.Tracef("incoming '%s' message from %s (message number %d): %s", message.Command(), + c, message.MessageNumber(), logger.NewLogClosure(func() string { + return spew.Sdump(message) + })) err = c.router.EnqueueIncomingMessage(message) if err != nil {