Clean up notification contexts and goroutines after ws disconnect.

This refactors the wallet notification code to reverse the order of
how notification contexts are stored.  Before, watched addresses and
outpoints were used as keys, with a special reply channel as the
value.  This channel was read from and replies were marshalled and
sent to the main wallet notification chan, but the goroutine handling
this marshalling never exited because the reply channel was never
closed (and couldn't have been, because there was no way to tell it
was handling notifications for any particular wallet).

Notification contexts are now primarily mapped by wallet notification
channels, and code to send the notifications send directly to the
wallet channel, with the previous goroutine reading the reply chan
properly closing.

The RPC code is also refactored with this change as well, to separate
it more from websocket code.  Websocket JSON extensions are no longer
available to RPC clients.

While here, unbreak RPC.  Previously, replies were never sent back.
This broke when I merged in my websocket code, as sends for the reply
channel in jsonRead blocked before a reader for the channel was
opened.  A 3 liner could have fixed this, but doing a proper fix
(changing jsonRead so it did not use the reply channel as it is
unneeded for the standard RPC API) is preferred.
This commit is contained in:
Josh Rickmar 2013-10-16 14:12:00 -04:00
parent 90fbae1781
commit bbcfdcf5aa

View File

@ -19,7 +19,6 @@ import (
"github.com/conformal/btcscript" "github.com/conformal/btcscript"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
"github.com/conformal/btcwire" "github.com/conformal/btcwire"
"github.com/davecgh/go-spew/spew"
"math/big" "math/big"
"net" "net"
"net/http" "net/http"
@ -31,7 +30,13 @@ import (
// Errors // Errors
var ( var (
// ErrBadParamsField describes an error where the parameters JSON
// field cannot be properly parsed.
ErrBadParamsField = errors.New("bad params field") ErrBadParamsField = errors.New("bad params field")
// ErrMethodNotImplemented describes an error where the RPC or
// websocket JSON method is not implemented.
ErrMethodNotImplemented = errors.New("method not implemented")
) )
// rpcServer holds the items the rpc server may need to access (config, // rpcServer holds the items the rpc server may need to access (config,
@ -52,20 +57,8 @@ type rpcServer struct {
// wsContext holds the items the RPC server needs to handle websocket // wsContext holds the items the RPC server needs to handle websocket
// connections for wallets. // connections for wallets.
type wsContext struct { type wsContext struct {
// txRequests maps between a 160-byte pubkey hash and slice of contexts // requests holds all wallet notification requests.
// to route replies back to the original requesting wallets. requests wsRequests
txRequests struct {
sync.RWMutex
m map[addressHash][]requesterContext
}
// spentRequests maps between the Outpoint of an unspent transaction
// output and a slice of contexts to route notifications back to the
// original requesting wallets.
spentRequests struct {
sync.RWMutex
m map[btcwire.OutPoint][]requesterContext
}
// Channel to add a wallet listener. // Channel to add a wallet listener.
addWalletListener chan (chan []byte) addWalletListener chan (chan []byte)
@ -78,14 +71,82 @@ type wsContext struct {
walletNotificationMaster chan []byte walletNotificationMaster chan []byte
} }
// wsRequests maps request contexts for wallet notifications to a
// wallet notification channel. A Mutex is used to protect incorrect
// concurrent access to the map.
type wsRequests struct {
sync.Mutex
m map[chan []byte]*requestContexts
}
// getOrCreateContexts gets the request contexts, or creates and adds a
// new context if one for this wallet is not already present.
func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *requestContexts {
rc, ok := r.m[walletNotification]
if !ok {
rc = &requestContexts{
txRequests: make(map[addressHash]interface{}),
spentRequests: make(map[btcwire.OutPoint]interface{}),
}
r.m[walletNotification] = rc
}
return rc
}
// AddTxRequest adds the request context for new transaction notifications.
func (r *wsRequests) AddTxRequest(walletNotification chan []byte, addr addressHash, id interface{}) {
r.Lock()
defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification)
rc.txRequests[addr] = id
}
// AddSpentRequest adds a request context for notifications of a spent
// Outpoint.
func (r *wsRequests) AddSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint, id interface{}) {
r.Lock()
defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification)
rc.spentRequests[*op] = id
}
// RemoveSpentRequest removes a request context for notifications of a
// spent Outpoint.
func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) {
r.Lock()
defer r.Unlock()
rc := r.getOrCreateContexts(walletNotification)
delete(rc.spentRequests, *op)
}
// CloseListeners removes all request contexts for notifications sent
// to a wallet notification channel and closes the channel to stop all
// goroutines currently serving that wallet.
func (r *wsRequests) CloseListeners(walletNotification chan []byte) {
r.Lock()
defer r.Unlock()
delete(r.m, walletNotification)
close(walletNotification)
}
type addressHash [ripemd160.Size]byte type addressHash [ripemd160.Size]byte
// requesterContext holds a slice of reply channels for wallets // requestContexts holds all requests for a single wallet connection.
// requesting information about some address, and the id of the original type requestContexts struct {
// request so notifications can be routed back to the appropiate handler. // txRequests maps between a 160-byte pubkey hash and the JSON
type requesterContext struct { // id of the requester so replies can be correctly routed back
c chan *btcjson.Reply // to the correct btcwallet callback.
id interface{} txRequests map[addressHash]interface{}
// spentRequests maps between an Outpoint of an unspent
// transaction output and the JSON id of the requester so
// replies can be correctly routed back to the correct
// btcwallet callback.
spentRequests map[btcwire.OutPoint]interface{}
} }
// Start is used by server.go to start the rpc listener. // Start is used by server.go to start the rpc listener.
@ -153,8 +214,7 @@ func newRPCServer(s *server) (*rpcServer, error) {
rpc.password = cfg.RPCPass rpc.password = cfg.RPCPass
// initialize memory for websocket connections // initialize memory for websocket connections
rpc.ws.txRequests.m = make(map[addressHash][]requesterContext) rpc.ws.requests.m = make(map[chan []byte]*requestContexts)
rpc.ws.spentRequests.m = make(map[btcwire.OutPoint][]requesterContext)
rpc.ws.addWalletListener = make(chan (chan []byte)) rpc.ws.addWalletListener = make(chan (chan []byte))
rpc.ws.removeWalletListener = make(chan (chan []byte)) rpc.ws.removeWalletListener = make(chan (chan []byte))
rpc.ws.walletNotificationMaster = make(chan []byte) rpc.ws.walletNotificationMaster = make(chan []byte)
@ -191,49 +251,42 @@ func jsonAuthFail(w http.ResponseWriter, r *http.Request, s *rpcServer) {
// jsonRPCRead is the RPC wrapper around the jsonRead function to handles // jsonRPCRead is the RPC wrapper around the jsonRead function to handles
// reading and responding to RPC messages. // reading and responding to RPC messages.
func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) {
_ = spew.Dump
r.Close = true r.Close = true
if atomic.LoadInt32(&s.shutdown) != 0 { if atomic.LoadInt32(&s.shutdown) != 0 {
return return
} }
body, err := btcjson.GetRaw(r.Body) body, err := btcjson.GetRaw(r.Body)
spew.Dump(body)
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting json message: %v", err) log.Errorf("RPCS: Error getting json message: %v", err)
return return
} }
replychan := make(chan *btcjson.Reply) // Error is intentionally ignored here. It's used in in the
if err = jsonRead(replychan, body, s); err != nil { // websocket handler to tell when a method is not supported by
log.Error(err) // the standard RPC API, and is not needed here. Error logging
return // is done inside jsonRead, so no need to log the error here.
} reply, _ := jsonRead(body, s)
reply := <-replychan log.Tracef("[RPCS] reply: %v", reply)
if reply != nil { msg, err := btcjson.MarshallAndSend(reply, w)
log.Tracef("[RPCS] reply: %v", *reply)
msg, err := btcjson.MarshallAndSend(*reply, w)
if err != nil { if err != nil {
log.Errorf(msg) log.Errorf(msg)
return return
} }
log.Debugf(msg) log.Debugf(msg)
} }
}
// jsonRead abstracts the JSON unmarshalling and reply handling, // jsonRead abstracts the JSON unmarshalling and reply handling used
// returning replies across a channel. A channel is used as some websocket // by both RPC and websockets.
// method extensions require multiple replies. func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) {
func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err error) {
var message btcjson.Message var message btcjson.Message
err = json.Unmarshal(body, &message) if err := json.Unmarshal(body, &message); err != nil {
if err != nil {
jsonError := btcjson.Error{ jsonError := btcjson.Error{
Code: -32700, Code: -32700,
Message: "Parse error", Message: "Parse error",
} }
reply := btcjson.Reply{ reply = btcjson.Reply{
Result: nil, Result: nil,
Error: &jsonError, Error: &jsonError,
Id: nil, Id: nil,
@ -241,130 +294,119 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
log.Tracef("RPCS: reply: %v", reply) log.Tracef("RPCS: reply: %v", reply)
replychan <- &reply return reply, jsonError
return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err)
} }
log.Tracef("RPCS: received: %v", message) log.Tracef("RPCS: received: %v", message)
var rawReply btcjson.Reply // Set final reply based on error if non-nil.
requester := false
defer func() { defer func() {
replychan <- &rawReply if err != nil {
if !requester { if jsonErr, ok := err.(btcjson.Error); ok {
close(replychan) reply = btcjson.Reply{
Error: &jsonErr,
Id: &message.Id,
}
err = errors.New(jsonErr.Message)
} else {
rawJSONError := btcjson.Error{
Code: -32603,
Message: err.Error(),
}
reply = btcjson.Reply{
Error: &rawJSONError,
Id: &message.Id,
}
}
} }
}() }()
// Deal with commands // Deal with commands
switch message.Method { switch message.Method {
case "stop": case "stop":
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: "btcd stopping.", Result: "btcd stopping.",
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
s.server.Stop() s.server.Stop()
case "getblockcount": case "getblockcount":
_, maxidx, err := s.server.db.NewestSha() var maxidx int64
_, maxidx, err = s.server.db.NewestSha()
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting newest sha: %v", err) log.Errorf("RPCS: Error getting newest sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Error getting block count", Message: "Error getting block count",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
} }
log.Tracef("RPCS: reply: %v", rawReply) reply = btcjson.Reply{
break
}
rawReply = btcjson.Reply{
Result: maxidx, Result: maxidx,
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "getbestblockhash": case "getbestblockhash":
sha, _, err := s.server.db.NewestSha() var sha *btcwire.ShaHash
sha, _, err = s.server.db.NewestSha()
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting newest sha: %v", err) log.Errorf("RPCS: Error getting newest sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Error getting best block hash", Message: "Error getting best block hash",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
} }
log.Tracef("RPCS: reply: %v", rawReply) reply = btcjson.Reply{
break
}
rawReply = btcjson.Reply{
Result: sha, Result: sha,
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "getdifficulty": case "getdifficulty":
sha, _, err := s.server.db.NewestSha() var sha *btcwire.ShaHash
sha, _, err = s.server.db.NewestSha()
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting sha: %v", err) log.Errorf("RPCS: Error getting sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Error Getting difficulty", Message: "Error Getting difficulty",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
} }
log.Tracef("RPCS: reply: %v", rawReply) var blk *btcutil.Block
break blk, err = s.server.db.FetchBlockBySha(sha)
}
blk, err := s.server.db.FetchBlockBySha(sha)
if err != nil { if err != nil {
log.Errorf("RPCS: Error getting block: %v", err) log.Errorf("RPCS: Error getting block: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Error Getting difficulty", Message: "Error Getting difficulty",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("RPCS: reply: %v", rawReply)
break
} }
blockHeader := &blk.MsgBlock().Header blockHeader := &blk.MsgBlock().Header
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: getDifficultyRatio(blockHeader.Bits), Result: getDifficultyRatio(blockHeader.Bits),
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
// btcd does not do mining so we can hardcode replies here. // btcd does not do mining so we can hardcode replies here.
case "getgenerate": case "getgenerate":
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: false, Result: false,
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "setgenerate": case "setgenerate":
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: nil, Result: nil,
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "gethashespersec": case "gethashespersec":
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: 0, Result: 0,
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "getblockhash": case "getblockhash":
var f interface{} var f interface{}
err = json.Unmarshal(body, &f) err = json.Unmarshal(body, &f)
@ -379,29 +421,21 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
default: default:
} }
} }
sha, err := s.server.db.FetchBlockShaByHeight(int64(idx)) var sha *btcwire.ShaHash
sha, err = s.server.db.FetchBlockShaByHeight(int64(idx))
if err != nil { if err != nil {
log.Errorf("[RCPS] Error getting block: %v", err) log.Errorf("[RCPS] Error getting block: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -1, Code: -1,
Message: "Block number out of range.", Message: "Block number out of range.",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
} }
log.Tracef("RPCS: reply: %v", rawReply) reply = btcjson.Reply{
break
}
rawReply = btcjson.Reply{
Result: sha.String(), Result: sha.String(),
Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "getblock": case "getblock":
var f interface{} var f interface{}
err = json.Unmarshal(body, &f) err = json.Unmarshal(body, &f)
@ -416,54 +450,36 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
default: default:
} }
} }
sha, err := btcwire.NewShaHashFromStr(hash) var sha *btcwire.ShaHash
sha, err = btcwire.NewShaHashFromStr(hash)
if err != nil { if err != nil {
log.Errorf("RPCS: Error generating sha: %v", err) log.Errorf("RPCS: Error generating sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Block not found", Message: "Block not found",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
} }
log.Tracef("RPCS: reply: %v", rawReply) var blk *btcutil.Block
break blk, err = s.server.db.FetchBlockBySha(sha)
}
blk, err := s.server.db.FetchBlockBySha(sha)
if err != nil { if err != nil {
log.Errorf("RPCS: Error fetching sha: %v", err) log.Errorf("RPCS: Error fetching sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Block not found", Message: "Block not found",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("RPCS: reply: %v", rawReply)
break
} }
idx := blk.Height() idx := blk.Height()
buf, err := blk.Bytes() var buf []byte
buf, err = blk.Bytes()
if err != nil { if err != nil {
log.Errorf("RPCS: Error fetching block: %v", err) log.Errorf("RPCS: Error fetching block: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Block not found", Message: "Block not found",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("RPCS: reply: %v", rawReply)
break
} }
txList, _ := blk.TxShas() txList, _ := blk.TxShas()
@ -473,9 +489,15 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
txNames[i] = v.String() txNames[i] = v.String()
} }
_, maxidx, err := s.server.db.NewestSha() var maxidx int64
_, maxidx, err = s.server.db.NewestSha()
if err != nil { if err != nil {
return fmt.Errorf("RPCS: Cannot get newest sha: %v", err) log.Errorf("RPCS: Cannot get newest sha: %v", err)
err = btcjson.Error{
Code: -5,
Message: "Block not found",
}
return
} }
blockHeader := &blk.MsgBlock().Header blockHeader := &blk.MsgBlock().Header
@ -496,20 +518,27 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
// Get next block unless we are already at the top. // Get next block unless we are already at the top.
if idx < maxidx { if idx < maxidx {
shaNext, err := s.server.db.FetchBlockShaByHeight(int64(idx + 1)) var shaNext *btcwire.ShaHash
shaNext, err = s.server.db.FetchBlockShaByHeight(int64(idx + 1))
if err != nil { if err != nil {
log.Errorf("RPCS: No next block: %v", err) log.Errorf("RPCS: No next block: %v", err)
} else { err = btcjson.Error{
Code: -5,
Message: "Block not found",
}
return
}
blockReply.NextHash = shaNext.String() blockReply.NextHash = shaNext.String()
} }
}
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: blockReply, Result: blockReply,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "getrawtransaction": case "getrawtransaction":
// TODO: Perform smarter paramter parsing.
var f interface{} var f interface{}
err = json.Unmarshal(body, &f) err = json.Unmarshal(body, &f)
m := f.(map[string]interface{}) m := f.(map[string]interface{})
@ -532,43 +561,33 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
} }
if int(verbose) != 0 { if int(verbose) != 0 {
// TODO: check error code. tx is not checked before
// this point.
txSha, _ := btcwire.NewShaHashFromStr(tx) txSha, _ := btcwire.NewShaHashFromStr(tx)
var txS *btcwire.MsgTx var txS *btcwire.MsgTx
txList, err := s.server.db.FetchTxBySha(txSha) var txList []*btcdb.TxListReply
txList, err = s.server.db.FetchTxBySha(txSha)
if err != nil { if err != nil {
log.Errorf("RPCS: Error fetching tx: %v", err) log.Errorf("RPCS: Error fetching tx: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "No information available about transaction", Message: "No information available about transaction",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("RPCS: reply: %v", rawReply)
break
} }
lastTx := len(txList) - 1 lastTx := len(txList) - 1
txS = txList[lastTx].Tx txS = txList[lastTx].Tx
blksha := txList[lastTx].BlkSha blksha := txList[lastTx].BlkSha
blk, err := s.server.db.FetchBlockBySha(blksha) var blk *btcutil.Block
blk, err = s.server.db.FetchBlockBySha(blksha)
if err != nil { if err != nil {
log.Errorf("RPCS: Error fetching sha: %v", err) log.Errorf("RPCS: Error fetching sha: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -5, Code: -5,
Message: "Block not found", Message: "Block not found",
} }
return
rawReply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
log.Tracef("RPCS: reply: %v", rawReply)
break
} }
idx := blk.Height() idx := blk.Height()
@ -594,18 +613,26 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG") voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG")
_, addrhash, err := btcscript.ScriptToAddrHash(v.PkScript) _, addrhash, err := btcscript.ScriptToAddrHash(v.PkScript)
if err != nil { if err != nil {
// TODO: set and return error?
log.Errorf("RPCS: Error getting address hash for %v: %v", txSha, err) log.Errorf("RPCS: Error getting address hash for %v: %v", txSha, err)
} }
if addr, err := btcutil.EncodeAddress(addrhash, s.server.btcnet); err != nil { if addr, err := btcutil.EncodeAddress(addrhash, s.server.btcnet); err != nil {
// TODO: set and return error?
addrList := make([]string, 1) addrList := make([]string, 1)
addrList[0] = addr addrList[0] = addr
voutList[i].ScriptPubKey.Addresses = addrList voutList[i].ScriptPubKey.Addresses = addrList
} }
} }
_, maxidx, err := s.server.db.NewestSha() var maxidx int64
_, maxidx, err = s.server.db.NewestSha()
if err != nil { if err != nil {
return fmt.Errorf("RPCS: Cannot get newest sha: %v", err) log.Errorf("RPCS: Cannot get newest sha: %v", err)
err = btcjson.Error{
Code: -5,
Message: "No information about newest block",
}
return
} }
confirmations := uint64(1 + maxidx - idx) confirmations := uint64(1 + maxidx - idx)
@ -623,7 +650,7 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
BlockHash: blksha.String(), BlockHash: blksha.String(),
Confirmations: confirmations, Confirmations: confirmations,
} }
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: txReply, Result: txReply,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
@ -632,7 +659,9 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
// Don't return details // Don't return details
// not used yet // not used yet
} }
case "decoderawtransaction": case "decoderawtransaction":
// TODO: Perform smarter paramter parsing.
var f interface{} var f interface{}
err = json.Unmarshal(body, &f) err = json.Unmarshal(body, &f)
m := f.(map[string]interface{}) m := f.(map[string]interface{})
@ -646,82 +675,60 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
default: default:
} }
} }
spew.Dump(hash) // TODO: use hash and fill result with info.
_ = hash
txReply := btcjson.TxRawDecodeResult{} txReply := btcjson.TxRawDecodeResult{}
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: txReply, Result: txReply,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
} }
case "sendrawtransaction": case "sendrawtransaction":
params, ok := message.Params.([]interface{}) params, ok := message.Params.([]interface{})
if !ok || len(params) != 1 { if !ok || len(params) != 1 {
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -32602, Code: -32602,
Message: "Invalid parameters", Message: "Invalid parameters",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
return ErrBadParamsField
} }
serializedtxhex, ok := params[0].(string) serializedtxhex, ok := params[0].(string)
if !ok { if !ok {
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -32602, Code: -32602,
Message: "Raw tx is not a string", Message: "Raw tx is not a string",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
return ErrBadParamsField
} }
// Deserialize and send off to tx relay // Deserialize and send off to tx relay
serializedtx, err := hex.DecodeString(serializedtxhex) var serializedTx []byte
serializedTx, err = hex.DecodeString(serializedtxhex)
if err != nil { if err != nil {
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -22, Code: -22,
Message: "Unable to decode hex string", Message: "Unable to decode hex string",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
return err
} }
msgtx := btcwire.NewMsgTx() msgtx := btcwire.NewMsgTx()
err = msgtx.Deserialize(bytes.NewBuffer(serializedtx)) err = msgtx.Deserialize(bytes.NewBuffer(serializedTx))
if err != nil { if err != nil {
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -22, Code: -22,
Message: "Unable to deserialize raw tx", Message: "Unable to deserialize raw tx",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
return err
} }
err = s.server.txMemPool.ProcessTransaction(msgtx) err = s.server.txMemPool.ProcessTransaction(msgtx)
if err != nil { if err != nil {
log.Errorf("RPCS: Failed to process transaction: %v", err) log.Errorf("RPCS: Failed to process transaction: %v", err)
jsonError := btcjson.Error{ err = btcjson.Error{
Code: -22, Code: -22,
Message: "Failed to process transaction", Message: "Failed to process transaction",
} }
rawReply = btcjson.Reply{ return
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
return err
} }
var result interface{} var result interface{}
@ -729,13 +736,58 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
if err == nil { if err == nil {
result = txsha.String() result = txsha.String()
} }
rawReply = btcjson.Reply{ reply = btcjson.Reply{
Result: result, Result: result,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
} }
// Extensions default:
jsonError := btcjson.Error{
Code: -32601,
Message: "Method not found",
}
reply = btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: &message.Id,
}
err = ErrMethodNotImplemented
}
return
}
func jsonWSRead(walletNotification chan []byte, replychan chan *btcjson.Reply, body []byte, s *rpcServer) error {
var message btcjson.Message
err := json.Unmarshal(body, &message)
if err != nil {
jsonError := btcjson.Error{
Code: -32700,
Message: "Parse error",
}
reply := btcjson.Reply{
Result: nil,
Error: &jsonError,
Id: nil,
}
log.Tracef("RPCS: reply: %v", reply)
replychan <- &reply
return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err)
}
log.Tracef("RPCS: received: %v", message)
var rawReply btcjson.Reply
defer func() {
replychan <- &rawReply
close(replychan)
}()
// Deal with commands
switch message.Method {
case "getcurrentnet": case "getcurrentnet":
var net btcwire.BitcoinNet var net btcwire.BitcoinNet
if cfg.TestNet3 { if cfg.TestNet3 {
@ -868,21 +920,13 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
} }
var hash addressHash var hash addressHash
copy(hash[:], addrhash) copy(hash[:], addrhash)
s.ws.txRequests.Lock() s.ws.requests.AddTxRequest(walletNotification, hash, message.Id)
cxts := s.ws.txRequests.m[hash]
cxt := requesterContext{
c: replychan,
id: message.Id,
}
s.ws.txRequests.m[hash] = append(cxts, cxt)
s.ws.txRequests.Unlock()
rawReply = btcjson.Reply{ rawReply = btcjson.Reply{
Result: nil, Result: nil,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
} }
requester = true
case "notifyspent": case "notifyspent":
params, ok := message.Params.([]interface{}) params, ok := message.Params.([]interface{})
@ -912,7 +956,6 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
} }
return ErrBadParamsField return ErrBadParamsField
} }
s.ws.spentRequests.Lock()
hash, err := btcwire.NewShaHashFromStr(hashBE) hash, err := btcwire.NewShaHashFromStr(hashBE)
if err != nil { if err != nil {
jsonError := btcjson.Error{ jsonError := btcjson.Error{
@ -927,20 +970,13 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
return ErrBadParamsField return ErrBadParamsField
} }
op := btcwire.NewOutPoint(hash, uint32(index)) op := btcwire.NewOutPoint(hash, uint32(index))
cxts := s.ws.spentRequests.m[*op] s.ws.requests.AddSpentRequest(walletNotification, op, message.Id)
cxt := requesterContext{
c: replychan,
id: message.Id,
}
s.ws.spentRequests.m[*op] = append(cxts, cxt)
s.ws.spentRequests.Unlock()
rawReply = btcjson.Reply{ rawReply = btcjson.Reply{
Result: nil, Result: nil,
Error: nil, Error: nil,
Id: &message.Id, Id: &message.Id,
} }
requester = true
default: default:
jsonError := btcjson.Error{ jsonError := btcjson.Error{
@ -953,8 +989,7 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err
Id: &message.Id, Id: &message.Id,
} }
} }
return ErrMethodNotImplemented
return nil
} }
// getDifficultyRatio returns the proof-of-work difficulty as a multiple of the // getDifficultyRatio returns the proof-of-work difficulty as a multiple of the
@ -1097,14 +1132,27 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) {
// websocketJSONHandler parses and handles a marshalled json message, // websocketJSONHandler parses and handles a marshalled json message,
// sending the marshalled reply to a wallet notification channel. // sending the marshalled reply to a wallet notification channel.
func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) { func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) {
replychan := make(chan *btcjson.Reply) s.wg.Add(1)
reply, err := jsonRead(msg, s)
s.wg.Done()
if err != ErrMethodNotImplemented {
replyBytes, err := json.Marshal(reply)
if err != nil {
log.Errorf("RPCS: Error marshalling reply: %v", err)
}
walletNotification <- replyBytes
return
}
// Try websocket extensions
replychan := make(chan *btcjson.Reply)
go func() { go func() {
for { for {
select { select {
case reply, ok := <-replychan: case reply, ok := <-replychan:
if !ok { if !ok {
// jsonRead() function called below has finished. // no more replies expected.
return return
} }
if reply == nil { if reply == nil {
@ -1113,7 +1161,7 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
log.Tracef("[RPCS] reply: %v", *reply) log.Tracef("[RPCS] reply: %v", *reply)
replyBytes, err := json.Marshal(reply) replyBytes, err := json.Marshal(reply)
if err != nil { if err != nil {
log.Errorf("[RPCS] Error Marshalling reply: %v", err) log.Errorf("RPCS: Error Marshalling reply: %v", err)
return return
} }
walletNotification <- replyBytes walletNotification <- replyBytes
@ -1124,10 +1172,13 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b
} }
}() }()
if err == ErrMethodNotImplemented {
// Try websocket extensions
s.wg.Add(1) s.wg.Add(1)
err := jsonRead(replychan, msg, s) err = jsonWSRead(walletNotification, replychan, msg, s)
s.wg.Done() s.wg.Done()
if err != nil { }
if err != nil && err != ErrMethodNotImplemented {
log.Error(err) log.Error(err)
} }
} }
@ -1200,10 +1251,10 @@ func (s *rpcServer) NotifyNewTxListeners(db btcdb.Db, block *btcutil.Block) {
// each transaction input of a new block and perform any checks and // each transaction input of a new block and perform any checks and
// notify listening frontends when necessary. // notify listening frontends when necessary.
func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) { func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) {
for wltNtfn, cxt := range s.ws.requests.m {
for _, txin := range txins { for _, txin := range txins {
s.ws.spentRequests.RLock() for op, id := range cxt.spentRequests {
for out, cxts := range s.ws.spentRequests.m { if txin.PreviousOutpoint != op {
if txin.PreviousOutpoint != out {
continue continue
} }
@ -1212,24 +1263,21 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) {
TxHash string `json:"txhash"` TxHash string `json:"txhash"`
Index uint32 `json:"index"` Index uint32 `json:"index"`
}{ }{
TxHash: out.Hash.String(), TxHash: op.Hash.String(),
Index: uint32(out.Index), Index: uint32(op.Index),
}, },
Error: nil, Error: nil,
// Id is set for each requester separately below. Id: &id,
} }
for _, cxt := range cxts { replyBytes, err := json.Marshal(reply)
reply.Id = &cxt.id if err != nil {
cxt.c <- reply log.Errorf("RPCS: Unable to marshal spent notification: %v", err)
continue
}
wltNtfn <- replyBytes
s.ws.requests.RemoveSpentRequest(wltNtfn, &op)
} }
s.ws.spentRequests.RUnlock()
s.ws.spentRequests.Lock()
delete(s.ws.spentRequests.m, out)
s.ws.spentRequests.Unlock()
s.ws.spentRequests.RLock()
} }
s.ws.spentRequests.RUnlock()
} }
} }
@ -1237,14 +1285,14 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) {
// each transaction output of a new block and perform any checks and // each transaction output of a new block and perform any checks and
// notify listening frontends when necessary. // notify listening frontends when necessary.
func (s *rpcServer) newBlockNotifyCheckTxOut(db btcdb.Db, block *btcutil.Block, tx *btcdb.TxListReply) { func (s *rpcServer) newBlockNotifyCheckTxOut(db btcdb.Db, block *btcutil.Block, tx *btcdb.TxListReply) {
for wltNtfn, cxt := range s.ws.requests.m {
for i, txout := range tx.Tx.TxOut { for i, txout := range tx.Tx.TxOut {
_, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript)
if err != nil { if err != nil {
log.Debug("Error getting payment address from tx; dropping any Tx notifications.") log.Debug("Error getting payment address from tx; dropping any Tx notifications.")
break break
} }
s.ws.txRequests.RLock() for addr, id := range cxt.txRequests {
for addr, cxts := range s.ws.txRequests.m {
if !bytes.Equal(addr[:], txaddrhash) { if !bytes.Equal(addr[:], txaddrhash) {
continue continue
} }
@ -1281,13 +1329,15 @@ func (s *rpcServer) newBlockNotifyCheckTxOut(db btcdb.Db, block *btcutil.Block,
Spent: tx.TxSpent[i], Spent: tx.TxSpent[i],
}, },
Error: nil, Error: nil,
// Id is set for each requester separately below. Id: &id,
} }
for _, cxt := range cxts { replyBytes, err := json.Marshal(reply)
reply.Id = &cxt.id if err != nil {
cxt.c <- reply log.Errorf("RPCS: Unable to marshal tx notification: %v", err)
continue
}
wltNtfn <- replyBytes
} }
} }
s.ws.txRequests.RUnlock()
} }
} }