kaspad/netadapter/server/grpcserver/connection_loops.go
stasatdaglabs 05db135d23
[NOD-1124] Implement the Flow thread model and architecture (#791)
* [NOD-1124] Move Router to the router package.

* [NOD-1124] Implement SetOnRouteCapacityReachedHandler.

* [NOD-1124] Use Routes instead of bare channels.

* [NOD-1124] Fix merge errors.

* [NOD-1124] Connect the Router to the Connection.

* [NOD-1124] Fix merge errors.

* [NOD-1124] Move some variables around.

* [NOD-1124] Fix unreachable code.

* [NOD-1124] Fix a variable name.

* [NOD-1124] Rename AddRoute to AddIncomingRoute.

* [NOD-1124] Rename SetRouter to Start.

* [NOD-1124] Make AddIncomingRoute create a Route by itself.

* [NOD-1124] Replace IncomingRoute with EnqueueIncomingMessage.

* [NOD-1124] Make Enqueue and Dequeue return isOpen instead of err.

* [NOD-1124] Remove writeDuringDisconnectLock.

* [NOD-1124] In sendLoop, move outgoingRoute to outside the loop.

* [NOD-1124] Start the connection loops only when Start is called.

* [NOD-1124] Replace OnIDReceivedHandler with AssociateRouterID.

* [NOD-1124] Add isOpen to Enqueue and Dequeue.

* [NOD-1124] Protect errChan from writing during disconnect.
2020-07-13 16:51:13 +03:00

83 lines
1.6 KiB
Go

package grpcserver
import (
"io"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
)
// grpcStream is the minimal bidirectional stream surface used for exchanging
// protowire.KaspadMessage values: one method to send and one to receive.
// NOTE(review): presumably this is the type of gRPCConnection.stream, on which
// sendLoop and receiveLoop call Send and Recv — confirm against the struct
// definition, which is not in this file.
type grpcStream interface {
// Send writes a single KaspadMessage to the stream and reports any error.
Send(*protowire.KaspadMessage) error
// Recv returns the next KaspadMessage from the stream, or an error.
Recv() (*protowire.KaspadMessage, error)
}
// connectionLoops starts receiveLoop and sendLoop via spawn, waits for the
// first of them to return, disconnects the connection, and returns that first
// result (which may be nil).
//
// An error returned by Disconnect is only logged; it never replaces the
// loop's own result.
func (c *gRPCConnection) connectionLoops() error {
	// Capacity of one: only the first result is consumed below, so the loop
	// that finishes second (possibly after the disconnect) needs a free slot
	// to deposit its result without blocking forever.
	loopResults := make(chan error, 1)

	spawn(func() {
		loopResults <- c.receiveLoop()
	})
	spawn(func() {
		loopResults <- c.sendLoop()
	})

	firstResult := <-loopResults

	if disconnectErr := c.Disconnect(); disconnectErr != nil {
		log.Errorf("Error from disconnect: %s", disconnectErr)
	}
	return firstResult
}
// sendLoop drains the router's outgoing route and writes each message to the
// stream for as long as the connection reports itself connected.
//
// It returns nil when the connection is no longer connected or the outgoing
// route has been closed. It returns a non-nil error when a message cannot be
// converted to its protowire form, or when Send fails while the connection is
// still connected.
func (c *gRPCConnection) sendLoop() error {
	route := c.router.OutgoingRoute()
	for {
		if !c.IsConnected() {
			break
		}

		wireMessage, routeOpen := route.Dequeue()
		if !routeOpen {
			return nil
		}

		protoMessage, conversionErr := protowire.FromWireMessage(wireMessage)
		if conversionErr != nil {
			return conversionErr
		}

		// The send and the report to c.errChan happen under one lock.
		// NOTE(review): presumably Disconnect takes the same lock so that
		// errChan is never written to mid-disconnect — confirm there.
		sendErr := func() error {
			c.writeToErrChanDuringDisconnectLock.Lock()
			defer c.writeToErrChanDuringDisconnectLock.Unlock()

			result := c.stream.Send(protoMessage)
			if !c.IsConnected() {
				// Already disconnected: the result is neither reported
				// on errChan nor returned to the loop.
				return nil
			}
			// Every result is reported while connected, including nil.
			c.errChan <- result
			return result
		}()
		if sendErr != nil {
			return sendErr
		}
	}
	return nil
}
// receiveLoop reads messages from the stream and hands them to the router for
// as long as the connection reports itself connected.
//
// It returns nil when the connection is no longer connected, when Recv
// reports io.EOF, or when the router reports that the incoming route is no
// longer open. Any other error from Recv, from the protowire conversion, or
// from the router is returned as is.
func (c *gRPCConnection) receiveLoop() error {
	for {
		if !c.IsConnected() {
			return nil
		}

		incoming, recvErr := c.stream.Recv()
		switch {
		case recvErr == io.EOF:
			// End of stream is a normal termination, not a failure.
			return nil
		case recvErr != nil:
			return recvErr
		}

		wireMessage, conversionErr := incoming.ToWireMessage()
		if conversionErr != nil {
			return conversionErr
		}

		routeOpen, enqueueErr := c.router.EnqueueIncomingMessage(wireMessage)
		switch {
		case enqueueErr != nil:
			return enqueueErr
		case !routeOpen:
			return nil
		}
	}
}