diff --git a/rpcserver.go b/rpcserver.go index a6ce14b6d..9f20598d8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -19,7 +19,6 @@ import ( "github.com/conformal/btcscript" "github.com/conformal/btcutil" "github.com/conformal/btcwire" - "github.com/davecgh/go-spew/spew" "math/big" "net" "net/http" @@ -31,7 +30,13 @@ import ( // Errors var ( + // ErrBadParamsField describes an error where the parameters JSON + // field cannot be properly parsed. ErrBadParamsField = errors.New("bad params field") + + // ErrMethodNotImplemented describes an error where the RPC or + // websocket JSON method is not implemented. + ErrMethodNotImplemented = errors.New("method not implemented") ) // rpcServer holds the items the rpc server may need to access (config, @@ -52,20 +57,8 @@ type rpcServer struct { // wsContext holds the items the RPC server needs to handle websocket // connections for wallets. type wsContext struct { - // txRequests maps between a 160-byte pubkey hash and slice of contexts - // to route replies back to the original requesting wallets. - txRequests struct { - sync.RWMutex - m map[addressHash][]requesterContext - } - - // spentRequests maps between the Outpoint of an unspent transaction - // output and a slice of contexts to route notifications back to the - // original requesting wallets. - spentRequests struct { - sync.RWMutex - m map[btcwire.OutPoint][]requesterContext - } + // requests holds all wallet notification requests. + requests wsRequests // Channel to add a wallet listener. addWalletListener chan (chan []byte) @@ -78,14 +71,82 @@ type wsContext struct { walletNotificationMaster chan []byte } +// wsRequests maps request contexts for wallet notifications to a +// wallet notification channel. A Mutex is used to protect incorrect +// concurrent access to the map. +type wsRequests struct { + sync.Mutex + m map[chan []byte]*requestContexts +} + +// getOrCreateContexts gets the request contexts, or creates and adds a +// new context if one for this wallet is not already present. +func (r *wsRequests) getOrCreateContexts(walletNotification chan []byte) *requestContexts { + rc, ok := r.m[walletNotification] + if !ok { + rc = &requestContexts{ + txRequests: make(map[addressHash]interface{}), + spentRequests: make(map[btcwire.OutPoint]interface{}), + } + r.m[walletNotification] = rc + } + return rc +} + +// AddTxRequest adds the request context for new transaction notifications. +func (r *wsRequests) AddTxRequest(walletNotification chan []byte, addr addressHash, id interface{}) { + r.Lock() + defer r.Unlock() + + rc := r.getOrCreateContexts(walletNotification) + rc.txRequests[addr] = id +} + +// AddSpentRequest adds a request context for notifications of a spent +// Outpoint. +func (r *wsRequests) AddSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint, id interface{}) { + r.Lock() + defer r.Unlock() + + rc := r.getOrCreateContexts(walletNotification) + rc.spentRequests[*op] = id +} + +// RemoveSpentRequest removes a request context for notifications of a +// spent Outpoint. +func (r *wsRequests) RemoveSpentRequest(walletNotification chan []byte, op *btcwire.OutPoint) { + r.Lock() + defer r.Unlock() + + rc := r.getOrCreateContexts(walletNotification) + delete(rc.spentRequests, *op) +} + +// CloseListeners removes all request contexts for notifications sent +// to a wallet notification channel and closes the channel to stop all +// goroutines currently serving that wallet. +func (r *wsRequests) CloseListeners(walletNotification chan []byte) { + r.Lock() + defer r.Unlock() + + delete(r.m, walletNotification) + close(walletNotification) +} + type addressHash [ripemd160.Size]byte -// requesterContext holds a slice of reply channels for wallets -// requesting information about some address, and the id of the original -// request so notifications can be routed back to the appropiate handler. -type requesterContext struct { - c chan *btcjson.Reply - id interface{} +// requestContexts holds all requests for a single wallet connection. +type requestContexts struct { + // txRequests maps between a 160-byte pubkey hash and the JSON + // id of the requester so replies can be correctly routed back + // to the correct btcwallet callback. + txRequests map[addressHash]interface{} + + // spentRequests maps between an Outpoint of an unspent + // transaction output and the JSON id of the requester so + // replies can be correctly routed back to the correct + // btcwallet callback. + spentRequests map[btcwire.OutPoint]interface{} } // Start is used by server.go to start the rpc listener. @@ -153,8 +214,7 @@ func newRPCServer(s *server) (*rpcServer, error) { rpc.password = cfg.RPCPass // initialize memory for websocket connections - rpc.ws.txRequests.m = make(map[addressHash][]requesterContext) - rpc.ws.spentRequests.m = make(map[btcwire.OutPoint][]requesterContext) + rpc.ws.requests.m = make(map[chan []byte]*requestContexts) rpc.ws.addWalletListener = make(chan (chan []byte)) rpc.ws.removeWalletListener = make(chan (chan []byte)) rpc.ws.walletNotificationMaster = make(chan []byte) @@ -191,49 +251,42 @@ func jsonAuthFail(w http.ResponseWriter, r *http.Request, s *rpcServer) { // jsonRPCRead is the RPC wrapper around the jsonRead function to handles // reading and responding to RPC messages. func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { - _ = spew.Dump r.Close = true if atomic.LoadInt32(&s.shutdown) != 0 { return } body, err := btcjson.GetRaw(r.Body) - spew.Dump(body) if err != nil { log.Errorf("RPCS: Error getting json message: %v", err) return } - replychan := make(chan *btcjson.Reply) - if err = jsonRead(replychan, body, s); err != nil { - log.Error(err) + // Error is intentionally ignored here. It's used in in the + // websocket handler to tell when a method is not supported by + // the standard RPC API, and is not needed here. Error logging + // is done inside jsonRead, so no need to log the error here. + reply, _ := jsonRead(body, s) + log.Tracef("[RPCS] reply: %v", reply) + + msg, err := btcjson.MarshallAndSend(reply, w) + if err != nil { + log.Errorf(msg) return } - reply := <-replychan - - if reply != nil { - log.Tracef("[RPCS] reply: %v", *reply) - msg, err := btcjson.MarshallAndSend(*reply, w) - if err != nil { - log.Errorf(msg) - return - } - log.Debugf(msg) - } + log.Debugf(msg) } -// jsonRead abstracts the JSON unmarshalling and reply handling, -// returning replies across a channel. A channel is used as some websocket -// method extensions require multiple replies. -func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err error) { +// jsonRead abstracts the JSON unmarshalling and reply handling used +// by both RPC and websockets. +func jsonRead(body []byte, s *rpcServer) (reply btcjson.Reply, err error) { var message btcjson.Message - err = json.Unmarshal(body, &message) - if err != nil { + if err := json.Unmarshal(body, &message); err != nil { jsonError := btcjson.Error{ Code: -32700, Message: "Parse error", } - reply := btcjson.Reply{ + reply = btcjson.Reply{ Result: nil, Error: &jsonError, Id: nil, @@ -241,130 +294,119 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err log.Tracef("RPCS: reply: %v", reply) - replychan <- &reply - return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err) + return reply, jsonError } log.Tracef("RPCS: received: %v", message) - var rawReply btcjson.Reply - requester := false - + // Set final reply based on error if non-nil. defer func() { - replychan <- &rawReply - if !requester { - close(replychan) + if err != nil { + if jsonErr, ok := err.(btcjson.Error); ok { + reply = btcjson.Reply{ + Error: &jsonErr, + Id: &message.Id, + } + err = errors.New(jsonErr.Message) + } else { + rawJSONError := btcjson.Error{ + Code: -32603, + Message: err.Error(), + } + reply = btcjson.Reply{ + Error: &rawJSONError, + Id: &message.Id, + } + } } }() // Deal with commands switch message.Method { case "stop": - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: "btcd stopping.", - Error: nil, Id: &message.Id, } s.server.Stop() + case "getblockcount": - _, maxidx, err := s.server.db.NewestSha() + var maxidx int64 + _, maxidx, err = s.server.db.NewestSha() if err != nil { log.Errorf("RPCS: Error getting newest sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Error getting block count", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: maxidx, - Error: nil, Id: &message.Id, } + case "getbestblockhash": - sha, _, err := s.server.db.NewestSha() + var sha *btcwire.ShaHash + sha, _, err = s.server.db.NewestSha() if err != nil { log.Errorf("RPCS: Error getting newest sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Error getting best block hash", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: sha, - Error: nil, Id: &message.Id, } + case "getdifficulty": - sha, _, err := s.server.db.NewestSha() + var sha *btcwire.ShaHash + sha, _, err = s.server.db.NewestSha() if err != nil { log.Errorf("RPCS: Error getting sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Error Getting difficulty", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } - blk, err := s.server.db.FetchBlockBySha(sha) + var blk *btcutil.Block + blk, err = s.server.db.FetchBlockBySha(sha) if err != nil { log.Errorf("RPCS: Error getting block: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Error Getting difficulty", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } blockHeader := &blk.MsgBlock().Header - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: getDifficultyRatio(blockHeader.Bits), - Error: nil, Id: &message.Id, } + // btcd does not do mining so we can hardcode replies here. case "getgenerate": - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: false, - Error: nil, Id: &message.Id, } + case "setgenerate": - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: nil, - Error: nil, Id: &message.Id, } + case "gethashespersec": - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: 0, - Error: nil, Id: &message.Id, } + case "getblockhash": var f interface{} err = json.Unmarshal(body, &f) @@ -379,29 +421,21 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err default: } } - sha, err := s.server.db.FetchBlockShaByHeight(int64(idx)) + var sha *btcwire.ShaHash + sha, err = s.server.db.FetchBlockShaByHeight(int64(idx)) if err != nil { log.Errorf("[RCPS] Error getting block: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -1, Message: "Block number out of range.", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break - + return } - - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: sha.String(), - Error: nil, Id: &message.Id, } + case "getblock": var f interface{} err = json.Unmarshal(body, &f) @@ -416,54 +450,36 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err default: } } - sha, err := btcwire.NewShaHashFromStr(hash) + var sha *btcwire.ShaHash + sha, err = btcwire.NewShaHashFromStr(hash) if err != nil { log.Errorf("RPCS: Error generating sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Block not found", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } - blk, err := s.server.db.FetchBlockBySha(sha) + var blk *btcutil.Block + blk, err = s.server.db.FetchBlockBySha(sha) if err != nil { log.Errorf("RPCS: Error fetching sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Block not found", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } idx := blk.Height() - buf, err := blk.Bytes() + var buf []byte + buf, err = blk.Bytes() if err != nil { log.Errorf("RPCS: Error fetching block: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Block not found", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } txList, _ := blk.TxShas() @@ -473,9 +489,15 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err txNames[i] = v.String() } - _, maxidx, err := s.server.db.NewestSha() + var maxidx int64 + _, maxidx, err = s.server.db.NewestSha() if err != nil { - return fmt.Errorf("RPCS: Cannot get newest sha: %v", err) + log.Errorf("RPCS: Cannot get newest sha: %v", err) + err = btcjson.Error{ + Code: -5, + Message: "Block not found", + } + return } blockHeader := &blk.MsgBlock().Header @@ -496,20 +518,27 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err // Get next block unless we are already at the top. if idx < maxidx { - shaNext, err := s.server.db.FetchBlockShaByHeight(int64(idx + 1)) + var shaNext *btcwire.ShaHash + shaNext, err = s.server.db.FetchBlockShaByHeight(int64(idx + 1)) if err != nil { log.Errorf("RPCS: No next block: %v", err) - } else { - blockReply.NextHash = shaNext.String() + err = btcjson.Error{ + Code: -5, + Message: "Block not found", + } + return } + blockReply.NextHash = shaNext.String() } - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: blockReply, Error: nil, Id: &message.Id, } + case "getrawtransaction": + // TODO: Perform smarter paramter parsing. var f interface{} err = json.Unmarshal(body, &f) m := f.(map[string]interface{}) @@ -532,43 +561,33 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err } if int(verbose) != 0 { + // TODO: check error code. tx is not checked before + // this point. txSha, _ := btcwire.NewShaHashFromStr(tx) var txS *btcwire.MsgTx - txList, err := s.server.db.FetchTxBySha(txSha) + var txList []*btcdb.TxListReply + txList, err = s.server.db.FetchTxBySha(txSha) if err != nil { log.Errorf("RPCS: Error fetching tx: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "No information available about transaction", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } lastTx := len(txList) - 1 txS = txList[lastTx].Tx blksha := txList[lastTx].BlkSha - blk, err := s.server.db.FetchBlockBySha(blksha) + var blk *btcutil.Block + blk, err = s.server.db.FetchBlockBySha(blksha) if err != nil { log.Errorf("RPCS: Error fetching sha: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -5, Message: "Block not found", } - - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - log.Tracef("RPCS: reply: %v", rawReply) - break + return } idx := blk.Height() @@ -594,18 +613,26 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG") _, addrhash, err := btcscript.ScriptToAddrHash(v.PkScript) if err != nil { + // TODO: set and return error? log.Errorf("RPCS: Error getting address hash for %v: %v", txSha, err) } if addr, err := btcutil.EncodeAddress(addrhash, s.server.btcnet); err != nil { + // TODO: set and return error? addrList := make([]string, 1) addrList[0] = addr voutList[i].ScriptPubKey.Addresses = addrList } } - _, maxidx, err := s.server.db.NewestSha() + var maxidx int64 + _, maxidx, err = s.server.db.NewestSha() if err != nil { - return fmt.Errorf("RPCS: Cannot get newest sha: %v", err) + log.Errorf("RPCS: Cannot get newest sha: %v", err) + err = btcjson.Error{ + Code: -5, + Message: "No information about newest block", + } + return } confirmations := uint64(1 + maxidx - idx) @@ -623,7 +650,7 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err BlockHash: blksha.String(), Confirmations: confirmations, } - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: txReply, Error: nil, Id: &message.Id, @@ -632,7 +659,9 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err // Don't return details // not used yet } + case "decoderawtransaction": + // TODO: Perform smarter paramter parsing. var f interface{} err = json.Unmarshal(body, &f) m := f.(map[string]interface{}) @@ -646,82 +675,60 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err default: } } - spew.Dump(hash) + // TODO: use hash and fill result with info. + _ = hash txReply := btcjson.TxRawDecodeResult{} - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: txReply, Error: nil, Id: &message.Id, } + case "sendrawtransaction": params, ok := message.Params.([]interface{}) if !ok || len(params) != 1 { - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -32602, Message: "Invalid parameters", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - return ErrBadParamsField + return } serializedtxhex, ok := params[0].(string) if !ok { - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -32602, Message: "Raw tx is not a string", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - return ErrBadParamsField + return } // Deserialize and send off to tx relay - serializedtx, err := hex.DecodeString(serializedtxhex) + var serializedTx []byte + serializedTx, err = hex.DecodeString(serializedtxhex) if err != nil { - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -22, Message: "Unable to decode hex string", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - return err + return } msgtx := btcwire.NewMsgTx() - err = msgtx.Deserialize(bytes.NewBuffer(serializedtx)) + err = msgtx.Deserialize(bytes.NewBuffer(serializedTx)) if err != nil { - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -22, Message: "Unable to deserialize raw tx", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - return err + return } err = s.server.txMemPool.ProcessTransaction(msgtx) if err != nil { log.Errorf("RPCS: Failed to process transaction: %v", err) - jsonError := btcjson.Error{ + err = btcjson.Error{ Code: -22, Message: "Failed to process transaction", } - rawReply = btcjson.Reply{ - Result: nil, - Error: &jsonError, - Id: &message.Id, - } - return err + return } var result interface{} @@ -729,13 +736,58 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err if err == nil { result = txsha.String() } - rawReply = btcjson.Reply{ + reply = btcjson.Reply{ Result: result, Error: nil, Id: &message.Id, } - // Extensions + default: + jsonError := btcjson.Error{ + Code: -32601, + Message: "Method not found", + } + reply = btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: &message.Id, + } + err = ErrMethodNotImplemented + } + + return +} + +func jsonWSRead(walletNotification chan []byte, replychan chan *btcjson.Reply, body []byte, s *rpcServer) error { + var message btcjson.Message + err := json.Unmarshal(body, &message) + if err != nil { + jsonError := btcjson.Error{ + Code: -32700, + Message: "Parse error", + } + + reply := btcjson.Reply{ + Result: nil, + Error: &jsonError, + Id: nil, + } + + log.Tracef("RPCS: reply: %v", reply) + + replychan <- &reply + return fmt.Errorf("RPCS: Error unmarshalling json message: %v", err) + } + log.Tracef("RPCS: received: %v", message) + + var rawReply btcjson.Reply + defer func() { + replychan <- &rawReply + close(replychan) + }() + + // Deal with commands + switch message.Method { case "getcurrentnet": var net btcwire.BitcoinNet if cfg.TestNet3 { @@ -868,21 +920,13 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err } var hash addressHash copy(hash[:], addrhash) - s.ws.txRequests.Lock() - cxts := s.ws.txRequests.m[hash] - cxt := requesterContext{ - c: replychan, - id: message.Id, - } - s.ws.txRequests.m[hash] = append(cxts, cxt) - s.ws.txRequests.Unlock() + s.ws.requests.AddTxRequest(walletNotification, hash, message.Id) rawReply = btcjson.Reply{ Result: nil, Error: nil, Id: &message.Id, } - requester = true case "notifyspent": params, ok := message.Params.([]interface{}) @@ -912,7 +956,6 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err } return ErrBadParamsField } - s.ws.spentRequests.Lock() hash, err := btcwire.NewShaHashFromStr(hashBE) if err != nil { jsonError := btcjson.Error{ @@ -927,20 +970,13 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err return ErrBadParamsField } op := btcwire.NewOutPoint(hash, uint32(index)) - cxts := s.ws.spentRequests.m[*op] - cxt := requesterContext{ - c: replychan, - id: message.Id, - } - s.ws.spentRequests.m[*op] = append(cxts, cxt) - s.ws.spentRequests.Unlock() + s.ws.requests.AddSpentRequest(walletNotification, op, message.Id) rawReply = btcjson.Reply{ Result: nil, Error: nil, Id: &message.Id, } - requester = true default: jsonError := btcjson.Error{ @@ -953,8 +989,7 @@ func jsonRead(replychan chan *btcjson.Reply, body []byte, s *rpcServer) (err err Id: &message.Id, } } - - return nil + return ErrMethodNotImplemented } // getDifficultyRatio returns the proof-of-work difficulty as a multiple of the @@ -1097,14 +1132,27 @@ func (s *rpcServer) walletReqsNotifications(ws *websocket.Conn) { // websocketJSONHandler parses and handles a marshalled json message, // sending the marshalled reply to a wallet notification channel. func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []byte) { - replychan := make(chan *btcjson.Reply) + s.wg.Add(1) + reply, err := jsonRead(msg, s) + s.wg.Done() + if err != ErrMethodNotImplemented { + replyBytes, err := json.Marshal(reply) + if err != nil { + log.Errorf("RPCS: Error marshalling reply: %v", err) + } + walletNotification <- replyBytes + return + } + + // Try websocket extensions + replychan := make(chan *btcjson.Reply) go func() { for { select { case reply, ok := <-replychan: if !ok { - // jsonRead() function called below has finished. + // no more replies expected. return } if reply == nil { @@ -1113,7 +1161,7 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b log.Tracef("[RPCS] reply: %v", *reply) replyBytes, err := json.Marshal(reply) if err != nil { - log.Errorf("[RPCS] Error Marshalling reply: %v", err) + log.Errorf("RPCS: Error Marshalling reply: %v", err) return } walletNotification <- replyBytes @@ -1124,10 +1172,13 @@ func (s *rpcServer) websocketJSONHandler(walletNotification chan []byte, msg []b } }() - s.wg.Add(1) - err := jsonRead(replychan, msg, s) - s.wg.Done() - if err != nil { + if err == ErrMethodNotImplemented { + // Try websocket extensions + s.wg.Add(1) + err = jsonWSRead(walletNotification, replychan, msg, s) + s.wg.Done() + } + if err != nil && err != ErrMethodNotImplemented { log.Error(err) } } @@ -1200,36 +1251,33 @@ func (s *rpcServer) NotifyNewTxListeners(db btcdb.Db, block *btcutil.Block) { // each transaction input of a new block and perform any checks and // notify listening frontends when necessary. func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) { - for _, txin := range txins { - s.ws.spentRequests.RLock() - for out, cxts := range s.ws.spentRequests.m { - if txin.PreviousOutpoint != out { - continue - } + for wltNtfn, cxt := range s.ws.requests.m { + for _, txin := range txins { + for op, id := range cxt.spentRequests { + if txin.PreviousOutpoint != op { + continue + } - reply := &btcjson.Reply{ - Result: struct { - TxHash string `json:"txhash"` - Index uint32 `json:"index"` - }{ - TxHash: out.Hash.String(), - Index: uint32(out.Index), - }, - Error: nil, - // Id is set for each requester separately below. + reply := &btcjson.Reply{ + Result: struct { + TxHash string `json:"txhash"` + Index uint32 `json:"index"` + }{ + TxHash: op.Hash.String(), + Index: uint32(op.Index), + }, + Error: nil, + Id: &id, + } + replyBytes, err := json.Marshal(reply) + if err != nil { + log.Errorf("RPCS: Unable to marshal spent notification: %v", err) + continue + } + wltNtfn <- replyBytes + s.ws.requests.RemoveSpentRequest(wltNtfn, &op) } - for _, cxt := range cxts { - reply.Id = &cxt.id - cxt.c <- reply - } - - s.ws.spentRequests.RUnlock() - s.ws.spentRequests.Lock() - delete(s.ws.spentRequests.m, out) - s.ws.spentRequests.Unlock() - s.ws.spentRequests.RLock() } - s.ws.spentRequests.RUnlock() } } @@ -1237,57 +1285,59 @@ func (s *rpcServer) newBlockNotifyCheckTxIn(txins []*btcwire.TxIn) { // each transaction output of a new block and perform any checks and // notify listening frontends when necessary. func (s *rpcServer) newBlockNotifyCheckTxOut(db btcdb.Db, block *btcutil.Block, tx *btcdb.TxListReply) { - for i, txout := range tx.Tx.TxOut { - _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) - if err != nil { - log.Debug("Error getting payment address from tx; dropping any Tx notifications.") - break - } - s.ws.txRequests.RLock() - for addr, cxts := range s.ws.txRequests.m { - if !bytes.Equal(addr[:], txaddrhash) { - continue - } - blkhash, err := block.Sha() + for wltNtfn, cxt := range s.ws.requests.m { + for i, txout := range tx.Tx.TxOut { + _, txaddrhash, err := btcscript.ScriptToAddrHash(txout.PkScript) if err != nil { - log.Error("Error getting block sha; dropping Tx notification.") + log.Debug("Error getting payment address from tx; dropping any Tx notifications.") break } - txaddr, err := btcutil.EncodeAddress(txaddrhash, s.server.btcnet) - if err != nil { - log.Error("Error encoding address; dropping Tx notification.") - break - } - reply := &btcjson.Reply{ - Result: struct { - Sender string `json:"sender"` - Receiver string `json:"receiver"` - BlockHash string `json:"blockhash"` - Height int64 `json:"height"` - TxHash string `json:"txhash"` - Index uint32 `json:"index"` - Amount int64 `json:"amount"` - PkScript string `json:"pkscript"` - Spent bool `json:"spent"` - }{ - Sender: "Unknown", // TODO(jrick) - Receiver: txaddr, - BlockHash: blkhash.String(), - Height: block.Height(), - TxHash: tx.Sha.String(), - Index: uint32(i), - Amount: txout.Value, - PkScript: btcutil.Base58Encode(txout.PkScript), - Spent: tx.TxSpent[i], - }, - Error: nil, - // Id is set for each requester separately below. - } - for _, cxt := range cxts { - reply.Id = &cxt.id - cxt.c <- reply + for addr, id := range cxt.txRequests { + if !bytes.Equal(addr[:], txaddrhash) { + continue + } + blkhash, err := block.Sha() + if err != nil { + log.Error("Error getting block sha; dropping Tx notification.") + break + } + txaddr, err := btcutil.EncodeAddress(txaddrhash, s.server.btcnet) + if err != nil { + log.Error("Error encoding address; dropping Tx notification.") + break + } + reply := &btcjson.Reply{ + Result: struct { + Sender string `json:"sender"` + Receiver string `json:"receiver"` + BlockHash string `json:"blockhash"` + Height int64 `json:"height"` + TxHash string `json:"txhash"` + Index uint32 `json:"index"` + Amount int64 `json:"amount"` + PkScript string `json:"pkscript"` + Spent bool `json:"spent"` + }{ + Sender: "Unknown", // TODO(jrick) + Receiver: txaddr, + BlockHash: blkhash.String(), + Height: block.Height(), + TxHash: tx.Sha.String(), + Index: uint32(i), + Amount: txout.Value, + PkScript: btcutil.Base58Encode(txout.PkScript), + Spent: tx.TxSpent[i], + }, + Error: nil, + Id: &id, + } + replyBytes, err := json.Marshal(reply) + if err != nil { + log.Errorf("RPCS: Unable to marshal tx notification: %v", err) + continue + } + wltNtfn <- replyBytes } } - s.ws.txRequests.RUnlock() } }