[NOD-1163] Combine seperated flows into single packages (#801)

* [NOD-1163] Combine seperated flows into single packages

* [NOD-1163] Move handshake.go to handshake package

* [NOD-1163] Use single logger prefix for everything under protocol

* [NOD-1163] Add comment

* [NOD-1163] Fix refactor error
This commit is contained in:
Svarog 2020-07-19 11:24:25 +03:00 committed by GitHub
parent b42b8b16fd
commit b9a25c1141
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 44 additions and 62 deletions

View File

@ -51,9 +51,6 @@ var (
grpcLog = BackendLog.Logger("GRPC")
p2psLog = BackendLog.Logger("P2PS")
ntarLog = BackendLog.Logger("NTAR")
blkrLog = BackendLog.Logger("BLKR")
gbrlLog = BackendLog.Logger("GBRL")
blprLog = BackendLog.Logger("BLPR")
dnssLog = BackendLog.Logger("DNSS")
snvrLog = BackendLog.Logger("SNVR")
)
@ -82,10 +79,7 @@ var SubsystemTags = struct {
MUXX,
GRPC,
P2PS,
BLKR,
NTAR,
GBRL,
BLPR,
DNSS,
SNVR string
}{
@ -111,10 +105,7 @@ var SubsystemTags = struct {
MUXX: "MUXX",
GRPC: "GRPC",
P2PS: "P2PS",
BLKR: "BLKR",
GBRL: "GBRL",
NTAR: "NTAR",
BLPR: "BLPR",
DNSS: "DNSS",
SNVR: "SNVR",
}
@ -143,10 +134,7 @@ var subsystemLoggers = map[string]*logs.Logger{
SubsystemTags.MUXX: muxxLog,
SubsystemTags.GRPC: grpcLog,
SubsystemTags.P2PS: p2psLog,
SubsystemTags.BLKR: blkrLog,
SubsystemTags.GBRL: gbrlLog,
SubsystemTags.NTAR: ntarLog,
SubsystemTags.BLPR: blprLog,
SubsystemTags.DNSS: dnssLog,
SubsystemTags.SNVR: snvrLog,
}

View File

@ -8,4 +8,4 @@ import (
"github.com/kaspanet/kaspad/logger"
)
var log, _ = logger.Get(logger.SubsystemTags.BLPR)
var log, _ = logger.Get(logger.SubsystemTags.PROT)

View File

@ -1,13 +1,14 @@
package receiveaddresses
package addressexchange
import (
"time"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/wire"
"time"
)
const timeout = 30 * time.Second

View File

@ -1,11 +1,12 @@
package sendaddresses
package addressexchange
import (
"math/rand"
"time"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/wire"
"math/rand"
"time"
)
// SendAddresses sends addresses to a peer that requests it.

View File

@ -1,4 +1,4 @@
package handlerelayblockrequests
package blockrelay
import (
"github.com/kaspanet/kaspad/blockdag"

View File

@ -1,6 +1,8 @@
package handlerelayinvs
package blockrelay
import (
"time"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
@ -12,7 +14,6 @@ import (
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"time"
)
const timeout = 30 * time.Second

View File

@ -1,4 +1,4 @@
package handlerelayinvs
package blockrelay
import "github.com/kaspanet/kaspad/util/daghash"

View File

@ -1,9 +1,9 @@
package sendversion
package blockrelay
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.GBRL)
var log, _ = logger.Get(logger.SubsystemTags.PROT)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -1,8 +1,9 @@
package handlerelayinvs
package blockrelay
import (
"github.com/kaspanet/kaspad/util/daghash"
"sync"
"github.com/kaspanet/kaspad/util/daghash"
)
type sharedRequestedBlocks struct {

View File

@ -1,4 +1,4 @@
package protocol
package handshake
import (
"sync"
@ -9,14 +9,14 @@ import (
"github.com/kaspanet/kaspad/netadapter"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/receiveversion"
"github.com/kaspanet/kaspad/protocol/sendversion"
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
)
func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
// HandleHandshake sets up the handshake protocol - It sends a version message and waits for an incoming
// version message, as well as a verack for the sent version
func HandleHandshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
dag *blockdag.BlockDAG, addressManager *addrmgr.AddrManager) (closed bool, err error) {
receiveVersionRoute, err := router.AddIncomingRoute([]wire.MessageCommand{wire.CmdVersion})
@ -29,9 +29,9 @@ func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer
panic(err)
}
// For the handshake to finish, we need to get from the other node
// For HandleHandshake to finish, we need to get from the other node
// a version and verack messages, so we increase the wait group by 2
// and block the handshake with wg.Wait().
// and block HandleHandshake with wg.Wait().
wg := sync.WaitGroup{}
wg.Add(2)
@ -41,7 +41,7 @@ func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer
var peerAddress *wire.NetAddress
spawn(func() {
defer wg.Done()
address, closed, err := receiveversion.ReceiveVersion(receiveVersionRoute, router.OutgoingRoute(), netAdapter, peer, dag)
address, closed, err := ReceiveVersion(receiveVersionRoute, router.OutgoingRoute(), netAdapter, peer, dag)
if err != nil {
log.Errorf("error from ReceiveVersion: %s", err)
}
@ -56,7 +56,7 @@ func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer
spawn(func() {
defer wg.Done()
closed, err := sendversion.SendVersion(sendVersionRoute, router.OutgoingRoute(), netAdapter, dag)
closed, err := SendVersion(sendVersionRoute, router.OutgoingRoute(), netAdapter, dag)
if err != nil {
log.Errorf("error from SendVersion: %s", err)
}

View File

@ -1,9 +1,9 @@
package handlerelayinvs
package handshake
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.BLKR)
var log, _ = logger.Get(logger.SubsystemTags.PROT)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -1,13 +1,14 @@
package receiveversion
package handshake
import (
"time"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/netadapter/router"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/wire"
"time"
)
var (

View File

@ -1,4 +1,4 @@
package sendversion
package handshake
import (
"github.com/kaspanet/kaspad/blockdag"
@ -7,7 +7,6 @@ import (
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/version"
"github.com/kaspanet/kaspad/wire"
"time"
)
var (
@ -28,8 +27,6 @@ var (
defaultRequiredServices = wire.SFNodeNetwork
)
const timeout = 30 * time.Second
// SendVersion sends a version to a peer and waits for verack.
func SendVersion(incomingRoute *router.Route, outgoingRoute *router.Route, netAdapter *netadapter.NetAdapter,
dag *blockdag.BlockDAG) (routeClosed bool, err error) {

View File

@ -1,9 +0,0 @@
package handlerelayblockrequests
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.GBRL)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -5,5 +5,5 @@ import (
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.PEER)
var log, _ = logger.Get(logger.SubsystemTags.PROT)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -4,17 +4,18 @@ import (
"errors"
"sync/atomic"
"github.com/kaspanet/kaspad/protocol/flows/handshake"
"github.com/kaspanet/kaspad/protocol/flows/addressexchange"
"github.com/kaspanet/kaspad/protocol/flows/blockrelay"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/protocol/handlerelayblockrequests"
"github.com/kaspanet/kaspad/protocol/handlerelayinvs"
"github.com/kaspanet/kaspad/protocol/flows/ping"
peerpkg "github.com/kaspanet/kaspad/protocol/peer"
"github.com/kaspanet/kaspad/protocol/ping"
"github.com/kaspanet/kaspad/protocol/protocolerrors"
"github.com/kaspanet/kaspad/protocol/receiveaddresses"
"github.com/kaspanet/kaspad/protocol/sendaddresses"
"github.com/kaspanet/kaspad/wire"
)
@ -65,7 +66,7 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag
outgoingRoute := router.OutgoingRoute()
peer := new(peerpkg.Peer)
closed, err := handshake(router, netAdapter, peer, dag, addressManager)
closed, err := handshake.HandleHandshake(router, netAdapter, peer, dag, addressManager)
if err != nil {
return err
}
@ -75,25 +76,25 @@ func startFlows(netAdapter *netadapter.NetAdapter, router *routerpkg.Router, dag
addOneTimeFlow("SendAddresses", router, []wire.MessageCommand{wire.CmdGetAddresses}, &stopped, stop,
func(incomingRoute *routerpkg.Route) (routeClosed bool, err error) {
return sendaddresses.SendAddresses(incomingRoute, outgoingRoute, addressManager)
return addressexchange.SendAddresses(incomingRoute, outgoingRoute, addressManager)
},
)
addOneTimeFlow("ReceiveAddresses", router, []wire.MessageCommand{wire.CmdAddress}, &stopped, stop,
func(incomingRoute *routerpkg.Route) (routeClosed bool, err error) {
return receiveaddresses.ReceiveAddresses(incomingRoute, outgoingRoute, peer, addressManager)
return addressexchange.ReceiveAddresses(incomingRoute, outgoingRoute, peer, addressManager)
},
)
addFlow("HandleRelayInvs", router, []wire.MessageCommand{wire.CmdInvRelayBlock, wire.CmdBlock}, &stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return handlerelayinvs.HandleRelayInvs(incomingRoute, outgoingRoute, peer, netAdapter, dag)
return blockrelay.HandleRelayInvs(incomingRoute, outgoingRoute, peer, netAdapter, dag)
},
)
addFlow("HandleRelayBlockRequests", router, []wire.MessageCommand{wire.CmdGetRelayBlocks}, &stopped, stop,
func(incomingRoute *routerpkg.Route) error {
return handlerelayblockrequests.HandleRelayBlockRequests(incomingRoute, outgoingRoute, peer, dag)
return blockrelay.HandleRelayBlockRequests(incomingRoute, outgoingRoute, peer, dag)
},
)