[NOD-1118] Implement gRPC basic connectivity (#790)

* [NOD-1118] Added protobufs for the MessageStream

* [NOD-1118] Implement some of the basic grpc methods

* [NOD-1118] Implemented gRPCConnection send and receive

* [NOD-1118] Implemented basic connection loops

* [NOD-1118] gRPC server implementation ready

* [NOD-1118] Add connection management

* [NOD-1118] Sort out the connection loops

* [NOD-1118] Add temporary testConnection

* [NOD-1118] Send to c.errChan whether error was recieved or not

* [NOD-1118] Call OnConnectHandler in time

* [NOD-1118] Handle closing connections properly

* [NOD-1118] Add comments to exported functions

* [NOD-1118] Call server.addConnection on newConnection

* [NOD-1118] Add a TODO comment

* [NOD-1118] Add a TODO comment

* [NOD-1118] Make connection a Stringer

* [NOD-1118] Made the connection loops 100% synchronic

* [NOD-1118] Make connection.isConnected uint32

* [NOD-1118] Move the Add/Remove connection from grpcConnection to register/unregister connection

* [NOD-1118] Convert error messages to lower case

* [NOD-1118] Remove protoc inline dependency

* [NOD-1118] Fix comment

* [NOD-1118] Exit if there was an error starting the protocol manager

* [NOD-1118] Fix error message

* [NOD-1118] Fixed a few comments

* [NOD-1118] Extract listenOn to a method

* [NOD-1118] Use !=0 for isConnected

* [NOD-1118] Refactor listenOn

* [NOD-1118] Add lock on channelWrites in gRPCConnection

* [NOD-1118] Rename channelWriteLock -> writeDuringDisconnectLock

* [NOD-1118] Reshuffle a comment

* [NOD-1118] Add a TODO comment
This commit is contained in:
Svarog 2020-07-12 15:22:49 +03:00 committed by GitHub
parent 6d591dde74
commit 4a4dca1926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 859 additions and 97 deletions

4
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792
github.com/btcsuite/winsvc v1.0.0
github.com/davecgh/go-spew v1.1.1
github.com/golang/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.4.1
github.com/golang/snappy v0.0.1 // indirect
github.com/jessevdk/go-flags v1.4.0
github.com/jrick/logrotate v1.0.0
@ -18,6 +18,8 @@ require (
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59
golang.org/x/sys v0.0.0-20190426135247-a129542de9ae // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/grpc v1.30.0
google.golang.org/protobuf v1.25.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

64
go.sum
View File

@ -1,20 +1,44 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0 h1:J9B4L7e3oqhXOcm+2IuNApwzQec85lE+QaikUcCs+dk=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
@ -36,6 +60,7 @@ github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d h1:gZZadD8H+fF+n9CmNhYL1Y0dJB+kLOmKd7FbPJLeGHs=
@ -45,13 +70,24 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -61,8 +97,34 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20200228224639-71482053b885/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -74,3 +136,5 @@ gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -1,8 +1,11 @@
package main
import (
"fmt"
"sync/atomic"
"github.com/kaspanet/kaspad/util/panics"
"github.com/kaspanet/kaspad/blockdag"
"github.com/kaspanet/kaspad/blockdag/indexers"
"github.com/kaspanet/kaspad/config"
@ -34,6 +37,11 @@ func (s *kaspad) start() {
cfg := config.ActiveConfig()
err := s.protocolManager.Start()
if err != nil {
panics.Exit(log, fmt.Sprintf("Error starting the p2p protocol: %+v", err))
}
if !cfg.DisableRPC {
s.rpcServer.Start()
}

View File

@ -48,6 +48,7 @@ var (
profLog = BackendLog.Logger("PROF")
protLog = BackendLog.Logger("PROT")
muxxLog = BackendLog.Logger("MUXX")
grpcLog = BackendLog.Logger("GRPC")
p2psLog = BackendLog.Logger("P2PS")
ntarLog = BackendLog.Logger("NTAR")
)
@ -74,6 +75,7 @@ var SubsystemTags = struct {
PROF,
PROT,
MUXX,
GRPC,
P2PS,
NTAR string
}{
@ -97,6 +99,7 @@ var SubsystemTags = struct {
PROF: "PROF",
PROT: "PROT",
MUXX: "MUXX",
GRPC: "GRPC",
P2PS: "P2PS",
NTAR: "NTAR",
}
@ -123,6 +126,7 @@ var subsystemLoggers = map[string]*logs.Logger{
SubsystemTags.PROF: profLog,
SubsystemTags.PROT: protLog,
SubsystemTags.MUXX: muxxLog,
SubsystemTags.GRPC: grpcLog,
SubsystemTags.P2PS: p2psLog,
SubsystemTags.NTAR: ntarLog,
}

View File

@ -1,11 +1,13 @@
package netadapter
import (
"sync/atomic"
"github.com/kaspanet/kaspad/config"
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver"
"github.com/kaspanet/kaspad/wire"
"github.com/pkg/errors"
"sync/atomic"
)
// RouterInitializer is a function that initializes a new
@ -56,7 +58,21 @@ func NewNetAdapter(listeningAddrs []string) (*NetAdapter, error) {
// Start begins the operation of the NetAdapter
func (na *NetAdapter) Start() error {
return na.server.Start()
err := na.server.Start()
if err != nil {
return err
}
// TODO(libp2p): Replace with real connection manager
cfg := config.ActiveConfig()
for _, connectPeer := range cfg.ConnectPeers {
_, err := na.server.Connect(connectPeer)
if err != nil {
log.Errorf("Error connecting to %s: %+v", connectPeer, err)
}
}
return nil
}
// Stop safely closes the NetAdapter
@ -88,12 +104,16 @@ func (na *NetAdapter) newOnConnectedHandler() server.OnConnectedHandler {
}
func (na *NetAdapter) registerConnection(connection server.Connection, router *Router, id *ID) {
na.server.AddConnection(connection)
na.connectionIDs[connection] = id
na.idsToConnections[id] = connection
na.idsToRouters[id] = router
}
func (na *NetAdapter) unregisterConnection(connection server.Connection) {
na.server.RemoveConnection(connection)
id, ok := na.connectionIDs[connection]
if !ok {
return
@ -113,7 +133,8 @@ func (na *NetAdapter) startReceiveLoop(connection server.Connection, router *Rou
}
err = router.RouteInputMessage(message)
if err != nil {
log.Warnf("Failed to receive from %s: %s", connection, err)
// TODO(libp2p): This should never happen, do something more severe
log.Warnf("Failed to route input message from %s: %s", connection, err)
break
}
}

View File

@ -72,6 +72,11 @@ func (r *Router) TakeOutputMessage() wire.Message {
return <-r.outputRoute
}
// WriteOutgoingMessage pushes the given message to the output route
func (r *Router) WriteOutgoingMessage(message wire.Message) {
r.outputRoute <- message
}
// RegisterID registers the remote connection's ID
func (r *Router) RegisterID(id *ID) {
r.onIDReceivedHandler(id)

View File

@ -0,0 +1,78 @@
package grpcserver
import (
"io"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
)
type grpcStream interface {
Send(*protowire.KaspadMessage) error
Recv() (*protowire.KaspadMessage, error)
}
func (c *gRPCConnection) connectionLoops(stream grpcStream) error {
errChan := make(chan error, 1) // buffered channel because one of the loops might try write after disconnect
spawn(func() { errChan <- c.receiveLoop(stream) })
spawn(func() { errChan <- c.sendLoop(stream) })
err := <-errChan
disconnectErr := c.Disconnect()
if disconnectErr != nil {
log.Errorf("Error from disconnect: %s", disconnectErr)
}
return err
}
func (c *gRPCConnection) sendLoop(stream grpcStream) error {
for c.IsConnected() {
message, ok := <-c.sendChan
if !ok {
return nil // this means the sendChan is closed, a.k.a. connection is disconnecting
}
err := stream.Send(message)
c.errChan <- err
if err != nil {
return err
}
}
return nil
}
func (c *gRPCConnection) receiveLoop(stream grpcStream) error {
for c.IsConnected() {
message, err := stream.Recv()
if err != nil {
if err == io.EOF {
err = nil
}
return err
}
func() {
c.writeDuringDisconnectLock.Lock()
defer c.writeDuringDisconnectLock.Unlock()
if c.IsConnected() {
c.receiveChan <- message
}
}()
}
return nil
}
func (c *gRPCConnection) serverConnectionLoop(stream protowire.P2P_MessageStreamServer) error {
return c.connectionLoops(stream)
}
func (c *gRPCConnection) clientConnectionLoop(stream protowire.P2P_MessageStreamClient) error {
err := c.connectionLoops(stream)
_ = stream.CloseSend() // ignore error because we don't really know what's the status of the connection
return err
}

View File

@ -0,0 +1,105 @@
package grpcserver
import (
"net"
"sync"
"sync/atomic"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/wire"
"google.golang.org/grpc"
)
type gRPCConnection struct {
server *gRPCServer
address net.Addr
writeDuringDisconnectLock sync.Mutex // writeDuringDisconnectLock makes sure channels aren't written to after close
sendChan chan *protowire.KaspadMessage
receiveChan chan *protowire.KaspadMessage
errChan chan error
clientConn grpc.ClientConn
onDisconnectedHandler server.OnDisconnectedHandler
isConnected uint32
}
func newConnection(server *gRPCServer, address net.Addr) *gRPCConnection {
connection := &gRPCConnection{
server: server,
address: address,
sendChan: make(chan *protowire.KaspadMessage),
receiveChan: make(chan *protowire.KaspadMessage),
errChan: make(chan error),
isConnected: 1,
}
return connection
}
func (c *gRPCConnection) String() string {
return c.Address().String()
}
func (c *gRPCConnection) IsConnected() bool {
return atomic.LoadUint32(&c.isConnected) != 0
}
func (c *gRPCConnection) SetOnDisconnectedHandler(onDisconnectedHandler server.OnDisconnectedHandler) {
c.onDisconnectedHandler = onDisconnectedHandler
}
// Send sends the given message through the connection
// This is part of the Connection interface
func (c *gRPCConnection) Send(message wire.Message) error {
messageProto, err := protowire.FromWireMessage(message)
if err != nil {
return err
}
c.writeDuringDisconnectLock.Lock()
defer c.writeDuringDisconnectLock.Unlock()
if c.IsConnected() {
c.sendChan <- messageProto
return <-c.errChan
}
return nil
}
// Receive receives the next message from the connection
// This is part of the Connection interface
func (c *gRPCConnection) Receive() (wire.Message, error) {
protoMessage := <-c.receiveChan
if protoMessage == nil {
return nil, errors.New("connection closed during receive")
}
return protoMessage.ToWireMessage()
}
// Disconnect disconnects the connection
// Calling this function a second time doesn't do anything
//
// This is part of the Connection interface
func (c *gRPCConnection) Disconnect() error {
if !c.IsConnected() {
return nil
}
atomic.StoreUint32(&c.isConnected, 0)
c.writeDuringDisconnectLock.Lock()
defer c.writeDuringDisconnectLock.Unlock()
close(c.receiveChan)
close(c.sendChan)
close(c.errChan)
return c.onDisconnectedHandler()
}
func (c *gRPCConnection) Address() net.Addr {
return c.address
}

View File

@ -0,0 +1,145 @@
package grpcserver
import (
"context"
"fmt"
"net"
"google.golang.org/grpc/peer"
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/util/panics"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type gRPCServer struct {
onConnectedHandler server.OnConnectedHandler
connections map[string]*gRPCConnection
listeningAddrs []string
server *grpc.Server
}
// NewGRPCServer creates and starts a gRPC server, listening on the
// provided addresses/ports
func NewGRPCServer(listeningAddrs []string) (server.Server, error) {
s := &gRPCServer{
server: grpc.NewServer(),
listeningAddrs: listeningAddrs,
connections: map[string]*gRPCConnection{},
}
protowire.RegisterP2PServer(s.server, newP2PServer(s))
return s, nil
}
func (s *gRPCServer) Start() error {
for _, listenAddr := range s.listeningAddrs {
err := s.listenOn(listenAddr)
if err != nil {
return err
}
}
return nil
}
func (s *gRPCServer) listenOn(listenAddr string) error {
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
return errors.Wrapf(err, "error listening on %s", listenAddr)
}
spawn(func() {
err := s.server.Serve(listener)
if err != nil {
panics.Exit(log, fmt.Sprintf("error serving on %s: %+v", listenAddr, err))
}
})
log.Infof("P2P server listening on %s", listenAddr)
return nil
}
func (s *gRPCServer) Stop() error {
for _, connection := range s.connections {
err := connection.Disconnect()
if err != nil {
log.Errorf("error closing connection to %s: %+v", connection, err)
}
}
s.server.GracefulStop()
return nil
}
// SetOnConnectedHandler sets the peer connected handler
// function for the server
func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnectedHandler) {
s.onConnectedHandler = onConnectedHandler
}
// Connect connects to the given address
// This is part of the Server interface
func (s *gRPCServer) Connect(address string) (server.Connection, error) {
log.Infof("Dialing to %s", address)
gRPCConnection, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return nil, errors.Wrapf(err, "error connecting to %s", address)
}
client := protowire.NewP2PClient(gRPCConnection)
stream, err := client.MessageStream(context.Background())
if err != nil {
return nil, errors.Wrapf(err, "error getting client stream for %s", address)
}
peerInfo, ok := peer.FromContext(stream.Context())
if !ok {
return nil, errors.Errorf("error getting stream peer info from context for %s", address)
}
connection := newConnection(s, peerInfo.Addr)
err = s.onConnectedHandler(connection)
if err != nil {
return nil, err
}
log.Infof("Connected to %s", address)
spawn(func() {
err := connection.clientConnectionLoop(stream)
if err != nil {
log.Errorf("error from clientConnectionLoop for %s: %+v", address, err)
}
})
return connection, nil
}
// Connections returns a slice of connections the server
// is currently connected to.
// This is part of the Server interface
func (s *gRPCServer) Connections() []server.Connection {
result := make([]server.Connection, 0, len(s.connections))
for _, conn := range s.connections {
result = append(result, conn)
}
return result
}
// AddConnection adds the provided connection to the connection list
func (s *gRPCServer) AddConnection(connection server.Connection) error {
conn := connection.(*gRPCConnection)
s.connections[conn.String()] = conn
return nil
}
// RemoveConnection removes the provided connection from the connection list
func (s *gRPCServer) RemoveConnection(connection server.Connection) error {
delete(s.connections, connection.String())
return nil
}

View File

@ -1,85 +0,0 @@
package grpcserver
import (
"github.com/kaspanet/kaspad/netadapter/server"
"github.com/kaspanet/kaspad/wire"
)
type gRPCServer struct {
onConnectedHandler server.OnConnectedHandler
connections []server.Connection
}
// NewGRPCServer creates and starts a gRPC server with the given
// listening port
func NewGRPCServer(listeningAddrs []string) (server.Server, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
func (s *gRPCServer) Start() error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
func (s *gRPCServer) Stop() error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// SetOnConnectedHandler sets the on-connected handler
// function for the server
func (s *gRPCServer) SetOnConnectedHandler(onConnectedHandler server.OnConnectedHandler) {
s.onConnectedHandler = onConnectedHandler
}
// Connect connects to the given address
// This is part of the Server interface
func (s *gRPCServer) Connect(address string) (server.Connection, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Connections returns a slice of connections the server
// is currently connected to.
// This is part of the Server interface
func (s *gRPCServer) Connections() []server.Connection {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
type gRPCConnection struct {
onDisconnectedHandler server.OnDisconnectedHandler
}
// Send sends the given message through the connection
// This is part of the Connection interface
func (c *gRPCConnection) Send(message wire.Message) error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Receive receives the next message from the connection
// This is part of the Connection interface
func (c *gRPCConnection) Receive() (wire.Message, error) {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// Disconnect disconnects the connection
// This is part of the Connection interface
func (c *gRPCConnection) Disconnect() error {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
func (c *gRPCConnection) IsConnected() bool {
// TODO(libp2p): unimplemented
panic("unimplemented")
}
// SetOnDisconnectedHandler sets the on-disconnected handler
// function for this connection
func (c *gRPCConnection) SetOnDisconnectedHandler(onDisconnectedHandler server.OnDisconnectedHandler) {
c.onDisconnectedHandler = onDisconnectedHandler
}

View File

@ -0,0 +1,13 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package grpcserver
import (
"github.com/kaspanet/kaspad/logger"
"github.com/kaspanet/kaspad/util/panics"
)
var log, _ = logger.Get(logger.SubsystemTags.TXMP)
var spawn = panics.GoroutineWrapperFunc(log)

View File

@ -0,0 +1,44 @@
package grpcserver
import (
"github.com/kaspanet/kaspad/netadapter/server/grpcserver/protowire"
"github.com/kaspanet/kaspad/util/panics"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
type p2pServer struct {
protowire.UnimplementedP2PServer
server *gRPCServer
}
func newP2PServer(s *gRPCServer) *p2pServer {
return &p2pServer{server: s}
}
func (p *p2pServer) MessageStream(stream protowire.P2P_MessageStreamServer) error {
defer panics.HandlePanic(log, nil)
peerInfo, ok := peer.FromContext(stream.Context())
if !ok {
return errors.Errorf("Error getting stream peer info from context")
}
connection := newConnection(p.server, peerInfo.Addr)
err := p.server.onConnectedHandler(connection)
if err != nil {
return err
}
log.Infof("Incoming connection from %s", peerInfo.Addr)
err = connection.serverConnectionLoop(stream)
if err != nil {
log.Errorf("Error in serverConnectionLoop: %+v", err)
return status.Error(codes.Internal, err.Error())
}
return nil
}

View File

@ -0,0 +1,3 @@
//go:generate protoc --go_out=. --go-grpc_out=. --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative messages.proto
package protowire

View File

@ -0,0 +1,166 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.12.3
// source: messages.proto
package protowire
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type KaspadMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *KaspadMessage) Reset() {
*x = KaspadMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_messages_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *KaspadMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*KaspadMessage) ProtoMessage() {}
func (x *KaspadMessage) ProtoReflect() protoreflect.Message {
mi := &file_messages_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use KaspadMessage.ProtoReflect.Descriptor instead.
func (*KaspadMessage) Descriptor() ([]byte, []int) {
return file_messages_proto_rawDescGZIP(), []int{0}
}
func (x *KaspadMessage) GetCommand() string {
if x != nil {
return x.Command
}
return ""
}
func (x *KaspadMessage) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
var File_messages_proto protoreflect.FileDescriptor
var file_messages_proto_rawDesc = []byte{
0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x12, 0x09, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x65, 0x22, 0x43, 0x0a, 0x0d, 0x4b,
0x61, 0x73, 0x70, 0x61, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x32, 0x50, 0x0a, 0x03, 0x50, 0x32, 0x50, 0x12, 0x49, 0x0a, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x77, 0x69, 0x72, 0x65, 0x2e, 0x4b, 0x61, 0x73, 0x70, 0x61, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61,
0x67, 0x65, 0x1a, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x65, 0x2e, 0x4b,
0x61, 0x73, 0x70, 0x61, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01,
0x30, 0x01, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x6e, 0x65, 0x74, 0x2f, 0x6b, 0x61, 0x73, 0x70, 0x61, 0x64,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x77, 0x69, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_messages_proto_rawDescOnce sync.Once
file_messages_proto_rawDescData = file_messages_proto_rawDesc
)
func file_messages_proto_rawDescGZIP() []byte {
file_messages_proto_rawDescOnce.Do(func() {
file_messages_proto_rawDescData = protoimpl.X.CompressGZIP(file_messages_proto_rawDescData)
})
return file_messages_proto_rawDescData
}
var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_messages_proto_goTypes = []interface{}{
(*KaspadMessage)(nil), // 0: protowire.KaspadMessage
}
var file_messages_proto_depIdxs = []int32{
0, // 0: protowire.P2P.MessageStream:input_type -> protowire.KaspadMessage
0, // 1: protowire.P2P.MessageStream:output_type -> protowire.KaspadMessage
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_messages_proto_init() }
func file_messages_proto_init() {
if File_messages_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_messages_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*KaspadMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_messages_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_messages_proto_goTypes,
DependencyIndexes: file_messages_proto_depIdxs,
MessageInfos: file_messages_proto_msgTypes,
}.Build()
File_messages_proto = out.File
file_messages_proto_rawDesc = nil
file_messages_proto_goTypes = nil
file_messages_proto_depIdxs = nil
}

View File

@ -0,0 +1,13 @@
syntax = "proto3";
package protowire;
option go_package = "github.com/kaspanet/kaspad/protowire";
message KaspadMessage{
string command = 1;
bytes payload = 2;
}
service P2P {
rpc MessageStream (stream KaspadMessage) returns (stream KaspadMessage) {}
}

View File

@ -0,0 +1,122 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
package protowire
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// P2PClient is the client API for P2P service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type P2PClient interface {
MessageStream(ctx context.Context, opts ...grpc.CallOption) (P2P_MessageStreamClient, error)
}
type p2PClient struct {
cc grpc.ClientConnInterface
}
func NewP2PClient(cc grpc.ClientConnInterface) P2PClient {
return &p2PClient{cc}
}
func (c *p2PClient) MessageStream(ctx context.Context, opts ...grpc.CallOption) (P2P_MessageStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_P2P_serviceDesc.Streams[0], "/protowire.P2P/MessageStream", opts...)
if err != nil {
return nil, err
}
x := &p2PMessageStreamClient{stream}
return x, nil
}
type P2P_MessageStreamClient interface {
Send(*KaspadMessage) error
Recv() (*KaspadMessage, error)
grpc.ClientStream
}
type p2PMessageStreamClient struct {
grpc.ClientStream
}
func (x *p2PMessageStreamClient) Send(m *KaspadMessage) error {
return x.ClientStream.SendMsg(m)
}
func (x *p2PMessageStreamClient) Recv() (*KaspadMessage, error) {
m := new(KaspadMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// P2PServer is the server API for P2P service.
// All implementations must embed UnimplementedP2PServer
// for forward compatibility
type P2PServer interface {
MessageStream(P2P_MessageStreamServer) error
mustEmbedUnimplementedP2PServer()
}
// UnimplementedP2PServer must be embedded to have forward compatible implementations.
type UnimplementedP2PServer struct {
}
func (*UnimplementedP2PServer) MessageStream(P2P_MessageStreamServer) error {
return status.Errorf(codes.Unimplemented, "method MessageStream not implemented")
}
func (*UnimplementedP2PServer) mustEmbedUnimplementedP2PServer() {}
func RegisterP2PServer(s *grpc.Server, srv P2PServer) {
s.RegisterService(&_P2P_serviceDesc, srv)
}
func _P2P_MessageStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(P2PServer).MessageStream(&p2PMessageStreamServer{stream})
}
type P2P_MessageStreamServer interface {
Send(*KaspadMessage) error
Recv() (*KaspadMessage, error)
grpc.ServerStream
}
type p2PMessageStreamServer struct {
grpc.ServerStream
}
func (x *p2PMessageStreamServer) Send(m *KaspadMessage) error {
return x.ServerStream.SendMsg(m)
}
func (x *p2PMessageStreamServer) Recv() (*KaspadMessage, error) {
m := new(KaspadMessage)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _P2P_serviceDesc = grpc.ServiceDesc{
ServiceName: "protowire.P2P",
HandlerType: (*P2PServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "MessageStream",
Handler: _P2P_MessageStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "messages.proto",
}

View File

@ -0,0 +1,38 @@
package protowire
import (
"bytes"
"github.com/kaspanet/kaspad/wire"
)
// ToWireMessage converts a KaspadMessage to its wire.Message representation
func (x *KaspadMessage) ToWireMessage() (wire.Message, error) {
message, err := wire.MakeEmptyMessage(x.Command)
if err != nil {
return nil, err
}
payloadReader := bytes.NewReader(x.Payload)
err = message.KaspaDecode(payloadReader, wire.ProtocolVersion)
if err != nil {
return nil, err
}
return message, nil
}
// FromWireMessage creates a KaspadMessage from a wire.Message
func FromWireMessage(message wire.Message) (*KaspadMessage, error) {
payloadWriter := &bytes.Buffer{}
err := message.KaspaEncode(payloadWriter, wire.ProtocolVersion)
if err != nil {
return nil, err
}
return &KaspadMessage{
Command: message.Command(),
Payload: payloadWriter.Bytes(),
}, nil
}

View File

@ -1,6 +1,9 @@
package server
import (
"fmt"
"net"
"github.com/kaspanet/kaspad/wire"
)
@ -19,13 +22,18 @@ type Server interface {
Start() error
Stop() error
SetOnConnectedHandler(onConnectedHandler OnConnectedHandler)
// TODO(libp2p): Move AddConnection and RemoveConnection to connection manager
AddConnection(connection Connection) error
RemoveConnection(connection Connection) error
}
// Connection represents a p2p server connection.
type Connection interface {
fmt.Stringer
Send(message wire.Message) error
Receive() (wire.Message, error)
Disconnect() error
IsConnected() bool
SetOnDisconnectedHandler(onDisconnectedHandler OnDisconnectedHandler)
Address() net.Addr
}

View File

@ -40,7 +40,7 @@ func (p *Manager) Stop() error {
func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.BlockDAG) netadapter.RouterInitializer {
return func() (*netadapter.Router, error) {
router := netadapter.NewRouter()
err := router.AddRoute([]string{wire.CmdTx}, startDummy(netAdapter, router, dag))
err := router.AddRoute([]string{wire.CmdPing, wire.CmdPong}, startPing(netAdapter, router, dag))
if err != nil {
return nil, err
}
@ -48,12 +48,18 @@ func newRouterInitializer(netAdapter *netadapter.NetAdapter, dag *blockdag.Block
}
}
func startDummy(netAdapter *netadapter.NetAdapter, router *netadapter.Router,
// TODO(libp2p): Remove this and change it with a real Ping-Pong flow.
func startPing(netAdapter *netadapter.NetAdapter, router *netadapter.Router,
dag *blockdag.BlockDAG) chan wire.Message {
ch := make(chan wire.Message)
spawn(func() {
for range ch {
router.WriteOutgoingMessage(wire.NewMsgPing(666))
for message := range ch {
log.Infof("Got message: %+v", message.Command())
if message.Command() == "ping" {
router.WriteOutgoingMessage(wire.NewMsgPong(666))
}
}
})
return ch

View File

@ -13,11 +13,12 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/pkg/errors"
"io"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/util/random"
"github.com/kaspanet/kaspad/util/subnetworkid"

View File

@ -7,10 +7,11 @@ package wire
import (
"bytes"
"fmt"
"github.com/pkg/errors"
"io"
"unicode/utf8"
"github.com/pkg/errors"
"github.com/kaspanet/kaspad/util/daghash"
)
@ -64,9 +65,9 @@ type Message interface {
MaxPayloadLength(uint32) uint32
}
// makeEmptyMessage creates a message of the appropriate concrete type based
// MakeEmptyMessage creates a message of the appropriate concrete type based
// on the command.
func makeEmptyMessage(command string) (Message, error) {
func MakeEmptyMessage(command string) (Message, error) {
var msg Message
switch command {
case CmdVersion:
@ -309,7 +310,7 @@ func ReadMessageN(r io.Reader, pver uint32, kaspaNet KaspaNet) (int, Message, []
}
// Create struct of appropriate message type based on the command.
msg, err := makeEmptyMessage(command)
msg, err := MakeEmptyMessage(command)
if err != nil {
discardInput(r, hdr.length)
return totalBytes, nil, nil, messageError("ReadMessage",