[NOD-63] Merge BlockAccepted and BlockConnected notifications into BlockAdded + remove BlockDisconnected notifications (#221)

* [NOD-63] Merge BlockAccepted and BlockConnected notifications into BlockAdded + remove BlockDisconnected notifications

* [NOD-63] Many instances of chain->DAG and similar

* [NOD-63] Some more chian -> DAG
This commit is contained in:
Svarog 2019-03-20 13:48:32 +02:00 committed by stasatdaglabs
parent a2b69a84f4
commit e93e60aa74
13 changed files with 155 additions and 480 deletions

View File

@ -88,7 +88,7 @@ func (dag *BlockDAG) maybeAcceptBlock(block *util.Block, flags BehaviorFlags) er
// DAG. The caller would typically want to react by relaying the
// inventory to other peers.
dag.dagLock.Unlock()
dag.sendNotification(NTBlockAccepted, block)
dag.sendNotification(NTBlockAdded, block)
dag.dagLock.Lock()
isOk = true

View File

@ -474,15 +474,6 @@ func (dag *BlockDAG) addBlock(node *blockNode, parentNodes blockSet, block *util
dag.blockCount++
}
if err != nil {
// Notify the caller that the block was connected to the DAG.
// The caller would typically want to react with actions such as
// updating wallets.
dag.dagLock.Unlock()
dag.sendNotification(NTBlockConnected, block)
dag.dagLock.Lock()
}
// Intentionally ignore errors writing updated node status to DB. If
// it fails to write, it's not the end of the world. If the block is
// invalid, the worst that can happen is we revalidate the block

View File

@ -17,21 +17,15 @@ type NotificationCallback func(*Notification)
// Constants for the type of a notification message.
const (
// NTBlockAccepted indicates the associated block was accepted into
// the block chain. Note that this does not necessarily mean it was
// added to the main chain. For that, use NTBlockConnected.
NTBlockAccepted NotificationType = iota
// NTBlockConnected indicates the associated block was connected to the
// main chain.
NTBlockConnected
// NTBlockAdded indicates the associated block was added into
// the blockDAG.
NTBlockAdded NotificationType = iota
)
// notificationTypeStrings is a map of notification types back to their constant
// names for pretty printing.
var notificationTypeStrings = map[NotificationType]string{
NTBlockAccepted: "NTBlockAccepted",
NTBlockConnected: "NTBlockConnected",
NTBlockAdded: "NTBlockAdded",
}
// String returns the NotificationType in human-readable form.
@ -45,9 +39,7 @@ func (n NotificationType) String() string {
// Notification defines notification that is sent to the caller via the callback
// function provided during the call to New and consists of a notification type
// as well as associated data that depends on the type as follows:
// - NTBlockAccepted: *util.Block
// - NTBlockConnected: *util.Block
// - NTBlockDisconnected: *util.Block
// - Added: *util.Block
type Notification struct {
Type NotificationType
Data interface{}

View File

@ -30,7 +30,7 @@ func TestNotifications(t *testing.T) {
notificationCount := 0
callback := func(notification *Notification) {
if notification.Type == NTBlockAccepted {
if notification.Type == NTBlockAdded {
notificationCount++
}
}

View File

@ -9,34 +9,22 @@
package btcjson
const (
// BlockConnectedNtfnMethod is the legacy, deprecated method used for
// BlockAddedNtfnMethod is the legacy, deprecated method used for
// notifications from the dag server that a block has been connected.
//
// NOTE: Deprecated. Use FilteredBlockConnectedNtfnMethod instead.
BlockConnectedNtfnMethod = "blockConnected"
// NOTE: Deprecated. Use FilteredBlockAddedNtfnMethod instead.
BlockAddedNtfnMethod = "blockAdded"
// BlockDisconnectedNtfnMethod is the legacy, deprecated method used for
// notifications from the dag server that a block has been
// disconnected.
//
// NOTE: Deprecated. Use FilteredBlockDisconnectedNtfnMethod instead.
BlockDisconnectedNtfnMethod = "blockDisconnected"
// FilteredBlockConnectedNtfnMethod is the new method used for
// FilteredBlockAddedNtfnMethod is the new method used for
// notifications from the dag server that a block has been connected.
FilteredBlockConnectedNtfnMethod = "filteredBlockConnected"
// FilteredBlockDisconnectedNtfnMethod is the new method used for
// notifications from the dag server that a block has been
// disconnected.
FilteredBlockDisconnectedNtfnMethod = "filteredBlockDisconnected"
FilteredBlockAddedNtfnMethod = "filteredBlockAdded"
// RecvTxNtfnMethod is the legacy, deprecated method used for
// notifications from the dag server that a transaction which pays to
// a registered address has been processed.
//
// NOTE: Deprecated. Use RelevantTxAcceptedNtfnMethod and
// FilteredBlockConnectedNtfnMethod instead.
// FilteredBlockAddedNtfnMethod instead.
RecvTxNtfnMethod = "recvTx"
// RedeemingTxNtfnMethod is the legacy, deprecated method used for
@ -44,7 +32,7 @@ const (
// registered outpoint has been processed.
//
// NOTE: Deprecated. Use RelevantTxAcceptedNtfnMethod and
// FilteredBlockConnectedNtfnMethod instead.
// FilteredBlockAddedNtfnMethod instead.
RedeemingTxNtfnMethod = "redeemingTx"
// RescanFinishedNtfnMethod is the legacy, deprecated method used for
@ -77,82 +65,45 @@ const (
RelevantTxAcceptedNtfnMethod = "relevantTxAccepted"
)
// BlockConnectedNtfn defines the blockConnected JSON-RPC notification.
// BlockAddedNtfn defines the blockAdded JSON-RPC notification.
//
// NOTE: Deprecated. Use FilteredBlockConnectedNtfn instead.
type BlockConnectedNtfn struct {
// NOTE: Deprecated. Use FilteredBlockAddedNtfn instead.
type BlockAddedNtfn struct {
Hash string
Height int32
Time int64
}
// NewBlockConnectedNtfn returns a new instance which can be used to issue a
// blockConnected JSON-RPC notification.
// NewBlockAddedNtfn returns a new instance which can be used to issue a
// blockAdded JSON-RPC notification.
//
// NOTE: Deprecated. Use NewFilteredBlockConnectedNtfn instead.
func NewBlockConnectedNtfn(hash string, height int32, time int64) *BlockConnectedNtfn {
return &BlockConnectedNtfn{
// NOTE: Deprecated. Use NewFilteredBlockAddedNtfn instead.
func NewBlockAddedNtfn(hash string, height int32, time int64) *BlockAddedNtfn {
return &BlockAddedNtfn{
Hash: hash,
Height: height,
Time: time,
}
}
// BlockDisconnectedNtfn defines the blockDisconnected JSON-RPC notification.
//
// NOTE: Deprecated. Use FilteredBlockDisconnectedNtfn instead.
type BlockDisconnectedNtfn struct {
Hash string
Height int32
Time int64
}
// NewBlockDisconnectedNtfn returns a new instance which can be used to issue a
// blockDisconnected JSON-RPC notification.
//
// NOTE: Deprecated. Use NewFilteredBlockDisconnectedNtfn instead.
func NewBlockDisconnectedNtfn(hash string, height int32, time int64) *BlockDisconnectedNtfn {
return &BlockDisconnectedNtfn{
Hash: hash,
Height: height,
Time: time,
}
}
// FilteredBlockConnectedNtfn defines the filteredBlockConnected JSON-RPC
// FilteredBlockAddedNtfn defines the filteredBlockAdded JSON-RPC
// notification.
type FilteredBlockConnectedNtfn struct {
type FilteredBlockAddedNtfn struct {
Height int32
Header string
SubscribedTxs []string
}
// NewFilteredBlockConnectedNtfn returns a new instance which can be used to
// issue a filteredBlockConnected JSON-RPC notification.
func NewFilteredBlockConnectedNtfn(height int32, header string, subscribedTxs []string) *FilteredBlockConnectedNtfn {
return &FilteredBlockConnectedNtfn{
// NewFilteredBlockAddedNtfn returns a new instance which can be used to
// issue a filteredBlockAdded JSON-RPC notification.
func NewFilteredBlockAddedNtfn(height int32, header string, subscribedTxs []string) *FilteredBlockAddedNtfn {
return &FilteredBlockAddedNtfn{
Height: height,
Header: header,
SubscribedTxs: subscribedTxs,
}
}
// FilteredBlockDisconnectedNtfn defines the filteredBlockDisconnected JSON-RPC
// notification.
type FilteredBlockDisconnectedNtfn struct {
Height int32
Header string
}
// NewFilteredBlockDisconnectedNtfn returns a new instance which can be used to
// issue a filteredBlockDisconnected JSON-RPC notification.
func NewFilteredBlockDisconnectedNtfn(height int32, header string) *FilteredBlockDisconnectedNtfn {
return &FilteredBlockDisconnectedNtfn{
Height: height,
Header: header,
}
}
// BlockDetails describes details of a tx in a block.
type BlockDetails struct {
Height int32 `json:"height"`
@ -163,7 +114,7 @@ type BlockDetails struct {
// RecvTxNtfn defines the recvTx JSON-RPC notification.
//
// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockConnectedNtfn
// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockAddedNtfn
// instead.
type RecvTxNtfn struct {
HexTx string
@ -174,7 +125,7 @@ type RecvTxNtfn struct {
// JSON-RPC notification.
//
// NOTE: Deprecated. Use NewRelevantTxAcceptedNtfn and
// NewFilteredBlockConnectedNtfn instead.
// NewFilteredBlockAddedNtfn instead.
func NewRecvTxNtfn(hexTx string, block *BlockDetails) *RecvTxNtfn {
return &RecvTxNtfn{
HexTx: hexTx,
@ -184,7 +135,7 @@ func NewRecvTxNtfn(hexTx string, block *BlockDetails) *RecvTxNtfn {
// RedeemingTxNtfn defines the redeemingTx JSON-RPC notification.
//
// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockConnectedNtfn
// NOTE: Deprecated. Use RelevantTxAcceptedNtfn and FilteredBlockAddedNtfn
// instead.
type RedeemingTxNtfn struct {
HexTx string
@ -195,7 +146,7 @@ type RedeemingTxNtfn struct {
// redeemingTx JSON-RPC notification.
//
// NOTE: Deprecated. Use NewRelevantTxAcceptedNtfn and
// NewFilteredBlockConnectedNtfn instead.
// NewFilteredBlockAddedNtfn instead.
func NewRedeemingTxNtfn(hexTx string, block *BlockDetails) *RedeemingTxNtfn {
return &RedeemingTxNtfn{
HexTx: hexTx,
@ -290,10 +241,8 @@ func init() {
// notifications.
flags := UFWebsocketOnly | UFNotification
MustRegisterCmd(BlockConnectedNtfnMethod, (*BlockConnectedNtfn)(nil), flags)
MustRegisterCmd(BlockDisconnectedNtfnMethod, (*BlockDisconnectedNtfn)(nil), flags)
MustRegisterCmd(FilteredBlockConnectedNtfnMethod, (*FilteredBlockConnectedNtfn)(nil), flags)
MustRegisterCmd(FilteredBlockDisconnectedNtfnMethod, (*FilteredBlockDisconnectedNtfn)(nil), flags)
MustRegisterCmd(BlockAddedNtfnMethod, (*BlockAddedNtfn)(nil), flags)
MustRegisterCmd(FilteredBlockAddedNtfnMethod, (*FilteredBlockAddedNtfn)(nil), flags)
MustRegisterCmd(RecvTxNtfnMethod, (*RecvTxNtfn)(nil), flags)
MustRegisterCmd(RedeemingTxNtfnMethod, (*RedeemingTxNtfn)(nil), flags)
MustRegisterCmd(RescanFinishedNtfnMethod, (*RescanFinishedNtfn)(nil), flags)

View File

@ -33,64 +33,35 @@ func TestDAGSvrWsNtfns(t *testing.T) {
unmarshalled interface{}
}{
{
name: "blockConnected",
name: "blockAdded",
newNtfn: func() (interface{}, error) {
return btcjson.NewCmd("blockConnected", "123", 100000, 123456789)
return btcjson.NewCmd("blockAdded", "123", 100000, 123456789)
},
staticNtfn: func() interface{} {
return btcjson.NewBlockConnectedNtfn("123", 100000, 123456789)
return btcjson.NewBlockAddedNtfn("123", 100000, 123456789)
},
marshalled: `{"jsonrpc":"1.0","method":"blockConnected","params":["123",100000,123456789],"id":null}`,
unmarshalled: &btcjson.BlockConnectedNtfn{
marshalled: `{"jsonrpc":"1.0","method":"blockAdded","params":["123",100000,123456789],"id":null}`,
unmarshalled: &btcjson.BlockAddedNtfn{
Hash: "123",
Height: 100000,
Time: 123456789,
},
},
{
name: "blockDisconnected",
name: "filteredBlockAdded",
newNtfn: func() (interface{}, error) {
return btcjson.NewCmd("blockDisconnected", "123", 100000, 123456789)
return btcjson.NewCmd("filteredBlockAdded", 100000, "header", []string{"tx0", "tx1"})
},
staticNtfn: func() interface{} {
return btcjson.NewBlockDisconnectedNtfn("123", 100000, 123456789)
return btcjson.NewFilteredBlockAddedNtfn(100000, "header", []string{"tx0", "tx1"})
},
marshalled: `{"jsonrpc":"1.0","method":"blockDisconnected","params":["123",100000,123456789],"id":null}`,
unmarshalled: &btcjson.BlockDisconnectedNtfn{
Hash: "123",
Height: 100000,
Time: 123456789,
},
},
{
name: "filteredBlockConnected",
newNtfn: func() (interface{}, error) {
return btcjson.NewCmd("filteredBlockConnected", 100000, "header", []string{"tx0", "tx1"})
},
staticNtfn: func() interface{} {
return btcjson.NewFilteredBlockConnectedNtfn(100000, "header", []string{"tx0", "tx1"})
},
marshalled: `{"jsonrpc":"1.0","method":"filteredBlockConnected","params":[100000,"header",["tx0","tx1"]],"id":null}`,
unmarshalled: &btcjson.FilteredBlockConnectedNtfn{
marshalled: `{"jsonrpc":"1.0","method":"filteredBlockAdded","params":[100000,"header",["tx0","tx1"]],"id":null}`,
unmarshalled: &btcjson.FilteredBlockAddedNtfn{
Height: 100000,
Header: "header",
SubscribedTxs: []string{"tx0", "tx1"},
},
},
{
name: "filteredBlockDisconnected",
newNtfn: func() (interface{}, error) {
return btcjson.NewCmd("filteredBlockDisconnected", 100000, "header")
},
staticNtfn: func() interface{} {
return btcjson.NewFilteredBlockDisconnectedNtfn(100000, "header")
},
marshalled: `{"jsonrpc":"1.0","method":"filteredBlockDisconnected","params":[100000,"header"],"id":null}`,
unmarshalled: &btcjson.FilteredBlockDisconnectedNtfn{
Height: 100000,
Header: "header",
},
},
{
name: "recvTx",
newNtfn: func() (interface{}, error) {

View File

@ -49,16 +49,15 @@ func (u *utxo) isMature(height int32) bool {
return height >= u.maturityHeight
}
// chainUpdate encapsulates an update to the current main chain. This struct is
// used to sync up the memWallet each time a new block is connected to the main
// chain.
type chainUpdate struct {
// dagUpdate encapsulates an update to the current DAG. This struct is
// used to sync up the memWallet each time a new block is connected to the DAG.
type dagUpdate struct {
blockHeight int32
filteredTxns []*util.Tx
isConnect bool // True if connect, false if disconnect
}
// undoEntry is functionally the opposite of a chainUpdate. An undoEntry is
// undoEntry is functionally the opposite of a dagUpdate. An undoEntry is
// created for each new block received, then stored in a log in order to
// properly handle block re-orgs.
type undoEntry struct {
@ -96,9 +95,9 @@ type memWallet struct {
// disconnected block on the wallet's set of spendable utxos.
reorgJournal map[int32]*undoEntry
chainUpdates []*chainUpdate
chainUpdateSignal chan struct{}
chainMtx sync.Mutex
dagUpdates []*dagUpdate
dagUpdateSignal chan struct{}
dagMtx sync.Mutex
net *dagconfig.Params
@ -143,21 +142,21 @@ func newMemWallet(net *dagconfig.Params, harnessID uint32) (*memWallet, error) {
addrs[0] = coinbaseAddr
return &memWallet{
net: net,
coinbaseKey: coinbaseKey,
coinbaseAddr: coinbaseAddr,
hdIndex: 1,
hdRoot: hdRoot,
addrs: addrs,
utxos: make(map[wire.OutPoint]*utxo),
chainUpdateSignal: make(chan struct{}),
reorgJournal: make(map[int32]*undoEntry),
net: net,
coinbaseKey: coinbaseKey,
coinbaseAddr: coinbaseAddr,
hdIndex: 1,
hdRoot: hdRoot,
addrs: addrs,
utxos: make(map[wire.OutPoint]*utxo),
dagUpdateSignal: make(chan struct{}),
reorgJournal: make(map[int32]*undoEntry),
}, nil
}
// Start launches all goroutines required for the wallet to function properly.
func (m *memWallet) Start() {
go m.chainSyncer()
go m.dagSyncer()
}
// SyncedHeight returns the height the wallet is known to be synced to.
@ -176,27 +175,27 @@ func (m *memWallet) SetRPCClient(rpcClient *rpcclient.Client) {
}
// IngestBlock is a call-back which is to be triggered each time a new block is
// connected to the main chain. It queues the update for the chain syncer,
// connected to the blockDAG. It queues the update for the DAG syncer,
// calling the private version in sequential order.
func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) {
// Append this new chain update to the end of the queue of new chain
// Append this new DAG update to the end of the queue of new DAG
// updates.
m.chainMtx.Lock()
m.chainUpdates = append(m.chainUpdates, &chainUpdate{height,
m.dagMtx.Lock()
m.dagUpdates = append(m.dagUpdates, &dagUpdate{height,
filteredTxns, true})
m.chainMtx.Unlock()
m.dagMtx.Unlock()
// Launch a goroutine to signal the chainSyncer that a new update is
// Launch a goroutine to signal the dagSyncer that a new update is
// available. We do this in a new goroutine in order to avoid blocking
// the main loop of the rpc client.
go func() {
m.chainUpdateSignal <- struct{}{}
m.dagUpdateSignal <- struct{}{}
}()
}
// ingestBlock updates the wallet's internal utxo state based on the outputs
// created and destroyed within each block.
func (m *memWallet) ingestBlock(update *chainUpdate) {
func (m *memWallet) ingestBlock(update *dagUpdate) {
// Update the latest synced height, then process each filtered
// transaction in the block creating and destroying utxos within
// the wallet as a result.
@ -218,27 +217,25 @@ func (m *memWallet) ingestBlock(update *chainUpdate) {
m.reorgJournal[update.blockHeight] = undo
}
// chainSyncer is a goroutine dedicated to processing new blocks in order to
// dagSyncer is a goroutine dedicated to processing new blocks in order to
// keep the wallet's utxo state up to date.
//
// NOTE: This MUST be run as a goroutine.
func (m *memWallet) chainSyncer() {
var update *chainUpdate
func (m *memWallet) dagSyncer() {
var update *dagUpdate
for range m.chainUpdateSignal {
for range m.dagUpdateSignal {
// A new update is available, so pop the new chain update from
// the front of the update queue.
m.chainMtx.Lock()
update = m.chainUpdates[0]
m.chainUpdates[0] = nil // Set to nil to prevent GC leak.
m.chainUpdates = m.chainUpdates[1:]
m.chainMtx.Unlock()
m.dagMtx.Lock()
update = m.dagUpdates[0]
m.dagUpdates[0] = nil // Set to nil to prevent GC leak.
m.dagUpdates = m.dagUpdates[1:]
m.dagMtx.Unlock()
m.Lock()
if update.isConnect {
m.ingestBlock(update)
} else {
m.unwindBlock(update)
}
m.Unlock()
}
@ -295,41 +292,6 @@ func (m *memWallet) evalInputs(inputs []*wire.TxIn, undo *undoEntry) {
}
}
// UnwindBlock is a call-back which is to be executed each time a block is
// disconnected from the main chain. It queues the update for the chain syncer,
// calling the private version in sequential order.
func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) {
// Append this new chain update to the end of the queue of new chain
// updates.
m.chainMtx.Lock()
m.chainUpdates = append(m.chainUpdates, &chainUpdate{height,
nil, false})
m.chainMtx.Unlock()
// Launch a goroutine to signal the chainSyncer that a new update is
// available. We do this in a new goroutine in order to avoid blocking
// the main loop of the rpc client.
go func() {
m.chainUpdateSignal <- struct{}{}
}()
}
// unwindBlock undoes the effect that a particular block had on the wallet's
// internal utxo state.
func (m *memWallet) unwindBlock(update *chainUpdate) {
undo := m.reorgJournal[update.blockHeight]
for _, utxo := range undo.utxosCreated {
delete(m.utxos, utxo)
}
for outPoint, utxo := range undo.utxosDestroyed {
m.utxos[outPoint] = utxo
}
delete(m.reorgJournal, update.blockHeight)
}
// newAddress returns a new address from the wallet's hd key chain. It also
// loads the address into the RPC client's transaction filter to ensure any
// transactions that involve it are delivered via the notifications.

View File

@ -169,24 +169,15 @@ func New(activeNet *dagconfig.Params, handlers *rpcclient.NotificationHandlers,
// callback has already been set, then create a wrapper callback which
// executes both the currently registered callback and the mem wallet's
// callback.
if handlers.OnFilteredBlockConnected != nil {
obc := handlers.OnFilteredBlockConnected
handlers.OnFilteredBlockConnected = func(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) {
if handlers.OnFilteredBlockAdded != nil {
obc := handlers.OnFilteredBlockAdded
handlers.OnFilteredBlockAdded = func(height int32, header *wire.BlockHeader, filteredTxns []*util.Tx) {
wallet.IngestBlock(height, header, filteredTxns)
obc(height, header, filteredTxns)
}
} else {
// Otherwise, we can claim the callback ourselves.
handlers.OnFilteredBlockConnected = wallet.IngestBlock
}
if handlers.OnFilteredBlockDisconnected != nil {
obd := handlers.OnFilteredBlockDisconnected
handlers.OnFilteredBlockDisconnected = func(height int32, header *wire.BlockHeader) {
wallet.UnwindBlock(height, header)
obd(height, header)
}
} else {
handlers.OnFilteredBlockDisconnected = wallet.UnwindBlock
handlers.OnFilteredBlockAdded = wallet.IngestBlock
}
h := &Harness{

View File

@ -1171,9 +1171,8 @@ out:
// connected peers.
func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notification) {
switch notification.Type {
// A block has been accepted into the block chain. Relay it to other
// peers.
case blockdag.NTBlockAccepted:
// A block has been accepted into the blockDAG. Relay it to other peers.
case blockdag.NTBlockAdded:
// Don't relay if we are not current. Other peers that are
// current should already know about it.
if !sm.current() {
@ -1182,7 +1181,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
block, ok := notification.Data.(*util.Block)
if !ok {
log.Warnf("Chain accepted notification is not a block.")
log.Warnf("Block Added notification data is not a block.")
break
}
@ -1190,14 +1189,7 @@ func (sm *SyncManager) handleBlockDAGNotification(notification *blockdag.Notific
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
// A block has been connected to the block DAG.
case blockdag.NTBlockConnected:
block, ok := notification.Data.(*util.Block)
if !ok {
log.Warnf("Chain connected notification is not a block.")
break
}
// Update mempool
ch := make(chan mempool.NewBlockMsg)
go func() {
err := sm.txMemPool.HandleNewBlock(block, ch)
@ -1329,8 +1321,7 @@ func (sm *SyncManager) SyncPeerID() int32 {
return <-reply
}
// ProcessBlock makes use of ProcessBlock on an internal instance of a block
// chain.
// ProcessBlock makes use of ProcessBlock on an internal instance of a blockDAG.
func (sm *SyncManager) ProcessBlock(block *util.Block, flags blockdag.BehaviorFlags) (bool, error) {
reply := make(chan processBlockResponse, 1)
sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}

View File

@ -21,12 +21,8 @@ func main() {
// for notifications. See the documentation of the rpcclient
// NotificationHandlers type for more details about each handler.
ntfnHandlers := rpcclient.NotificationHandlers{
OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txns []*util.Tx) {
log.Printf("Block connected: %s (%d) %s",
header.BlockHash(), height, header.Timestamp)
},
OnFilteredBlockDisconnected: func(height int32, header *wire.BlockHeader) {
log.Printf("Block disconnected: %s (%d) %s",
OnFilteredBlockAdded: func(height int32, header *wire.BlockHeader, txns []*util.Tx) {
log.Printf("Block added: %s (%d) %s",
header.BlockHash(), height, header.Timestamp)
},
}
@ -49,7 +45,7 @@ func main() {
log.Fatal(err)
}
// Register for block connect and disconnect notifications.
// Register for block added notifications.
if err := client.NotifyBlocks(); err != nil {
log.Fatal(err)
}

View File

@ -92,40 +92,24 @@ type NotificationHandlers struct {
// notification handlers, and is safe for blocking client requests.
OnClientConnected func()
// OnBlockConnected is invoked when a block is connected to the longest
// (best) chain. It will only be invoked if a preceding call to
// NotifyBlocks has been made to register for the notification and the
// function is non-nil.
// OnBlockAdded is invoked when a block is connected to the DAG.
// It will only be invoked if a preceding call to NotifyBlocks has been made
// to register for the notification and the function is non-nil.
//
// NOTE: Deprecated. Use OnFilteredBlockConnected instead.
OnBlockConnected func(hash *daghash.Hash, height int32, t time.Time)
// NOTE: Deprecated. Use OnFilteredBlockAdded instead.
OnBlockAdded func(hash *daghash.Hash, height int32, t time.Time)
// OnFilteredBlockConnected is invoked when a block is connected to the
// longest (best) chain. It will only be invoked if a preceding call to
// OnFilteredBlockAdded is invoked when a block is connected to the
// bloackDAG. It will only be invoked if a preceding call to
// NotifyBlocks has been made to register for the notification and the
// function is non-nil. Its parameters differ from OnBlockConnected: it
// function is non-nil. Its parameters differ from OnBlockAdded: it
// receives the block's height, header, and relevant transactions.
OnFilteredBlockConnected func(height int32, header *wire.BlockHeader,
OnFilteredBlockAdded func(height int32, header *wire.BlockHeader,
txs []*util.Tx)
// OnBlockDisconnected is invoked when a block is disconnected from the
// longest (best) chain. It will only be invoked if a preceding call to
// NotifyBlocks has been made to register for the notification and the
// function is non-nil.
//
// NOTE: Deprecated. Use OnFilteredBlockDisconnected instead.
OnBlockDisconnected func(hash *daghash.Hash, height int32, t time.Time)
// OnFilteredBlockDisconnected is invoked when a block is disconnected
// from the longest (best) chain. It will only be invoked if a
// preceding NotifyBlocks has been made to register for the notification
// and the call to function is non-nil. Its parameters differ from
// OnBlockDisconnected: it receives the block's height and header.
OnFilteredBlockDisconnected func(height int32, header *wire.BlockHeader)
// OnRecvTx is invoked when a transaction that receives funds to a
// registered address is received into the memory pool and also
// connected to the longest (best) chain. It will only be invoked if a
// connected to the BlockDAG. It will only be invoked if a
// preceding call to NotifyReceived, Rescan, or RescanEndHeight has been
// made to register for the notification and the function is non-nil.
//
@ -134,7 +118,7 @@ type NotificationHandlers struct {
// OnRedeemingTx is invoked when a transaction that spends a registered
// outpoint is received into the memory pool and also connected to the
// longest (best) chain. It will only be invoked if a preceding call to
// blockDAG. It will only be invoked if a preceding call to
// NotifySpent, Rescan, or RescanEndHeight has been made to register for
// the notification and the function is non-nil.
//
@ -220,78 +204,42 @@ func (c *Client) handleNotification(ntfn *rawNotification) {
}
switch ntfn.Method {
// OnBlockConnected
case btcjson.BlockConnectedNtfnMethod:
// OnBlockAdded
case btcjson.BlockAddedNtfnMethod:
// Ignore the notification if the client is not interested in
// it.
if c.ntfnHandlers.OnBlockConnected == nil {
if c.ntfnHandlers.OnBlockAdded == nil {
return
}
blockHash, blockHeight, blockTime, err := parseChainNtfnParams(ntfn.Params)
blockHash, blockHeight, blockTime, err := parseDAGNtfnParams(ntfn.Params)
if err != nil {
log.Warnf("Received invalid block connected "+
log.Warnf("Received invalid block added "+
"notification: %s", err)
return
}
c.ntfnHandlers.OnBlockConnected(blockHash, blockHeight, blockTime)
c.ntfnHandlers.OnBlockAdded(blockHash, blockHeight, blockTime)
// OnFilteredBlockConnected
case btcjson.FilteredBlockConnectedNtfnMethod:
// OnFilteredBlockAdded
case btcjson.FilteredBlockAddedNtfnMethod:
// Ignore the notification if the client is not interested in
// it.
if c.ntfnHandlers.OnFilteredBlockConnected == nil {
if c.ntfnHandlers.OnFilteredBlockAdded == nil {
return
}
blockHeight, blockHeader, transactions, err :=
parseFilteredBlockConnectedParams(ntfn.Params)
parseFilteredBlockAddedParams(ntfn.Params)
if err != nil {
log.Warnf("Received invalid filtered block "+
"connected notification: %s", err)
return
}
c.ntfnHandlers.OnFilteredBlockConnected(blockHeight,
c.ntfnHandlers.OnFilteredBlockAdded(blockHeight,
blockHeader, transactions)
// OnBlockDisconnected
case btcjson.BlockDisconnectedNtfnMethod:
// Ignore the notification if the client is not interested in
// it.
if c.ntfnHandlers.OnBlockDisconnected == nil {
return
}
blockHash, blockHeight, blockTime, err := parseChainNtfnParams(ntfn.Params)
if err != nil {
log.Warnf("Received invalid block connected "+
"notification: %s", err)
return
}
c.ntfnHandlers.OnBlockDisconnected(blockHash, blockHeight, blockTime)
// OnFilteredBlockDisconnected
case btcjson.FilteredBlockDisconnectedNtfnMethod:
// Ignore the notification if the client is not interested in
// it.
if c.ntfnHandlers.OnFilteredBlockDisconnected == nil {
return
}
blockHeight, blockHeader, err :=
parseFilteredBlockDisconnectedParams(ntfn.Params)
if err != nil {
log.Warnf("Received invalid filtered block "+
"disconnected notification: %s", err)
return
}
c.ntfnHandlers.OnFilteredBlockDisconnected(blockHeight,
blockHeader)
// OnRecvTx
case btcjson.RecvTxNtfnMethod:
// Ignore the notification if the client is not interested in
@ -485,9 +433,9 @@ func (e wrongNumParams) Error() string {
return fmt.Sprintf("wrong number of parameters (%d)", e)
}
// parseChainNtfnParams parses out the block hash and height from the parameters
// of blockconnected and blockdisconnected notifications.
func parseChainNtfnParams(params []json.RawMessage) (*daghash.Hash,
// parseDAGNtfnParams parses out the block hash and height from the parameters
// of blockadded.
func parseDAGNtfnParams(params []json.RawMessage) (*daghash.Hash,
int32, time.Time, error) {
if len(params) != 3 {
@ -527,12 +475,12 @@ func parseChainNtfnParams(params []json.RawMessage) (*daghash.Hash,
return blockHash, blockHeight, blockTime, nil
}
// parseFilteredBlockConnectedParams parses out the parameters included in a
// filteredblockconnected notification.
// parseFilteredBlockAddedParams parses out the parameters included in a
// filteredblockadded notification.
//
// NOTE: This is a btcd extension ported from github.com/decred/dcrrpcclient
// and requires a websocket connection.
func parseFilteredBlockConnectedParams(params []json.RawMessage) (int32,
func parseFilteredBlockAddedParams(params []json.RawMessage) (int32,
*wire.BlockHeader, []*util.Tx, error) {
if len(params) < 3 {
@ -583,40 +531,6 @@ func parseFilteredBlockConnectedParams(params []json.RawMessage) (int32,
return blockHeight, &blockHeader, transactions, nil
}
// parseFilteredBlockDisconnectedParams parses out the parameters included in a
// filteredblockdisconnected notification.
//
// NOTE: This is a btcd extension ported from github.com/decred/dcrrpcclient
// and requires a websocket connection.
func parseFilteredBlockDisconnectedParams(params []json.RawMessage) (int32,
*wire.BlockHeader, error) {
if len(params) < 2 {
return 0, nil, wrongNumParams(len(params))
}
// Unmarshal first parameter as an integer.
var blockHeight int32
err := json.Unmarshal(params[0], &blockHeight)
if err != nil {
return 0, nil, err
}
// Unmarshal second parmeter as a slice of bytes.
blockHeaderBytes, err := parseHexParam(params[1])
if err != nil {
return 0, nil, err
}
// Deserialize block header from slice of bytes.
var blockHeader wire.BlockHeader
err = blockHeader.Deserialize(bytes.NewReader(blockHeaderBytes))
if err != nil {
return 0, nil, err
}
return blockHeight, &blockHeader, nil
}
func parseHexParam(param json.RawMessage) ([]byte, error) {
var s string
err := json.Unmarshal(param, &s)
@ -896,8 +810,7 @@ func (c *Client) NotifyBlocksAsync() FutureNotifyBlocksResult {
// this function has no effect if there are no notification handlers and will
// result in an error if the client is configured to run in HTTP POST mode.
//
// The notifications delivered as a result of this call will be via one of
// OnBlockConnected or OnBlockDisconnected.
// The notifications delivered as a result of this call will be via OnBlockAdded
//
// NOTE: This is a btcd extension and requires a websocket connection.
func (c *Client) NotifyBlocks() error {
@ -1104,7 +1017,7 @@ func (c *Client) NotifyReceivedAsync(addresses []util.Address) FutureNotifyRecei
// NotifyReceived registers the client to receive notifications every time a
// new transaction which pays to one of the passed addresses is accepted to
// memory pool or in a block connected to the block chain. In addition, when
// memory pool or in a block added to the block DAG. In addition, when
// one of these transactions is detected, the client is also automatically
// registered for notifications when the new transaction outpoints the address
// now has available are spent (See NotifySpent). The notifications are

View File

@ -1498,10 +1498,10 @@ func (state *gbtWorkState) notifyLongPollers(tipHashes []daghash.Hash, lastGener
}
}
// NotifyBlockConnected uses the newly-connected block to notify any long poll
// NotifyBlockAdded uses the newly-added block to notify any long poll
// clients with a new block template when their existing block template is
// stale due to the newly connected block.
func (state *gbtWorkState) NotifyBlockConnected(tipHashes []daghash.Hash) {
// stale due to the newly added block.
func (state *gbtWorkState) NotifyBlockAdded(tipHashes []daghash.Hash) {
go func() {
state.Lock()
defer state.Unlock()
@ -4325,27 +4325,26 @@ func NewRPCServer(
return &rpc, nil
}
// Callback for notifications from blockchain. It notifies clients that are
// Callback for notifications from blockdag. It notifies clients that are
// long polling for changes or subscribed to websockets notifications.
func (s *Server) handleBlockchainNotification(notification *blockdag.Notification) {
switch notification.Type {
case blockdag.NTBlockAccepted:
case blockdag.NTBlockAdded:
block, ok := notification.Data.(*util.Block)
if !ok {
log.Warnf("Block added notification data is not a block.")
break
}
tipHashes := s.cfg.DAG.TipHashes()
// Allow any clients performing long polling via the
// getBlockTemplate RPC to be notified when the new block causes
// their old block template to become stale.
s.gbtWorkState.NotifyBlockConnected(tipHashes)
case blockdag.NTBlockConnected:
block, ok := notification.Data.(*util.Block)
if !ok {
log.Warnf("Chain connected notification is not a block.")
break
}
s.gbtWorkState.NotifyBlockAdded(tipHashes)
// Notify registered websocket clients of incoming block.
s.ntfnMgr.NotifyBlockConnected(block)
s.ntfnMgr.NotifyBlockAdded(block)
}
}

View File

@ -200,29 +200,16 @@ func (m *wsNotificationManager) queueHandler() {
m.wg.Done()
}
// NotifyBlockConnected passes a block newly-connected to the best chain
// NotifyBlockAdded passes a block newly-added to the blockDAG
// to the notification manager for block and transaction notification
// processing.
func (m *wsNotificationManager) NotifyBlockConnected(block *util.Block) {
// As NotifyBlockConnected will be called by the block manager
func (m *wsNotificationManager) NotifyBlockAdded(block *util.Block) {
// As NotifyBlockAdded will be called by the block manager
// and the RPC server may no longer be running, use a select
// statement to unblock enqueuing the notification once the RPC
// server has begun shutting down.
select {
case m.queueNotification <- (*notificationBlockConnected)(block):
case <-m.quit:
}
}
// NotifyBlockDisconnected passes a block disconnected from the best chain
// to the notification manager for block notification processing.
func (m *wsNotificationManager) NotifyBlockDisconnected(block *util.Block) {
// As NotifyBlockDisconnected will be called by the block manager
// and the RPC server may no longer be running, use a select
// statement to unblock enqueuing the notification once the RPC
// server has begun shutting down.
select {
case m.queueNotification <- (*notificationBlockDisconnected)(block):
case m.queueNotification <- (*notificationBlockAdded)(block):
case <-m.quit:
}
}
@ -446,8 +433,7 @@ func (f *wsClientFilter) removeUnspentOutPoint(op *wire.OutPoint) {
}
// Notification types
type notificationBlockConnected util.Block
type notificationBlockDisconnected util.Block
type notificationBlockAdded util.Block
type notificationTxAcceptedByMempool struct {
isNew bool
tx *util.Tx
@ -504,7 +490,7 @@ out:
break out
}
switch n := n.(type) {
case *notificationBlockConnected:
case *notificationBlockAdded:
block := (*util.Block)(n)
// Skip iterating through all txs if no
@ -517,19 +503,9 @@ out:
}
if len(blockNotifications) != 0 {
m.notifyBlockConnected(blockNotifications,
m.notifyBlockAdded(blockNotifications,
block)
m.notifyFilteredBlockConnected(blockNotifications,
block)
}
case *notificationBlockDisconnected:
block := (*util.Block)(n)
if len(blockNotifications) != 0 {
m.notifyBlockDisconnected(blockNotifications,
block)
m.notifyFilteredBlockDisconnected(blockNotifications,
m.notifyFilteredBlockAdded(blockNotifications,
block)
}
@ -688,17 +664,17 @@ func (m *wsNotificationManager) subscribedClients(tx *util.Tx,
return subscribed
}
// notifyBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain.
func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient,
// notifyBlockAdded notifies websocket clients that have registered for
// block updates when a block is added to the blockDAG.
func (*wsNotificationManager) notifyBlockAdded(clients map[chan struct{}]*wsClient,
block *util.Block) {
// Notify interested websocket clients about the connected block.
ntfn := btcjson.NewBlockConnectedNtfn(block.Hash().String(), block.Height(),
// Notify interested websocket clients about the added block.
ntfn := btcjson.NewBlockAddedNtfn(block.Hash().String(), block.Height(),
block.MsgBlock().Header.Timestamp.Unix())
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
log.Errorf("Failed to marshal block connected notification: "+
log.Errorf("Failed to marshal block added notification: "+
"%s", err)
return
}
@ -707,33 +683,9 @@ func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*ws
}
}
// notifyBlockDisconnected notifies websocket clients that have registered for
// block updates when a block is disconnected from the main chain (due to a
// reorganize).
func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*wsClient, block *util.Block) {
// Skip notification creation if no clients have requested block
// connected/disconnected notifications.
if len(clients) == 0 {
return
}
// Notify interested websocket clients about the disconnected block.
ntfn := btcjson.NewBlockDisconnectedNtfn(block.Hash().String(),
block.Height(), block.MsgBlock().Header.Timestamp.Unix())
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
log.Errorf("Failed to marshal block disconnected "+
"notification: %s", err)
return
}
for _, wsc := range clients {
wsc.QueueNotification(marshalledJSON)
}
}
// notifyFilteredBlockConnected notifies websocket clients that have registered for
// block updates when a block is connected to the main chain.
func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan struct{}]*wsClient,
// notifyFilteredBlockAdded notifies websocket clients that have registered for
// block updates when a block is added to the blockDAG.
func (m *wsNotificationManager) notifyFilteredBlockAdded(clients map[chan struct{}]*wsClient,
block *util.Block) {
// Create the common portion of the notification that is the same for
@ -742,10 +694,10 @@ func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan st
err := block.MsgBlock().Header.Serialize(&w)
if err != nil {
log.Errorf("Failed to serialize header for filtered block "+
"connected notification: %s", err)
"added notification: %s", err)
return
}
ntfn := btcjson.NewFilteredBlockConnectedNtfn(block.Height(),
ntfn := btcjson.NewFilteredBlockAddedNtfn(block.Height(),
hex.EncodeToString(w.Bytes()), nil)
// Search for relevant transactions for each client and save them
@ -776,38 +728,6 @@ func (m *wsNotificationManager) notifyFilteredBlockConnected(clients map[chan st
}
}
// notifyFilteredBlockDisconnected notifies websocket clients that have registered for
// block updates when a block is disconnected from the main chain (due to a
// reorganize).
func (*wsNotificationManager) notifyFilteredBlockDisconnected(clients map[chan struct{}]*wsClient,
block *util.Block) {
// Skip notification creation if no clients have requested block
// connected/disconnected notifications.
if len(clients) == 0 {
return
}
// Notify interested websocket clients about the disconnected block.
var w bytes.Buffer
err := block.MsgBlock().Header.Serialize(&w)
if err != nil {
log.Errorf("Failed to serialize header for filtered block "+
"disconnected notification: %s", err)
return
}
ntfn := btcjson.NewFilteredBlockDisconnectedNtfn(block.Height(),
hex.EncodeToString(w.Bytes()))
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn)
if err != nil {
log.Errorf("Failed to marshal filtered block disconnected "+
"notification: %s", err)
return
}
for _, wsc := range clients {
wsc.QueueNotification(marshalledJSON)
}
}
// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket
// client when new transactions are added to the memory pool.
func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) {
@ -902,8 +822,8 @@ func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClie
}
// RegisterSpentRequests requests a notification when each of the passed
// outpoints is confirmed spent (contained in a block connected to the main
// chain) for the passed websocket client. The request is automatically
// outpoints is confirmed spent (contained in a block added to the blockDAG)
// for the passed websocket client. The request is automatically
// removed once the notification has been sent.
func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) {
m.queueNotification <- &notificationRegisterSpent{
@ -952,7 +872,7 @@ func (m *wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[cha
// UnregisterSpentRequest removes a request from the passed websocket client
// to be notified when the passed outpoint is confirmed spent (contained in a
// block connected to the main chain).
// block added to the blockDAG).
func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) {
m.queueNotification <- &notificationUnregisterSpent{
wsc: wsc,