mirror of
https://github.com/kaspanet/kaspad.git
synced 2026-02-26 13:15:47 +00:00
Compare commits
6 Commits
v0.6.6-dev
...
big-capaci
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
225291d451 | ||
|
|
a32a9011c7 | ||
|
|
5da957f16e | ||
|
|
505d264603 | ||
|
|
883361fea3 | ||
|
|
13a6872a45 |
@@ -1,10 +1,10 @@
|
||||
package ping
|
||||
|
||||
import (
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
"time"
|
||||
|
||||
"github.com/kaspanet/kaspad/app/appmessage"
|
||||
"github.com/kaspanet/kaspad/app/protocol/common"
|
||||
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
|
||||
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
|
||||
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
|
||||
|
||||
@@ -268,6 +268,9 @@ func (rtn *reachabilityTreeNode) addChild(child *reachabilityTreeNode, reindexRo
|
||||
rtn.children = append(rtn.children, child)
|
||||
child.parent = rtn
|
||||
|
||||
modifiedNodes[rtn] = struct{}{}
|
||||
modifiedNodes[child] = struct{}{}
|
||||
|
||||
// Temporarily set the child's interval to be empty, at
|
||||
// the start of rtn's remaining interval. This is done
|
||||
// so that child-of-rtn checks (e.g.
|
||||
@@ -312,8 +315,6 @@ func (rtn *reachabilityTreeNode) addChild(child *reachabilityTreeNode, reindexRo
|
||||
return err
|
||||
}
|
||||
child.interval = allocated
|
||||
modifiedNodes[rtn] = struct{}{}
|
||||
modifiedNodes[child] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -45,10 +45,6 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
|
||||
netConnection.invalidMessageChan <- err
|
||||
})
|
||||
|
||||
router.SetOnRouteCapacityReachedHandler(func() {
|
||||
netConnection.Disconnect()
|
||||
})
|
||||
|
||||
routerInitializer(router, netConnection)
|
||||
|
||||
return netConnection
|
||||
|
||||
@@ -21,11 +21,10 @@ var (
|
||||
|
||||
// ErrRouteClosed indicates that a route was closed while reading/writing.
|
||||
ErrRouteClosed = errors.New("route is closed")
|
||||
)
|
||||
|
||||
// onCapacityReachedHandler is a function that is to be
|
||||
// called when a route reaches capacity.
|
||||
type onCapacityReachedHandler func()
|
||||
// ErrRouteCapacityReached indicates that route's capacity has reached
|
||||
ErrRouteCapacityReached = protocolerrors.New(false, "route capacity has reached")
|
||||
)
|
||||
|
||||
// Route represents an incoming or outgoing Router route
|
||||
type Route struct {
|
||||
@@ -34,8 +33,7 @@ type Route struct {
|
||||
// reads use the channel's built-in mechanism to check if the channel is closed
|
||||
closed bool
|
||||
closeLock sync.Mutex
|
||||
|
||||
onCapacityReachedHandler onCapacityReachedHandler
|
||||
capacity int
|
||||
}
|
||||
|
||||
// NewRoute create a new Route
|
||||
@@ -45,8 +43,9 @@ func NewRoute() *Route {
|
||||
|
||||
func newRouteWithCapacity(capacity int) *Route {
|
||||
return &Route{
|
||||
channel: make(chan appmessage.Message, capacity),
|
||||
closed: false,
|
||||
channel: make(chan appmessage.Message, capacity),
|
||||
closed: false,
|
||||
capacity: capacity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,8 +57,8 @@ func (r *Route) Enqueue(message appmessage.Message) error {
|
||||
if r.closed {
|
||||
return errors.WithStack(ErrRouteClosed)
|
||||
}
|
||||
if len(r.channel) == DefaultMaxMessages {
|
||||
r.onCapacityReachedHandler()
|
||||
if len(r.channel) == r.capacity {
|
||||
return errors.Wrapf(ErrRouteCapacityReached, "reached capacity of %d", r.capacity)
|
||||
}
|
||||
r.channel <- message
|
||||
return nil
|
||||
@@ -88,10 +87,6 @@ func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, e
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityReachedHandler) {
|
||||
r.onCapacityReachedHandler = onCapacityReachedHandler
|
||||
}
|
||||
|
||||
// Close closes this route
|
||||
func (r *Route) Close() {
|
||||
r.closeLock.Lock()
|
||||
|
||||
@@ -20,8 +20,6 @@ type Router struct {
|
||||
incomingRoutesLock sync.RWMutex
|
||||
|
||||
outgoingRoute *Route
|
||||
|
||||
onRouteCapacityReachedHandler OnRouteCapacityReachedHandler
|
||||
}
|
||||
|
||||
// NewRouter creates a new empty router
|
||||
@@ -30,18 +28,9 @@ func NewRouter() *Router {
|
||||
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
|
||||
outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages),
|
||||
}
|
||||
router.outgoingRoute.setOnCapacityReachedHandler(func() {
|
||||
router.onRouteCapacityReachedHandler()
|
||||
})
|
||||
return &router
|
||||
}
|
||||
|
||||
// SetOnRouteCapacityReachedHandler sets the onRouteCapacityReachedHandler
|
||||
// function for this router
|
||||
func (r *Router) SetOnRouteCapacityReachedHandler(onRouteCapacityReachedHandler OnRouteCapacityReachedHandler) {
|
||||
r.onRouteCapacityReachedHandler = onRouteCapacityReachedHandler
|
||||
}
|
||||
|
||||
// AddIncomingRoute registers the messages of types `messageTypes` to
|
||||
// be routed to the given `route`
|
||||
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
|
||||
@@ -52,9 +41,6 @@ func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Ro
|
||||
}
|
||||
r.setIncomingRoute(messageType, route)
|
||||
}
|
||||
route.setOnCapacityReachedHandler(func() {
|
||||
r.onRouteCapacityReachedHandler()
|
||||
})
|
||||
return route, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -13,11 +13,11 @@ import (
|
||||
)
|
||||
|
||||
type gRPCConnection struct {
|
||||
server *gRPCServer
|
||||
address *net.TCPAddr
|
||||
isOutbound bool
|
||||
stream grpcStream
|
||||
router *router.Router
|
||||
server *gRPCServer
|
||||
address *net.TCPAddr
|
||||
stream grpcStream
|
||||
router *router.Router
|
||||
lowLevelClientConnection *grpc.ClientConn
|
||||
|
||||
// streamLock protects concurrent access to stream.
|
||||
// Note that it's an RWMutex. Despite what the name
|
||||
@@ -34,14 +34,16 @@ type gRPCConnection struct {
|
||||
isConnected uint32
|
||||
}
|
||||
|
||||
func newConnection(server *gRPCServer, address *net.TCPAddr, isOutbound bool, stream grpcStream) *gRPCConnection {
|
||||
func newConnection(server *gRPCServer, address *net.TCPAddr, stream grpcStream,
|
||||
lowLevelClientConnection *grpc.ClientConn) *gRPCConnection {
|
||||
|
||||
connection := &gRPCConnection{
|
||||
server: server,
|
||||
address: address,
|
||||
isOutbound: isOutbound,
|
||||
stream: stream,
|
||||
stopChan: make(chan struct{}),
|
||||
isConnected: 1,
|
||||
server: server,
|
||||
address: address,
|
||||
stream: stream,
|
||||
stopChan: make(chan struct{}),
|
||||
isConnected: 1,
|
||||
lowLevelClientConnection: lowLevelClientConnection,
|
||||
}
|
||||
|
||||
return connection
|
||||
@@ -83,7 +85,7 @@ func (c *gRPCConnection) SetOnInvalidMessageHandler(onInvalidMessageHandler serv
|
||||
}
|
||||
|
||||
func (c *gRPCConnection) IsOutbound() bool {
|
||||
return c.isOutbound
|
||||
return c.lowLevelClientConnection != nil
|
||||
}
|
||||
|
||||
// Disconnect disconnects the connection
|
||||
@@ -98,7 +100,7 @@ func (c *gRPCConnection) Disconnect() {
|
||||
|
||||
close(c.stopChan)
|
||||
|
||||
if c.isOutbound {
|
||||
if c.IsOutbound() {
|
||||
c.closeSend()
|
||||
log.Debugf("Disconnected from %s", c)
|
||||
}
|
||||
@@ -138,5 +140,8 @@ func (c *gRPCConnection) closeSend() {
|
||||
defer c.streamLock.Unlock()
|
||||
|
||||
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
|
||||
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
|
||||
|
||||
// ignore error because we don't really know what's the status of the connection
|
||||
_ = clientStream.CloseSend()
|
||||
_ = c.lowLevelClientConnection.Close()
|
||||
}
|
||||
|
||||
@@ -90,12 +90,12 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
|
||||
defer cancel()
|
||||
|
||||
gRPCConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
|
||||
gRPCClientConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error connecting to %s", address)
|
||||
}
|
||||
|
||||
client := protowire.NewP2PClient(gRPCConnection)
|
||||
client := protowire.NewP2PClient(gRPCClientConnection)
|
||||
stream, err := client.MessageStream(context.Background(), grpc.UseCompressor(gzip.Name))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "error getting client stream for %s", address)
|
||||
@@ -110,7 +110,7 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
|
||||
return nil, errors.Errorf("non-tcp addresses are not supported")
|
||||
}
|
||||
|
||||
connection := newConnection(s, tcpAddress, true, stream)
|
||||
connection := newConnection(s, tcpAddress, stream, gRPCClientConnection)
|
||||
|
||||
err = s.onConnectedHandler(connection)
|
||||
if err != nil {
|
||||
|
||||
@@ -29,7 +29,7 @@ func (p *p2pServer) MessageStream(stream protowire.P2P_MessageStreamServer) erro
|
||||
return errors.Errorf("non-tcp connections are not supported")
|
||||
}
|
||||
|
||||
connection := newConnection(p.server, tcpAddress, false, stream)
|
||||
connection := newConnection(p.server, tcpAddress, stream, nil)
|
||||
|
||||
err := p.server.onConnectedHandler(connection)
|
||||
if err != nil {
|
||||
|
||||
@@ -68,7 +68,7 @@ func (s *kaspadService) Execute(args []string, r <-chan svc.ChangeRequest, chang
|
||||
// be properly logged
|
||||
doneChan := make(chan error)
|
||||
startedChan := make(chan struct{})
|
||||
spawn(func() {
|
||||
spawn("kaspadMain-windows", func() {
|
||||
err := kaspadMain(startedChan)
|
||||
doneChan <- err
|
||||
})
|
||||
|
||||
@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
|
||||
const (
|
||||
appMajor uint = 0
|
||||
appMinor uint = 6
|
||||
appPatch uint = 6
|
||||
appPatch uint = 8
|
||||
)
|
||||
|
||||
// appBuild is defined as a variable so it can be overridden during the build
|
||||
|
||||
Reference in New Issue
Block a user