[NOD-1547] Make the SendAddresses flow not one-time for the sake of DNSSeeder (#1104)

* [NOD-1547] Make the SendAddresses flow not one-time for the sake of DNSSeeder.

* [NOD-1547] Add all special commands to chooseRouteForCommand.
This commit is contained in:
stasatdaglabs 2020-11-17 18:05:14 +02:00 committed by GitHub
parent 7479f5f5e8
commit 184911f76e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 14 deletions

View File

@ -16,20 +16,25 @@ type SendAddressesContext interface {
// SendAddresses sends addresses to a peer that requests it.
func SendAddresses(context SendAddressesContext, incomingRoute *router.Route, outgoingRoute *router.Route) error {
message, err := incomingRoute.Dequeue()
if err != nil {
return err
}
for {
message, err := incomingRoute.Dequeue()
if err != nil {
return err
}
_, ok := message.(*appmessage.MsgRequestAddresses)
if !ok {
return protocolerrors.Errorf(true, "unexpected message. "+
"Expected: %s, got: %s", appmessage.CmdRequestAddresses, message.Command())
}
addresses := context.AddressManager().Addresses()
msgAddresses := appmessage.NewMsgAddresses(shuffleAddresses(addresses))
_, ok := message.(*appmessage.MsgRequestAddresses)
if !ok {
return protocolerrors.Errorf(true, "unexpected message. "+
"Expected: %s, got: %s", appmessage.CmdRequestAddresses, message.Command())
}
addresses := context.AddressManager().Addresses()
msgAddresses := appmessage.NewMsgAddresses(shuffleAddresses(addresses))
return outgoingRoute.Enqueue(msgAddresses)
err = outgoingRoute.Enqueue(msgAddresses)
if err != nil {
return err
}
}
}
// shuffleAddresses randomizes the given addresses sent if there are more than the maximum allowed in one message.

View File

@ -118,7 +118,7 @@ func (m *Manager) registerAddressFlows(router *routerpkg.Router, isStopping *uin
outgoingRoute := router.OutgoingRoute()
return []*flow{
m.registerOneTimeFlow("SendAddresses", router, []appmessage.MessageCommand{appmessage.CmdRequestAddresses}, isStopping, errChan,
m.registerFlow("SendAddresses", router, []appmessage.MessageCommand{appmessage.CmdRequestAddresses}, isStopping, errChan,
func(incomingRoute *routerpkg.Route, peer *peerpkg.Peer) error {
return addressexchange.SendAddresses(m.context, incomingRoute, outgoingRoute)
},

View File

@ -25,7 +25,8 @@ type Routes struct {
func (r *Routes) WaitForMessageOfType(command appmessage.MessageCommand, timeout time.Duration) (appmessage.Message, error) {
timeoutTime := time.Now().Add(timeout)
for {
message, err := r.IncomingRoute.DequeueWithTimeout(timeoutTime.Sub(time.Now()))
route := r.chooseRouteForCommand(command)
message, err := route.DequeueWithTimeout(timeoutTime.Sub(time.Now()))
if err != nil {
return nil, errors.Wrapf(err, "error waiting for message of type %s", command)
}
@ -35,6 +36,19 @@ func (r *Routes) WaitForMessageOfType(command appmessage.MessageCommand, timeout
}
}
func (r *Routes) chooseRouteForCommand(command appmessage.MessageCommand) *router.Route {
switch command {
case appmessage.CmdVersion, appmessage.CmdVerAck:
return r.handshakeRoute
case appmessage.CmdRequestAddresses, appmessage.CmdAddresses:
return r.addressesRoute
case appmessage.CmdPing:
return r.pingRoute
default:
return r.IncomingRoute
}
}
// WaitForDisconnect waits for a disconnect up to `timeout`, skipping all messages received while waiting
func (r *Routes) WaitForDisconnect(timeout time.Duration) error {
timeoutTime := time.Now().Add(timeout)