diff --git a/gun.js b/gun.js index 04f2df29..ba520d9b 100644 --- a/gun.js +++ b/gun.js @@ -686,6 +686,8 @@ if(!at.once){ at.on('in', root, at); at.on('out', root, at); + Gun.on('create', at); + at.on('create', at); } at.once = 1; return gun; @@ -939,6 +941,7 @@ function output(msg){ var put, get, at = this.as, back = at.back, root = at.root; + if(!msg.I){ msg.I = at.gun } if(!msg.gun){ msg.gun = at.gun } this.to.next(msg); if(get = msg.get){ @@ -1366,6 +1369,7 @@ as.res = as.res || function(cb){ if(cb){ cb() } }; as.res(function(){ var cat = (as.gun.back(-1)._), ask = cat.ask(function(ack){ + cat.root.on('ack', ack); this.off(); // One response is good enough for us currently. Later we may want to adjust this. if(!as.ack){ return } as.ack(ack, this); @@ -1742,6 +1746,71 @@ If you update anything here, consider updating the other adapters as well. */ + Gun.on('opt', function(root){ + // This code is used to queue offline writes for resync. + // See the next 'opt' code below for actual saving of data. + var ev = this.to, opt = root.opt; + if(root.once){ return ev.next(root) } + if(false === opt.localStorage){ return ev.next(root) } + opt.file = opt.file || 'gun/'; + var gap = Gun.obj.ify(store.getItem('gap/'+opt.file)) || {}; + var empty = Gun.obj.empty, id, to, go; + // add re-sync command. + if(!empty(gap)){ + root.on('localStorage', function(disk){ + this.off(); + var send = {} + Gun.obj.map(gap, function(node, soul){ + Gun.obj.map(node, function(val, key){ + send[soul] = Gun.state.to(disk[soul], key, send[soul]); + }); + }); + setTimeout(function(){ + root.on('out', {put: send, '#': root.ask(ack), I: root.gun}); + },10); + }); + } + + root.on('out', function(msg){ + if(msg.lS){ return } + if(msg.I && msg.put && !msg['@'] && !empty(opt.peers)){ + id = msg['#']; + Gun.graph.is(msg.put, null, map); + if(!to){ to = setTimeout(flush, opt.wait || 1) } + } + this.to.next(msg); + }); + root.on('ack', ack); + + function ack(ack){ // TODO: This is experimental, not sure if we should keep this type of event hook. + if(ack.err || !ack.ok){ return } + var id = ack['@']; + setTimeout(function(){ + Gun.obj.map(gap, function(node, soul){ + Gun.obj.map(node, function(val, key){ + if(id !== val){ return } + delete node[key]; + }); + if(empty(node)){ + delete gap[soul]; + } + }); + flush(); + }, opt.wait || 1); + }; + ev.next(root); + + var map = function(val, key, node, soul){ + (gap[soul] || (gap[soul] = {}))[key] = id; + } + var flush = function(){ + clearTimeout(to); + to = false; + try{store.setItem('gap/'+opt.file, JSON.stringify(gap)); + }catch(e){ Gun.log(err = e || "localStorage failure") } + } + }); + Gun.on('opt', function(root){ this.to.next(root); var opt = root.opt; @@ -1750,6 +1819,8 @@ opt.file = opt.file || opt.prefix || 'gun/'; // support old option name. var graph = root.graph, acks = {}, count = 0, to; var disk = Gun.obj.ify(store.getItem(opt.file)) || {}; + var lS = function(){}, u; + root.on('localStorage', disk); // NON-STANDARD EVENT! root.on('put', function(at){ this.to.next(at); @@ -1763,12 +1834,12 @@ to = setTimeout(flush, opt.wait || 1); }); - root.on('get', function(at){ - this.to.next(at); - var lex = at.get, soul, data, u; + root.on('get', function(msg){ + this.to.next(msg); + var lex = msg.get, soul, data, u; //setTimeout(function(){ if(!lex || !(soul = lex['#'])){ return } - //if(0 >= at.cap){ return } + //if(0 >= msg.cap){ return } var has = lex['.']; data = disk[soul] || u; if(data && has){ @@ -1777,7 +1848,7 @@ if(!data && !Gun.obj.empty(opt.peers)){ // if data not found, don't ack if there are peers. return; // Hmm, what if we have peers but we are disconnected? } - root.on('in', {'@': at['#'], put: Gun.graph.node(data), how: 'lS'}); + root.on('in', {'@': msg['#'], put: Gun.graph.node(data), how: 'lS', lS: msg.I}); //},1); }); @@ -1813,6 +1884,7 @@ var mesh = function(){}; mesh.out = function(msg){ var tmp; + //console.log("count:", msg['#'], msg); if(this.to){ this.to.next(msg) } if((tmp = msg['@']) && (tmp = ctx.dup.s[tmp]) @@ -1886,7 +1958,8 @@ return; // TODO: this still needs to be tested in the browser! } } - if((tmp = msh.to) && (tmp[peer.url] || tmp[peer.id])){ return } // TODO: still needs to be tested + if((tmp = msh.to) && (tmp[peer.url] || tmp[peer.id])){ return } // TODO: still needs to be tested + //console.log('out', JSON.parse(raw)); if(peer.batch){ peer.batch.push(raw); return; @@ -1898,10 +1971,12 @@ peer.batch = null; if(!tmp.length){ return } send(JSON.stringify(tmp), peer); - }, ctx.opt.wait || 1); + }, ctx.opt.gap || ctx.opt.wait || 1); send(raw, peer); } + function send(raw, peer){ + //console.log("send:", raw.slice(raw.indexOf('#'), 20)); var wire = peer.wire; try{ if(wire.send){ @@ -2019,7 +2094,10 @@ opt.WebSocket = websocket; var mesh = opt.mesh = opt.mesh || Gun.Mesh(root); - root.on('out', mesh.out); + //root.on('create', function(at){ + //this.to.next(at); + root.on('out', mesh.out); + //}); opt.wire = opt.wire || open; function open(peer){ @@ -2041,7 +2119,9 @@ mesh.hi(peer); } wire.onmessage = function(msg){ + //console.log('in', JSON.parse(msg.data || msg)); if(!msg){ return } + env.inLength = (env.inLength || 0) + (msg.data || msg).length; // TEMPORARY, NON-STANDARD, FOR DEBUG mesh.hear(msg.data || msg, peer); }; return wire; diff --git a/lib/debug.js b/lib/debug.js index c6965399..2ed48df4 100644 --- a/lib/debug.js +++ b/lib/debug.js @@ -50,6 +50,7 @@ Gun.on('opt', function(root){ this.to.next(root); if(root.once){ return } + console.log(">>>>>>>>>", root); root.debug = db; db.root = root; db.peers = root.opt.peers; diff --git a/lib/radisk.js b/lib/radisk.js index cb2d571d..1c4075fd 100644 --- a/lib/radisk.js +++ b/lib/radisk.js @@ -6,9 +6,9 @@ function Radisk(opt){ opt = opt || {}; opt.file = String(opt.file || 'radata'); - opt.thrash = opt.thrash || opt.wait || 1; + opt.until = opt.until || opt.wait || 1; opt.batch = opt.batch || 10 * 1000; - opt.size = opt.size || (1024 * 1024 * 10); // 10MB + opt.chunk = opt.chunk || (1024 * 1024 * 10); // 10MB opt.code = opt.code || {}; opt.code.from = opt.code.from || '!'; @@ -51,7 +51,7 @@ function Radisk(opt){ if(cb){ r.batch.acks.push(cb) } if(++r.batch.ed >= opt.batch){ return r.thrash() } // (2) clearTimeout(r.batch.to); // (1) - r.batch.to = setTimeout(r.thrash, opt.thrash || 1); + r.batch.to = setTimeout(r.thrash, opt.until || 1); } r.batch = Radix(); @@ -137,7 +137,7 @@ function Radisk(opt){ f.each = function(val, key, k, pre){ f.count++; var enc = Radisk.encode(pre.length) +'#'+ Radisk.encode(k) + (u === val? '' : '='+ Radisk.encode(val)) +'\n'; - if(opt.size < f.text.length + enc.length){ + if(opt.chunk < f.text.length + enc.length){ f.text = ''; f.limit = Math.ceil(f.count/2); f.count = 0; diff --git a/lib/rs3.js b/lib/rs3.js index ce73d99a..aa3ed870 100644 --- a/lib/rs3.js +++ b/lib/rs3.js @@ -10,8 +10,8 @@ Gun.on('opt', function(ctx){ if(ctx.once){ return } if(!process.env.AWS_S3_BUCKET){ return } opt.batch = opt.batch || (1000 * 10); - opt.thrash = opt.thrash || (1000 * 15); - opt.size = opt.size || (1024 * 1024 * 10); // 10MB + opt.until = opt.until || (1000 * 15); + opt.chunk = opt.chunk || (1024 * 1024 * 10); // 10MB try{AWS = require('aws-sdk'); }catch(e){ diff --git a/package.json b/package.json index b3329653..023b9fa5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "gun", - "version": "0.9.996", + "version": "0.9.997", "description": "A realtime, decentralized, offline-first, graph data synchronization engine.", "main": "index.js", "browser": "gun.min.js",