From b42b8b16fdb38cda4251f540e76cd89c2d862ab8 Mon Sep 17 00:00:00 2001 From: Svarog Date: Thu, 16 Jul 2020 17:15:58 +0300 Subject: [PATCH] [NOD-1120] Connection Manager (#796) * [NOD-1120] Removed closure in NetAdapter.onConnectedHanlder * [NOD-1120] Implement all connection manager methods * [NOD-1120] Integrated connmanager into kaspad + added call for dnsseeder * [NOD-1120] Allow buffer to not be bytes.Buffer * [NOD-1120] Added timeout to connect * [NOD-1120] Don't enter connections to add loop if none needed * [NOD-1120] Add call for addressManager.Good * [NOD-1120] Minor bug fixes * [NOD-1120] Remove errChan from grpcConnection * [NOD-1120] Add comments to exported methods * [NOD-1120] cancel the context for DialContext in gRPCServer.Connect * [NOD-1120] Don't try to remove from connSet a connection that doesn't exist * [NOD-1120] add ok bool to connectionSet.get * [NOD-1120] Remove overuse of if-else in checkConnectionRequests * [NOD-1120] Made some order in ConnectionManager * [NOD-1120] Moved checkIncomingConnections to it's own file * [NOD-1120] cleanup in checkOutgoingConnections * [NOD-1120] Cleanup in SeedDNS, and move call outside of connection manager * [NOD-1120] Add check that both --connect and --addpeer aren't used * [NOD-1120] Move dial timeout to constant * [NOD-1120] Enhance comment * [NOD-1120] Log connection failure out of initiateConnection * [NOD-1148] Reshuffle checkRequestedConnections to make more sense * [NOD-1120] Move continue to correct place + reshuffle logging code * [NOD-1120] Don't expose server.Connection outside netAdapter - expose a wrapper instead * [NOD-1120] Add comments * [NOD-1120] Don't return the connection from netAdapter.Connect() * [NOD-1120] Use .Address as key for connectionSet * [NOD-1120] Fix minRetryDuration usage * [NOD-1120] Remove the correct number of incoming connections * [NOD-1120] Add comment * [NOD-1120] Rename connSet -> incomingConnectionSet * [NOD-1120] fix grammar --- config/config.go | 12 +- connmanager/connection_requests.go | 104 ++++++++++++++++ connmanager/connection_set.go | 30 +++++ connmanager/connmanager.go | 108 +++++++++++++++++ connmanager/incoming_connections.go | 23 ++++ connmanager/log.go | 9 ++ connmanager/outgoing_connections.go | 44 +++++++ dnsseed/log.go | 14 +++ dnsseed/seed.go | 101 ++++++++++++++++ kaspad.go | 58 +++++++-- logger/logger.go | 4 + main.go | 43 +------ netadapter/netadapter.go | 114 ++++++++++-------- netadapter/netconnection.go | 35 ++++++ .../server/grpcserver/connection_loops.go | 14 +-- .../server/grpcserver/grpc_connection.go | 20 ++- netadapter/server/grpcserver/grpc_server.go | 45 ++----- netadapter/server/server.go | 10 +- protocol/handshake.go | 6 +- protocol/peer/peer.go | 9 +- protocol/protocol.go | 36 +----- wire/msgversion.go | 23 ++-- 22 files changed, 646 insertions(+), 216 deletions(-) create mode 100644 connmanager/connection_requests.go create mode 100644 connmanager/connection_set.go create mode 100644 connmanager/connmanager.go create mode 100644 connmanager/incoming_connections.go create mode 100644 connmanager/log.go create mode 100644 connmanager/outgoing_connections.go create mode 100644 dnsseed/log.go create mode 100644 dnsseed/seed.go create mode 100644 netadapter/netconnection.go diff --git a/config/config.go b/config/config.go index 97794a9ba..acf256c97 100644 --- a/config/config.go +++ b/config/config.go @@ -476,9 +476,10 @@ func loadConfig() (*Config, []string, error) { activeConfig.DisableListen = true } - // Connect means no DNS seeding. + // ConnectPeers means no DNS seeding and no outbound peers if len(activeConfig.ConnectPeers) > 0 { activeConfig.DisableDNSSeed = true + activeConfig.TargetOutboundPeers = 0 } // Add the default listener if none were specified. The default @@ -652,6 +653,15 @@ func loadConfig() (*Config, []string, error) { } } + // Disallow --addpeer and --connect used together + if len(activeConfig.AddPeers) > 0 && len(activeConfig.ConnectPeers) > 0 { + str := "%s: --addpeer and --connect can not be used together" + err := errors.Errorf(str, funcName) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } + // Add default port to all added peer addresses if needed and remove // duplicate addresses. activeConfig.AddPeers, err = network.NormalizeAddresses(activeConfig.AddPeers, diff --git a/connmanager/connection_requests.go b/connmanager/connection_requests.go new file mode 100644 index 000000000..79c882a43 --- /dev/null +++ b/connmanager/connection_requests.go @@ -0,0 +1,104 @@ +package connmanager + +import ( + "time" +) + +const ( + minRetryDuration = 30 * time.Second + maxRetryDuration = 10 * time.Minute +) + +func nextRetryDuration(previousDuration time.Duration) time.Duration { + if previousDuration < minRetryDuration { + return minRetryDuration + } + if previousDuration*2 > maxRetryDuration { + return maxRetryDuration + } + return previousDuration * 2 +} + +// checkRequestedConnections checks that all activeRequested are still active, and initiates connections +// for pendingRequested. +// While doing so, it filters out of connSet all connections that were initiated as a connectionRequest +func (c *ConnectionManager) checkRequestedConnections(connSet connectionSet) { + c.connectionRequestsLock.Lock() + defer c.connectionRequestsLock.Unlock() + + now := time.Now() + + for address, connReq := range c.activeRequested { + connection, ok := connSet.get(address) + if !ok { // a requested connection was disconnected + delete(c.activeRequested, address) + + if connReq.isPermanent { // if is one-try - ignore. If permanent - add to pending list to retry + connReq.nextAttempt = now + connReq.retryDuration = 0 + c.pendingRequested[address] = connReq + } + continue + } + + connSet.remove(connection) + } + + for address, connReq := range c.pendingRequested { + if connReq.nextAttempt.After(now) { // ignore connection requests which are still waiting for retry + continue + } + + connection, ok := connSet.get(address) + // The pending connection request has already connected - move it to active + // This can happen in rare cases such as when the other side has connected to our node + // while it has been pending on our side. + if ok { + delete(c.pendingRequested, address) + c.pendingRequested[address] = connReq + + connSet.remove(connection) + + continue + } + + // try to initiate connection + err := c.initiateConnection(connReq.address) + if err != nil { + log.Infof("Couldn't connect to %s: %s", address, err) + // if connection request is one try - remove from pending and ignore failure + if !connReq.isPermanent { + delete(c.pendingRequested, address) + continue + } + // if connection request is permanent - keep in pending, and increase retry time + connReq.retryDuration = nextRetryDuration(connReq.retryDuration) + connReq.nextAttempt = now.Add(connReq.retryDuration) + log.Debugf("Retrying permanent connection to %s in %s", address, connReq.retryDuration) + continue + } + + // if connected successfully - move from pending to active + delete(c.pendingRequested, address) + c.activeRequested[address] = connReq + } +} + +// AddConnectionRequest adds the given address to list of pending connection requests +func (c *ConnectionManager) AddConnectionRequest(address string, isPermanent bool) { + // spawn goroutine so that caller doesn't wait in case connectionManager is in the midst of handling + // connection requests + spawn(func() { + c.connectionRequestsLock.Lock() + defer c.connectionRequestsLock.Unlock() + + if _, ok := c.activeRequested[address]; ok { + return + } + + c.pendingRequested[address] = &connectionRequest{ + address: address, + isPermanent: isPermanent, + } + }) +} diff --git a/connmanager/connection_set.go b/connmanager/connection_set.go new file mode 100644 index 000000000..a37d13572 --- /dev/null +++ b/connmanager/connection_set.go @@ -0,0 +1,30 @@ +package connmanager + +import ( + "github.com/kaspanet/kaspad/netadapter" +) + +type connectionSet map[string]*netadapter.NetConnection + +func (cs connectionSet) add(connection *netadapter.NetConnection) { + cs[connection.Address()] = connection +} + +func (cs connectionSet) remove(connection *netadapter.NetConnection) { + delete(cs, connection.Address()) +} + +func (cs connectionSet) get(address string) (*netadapter.NetConnection, bool) { + connection, ok := cs[address] + return connection, ok +} + +func convertToSet(connections []*netadapter.NetConnection) connectionSet { + connSet := make(connectionSet, len(connections)) + + for _, connection := range connections { + connSet[connection.Address()] = connection + } + + return connSet +} diff --git a/connmanager/connmanager.go b/connmanager/connmanager.go new file mode 100644 index 000000000..48f982cdf --- /dev/null +++ b/connmanager/connmanager.go @@ -0,0 +1,108 @@ +package connmanager + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/kaspanet/kaspad/addrmgr" + "github.com/kaspanet/kaspad/netadapter" + + "github.com/kaspanet/kaspad/config" +) + +// connectionRequest represents a user request (either through CLI or RPC) to connect to a certain node +type connectionRequest struct { + address string + isPermanent bool + nextAttempt time.Time + retryDuration time.Duration +} + +// ConnectionManager monitors that the current active connections satisfy the requirements of +// outgoing, requested and incoming connections +type ConnectionManager struct { + netAdapter *netadapter.NetAdapter + addressManager *addrmgr.AddrManager + + activeRequested map[string]*connectionRequest + pendingRequested map[string]*connectionRequest + activeOutgoing map[string]struct{} + targetOutgoing int + activeIncoming map[string]struct{} + maxIncoming int + + stop uint32 + connectionRequestsLock sync.Mutex +} + +// New instantiates a new instance of a ConnectionManager +func New(netAdapter *netadapter.NetAdapter, addressManager *addrmgr.AddrManager) (*ConnectionManager, error) { + c := &ConnectionManager{ + netAdapter: netAdapter, + addressManager: addressManager, + activeRequested: map[string]*connectionRequest{}, + pendingRequested: map[string]*connectionRequest{}, + activeOutgoing: map[string]struct{}{}, + activeIncoming: map[string]struct{}{}, + } + + cfg := config.ActiveConfig() + connectPeers := cfg.AddPeers + if len(cfg.ConnectPeers) > 0 { + connectPeers = cfg.ConnectPeers + } + + c.maxIncoming = cfg.MaxInboundPeers + c.targetOutgoing = cfg.TargetOutboundPeers + + for _, connectPeer := range connectPeers { + c.pendingRequested[connectPeer] = &connectionRequest{ + address: connectPeer, + isPermanent: true, + } + } + + return c, nil +} + +// Start begins the operation of the ConnectionManager +func (c *ConnectionManager) Start() { + spawn(c.connectionsLoop) +} + +// Stop halts the operation of the ConnectionManager +func (c *ConnectionManager) Stop() { + atomic.StoreUint32(&c.stop, 1) + + for _, connection := range c.netAdapter.Connections() { + _ = c.netAdapter.Disconnect(connection) // Ignore errors since connection might be in the midst of disconnecting + } +} + +func (c *ConnectionManager) initiateConnection(address string) error { + log.Infof("Connecting to %s", address) + return c.netAdapter.Connect(address) +} + +const connectionsLoopInterval = 30 * time.Second + +func (c *ConnectionManager) connectionsLoop() { + for atomic.LoadUint32(&c.stop) == 0 { + connections := c.netAdapter.Connections() + + // We convert the connections list to a set, so that connections can be found quickly + // Then we go over the set, classifying connection by category: requested, outgoing or incoming. + // Every step removes all matching connections so that once we get to checkIncomingConnections - + // the only connections left are the incoming ones + connSet := convertToSet(connections) + + c.checkRequestedConnections(connSet) + + c.checkOutgoingConnections(connSet) + + c.checkIncomingConnections(connSet) + + <-time.Tick(connectionsLoopInterval) + } +} diff --git a/connmanager/incoming_connections.go b/connmanager/incoming_connections.go new file mode 100644 index 000000000..166725caf --- /dev/null +++ b/connmanager/incoming_connections.go @@ -0,0 +1,23 @@ +package connmanager + +// checkIncomingConnections makes sure there's no more than maxIncoming incoming connections +// if there are - it randomly disconnects enough to go below that number +func (c *ConnectionManager) checkIncomingConnections(incomingConnectionSet connectionSet) { + if len(incomingConnectionSet) <= c.maxIncoming { + return + } + + numConnectionsOverMax := len(incomingConnectionSet) - c.maxIncoming + // randomly disconnect nodes until the number of incoming connections is smaller than maxIncoming + for address, connection := range incomingConnectionSet { + err := c.netAdapter.Disconnect(connection) + if err != nil { + log.Errorf("Error disconnecting from %s: %+v", address, err) + } + + numConnectionsOverMax-- + if numConnectionsOverMax == 0 { + break + } + } +} diff --git a/connmanager/log.go b/connmanager/log.go new file mode 100644 index 000000000..326214611 --- /dev/null +++ b/connmanager/log.go @@ -0,0 +1,9 @@ +package connmanager + +import ( + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.CMGR) +var spawn = panics.GoroutineWrapperFunc(log) diff --git a/connmanager/outgoing_connections.go b/connmanager/outgoing_connections.go new file mode 100644 index 000000000..d757ac80c --- /dev/null +++ b/connmanager/outgoing_connections.go @@ -0,0 +1,44 @@ +package connmanager + +// checkOutgoingConnections goes over all activeOutgoing and makes sure they are still active. +// Then it opens connections so that we have targetOutgoing active connections +func (c *ConnectionManager) checkOutgoingConnections(connSet connectionSet) { + for address := range c.activeOutgoing { + connection, ok := connSet.get(address) + if ok { // connection is still connected + connSet.remove(connection) + continue + } + + // if connection is dead - remove from list of active ones + delete(c.activeOutgoing, address) + } + + liveConnections := len(c.activeOutgoing) + if c.targetOutgoing == liveConnections { + return + } + + log.Debugf("Have got %d outgoing connections out of target %d, adding %d more", + liveConnections, c.targetOutgoing, c.targetOutgoing-liveConnections) + + for len(c.activeOutgoing) < c.targetOutgoing { + address := c.addressManager.GetAddress() + if address == nil { + log.Warnf("No more addresses available") + return + } + + netAddress := address.NetAddress() + c.addressManager.Attempt(netAddress) + addressString := netAddress.TCPAddress().String() + err := c.initiateConnection(addressString) + if err != nil { + log.Infof("Couldn't connect to %s: %s", addressString, err) + continue + } + + c.addressManager.Connected(address.NetAddress()) + c.activeOutgoing[address.NetAddress().TCPAddress().String()] = struct{}{} + } +} diff --git a/dnsseed/log.go b/dnsseed/log.go new file mode 100644 index 000000000..9ac00e6d8 --- /dev/null +++ b/dnsseed/log.go @@ -0,0 +1,14 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package dnsseed + +import ( + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.CMGR) +var spawn = panics.GoroutineWrapperFunc(log) +var spawnAfter = panics.AfterFuncWrapperFunc(log) diff --git a/dnsseed/seed.go b/dnsseed/seed.go new file mode 100644 index 000000000..2d3f77686 --- /dev/null +++ b/dnsseed/seed.go @@ -0,0 +1,101 @@ +// Copyright (c) 2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package dnsseed + +import ( + "fmt" + "math/rand" + "net" + "strconv" + "time" + + "github.com/kaspanet/kaspad/util/mstime" + + "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/util/subnetworkid" + + "github.com/kaspanet/kaspad/dagconfig" + "github.com/kaspanet/kaspad/wire" +) + +const ( + // These constants are used by the DNS seed code to pick a random last + // seen time. + secondsIn3Days int32 = 24 * 60 * 60 * 3 + secondsIn4Days int32 = 24 * 60 * 60 * 4 + + // SubnetworkIDPrefixChar is the prefix of subnetworkID, when building a DNS seed request + SubnetworkIDPrefixChar byte = 'n' + + // ServiceFlagPrefixChar is the prefix of service flag, when building a DNS seed request + ServiceFlagPrefixChar byte = 'x' +) + +// OnSeed is the signature of the callback function which is invoked when DNS +// seeding is successful. +type OnSeed func(addrs []*wire.NetAddress) + +// LookupFunc is the signature of the DNS lookup function. +type LookupFunc func(string) ([]net.IP, error) + +// SeedFromDNS uses DNS seeding to populate the address manager with peers. +func SeedFromDNS(dagParams *dagconfig.Params, reqServices wire.ServiceFlag, includeAllSubnetworks bool, + subnetworkID *subnetworkid.SubnetworkID, lookupFn LookupFunc, seedFn OnSeed) { + + var dnsSeeds []string + cfg := config.ActiveConfig() + if cfg != nil && cfg.DNSSeed != "" { + dnsSeeds = []string{cfg.DNSSeed} + } else { + dnsSeeds = dagParams.DNSSeeds + } + + for _, dnsseed := range dnsSeeds { + var host string + if reqServices == wire.SFNodeNetwork { + host = dnsseed + } else { + host = fmt.Sprintf("%c%x.%s", ServiceFlagPrefixChar, uint64(reqServices), dnsseed) + } + + if !includeAllSubnetworks { + if subnetworkID != nil { + host = fmt.Sprintf("%c%s.%s", SubnetworkIDPrefixChar, subnetworkID, host) + } else { + host = fmt.Sprintf("%c.%s", SubnetworkIDPrefixChar, host) + } + } + + spawn(func() { + randSource := rand.New(rand.NewSource(time.Now().UnixNano())) + + seedPeers, err := lookupFn(host) + if err != nil { + log.Infof("DNS discovery failed on seed %s: %s", host, err) + return + } + numPeers := len(seedPeers) + + log.Infof("%d addresses found from DNS seed %s", numPeers, host) + + if numPeers == 0 { + return + } + addresses := make([]*wire.NetAddress, len(seedPeers)) + // if this errors then we have *real* problems + intPort, _ := strconv.Atoi(dagParams.DefaultPort) + for i, peer := range seedPeers { + addresses[i] = wire.NewNetAddressTimestamp( + // seed with addresses from a time randomly selected + // between 3 and 7 days ago. + mstime.Now().Add(-1*time.Second*time.Duration(secondsIn3Days+ + randSource.Int31n(secondsIn4Days))), + 0, peer, uint16(intPort)) + } + + seedFn(addresses) + }) + } +} diff --git a/kaspad.go b/kaspad.go index 78cf879a6..baea5f2bb 100644 --- a/kaspad.go +++ b/kaspad.go @@ -2,9 +2,17 @@ package main import ( "fmt" + "sync/atomic" + + "github.com/kaspanet/kaspad/dnsseed" + "github.com/kaspanet/kaspad/wire" + + "github.com/kaspanet/kaspad/connmanager" + "github.com/kaspanet/kaspad/addrmgr" "github.com/kaspanet/kaspad/server/serverutils" - "sync/atomic" + + "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/util/panics" @@ -22,8 +30,10 @@ import ( // kaspad is a wrapper for all the kaspad services type kaspad struct { - rpcServer *rpc.Server - protocolManager *protocol.Manager + rpcServer *rpc.Server + addressManager *addrmgr.AddrManager + networkAdapter *netadapter.NetAdapter + connectionManager *connmanager.ConnectionManager started, shutdown int32 } @@ -39,16 +49,32 @@ func (s *kaspad) start() { cfg := config.ActiveConfig() - err := s.protocolManager.Start() + err := s.networkAdapter.Start() if err != nil { panics.Exit(log, fmt.Sprintf("Error starting the p2p protocol: %+v", err)) } + maybeSeedFromDNS(cfg, s.addressManager) + + s.connectionManager.Start() + if !cfg.DisableRPC { s.rpcServer.Start() } } +func maybeSeedFromDNS(cfg *config.Config, addressManager *addrmgr.AddrManager) { + if !cfg.DisableDNSSeed { + dnsseed.SeedFromDNS(cfg.NetParams(), wire.SFNodeNetwork, false, nil, + config.ActiveConfig().Lookup, func(addresses []*wire.NetAddress) { + // Kaspad uses a lookup of the dns seeder here. Since seeder returns + // IPs of nodes and not its own IP, we can not know real IP of + // source. So we'll take first returned address as source. + addressManager.AddAddresses(addresses, addresses[0], nil) + }) + } +} + // stop gracefully shuts down all the kaspad services. func (s *kaspad) stop() error { // Make sure this only happens once. @@ -59,7 +85,9 @@ func (s *kaspad) stop() error { log.Warnf("Kaspad shutting down") - err := s.protocolManager.Stop() + s.connectionManager.Stop() + + err := s.networkAdapter.Stop() if err != nil { log.Errorf("Error stopping the p2p protocol: %+v", err) } @@ -78,10 +106,12 @@ func (s *kaspad) stop() error { // newKaspad returns a new kaspad instance configured to listen on addr for the // kaspa network type specified by dagParams. Use start to begin accepting // connections from peers. -func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error) { +func newKaspad(interrupt <-chan struct{}) (*kaspad, error) { + cfg := config.ActiveConfig() + indexManager, acceptanceIndex := setupIndexes() - sigCache := txscript.NewSigCache(config.ActiveConfig().SigCacheMaxSize) + sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize) // Create a new block DAG instance with the appropriate configuration. dag, err := setupDAG(interrupt, sigCache, indexManager) @@ -91,8 +121,15 @@ func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error) txMempool := setupMempool(dag, sigCache) + netAdapter, err := netadapter.NewNetAdapter(cfg.Listeners) + if err != nil { + return nil, err + } addressManager := addrmgr.New(serverutils.KaspadLookup, config.ActiveConfig().SubnetworkID) - protocolManager, err := protocol.NewManager(listenAddrs, dag, addressManager) + + protocol.Init(netAdapter, addressManager, dag) + + connectionManager, err := connmanager.New(netAdapter, addressManager) if err != nil { return nil, err } @@ -103,8 +140,9 @@ func newKaspad(listenAddrs []string, interrupt <-chan struct{}) (*kaspad, error) } return &kaspad{ - rpcServer: rpcServer, - protocolManager: protocolManager, + rpcServer: rpcServer, + networkAdapter: netAdapter, + connectionManager: connectionManager, }, nil } diff --git a/logger/logger.go b/logger/logger.go index 461698253..6b6cd0b4e 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -54,6 +54,7 @@ var ( blkrLog = BackendLog.Logger("BLKR") gbrlLog = BackendLog.Logger("GBRL") blprLog = BackendLog.Logger("BLPR") + dnssLog = BackendLog.Logger("DNSS") snvrLog = BackendLog.Logger("SNVR") ) @@ -85,6 +86,7 @@ var SubsystemTags = struct { NTAR, GBRL, BLPR, + DNSS, SNVR string }{ ADXR: "ADXR", @@ -113,6 +115,7 @@ var SubsystemTags = struct { GBRL: "GBRL", NTAR: "NTAR", BLPR: "BLPR", + DNSS: "DNSS", SNVR: "SNVR", } @@ -144,6 +147,7 @@ var subsystemLoggers = map[string]*logs.Logger{ SubsystemTags.GBRL: gbrlLog, SubsystemTags.NTAR: ntarLog, SubsystemTags.BLPR: blprLog, + SubsystemTags.DNSS: dnssLog, SubsystemTags.SNVR: snvrLog, } diff --git a/main.go b/main.go index b67d600f4..a2996eaff 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "path/filepath" "runtime" "runtime/pprof" - "strings" "time" "github.com/kaspanet/kaspad/dbaccess" @@ -32,10 +31,6 @@ const ( blockDbNamePrefix = "blocks" ) -var ( - cfg *config.Config -) - // winServiceMain is only invoked on Windows. It detects when kaspad is running // as a service and reacts accordingly. var winServiceMain func() (bool, error) @@ -52,7 +47,7 @@ func kaspadMain(startedChan chan<- struct{}) error { if err != nil { return err } - cfg = config.ActiveConfig() + cfg := config.ActiveConfig() defer panics.HandlePanic(log, nil) // Get a channel that will be closed when a shutdown signal has been @@ -131,10 +126,9 @@ func kaspadMain(startedChan chan<- struct{}) error { } // Create kaspad and start it. - kaspad, err := newKaspad(cfg.Listeners, interrupt) + kaspad, err := newKaspad(interrupt) if err != nil { - log.Errorf("Unable to start kaspad on %s: %+v", - strings.Join(cfg.Listeners, ", "), err) + log.Errorf("Unable to start kaspad: %+v", err) return err } defer func() { @@ -169,37 +163,12 @@ func kaspadMain(startedChan chan<- struct{}) error { } func removeDatabase() error { - dbPath := blockDbPath(cfg.DbType) + dbPath := blockDbPath(config.ActiveConfig().DbType) return os.RemoveAll(dbPath) } // removeRegressionDB removes the existing regression test database if running // in regression test mode and it already exists. -func removeRegressionDB(dbPath string) error { - // Don't do anything if not in regression test mode. - if !cfg.RegressionTest { - return nil - } - - // Remove the old regression test database if it already exists. - fi, err := os.Stat(dbPath) - if err == nil { - log.Infof("Removing regression test database from '%s'", dbPath) - if fi.IsDir() { - err := os.RemoveAll(dbPath) - if err != nil { - return err - } - } else { - err := os.Remove(dbPath) - if err != nil { - return err - } - } - } - - return nil -} // dbPath returns the path to the block database given a database type. func blockDbPath(dbType string) string { @@ -208,12 +177,12 @@ func blockDbPath(dbType string) string { if dbType == "sqlite" { dbName = dbName + ".db" } - dbPath := filepath.Join(cfg.DataDir, dbName) + dbPath := filepath.Join(config.ActiveConfig().DataDir, dbName) return dbPath } func openDB() error { - dbPath := filepath.Join(cfg.DataDir, "db") + dbPath := filepath.Join(config.ActiveConfig().DataDir, "db") log.Infof("Loading database from '%s'", dbPath) err := dbaccess.Open(dbPath) if err != nil { diff --git a/netadapter/netadapter.go b/netadapter/netadapter.go index f17da497c..bb649e998 100644 --- a/netadapter/netadapter.go +++ b/netadapter/netadapter.go @@ -1,14 +1,14 @@ package netadapter import ( - "github.com/kaspanet/kaspad/netadapter/id" - routerpkg "github.com/kaspanet/kaspad/netadapter/router" "net" "strconv" "sync" "sync/atomic" "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/netadapter/id" + routerpkg "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/netadapter/server" "github.com/kaspanet/kaspad/netadapter/server/grpcserver" "github.com/kaspanet/kaspad/wire" @@ -30,8 +30,8 @@ type NetAdapter struct { routerInitializer RouterInitializer stop uint32 - routersToConnections map[*routerpkg.Router]server.Connection - connectionsToIDs map[server.Connection]*id.ID + routersToConnections map[*routerpkg.Router]*NetConnection + connectionsToIDs map[*NetConnection]*id.ID idsToRouters map[*id.ID]*routerpkg.Router sync.RWMutex } @@ -51,13 +51,12 @@ func NewNetAdapter(listeningAddrs []string) (*NetAdapter, error) { id: netAdapterID, server: s, - routersToConnections: make(map[*routerpkg.Router]server.Connection), - connectionsToIDs: make(map[server.Connection]*id.ID), + routersToConnections: make(map[*routerpkg.Router]*NetConnection), + connectionsToIDs: make(map[*NetConnection]*id.ID), idsToRouters: make(map[*id.ID]*routerpkg.Router), } - onConnectedHandler := adapter.newOnConnectedHandler() - adapter.server.SetOnConnectedHandler(onConnectedHandler) + adapter.server.SetOnConnectedHandler(adapter.onConnectedHandler) return &adapter, nil } @@ -69,15 +68,6 @@ func (na *NetAdapter) Start() error { return err } - // TODO(libp2p): Replace with real connection manager - cfg := config.ActiveConfig() - for _, connectPeer := range cfg.ConnectPeers { - _, err := na.server.Connect(connectPeer) - if err != nil { - log.Errorf("Error connecting to %s: %+v", connectPeer, err) - } - } - return nil } @@ -89,55 +79,76 @@ func (na *NetAdapter) Stop() error { return na.server.Stop() } -func (na *NetAdapter) newOnConnectedHandler() server.OnConnectedHandler { - return func(connection server.Connection) error { - router, err := na.routerInitializer() - if err != nil { - return err - } - connection.Start(router) - na.routersToConnections[router] = connection +// Connect tells the NetAdapter's underlying server to initiate a connection +// to the given address +func (na *NetAdapter) Connect(address string) error { + _, err := na.server.Connect(address) + return err +} - router.SetOnRouteCapacityReachedHandler(func() { - err := connection.Disconnect() - if err != nil { - if !errors.Is(err, server.ErrNetwork) { - panic(err) - } - log.Warnf("Failed to disconnect from %s", connection) - } - }) - connection.SetOnDisconnectedHandler(func() error { - na.cleanupConnection(connection, router) - na.server.RemoveConnection(connection) - return router.Close() - }) - na.server.AddConnection(connection) - return nil +// Connections returns a list of connections currently connected and active +func (na *NetAdapter) Connections() []*NetConnection { + netConnections := make([]*NetConnection, 0, len(na.connectionsToIDs)) + + for netConnection := range na.connectionsToIDs { + netConnections = append(netConnections, netConnection) } + + return netConnections +} + +func (na *NetAdapter) onConnectedHandler(connection server.Connection) error { + router, err := na.routerInitializer() + if err != nil { + return err + } + connection.Start(router) + + netConnection := newNetConnection(connection, nil) + + na.routersToConnections[router] = netConnection + + na.connectionsToIDs[netConnection] = nil + + router.SetOnRouteCapacityReachedHandler(func() { + err := connection.Disconnect() + if err != nil { + if !errors.Is(err, server.ErrNetwork) { + panic(err) + } + log.Warnf("Failed to disconnect from %s", connection) + } + }) + connection.SetOnDisconnectedHandler(func() error { + na.cleanupConnection(netConnection, router) + return router.Close() + }) + return nil } // AssociateRouterID associates the connection for the given router // with the given ID func (na *NetAdapter) AssociateRouterID(router *routerpkg.Router, id *id.ID) error { - connection, ok := na.routersToConnections[router] + netConnection, ok := na.routersToConnections[router] if !ok { return errors.Errorf("router not registered for id %s", id) } - na.connectionsToIDs[connection] = id + netConnection.id = id + + na.connectionsToIDs[netConnection] = id na.idsToRouters[id] = router return nil } -func (na *NetAdapter) cleanupConnection(connection server.Connection, router *routerpkg.Router) { - connectionID, ok := na.connectionsToIDs[connection] +func (na *NetAdapter) cleanupConnection(netConnection *NetConnection, router *routerpkg.Router) { + connectionID, ok := na.connectionsToIDs[netConnection] if !ok { return } delete(na.routersToConnections, router) - delete(na.connectionsToIDs, connection) + delete(na.connectionsToIDs, netConnection) delete(na.idsToRouters, connectionID) } @@ -228,13 +239,18 @@ func (na *NetAdapter) GetBestLocalAddress() (*wire.NetAddress, error) { // DisconnectAssociatedConnection disconnects from the connection associated with the given router. func (na *NetAdapter) DisconnectAssociatedConnection(router *routerpkg.Router) error { - connection := na.routersToConnections[router] - err := connection.Disconnect() + netConnection := na.routersToConnections[router] + return na.Disconnect(netConnection) +} + +// Disconnect disconnects the given connection +func (na *NetAdapter) Disconnect(netConnection *NetConnection) error { + err := netConnection.connection.Disconnect() if err != nil { if !errors.Is(err, server.ErrNetwork) { return err } - log.Warnf("Error disconnecting from %s: %s", connection, err) + log.Warnf("Error disconnecting from %s: %s", netConnection, err) } return nil } diff --git a/netadapter/netconnection.go b/netadapter/netconnection.go new file mode 100644 index 000000000..fae5ce6af --- /dev/null +++ b/netadapter/netconnection.go @@ -0,0 +1,35 @@ +package netadapter + +import ( + "fmt" + + "github.com/kaspanet/kaspad/netadapter/id" + "github.com/kaspanet/kaspad/netadapter/server" +) + +// NetConnection is a wrapper to a server connection for use by services external to NetAdapter +type NetConnection struct { + connection server.Connection + id *id.ID +} + +func newNetConnection(connection server.Connection, id *id.ID) *NetConnection { + return &NetConnection{ + connection: connection, + id: id, + } +} + +func (c *NetConnection) String() string { + return fmt.Sprintf("<%s: %s>", c.id, c.connection) +} + +// ID returns the ID associated with this connection +func (c *NetConnection) ID() *id.ID { + return c.id +} + +// Address returns the address associated with this connection +func (c *NetConnection) Address() string { + return c.connection.Address().String() +} diff --git a/netadapter/server/grpcserver/connection_loops.go b/netadapter/server/grpcserver/connection_loops.go index d32f16671..db0c3afeb 100644 --- a/netadapter/server/grpcserver/connection_loops.go +++ b/netadapter/server/grpcserver/connection_loops.go @@ -37,18 +37,8 @@ func (c *gRPCConnection) sendLoop() error { if err != nil { return err } - err = func() error { - c.writeToErrChanDuringDisconnectLock.Lock() - defer c.writeToErrChanDuringDisconnectLock.Unlock() - err := c.stream.Send(messageProto) - if c.IsConnected() { - c.errChan <- err - if err != nil { - return err - } - } - return nil - }() + + err = c.stream.Send(messageProto) if err != nil { return err } diff --git a/netadapter/server/grpcserver/grpc_connection.go b/netadapter/server/grpcserver/grpc_connection.go index 039b00a7b..a3345504b 100644 --- a/netadapter/server/grpcserver/grpc_connection.go +++ b/netadapter/server/grpcserver/grpc_connection.go @@ -1,11 +1,11 @@ package grpcserver import ( + "net" + "sync/atomic" + "github.com/kaspanet/kaspad/netadapter/router" "github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire" - "net" - "sync" - "sync/atomic" "github.com/kaspanet/kaspad/netadapter/server" "google.golang.org/grpc" @@ -18,11 +18,9 @@ type gRPCConnection struct { stream grpcStream router *router.Router - writeToErrChanDuringDisconnectLock sync.Mutex - errChan chan error - stopChan chan struct{} - clientConn grpc.ClientConn - onDisconnectedHandler server.OnDisconnectedHandler + stopChan chan struct{} + clientConn grpc.ClientConn + onDisconnectedHandler server.OnDisconnectedHandler isConnected uint32 } @@ -33,7 +31,6 @@ func newConnection(server *gRPCServer, address net.Addr, isOutbound bool, stream address: address, isOutbound: isOutbound, stream: stream, - errChan: make(chan error), stopChan: make(chan struct{}), isConnected: 1, } @@ -74,9 +71,6 @@ func (c *gRPCConnection) Disconnect() error { } atomic.StoreUint32(&c.isConnected, 0) - c.writeToErrChanDuringDisconnectLock.Lock() - defer c.writeToErrChanDuringDisconnectLock.Unlock() - close(c.errChan) close(c.stopChan) if c.isOutbound { @@ -84,6 +78,8 @@ func (c *gRPCConnection) Disconnect() error { _ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection } + log.Debugf("Disconnected from %s", c) + return c.onDisconnectedHandler() } diff --git a/netadapter/server/grpcserver/grpc_server.go b/netadapter/server/grpcserver/grpc_server.go index 2dd7e7954..0b0c6d806 100644 --- a/netadapter/server/grpcserver/grpc_server.go +++ b/netadapter/server/grpcserver/grpc_server.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "time" "google.golang.org/grpc/peer" @@ -16,7 +17,6 @@ import ( type gRPCServer struct { onConnectedHandler server.OnConnectedHandler - connections map[string]*gRPCConnection listeningAddrs []string server *grpc.Server } @@ -27,7 +27,6 @@ func NewGRPCServer(listeningAddrs []string) (server.Server, error) { s := &gRPCServer{ server: grpc.NewServer(), listeningAddrs: listeningAddrs, - connections: map[string]*gRPCConnection{}, } protowire.RegisterP2PServer(s.server, newP2PServer(s)) @@ -63,12 +62,6 @@ func (s *gRPCServer) listenOn(listenAddr string) error { } func (s *gRPCServer) Stop() error { - for _, connection := range s.connections { - err := connection.Disconnect() - if err != nil { - log.Errorf("error closing connection to %s: %+v", connection, err) - } - } s.server.GracefulStop() return nil } @@ -83,10 +76,16 @@ func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnected // This is part of the Server interface func (s *gRPCServer) Connect(address string) (server.Connection, error) { log.Infof("Dialing to %s", address) - gRPCConnection, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) + + const dialTimeout = 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + defer cancel() + + gRPCConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, errors.Wrapf(err, "error connecting to %s", address) } + client := protowire.NewP2PClient(gRPCConnection) stream, err := client.MessageStream(context.Background()) if err != nil { @@ -109,31 +108,3 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) { return connection, nil } - -// Connections returns a slice of connections the server -// is currently connected to. -// This is part of the Server interface -func (s *gRPCServer) Connections() []server.Connection { - result := make([]server.Connection, 0, len(s.connections)) - - for _, conn := range s.connections { - result = append(result, conn) - } - - return result -} - -// AddConnection adds the provided connection to the connection list -func (s *gRPCServer) AddConnection(connection server.Connection) error { - conn := connection.(*gRPCConnection) - s.connections[conn.String()] = conn - - return nil -} - -// RemoveConnection removes the provided connection from the connection list -func (s *gRPCServer) RemoveConnection(connection server.Connection) error { - delete(s.connections, connection.String()) - - return nil -} diff --git a/netadapter/server/server.go b/netadapter/server/server.go index 831e61808..a683a170b 100644 --- a/netadapter/server/server.go +++ b/netadapter/server/server.go @@ -2,9 +2,11 @@ package server import ( "fmt" - "github.com/kaspanet/kaspad/netadapter/router" - "github.com/pkg/errors" "net" + + "github.com/pkg/errors" + + "github.com/kaspanet/kaspad/netadapter/router" ) // OnConnectedHandler is a function that is to be called @@ -18,13 +20,9 @@ type OnDisconnectedHandler func() error // Server represents a p2p server. type Server interface { Connect(address string) (Connection, error) - Connections() []Connection Start() error Stop() error SetOnConnectedHandler(onConnectedHandler OnConnectedHandler) - // TODO(libp2p): Move AddConnection and RemoveConnection to connection manager - AddConnection(connection Connection) error - RemoveConnection(connection Connection) error } // Connection represents a p2p server connection. diff --git a/protocol/handshake.go b/protocol/handshake.go index bf9a55fcc..97539f565 100644 --- a/protocol/handshake.go +++ b/protocol/handshake.go @@ -1,6 +1,9 @@ package protocol import ( + "sync" + "sync/atomic" + "github.com/kaspanet/kaspad/addrmgr" "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/netadapter" @@ -11,8 +14,6 @@ import ( "github.com/kaspanet/kaspad/util/locks" "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" - "sync" - "sync/atomic" ) func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer *peerpkg.Peer, @@ -100,6 +101,7 @@ func handshake(router *routerpkg.Router, netAdapter *netadapter.NetAdapter, peer panic(err) } addressManager.AddAddress(peerAddress, peerAddress, subnetworkID) + addressManager.Good(peerAddress, subnetworkID) } err = router.RemoveRoute([]wire.MessageCommand{wire.CmdVersion, wire.CmdVerAck}) diff --git a/protocol/peer/peer.go b/protocol/peer/peer.go index 8173ed157..82e17719d 100644 --- a/protocol/peer/peer.go +++ b/protocol/peer/peer.go @@ -1,15 +1,16 @@ package peer import ( + "sync" + "sync/atomic" + "time" + "github.com/kaspanet/kaspad/netadapter/id" "github.com/kaspanet/kaspad/util/daghash" mathUtil "github.com/kaspanet/kaspad/util/math" "github.com/kaspanet/kaspad/util/subnetworkid" "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" - "sync" - "sync/atomic" - "time" ) // Peer holds data about a peer. @@ -85,7 +86,7 @@ func (p *Peer) UpdateFieldsFromMsgVersion(msg *wire.MsgVersion) { p.advertisedProtocolVer = msg.ProtocolVersion p.protocolVersion = mathUtil.MinUint32(p.protocolVersion, p.advertisedProtocolVer) log.Debugf("Negotiated protocol version %d for peer %s", - p.protocolVersion, p) + p.protocolVersion, p.id) // Set the peer's ID. p.id = msg.ID diff --git a/protocol/protocol.go b/protocol/protocol.go index bfdac2f6c..ddea7f12f 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -1,6 +1,9 @@ package protocol import ( + "errors" + "sync/atomic" + "github.com/kaspanet/kaspad/addrmgr" "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/netadapter" @@ -13,41 +16,12 @@ import ( "github.com/kaspanet/kaspad/protocol/receiveaddresses" "github.com/kaspanet/kaspad/protocol/sendaddresses" "github.com/kaspanet/kaspad/wire" - "github.com/pkg/errors" - "sync/atomic" ) -// Manager manages the p2p protocol -type Manager struct { - netAdapter *netadapter.NetAdapter -} - -// NewManager creates a new instance of the p2p protocol manager -func NewManager(listeningAddresses []string, dag *blockdag.BlockDAG, - addressManager *addrmgr.AddrManager) (*Manager, error) { - - netAdapter, err := netadapter.NewNetAdapter(listeningAddresses) - if err != nil { - return nil, err - } - +// Init initializes the p2p protocol +func Init(netAdapter *netadapter.NetAdapter, addressManager *addrmgr.AddrManager, dag *blockdag.BlockDAG) { routerInitializer := newRouterInitializer(netAdapter, addressManager, dag) netAdapter.SetRouterInitializer(routerInitializer) - - manager := Manager{ - netAdapter: netAdapter, - } - return &manager, nil -} - -// Start starts the p2p protocol -func (p *Manager) Start() error { - return p.netAdapter.Start() -} - -// Stop stops the p2p protocol -func (p *Manager) Stop() error { - return p.netAdapter.Stop() } func newRouterInitializer(netAdapter *netadapter.NetAdapter, diff --git a/wire/msgversion.go b/wire/msgversion.go index fdfb3ba1d..c5b3accdc 100644 --- a/wire/msgversion.go +++ b/wire/msgversion.go @@ -5,14 +5,13 @@ package wire import ( - "bytes" "fmt" + "io" + "strings" + "github.com/kaspanet/kaspad/netadapter/id" "github.com/kaspanet/kaspad/util/mstime" "github.com/kaspanet/kaspad/version" - "github.com/pkg/errors" - "io" - "strings" "github.com/kaspanet/kaspad/util/daghash" "github.com/kaspanet/kaspad/util/subnetworkid" @@ -82,13 +81,7 @@ func (msg *MsgVersion) AddService(service ServiceFlag) { // // This is part of the Message interface implementation. func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error { - buf, ok := r.(*bytes.Buffer) - if !ok { - return errors.Errorf("MsgVersion.KaspaDecode reader is not a " + - "*bytes.Buffer") - } - - err := readElements(buf, &msg.ProtocolVersion, &msg.Services, + err := readElements(r, &msg.ProtocolVersion, &msg.Services, (*int64Time)(&msg.Timestamp)) if err != nil { return err @@ -119,18 +112,18 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error { if hasAddress { msg.Address = new(NetAddress) - err = readNetAddress(buf, pver, msg.Address, false) + err = readNetAddress(r, pver, msg.Address, false) if err != nil { return err } } msg.ID = new(id.ID) - err = ReadElement(buf, msg.ID) + err = ReadElement(r, msg.ID) if err != nil { return err } - userAgent, err := ReadVarString(buf, pver) + userAgent, err := ReadVarString(r, pver) if err != nil { return err } @@ -141,7 +134,7 @@ func (msg *MsgVersion) KaspaDecode(r io.Reader, pver uint32) error { msg.UserAgent = userAgent msg.SelectedTipHash = &daghash.Hash{} - err = ReadElement(buf, msg.SelectedTipHash) + err = ReadElement(r, msg.SelectedTipHash) if err != nil { return err }