Ori Newman 5dbb1da84b
Implement pruning point proof (#1832)
* Calculate GHOSTDAG, reachability etc for each level

* Don't preallocate cache for dag stores except level 0 and reduce the number of connections in the integration test to 32

* Reduce the number of connections in the integration test to 16

* Increase page file

* BuildPruningPointProof

* BuildPruningPointProof

* Add PruningProofManager

* Implement ApplyPruningPointProof

* Add prefix and fix blockAtDepth and fill headersByLevel

* Some bug fixes

* Include all relevant blocks for each level in the proof

* Fix syncAndValidatePruningPointProof to return the right block hash

* Fix block window

* Fix isAncestorOfPruningPoint

* Ban for rule errors on pruning proof

* Find common ancestor for blockAtDepthMAtNextLevel

* Use pruning proof in TestValidateAndInsertImportedPruningPoint

* stage status and finality point for proof blocks

* Uncomment golint

* Change test timeouts

* Calculate merge set for ApplyPruningPointProof

* Increase test timeout

* Add better caching for daa window store

* Return to default timeout

* Add ErrPruningProofMissesBlocksBelowPruningPoint

* Add errDAAWindowBlockNotFound

* Force connection loop next iteration on connection manager stop

* Revert to Test64IncomingConnections

* Remove BlockAtDepth from DAGTraversalManager

* numBullies->16

* Set page file size to 8gb

* Increase p2p max message size

* Test64IncomingConnections->Test16IncomingConnections

* Add comment for PruningProofM

* Add comment in `func (c *ConnectionManager) Stop()`

* Rename isAncestorOfPruningPoint->isAncestorOfSelectedTip

* Revert page file to 16gb

* Improve ExpectedHeaderPruningPoint perf

* Fix comment

* Revert "Improve ExpectedHeaderPruningPoint perf"

This reverts commit bca1080e7140c78d510f51bbea858ae280c2f38e.

* Don't test windows
2021-10-26 09:48:27 +03:00

84 lines
2.8 KiB
Go

package grpcserver
import (
"context"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server"
"github.com/kaspanet/kaspad/infrastructure/network/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/util/panics"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/peer"
"net"
"time"
)
// p2pServer is the P2P-specific server type of this package. It is the value
// registered with protowire.RegisterP2PServer by NewP2PServer and the receiver
// of the MessageStream and Connect methods.
type p2pServer struct {
// Embedded generated type from protowire; presumably supplies default
// implementations of the P2P service methods — confirm in the generated code.
protowire.UnimplementedP2PServer
// Embedded by value: the shared gRPC server state. Its fields and methods
// (e.g. name, handleInboundConnection, onConnectedHandler) are used
// directly through p2pServer.
gRPCServer
}
const (
	// p2pMaxMessageSize is the message size limit, in bytes, handed to the
	// underlying gRPC server and used as the per-call send/receive limit on
	// outgoing P2P streams.
	p2pMaxMessageSize = 100 * 1024 * 1024 // 100MB

	// p2pMaxInboundConnections is the inbound connection cap given to the
	// gRPC server for P2P. Zero means "no limit": the gRPC layer does not
	// restrict inbound P2P connections at all. Limiting them is an
	// application-level concern and is done by the ConnectionManager.
	p2pMaxInboundConnections = 0
)
// NewP2PServer creates a new P2PServer for the given listening addresses.
// It builds the underlying gRPC server with the P2P message size and inbound
// connection limits and registers the resulting p2pServer as the P2P service
// implementation. The returned error is always nil.
func NewP2PServer(listeningAddresses []string) (server.P2PServer, error) {
	// Locals are named so that they do not shadow the gRPCServer and
	// p2pServer type names.
	base := newGRPCServer(listeningAddresses, p2pMaxMessageSize, p2pMaxInboundConnections, "P2P")

	p2p := &p2pServer{gRPCServer: *base}
	protowire.RegisterP2PServer(base.server, p2p)

	return p2p, nil
}
// MessageStream is the handler for the P2P MessageStream RPC. It passes the
// stream, together with the stream's own context, to handleInboundConnection
// and returns whatever that call returns. A deferred panics.HandlePanic call
// guards the handler.
func (p *p2pServer) MessageStream(stream protowire.P2P_MessageStreamServer) error {
	defer panics.HandlePanic(log, "p2pServer.MessageStream", nil)

	streamContext := stream.Context()
	return p.handleInboundConnection(streamContext, stream)
}
// Connect connects to the given address
// This is part of the P2PServer interface
//
// It dials the address (waiting up to 30 seconds for the dial to complete),
// opens a gzip-compressed P2P message stream over the resulting gRPC client
// connection, wraps both in a connection object and reports that object to
// onConnectedHandler.
//
// An error is returned if dialing fails, if the stream cannot be opened, if
// the peer address cannot be read from the stream context or is not a TCP
// address, or if onConnectedHandler fails. If any step before the connection
// object is created fails, the dialed gRPC client connection is closed, so a
// failed Connect does not leave an open connection behind.
func (p *p2pServer) Connect(address string) (server.Connection, error) {
	log.Debugf("%s Dialing to %s", p.name, address)

	const dialTimeout = 30 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	gRPCClientConnection, err := grpc.DialContext(ctx, address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return nil, errors.Wrapf(err, "%s error connecting to %s", p.name, address)
	}

	// Until the client connection is handed to a connection object, this
	// function is its only owner and must close it on every failure path.
	handedOff := false
	defer func() {
		if handedOff {
			return
		}
		// Best-effort cleanup: the error already being returned is the one
		// the caller needs, so a failure to close is deliberately ignored.
		_ = gRPCClientConnection.Close()
	}()

	client := protowire.NewP2PClient(gRPCClientConnection)

	// The stream is kept by the returned connection, so it must not be bound
	// to ctx, which is cancelled as soon as this function returns.
	stream, err := client.MessageStream(context.Background(), grpc.UseCompressor(gzip.Name),
		grpc.MaxCallRecvMsgSize(p2pMaxMessageSize), grpc.MaxCallSendMsgSize(p2pMaxMessageSize))
	if err != nil {
		return nil, errors.Wrapf(err, "%s error getting client stream for %s", p.name, address)
	}

	peerInfo, ok := peer.FromContext(stream.Context())
	if !ok {
		return nil, errors.Errorf("%s error getting stream peer info from context for %s", p.name, address)
	}
	tcpAddress, ok := peerInfo.Addr.(*net.TCPAddr)
	if !ok {
		return nil, errors.Errorf("non-tcp addresses are not supported")
	}

	connection := newConnection(&p.gRPCServer, tcpAddress, stream, gRPCClientConnection)
	// From here on the connection object holds the client connection.
	handedOff = true

	// NOTE(review): if onConnectedHandler fails, the connection is dropped
	// without being disconnected here — confirm that the handler (or the
	// connection itself) releases the underlying client connection.
	err = p.onConnectedHandler(connection)
	if err != nil {
		return nil, err
	}

	log.Infof("%s Connected to %s", p.name, address)

	return connection, nil
}