Compare commits

...

6 Commits

Author SHA1 Message Date
Ori Newman
225291d451 return error on capacity reached 2020-09-02 18:32:26 +03:00
Ori Newman
a32a9011c7 [NOD-1305] Close client connection on disconnect (#909) 2020-08-31 15:57:11 +03:00
stasatdaglabs
5da957f16e Update to version 0.6.8 2020-08-30 11:31:43 +03:00
Ori Newman
505d264603 [NOD-1322] Fix compilation on windows (#905) 2020-08-27 18:04:54 +03:00
Ori Newman
883361fea3 [NOD-1323] Always save new block reachability data (#906) 2020-08-27 18:03:50 +03:00
stasatdaglabs
13a6872a45 Update to version 0.6.7 2020-08-26 12:13:43 +03:00
10 changed files with 39 additions and 56 deletions

View File

@@ -1,10 +1,10 @@
package ping
import (
"github.com/kaspanet/kaspad/app/protocol/common"
"time"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/protocol/common"
peerpkg "github.com/kaspanet/kaspad/app/protocol/peer"
"github.com/kaspanet/kaspad/app/protocol/protocolerrors"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"

View File

@@ -268,6 +268,9 @@ func (rtn *reachabilityTreeNode) addChild(child *reachabilityTreeNode, reindexRo
rtn.children = append(rtn.children, child)
child.parent = rtn
modifiedNodes[rtn] = struct{}{}
modifiedNodes[child] = struct{}{}
// Temporarily set the child's interval to be empty, at
// the start of rtn's remaining interval. This is done
// so that child-of-rtn checks (e.g.
@@ -312,8 +315,6 @@ func (rtn *reachabilityTreeNode) addChild(child *reachabilityTreeNode, reindexRo
return err
}
child.interval = allocated
modifiedNodes[rtn] = struct{}{}
modifiedNodes[child] = struct{}{}
return nil
}

View File

@@ -45,10 +45,6 @@ func newNetConnection(connection server.Connection, routerInitializer RouterInit
netConnection.invalidMessageChan <- err
})
router.SetOnRouteCapacityReachedHandler(func() {
netConnection.Disconnect()
})
routerInitializer(router, netConnection)
return netConnection

View File

@@ -21,11 +21,10 @@ var (
// ErrRouteClosed indicates that a route was closed while reading/writing.
ErrRouteClosed = errors.New("route is closed")
)
// onCapacityReachedHandler is a function that is to be
// called when a route reaches capacity.
type onCapacityReachedHandler func()
// ErrRouteCapacityReached indicates that route's capacity has reached
ErrRouteCapacityReached = protocolerrors.New(false, "route capacity has reached")
)
// Route represents an incoming or outgoing Router route
type Route struct {
@@ -34,8 +33,7 @@ type Route struct {
// reads use the channel's built-in mechanism to check if the channel is closed
closed bool
closeLock sync.Mutex
onCapacityReachedHandler onCapacityReachedHandler
capacity int
}
// NewRoute create a new Route
@@ -45,8 +43,9 @@ func NewRoute() *Route {
func newRouteWithCapacity(capacity int) *Route {
return &Route{
channel: make(chan appmessage.Message, capacity),
closed: false,
channel: make(chan appmessage.Message, capacity),
closed: false,
capacity: capacity,
}
}
@@ -58,8 +57,8 @@ func (r *Route) Enqueue(message appmessage.Message) error {
if r.closed {
return errors.WithStack(ErrRouteClosed)
}
if len(r.channel) == DefaultMaxMessages {
r.onCapacityReachedHandler()
if len(r.channel) == r.capacity {
return errors.Wrapf(ErrRouteCapacityReached, "reached capacity of %d", r.capacity)
}
r.channel <- message
return nil
@@ -88,10 +87,6 @@ func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, e
}
}
func (r *Route) setOnCapacityReachedHandler(onCapacityReachedHandler onCapacityReachedHandler) {
r.onCapacityReachedHandler = onCapacityReachedHandler
}
// Close closes this route
func (r *Route) Close() {
r.closeLock.Lock()

View File

@@ -20,8 +20,6 @@ type Router struct {
incomingRoutesLock sync.RWMutex
outgoingRoute *Route
onRouteCapacityReachedHandler OnRouteCapacityReachedHandler
}
// NewRouter creates a new empty router
@@ -30,18 +28,9 @@ func NewRouter() *Router {
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages),
}
router.outgoingRoute.setOnCapacityReachedHandler(func() {
router.onRouteCapacityReachedHandler()
})
return &router
}
// SetOnRouteCapacityReachedHandler sets the onRouteCapacityReachedHandler
// function for this router
func (r *Router) SetOnRouteCapacityReachedHandler(onRouteCapacityReachedHandler OnRouteCapacityReachedHandler) {
r.onRouteCapacityReachedHandler = onRouteCapacityReachedHandler
}
// AddIncomingRoute registers the messages of types `messageTypes` to
// be routed to the given `route`
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
@@ -52,9 +41,6 @@ func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Ro
}
r.setIncomingRoute(messageType, route)
}
route.setOnCapacityReachedHandler(func() {
r.onRouteCapacityReachedHandler()
})
return route, nil
}

View File

@@ -13,11 +13,11 @@ import (
)
type gRPCConnection struct {
server *gRPCServer
address *net.TCPAddr
isOutbound bool
stream grpcStream
router *router.Router
server *gRPCServer
address *net.TCPAddr
stream grpcStream
router *router.Router
lowLevelClientConnection *grpc.ClientConn
// streamLock protects concurrent access to stream.
// Note that it's an RWMutex. Despite what the name
@@ -34,14 +34,16 @@ type gRPCConnection struct {
isConnected uint32
}
func newConnection(server *gRPCServer, address *net.TCPAddr, isOutbound bool, stream grpcStream) *gRPCConnection {
func newConnection(server *gRPCServer, address *net.TCPAddr, stream grpcStream,
lowLevelClientConnection *grpc.ClientConn) *gRPCConnection {
connection := &gRPCConnection{
server: server,
address: address,
isOutbound: isOutbound,
stream: stream,
stopChan: make(chan struct{}),
isConnected: 1,
server: server,
address: address,
stream: stream,
stopChan: make(chan struct{}),
isConnected: 1,
lowLevelClientConnection: lowLevelClientConnection,
}
return connection
@@ -83,7 +85,7 @@ func (c *gRPCConnection) SetOnInvalidMessageHandler(onInvalidMessageHandler serv
}
func (c *gRPCConnection) IsOutbound() bool {
return c.isOutbound
return c.lowLevelClientConnection != nil
}
// Disconnect disconnects the connection
@@ -98,7 +100,7 @@ func (c *gRPCConnection) Disconnect() {
close(c.stopChan)
if c.isOutbound {
if c.IsOutbound() {
c.closeSend()
log.Debugf("Disconnected from %s", c)
}
@@ -138,5 +140,8 @@ func (c *gRPCConnection) closeSend() {
defer c.streamLock.Unlock()
clientStream := c.stream.(protowire.P2P_MessageStreamClient)
_ = clientStream.CloseSend() // ignore error because we don't really know what's the status of the connection
// ignore error because we don't really know what's the status of the connection
_ = clientStream.CloseSend()
_ = c.lowLevelClientConnection.Close()
}

View File

@@ -90,12 +90,12 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
gRPCConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
gRPCClientConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, errors.Wrapf(err, "error connecting to %s", address)
}
client := protowire.NewP2PClient(gRPCConnection)
client := protowire.NewP2PClient(gRPCClientConnection)
stream, err := client.MessageStream(context.Background(), grpc.UseCompressor(gzip.Name))
if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address)
@@ -110,7 +110,7 @@ func (s *gRPCServer) Connect(address string) (server.Connection, error) {
return nil, errors.Errorf("non-tcp addresses are not supported")
}
connection := newConnection(s, tcpAddress, true, stream)
connection := newConnection(s, tcpAddress, stream, gRPCClientConnection)
err = s.onConnectedHandler(connection)
if err != nil {

View File

@@ -29,7 +29,7 @@ func (p *p2pServer) MessageStream(stream protowire.P2P_MessageStreamServer) erro
return errors.Errorf("non-tcp connections are not supported")
}
connection := newConnection(p.server, tcpAddress, false, stream)
connection := newConnection(p.server, tcpAddress, stream, nil)
err := p.server.onConnectedHandler(connection)
if err != nil {

View File

@@ -68,7 +68,7 @@ func (s *kaspadService) Execute(args []string, r <-chan svc.ChangeRequest, chang
// be properly logged
doneChan := make(chan error)
startedChan := make(chan struct{})
spawn(func() {
spawn("kaspadMain-windows", func() {
err := kaspadMain(startedChan)
doneChan <- err
})

View File

@@ -11,7 +11,7 @@ const validCharacters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrs
const (
appMajor uint = 0
appMinor uint = 6
appPatch uint = 6
appPatch uint = 8
)
// appBuild is defined as a variable so it can be overridden during the build