[NOD-1439] Added Stop command (#940)

* [NOD-1439] Added Stop command

* [NOD-1439] Added comment explaining why we wait before closing the StopChan

* [NOD-1439] Warnf -> Warn

* [NOD-1439] Rename Stop command to Shut Down

* [NOD-1439] Clean up pauseBeforeShutDown

* [NOD-1439] Add ShutDownRequestMessage case for toRPCPayload

* [NOD-1439] Minor stylistic changes
This commit is contained in:
Svarog 2020-09-29 15:59:47 +03:00 committed by GitHub
parent 6ab8ada9ff
commit 22237a4a8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1063 additions and 783 deletions

View File

@ -83,13 +83,13 @@ func (a *App) Stop() {
// New returns a new App instance configured to listen on addr for the
// kaspa network type specified by dagParams. Use start to begin accepting
// connections from peers.
func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{}) (*App, error) {
func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt chan<- struct{}) (*App, error) {
indexManager, acceptanceIndex := setupIndexes(cfg)
sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize)
// Create a new block DAG instance with the appropriate configuration.
dag, err := setupDAG(cfg, databaseContext, interrupt, sigCache, indexManager)
dag, err := setupDAG(cfg, databaseContext, sigCache, indexManager)
if err != nil {
return nil, err
}
@ -112,7 +112,7 @@ func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrup
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex)
rpcManager := setupRPC(cfg, txMempool, dag, sigCache, netAdapter, protocolManager, connectionManager, addressManager, acceptanceIndex, interrupt)
return &App{
cfg: cfg,
@ -122,6 +122,7 @@ func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrup
netAdapter: netAdapter,
addressManager: addressManager,
}, nil
}
func setupRPC(
@ -133,10 +134,12 @@ func setupRPC(
protocolManager *protocol.Manager,
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex) *rpc.Manager {
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{},
) *rpc.Manager {
blockTemplateGenerator := mining.NewBlkTmplGenerator(&mining.Policy{BlockMaxMass: cfg.BlockMaxMass}, txMempool, dag, sigCache)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex)
rpcManager := rpc.NewManager(cfg, netAdapter, dag, protocolManager, connectionManager, blockTemplateGenerator, txMempool, addressManager, acceptanceIndex, shutDownChan)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnTransactionAddedToMempoolHandler(rpcManager.NotifyTransactionAddedToMempool)
dag.Subscribe(func(notification *blockdag.Notification) {
@ -196,11 +199,10 @@ func (a *App) maybeSeedFromDNS() {
})
}
}
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{},
func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext,
sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) {
dag, err := blockdag.New(&blockdag.Config{
Interrupt: interrupt,
DatabaseContext: databaseContext,
DAGParams: cfg.NetParams(),
TimeSource: blockdag.NewTimeSource(),

View File

@ -99,6 +99,8 @@ const (
CmdFinalityConflictResolvedNotificationMessage
CmdGetMempoolEntriesRequestMessage
CmdGetMempoolEntriesResponseMessage
CmdShutDownRequestMessage
CmdShutDownResponseMessage
)
// ProtocolMessageCommandToString maps all MessageCommands to their string representation

View File

@ -0,0 +1,34 @@
package appmessage
// ShutDownRequestMessage is an appmessage corresponding to
// its respective RPC message
type ShutDownRequestMessage struct {
baseMessage
}
// Command returns the protocol command string for the message
func (msg *ShutDownRequestMessage) Command() MessageCommand {
return CmdShutDownRequestMessage
}
// NewShutDownRequestMessage returns a instance of the message
func NewShutDownRequestMessage() *ShutDownRequestMessage {
return &ShutDownRequestMessage{}
}
// ShutDownResponseMessage is an appmessage corresponding to
// its respective RPC message
type ShutDownResponseMessage struct {
baseMessage
Error *RPCError
}
// Command returns the protocol command string for the message
func (msg *ShutDownResponseMessage) Command() MessageCommand {
return CmdShutDownResponseMessage
}
// NewShutDownResponseMessage returns a instance of the message
func NewShutDownResponseMessage() *ShutDownResponseMessage {
return &ShutDownResponseMessage{}
}

View File

@ -31,7 +31,8 @@ func NewManager(
blockTemplateGenerator *mining.BlkTmplGenerator,
mempool *mempool.TxPool,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex) *Manager {
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{}) *Manager {
manager := Manager{
context: rpccontext.NewContext(
@ -44,6 +45,7 @@ func NewManager(
mempool,
addressManager,
acceptanceIndex,
shutDownChan,
),
}
netAdapter.SetRPCRouterInitializer(manager.routerInitializer)

View File

@ -32,6 +32,7 @@ var handlers = map[appmessage.MessageCommand]handler{
appmessage.CmdResolveFinalityConflictRequestMessage: rpchandlers.HandleResolveFinalityConflict,
appmessage.CmdNotifyFinalityConflictsRequestMessage: rpchandlers.HandleNotifyFinalityConflicts,
appmessage.CmdGetMempoolEntriesRequestMessage: rpchandlers.HandleGetMempoolEntries,
appmessage.CmdShutDownRequestMessage: rpchandlers.HandleShutDown,
}
func (m *Manager) routerInitializer(router *router.Router, netConnection *netadapter.NetConnection) {

View File

@ -23,14 +23,14 @@ type Context struct {
Mempool *mempool.TxPool
AddressManager *addressmanager.AddressManager
AcceptanceIndex *indexers.AcceptanceIndex
ShutDownChan chan<- struct{}
BlockTemplateState *BlockTemplateState
NotificationManager *NotificationManager
}
// NewContext creates a new RPC context
func NewContext(
cfg *config.Config,
func NewContext(cfg *config.Config,
netAdapter *netadapter.NetAdapter,
dag *blockdag.BlockDAG,
protocolManager *protocol.Manager,
@ -38,7 +38,9 @@ func NewContext(
blockTemplateGenerator *mining.BlkTmplGenerator,
mempool *mempool.TxPool,
addressManager *addressmanager.AddressManager,
acceptanceIndex *indexers.AcceptanceIndex) *Context {
acceptanceIndex *indexers.AcceptanceIndex,
shutDownChan chan<- struct{}) *Context {
context := &Context{
Config: cfg,
NetAdapter: netAdapter,
@ -49,6 +51,7 @@ func NewContext(
Mempool: mempool,
AddressManager: addressManager,
AcceptanceIndex: acceptanceIndex,
ShutDownChan: shutDownChan,
}
context.BlockTemplateState = NewBlockTemplateState(context)
context.NotificationManager = NewNotificationManager()

View File

@ -2,6 +2,8 @@ package rpchandlers
import (
"github.com/kaspanet/kaspad/infrastructure/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.RPCS)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,25 @@
package rpchandlers
import (
"time"
"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/app/rpc/rpccontext"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
)
const pauseBeforeShutDown = time.Second
// HandleShutDown handles the respectively named RPC command
func HandleShutDown(context *rpccontext.Context, _ *router.Router, _ appmessage.Message) (appmessage.Message, error) {
log.Warn("ShutDown RPC called.")
// Wait a second before shutting down, to allow time to return the response to the caller
spawn("HandleShutDown-pauseAndShutDown", func() {
<-time.After(pauseBeforeShutDown)
close(context.ShutDownChan)
})
response := appmessage.NewShutDownResponseMessage()
return response, nil
}

View File

@ -9,13 +9,6 @@ import (
// Config is a descriptor which specifies the blockDAG instance configuration.
type Config struct {
// Interrupt specifies a channel the caller can close to signal that
// long running operations, such as catching up indexes or performing
// database migrations, should be interrupted.
//
// This field can be nil if the caller does not desire the behavior.
Interrupt <-chan struct{}
// DAGParams identifies which DAG parameters the DAG is associated
// with.
//

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0-devel
// protoc v3.6.1
// protoc-gen-go v1.25.0
// protoc v3.12.3
// source: peer_service.proto
package pb

View File

@ -72,6 +72,8 @@ message KaspadMessage {
FinalityConflictResolvedNotificationMessage finalityConflictResolvedNotification = 1042;
GetMempoolEntriesRequestMessage getMempoolEntriesRequest = 1043;
GetMempoolEntriesResponseMessage getMempoolEntriesResponse = 1044;
ShutDownRequestMessage shutDownRequest = 1045;
ShutDownResponseMessage shutDownResponse = 1046;
}
}
@ -612,6 +614,13 @@ message FinalityConflictResolvedNotificationMessage{
string finalityBlockHash = 1;
}
message ShutDownRequestMessage{
}
message ShutDownResponseMessage{
RPCError error = 1000;
}
service RPC {
rpc MessageStream (stream KaspadMessage) returns (stream KaspadMessage) {}
}

View File

@ -0,0 +1,33 @@
package protowire
import "github.com/kaspanet/kaspad/app/appmessage"
func (x *KaspadMessage_ShutDownRequest) toAppMessage() (appmessage.Message, error) {
return &appmessage.ShutDownRequestMessage{}, nil
}
func (x *KaspadMessage_ShutDownRequest) fromAppMessage(_ *appmessage.ShutDownRequestMessage) error {
x.ShutDownRequest = &ShutDownRequestMessage{}
return nil
}
func (x *KaspadMessage_ShutDownResponse) toAppMessage() (appmessage.Message, error) {
var err *appmessage.RPCError
if x.ShutDownResponse.Error != nil {
err = &appmessage.RPCError{Message: x.ShutDownResponse.Error.Message}
}
return &appmessage.ShutDownResponseMessage{
Error: err,
}, nil
}
func (x *KaspadMessage_ShutDownResponse) fromAppMessage(message *appmessage.ShutDownResponseMessage) error {
var err *RPCError
if message.Error != nil {
err = &RPCError{Message: message.Error.Message}
}
x.ShutDownResponse = &ShutDownResponseMessage{
Error: err,
}
return nil
}

View File

@ -520,6 +520,20 @@ func toRPCPayload(message appmessage.Message) (isKaspadMessage_Payload, error) {
return nil, err
}
return payload, nil
case *appmessage.ShutDownRequestMessage:
payload := new(KaspadMessage_ShutDownRequest)
err := payload.fromAppMessage(message)
if err != nil {
return nil, err
}
return payload, nil
case *appmessage.ShutDownResponseMessage:
payload := new(KaspadMessage_ShutDownResponse)
err := payload.fromAppMessage(message)
if err != nil {
return nil, err
}
return payload, nil
default:
return nil, nil
}

View File

@ -20,7 +20,7 @@ var interruptSignals = []os.Signal{os.Interrupt}
// InterruptListener listens for OS Signals such as SIGINT (Ctrl+C) and shutdown
// requests from shutdownRequestChannel. It returns a channel that is closed
// when either signal is received.
func InterruptListener() <-chan struct{} {
func InterruptListener() chan struct{} {
c := make(chan struct{})
go func() {
interruptChannel := make(chan os.Signal, 1)