From 3d942ce355ddaa768ac3621c459df48faf831579 Mon Sep 17 00:00:00 2001 From: Svarog Date: Thu, 30 Jul 2020 10:47:56 +0300 Subject: [PATCH] [NOD-1162] Integration test (#822) * [NOD-1162] Separate kaspad to it's own package, so that I can use it out of integration test * [NOD-1162] Begin integration tests * [NOD-1162] [FIX] Assign cfg to RPCServer * [NOD-1162] Basic integration test ready * [NOD-1162] Wait for connection for real * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] Make connect something that can be invoked in middle of test * [NOD-1162] Complete first integration test * [NOD-1162] Undo refactor error * [NOD-1162] Rename Kaspad to App * [NOD-1162] Convert checking connection to polling * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] [FIX] Connection manager should run the moment it adds a request * [NOD-1162] [FIX] Set peerID on handshake * [NOD-1162] [FIX] Broadcast should send to outgoing route, not incoming * [NOD-1162] [FIX] Add CmdInvRelayBlock to MakeEmptyMessage * [NOD-1162] [FIX] Initialize Hash before decoding MsgInvRelayBlock * [NOD-1162] [FIX] Invert condition * [NOD-1162] [FIX] Fixes to encoding of MsgGetRelayBlocks * [NOD-1162] [FIX] Add MsgGetRelayBlocks to MakeEmptyMessage * [NOD-1162] Add comment * [NOD-1162] Added support for 3 nodes and clients in integration tests * [NOD-1162] Add third node to integration test * [NOD-1192] Use lock-less functions in TxPool.HandleNewBlock * [NOD-1192] Broadcast transactions only if there's more then 0 * [NOD-1162] Removed double waitTillNextIteration * [NOD-1192] Rename: broadcastTransactions -> broadcastTransactionsAfterBlockAdded * [NOD-1162] Call NotifyBlocks on client3 as well * [NOD-1162] ErrTimeout and ErrRouteClosed should be ProtocolErrors * [NOD-1162] Added comment and removed redundant type PeerAddedCallback * [NOD-1162] Revert overly eager rename * [NOD-1162] Move DisalbeTLS to common config + minimize call for ioutil.TempDir() * [NOD-1162] Add some clarifications in code * [NOD-1193] Skip closed connections in NetAdapter.Broadcast * [NOD-1193] Make sure to protect connectionsToRouters from concurrent access * [NOD-1162] Add _test to all files in integration package * [NOD-1162] Introduced appHarness to better encapsulate a single node * [NOD-1162] Removed onChainChanged handler * [NOD-1162] Remove redundant closure * [NOD-1162] Correctly mark integration_test config as Simnet * [NOD-1162] Rename app.ID -> app.P2PNodeID --- kaspad.go => app/app.go | 114 ++++++++++++++------------- app/log.go | 14 ++++ doc.go | 2 +- integration/config_test.go | 64 +++++++++++++++ integration/connect_test.go | 71 +++++++++++++++++ integration/integration_test.go | 76 ++++++++++++++++++ integration/log_test.go | 14 ++++ integration/mining_test.go | 25 ++++++ integration/rpc_test.go | 37 +++++++++ integration/setup_test.go | 94 ++++++++++++++++++++++ main.go | 15 ++-- netadapter/router/route.go | 7 +- protocol/flowcontext/flow_context.go | 5 +- protocol/flowcontext/network.go | 1 + rpc/client/net.go | 13 +-- rpc/rpcserver.go | 12 +-- service_windows.go | 2 +- 17 files changed, 491 insertions(+), 75 deletions(-) rename kaspad.go => app/app.go (74%) create mode 100644 app/log.go create mode 100644 integration/config_test.go create mode 100644 integration/connect_test.go create mode 100644 integration/integration_test.go create mode 100644 integration/log_test.go create mode 100644 integration/mining_test.go create mode 100644 integration/rpc_test.go create mode 100644 integration/setup_test.go diff --git a/kaspad.go b/app/app.go similarity index 74% rename from kaspad.go rename to app/app.go index 821701d60..f1ecba746 100644 --- a/kaspad.go +++ b/app/app.go @@ -1,99 +1,86 @@ -package main +package app import ( "fmt" - "github.com/kaspanet/kaspad/addressmanager" "sync/atomic" - "github.com/kaspanet/kaspad/dbaccess" + "github.com/kaspanet/kaspad/addressmanager" - "github.com/kaspanet/kaspad/dnsseed" - "github.com/kaspanet/kaspad/wire" - - "github.com/kaspanet/kaspad/connmanager" - - "github.com/kaspanet/kaspad/netadapter" - - "github.com/kaspanet/kaspad/util/panics" + "github.com/kaspanet/kaspad/netadapter/id" "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/blockdag/indexers" "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/connmanager" + "github.com/kaspanet/kaspad/dbaccess" + "github.com/kaspanet/kaspad/dnsseed" "github.com/kaspanet/kaspad/mempool" "github.com/kaspanet/kaspad/mining" + "github.com/kaspanet/kaspad/netadapter" "github.com/kaspanet/kaspad/protocol" "github.com/kaspanet/kaspad/rpc" "github.com/kaspanet/kaspad/signal" "github.com/kaspanet/kaspad/txscript" "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/panics" + "github.com/kaspanet/kaspad/wire" ) -// kaspad is a wrapper for all the kaspad services -type kaspad struct { +// App is a wrapper for all the kaspad services +type App struct { cfg *config.Config rpcServer *rpc.Server addressManager *addressmanager.AddressManager protocolManager *protocol.Manager connectionManager *connmanager.ConnectionManager + netAdapter *netadapter.NetAdapter started, shutdown int32 } -// start launches all the kaspad services. -func (k *kaspad) start() { +// Start launches all the kaspad services. +func (a *App) Start() { // Already started? - if atomic.AddInt32(&k.started, 1) != 1 { + if atomic.AddInt32(&a.started, 1) != 1 { return } log.Trace("Starting kaspad") - err := k.protocolManager.Start() + err := a.protocolManager.Start() if err != nil { panics.Exit(log, fmt.Sprintf("Error starting the p2p protocol: %+v", err)) } - k.maybeSeedFromDNS() + a.maybeSeedFromDNS() - k.connectionManager.Start() + a.connectionManager.Start() - if !k.cfg.DisableRPC { - k.rpcServer.Start() + if !a.cfg.DisableRPC { + a.rpcServer.Start() } } -func (k *kaspad) maybeSeedFromDNS() { - if !k.cfg.DisableDNSSeed { - dnsseed.SeedFromDNS(k.cfg.NetParams(), k.cfg.DNSSeed, wire.SFNodeNetwork, false, nil, - k.cfg.Lookup, func(addresses []*wire.NetAddress) { - // Kaspad uses a lookup of the dns seeder here. Since seeder returns - // IPs of nodes and not its own IP, we can not know real IP of - // source. So we'll take first returned address as source. - k.addressManager.AddAddresses(addresses, addresses[0], nil) - }) - } -} - -// stop gracefully shuts down all the kaspad services. -func (k *kaspad) stop() error { +// Stop gracefully shuts down all the kaspad services. +func (a *App) Stop() error { // Make sure this only happens once. - if atomic.AddInt32(&k.shutdown, 1) != 1 { + if atomic.AddInt32(&a.shutdown, 1) != 1 { log.Infof("Kaspad is already in the process of shutting down") return nil } log.Warnf("Kaspad shutting down") - k.connectionManager.Stop() + a.connectionManager.Stop() - err := k.protocolManager.Stop() + err := a.protocolManager.Stop() if err != nil { log.Errorf("Error stopping the p2p protocol: %+v", err) } // Shutdown the RPC server if it's not disabled. - if !k.cfg.DisableRPC { - err := k.rpcServer.Stop() + if !a.cfg.DisableRPC { + err := a.rpcServer.Stop() if err != nil { log.Errorf("Error stopping rpcServer: %+v", err) } @@ -102,10 +89,10 @@ func (k *kaspad) stop() error { return nil } -// newKaspad returns a new kaspad instance configured to listen on addr for the +// New returns a new App instance configured to listen on addr for the // kaspa network type specified by dagParams. Use start to begin accepting // connections from peers. -func newKaspad(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{}) (*kaspad, error) { +func New(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{}) (*App, error) { indexManager, acceptanceIndex := setupIndexes(cfg) sigCache := txscript.NewSigCache(cfg.SigCacheMaxSize) @@ -133,21 +120,32 @@ func newKaspad(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, in if err != nil { return nil, err } - - rpcServer, err := setupRPC(cfg, dag, txMempool, sigCache, acceptanceIndex, - connectionManager, addressManager, protocolManager) + rpcServer, err := setupRPC( + cfg, dag, txMempool, sigCache, acceptanceIndex, connectionManager, addressManager, protocolManager) if err != nil { return nil, err } - return &kaspad{ + return &App{ cfg: cfg, rpcServer: rpcServer, protocolManager: protocolManager, connectionManager: connectionManager, + netAdapter: netAdapter, }, nil } +func (a *App) maybeSeedFromDNS() { + if !a.cfg.DisableDNSSeed { + dnsseed.SeedFromDNS(a.cfg.NetParams(), a.cfg.DNSSeed, wire.SFNodeNetwork, false, nil, + a.cfg.Lookup, func(addresses []*wire.NetAddress) { + // Kaspad uses a lookup of the dns seeder here. Since seeder returns + // IPs of nodes and not its own IP, we can not know real IP of + // source. So we'll take first returned address as source. + a.addressManager.AddAddresses(addresses, addresses[0], nil) + }) + } +} func setupDAG(cfg *config.Config, databaseContext *dbaccess.DatabaseContext, interrupt <-chan struct{}, sigCache *txscript.SigCache, indexManager blockdag.IndexManager) (*blockdag.BlockDAG, error) { @@ -200,9 +198,14 @@ func setupMempool(cfg *config.Config, dag *blockdag.BlockDAG, sigCache *txscript return mempool.New(&mempoolConfig) } -func setupRPC(cfg *config.Config, dag *blockdag.BlockDAG, txMempool *mempool.TxPool, sigCache *txscript.SigCache, - acceptanceIndex *indexers.AcceptanceIndex, connectionManager *connmanager.ConnectionManager, - addressManager *addressmanager.AddressManager, protocolManager *protocol.Manager) (*rpc.Server, error) { +func setupRPC(cfg *config.Config, + dag *blockdag.BlockDAG, + txMempool *mempool.TxPool, + sigCache *txscript.SigCache, + acceptanceIndex *indexers.AcceptanceIndex, + connectionManager *connmanager.ConnectionManager, + addressManager *addressmanager.AddressManager, + protocolManager *protocol.Manager) (*rpc.Server, error) { if !cfg.DisableRPC { policy := mining.Policy{ @@ -227,8 +230,13 @@ func setupRPC(cfg *config.Config, dag *blockdag.BlockDAG, txMempool *mempool.TxP return nil, nil } -// WaitForShutdown blocks until the main listener and peer handlers are stopped. -func (k *kaspad) WaitForShutdown() { - // TODO(libp2p) - // k.p2pServer.WaitForShutdown() +// P2PNodeID returns the network ID associated with this App +func (a *App) P2PNodeID() *id.ID { + return a.netAdapter.ID() +} + +// WaitForShutdown blocks until the main listener and peer handlers are stopped. +func (a *App) WaitForShutdown() { + // TODO(libp2p) + // a.p2pServer.WaitForShutdown() } diff --git a/app/log.go b/app/log.go new file mode 100644 index 000000000..19f956012 --- /dev/null +++ b/app/log.go @@ -0,0 +1,14 @@ +// Copyright (c) 2013-2017 The btcsuite developers +// Copyright (c) 2017 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package app + +import ( + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.KASD) +var spawn = panics.GoroutineWrapperFunc(log) diff --git a/doc.go b/doc.go index 972db3949..09eb220a9 100644 --- a/doc.go +++ b/doc.go @@ -6,7 +6,7 @@ Copyright (c) 2013-2014 Conformal Systems LLC. Use of this source code is governed by an ISC license that can be found in the LICENSE file. -kaspad is a full-node kaspa implementation written in Go. +Kaspad is a full-node kaspa implementation written in Go. The default options are sane for most users. This means kaspad will work 'out of the box' for most users. However, there are also a wide variety of flags that diff --git a/integration/config_test.go b/integration/config_test.go new file mode 100644 index 000000000..3a3f651eb --- /dev/null +++ b/integration/config_test.go @@ -0,0 +1,64 @@ +package integration + +import ( + "io/ioutil" + "testing" + "time" + + "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/dagconfig" +) + +const ( + p2pAddress1 = "127.0.0.1:54321" + p2pAddress2 = "127.0.0.1:54322" + p2pAddress3 = "127.0.0.1:54323" + + rpcAddress1 = "127.0.0.1:12345" + rpcAddress2 = "127.0.0.1:12346" + rpcAddress3 = "127.0.0.1:12347" + + rpcUser = "user" + rpcPass = "pass" + + testAddress1 = "kaspasim:qz3tm5pew9lrdpnn8kytgtm6a0mx772j4uw02snetn" + testAddress1PK = "69f470ff9cd4010de7f4a95161867c49834435423526d9bab83781821cdf95bf" + + testAddress2 = "kaspasim:qqdf0vrh3u576eqzkp0s8qagc04tuj2xnu4sfskhx0" + testAddress2PK = "aed46ef760223032d2641e086dd48d0b0a4d581811e68ccf15bed2b8fe87348e" + + testAddress3 = "kaspasim:qq2wz0hl73a0qcl8872wr3djplwmyulurscsqxehu2" + testAddress3PK = "cc94a79bbccca30b0e3edff1895cbdf8d4ddcc119eacfd692970151dcc2881c2" + + defaultTimeout = 10 * time.Second +) + +func setConfig(t *testing.T, harness *appHarness) { + harness.config = commonConfig() + harness.config.DataDir = randomDirectory(t) + harness.config.Listeners = []string{harness.p2pAddress} + harness.config.RPCListeners = []string{harness.rpcAddress} +} + +func commonConfig() *config.Config { + commonConfig := config.DefaultConfig() + + commonConfig.ActiveNetParams = &dagconfig.SimnetParams + commonConfig.TargetOutboundPeers = 0 + commonConfig.DisableDNSSeed = true + commonConfig.RPCUser = rpcUser + commonConfig.RPCPass = rpcPass + commonConfig.DisableTLS = true + commonConfig.Simnet = true + + return commonConfig +} + +func randomDirectory(t *testing.T) string { + dir, err := ioutil.TempDir("", "integration-test") + if err != nil { + t.Fatalf("Error creating temporary directory for test: %+v", err) + } + + return dir +} diff --git a/integration/connect_test.go b/integration/connect_test.go new file mode 100644 index 000000000..baba823f0 --- /dev/null +++ b/integration/connect_test.go @@ -0,0 +1,71 @@ +package integration + +import ( + "testing" + "time" +) + +func connect(t *testing.T, appHarness1, appHarness2 *appHarness) { + err := appHarness2.rpcClient.ConnectNode(appHarness1.p2pAddress) + if err != nil { + t.Fatalf("Error connecting the nodes") + } + + onConnectedChan := make(chan struct{}) + abortConnectionChan := make(chan struct{}) + defer close(abortConnectionChan) + + spawn("integration.connect-Wait for connection", func() { + for range time.Tick(10 * time.Millisecond) { + if isConnected(t, appHarness1, appHarness2) { + close(onConnectedChan) + return + } + + select { + case <-abortConnectionChan: + return + default: + } + } + }) + + select { + case <-onConnectedChan: + case <-time.After(defaultTimeout): + t.Fatalf("Timed out waiting for the apps to connect") + } +} +func isConnected(t *testing.T, appHarness1, appHarness2 *appHarness) bool { + connectedPeerInfo1, err := appHarness1.rpcClient.GetConnectedPeerInfo() + if err != nil { + t.Fatalf("Error getting connected peer info for app1: %+v", err) + } + connectedPeerInfo2, err := appHarness2.rpcClient.GetConnectedPeerInfo() + if err != nil { + t.Fatalf("Error getting connected peer info for app2: %+v", err) + } + + var app1Connected, app2Connected bool + app1ID, app2ID := appHarness1.app.P2PNodeID().String(), appHarness2.app.P2PNodeID().String() + + for _, connectedPeer := range connectedPeerInfo1 { + if connectedPeer.ID == app2ID { + app1Connected = true + break + } + } + + for _, connectedPeer := range connectedPeerInfo2 { + if connectedPeer.ID == app1ID { + app2Connected = true + break + } + } + + if (app1Connected && !app2Connected) || (!app1Connected && app2Connected) { + t.Fatalf("app1Connected is %t while app2Connected is %t", app1Connected, app2Connected) + } + + return app1Connected && app2Connected +} diff --git a/integration/integration_test.go b/integration/integration_test.go new file mode 100644 index 000000000..e6d737714 --- /dev/null +++ b/integration/integration_test.go @@ -0,0 +1,76 @@ +package integration + +import ( + "testing" + "time" + + "github.com/kaspanet/kaspad/wire" + + clientpkg "github.com/kaspanet/kaspad/rpc/client" +) + +func TestIntegrationBasicSync(t *testing.T) { + appHarness1, appHarness2, appHarness3, teardown := setup(t) + defer teardown() + + // Connect nodes in chain: 1 <--> 2 <--> 3 + // So that node 3 doesn't directly get blocks from node 1 + connect(t, appHarness1, appHarness2) + connect(t, appHarness2, appHarness3) + + blockTemplate, err := appHarness1.rpcClient.GetBlockTemplate(testAddress1, "") + if err != nil { + t.Fatalf("Error getting block template: %+v", err) + } + + block, err := clientpkg.ConvertGetBlockTemplateResultToBlock(blockTemplate) + if err != nil { + t.Fatalf("Error parsing blockTemplate: %s", err) + } + + solveBlock(t, block) + + err = appHarness2.rpcClient.NotifyBlocks() + if err != nil { + t.Fatalf("Error from NotifyBlocks: %+v", err) + } + app2OnBlockAddedChan := make(chan *wire.BlockHeader) + appHarness2.rpcClient.onBlockAdded = func(header *wire.BlockHeader) { + app2OnBlockAddedChan <- header + } + + err = appHarness3.rpcClient.NotifyBlocks() + if err != nil { + t.Fatalf("Error from NotifyBlocks: %+v", err) + } + app3OnBlockAddedChan := make(chan *wire.BlockHeader) + appHarness3.rpcClient.onBlockAdded = func(header *wire.BlockHeader) { + app3OnBlockAddedChan <- header + } + + err = appHarness1.rpcClient.SubmitBlock(block, nil) + if err != nil { + t.Fatalf("Error submitting block: %s", err) + } + + var header *wire.BlockHeader + select { + case header = <-app2OnBlockAddedChan: + case <-time.After(defaultTimeout): + t.Fatalf("Timeout waiting for block added notification on node directly connected to miner") + } + + if !header.BlockHash().IsEqual(block.Hash()) { + t.Errorf("Expected block with hash '%s', but got '%s'", block.Hash(), header.BlockHash()) + } + + select { + case header = <-app3OnBlockAddedChan: + case <-time.After(defaultTimeout): + t.Fatalf("Timeout waiting for block added notification on node indirectly connected to miner") + } + + if !header.BlockHash().IsEqual(block.Hash()) { + t.Errorf("Expected block with hash '%s', but got '%s'", block.Hash(), header.BlockHash()) + } +} diff --git a/integration/log_test.go b/integration/log_test.go new file mode 100644 index 000000000..42f3f7717 --- /dev/null +++ b/integration/log_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2013-2017 The btcsuite developers +// Copyright (c) 2017 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package integration + +import ( + "github.com/kaspanet/kaspad/logger" + "github.com/kaspanet/kaspad/util/panics" +) + +var log, _ = logger.Get(logger.SubsystemTags.KASD) +var spawn = panics.GoroutineWrapperFunc(log) diff --git a/integration/mining_test.go b/integration/mining_test.go new file mode 100644 index 000000000..393d36ccc --- /dev/null +++ b/integration/mining_test.go @@ -0,0 +1,25 @@ +package integration + +import ( + "math/rand" + "testing" + + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/util/daghash" + "github.com/kaspanet/kaspad/wire" +) + +func solveBlock(t *testing.T, block *util.Block) *wire.MsgBlock { + msgBlock := block.MsgBlock() + targetDifficulty := util.CompactToBig(msgBlock.Header.Bits) + initialNonce := rand.Uint64() + for i := initialNonce; i != initialNonce-1; i++ { + msgBlock.Header.Nonce = i + hash := msgBlock.BlockHash() + if daghash.HashToBig(hash).Cmp(targetDifficulty) <= 0 { + return msgBlock + } + } + + panic("Failed to solve block! This should never happen") +} diff --git a/integration/rpc_test.go b/integration/rpc_test.go new file mode 100644 index 000000000..557f2d93c --- /dev/null +++ b/integration/rpc_test.go @@ -0,0 +1,37 @@ +package integration + +import ( + "github.com/kaspanet/kaspad/util" + "github.com/kaspanet/kaspad/wire" + + rpcclient "github.com/kaspanet/kaspad/rpc/client" +) + +type rpcClient struct { + *rpcclient.Client + onBlockAdded func(*wire.BlockHeader) +} + +func newRPCClient(rpcAddress string) (*rpcClient, error) { + client := &rpcClient{} + notificationHandlers := &rpcclient.NotificationHandlers{ + OnFilteredBlockAdded: func(height uint64, header *wire.BlockHeader, txs []*util.Tx) { + if client.onBlockAdded != nil { + client.onBlockAdded(header) + } + }, + } + + connConfig := &rpcclient.ConnConfig{ + Host: rpcAddress, + Endpoint: "ws", + User: rpcUser, + Pass: rpcPass, + DisableTLS: true, + RequestTimeout: defaultTimeout, + } + + var err error + client.Client, err = rpcclient.New(connConfig, notificationHandlers) + return client, err +} diff --git a/integration/setup_test.go b/integration/setup_test.go new file mode 100644 index 000000000..652a0d933 --- /dev/null +++ b/integration/setup_test.go @@ -0,0 +1,94 @@ +package integration + +import ( + "path/filepath" + "testing" + + "github.com/kaspanet/kaspad/app" + "github.com/kaspanet/kaspad/config" + "github.com/kaspanet/kaspad/dbaccess" +) + +type appHarness struct { + app *app.App + rpcClient *rpcClient + p2pAddress string + rpcAddress string + config *config.Config + databaseContext *dbaccess.DatabaseContext +} + +func setup(t *testing.T) (appHarness1, appHarness2, appHarness3 *appHarness, teardownFunc func()) { + appHarness1 = &appHarness{p2pAddress: p2pAddress1, rpcAddress: rpcAddress1} + appHarness2 = &appHarness{p2pAddress: p2pAddress2, rpcAddress: rpcAddress2} + appHarness3 = &appHarness{p2pAddress: p2pAddress3, rpcAddress: rpcAddress3} + + setConfig(t, appHarness1) + setConfig(t, appHarness2) + setConfig(t, appHarness3) + + setDatabaseContext(t, appHarness1) + setDatabaseContext(t, appHarness2) + setDatabaseContext(t, appHarness3) + + setApp(t, appHarness1) + setApp(t, appHarness2) + setApp(t, appHarness3) + + appHarness1.app.Start() + appHarness2.app.Start() + appHarness3.app.Start() + + setRPCClient(t, appHarness1) + setRPCClient(t, appHarness2) + setRPCClient(t, appHarness3) + + return appHarness1, appHarness2, appHarness3, + func() { + teardown(t, appHarness1) + teardown(t, appHarness2) + teardown(t, appHarness3) + } +} + +func setRPCClient(t *testing.T, harness *appHarness) { + var err error + harness.rpcClient, err = newRPCClient(harness.rpcAddress) + if err != nil { + t.Fatalf("Error getting RPC client %+v", err) + } +} +func teardown(t *testing.T, harness *appHarness) { + err := harness.app.Stop() + if err != nil { + t.Errorf("Error stopping App: %+v", err) + } + + harness.app.WaitForShutdown() + + err = harness.databaseContext.Close() + if err != nil { + t.Errorf("Error closing database context: %+v", err) + } +} + +func setApp(t *testing.T, harness *appHarness) { + var err error + harness.app, err = app.New(harness.config, harness.databaseContext, make(chan struct{})) + if err != nil { + t.Fatalf("Error creating app: %+v", err) + } +} + +func setDatabaseContext(t *testing.T, harness *appHarness) { + var err error + harness.databaseContext, err = openDB(harness.config) + if err != nil { + t.Fatalf("Error openning database: %+v", err) + } +} + +func openDB(cfg *config.Config) (*dbaccess.DatabaseContext, error) { + dbPath := filepath.Join(cfg.DataDir, "db") + return dbaccess.New(dbPath) +} diff --git a/main.go b/main.go index 22b7a7f29..808823123 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,8 @@ import ( "runtime/pprof" "time" + "github.com/kaspanet/kaspad/app" + "github.com/kaspanet/kaspad/dbaccess" "github.com/kaspanet/kaspad/blockdag/indexers" @@ -122,19 +124,22 @@ func kaspadMain(startedChan chan<- struct{}) error { return nil } - // Create kaspad and start it. - kaspad, err := newKaspad(cfg, databaseContext, interrupt) + // Create app and start it. + app, err := app.New(cfg, databaseContext, interrupt) if err != nil { log.Errorf("Unable to start kaspad: %+v", err) return err } defer func() { log.Infof("Gracefully shutting down kaspad...") - kaspad.stop() + err := app.Stop() + if err != nil { + log.Errorf("Error stopping kaspad: %+v", err) + } shutdownDone := make(chan struct{}) go func() { - kaspad.WaitForShutdown() + app.WaitForShutdown() shutdownDone <- struct{}{} }() @@ -147,7 +152,7 @@ func kaspadMain(startedChan chan<- struct{}) error { } log.Infof("Kaspad shutdown complete") }() - kaspad.start() + app.Start() if startedChan != nil { startedChan <- struct{}{} } diff --git a/netadapter/router/route.go b/netadapter/router/route.go index 1a293b264..9a722c6b0 100644 --- a/netadapter/router/route.go +++ b/netadapter/router/route.go @@ -4,6 +4,8 @@ import ( "sync" "time" + "github.com/kaspanet/kaspad/protocol/protocolerrors" + "github.com/kaspanet/kaspad/wire" "github.com/pkg/errors" ) @@ -14,10 +16,11 @@ const ( var ( // ErrTimeout signifies that one of the router functions had a timeout. - ErrTimeout = errors.New("timeout expired") + ErrTimeout = protocolerrors.New(false, "timeout expired") // ErrRouteClosed indicates that a route was closed while reading/writing. - ErrRouteClosed = errors.New("route is closed") + // TODO(libp2p): Remove protocol error here + ErrRouteClosed = protocolerrors.New(false, "route is closed") ) // onCapacityReachedHandler is a function that is to be diff --git a/protocol/flowcontext/flow_context.go b/protocol/flowcontext/flow_context.go index 296c8ed55..9550cc9e4 100644 --- a/protocol/flowcontext/flow_context.go +++ b/protocol/flowcontext/flow_context.go @@ -1,6 +1,9 @@ package flowcontext import ( + "sync" + "time" + "github.com/kaspanet/kaspad/addressmanager" "github.com/kaspanet/kaspad/blockdag" "github.com/kaspanet/kaspad/config" @@ -13,8 +16,6 @@ import ( peerpkg "github.com/kaspanet/kaspad/protocol/peer" "github.com/kaspanet/kaspad/util" "github.com/kaspanet/kaspad/util/daghash" - "sync" - "time" ) // FlowContext holds state that is relevant to more than one flow or one peer, and allows communication between diff --git a/protocol/flowcontext/network.go b/protocol/flowcontext/network.go index db5db5330..ce18b2da6 100644 --- a/protocol/flowcontext/network.go +++ b/protocol/flowcontext/network.go @@ -29,6 +29,7 @@ func (f *FlowContext) AddToPeers(peer *peerpkg.Peer) error { } f.peers[peer.ID()] = peer + return nil } diff --git a/rpc/client/net.go b/rpc/client/net.go index d02416fa3..ae6bf21ac 100644 --- a/rpc/client/net.go +++ b/rpc/client/net.go @@ -8,6 +8,7 @@ import ( "bytes" "encoding/hex" "encoding/json" + "github.com/kaspanet/kaspad/util/daghash" "github.com/kaspanet/kaspad/util/pointers" "github.com/kaspanet/kaspad/wire" @@ -26,23 +27,23 @@ func (r FutureAddNodeResult) Receive() error { return err } -// AddManualNodeAsync returns an instance of a type that can be used to get the result +// ConnectNodeAsync returns an instance of a type that can be used to get the result // of the RPC at some future time by invoking the Receive function on the // returned instance. // -// See AddNode for the blocking version and more details. -func (c *Client) AddManualNodeAsync(host string) FutureAddNodeResult { +// See Connect for the blocking version and more details. +func (c *Client) ConnectNodeAsync(host string) FutureAddNodeResult { cmd := model.NewConnectCmd(host, pointers.Bool(false)) return c.sendCmd(cmd) } -// AddManualNode attempts to perform the passed command on the passed persistent peer. +// ConnectNode attempts to perform the passed command on the passed persistent peer. // For example, it can be used to add or a remove a persistent peer, or to do // a one time connection to a peer. // // It may not be used to remove non-persistent peers. -func (c *Client) AddManualNode(host string) error { - return c.AddManualNodeAsync(host).Receive() +func (c *Client) ConnectNode(host string) error { + return c.ConnectNodeAsync(host).Receive() } // FutureGetConnectionCountResult is a future promise to deliver the result diff --git a/rpc/rpcserver.go b/rpc/rpcserver.go index 1ad1df259..a4a6efa30 100644 --- a/rpc/rpcserver.go +++ b/rpc/rpcserver.go @@ -12,10 +12,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "github.com/kaspanet/kaspad/addressmanager" - "github.com/kaspanet/kaspad/connmanager" - "github.com/kaspanet/kaspad/protocol" - "github.com/kaspanet/kaspad/util/mstime" "io" "io/ioutil" "math/rand" @@ -26,6 +22,11 @@ import ( "sync/atomic" "time" + "github.com/kaspanet/kaspad/addressmanager" + "github.com/kaspanet/kaspad/connmanager" + "github.com/kaspanet/kaspad/protocol" + "github.com/kaspanet/kaspad/util/mstime" + "github.com/pkg/errors" "github.com/btcsuite/websocket" @@ -707,7 +708,8 @@ func NewRPCServer( return nil, errors.New("RPCS: No valid listen address") } rpc := Server{ - cfg: cfg, + cfg: cfg, + listeners: rpcListeners, startupTime: mstime.Now(), statusLines: make(map[int]string), diff --git a/service_windows.go b/service_windows.go index bf22ff790..d4fbf5fff 100644 --- a/service_windows.go +++ b/service_windows.go @@ -184,7 +184,7 @@ func removeService() error { return service.Delete() } -// startService attempts to start the kaspad service. +// startService attempts to Start the kaspad service. func startService() error { // Connect to the windows service manager. serviceManager, err := mgr.Connect()