resync queued writes, don't localStorage out self get

This commit is contained in:
Mark Nadal 2018-05-22 13:58:49 -07:00
parent b6b8cc90f6
commit 81ac428e4b
5 changed files with 96 additions and 15 deletions

92
gun.js
View File

@ -686,6 +686,8 @@
if(!at.once){
at.on('in', root, at);
at.on('out', root, at);
Gun.on('create', at);
at.on('create', at);
}
at.once = 1;
return gun;
@ -939,6 +941,7 @@
function output(msg){
var put, get, at = this.as, back = at.back, root = at.root;
if(!msg.I){ msg.I = at.gun }
if(!msg.gun){ msg.gun = at.gun }
this.to.next(msg);
if(get = msg.get){
@ -1366,6 +1369,7 @@
as.res = as.res || function(cb){ if(cb){ cb() } };
as.res(function(){
var cat = (as.gun.back(-1)._), ask = cat.ask(function(ack){
cat.root.on('ack', ack);
this.off(); // One response is good enough for us currently. Later we may want to adjust this.
if(!as.ack){ return }
as.ack(ack, this);
@ -1742,6 +1746,71 @@
If you update anything here, consider updating the other adapters as well.
*/
Gun.on('opt', function(root){
// This code is used to queue offline writes for resync.
// See the next 'opt' code below for actual saving of data.
var ev = this.to, opt = root.opt;
if(root.once){ return ev.next(root) }
if(false === opt.localStorage){ return ev.next(root) }
opt.file = opt.file || 'gun/';
var gap = Gun.obj.ify(store.getItem('gap/'+opt.file)) || {};
var empty = Gun.obj.empty, id, to, go;
// add re-sync command.
if(!empty(gap)){
root.on('localStorage', function(disk){
this.off();
var send = {}
Gun.obj.map(gap, function(node, soul){
Gun.obj.map(node, function(val, key){
send[soul] = Gun.state.to(disk[soul], key, send[soul]);
});
});
setTimeout(function(){
root.on('out', {put: send, '#': root.ask(ack), I: root.gun});
},10);
});
}
root.on('out', function(msg){
if(msg.lS){ return }
if(msg.I && msg.put && !msg['@'] && !empty(opt.peers)){
id = msg['#'];
Gun.graph.is(msg.put, null, map);
if(!to){ to = setTimeout(flush, opt.wait || 1) }
}
this.to.next(msg);
});
root.on('ack', ack);
function ack(ack){ // TODO: This is experimental, not sure if we should keep this type of event hook.
if(ack.err || !ack.ok){ return }
var id = ack['@'];
setTimeout(function(){
Gun.obj.map(gap, function(node, soul){
Gun.obj.map(node, function(val, key){
if(id !== val){ return }
delete node[key];
});
if(empty(node)){
delete gap[soul];
}
});
flush();
}, opt.wait || 1);
};
ev.next(root);
var map = function(val, key, node, soul){
(gap[soul] || (gap[soul] = {}))[key] = id;
}
var flush = function(){
clearTimeout(to);
to = false;
try{store.setItem('gap/'+opt.file, JSON.stringify(gap));
}catch(e){ Gun.log(err = e || "localStorage failure") }
}
});
Gun.on('opt', function(root){
this.to.next(root);
var opt = root.opt;
@ -1750,6 +1819,8 @@
opt.file = opt.file || opt.prefix || 'gun/'; // support old option name.
var graph = root.graph, acks = {}, count = 0, to;
var disk = Gun.obj.ify(store.getItem(opt.file)) || {};
var lS = function(){}, u;
root.on('localStorage', disk); // NON-STANDARD EVENT!
root.on('put', function(at){
this.to.next(at);
@ -1763,12 +1834,12 @@
to = setTimeout(flush, opt.wait || 1);
});
root.on('get', function(at){
this.to.next(at);
var lex = at.get, soul, data, u;
root.on('get', function(msg){
this.to.next(msg);
var lex = msg.get, soul, data, u;
//setTimeout(function(){
if(!lex || !(soul = lex['#'])){ return }
//if(0 >= at.cap){ return }
//if(0 >= msg.cap){ return }
var has = lex['.'];
data = disk[soul] || u;
if(data && has){
@ -1777,7 +1848,7 @@
if(!data && !Gun.obj.empty(opt.peers)){ // if data not found, don't ack if there are peers.
return; // Hmm, what if we have peers but we are disconnected?
}
root.on('in', {'@': at['#'], put: Gun.graph.node(data), how: 'lS'});
root.on('in', {'@': msg['#'], put: Gun.graph.node(data), how: 'lS', lS: msg.I});
//},1);
});
@ -1813,6 +1884,7 @@
var mesh = function(){};
mesh.out = function(msg){ var tmp;
//console.log("count:", msg['#'], msg);
if(this.to){ this.to.next(msg) }
if((tmp = msg['@'])
&& (tmp = ctx.dup.s[tmp])
@ -1887,6 +1959,7 @@
}
}
if((tmp = msh.to) && (tmp[peer.url] || tmp[peer.id])){ return } // TODO: still needs to be tested
//console.log('out', JSON.parse(raw));
if(peer.batch){
peer.batch.push(raw);
return;
@ -1898,10 +1971,12 @@
peer.batch = null;
if(!tmp.length){ return }
send(JSON.stringify(tmp), peer);
}, ctx.opt.wait || 1);
}, ctx.opt.gap || ctx.opt.wait || 1);
send(raw, peer);
}
function send(raw, peer){
//console.log("send:", raw.slice(raw.indexOf('#'), 20));
var wire = peer.wire;
try{
if(wire.send){
@ -2019,7 +2094,10 @@
opt.WebSocket = websocket;
var mesh = opt.mesh = opt.mesh || Gun.Mesh(root);
//root.on('create', function(at){
//this.to.next(at);
root.on('out', mesh.out);
//});
opt.wire = opt.wire || open;
function open(peer){
@ -2041,7 +2119,9 @@
mesh.hi(peer);
}
wire.onmessage = function(msg){
//console.log('in', JSON.parse(msg.data || msg));
if(!msg){ return }
env.inLength = (env.inLength || 0) + (msg.data || msg).length; // TEMPORARY, NON-STANDARD, FOR DEBUG
mesh.hear(msg.data || msg, peer);
};
return wire;

View File

@ -50,6 +50,7 @@
Gun.on('opt', function(root){
this.to.next(root);
if(root.once){ return }
console.log(">>>>>>>>>", root);
root.debug = db;
db.root = root;
db.peers = root.opt.peers;

View File

@ -6,9 +6,9 @@ function Radisk(opt){
opt = opt || {};
opt.file = String(opt.file || 'radata');
opt.thrash = opt.thrash || opt.wait || 1;
opt.until = opt.until || opt.wait || 1;
opt.batch = opt.batch || 10 * 1000;
opt.size = opt.size || (1024 * 1024 * 10); // 10MB
opt.chunk = opt.chunk || (1024 * 1024 * 10); // 10MB
opt.code = opt.code || {};
opt.code.from = opt.code.from || '!';
@ -51,7 +51,7 @@ function Radisk(opt){
if(cb){ r.batch.acks.push(cb) }
if(++r.batch.ed >= opt.batch){ return r.thrash() } // (2)
clearTimeout(r.batch.to); // (1)
r.batch.to = setTimeout(r.thrash, opt.thrash || 1);
r.batch.to = setTimeout(r.thrash, opt.until || 1);
}
r.batch = Radix();
@ -137,7 +137,7 @@ function Radisk(opt){
f.each = function(val, key, k, pre){
f.count++;
var enc = Radisk.encode(pre.length) +'#'+ Radisk.encode(k) + (u === val? '' : '='+ Radisk.encode(val)) +'\n';
if(opt.size < f.text.length + enc.length){
if(opt.chunk < f.text.length + enc.length){
f.text = '';
f.limit = Math.ceil(f.count/2);
f.count = 0;

View File

@ -10,8 +10,8 @@ Gun.on('opt', function(ctx){
if(ctx.once){ return }
if(!process.env.AWS_S3_BUCKET){ return }
opt.batch = opt.batch || (1000 * 10);
opt.thrash = opt.thrash || (1000 * 15);
opt.size = opt.size || (1024 * 1024 * 10); // 10MB
opt.until = opt.until || (1000 * 15);
opt.chunk = opt.chunk || (1024 * 1024 * 10); // 10MB
try{AWS = require('aws-sdk');
}catch(e){

View File

@ -1,6 +1,6 @@
{
"name": "gun",
"version": "0.9.996",
"version": "0.9.997",
"description": "A realtime, decentralized, offline-first, graph data synchronization engine.",
"main": "index.js",
"browser": "gun.min.js",