[NOD-492] Split API-Server to syncer and frontend applications (#519)

* [NOD-492] Split ApiServer to server and syncd.

* [NOD-492] Add missing file.

* [NOD-492] Remove references to --migrate from common config.

* [NOD-492] Move MQTT to the sync daemon.

* [NOD-492] Fix server Dockerfile and create one for syncd.

* [NOD-492] Rename ApiServer to Kasparov.

* [NOD-492] Fix packages.

* [NOD-492] Fix more packages.

* [NOD-492] Fix comments and package names.

* [NOD-492] Move blank import packages to main.

* [NOD-492] Move common logging logic out of individual config.go files.

* [NOD-492] Move database models to a package called dbmodels.

* [NOD-492] Rename models package to apimodels.
This commit is contained in:
stasatdaglabs 2019-12-08 14:38:47 +02:00 committed by Ori Newman
parent 11e936d109
commit 683dd52fcf
52 changed files with 327 additions and 173 deletions

View File

@ -1,106 +0,0 @@
package config
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/util"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"
"path/filepath"
)
const (
defaultLogFilename = "apiserver.log"
defaultErrLogFilename = "apiserver_err.log"
)
var (
// Default configuration options
defaultLogDir = util.AppDataDir("apiserver", false)
defaultDBAddress = "localhost:3306"
defaultHTTPListen = "0.0.0.0:8080"
activeConfig *Config
)
// ActiveConfig returns the active configuration struct
func ActiveConfig() *Config {
return activeConfig
}
// Config defines the configuration options for the API server.
type Config struct {
LogDir string `long:"logdir" description:"Directory to log output."`
DebugLevel string `short:"d" long:"debuglevel" description:"Set log level {trace, debug, info, warn, error, critical}"`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"`
DisableTLS bool `long:"notls" description:"Disable TLS"`
DBAddress string `long:"dbaddress" description:"Database address"`
DBUser string `long:"dbuser" description:"Database user" required:"true"`
DBPassword string `long:"dbpass" description:"Database password" required:"true"`
DBName string `long:"dbname" description:"Database name" required:"true"`
HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"`
Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The server will not start when using this flag."`
MQTTBrokerAddress string `long:"mqttaddress" description:"MQTT broker address" required:"false"`
MQTTUser string `long:"mqttuser" description:"MQTT server user" required:"false"`
MQTTPassword string `long:"mqttpass" description:"MQTT server password" required:"false"`
config.NetworkFlags
}
// Parse parses the CLI arguments and returns a config struct.
func Parse() error {
activeConfig = &Config{
LogDir: defaultLogDir,
DBAddress: defaultDBAddress,
HTTPListen: defaultHTTPListen,
}
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
return err
}
if !activeConfig.Migrate {
if activeConfig.RPCUser == "" {
return errors.New("--rpcuser is required if --migrate flag is not used")
}
if activeConfig.RPCPassword == "" {
return errors.New("--rpcpass is required if --migrate flag is not used")
}
if activeConfig.RPCServer == "" {
return errors.New("--rpcserver is required if --migrate flag is not used")
}
}
if activeConfig.RPCCert == "" && !activeConfig.DisableTLS {
return errors.New("--notls has to be disabled if --cert is used")
}
if activeConfig.RPCCert != "" && activeConfig.DisableTLS {
return errors.New("--cert should be omitted if --notls is used")
}
if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
return errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
}
err = activeConfig.ResolveNetwork(parser)
if err != nil {
return err
}
logFile := filepath.Join(activeConfig.LogDir, defaultLogFilename)
errLogFile := filepath.Join(activeConfig.LogDir, defaultErrLogFilename)
logger.InitLog(logFile, errLogFile)
if activeConfig.DebugLevel != "" {
err := logger.SetLogLevels(activeConfig.DebugLevel)
if err != nil {
return err
}
}
return nil
}

View File

@ -1,8 +1,8 @@
package config
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/dagconfig"
"github.com/daglabs/btcd/kasparov/logger"
"github.com/daglabs/btcd/util"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"

View File

@ -1,7 +1,7 @@
package database
import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
import "github.com/daglabs/btcd/kasparov/logger"
var (
log = logger.BackendLog.Logger("DTBS")

View File

@ -5,10 +5,10 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/faucet/config"
"github.com/daglabs/btcd/httpserverutils"
"github.com/daglabs/btcd/kasparov/server/apimodels"
"github.com/daglabs/btcd/txscript"
"github.com/daglabs/btcd/util"
"github.com/daglabs/btcd/util/daghash"

View File

@ -1,7 +1,7 @@
package httpserverutils
import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
import "github.com/daglabs/btcd/kasparov/logger"
var (
log = logger.BackendLog.Logger("UTIL")

69
kasparov/config/config.go Normal file
View File

@ -0,0 +1,69 @@
package config
import (
"github.com/daglabs/btcd/config"
"github.com/daglabs/btcd/kasparov/logger"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"
"path/filepath"
)
var (
// Default configuration options
defaultDBAddress = "localhost:3306"
)
// KasparovFlags holds configuration common to both the Kasparov server and the Kasparov daemon.
type KasparovFlags struct {
LogDir string `long:"logdir" description:"Directory to log output."`
DebugLevel string `short:"d" long:"debuglevel" description:"Set log level {trace, debug, info, warn, error, critical}"`
DBAddress string `long:"dbaddress" description:"Database address"`
DBUser string `long:"dbuser" description:"Database user" required:"true"`
DBPassword string `long:"dbpass" description:"Database password" required:"true"`
DBName string `long:"dbname" description:"Database name" required:"true"`
RPCUser string `short:"u" long:"rpcuser" description:"RPC username"`
RPCPassword string `short:"P" long:"rpcpass" default-mask:"-" description:"RPC password"`
RPCServer string `short:"s" long:"rpcserver" description:"RPC server to connect to"`
RPCCert string `short:"c" long:"rpccert" description:"RPC server certificate chain for validation"`
DisableTLS bool `long:"notls" description:"Disable TLS"`
config.NetworkFlags
}
// ResolveKasparovFlags parses command line arguments and sets KasparovFlags accordingly.
func (kasparovFlags *KasparovFlags) ResolveKasparovFlags(parser *flags.Parser,
defaultLogDir, logFilename, errLogFilename string) error {
if kasparovFlags.LogDir == "" {
kasparovFlags.LogDir = defaultLogDir
}
logFile := filepath.Join(kasparovFlags.LogDir, logFilename)
errLogFile := filepath.Join(kasparovFlags.LogDir, errLogFilename)
logger.InitLog(logFile, errLogFile)
if kasparovFlags.DebugLevel != "" {
err := logger.SetLogLevels(kasparovFlags.DebugLevel)
if err != nil {
return err
}
}
if kasparovFlags.DBAddress == "" {
kasparovFlags.DBAddress = defaultDBAddress
}
if kasparovFlags.RPCUser == "" {
return errors.New("--rpcuser is required")
}
if kasparovFlags.RPCPassword == "" {
return errors.New("--rpcpass is required")
}
if kasparovFlags.RPCServer == "" {
return errors.New("--rpcserver is required")
}
if kasparovFlags.RPCCert == "" && !kasparovFlags.DisableTLS {
return errors.New("--notls has to be disabled if --cert is used")
}
if kasparovFlags.RPCCert != "" && kasparovFlags.DisableTLS {
return errors.New("--cert should be omitted if --notls is used")
}
return kasparovFlags.ResolveNetwork(parser)
}

View File

@ -3,17 +3,17 @@ package database
import (
nativeerrors "errors"
"fmt"
"github.com/daglabs/btcd/kasparov/config"
"github.com/pkg/errors"
"os"
"github.com/daglabs/btcd/apiserver/config"
"github.com/golang-migrate/migrate/v4/source"
"github.com/jinzhu/gorm"
"github.com/golang-migrate/migrate/v4"
)
// db is the API server database.
// db is the Kasparov database.
var db *gorm.DB
// DB returns a reference to the database connection
@ -33,8 +33,8 @@ func (l gormLogger) Print(v ...interface{}) {
// Connect connects to the database mentioned in
// config variable.
func Connect() error {
connectionString := buildConnectionString()
func Connect(cfg *config.KasparovFlags) error {
connectionString := buildConnectionString(cfg)
migrator, driver, err := openMigrator(connectionString)
if err != nil {
return err
@ -67,8 +67,7 @@ func Close() error {
return err
}
func buildConnectionString() string {
cfg := config.ActiveConfig()
func buildConnectionString(cfg *config.KasparovFlags) string {
return fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True",
cfg.DBUser, cfg.DBPassword, cfg.DBAddress, cfg.DBName)
}
@ -99,7 +98,7 @@ func isCurrent(migrator *migrate.Migrate, driver source.Driver) (bool, uint, err
}
func openMigrator(connectionString string) (*migrate.Migrate, source.Driver, error) {
driver, err := source.Open("file://migrations")
driver, err := source.Open("file://../database/migrations")
if err != nil {
return nil, nil, err
}
@ -112,8 +111,8 @@ func openMigrator(connectionString string) (*migrate.Migrate, source.Driver, err
}
// Migrate database to the latest version.
func Migrate() error {
connectionString := buildConnectionString()
func Migrate(cfg *config.KasparovFlags) error {
connectionString := buildConnectionString(cfg)
migrator, driver, err := openMigrator(connectionString)
if err != nil {
return err

View File

@ -1,7 +1,7 @@
package database
import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
import "github.com/daglabs/btcd/kasparov/logger"
var (
log = logger.Logger("DTBS")

View File

@ -1,11 +1,11 @@
package jsonrpc
import (
"github.com/daglabs/btcd/kasparov/config"
"github.com/pkg/errors"
"io/ioutil"
"time"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/rpcclient"
@ -54,8 +54,7 @@ func Close() {
}
// Connect initiates a connection to the JSON-RPC API Server
func Connect() error {
cfg := config.ActiveConfig()
func Connect(cfg *config.KasparovFlags) error {
var cert []byte
if !cfg.DisableTLS {
var err error

View File

@ -1,7 +1,7 @@
package jsonrpc
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/kasparov/logger"
"github.com/daglabs/btcd/rpcclient"
"github.com/daglabs/btcd/util/panics"
)

View File

@ -34,7 +34,7 @@ func Logger(subsystemTag string) logs.Logger {
return logger
}
// SetLogLevels sets the logging level for all of the subsystems in the API server.
// SetLogLevels sets the logging level for all of the subsystems in Kasparov.
func SetLogLevels(level string) error {
lvl, ok := logs.LevelFromString(level)
if !ok {

View File

@ -1,6 +1,6 @@
package apimodels
// RawTransaction represents a raw transaction posted to the API server
// RawTransaction is a json representation of a raw transaction
type RawTransaction struct {
RawTransaction string `json:"rawTransaction"`
}

View File

@ -0,0 +1,49 @@
package config
import (
"github.com/daglabs/btcd/kasparov/config"
"github.com/daglabs/btcd/util"
"github.com/jessevdk/go-flags"
)
const (
logFilename = "apiserver.log"
errLogFilename = "apiserver_err.log"
)
var (
// Default configuration options
defaultLogDir = util.AppDataDir("apiserver", false)
defaultHTTPListen = "0.0.0.0:8080"
activeConfig *Config
)
// ActiveConfig returns the active configuration struct
func ActiveConfig() *Config {
return activeConfig
}
// Config defines the configuration options for the API server.
type Config struct {
HTTPListen string `long:"listen" description:"HTTP address to listen on (default: 0.0.0.0:8080)"`
config.KasparovFlags
}
// Parse parses the CLI arguments and returns a config struct.
func Parse() error {
activeConfig = &Config{
HTTPListen: defaultHTTPListen,
}
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
return err
}
err = activeConfig.ResolveKasparovFlags(parser, defaultLogDir, logFilename, errLogFilename)
if err != nil {
return err
}
return nil
}

View File

@ -2,14 +2,14 @@ package controllers
import (
"encoding/hex"
"github.com/daglabs/btcd/kasparov/database"
"github.com/daglabs/btcd/kasparov/dbmodels"
"github.com/daglabs/btcd/kasparov/server/apimodels"
"net/http"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/dbmodels"
"github.com/daglabs/btcd/httpserverutils"
"github.com/pkg/errors"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/util/daghash"
)

View File

@ -2,9 +2,9 @@ package controllers
import (
"encoding/hex"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/dbmodels"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/kasparov/dbmodels"
"github.com/daglabs/btcd/kasparov/server/apimodels"
)
func convertTxDBModelToTxResponse(tx *dbmodels.Transaction) *apimodels.TransactionResponse {

View File

@ -1,7 +1,7 @@
package controllers
import (
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/kasparov/server/apimodels"
)
// GetFeeEstimatesHandler returns the fee estimates for different priorities

View File

@ -5,19 +5,19 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"github.com/daglabs/btcd/kasparov/database"
"github.com/daglabs/btcd/kasparov/dbmodels"
"github.com/daglabs/btcd/kasparov/jsonrpc"
"github.com/daglabs/btcd/kasparov/server/apimodels"
"github.com/daglabs/btcd/util"
"net/http"
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/apiserver/dbmodels"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/httpserverutils"
"github.com/daglabs/btcd/kasparov/server/config"
"github.com/daglabs/btcd/util/subnetworkid"
"github.com/pkg/errors"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/apiserver/jsonrpc"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/util/daghash"
"github.com/daglabs/btcd/wire"

View File

@ -0,0 +1,28 @@
# -- multistage docker build: stage #1: build stage
FROM golang:1.13-alpine AS build
RUN mkdir -p /go/src/github.com/daglabs/btcd
WORKDIR /go/src/github.com/daglabs/btcd
RUN apk add --no-cache curl git
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
RUN cd kasparov/server && CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o kasparov-server .
# --- multistage docker build: stage #2: runtime image
FROM alpine
WORKDIR /app
RUN apk add --no-cache tini
COPY --from=build /go/src/github.com/daglabs/btcd/kasparov/server/ /app/
ENTRYPOINT ["/sbin/tini", "--"]
CMD ["/app/kasparov-server"]

View File

@ -1,11 +1,11 @@
package main
import (
"github.com/daglabs/btcd/apiserver/logger"
"github.com/daglabs/btcd/kasparov/logger"
"github.com/daglabs/btcd/util/panics"
)
var (
log = logger.Logger("APIS")
log = logger.Logger("KVSV")
spawn = panics.GoroutineWrapperFunc(log)
)

54
kasparov/server/main.go Normal file
View File

@ -0,0 +1,54 @@
package main
import (
"fmt"
"github.com/pkg/errors"
"os"
"github.com/daglabs/btcd/kasparov/database"
"github.com/daglabs/btcd/kasparov/jsonrpc"
"github.com/daglabs/btcd/kasparov/server/config"
"github.com/daglabs/btcd/kasparov/server/server"
"github.com/daglabs/btcd/signal"
"github.com/daglabs/btcd/util/panics"
_ "github.com/golang-migrate/migrate/v4/database/mysql"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/jinzhu/gorm/dialects/mysql"
)
func main() {
defer panics.HandlePanic(log, nil, nil)
err := config.Parse()
if err != nil {
errString := fmt.Sprintf("Error parsing command-line arguments: %s", err)
_, fErr := fmt.Fprintf(os.Stderr, errString)
if fErr != nil {
panic(errString)
}
return
}
err = database.Connect(&config.ActiveConfig().KasparovFlags)
if err != nil {
panic(errors.Errorf("Error connecting to database: %s", err))
}
defer func() {
err := database.Close()
if err != nil {
panic(errors.Errorf("Error closing the database: %s", err))
}
}()
err = jsonrpc.Connect(&config.ActiveConfig().KasparovFlags)
if err != nil {
panic(errors.Errorf("Error connecting to servers: %s", err))
}
defer jsonrpc.Close()
shutdownServer := server.Start(config.ActiveConfig().HTTPListen)
defer shutdownServer()
interrupt := signal.InterruptListener()
<-interrupt
}

View File

@ -1,7 +1,7 @@
package server
import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
import "github.com/daglabs/btcd/kasparov/logger"
var (
log = logger.Logger("REST")

View File

@ -7,7 +7,7 @@ import (
"net/http"
"strconv"
"github.com/daglabs/btcd/apiserver/controllers"
"github.com/daglabs/btcd/kasparov/server/controllers"
"github.com/gorilla/mux"
)
@ -34,7 +34,7 @@ func mainHandler(_ *httpserverutils.ServerContext, _ *http.Request, _ map[string
return struct {
Message string `json:"message"`
}{
Message: "API server is running",
Message: "Kasparov server is running",
}, nil
}

View File

@ -0,0 +1,55 @@
package config
import (
"github.com/daglabs/btcd/kasparov/config"
"github.com/daglabs/btcd/util"
"github.com/jessevdk/go-flags"
"github.com/pkg/errors"
)
const (
logFilename = "syncd.log"
errLogFilename = "syncd_err.log"
)
var (
// Default configuration options
defaultLogDir = util.AppDataDir("syncd", false)
activeConfig *Config
)
// ActiveConfig returns the active configuration struct
func ActiveConfig() *Config {
return activeConfig
}
// Config defines the configuration options for the sync daemon.
type Config struct {
Migrate bool `long:"migrate" description:"Migrate the database to the latest version. The daemon will not start when using this flag."`
MQTTBrokerAddress string `long:"mqttaddress" description:"MQTT broker address" required:"false"`
MQTTUser string `long:"mqttuser" description:"MQTT server user" required:"false"`
MQTTPassword string `long:"mqttpass" description:"MQTT server password" required:"false"`
config.KasparovFlags
}
// Parse parses the CLI arguments and returns a config struct.
func Parse() error {
activeConfig = &Config{}
parser := flags.NewParser(activeConfig, flags.PrintErrors|flags.HelpFlag)
_, err := parser.Parse()
if err != nil {
return err
}
err = activeConfig.ResolveKasparovFlags(parser, defaultLogDir, logFilename, errLogFilename)
if err != nil {
return err
}
if (activeConfig.MQTTBrokerAddress != "" || activeConfig.MQTTUser != "" || activeConfig.MQTTPassword != "") &&
(activeConfig.MQTTBrokerAddress == "" || activeConfig.MQTTUser == "" || activeConfig.MQTTPassword == "") {
return errors.New("--mqttaddress, --mqttuser, and --mqttpass must be passed all together")
}
return nil
}

View File

@ -14,7 +14,7 @@ RUN go mod download
COPY . .
RUN cd apiserver && CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o apiserver .
RUN cd kasparov/syncd && CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o kasparov-syncd .
# --- multistage docker build: stage #2: runtime image
FROM alpine
@ -22,7 +22,7 @@ WORKDIR /app
RUN apk add --no-cache tini
COPY --from=build /go/src/github.com/daglabs/btcd/apiserver/ /app/
COPY --from=build /go/src/github.com/daglabs/btcd/kasparov/syncd/ /app/
ENTRYPOINT ["/sbin/tini", "--"]
CMD ["/app/apiserver"]
CMD ["/app/kasparov-syncd"]

11
kasparov/syncd/log.go Normal file
View File

@ -0,0 +1,11 @@
package main
import (
"github.com/daglabs/btcd/kasparov/logger"
"github.com/daglabs/btcd/util/panics"
)
var (
log = logger.Logger("KVSD")
spawn = panics.GoroutineWrapperFunc(log)
)

View File

@ -2,19 +2,17 @@ package main
import (
"fmt"
"github.com/daglabs/btcd/apiserver/mqtt"
"github.com/pkg/errors"
"os"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/apiserver/jsonrpc"
"github.com/daglabs/btcd/apiserver/server"
"github.com/daglabs/btcd/kasparov/database"
"github.com/daglabs/btcd/kasparov/jsonrpc"
"github.com/daglabs/btcd/kasparov/syncd/config"
"github.com/daglabs/btcd/kasparov/syncd/mqtt"
"github.com/daglabs/btcd/signal"
"github.com/daglabs/btcd/util/panics"
_ "github.com/golang-migrate/migrate/v4/database/mysql"
_ "github.com/golang-migrate/migrate/v4/source/file"
_ "github.com/jinzhu/gorm/dialects/mysql"
"github.com/pkg/errors"
"os"
)
func main() {
@ -31,14 +29,14 @@ func main() {
}
if config.ActiveConfig().Migrate {
err := database.Migrate()
err := database.Migrate(&config.ActiveConfig().KasparovFlags)
if err != nil {
panic(errors.Errorf("Error migrating database: %s", err))
}
return
}
err = database.Connect()
err = database.Connect(&config.ActiveConfig().KasparovFlags)
if err != nil {
panic(errors.Errorf("Error connecting to database: %s", err))
}
@ -55,15 +53,12 @@ func main() {
}
defer mqtt.Close()
err = jsonrpc.Connect()
err = jsonrpc.Connect(&config.ActiveConfig().KasparovFlags)
if err != nil {
panic(errors.Errorf("Error connecting to servers: %s", err))
}
defer jsonrpc.Close()
shutdownServer := server.Start(config.ActiveConfig().HTTPListen)
defer shutdownServer()
doneChan := make(chan struct{}, 1)
spawn(func() {
err := startSync(doneChan)

View File

@ -1,7 +1,7 @@
package mqtt
import "github.com/daglabs/btcd/util/panics"
import "github.com/daglabs/btcd/apiserver/logger"
import "github.com/daglabs/btcd/kasparov/logger"
var (
log = logger.Logger("MQTT")

View File

@ -2,7 +2,7 @@ package mqtt
import (
"encoding/json"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/kasparov/syncd/config"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/pkg/errors"
)

View File

@ -1,7 +1,7 @@
package mqtt
import (
"github.com/daglabs/btcd/apiserver/controllers"
"github.com/daglabs/btcd/kasparov/server/controllers"
)
const selectedTipTopic = "dag/selected-tip"

View File

@ -1,9 +1,9 @@
package mqtt
import (
"github.com/daglabs/btcd/apiserver/apimodels"
"github.com/daglabs/btcd/apiserver/controllers"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/kasparov/server/apimodels"
"github.com/daglabs/btcd/kasparov/server/controllers"
"github.com/daglabs/btcd/rpcclient"
"github.com/daglabs/btcd/util/daghash"
"path"

View File

@ -3,15 +3,15 @@ package main
import (
"bytes"
"encoding/hex"
"github.com/daglabs/btcd/apiserver/mqtt"
"github.com/daglabs/btcd/kasparov/database"
"github.com/daglabs/btcd/kasparov/dbmodels"
"github.com/daglabs/btcd/kasparov/jsonrpc"
"github.com/daglabs/btcd/kasparov/syncd/config"
"github.com/daglabs/btcd/kasparov/syncd/mqtt"
"strconv"
"strings"
"time"
"github.com/daglabs/btcd/apiserver/config"
"github.com/daglabs/btcd/apiserver/database"
"github.com/daglabs/btcd/apiserver/dbmodels"
"github.com/daglabs/btcd/apiserver/jsonrpc"
"github.com/daglabs/btcd/blockdag"
"github.com/daglabs/btcd/btcjson"
"github.com/daglabs/btcd/httpserverutils"
@ -27,8 +27,8 @@ import (
// pendingChainChangedMsgs holds chainChangedMsgs in order of arrival
var pendingChainChangedMsgs []*jsonrpc.ChainChangedMsg
// startSync keeps the node and the API server in sync. On start, it downloads
// all data that's missing from the API server, and once it's done it keeps
// startSync keeps the node and the database in sync. On start, it downloads
// all data that's missing from the dabase, and once it's done it keeps
// sync with the node via notifications.
func startSync(doneChan chan struct{}) error {
client, err := jsonrpc.GetClient()
@ -42,7 +42,7 @@ func startSync(doneChan chan struct{}) error {
return err
}
// Keep the node and the API server in sync
// Keep the node and the database in sync
return sync(client, doneChan)
}
@ -63,7 +63,7 @@ func fetchInitialData(client *jsonrpc.Client) error {
return nil
}
// sync keeps the API server in sync with the node via notifications
// sync keeps the database in sync with the node via notifications
func sync(client *jsonrpc.Client, doneChan chan struct{}) error {
// Handle client notifications until we're told to stop
for {
@ -1068,6 +1068,7 @@ func handleChainChangedMsg(chainChanged *jsonrpc.ChainChangedMsg) error {
}
log.Infof("Chain changed: removed %d blocks and added %d block",
len(removedHashes), len(addedBlocks))
err = mqtt.PublishAcceptedTransactionsNotifications(chainChanged.AddedChainBlocks)
if err != nil {
return errors.Wrap(err, "Error while publishing accepted transactions notifications")