[NOD-1120] Connection Manager (#796)

* [NOD-1120] Removed closure in NetAdapter.onConnectedHanlder

* [NOD-1120] Implement all connection manager methods

* [NOD-1120] Integrated connmanager into kaspad + added call for dnsseeder

* [NOD-1120] Allow buffer to not be bytes.Buffer

* [NOD-1120] Added timeout to connect

* [NOD-1120] Don't enter connections to  add loop if none needed

* [NOD-1120] Add call for addressManager.Good

* [NOD-1120] Minor bug fixes

* [NOD-1120] Remove errChan from grpcConnection

* [NOD-1120] Add comments to exported methods

* [NOD-1120] cancel the context for DialContext in gRPCServer.Connect

* [NOD-1120] Don't try to remove from connSet a connection that doesn't exist

* [NOD-1120] add ok bool to connectionSet.get

* [NOD-1120] Remove overuse of if-else in checkConnectionRequests

* [NOD-1120] Made some order in ConnectionManager

* [NOD-1120] Moved checkIncomingConnections to it's own file

* [NOD-1120] cleanup in checkOutgoingConnections

* [NOD-1120] Cleanup in SeedDNS, and move call outside of connection manager

* [NOD-1120] Add check that both --connect and --addpeer aren't used

* [NOD-1120] Move dial timeout to constant

* [NOD-1120] Enhance comment

* [NOD-1120] Log connection failure out of initiateConnection

* [NOD-1148] Reshuffle checkRequestedConnections to make more sense

* [NOD-1120] Move continue to correct place + reshuffle logging code

* [NOD-1120] Don't expose server.Connection outside netAdapter - expose a wrapper instead

* [NOD-1120] Add comments

* [NOD-1120] Don't return the connection from netAdapter.Connect()

* [NOD-1120] Use .Address as key for connectionSet

* [NOD-1120] Fix minRetryDuration usage

* [NOD-1120] Remove the correct number of incoming connections

* [NOD-1120] Add comment

* [NOD-1120] Rename connSet -> incomingConnectionSet

* [NOD-1120] fix grammar
This commit is contained in:
Svarog 2020-07-16 17:15:58 +03:00 committed by GitHub
parent e0aac68759
commit b42b8b16fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 646 additions and 216 deletions

View File

@ -476,9 +476,10 @@ func loadConfig() (*Config, []string, error) {
activeConfig.DisableListen = true
}
// Connect means no DNS seeding.
// ConnectPeers means no DNS seeding and no outbound peers
if len(activeConfig.ConnectPeers) > 0 {
activeConfig.DisableDNSSeed = true
activeConfig.TargetOutboundPeers = 0
}
// Add the default listener if none were specified. The default
@ -652,6 +653,15 @@ func loadConfig() (*Config, []string, error) {
}
}
// Disallow --addpeer and --connect used together
if len(activeConfig.AddPeers) > 0 && len(activeConfig.ConnectPeers) > 0 {
str := "%s: --addpeer and --connect can not be used together"
err := errors.Errorf(str, funcName)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
// Add default port to all added peer addresses if needed and remove
// duplicate addresses.
activeConfig.AddPeers, err = network.NormalizeAddresses(activeConfig.AddPeers,

View File

@ -0,0 +1,104 @@
package connmanager
import (
"time"
)
const (
minRetryDuration = 30 * time.Second
maxRetryDuration = 10 * time.Minute
)
func nextRetryDuration(previousDuration time.Duration) time.Duration {
if previousDuration < minRetryDuration {
return minRetryDuration
}
if previousDuration*2 > maxRetryDuration {
return maxRetryDuration
}
return previousDuration * 2
}
// checkRequestedConnections checks that all activeRequested are still active, and initiates connections
// for pendingRequested.
// While doing so, it filters out of connSet all connections that were initiated as a connectionRequest
func (c *ConnectionManager) checkRequestedConnections(connSet connectionSet) {
c.connectionRequestsLock.Lock()
defer c.connectionRequestsLock.Unlock()
now := time.Now()
for address, connReq := range c.activeRequested {
connection, ok := connSet.get(address)
if !ok { // a requested connection was disconnected
delete(c.activeRequested, address)
if connReq.isPermanent { // if is one-try - ignore. If permanent - add to pending list to retry
connReq.nextAttempt = now
connReq.retryDuration = 0
c.pendingRequested[address] = connReq
}
continue
}
connSet.remove(connection)
}
for address, connReq := range c.pendingRequested {
if connReq.nextAttempt.After(now) { // ignore connection requests which are still waiting for retry
continue
}
connection, ok := connSet.get(address)
// The pending connection request has already connected - move it to active
// This can happen in rare cases such as when the other side has connected to our node
// while it has been pending on our side.
if ok {
delete(c.pendingRequested, address)
c.pendingRequested[address] = connReq
connSet.remove(connection)
continue
}
// try to initiate connection
err := c.initiateConnection(connReq.address)
if err != nil {
log.Infof("Couldn't connect to %s: %s", address, err)
// if connection request is one try - remove from pending and ignore failure
if !connReq.isPermanent {
delete(c.pendingRequested, address)
continue
}
// if connection request is permanent - keep in pending, and increase retry time
connReq.retryDuration = nextRetryDuration(connReq.retryDuration)
connReq.nextAttempt = now.Add(connReq.retryDuration)
log.Debugf("Retrying permanent connection to %s in %s", address, connReq.retryDuration)
continue
}
// if connected successfully - move from pending to active
delete(c.pendingRequested, address)
c.activeRequested[address] = connReq
}
}
// AddConnectionRequest adds the given address to list of pending connection requests
func (c *ConnectionManager) AddConnectionRequest(address string, isPermanent bool) {
// spawn goroutine so that caller doesn't wait in case connectionManager is in the midst of handling
// connection requests
spawn(func() {
c.connectionRequestsLock.Lock()
defer c.connectionRequestsLock.Unlock()
if _, ok := c.activeRequested[address]; ok {
return
}
c.pendingRequested[address] = &connectionRequest{
address: address,
isPermanent: isPermanent,
}
})
}

View File

@ -0,0 +1,30 @@
package connmanager
import (
"github.com/kaspanet/kaspad/netadapter"
)
type connectionSet map[string]*netadapter.NetConnection
func (cs connectionSet) add(connection *netadapter.NetConnection) {
cs[connection.Address()] = connection
}
func (cs connectionSet) remove(connection *netadapter.NetConnection) {
delete(cs, connection.Address())
}
func (cs connectionSet) get(address string) (*netadapter.NetConnection, bool) {
connection, ok := cs[address]
return connection, ok
}
func convertToSet(connections []*netadapter.NetConnection) connectionSet {
connSet := make(connectionSet, len(connections))
for _, connection := range connections {
connSet[connection.Address()] = connection
}
return connSet
}

108
connmanager/connmanager.go Normal file
View File

@ -0,0 +1,108 @@
package connmanager
import (
"sync"
"sync/atomic"
"time"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/config"
)
// connectionRequest represents a user request (either through CLI or RPC) to connect to a certain node
type connectionRequest struct {
address string
isPermanent bool
nextAttempt time.Time
retryDuration time.Duration
}
// ConnectionManager monitors that the current active connections satisfy the requirements of
// outgoing, requested and incoming connections
type ConnectionManager struct {
netAdapter *netadapter.NetAdapter
addressManager *addrmgr.AddrManager
activeRequested map[string]*connectionRequest
pendingRequested map[string]*connectionRequest
activeOutgoing map[string]struct{}
targetOutgoing int
activeIncoming map[string]struct{}
maxIncoming int
stop uint32
connectionRequestsLock sync.Mutex
}
// New instantiates a new instance of a ConnectionManager
func New(netAdapter *netadapter.NetAdapter, addressManager *addrmgr.AddrManager) (*ConnectionManager, error) {
c := &ConnectionManager{
netAdapter: netAdapter,
addressManager: addressManager,
activeRequested: map[string]*connectionRequest{},
pendingRequested: map[string]*connectionRequest{},
activeOutgoing: map[string]struct{}{},
activeIncoming: map[string]struct{}{},
}
cfg := config.ActiveConfig()
connectPeers := cfg.AddPeers
if len(cfg.ConnectPeers) > 0 {
connectPeers = cfg.ConnectPeers
}
c.maxIncoming = cfg.MaxInboundPeers
c.targetOutgoing = cfg.TargetOutboundPeers
for _, connectPeer := range connectPeers {
c.pendingRequested[connectPeer] = &connectionRequest{
address: connectPeer,
isPermanent: true,
}
}
return c, nil
}
// Start begins the operation of the ConnectionManager
func (c *ConnectionManager) Start() {
spawn(c.connectionsLoop)
}
// Stop halts the operation of the ConnectionManager
func (c *ConnectionManager) Stop() {
atomic.StoreUint32(&c.stop, 1)
for _, connection := range c.netAdapter.Connections() {
_ = c.netAdapter.Disconnect(connection) // Ignore errors since connection might be in the midst of disconnecting
}
}
func (c *ConnectionManager) initiateConnection(address string) error {
log.Infof("Connecting to %s", address)
return c.netAdapter.Connect(address)
}
const connectionsLoopInterval = 30 * time.Second
func (c *ConnectionManager) connectionsLoop() {
for atomic.LoadUint32(&c.stop) == 0 {
connections := c.netAdapter.Connections()
// We convert the connections list to a set, so that connections can be found quickly
// Then we go over the set, classifying connection by category: requested, outgoing or incoming.
// Every step removes all matching connections so that once we get to checkIncomingConnections -
// the only connections left are the incoming ones
connSet := convertToSet(connections)
c.checkRequestedConnections(connSet)
c.checkOutgoingConnections(connSet)
c.checkIncomingConnections(connSet)
<-time.Tick(connectionsLoopInterval)
}
}

View File

@ -0,0 +1,23 @@
package connmanager
// checkIncomingConnections makes sure there's no more than maxIncoming incoming connections
// if there are - it randomly disconnects enough to go below that number
func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet connectionSet) {
if len(incomingConnectionSet) <= c.maxIncoming {
return
}
numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming
// randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming
for address, connection := range incomingConnectionSet {
err := c.netAdapter.Disconnect(connection)
if err != nil {
log.Errorf("Error disconnecting from %s: %+v", address, err)
}
numConnectionsOverMax--
if numConnectionsOverMax == 0 {
break
}
}
}

9
connmanager/log.go Normal file
View File

@ -0,0 +1,9 @@
package connmanager
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.CMGR)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,44 @@
package connmanager
// checkOutgoingConnections goes over all activeOutgoing and makes sure they are still active.
// Then it opens connections so that we have targetOutgoing active connections
func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) {
for address := range c.activeOutgoing {
connection, ok := connSet.get(address)
if ok { // connection is still connected
connSet.remove(connection)
continue
}
// if connection is dead - remove from list of active ones
delete(c.activeOutgoing, address)
}
liveConnections := len(c.activeOutgoing)
if c.targetOutgoing == liveConnections {
return
}
log.Debugf("Have got %d outgoing connections out of target %d, adding %d more",
liveConnections, c.targetOutgoing, c.targetOutgoing-liveConnections)
for len(c.activeOutgoing) < c.targetOutgoing {
address := c.addressManager.GetAddress()
if address == nil {
log.Warnf("No more addresses available")
return
}
netAddress := address.NetAddress()
c.addressManager.Attempt(netAddress)
addressString := netAddress.TCPAddress().String()
err := c.initiateConnection(addressString)
if err != nil {
log.Infof("Couldn't connect to %s: %s", addressString, err)
continue
}
c.addressManager.Connected(address.NetAddress())
c.activeOutgoing[address.NetAddress().TCPAddress().String()] = struct{}{}
}
}

14
dnsseed/log.go Normal file
View File

@ -0,0 +1,14 @@
// Copyright (c) 2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package dnsseed
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.CMGR)
var spawn = panics.GoroutineWrapperFunc(log)
var spawnAfter = panics.AfterFuncWrapperFunc(log)

101
dnsseed/seed.go Normal file
View File

@ -0,0 +1,101 @@
// Copyright (c) 2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package dnsseed
import (
"fmt"
"math/rand"
"net"
"strconv"
"time"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/dagconfig"
"github.com/kaspanet/kaspad/wire"
)
const (
// These constants are used by the DNS seed code to pick a random last
// seen time.
secondsIn3Days int32 = 24 * 60 * 60 * 3
secondsIn4Days int32 = 24 * 60 * 60 * 4
// SubnetworkIDPrefixChar is the prefix of subnetworkID, when building a DNS seed request
SubnetworkIDPrefixChar byte = 'n'
// ServiceFlagPrefixChar is the prefix of service flag, when building a DNS seed request
ServiceFlagPrefixChar byte = 'x'
)
// OnSeed is the signature of the callback function which is invoked when DNS
// seeding is successful.
type OnSeed func(addrs []*wire.NetAddress)
// LookupFunc is the signature of the DNS lookup function.
type LookupFunc func(string) ([]net.IP, error)
// SeedFromDNS uses DNS seeding to populate the address manager with peers.
func SeedFromDNS(dagParams *dagconfig.Params, reqServices wire.ServiceFlag, includeAllSubnetworks bool,
subnetworkID *subnetworkid.SubnetworkID, lookupFn LookupFunc, seedFn OnSeed) {
var dnsSeeds []string
cfg := config.ActiveConfig()
if cfg != nil && cfg.DNSSeed != "" {
dnsSeeds = []string{cfg.DNSSeed}
} else {
dnsSeeds = dagParams.DNSSeeds
}
for _, dnsseed := range dnsSeeds {
var host string
if reqServices == wire.SFNodeNetwork {
host = dnsseed
} else {
host = fmt.Sprintf("%c%x.%s", ServiceFlagPrefixChar, uint64(reqServices), dnsseed)
}
if !includeAllSubnetworks {
if subnetworkID != nil {
host = fmt.Sprintf("%c%s.%s", SubnetworkIDPrefixChar, subnetworkID, host)
} else {
host = fmt.Sprintf("%c.%s", SubnetworkIDPrefixChar, host)
}
}
spawn(func() {
randSource := rand.New(rand.NewSource(time.Now().UnixNano()))
seedPeers, err := lookupFn(host)
if err != nil {
log.Infof("DNS discovery failed on seed %s: %s", host, err)
return
}
numPeers := len(seedPeers)
log.Infof("%d addresses found from DNS seed %s", numPeers, host)
if numPeers == 0 {
return
}
addresses := make([]*wire.NetAddress, len(seedPeers))
// if this errors then we have *real* problems
intPort, _ := strconv.Atoi(dagParams.DefaultPort)
for i, peer := range seedPeers {
addresses[i] = wire.NewNetAddressTimestamp(
// seed with addresses from a time randomly selected
// between 3 and 7 days ago.
mstime.Now().Add(-1*time.Second*time.Duration(secondsIn3Days+
randSource.Int31n(secondsIn4Days))),
0, peer, uint16(intPort))
}
seedFn(addresses)
})
}
}

View File

@ -2,9 +2,17 @@ package main
import (
"fmt"
"sync/atomic"
"github.com/kaspanet/kaspad/dnsseed"
"github.com/kaspanet/kaspad/wire"
"github.com/kaspanet/kaspad/connmanager"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/server/serverutils"
"sync/atomic"
"github.com/kaspanet/kaspad/netadapter"
"github.com/kaspanet/kaspad/util/panics"
@ -22,8 +30,10 @@ import (
// kaspad is a wrapper for all the kaspad services
type kaspad struct {
rpcServer *rpc.Server
protocolManager *protocol.Manager
rpcServer *rpc.Server
addressManager *addrmgr.AddrManager
networkAdapter *netadapter.NetAdapter
connectionManager *connmanager.ConnectionManager
started, shutdown int32
}
@ -39,16 +49,32 @@ func (s *kaspad) start() {
cfg := config.ActiveConfig()
err := s.protocolManager.Start()
err := s.networkAdapter.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the p2p protocol: %+v", err))
}
maybeSeedFromDNS(cfg, s.addressManager)
s.connectionManager.Start()
if !cfg.DisableRPC {
s.rpcServer.Start()
}
}
func maybeSeedFromDNS(cfg *config.Config, addressManager *addrmgr.AddrManager) {
if !cfg.DisableDNSSeed {
dnsseed.SeedFromDNS(cfg.NetParams(), wire.SFNodeNetwork, false, nil,
config.ActiveConfig().Lookup, func(addresses []*wire.NetAddress) {
// Kaspad uses a lookup of the dns seeder here. Since seeder returns
// IPs of nodes and not its own IP, we can not know real IP of
// source. So we'll take first returned address as source.
addressManager.AddAddresses(addresses, addresses[0], nil)
})
}
}
// stop gracefully shuts down all the kaspad services.
func (s *kaspad) stop() error {
// Make sure this only happens once.
@ -59,7 +85,9 @@ func (s *kaspad) stop() error {
log.Warnf("Kaspad shutting down")
err := s.protocolManager.Stop()
s.connectionManager.Stop()
err := s.networkAdapter.Stop()
if err != nil {
log.Errorf("Error stopping the p2p protocol: %+v", err)
}
@ -78,10 +106,12 @@ func (s *kaspad) stop() error {
// newKaspad returns a new kaspad instance configured to listen on addr for the
// kaspa network type specified by dagParams. Use start to begin accepting
// connections from peers.
func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error) {
func newKaspad(interrupt <-chan struct{}) (*kaspad, error) {
cfg := config.ActiveConfig()
indexManager, acceptanceIndex := setupIndexes()
sigCache := txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(interrupt, sigCache, indexManager)
@ -91,8 +121,15 @@ func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error)
txMempool := setupMempool(dag, sigCache)
netAdapter, err := netadapter.NewNetAdapter(cfg.Listeners)
if err != nil {
return nil, err
}
addressManager := addrmgr.New(serverutils.KaspadLookup, config.ActiveConfig().SubnetworkID)
protocolManager, err := protocol.NewManager(listenAddrs, dag, addressManager)
protocol.Init(netAdapter, addressManager, dag)
connectionManager, err := connmanager.New(netAdapter, addressManager)
if err != nil {
return nil, err
}
@ -103,8 +140,9 @@ func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error)
}
return &kaspad{
rpcServer: rpcServer,
protocolManager: protocolManager,
rpcServer: rpcServer,
networkAdapter: netAdapter,
connectionManager: connectionManager,
}, nil
}

View File

@ -54,6 +54,7 @@ var (
blkrLog = BackendLog.Logger("BLKR")
gbrlLog = BackendLog.Logger("GBRL")
blprLog = BackendLog.Logger("BLPR")
dnssLog = BackendLog.Logger("DNSS")
snvrLog = BackendLog.Logger("SNVR")
)
@ -85,6 +86,7 @@ var SubsystemTags = struct {
NTAR,
GBRL,
BLPR,
DNSS,
SNVR string
}{
ADXR: "ADXR",
@ -113,6 +115,7 @@ var SubsystemTags = struct {
GBRL: "GBRL",
NTAR: "NTAR",
BLPR: "BLPR",
DNSS: "DNSS",
SNVR: "SNVR",
}
@ -144,6 +147,7 @@ var subsystemLoggers = map[string]*logs.Logger{
SubsystemTags.GBRL: gbrlLog,
SubsystemTags.NTAR: ntarLog,
SubsystemTags.BLPR: blprLog,
SubsystemTags.DNSS: dnssLog,
SubsystemTags.SNVR: snvrLog,
}

43
main.go
View File

@ -11,7 +11,6 @@ import (
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"time"
"github.com/kaspanet/kaspad/dbaccess"
@ -32,10 +31,6 @@ const (
blockDbNamePrefix = "blocks"
)
var (
cfg *config.Config
)
// winServiceMain is only invoked on Windows. It detects when kaspad is running
// as a service and reacts accordingly.
var winServiceMain func() (bool, error)
@ -52,7 +47,7 @@ func kaspadMain(startedChan chan<- struct{}) error {
if err != nil {
return err
}
cfg = config.ActiveConfig()
cfg := config.ActiveConfig()
defer panics.HandlePanic(log, nil)
// Get a channel that will be closed when a shutdown signal has been
@ -131,10 +126,9 @@ func kaspadMain(startedChan chan<- struct{}) error {
}
// Create kaspad and start it.
kaspad, err := newKaspad(cfg.Listeners, interrupt)
kaspad, err := newKaspad(interrupt)
if err != nil {
log.Errorf("Unable to start kaspad on %s: %+v",
strings.Join(cfg.Listeners, ", "), err)
log.Errorf("Unable to start kaspad: %+v", err)
return err
}
defer func() {
@ -169,37 +163,12 @@ func kaspadMain(startedChan chan<- struct{}) error {
}
func removeDatabase() error {
dbPath := blockDbPath(cfg.DbType)
dbPath := blockDbPath(config.ActiveConfig().DbType)
return os.RemoveAll(dbPath)
}
// removeRegressionDB removes the existing regression test database if running
// in regression test mode and it already exists.
func removeRegressionDB(dbPath string) error {
// Don't do anything if not in regression test mode.
if !cfg.RegressionTest {
return nil
}
// Remove the old regression test database if it already exists.
fi, err := os.Stat(dbPath)
if err == nil {
log.Infof("Removing regression test database from '%s'", dbPath)
if fi.IsDir() {
err := os.RemoveAll(dbPath)
if err != nil {
return err
}
} else {
err := os.Remove(dbPath)
if err != nil {
return err
}
}
}
return nil
}
// dbPath returns the path to the block database given a database type.
func blockDbPath(dbType string) string {
@ -208,12 +177,12 @@ func blockDbPath(dbType string) string {
if dbType == "sqlite" {
dbName = dbName + ".db"
}
dbPath := filepath.Join(cfg.DataDir, dbName)
dbPath := filepath.Join(config.ActiveConfig().DataDir, dbName)
return dbPath
}
func openDB() error {
dbPath := filepath.Join(cfg.DataDir, "db")
dbPath := filepath.Join(config.ActiveConfig().DataDir, "db")
log.Infof("Loading database from '%s'", dbPath)
err := dbaccess.Open(dbPath)
if err != nil {

View File

@ -1,14 +1,14 @@
package netadapter
import (
"github.com/kaspanet/kaspad/netadapter/id"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"net"
"strconv"
"sync"
"sync/atomic"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter/id"
routerpkg "github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver"
"github.com/kaspanet/kaspad/wire"
@ -30,8 +30,8 @@ type NetAdapter struct {
routerInitializer RouterInitializer
stop uint32
routersToConnections map[*routerpkg.Router]server.Connection
connectionsToIDs map[server.Connection]*id.ID
routersToConnections map[*routerpkg.Router]*NetConnection
connectionsToIDs map[*NetConnection]*id.ID
idsToRouters map[*id.ID]*routerpkg.Router
sync.RWMutex
}
@ -51,13 +51,12 @@ func NewNetAdapter(listeningAddrs []string) (*NetAdapter, error) {
id: netAdapterID,
server: s,
routersToConnections: make(map[*routerpkg.Router]server.Connection),
connectionsToIDs: make(map[server.Connection]*id.ID),
routersToConnections: make(map[*routerpkg.Router]*NetConnection),
connectionsToIDs: make(map[*NetConnection]*id.ID),
idsToRouters: make(map[*id.ID]*routerpkg.Router),
}
onConnectedHandler := adapter.newOnConnectedHandler()
adapter.server.SetOnConnectedHandler(onConnectedHandler)
adapter.server.SetOnConnectedHandler(adapter.onConnectedHandler)
return &adapter, nil
}
@ -69,15 +68,6 @@ func (na *NetAdapter) Start() error {
return err
}
// TODO(libp2p): Replace with real connection manager
cfg := config.ActiveConfig()
for _, connectPeer := range cfg.ConnectPeers {
_, err := na.server.Connect(connectPeer)
if err != nil {
log.Errorf("Error connecting to %s: %+v", connectPeer, err)
}
}
return nil
}
@ -89,55 +79,76 @@ func (na *NetAdapter) Stop() error {
return na.server.Stop()
}
func (na *NetAdapter) newOnConnectedHandler() server.OnConnectedHandler {
return func(connection server.Connection) error {
router, err := na.routerInitializer()
if err != nil {
return err
}
connection.Start(router)
na.routersToConnections[router] = connection
// Connect tells the NetAdapter's underlying server to initiate a connection
// to the given address
func (na *NetAdapter) Connect(address string) error {
_, err := na.server.Connect(address)
return err
}
router.SetOnRouteCapacityReachedHandler(func() {
err := connection.Disconnect()
if err != nil {
if !errors.Is(err, server.ErrNetwork) {
panic(err)
}
log.Warnf("Failed to disconnect from %s", connection)
}
})
connection.SetOnDisconnectedHandler(func() error {
na.cleanupConnection(connection, router)
na.server.RemoveConnection(connection)
return router.Close()
})
na.server.AddConnection(connection)
return nil
// Connections returns a list of connections currently connected and active
func (na *NetAdapter) Connections() []*NetConnection {
netConnections := make([]*NetConnection, 0, len(na.connectionsToIDs))
for netConnection := range na.connectionsToIDs {
netConnections = append(netConnections, netConnection)
}
return netConnections
}
func (na *NetAdapter) onConnectedHandler(connection server.Connection) error {
router, err := na.routerInitializer()
if err != nil {
return err
}
connection.Start(router)
netConnection := newNetConnection(connection, nil)
na.routersToConnections[router] = netConnection
na.connectionsToIDs[netConnection] = nil
router.SetOnRouteCapacityReachedHandler(func() {
err := connection.Disconnect()
if err != nil {
if !errors.Is(err, server.ErrNetwork) {
panic(err)
}
log.Warnf("Failed to disconnect from %s", connection)
}
})
connection.SetOnDisconnectedHandler(func() error {
na.cleanupConnection(netConnection, router)
return router.Close()
})
return nil
}
// AssociateRouterID associates the connection for the given router
// with the given ID
func (na *NetAdapter) AssociateRouterID(router *routerpkg.Router, id *id.ID) error {
connection, ok := na.routersToConnections[router]
netConnection, ok := na.routersToConnections[router]
if !ok {
return errors.Errorf("router not registered for id %s", id)
}
na.connectionsToIDs[connection] = id
netConnection.id = id
na.connectionsToIDs[netConnection] = id
na.idsToRouters[id] = router
return nil
}
func (na *NetAdapter) cleanupConnection(connection server.Connection, router *routerpkg.Router) {
connectionID, ok := na.connectionsToIDs[connection]
func (na *NetAdapter) cleanupConnection(netConnection *NetConnection, router *routerpkg.Router) {
connectionID, ok := na.connectionsToIDs[netConnection]
if !ok {
return
}
delete(na.routersToConnections, router)
delete(na.connectionsToIDs, connection)
delete(na.connectionsToIDs, netConnection)
delete(na.idsToRouters, connectionID)
}
@ -228,13 +239,18 @@ func (na *NetAdapter) GetBestLocalAddress() (*wire.NetAddress, error) {
// DisconnectAssociatedConnection disconnects from the connection associated with the given router.
func (na *NetAdapter) DisconnectAssociatedConnection(router *routerpkg.Router) error {
connection := na.routersToConnections[router]
err := connection.Disconnect()
netConnection := na.routersToConnections[router]
return na.Disconnect(netConnection)
}
// Disconnect disconnects the given connection
func (na *NetAdapter) Disconnect(netConnection *NetConnection) error {
err := netConnection.connection.Disconnect()
if err != nil {
if !errors.Is(err, server.ErrNetwork) {
return err
}
log.Warnf("Error disconnecting from %s: %s", connection, err)
log.Warnf("Error disconnecting from %s: %s", netConnection, err)
}
return nil
}

View File

@ -0,0 +1,35 @@
package netadapter
import (
"fmt"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/netadapter/server"
)
// NetConnection is a wrapper to a server connection for use by services external to NetAdapter
type NetConnection struct {
connection server.Connection
id *id.ID
}
func newNetConnection(connection server.Connection, id *id.ID) *NetConnection {
return &NetConnection{
connection: connection,
id: id,
}
}
func (c *NetConnection) String() string {
return fmt.Sprintf("<%s: %s>", c.id, c.connection)
}
// ID returns the ID associated with this connection
func (c *NetConnection) ID() *id.ID {
return c.id
}
// Address returns the address associated with this connection
func (c *NetConnection) Address() string {
return c.connection.Address().String()
}

View File

@ -37,18 +37,8 @@ func (c *gRPCConnection) sendLoop() error {
if err != nil {
return err
}
err = func() error {
c.writeToErrChanDuringDisconnectLock.Lock()
defer c.writeToErrChanDuringDisconnectLock.Unlock()
err := c.stream.Send(messageProto)
if c.IsConnected() {
c.errChan <- err
if err != nil {
return err
}
}
return nil
}()
err = c.stream.Send(messageProto)
if err != nil {
return err
}

View File

@ -1,11 +1,11 @@
package grpcserver
import (
"net"
"sync/atomic"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
"net"
"sync"
"sync/atomic"
"github.com/kaspanet/kaspad/netadapter/server"
"google.golang.org/grpc"
@ -18,11 +18,9 @@ type gRPCConnection struct {
stream grpcStream
router *router.Router
writeToErrChanDuringDisconnectLock sync.Mutex
errChan chan error
stopChan chan struct{}
clientConn grpc.ClientConn
onDisconnectedHandler server.OnDisconnectedHandler
stopChan chan struct{}
clientConn grpc.ClientConn
onDisconnectedHandler server.OnDisconnectedHandler
isConnected uint32
}
@ -33,7 +31,6 @@ func newConnection(server *gRPCServer, address net.Addr, isOutbound bool, stream
address: address,
isOutbound: isOutbound,
stream: stream,
errChan: make(chan error),
stopChan: make(chan struct{}),
isConnected: 1,
}
@ -74,9 +71,6 @@ func (c *gRPCConnection) Disconnect() error {
}
atomic.StoreUint32(&c.isConnected, 0)
c.writeToErrChanDuringDisconnectLock.Lock()
defer c.writeToErrChanDuringDisconnectLock.Unlock()
close(c.errChan)
close(c.stopChan)
if c.isOutbound {
@ -84,6 +78,8 @@ func (c *gRPCConnection) Disconnect() error {
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
}
log.Debugf("Disconnected from %s", c)
return c.onDisconnectedHandler()
}

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"time"
"google.golang.org/grpc/peer"
@ -16,7 +17,6 @@ import (
type gRPCServer struct {
onConnectedHandler server.OnConnectedHandler
connections map[string]*gRPCConnection
listeningAddrs []string
server *grpc.Server
}
@ -27,7 +27,6 @@ func NewGRPCServer(listeningAddrs []string) (server.Server, error) {
s := &gRPCServer{
server: grpc.NewServer(),
listeningAddrs: listeningAddrs,
connections: map[string]*gRPCConnection{},
}
protowire.RegisterP2PServer(s.server, newP2PServer(s))
@ -63,12 +62,6 @@ func (s *gRPCServer) listenOn(listenAddr string) error {
}
func (s *gRPCServer) Stop() error {
for _, connection := range s.connections {
err := connection.Disconnect()
if err != nil {
log.Errorf("error closing connection to %s: %+v", connection, err)
}
}
s.server.GracefulStop()
return nil
}
@ -83,10 +76,16 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected
// This is part of the Server interface
func (s *gRPCServer) Connect(address string) (server.Connection, error) {
log.Infof("Dialing to %s", address)
gRPCConnection, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
const dialTimeout = 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
gRPCConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, errors.Wrapf(err, "error connecting to %s", address)
}
client := protowire.NewP2PClient(gRPCConnection)
stream, err := client.MessageStream(context.Background())
if err != nil {
@ -109,31 +108,3 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
return connection, nil
}
// Connections returns a slice of connections the server
// is currently connected to.
// This is part of the Server interface
func (s *gRPCServer) Connections() []server.Connection {
result := make([]server.Connection, 0, len(s.connections))
for _, conn := range s.connections {
result = append(result, conn)
}
return result
}
// AddConnection adds the provided connection to the connection list
func (s *gRPCServer) AddConnection(connection server.Connection) error {
conn := connection.(*gRPCConnection)
s.connections[conn.String()] = conn
return nil
}
// RemoveConnection removes the provided connection from the connection list
func (s *gRPCServer) RemoveConnection(connection server.Connection) error {
delete(s.connections, connection.String())
return nil
}

View File

@ -2,9 +2,11 @@ package server
import (
"fmt"
"github.com/kaspanet/kaspad/netadapter/router"
"github.com/pkg/errors"
"net"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/netadapter/router"
)
// OnConnectedHandler is a function that is to be called
@ -18,13 +20,9 @@ type OnDisconnectedHandler func() error
// Server represents a p2p server.
type Server interface {
Connect(address string) (Connection, error)
Connections() []Connection
Start() error
Stop() error
SetOnConnectedHandler(onConnectedHandler OnConnectedHandler)
// TODO(libp2p): Move AddConnection and RemoveConnection to connection manager
AddConnection(connection Connection) error
RemoveConnection(connection Connection) error
}
// Connection represents a p2p server connection.

View File

@ -1,6 +1,9 @@
package protocol
import (
"sync"
"sync/atomic"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
@ -11,8 +14,6 @@ import (
"github.com/kaspanet/kaspad/util/locks"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync"
"sync/atomic"
)
func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer,
@ -100,6 +101,7 @@ func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer
panic(err)
}
addressManager.AddAddress(peerAddress, peerAddress, subnetworkID)
addressManager.Good(peerAddress, subnetworkID)
}
err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck})

View File

@ -1,15 +1,16 @@
package peer
import (
"sync"
"sync/atomic"
"time"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/daghash"
mathUtil "github.com/kaspanet/kaspad/util/math"
"github.com/kaspanet/kaspad/util/subnetworkid"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync"
"sync/atomic"
"time"
)
// Peer holds data about a peer.
@ -85,7 +86,7 @@ func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion) {
p.advertisedProtocolVer = msg.ProtocolVersion
p.protocolVersion = mathUtil.MinUint32(p.protocolVersion, p.advertisedProtocolVer)
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
p.protocolVersion, p.id)
// Set the peer's ID.
p.id = msg.ID

View File

@ -1,6 +1,9 @@
package protocol
import (
"errors"
"sync/atomic"
"github.com/kaspanet/kaspad/addrmgr"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/netadapter"
@ -13,41 +16,12 @@ import (
"github.com/kaspanet/kaspad/protocol/receiveaddresses"
"github.com/kaspanet/kaspad/protocol/sendaddresses"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync/atomic"
)
// Manager manages the p2p protocol
type Manager struct {
netAdapter *netadapter.NetAdapter
}
// NewManager creates a new instance of the p2p protocol manager
func NewManager(listeningAddresses []string, dag *blockdag.BlockDAG,
addressManager *addrmgr.AddrManager) (*Manager, error) {
netAdapter, err := netadapter.NewNetAdapter(listeningAddresses)
if err != nil {
return nil, err
}
// Init initializes the p2p protocol
func Init(netAdapter *netadapter.NetAdapter, addressManager *addrmgr.AddrManager, dag *blockdag.BlockDAG) {
routerInitializer := newRouterInitializer(netAdapter, addressManager, dag)
netAdapter.SetRouterInitializer(routerInitializer)
manager := Manager{
netAdapter: netAdapter,
}
return &manager, nil
}
// Start starts the p2p protocol
func (p *Manager) Start() error {
return p.netAdapter.Start()
}
// Stop stops the p2p protocol
func (p *Manager) Stop() error {
return p.netAdapter.Stop()
}
func newRouterInitializer(netAdapter *netadapter.NetAdapter,

View File

@ -5,14 +5,13 @@
package wire
import (
"bytes"
"fmt"
"io"
"strings"
"github.com/kaspanet/kaspad/netadapter/id"
"github.com/kaspanet/kaspad/util/mstime"
"github.com/kaspanet/kaspad/version"
"github.com/pkg/errors"
"io"
"strings"
"github.com/kaspanet/kaspad/util/daghash"
"github.com/kaspanet/kaspad/util/subnetworkid"
@ -82,13 +81,7 @@ func (msg *MsgVersion) AddService(service ServiceFlag) {
//
// This is part of the Message interface implementation.
func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error {
buf, ok := r.(*bytes.Buffer)
if !ok {
return errors.Errorf("MsgVersion.KaspaDecode reader is not a " +
"*bytes.Buffer")
}
err := readElements(buf, &msg.ProtocolVersion, &msg.Services,
err := readElements(r, &msg.ProtocolVersion, &msg.Services,
(*int64Time)(&msg.Timestamp))
if err != nil {
return err
@ -119,18 +112,18 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error {
if hasAddress {
msg.Address = new(NetAddress)
err = readNetAddress(buf, pver, msg.Address, false)
err = readNetAddress(r, pver, msg.Address, false)
if err != nil {
return err
}
}
msg.ID = new(id.ID)
err = ReadElement(buf, msg.ID)
err = ReadElement(r, msg.ID)
if err != nil {
return err
}
userAgent, err := ReadVarString(buf, pver)
userAgent, err := ReadVarString(r, pver)
if err != nil {
return err
}
@ -141,7 +134,7 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error {
msg.UserAgent = userAgent
msg.SelectedTipHash = &daghash.Hash{}
err = ReadElement(buf, msg.SelectedTipHash)
err = ReadElement(r, msg.SelectedTipHash)
if err != nil {
return err
}