From 5d20772f9467903bb29036d62a5ef54c0c3bbf76 Mon Sep 17 00:00:00 2001 From: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com> Date: Sun, 23 Aug 2020 13:11:48 +0300 Subject: [PATCH] [NOD-1293] Fix kaspad sending 127.0.0.1 in its msgVersion (#886) * [NOD-1293] Use addressManager's GetBestLocalAddress. * [NOD-1293] Copy the initListeners function from the old p2p to the address manager. * [NOD-1293] Remove debug logs. * [NOD-1293] Remove unused import. * [NOD-1293] Fix a comment. --- app/app.go | 7 +- app/appmessage/protocol.go | 4 + .../flows/addressexchange/receiveaddresses.go | 2 +- .../flows/addressexchange/sendaddresses.go | 1 - app/protocol/flows/handshake/handshake.go | 2 +- app/protocol/flows/handshake/sendversion.go | 16 +- .../network/addressmanager/addressmanager.go | 195 +++++++++++++++++- .../addressmanager/addressmanager_test.go | 5 +- .../network/netadapter/netadapter.go | 56 ----- 9 files changed, 216 insertions(+), 72 deletions(-) diff --git a/app/app.go b/app/app.go index 975aa6c21..89ba4adb2 100644 --- a/app/app.go +++ b/app/app.go @@ -114,13 +114,14 @@ func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrup if err != nil { return nil, err } - addressManager := addressmanager.New(cfg, databaseContext) - + addressManager, err := addressmanager.New(cfg, databaseContext) + if err != nil { + return nil, err + } connectionManager, err := connmanager.New(cfg, netAdapter, addressManager) if err != nil { return nil, err } - protocolManager, err := protocol.NewManager(cfg, dag, netAdapter, addressManager, txMempool, connectionManager) if err != nil { return nil, err diff --git a/app/appmessage/protocol.go b/app/appmessage/protocol.go index 20e219d6a..2b6f096d1 100644 --- a/app/appmessage/protocol.go +++ b/app/appmessage/protocol.go @@ -14,6 +14,10 @@ import ( const ( // ProtocolVersion is the latest protocol version this package supports. ProtocolVersion uint32 = 1 + + // defaultServices describes the default services that are supported by + // the server. + DefaultServices = SFNodeNetwork | SFNodeBloom | SFNodeCF ) // ServiceFlag identifies services supported by a kaspa peer. diff --git a/app/protocol/flows/addressexchange/receiveaddresses.go b/app/protocol/flows/addressexchange/receiveaddresses.go index 15c285088..535f66725 100644 --- a/app/protocol/flows/addressexchange/receiveaddresses.go +++ b/app/protocol/flows/addressexchange/receiveaddresses.go @@ -38,7 +38,7 @@ func ReceiveAddresses(context ReceiveAddressesContext, incomingRoute *router.Rou msgAddresses := message.(*appmessage.MsgAddresses) if len(msgAddresses.AddrList) > addressmanager.GetAddressesMax { - return protocolerrors.Errorf(true, "address count excceeded %d", addressmanager.GetAddressesMax) + return protocolerrors.Errorf(true, "address count exceeded %d", addressmanager.GetAddressesMax) } if msgAddresses.IncludeAllSubnetworks { diff --git a/app/protocol/flows/addressexchange/sendaddresses.go b/app/protocol/flows/addressexchange/sendaddresses.go index c20941ef3..236073406 100644 --- a/app/protocol/flows/addressexchange/sendaddresses.go +++ b/app/protocol/flows/addressexchange/sendaddresses.go @@ -14,7 +14,6 @@ type SendAddressesContext interface { // SendAddresses sends addresses to a peer that requests it. func SendAddresses(context SendAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { - message, err := incomingRoute.Dequeue() if err != nil { return err diff --git a/app/protocol/flows/handshake/handshake.go b/app/protocol/flows/handshake/handshake.go index 9d4c0bb36..d32d5a496 100644 --- a/app/protocol/flows/handshake/handshake.go +++ b/app/protocol/flows/handshake/handshake.go @@ -60,7 +60,7 @@ func HandleHandshake(context HandleHandshakeContext, netConnection *netadapter.N spawn("HandleHandshake-SendVersion", func() { defer wg.Done() - err := SendVersion(context, sendVersionRoute, outgoingRoute) + err := SendVersion(context, sendVersionRoute, outgoingRoute, peer) if err != nil { handleError(err, "SendVersion", &isStopping, errChan) return diff --git a/app/protocol/flows/handshake/sendversion.go b/app/protocol/flows/handshake/sendversion.go index 5b2b0c555..c24d73e07 100644 --- a/app/protocol/flows/handshake/sendversion.go +++ b/app/protocol/flows/handshake/sendversion.go @@ -3,6 +3,7 @@ package handshake import ( "github.com/kaspanet/kaspad/app/appmessage" "github.com/kaspanet/kaspad/app/protocol/common" + peerpkg "github.com/kaspanet/kaspad/app/protocol/peer" "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router" "github.com/kaspanet/kaspad/version" ) @@ -18,7 +19,7 @@ var ( // defaultServices describes the default services that are supported by // the server. - defaultServices = appmessage.SFNodeNetwork | appmessage.SFNodeBloom | appmessage.SFNodeCF + defaultServices = appmessage.DefaultServices // defaultRequiredServices describes the default services that are // required to be supported by outbound peers. @@ -28,14 +29,18 @@ var ( type sendVersionFlow struct { HandleHandshakeContext incomingRoute, outgoingRoute *router.Route + peer *peerpkg.Peer } // SendVersion sends a version to a peer and waits for verack. -func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, outgoingRoute *router.Route) error { +func SendVersion(context HandleHandshakeContext, incomingRoute *router.Route, + outgoingRoute *router.Route, peer *peerpkg.Peer) error { + flow := &sendVersionFlow{ HandleHandshakeContext: context, incomingRoute: incomingRoute, outgoingRoute: outgoingRoute, + peer: peer, } return flow.start() } @@ -45,10 +50,7 @@ func (flow *sendVersionFlow) start() error { subnetworkID := flow.Config().SubnetworkID // Version message. - localAddress, err := flow.NetAdapter().GetBestLocalAddress() - if err != nil { - return err - } + localAddress := flow.AddressManager().GetBestLocalAddress(flow.peer.Connection().NetAddress()) msg := appmessage.NewMsgVersion(localAddress, flow.NetAdapter().ID(), flow.Config().ActiveNetParams.Name, selectedTipHash, subnetworkID) msg.AddUserAgent(userAgentName, userAgentVersion, flow.Config().UserAgentComments...) @@ -62,7 +64,7 @@ func (flow *sendVersionFlow) start() error { // Advertise if inv messages for transactions are desired. msg.DisableRelayTx = flow.Config().BlocksOnly - err = flow.outgoingRoute.Enqueue(msg) + err := flow.outgoingRoute.Enqueue(msg) if err != nil { return err } diff --git a/infrastructure/network/addressmanager/addressmanager.go b/infrastructure/network/addressmanager/addressmanager.go index bbcd60338..269b93f77 100644 --- a/infrastructure/network/addressmanager/addressmanager.go +++ b/infrastructure/network/addressmanager/addressmanager.go @@ -17,7 +17,9 @@ import ( "io" "math/rand" "net" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -192,7 +194,7 @@ const ( var ErrAddressNotFound = errors.New("address not found") // New returns a new Kaspa address manager. -func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext) *AddressManager { +func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext) (*AddressManager, error) { addressManager := AddressManager{ cfg: cfg, databaseContext: databaseContext, @@ -202,8 +204,12 @@ func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext) *Address localAddresses: make(map[AddressKey]*localAddress), localSubnetworkID: cfg.SubnetworkID, } + err := addressManager.initListeners() + if err != nil { + return nil, err + } addressManager.reset() - return &addressManager + return &addressManager, nil } // updateAddress is a helper function to either update an address already known @@ -1419,3 +1425,188 @@ func (am *AddressManager) IsBanned(address *appmessage.NetAddress) (bool, error) } return knownAddress.isBanned, nil } + +// initListeners initializes the configured net listeners and adds any bound +// addresses to the address manager +func (am *AddressManager) initListeners() error { + if len(am.cfg.ExternalIPs) != 0 { + defaultPort, err := strconv.ParseUint(am.cfg.NetParams().DefaultPort, 10, 16) + if err != nil { + log.Errorf("Can not parse default port %s for active DAG: %s", + am.cfg.NetParams().DefaultPort, err) + return err + } + + for _, sip := range am.cfg.ExternalIPs { + eport := uint16(defaultPort) + host, portstr, err := net.SplitHostPort(sip) + if err != nil { + // no port, use default. + host = sip + } else { + port, err := strconv.ParseUint(portstr, 10, 16) + if err != nil { + log.Warnf("Can not parse port from %s for "+ + "externalip: %s", sip, err) + continue + } + eport = uint16(port) + } + na, err := am.HostToNetAddress(host, eport, appmessage.DefaultServices) + if err != nil { + log.Warnf("Not adding %s as externalip: %s", sip, err) + continue + } + + err = am.AddLocalAddress(na, ManualPrio) + if err != nil { + log.Warnf("Skipping specified external IP: %s", err) + } + } + } else { + // Listen for TCP connections at the configured addresses + netAddrs, err := parseListeners(am.cfg.Listeners) + if err != nil { + return err + } + + // Add bound addresses to address manager to be advertised to peers. + for _, addr := range netAddrs { + listener, err := net.Listen(addr.Network(), addr.String()) + if err != nil { + log.Warnf("Can't listen on %s: %s", addr, err) + continue + } + addr := listener.Addr().String() + err = listener.Close() + if err != nil { + return err + } + err = am.addLocalAddress(addr) + if err != nil { + log.Warnf("Skipping bound address %s: %s", addr, err) + } + } + } + + return nil +} + +// parseListeners determines whether each listen address is IPv4 and IPv6 and +// returns a slice of appropriate net.Addrs to listen on with TCP. It also +// properly detects addresses which apply to "all interfaces" and adds the +// address as both IPv4 and IPv6. +func parseListeners(addrs []string) ([]net.Addr, error) { + netAddrs := make([]net.Addr, 0, len(addrs)*2) + for _, addr := range addrs { + host, _, err := net.SplitHostPort(addr) + if err != nil { + // Shouldn't happen due to already being normalized. + return nil, err + } + + // Empty host or host of * on plan9 is both IPv4 and IPv6. + if host == "" || (host == "*" && runtime.GOOS == "plan9") { + netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}) + netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr}) + continue + } + + // Strip IPv6 zone id if present since net.ParseIP does not + // handle it. + zoneIndex := strings.LastIndex(host, "%") + if zoneIndex > 0 { + host = host[:zoneIndex] + } + + // Parse the IP. + ip := net.ParseIP(host) + if ip == nil { + hostAddrs, err := net.LookupHost(host) + if err != nil { + return nil, err + } + ip = net.ParseIP(hostAddrs[0]) + if ip == nil { + return nil, errors.Errorf("Cannot resolve IP address for host '%s'", host) + } + } + + // To4 returns nil when the IP is not an IPv4 address, so use + // this determine the address type. + if ip.To4() == nil { + netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr}) + } else { + netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}) + } + } + return netAddrs, nil +} + +// addLocalAddress adds an address that this node is listening on to the +// address manager so that it may be relayed to peers. +func (am *AddressManager) addLocalAddress(addr string) error { + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + return err + } + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + return err + } + + if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() { + // If bound to unspecified address, advertise all local interfaces + addrs, err := net.InterfaceAddrs() + if err != nil { + return err + } + + for _, addr := range addrs { + ifaceIP, _, err := net.ParseCIDR(addr.String()) + if err != nil { + continue + } + + // If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to + // ::, do not add IPv4 interfaces. + if (ip.To4() == nil) != (ifaceIP.To4() == nil) { + continue + } + + netAddr := appmessage.NewNetAddressIPPort(ifaceIP, uint16(port), appmessage.DefaultServices) + am.AddLocalAddress(netAddr, BoundPrio) + } + } else { + netAddr, err := am.HostToNetAddress(host, uint16(port), appmessage.DefaultServices) + if err != nil { + return err + } + + am.AddLocalAddress(netAddr, BoundPrio) + } + + return nil +} + +// simpleAddr implements the net.Addr interface with two struct fields +type simpleAddr struct { + net, addr string +} + +// String returns the address. +// +// This is part of the net.Addr interface. +func (a simpleAddr) String() string { + return a.addr +} + +// Network returns the network. +// +// This is part of the net.Addr interface. +func (a simpleAddr) Network() string { + return a.net +} + +// Ensure simpleAddr implements the net.Addr interface. +var _ net.Addr = simpleAddr{} diff --git a/infrastructure/network/addressmanager/addressmanager_test.go b/infrastructure/network/addressmanager/addressmanager_test.go index 97a3b9e82..99f3ea2bf 100644 --- a/infrastructure/network/addressmanager/addressmanager_test.go +++ b/infrastructure/network/addressmanager/addressmanager_test.go @@ -123,7 +123,10 @@ func newAddrManagerForTest(t *testing.T, testName string, t.Fatalf("error creating db: %s", err) } - addressManager = New(cfg, databaseContext) + addressManager, err = New(cfg, databaseContext) + if err != nil { + t.Fatalf("error creating address manager: %s", err) + } return addressManager, func() { err := databaseContext.Close() diff --git a/infrastructure/network/netadapter/netadapter.go b/infrastructure/network/netadapter/netadapter.go index 11f5bd6a4..113c4d643 100644 --- a/infrastructure/network/netadapter/netadapter.go +++ b/infrastructure/network/netadapter/netadapter.go @@ -1,8 +1,6 @@ package netadapter import ( - "net" - "strconv" "sync" "sync/atomic" @@ -159,57 +157,3 @@ func (na *NetAdapter) Broadcast(netConnections []*NetConnection, message appmess } return nil } - -// GetBestLocalAddress returns the most appropriate local address to use -// for the given remote address. -func (na *NetAdapter) GetBestLocalAddress() (*appmessage.NetAddress, error) { - if len(na.cfg.ExternalIPs) > 0 { - host, portString, err := net.SplitHostPort(na.cfg.ExternalIPs[0]) - if err != nil { - portString = na.cfg.NetParams().DefaultPort - } - portInt, err := strconv.Atoi(portString) - if err != nil { - return nil, err - } - - ip := net.ParseIP(host) - if ip == nil { - hostAddrs, err := net.LookupHost(host) - if err != nil { - return nil, err - } - ip = net.ParseIP(hostAddrs[0]) - if ip == nil { - return nil, errors.Errorf("Cannot resolve IP address for host '%s'", host) - } - } - return appmessage.NewNetAddressIPPort(ip, uint16(portInt), appmessage.SFNodeNetwork), nil - - } - listenAddress := na.cfg.Listeners[0] - _, portString, err := net.SplitHostPort(listenAddress) - if err != nil { - portString = na.cfg.NetParams().DefaultPort - } - - portInt, err := strconv.Atoi(portString) - if err != nil { - return nil, err - } - - addresses, err := net.InterfaceAddrs() - if err != nil { - return nil, err - } - - for _, address := range addresses { - ip, _, err := net.ParseCIDR(address.String()) - if err != nil { - continue - } - - return appmessage.NewNetAddressIPPort(ip, uint16(portInt), appmessage.SFNodeNetwork), nil - } - return nil, errors.New("no address was found") -}