Fix replication

This commit is contained in:
haad 2021-10-21 14:42:34 +03:00
parent 5df477ea27
commit 08eaf84d74
22 changed files with 6431 additions and 34518 deletions

View File

@ -26,6 +26,8 @@ jobs:
# VM instead of a container) see https://circleci.com/docs/2.0/executor-types/
# To see the list of pre-built images that CircleCI provides for most common languages see
# https://circleci.com/docs/2.0/circleci-images/
# TODO: update to node 14 or 16?
docker:
- image: circleci/node:12-browsers
steps:

39991
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -25,7 +25,7 @@
"multihashes": "~3.0.1",
"orbit-db-access-controllers": "^0.3.2",
"orbit-db-cache": "~0.3.0",
"orbit-db-counterstore": "~1.12.0",
"orbit-db-counterstore": "github:orbitdb/orbit-db-counterstore#fix/ids-and-merge",
"orbit-db-docstore": "~1.12.0",
"orbit-db-eventstore": "~1.12.0",
"orbit-db-feedstore": "~1.12.0",
@ -35,7 +35,7 @@
"orbit-db-kvstore": "~1.12.0",
"orbit-db-pubsub": "~0.6.0",
"orbit-db-storage-adapter": "~0.5.3",
"orbit-db-store": "^4.2.0"
"orbit-db-store": "github:haadcode/orbit-db-store#fix/replication-concurrency"
},
"devDependencies": {
"adm-zip": "^0.4.16",
@ -56,7 +56,7 @@
"localstorage-level-migration": "~0.1.0",
"markdown-toc": "^1.2.0",
"mkdirp": "^1.0.4",
"mocha": "^8.1.1",
"mocha": "^9.1.3",
"node-pre-gyp": "^0.13.0",
"open-cli": "^6.0.1",
"orbit-db-test-utils": "^1.3.0",
@ -82,7 +82,7 @@
"examples:browser-webpack": "open-cli examples/browser/browser-webpack-example/index.html",
"lint:docs": "remark -qf -u validate-links .",
"test:all": "npm run test:browser-multiple-tabs && npm run test",
"test": "cross-env TEST=all mocha && cross-env TEST=all mocha ./test/access-controllers --exit",
"test": "cross-env TEST=all mocha --config test/.mocharc.js",
"test:browser-multiple-tabs": "npm run build:dist && cpy dist/orbitdb.min.js ./test/browser --rename=orbitdb.js && cpy node_modules/ipfs/index.min.js ./test/browser --rename=ipfs.js && cpy node_modules/orbit-db-identity-provider/dist/index-browser.min.js ./test/browser --rename=identities.js && cpy node_modules/ipfs-log/dist/ipfslog.min.js ./test/browser && mocha ./test/browser/concurrent.spec.js",
"build": "npm run build:es5 && npm run build:debug && npm run build:dist && npm run build:examples && npm run build:docs/toc",
"build:examples": "webpack --config conf/webpack.example.config.js --sort-modules-by size",

View File

@ -32,6 +32,8 @@ const databaseTypes = {
keyvalue: KeyValueStore
}
const defaultTimeout = 30000 // 30 seconds
class OrbitDB {
constructor (ipfs, identity, options = {}) {
if (!isDefined(ipfs)) { throw new Error('IPFS is a required argument. See https://github.com/orbitdb/orbit-db/blob/master/API.md#createinstance') }
@ -41,11 +43,11 @@ class OrbitDB {
this._ipfs = ipfs
this.identity = identity
this.id = options.peerId
this._pubsub = !options.offline ?
new (
options.broker ? options.broker : Pubsub
)(this._ipfs, this.id)
: null
this._pubsub = !options.offline
? new (
options.broker ? options.broker : Pubsub
)(this._ipfs, this.id)
: null
this.directory = options.directory || './orbitdb'
this.storage = options.storage
this._directConnections = {}
@ -67,7 +69,7 @@ class OrbitDB {
static get AccessControllers () { return AccessControllers }
static get Storage () { return Storage }
static get OrbitDBAddress () { return OrbitDBAddress }
static get Store () { return Store }
static get EventStore () { return EventStore }
static get FeedStore () { return FeedStore }
@ -88,7 +90,7 @@ class OrbitDB {
throw new Error('Offline mode requires passing an `id` in the options')
}
const { id } = options.offline ? ({ id: options.id }) : await ipfs.id()
const { id } = options.id || options.offline ? ({ id: options.id }) : await ipfs.id()
if (!options.directory) { options.directory = './orbitdb' }
@ -104,7 +106,7 @@ class OrbitDB {
}
if (!options.keystore) {
const keystorePath = path.join(options.directory, id, '/keystore')
const keystorePath = path.join(options.directory, options.id || id, '/keystore')
const keyStorage = await options.storage.createStore(keystorePath)
options.keystore = new Keystore(keyStorage)
}
@ -165,6 +167,20 @@ class OrbitDB {
}
async disconnect () {
// Close a direct connection and remove it from internal state
const removeDirectConnect = e => {
this._directConnections[e].close()
delete this._directConnections[e]
}
// Close all direct connections to peers
Object.keys(this._directConnections).forEach(removeDirectConnect)
// Disconnect from pubsub
if (this._pubsub) {
await this._pubsub.disconnect()
}
// close keystore
await this.keystore.close()
@ -181,20 +197,6 @@ class OrbitDB {
delete this.caches[directory]
}
// Close a direct connection and remove it from internal state
const removeDirectConnect = e => {
this._directConnections[e].close()
delete this._directConnections[e]
}
// Close all direct connections to peers
Object.keys(this._directConnections).forEach(removeDirectConnect)
// Disconnect from pubsub
if (this._pubsub) {
await this._pubsub.disconnect()
}
// Remove all databases from the state
this.stores = {}
}
@ -300,8 +302,7 @@ class OrbitDB {
await this._pubsub.unsubscribe(address)
}
const store = this.stores[address]
const dir = store && store.options.directory ? store.options.directory : this.directory
const dir = db && db.options.directory ? db.options.directory : this.directory
const cache = this.caches[dir]
if (cache && cache.handlers.has(address)) {
@ -316,7 +317,6 @@ class OrbitDB {
const address = db.address.toString()
const dir = db && db.options.directory ? db.options.directory : this.directory
await this._requestCache(address, dir, db._cache)
this.stores[address] = db
}
async _onLoad (db) {
@ -427,6 +427,11 @@ class OrbitDB {
// Parse the database address
const dbAddress = OrbitDBAddress.parse(address)
// If database is already open, return early by returning the instance
// if (this.stores[dbAddress]) {
// return this.stores[dbAddress]
// }
options.cache = await this._requestCache(dbAddress.toString(), options.directory)
// Check if we have the database
@ -444,19 +449,24 @@ class OrbitDB {
logger.debug(`Loading Manifest for '${dbAddress}'`)
// Get the database manifest from IPFS
const manifest = await io.read(this._ipfs, dbAddress.root)
const manifest = await io.read(this._ipfs, dbAddress.root, { timeout: options.timeout || defaultTimeout })
logger.debug(`Manifest for '${dbAddress}':\n${JSON.stringify(manifest, null, 2)}`)
if (manifest.name !== dbAddress.path) {
logger.warn(`Manifest name '${manifest.name}' and path name '${dbAddress.path}' do not match`)
}
// Make sure the type from the manifest matches the type that was given as an option
if (manifest.name !== dbAddress.path) { throw new Error(`Manifest '${manifest.name}' cannot be opened as '${dbAddress.path}'`) }
if (options.type && manifest.type !== options.type) { throw new Error(`Database '${dbAddress}' is type '${manifest.type}' but was opened as '${options.type}'`) }
if (options.type && manifest.type !== options.type) {
throw new Error(`Database '${dbAddress}' is type '${manifest.type}' but was opened as '${options.type}'`)
}
// Save the database locally
await this._addManifestToCache(options.cache, dbAddress)
// Open the the database
options = Object.assign({}, options, { accessControllerAddress: manifest.accessController, meta: manifest.meta })
return this._createStore(manifest.type, dbAddress, options)
return this._createStore(options.type || manifest.type, dbAddress, options)
}
// Save the database locally

10
test/.mocharc.js Normal file
View File

@ -0,0 +1,10 @@
'use strict'
module.exports = {
reporter: 'spec',
colors: true,
recursive: true,
exit: true,
slow: 1000,
exclude: ['test/browser/**/*.js']
}

View File

@ -17,7 +17,8 @@ const {
config,
startIpfs,
stopIpfs,
testAPIs
testAPIs,
connectPeers
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/contract-access-controller-integration/1'
@ -36,7 +37,7 @@ const accessControllers = [
Object.keys(testAPIs).forEach(API => {
describe(`orbit-db - ContractAccessController Integration (${API})`, function () {
this.timeout(config.timeout)
this.timeout(config.timeout * 2)
let ipfsd1, ipfsd2, ipfs1, ipfs2, id1, id2
let orbitdb1, orbitdb2
@ -50,6 +51,10 @@ Object.keys(testAPIs).forEach(API => {
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
const keystore1 = new Keystore(dbPath1 + '/keys')
const keystore2 = new Keystore(dbPath2 + '/keys')
IdentityProvider.addIdentityProvider(EthIdentityProvider)

View File

@ -12,7 +12,8 @@ const {
config,
startIpfs,
stopIpfs,
testAPIs
testAPIs,
connectPeers
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/orbitdb-access-controller-integration/1'
@ -33,6 +34,10 @@ Object.keys(testAPIs).forEach(API => {
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
const keystore1 = new Keystore(dbPath1 + '/keys')
const keystore2 = new Keystore(dbPath2 + '/keys')

View File

@ -12,7 +12,8 @@ const {
config,
startIpfs,
stopIpfs,
testAPIs
testAPIs,
connectPeers
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/orbitdb-access-controller-integration/1'
@ -33,6 +34,10 @@ Object.keys(testAPIs).forEach(API => {
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
const keystore1 = new Keystore(dbPath1 + '/keys')
const keystore2 = new Keystore(dbPath2 + '/keys')

View File

@ -80,7 +80,7 @@ describe(`orbit-db - browser concurrent writes`, function() {
const interval = setInterval(async () => {
let logHashes = []
await mapSeries(tabs, async (page) => {
await page.evaluate(() => loadLogs())
await page.evaluate(() => loadConsistentLog())
const hash = await page.evaluate(async () => await getConsistentLogHash())
logHashes.push(hash)
})
@ -106,7 +106,6 @@ describe(`orbit-db - browser concurrent writes`, function() {
const interval = setInterval(async () => {
let logHashes = []
await mapSeries(tabs, async (page) => {
await page.evaluate(() => loadLogs())
const hash = await page.evaluate(async () => await getInconsistentLogHash())
logHashes.push(hash)
})

View File

@ -37,8 +37,6 @@
window.inconsistentLog = inconsistentLog
waitForOpenDB.innerHTML = consistentLog.address.toString() + ' + ' + inconsistentLog.address.toString()
await consistentLog.load()
await inconsistentLog.load()
addData.addEventListener('click', async event => {
const data = randStr()
@ -71,11 +69,9 @@
return window.inconsistentLog.iterator({ limit: -1 }).collect()
}
async function loadLogs () {
async function loadConsistentLog () {
await window.consistentLog.load()
await window.inconsistentLog.load()
}
</script>
</body>
</html>

View File

@ -14,8 +14,10 @@ const {
waitForPeers
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/counters/peer1'
const dbPath2 = './orbitdb/tests/counters/peer2'
const orbitdbPath1 = './orbitdb/tests/counters/1'
const orbitdbPath2 = './orbitdb/tests/counters/2'
const dbPath1 = './orbitdb/tests/counters/db1'
const dbPath2 = './orbitdb/tests/counters/db2'
Object.keys(testAPIs).forEach(API => {
describe(`orbit-db - Counters (${API})`, function () {
@ -27,12 +29,15 @@ Object.keys(testAPIs).forEach(API => {
before(async () => {
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
rmrf.sync(orbitdbPath1)
rmrf.sync(orbitdbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
})
after(async () => {
@ -90,10 +95,12 @@ Object.keys(testAPIs).forEach(API => {
const counter = await orbitdb1.counter(address, { path: dbPath1 })
await counter.load()
assert.equal(counter.value, 14)
await counter.close()
await counter.drop()
})
it('syncs counters', async () => {
console.log("Sync counters")
let options = {
accessController: {
// Set write access for both clients
@ -104,33 +111,66 @@ Object.keys(testAPIs).forEach(API => {
}
}
const dbName = new Date().getTime().toString()
const numbers = [[13, 10], [2, 5]]
const increaseCounter = (counterDB, i) => mapSeries(numbers[i], n => counterDB.inc(n))
// Create a new counter database in the first client
options = Object.assign({}, options, { path: dbPath1 })
const counter1 = await orbitdb1.counter(new Date().getTime().toString(), options)
// Open the database in the second client
options = Object.assign({}, options, { path: dbPath2, sync: true })
const counter2 = await orbitdb2.counter(counter1.address.toString(), options)
const counter1 = await orbitdb1.counter(dbName, options)
// Wait for peers to connect first
// Open the database in the second client
options = Object.assign({}, options, { path: dbPath2 })
const counter2 = await orbitdb2.counter(dbName, options)
// Make sure database addresses match since they're built deterministically
assert.equal(counter1.address.toString(), counter2.address.toString())
// Wait for peers to connect
console.log("Waiting for peers to connect")
await waitForPeers(ipfs1, [orbitdb2.id], counter1.address.toString())
await waitForPeers(ipfs2, [orbitdb1.id], counter1.address.toString())
let finished1 = counter1.value === 30
let finished2 = counter2.value === 30
counter1.events.on('replicated', () => {
finished1 = (counter1.value === 30)
finished2 = (counter2.value === 30)
})
counter2.events.on('replicated', () => {
finished1 = (counter1.value === 30)
finished2 = (counter2.value === 30)
})
counter1.events.on('write', () => {
finished1 = (counter1.value === 30)
finished2 = (counter2.value === 30)
})
counter2.events.on('write', () => {
finished1 = (counter1.value === 30)
finished2 = (counter2.value === 30)
})
// Increase the counters sequentially
await mapSeries([counter1, counter2], increaseCounter)
console.log("Waiting for replication to finish")
return new Promise(resolve => {
// Wait for a while to make sure db's have been synced
setTimeout(async () => {
assert.equal(counter1.value, 30)
assert.equal(counter2.value, 30)
await counter1.close()
await counter2.close()
resolve()
}, 1000)
return new Promise((resolve, reject) => {
let timer = setInterval(async () => {
if (finished1 && finished2) {
try {
clearInterval(timer)
assert.equal(counter1.value, 30)
assert.equal(counter2.value, 30)
await counter1.close()
await counter2.close()
resolve()
} catch (e) {
reject(e)
}
}
}, 100)
})
})
})

View File

@ -352,7 +352,7 @@ Object.keys(testAPIs).forEach(API => {
})
it('doesn\'t open a database if we don\'t have it locally', async () => {
const db = await orbitdb.open('abc', { create: true, type: 'feed', overwrite: true })
const db = await orbitdb.open('abcabc', { create: true, type: 'feed', overwrite: true })
const address = new OrbitDBAddress(db.address.root.slice(0, -1) + 'A', 'non-existent')
await db.drop()
return new Promise((resolve, reject) => {

View File

@ -1,6 +0,0 @@
--reporter spec
--colors
--recursive
--exit
--slow 1000
--exclude test/browser/**/*.js

View File

@ -68,12 +68,15 @@ Object.keys(testAPIs).forEach(API => {
before(async () => {
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
console.log("Peers connected")
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: dbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: dbPath2 })
})
@ -111,9 +114,6 @@ Object.keys(testAPIs).forEach(API => {
localDatabases.push(db)
}
// Open the databases on the second node, set 'sync' flag so that
// the second peer fetches the db manifest from the network
options = Object.assign({}, options, { sync: true })
for (let [index, dbInterface] of databaseInterfaces.entries()) {
const address = localDatabases[index].address.toString()
const db = await dbInterface.open(orbitdb2, address, options)
@ -122,7 +122,7 @@ Object.keys(testAPIs).forEach(API => {
// Wait for the peers to connect
await waitForPeers(ipfs1, [orbitdb2.id], localDatabases[0].address.toString())
await waitForPeers(ipfs1, [orbitdb2.id], localDatabases[0].address.toString())
await waitForPeers(ipfs2, [orbitdb1.id], localDatabases[0].address.toString())
console.log("Peers connected")
})
@ -143,20 +143,6 @@ Object.keys(testAPIs).forEach(API => {
for (let i = 1; i < entryCount + 1; i ++)
entryArr.push(i)
// Result state,
// we count how many times 'replicated' event was fired per db
let replicated = {}
localDatabases.forEach(db => {
replicated[db.address.toString()] = 0
})
// Listen for the updates from remote peers
remoteDatabases.forEach(db => {
db.events.on('replicated', (address) => {
replicated[address] += 1
})
})
// Write entries to each database
console.log("Writing to databases")
for (let index = 0; index < databaseInterfaces.length; index++) {
@ -165,8 +151,7 @@ Object.keys(testAPIs).forEach(API => {
await mapSeries(entryArr, val => dbInterface.write(db, val))
}
// Function to check if all databases have been replicated,
// we calculate this by checking number of 'replicated' events fired
// Function to check if all databases have been replicated
const allReplicated = () => {
return remoteDatabases.every(db => db._oplog.length === entryCount)
}

View File

@ -31,7 +31,6 @@ Object.keys(testAPIs).forEach(API => {
let localDataPath
before(async () => {
rmrf.sync('./orbitdb/tests/offline')
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
@ -50,6 +49,11 @@ Object.keys(testAPIs).forEach(API => {
await stopIpfs(ipfsd2)
})
beforeEach(() => {
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
})
it('starts in offline mode', async () => {
orbitdb = await OrbitDB.createInstance(ipfs1, { id: 'A', offline: true, directory: dbPath1 })
assert.equal(orbitdb._pubsub, null)

View File

@ -28,7 +28,7 @@ const tests = [
{
title: 'Persistency with custom cache',
type: "custom",
orbitDBConfig: { directory: path.join(dbPath, '1') }
orbitDBConfig: { directory: path.join(dbPath, '2') }
}
]
@ -118,17 +118,19 @@ Object.keys(testAPIs).forEach(API => {
})
it('closes database while loading', async () => {
db = await orbitdb1.eventlog(address)
await new Promise(async (resolve, reject) => {
db = await orbitdb1.eventlog(address, { replicationConcurrency: 1 })
return new Promise(async (resolve, reject) => {
// don't wait for load to finish
db.load().catch(e => {
if (e.toString() !== 'ReadError: Database is not open') {
reject(e)
} else {
assert.equal(db._cache.store, null)
resolve()
}
})
db.load()
.then(() => reject("Should not finish loading?"))
.catch(e => {
if (e.toString() !== 'ReadError: Database is not open') {
reject(e)
} else {
assert.equal(db._cache.store, null)
resolve()
}
})
await db.close()
})
})

View File

@ -15,27 +15,33 @@ const {
waitForPeers,
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/replicate-and-load/1'
const dbPath2 = './orbitdb/tests/replicate-and-load/2'
const orbitdbPath1 = './orbitdb/tests/replicate-and-load/1'
const orbitdbPath2 = './orbitdb/tests/replicate-and-load/2'
const dbPath1 = './orbitdb/tests/replicate-and-load/1/db1'
const dbPath2 = './orbitdb/tests/replicate-and-load/2/db2'
Object.keys(testAPIs).forEach(API => {
describe(`orbit-db - Replicate and Load (${API})`, function() {
this.timeout(config.timeout * 2)
this.timeout(config.timeout)
let ipfsd1, ipfsd2, ipfs1, ipfs2
let orbitdb1, orbitdb2
before(async () => {
rmrf.sync(orbitdbPath1)
rmrf.sync(orbitdbPath2)
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: dbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: dbPath2 })
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: orbitdbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: orbitdbPath2 })
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
console.log("Peers connected")
})
after(async () => {
@ -50,17 +56,20 @@ Object.keys(testAPIs).forEach(API => {
if (ipfsd2)
await stopIpfs(ipfsd2)
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
})
describe('two peers', function() {
let db1, db2
const openDatabases = async (options) => {
const openDatabases = async (options = {}) => {
// Set write access for both clients
options.write = [
orbitdb1.identity.publicKey,
orbitdb2.identity.publicKey
],
]
options = Object.assign({}, options, { path: dbPath1, create: true })
db1 = await orbitdb1.eventlog('tests', options)
@ -70,19 +79,23 @@ Object.keys(testAPIs).forEach(API => {
}
before(async () => {
await openDatabases({ sync: true })
await openDatabases()
assert.equal(db1.address.toString(), db2.address.toString())
console.log("Waiting for peers...")
await waitForPeers(ipfs1, [orbitdb2.id], db1.address.toString())
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
console.log("Found peers")
})
after(async () => {
await db1.drop()
await db2.drop()
if (db1) {
await db1.drop()
}
if (db2) {
await db2.drop()
}
})
it('replicates database of 100 entries and loads it from the disk', async () => {
@ -145,7 +158,7 @@ Object.keys(testAPIs).forEach(API => {
}
resolve()
}
}, 100)
}, 1000)
})
})
})

View File

@ -12,7 +12,6 @@ const {
stopIpfs,
testAPIs,
connectPeers,
waitForPeers,
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/replicate-automatically/1'
@ -26,6 +25,7 @@ Object.keys(testAPIs).forEach(API => {
let orbitdb1, orbitdb2, db1, db2, db3, db4
before(async () => {
rmrf.sync('./orbitdb')
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
@ -34,130 +34,96 @@ Object.keys(testAPIs).forEach(API => {
ipfs2 = ipfsd2.api
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: dbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: dbPath2 })
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
})
after(async () => {
if(orbitdb1)
await orbitdb1.stop()
if(orbitdb2)
await orbitdb2.stop()
if (ipfsd1)
await stopIpfs(ipfsd1)
if (ipfs2)
await stopIpfs(ipfsd2)
})
beforeEach(async () => {
let options = {}
// Set write access for both clients
options.write = [
orbitdb1.identity.publicKey,
orbitdb2.identity.publicKey
],
]
options = Object.assign({}, options, { path: dbPath1 })
options = Object.assign({}, options)
db1 = await orbitdb1.eventlog('replicate-automatically-tests', options)
db3 = await orbitdb1.keyvalue('replicate-automatically-tests-kv', options)
})
afterEach(async () => {
if (db1) await db1.drop()
if (db2) await db2.drop()
if (db3) await db3.drop()
if (db4) await db4.drop()
after(async () => {
if (orbitdb1) {
await orbitdb1.stop()
}
if (orbitdb2) {
await orbitdb2.stop()
}
if (ipfsd1) {
await stopIpfs(ipfsd1)
}
if (ipfs2) {
await stopIpfs(ipfsd2)
}
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
})
it('starts replicating the database when peers connect', async () => {
const entryCount = 10
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
console.log('Peers connected')
const entryCount = 33
const entryArr = []
let options = {}
let timer
// Create the entries in the first database
for (let i = 0; i < entryCount; i ++)
for (let i = 0; i < entryCount; i++) {
entryArr.push(i)
}
await mapSeries(entryArr, (i) => db1.add('hello' + i))
// Open the second database
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
db2 = await orbitdb2.eventlog(db1.address.toString())
db4 = await orbitdb2.keyvalue(db3.address.toString())
// Listen for the 'replicated' events and check that all the entries
// were replicated to the second database
return new Promise((resolve, reject) => {
db2.events.on('replicated', (address) => {
try {
const result1 = db1.iterator({ limit: -1 }).collect()
const result2 = db2.iterator({ limit: -1 }).collect()
// Make sure we have all the entries
if (result1.length === entryCount && result2.length === entryCount) {
assert.deepEqual(result1, result2)
resolve()
}
} catch (e) {
reject(e)
}
// Check if db2 was already replicated
let all = db2.iterator({ limit: -1 }).collect().length
// Run the test asserts below if replication was done
let finished = (all === entryCount)
db3.events.on('replicated', (address, hash, entry) => {
reject(new Error("db3 should not receive the 'replicated' event!"))
})
})
})
it('automatic replication exchanges the correct heads', async () => {
const entryCount = 33
const entryArr = []
let options = {}
let timer
// Create the entries in the first database
for (let i = 0; i < entryCount; i ++)
entryArr.push(i)
await mapSeries(entryArr, (i) => db1.add('hello' + i))
// Open the second database
options = Object.assign({}, options, { path: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
db4 = await orbitdb2.keyvalue(db3.address.toString(), options)
// Listen for the 'replicated' events and check that all the entries
// were replicated to the second database
return new Promise(async (resolve, reject) => {
db4.events.on('replicated', (address, hash, entry) => {
reject(new Error("Should not receive the 'replicated' event!"))
reject(new Error("db4 should not receive the 'replicated' event!"))
})
// Can't check this for now as db1 might've sent the heads to db2
// before we subscribe to the event
db2.events.on('replicate.progress', (address, hash, entry) => {
try {
// Check that the head we received from the first peer is the latest
assert.equal(entry.payload.op, 'ADD')
assert.equal(entry.payload.key, null)
assert.notEqual(entry.payload.value.indexOf('hello'), -1)
assert.notEqual(entry.clock, null)
} catch (e) {
reject(e)
}
db2.events.on('replicated', (address, length) => {
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
all = db2.iterator({ limit: -1 }).collect().length
finished = (all === entryCount)
})
db2.events.on('replicated', (address) => {
try {
const result1 = db1.iterator({ limit: -1 }).collect()
const result2 = db2.iterator({ limit: -1 }).collect()
// Make sure we have all the entries
if (result1.length === entryCount && result2.length === entryCount) {
try {
const timer = setInterval(() => {
if (finished) {
clearInterval(timer)
const result1 = db1.iterator({ limit: -1 }).collect()
const result2 = db2.iterator({ limit: -1 }).collect()
assert.equal(result1.length, result2.length)
assert.deepEqual(result1, result2)
resolve()
}
} catch (e) {
reject(e)
}
})
}, 1000)
} catch (e) {
reject(e)
}
})
})
})

View File

@ -15,29 +15,30 @@ const {
waitForPeers,
} = require('orbit-db-test-utils')
const dbPath1 = './orbitdb/tests/replication/1'
const dbPath2 = './orbitdb/tests/replication/2'
const orbitdbPath1 = './orbitdb/tests/replication/1'
const orbitdbPath2 = './orbitdb/tests/replication/2'
const dbPath1 = './orbitdb/tests/replication/1/db1'
const dbPath2 = './orbitdb/tests/replication/2/db2'
Object.keys(testAPIs).forEach(API => {
describe(`orbit-db - Replication (${API})`, function() {
this.timeout(config.timeout)
this.timeout(config.timeout * 2)
let ipfsd1, ipfsd2, ipfs1, ipfs2
let orbitdb1, orbitdb2, db1, db2
let id1, id2
let timer
let options
before(async () => {
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
ipfsd1 = await startIpfs(API, config.daemon1)
ipfsd2 = await startIpfs(API, config.daemon2)
ipfs1 = ipfsd1.api
ipfs2 = ipfsd2.api
// Connect the peers manually to speed up test times
await connectPeers(ipfs1, ipfs2)
const isLocalhostAddress = (addr) => addr.toString().includes('127.0.0.1')
await connectPeers(ipfs1, ipfs2, { filter: isLocalhostAddress })
console.log("Peers connected")
})
after(async () => {
@ -50,8 +51,14 @@ Object.keys(testAPIs).forEach(API => {
beforeEach(async () => {
clearInterval(timer)
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: dbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: dbPath2 })
rmrf.sync(orbitdbPath1)
rmrf.sync(orbitdbPath2)
rmrf.sync(dbPath1)
rmrf.sync(dbPath2)
orbitdb1 = await OrbitDB.createInstance(ipfs1, { directory: orbitdbPath1 })
orbitdb2 = await OrbitDB.createInstance(ipfs2, { directory: orbitdbPath2 })
options = {
// Set write access for both clients
@ -70,6 +77,7 @@ Object.keys(testAPIs).forEach(API => {
afterEach(async () => {
clearInterval(timer)
options = {}
if (db1)
await db1.drop()
@ -84,28 +92,48 @@ Object.keys(testAPIs).forEach(API => {
})
it('replicates database of 1 entry', async () => {
console.log("Waiting for peers to connect")
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
// Set 'sync' flag on. It'll prevent creating a new local database and rather
// fetch the database from the network
options = Object.assign({}, options, { directory: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let finished = false
await db1.add('hello')
return new Promise(resolve => {
setTimeout(() => {
const items = db2.iterator().collect()
assert.equal(items.length, 1)
assert.equal(items[0].payload.value, 'hello')
resolve()
}, 500)
let replicatedEventCount = 0
db2.events.on('replicated', (address, length) => {
replicatedEventCount++
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
const all = db2.iterator({ limit: -1 }).collect().length
finished = (all === 1)
})
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
const entries = db2.iterator({ limit: -1 }).collect()
assert.equal(entries.length, 1)
assert.equal(entries[0].payload.value, 'hello')
assert.equal(replicatedEventCount, 1)
resolve()
}
}, 100)
})
})
it('replicates database of 100 entries', async () => {
options = Object.assign({}, options, { directory: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
console.log("Waiting for peers to connect")
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
options = Object.assign({}, options, { directory: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
let finished = false
const entryCount = 100
const entryArr = []
@ -113,6 +141,13 @@ Object.keys(testAPIs).forEach(API => {
entryArr.push(i)
return new Promise(async (resolve, reject) => {
db2.events.on('replicated', () => {
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
const all = db2.iterator({ limit: -1 }).collect().length
finished = (all === entryCount)
})
try {
const add = i => db1.add('hello' + i)
await mapSeries(entryArr, add)
@ -121,12 +156,12 @@ Object.keys(testAPIs).forEach(API => {
}
timer = setInterval(() => {
const items = db2.iterator({ limit: -1 }).collect()
if (items.length === entryCount) {
if (finished) {
clearInterval(timer)
assert.equal(items.length, entryCount)
assert.equal(items[0].payload.value, 'hello0')
assert.equal(items[items.length - 1].payload.value, 'hello99')
const entries = db2.iterator({ limit: -1 }).collect()
assert.equal(entries.length, entryCount)
assert.equal(entries[0].payload.value, 'hello0')
assert.equal(entries[entries.length - 1].payload.value, 'hello99')
resolve()
}
}, 100)
@ -134,234 +169,157 @@ Object.keys(testAPIs).forEach(API => {
})
it('emits correct replication info', async () => {
options = Object.assign({}, options, { directory: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
console.log("Waiting for peers to connect")
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
options = Object.assign({}, options, { directory: dbPath2, sync: true })
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = []
let expectedEventCount = 99
let expectedPeerExchangeCount = 99
const entryCount = 99
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
return new Promise(async (resolve, reject) => {
// Test that none of the entries gets into the replication queue twice
const replicateSet = new Set()
db2.events.on('replicate', (address, entry) => {
if (!replicateSet.has(entry.hash)) {
replicateSet.add(entry.hash)
} else {
reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash + '\n' + entry.payload.value))
}
})
})
db2.events.on('replicate.progress', (address, hash, entry, progress, total) => {
eventCount['replicate.progress'] ++
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
// Verify that progress count increases monotonically by saving
// each event's current progress into an array
const progressEvents = []
db2.events.on('replicate.progress', () => {
progressEvents.push(db2.replicationStatus.progress)
})
})
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
db2.events.on('replicated', (address, length) => {
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
const all = db2.iterator({ limit: -1 }).collect().length
finished = (all === entryCount)
})
})
db2.events.on('replicated', (address) => {
eventCount['replicated'] ++
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
// Resolve with a little timeout to make sure we
// don't receive more than one event
setTimeout(() => {
finished = db2.iterator({ limit: -1 }).collect().length === expectedEventCount
}, 1000)
})
return new Promise((resolve, reject) => {
try {
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'] >= expectedPeerExchangeCount, true, 'insuficcient peer.exchanged events fired')
const replicateEvents = events.filter(e => e.event === 'replicate')
const minClock = Math.min(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(minClock, 1)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
const minProgressClock = Math.min(...replicateProgressEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(minProgressClock, 1)
assert.equal(replicateProgressEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[0].replicationInfo.max >= 1, true)
assert.equal(replicatedEvents[0].replicationInfo.progress >= 1, true)
// All entries should be in the database
assert.equal(db2.iterator({ limit: -1 }).collect().length, entryCount)
// progress events should increase monotonically
assert.equal(progressEvents.length, entryCount)
for (const [idx, e] of progressEvents.entries()) {
assert.equal(e, idx + 1)
}
// Verify replication status
assert.equal(db2.replicationStatus.progress, entryCount)
assert.equal(db2.replicationStatus.max, entryCount)
// Verify replicator state
assert.equal(db2._replicator.tasksRunning, 0)
assert.equal(db2._replicator.tasksQueued, 0)
assert.equal(db2._replicator.unfinished.length, 0)
// Replicator's internal caches should be empty
assert.equal(db2._replicator._logs.length, 0)
assert.equal(Object.keys(db2._replicator._fetching).length, 0)
resolve()
}
}, 100)
}, 1000)
} catch (e) {
reject(e)
}
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
for (let i = 0; i < entryCount; i ++) {
adds.push(i)
}
mapSeries(adds, i => db1.add('hello ' + i))
await mapSeries(adds, i => db1.add('hello ' + i))
})
})
it('emits correct replication info on fresh replication', async () => {
return new Promise(async (resolve, reject) => {
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = []
let expectedEventCount = 512
let expectedPeerExchangeCount = 1
const entryCount = 512
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
const adds = []
for (let i = 0; i < entryCount; i ++) {
adds.push(i)
}
const add = async (i) => {
process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount + " ")
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ")
await db1.add('hello ' + i)
}
await mapSeries(adds, add)
console.log()
// Open second instance again
options = {
directory: dbPath2 + '1',
directory: dbPath2,
overwrite: true,
sync: true,
}
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
// Test that none of the entries gets into the replication queue twice
const replicateSet = new Set()
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
})
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(db2.replicationStatus.progress, eventCount['replicate.progress'])
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
})
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
})
})
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', db2.replicationStatus.progress, '/', db2.replicationStatus.max, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|")
try {
// Test the replicator state
assert.equal(db2._loader.tasksRequested >= db2.replicationStatus.progress, true)
assert.equal(db2.options.referenceCount, 32)
} catch (e) {
reject(e)
if (!replicateSet.has(entry.hash)) {
replicateSet.add(entry.hash)
} else {
reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash))
}
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
// Resolve with a little timeout to make sure we
// don't receive more than one event
setTimeout( async () => {
if (eventCount['replicated'] === expectedEventCount) {
finished = true
}
}, 1000)
})
const st = new Date().getTime()
// Verify that progress count increases monotonically by saving
// each event's current progress into an array
const progressEvents = []
db2.events.on('replicate.progress', (address, hash, entry) => {
progressEvents.push(db2.replicationStatus.progress)
})
let replicatedEventCount = 0
db2.events.on('replicated', (address, length) => {
replicatedEventCount++
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
const all = db2.iterator({ limit: -1 }).collect().length
finished = (all === entryCount)
})
timer = setInterval(async () => {
if (finished) {
clearInterval(timer)
const et = new Date().getTime()
console.log("Duration:", et - st, "ms")
try {
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'], expectedPeerExchangeCount)
const replicateEvents = events.filter(e => e.event === 'replicate')
const maxClock = Math.max(...replicateEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateEvents.length, expectedEventCount)
assert.equal(replicateEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(maxClock, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
const maxProgressClock = Math.max(...replicateProgressEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
const maxReplicationMax = Math.max(...replicateProgressEvents.map(e => e.replicationInfo.max))
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(replicateProgressEvents[0].entry.payload.value.split(' ')[0], 'hello')
assert.equal(maxProgressClock, expectedEventCount)
assert.equal(maxReplicationMax, expectedEventCount)
assert.equal(replicateProgressEvents[0].replicationInfo.progress, 1)
const replicatedEvents = events.filter(e => e.event === 'replicated')
const replicateMax = Math.max(...replicatedEvents.map(e => e.replicationInfo.max))
assert.equal(replicateMax, expectedEventCount)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount)
// All entries should be in the database
assert.equal(db2.iterator({ limit: -1 }).collect().length, entryCount)
// 'replicated' event should've been received only once
assert.equal(replicatedEventCount, 1)
// progress events should increase monotonically
assert.equal(progressEvents.length, entryCount)
for (const [idx, e] of progressEvents.entries()) {
assert.equal(e, idx + 1)
}
// Verify replication status
assert.equal(db2.replicationStatus.progress, entryCount)
assert.equal(db2.replicationStatus.max, entryCount)
// Verify replicator state
assert.equal(db2._replicator.tasksRunning, 0)
assert.equal(db2._replicator.tasksQueued, 0)
assert.equal(db2._replicator.unfinished.length, 0)
// Replicator's internal caches should be empty
assert.equal(db2._replicator._logs.length, 0)
assert.equal(Object.keys(db2._replicator._fetching).length, 0)
resolve()
} catch (e) {
@ -374,137 +332,76 @@ Object.keys(testAPIs).forEach(API => {
it('emits correct replication info in two-way replication', async () => {
return new Promise(async (resolve, reject) => {
console.log("Waiting for peers to connect")
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
let finished = false
let eventCount = { 'replicate': 0, 'replicate.progress': 0, 'replicated': 0, 'peer.exchanged': 0 }
let events = []
let expectedEventCount = 100
let expectedPeerExchangeCount = 100
const entryCount = 100
// Trigger replication
let adds = []
for (let i = 0; i < expectedEventCount; i ++) {
const adds = []
for (let i = 0; i < entryCount; i ++) {
adds.push(i)
}
const add = async (i) => {
// process.stdout.write("\rWriting " + (i + 1) + " / " + expectedEventCount)
process.stdout.write("\rWriting " + (i + 1) + " / " + entryCount + " ")
await Promise.all([db1.add('hello-1-' + i), db2.add('hello-2-' + i)])
}
// Open second instance again
let options = {
directory: dbPath2,
directory: dbPath2 + '2',
overwrite: true,
sync: true,
}
db2 = await orbitdb2.eventlog(db1.address.toString(), options)
assert.equal(db1.address.toString(), db2.address.toString())
await waitForPeers(ipfs2, [orbitdb1.id], db1.address.toString())
// Test that none of the entries gets into the replication queue twice
const replicateSet = new Set()
db2.events.on('replicate', (address, entry) => {
eventCount['replicate'] ++
// console.log("[replicate] ", '#' + eventCount['replicate'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
events.push({
event: 'replicate',
count: eventCount['replicate'],
entry: entry,
})
})
let prevProgress = 0
db2.events.on('replicate.progress', (address, hash, entry) => {
eventCount['replicate.progress'] ++
// console.log("[progress] ", '#' + eventCount['replicate.progress'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished)
// assert.equal(current, total)
events.push({
event: 'replicate.progress',
count: eventCount['replicate.progress'],
entry: entry ,
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
})
db2.events.on('peer.exchanged', (address, entry) => {
eventCount['peer.exchanged'] ++
events.push({
event: 'peer.exchanged',
count: eventCount['peer.exchanged'],
entry: entry,
})
if (!replicateSet.has(entry.hash)) {
replicateSet.add(entry.hash)
} else {
reject(new Error('Shouldn\'t have started replication twice for entry ' + entry.hash))
}
})
db2.events.on('replicated', (address, length) => {
eventCount['replicated'] += length
const values = db2.iterator({limit: -1}).collect()
// console.log("[replicated]", '#' + eventCount['replicated'] + ':', current, '/', total, '| Tasks (in/queued/running/out):', db2._loader.tasksRequested, '/', db2._loader.tasksQueued, '/', db2._loader.tasksRunning, '/', db2._loader.tasksFinished, "|", db2._loader._stats.a, db2._loader._stats.b, db2._loader._stats.c, db2._loader._stats.d)
try {
assert.equal(db2.replicationStatus.progress <= db2.replicationStatus.max, true)
} catch (e) {
reject(e)
}
events.push({
event: 'replicated',
count: eventCount['replicate'],
replicationInfo: {
max: db2.replicationStatus.max,
progress: db2.replicationStatus.progress,
},
})
if (db2.replicationStatus.max >= expectedEventCount * 2
&& db2.replicationStatus.progress >= expectedEventCount * 2)
finished = true
// Once db2 has finished replication, make sure it has all elements
// and process to the asserts below
const all = db2.iterator({ limit: -1 }).collect().length
finished = (all === entryCount * 2)
})
const st = new Date().getTime()
try {
await mapSeries(adds, add)
console.log()
timer = setInterval(() => {
if (finished) {
clearInterval(timer)
const et = new Date().getTime()
// console.log("Duration:", et - st, "ms")
assert.equal(eventCount['replicate'], expectedEventCount)
assert.equal(eventCount['replicate.progress'], expectedEventCount)
assert.equal(eventCount['replicated'], expectedEventCount)
assert.equal(eventCount['peer.exchanged'] >= expectedPeerExchangeCount, true, 'insuficcient peer.exchanged events fired')
const replicateEvents = events.filter(e => e.event === 'replicate')
assert.equal(replicateEvents.length, expectedEventCount)
const replicateProgressEvents = events.filter(e => e.event === 'replicate.progress')
const maxProgressClock = Math.max(...replicateProgressEvents.filter(e => !!e.entry.clock).map(e => e.entry.clock.time))
assert.equal(replicateProgressEvents.length, expectedEventCount)
assert.equal(maxProgressClock, expectedEventCount)
assert.equal(db2.replicationStatus.max, expectedEventCount * 2)
assert.equal(db2.replicationStatus.progress, expectedEventCount * 2)
const replicatedEvents = events.filter(e => e.event === 'replicated')
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.progress, expectedEventCount * 2)
assert.equal(replicatedEvents[replicatedEvents.length - 1].replicationInfo.max, expectedEventCount * 2)
// Database values should match
const values1 = db1.iterator({limit: -1}).collect()
const values2 = db2.iterator({limit: -1}).collect()
assert.equal(values1.length, values2.length)
assert.deepEqual(values1, values2)
// Test the replicator state
assert.equal(db1._loader.tasksRequested, expectedEventCount)
assert.equal(db1._loader.tasksQueued, 0)
assert.equal(db1._loader.tasksRunning, 0)
assert.equal(db1._loader.tasksFinished, expectedEventCount)
assert.equal(db2._loader.tasksRequested, expectedEventCount)
assert.equal(db2._loader.tasksQueued, 0)
assert.equal(db2._loader.tasksRunning, 0)
assert.equal(db2._loader.tasksFinished, expectedEventCount)
// All entries should be in the database
assert.equal(values1.length, entryCount * 2)
assert.equal(values2.length, entryCount * 2)
// Verify replication status
assert.equal(db2.replicationStatus.progress, entryCount * 2)
assert.equal(db2.replicationStatus.max, entryCount * 2)
// Verify replicator state
assert.equal(db2._replicator.tasksRunning, 0)
assert.equal(db2._replicator.tasksQueued, 0)
assert.equal(db2._replicator.unfinished.length, 0)
// Replicator's internal caches should be empty
assert.equal(db2._replicator._logs.length, 0)
assert.equal(Object.keys(db2._replicator._fetching).length, 0)
resolve()
}

View File

@ -1,14 +1,8 @@
'use strict'
const assert = require('assert')
const mapSeries = require('p-map-series')
const fs = require('fs')
const path = require('path')
const rmrf = require('rimraf')
const levelup = require('levelup')
const leveldown = require('leveldown')
const OrbitDB = require('../src/OrbitDB')
const OrbitDBAddress = require('../src/orbit-db-address')
// Include test utilities
const {
@ -26,7 +20,6 @@ Object.keys(testAPIs).forEach(API => {
this.timeout(config.timeout)
let ipfsd, ipfs, orbitdb1, orbitdb2, db, address
let localDataPath
before(async () => {
rmrf.sync(dbPath1)
@ -50,33 +43,33 @@ Object.keys(testAPIs).forEach(API => {
})
it('has correct initial state', async () => {
assert.deepEqual(db.replicationStatus, { buffered: 0, queued: 0, progress: 0, max: 0 })
assert.deepEqual(db.replicationStatus, { progress: 0, max: 0 })
})
it('has correct replication info after load', async () => {
await db.add('hello')
await db.close()
await db.load()
assert.deepEqual(db.replicationStatus, { buffered: 0, queued: 0, progress: 1, max: 1 })
assert.deepEqual(db.replicationStatus, { progress: 1, max: 1 })
await db.close()
})
it('has correct replication info after close', async () => {
await db.close()
assert.deepEqual(db.replicationStatus, { buffered: 0, queued: 0, progress: 0, max: 0 })
assert.deepEqual(db.replicationStatus, { progress: 0, max: 0 })
})
it('has correct replication info after sync', async () => {
await db.load()
await db.add('hello2')
assert.deepEqual(db.replicationStatus, { buffered: 0, queued: 0, progress: 2, max: 2 })
const db2 = await orbitdb2.log(db.address.toString(), { create: false, sync: false })
const db2 = await orbitdb2.log(db.address.toString(), { create: false })
await db2.sync(db._oplog.heads)
return new Promise((resolve, reject) => {
setTimeout(() => {
try {
assert.deepEqual(db2.replicationStatus, { buffered: 0, queued: 0, progress: 2, max: 2 })
assert.deepEqual(db2.replicationStatus, { progress: 2, max: 2 })
resolve()
} catch (e) {
reject(e)
@ -86,11 +79,11 @@ Object.keys(testAPIs).forEach(API => {
})
it('has correct replication info after loading from snapshot', async () => {
await db._cache._store.open();
await db._cache._store.open()
await db.saveSnapshot()
await db.close()
await db.loadFromSnapshot()
assert.deepEqual(db.replicationStatus, { buffered: 0, queued: 0, progress: 2, max: 2 })
assert.deepEqual(db.replicationStatus, { progress: 2, max: 2 })
})
})
})

View File

@ -2,7 +2,6 @@
const fs = require('fs')
const assert = require('assert')
const mapSeries = require('p-map-series')
const rmrf = require('rimraf')
const OrbitDB = require('../src/OrbitDB')
const Identities = require('orbit-db-identity-provider')
@ -27,7 +26,6 @@ Object.keys(testAPIs).forEach(API => {
let ipfsd, ipfs, orbitdb, keystore, options
let identity1, identity2
let localDataPath
before(async () => {
rmrf.sync(dbPath)

View File

@ -1,16 +1,12 @@
'use strict'
const assert = require('assert')
const mapSeries = require('p-map-series')
const fs = require('fs-extra')
const path = require('path')
const rmrf = require('rimraf')
const levelup = require('levelup')
const leveldown = require('leveldown')
const Zip = require('adm-zip')
const OrbitDB = require('../src/OrbitDB')
const OrbitDBAddress = require('../src/orbit-db-address')
const io = require('orbit-db-io')
const Identities = require('orbit-db-identity-provider')
const migrate = require('localstorage-level-migration')
const Keystore = require('orbit-db-keystore')
@ -34,14 +30,13 @@ const keyFixtures = path.join('./test', 'fixtures', 'keys','QmRfPsKJs9YqTot5krRi
const ipfsFixturesDir = path.join('./test', 'fixtures', 'ipfs')
Object.keys(testAPIs).forEach(API => {
let ipfsFixtures = path.join('./test', 'fixtures', `${API}.zip`)
const ipfsFixtures = path.join('./test', 'fixtures', `${API}.zip`)
describe(`orbit-db - Backward-Compatibility - Open & Load (${API})`, function () {
this.retries(1) // windows...
this.timeout(config.timeout)
let ipfsd, ipfs, orbitdb, db, address, keystore
let localDataPath
let ipfsd, ipfs, orbitdb, db, keystore
before(async () => {
ipfsd = await startIpfs(API, config.daemon1)
@ -65,7 +60,7 @@ Object.keys(testAPIs).forEach(API => {
const store = await storage.createStore(path.join(dbPath, peerId, 'keys'))
keystore = new Keystore(store)
let identity = await Identities.createIdentity({ id: peerId, migrate: migrate(keyFixtures), keystore })
const identity = await Identities.createIdentity({ id: peerId, migrate: migrate(keyFixtures), keystore })
orbitdb = await OrbitDB.createInstance(ipfs, { identity, keystore })
})
@ -79,6 +74,7 @@ Object.keys(testAPIs).forEach(API => {
await stopIpfs(ipfsd)
rmrf.sync(ipfsFixturesDir)
rmrf.sync('./orbitdb')
})
describe('Open & Load - V0 entries', function () {
@ -88,8 +84,8 @@ Object.keys(testAPIs).forEach(API => {
db = await orbitdb.open('/orbitdb/QmWDUfC4zcWJGgc9UHn1X3qQ5KZqBv4KCiCtjnpMmBT8JC/v0-db', { directory: dbPath, accessController: { type: 'legacy-ipfs', skipManifest: true } })
const localFixtures = await db._cache.get('_localHeads')
const remoteFixtures = await db._cache.get('_remoteHeads')
db._cache.set(db.localHeadsPath, localFixtures)
db._cache.set(db.remoteHeadsPath, remoteFixtures)
await db._cache.set(db.localHeadsPath, localFixtures)
await db._cache.set(db.remoteHeadsPath, remoteFixtures)
await db.load()
})