Compare commits

...

2 Commits

Author SHA1 Message Date
stasatdaglabs
69d6148515 [NOD-1367] Fix race condition with notification listeners. 2020-09-10 15:19:22 +03:00
stasatdaglabs
b73d9f4045 [NOD-1367] Add an error handler to GRPCClient. 2020-09-10 14:08:19 +03:00
2 changed files with 18 additions and 5 deletions

View File

@@ -44,8 +44,12 @@ func (m *Manager) routerInitializer(router *router.Router, netConnection *netada
err := m.handleIncomingMessages(router, incomingRoute)
m.handleError(err, netConnection)
})
notificationListener := m.context.NotificationManager.AddListener(router)
spawn("routerInitializer-handleOutgoingNotifications", func() {
err := m.handleOutgoingNotifications(router)
defer m.context.NotificationManager.RemoveListener(router)
err := m.handleOutgoingNotifications(notificationListener)
m.handleError(err, netConnection)
})
}
@@ -72,9 +76,7 @@ func (m *Manager) handleIncomingMessages(router *router.Router, incomingRoute *r
}
}
func (m *Manager) handleOutgoingNotifications(router *router.Router) error {
notificationListener := m.context.NotificationManager.AddListener(router)
defer m.context.NotificationManager.RemoveListener(router)
func (m *Manager) handleOutgoingNotifications(notificationListener *rpccontext.NotificationListener) error {
for {
err := notificationListener.ProcessNextNotification()
if err != nil {

View File

@@ -12,9 +12,12 @@ import (
"time"
)
type OnErrorHandler func(err error)
// GRPCClient is a gRPC-based RPC client
type GRPCClient struct {
stream protowire.RPC_MessageStreamClient
stream protowire.RPC_MessageStreamClient
onErrorHandler OnErrorHandler
}
// Connect connects to the RPC server with the given address
@@ -41,6 +44,10 @@ func (c *GRPCClient) Disconnect() error {
return c.stream.CloseSend()
}
func (c *GRPCClient) SetOnErrorHandler(onErrorHandler OnErrorHandler) {
c.onErrorHandler = onErrorHandler
}
// AttachRouter attaches the given router to the client and starts
// sending/receiving messages via it
func (c *GRPCClient) AttachRouter(router *router.Router) {
@@ -101,5 +108,9 @@ func (c *GRPCClient) handleError(err error) {
}
return
}
if c.onErrorHandler != nil {
c.onErrorHandler(err)
return
}
panic(err)
}