Fix tests (#51)

* Fix error event test in Sync tests

* Fix race condition

* Debug timeouts

* More tests fixing

* Try running local webrtc-star-signalling server for tests

* Fix sync tests

* Skip Log references tests in the browser tests
This commit is contained in:
Haad 2023-03-27 15:09:26 +03:00 committed by GitHub
parent 4fe1b0c1a1
commit fddcd0d775
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 2085 additions and 774 deletions

View File

@ -30,5 +30,7 @@ jobs:
run: npm ci
- name: Run linter
run: npm run lint
- name: Run tests
- name: Run webrtc-star-signalling-server in the background
run: npm run webrtc:background
- name: Run browser tests
run: npm run test:browser

2571
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -26,9 +26,11 @@
"lru": "^3.1.0",
"multiformats": "^11.0.1",
"p-queue": "^7.3.4",
"timeout-abort-controller": "^3.0.0",
"uint8arrays": "^4.0.3"
},
"devDependencies": {
"@libp2p/webrtc-star-signalling-server": "^3.0.0",
"assert": "^2.0.0",
"babel-loader": "^9.1.2",
"c8": "^7.13.0",
@ -67,7 +69,9 @@
"build:tests": "rm -f test/browser/bundle.js* && webpack --config ./conf/webpack.tests.config.js",
"prepublishOnly": "npm run build",
"lint": "standard --env=mocha",
"lint:fix": "standard --fix"
"lint:fix": "standard --fix",
"webrtc": "webrtc-star --port=12345",
"webrtc:background": "webrtc-star --port=12345 &"
},
"standard": {
"env": [

View File

@ -2,6 +2,9 @@ import { pipe } from 'it-pipe'
import PQueue from 'p-queue'
import Path from 'path'
import { EventEmitter } from 'events'
import { TimeoutController } from 'timeout-abort-controller'
const DefaultTimeout = 30000 // 30 seconds
/**
* @description
@ -38,7 +41,7 @@ import { EventEmitter } from 'events'
* otherwise. Defaults to true.
* @return {Sync} The Sync protocol instance.
*/
const Sync = async ({ ipfs, log, events, onSynced, start }) => {
const Sync = async ({ ipfs, log, events, onSynced, start, timeout }) => {
if (!ipfs) throw new Error('An instance of ipfs is required.')
if (!log) throw new Error('An instance of log is required.')
@ -49,6 +52,7 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
const peers = new Set()
events = events || new EventEmitter()
timeout = timeout || DefaultTimeout
let started = false
@ -99,9 +103,11 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
if (peers.has(peerId)) {
return
}
const timeoutController = new TimeoutController(timeout)
const { signal } = timeoutController
try {
peers.add(peerId)
const stream = await ipfs.libp2p.dialProtocol(remotePeer, headsSyncAddress)
const stream = await ipfs.libp2p.dialProtocol(remotePeer, headsSyncAddress, { signal })
await pipe(sendHeads, stream, receiveHeads(peerId))
} catch (e) {
if (e.code === 'ERR_UNSUPPORTED_PROTOCOL') {
@ -110,6 +116,10 @@ const Sync = async ({ ipfs, log, events, onSynced, start }) => {
peers.delete(peerId)
events.emit('error', e)
}
} finally {
if (timeoutController) {
timeoutController.clear()
}
}
} else {
peers.delete(peerId)

View File

@ -1,7 +1,7 @@
const isBrowser = () => typeof window !== 'undefined'
const swarmAddress = isBrowser()
? ['/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star']
? ['/ip4/0.0.0.0/tcp/12345/ws/p2p-webrtc-star']
: ['/ip4/0.0.0.0/tcp/0']
export default {
@ -39,7 +39,7 @@ export default {
config: {
Addresses: {
API: '/ip4/127.0.0.1/tcp/0',
Swarm: isBrowser() ? ['/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star'] : ['/ip4/0.0.0.0/tcp/0'],
Swarm: isBrowser() ? ['/ip4/0.0.0.0/tcp/12345/ws/p2p-webrtc-star'] : ['/ip4/0.0.0.0/tcp/0'],
Gateway: '/ip4/0.0.0.0/tcp/0'
},
Bootstrap: [],
@ -62,7 +62,7 @@ export default {
config: {
Addresses: {
API: '/ip4/127.0.0.1/tcp/0',
Swarm: isBrowser() ? ['/dns4/wrtc-star1.par.dwebops.pub/tcp/443/wss/p2p-webrtc-star'] : ['/ip4/0.0.0.0/tcp/0'],
Swarm: isBrowser() ? ['/ip4/0.0.0.0/tcp/12345/ws/p2p-webrtc-star'] : ['/ip4/0.0.0.0/tcp/0'],
Gateway: '/ip4/0.0.0.0/tcp/0'
},
Bootstrap: [],

View File

@ -15,7 +15,7 @@ const OpLog = { Log, Entry }
const keysPath = './testkeys'
describe('Database - Replication', function () {
this.timeout(30000)
this.timeout(60000)
let ipfs1, ipfs2
let keystore
@ -87,10 +87,11 @@ describe('Database - Replication', function () {
let expectedEntryHash = null
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash && heads.map(e => e.hash).includes(expectedEntryHash)
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
replicated = expectedEntryHash && entry.hash === expectedEntryHash
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
db2.events.on('join', onConnected)

View File

@ -85,15 +85,15 @@ describe('Events Database Replication', function () {
})
it('replicates a database', async () => {
let connected = false
let updateCount = 0
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = async (peerId) => {
++updateCount
const onUpdate = (entry) => {
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -104,8 +104,8 @@ describe('Events Database Replication', function () {
db2 = await EventStore({ OpLog, Database, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
db2.events.on('join', onConnected)
db1.events.on('join', onConnected)
db2.events.on('update', onUpdate)
db2.events.on('error', onError)
db1.events.on('error', onError)
@ -116,10 +116,9 @@ describe('Events Database Replication', function () {
await db1.add(expected[4])
await db1.add(expected[5])
await db1.add(expected[6])
await db1.add(expected[7])
expectedEntryHash = await db1.add(expected[7])
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
const all2 = []
for await (const event of db2.iterator()) {
@ -135,15 +134,15 @@ describe('Events Database Replication', function () {
db1 = await EventStore({ OpLog, Database, ipfs: ipfs1, identity: testIdentity1, address: databaseId, accessController, directory: './orbitdb1' })
db2 = await EventStore({ OpLog, Database, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
let connected = false
let updateCount = 0
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = async (peerId) => {
++updateCount
const onUpdate = (entry) => {
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -152,6 +151,7 @@ describe('Events Database Replication', function () {
db2.events.on('join', onConnected)
db2.events.on('update', onUpdate)
db2.events.on('error', onError)
db1.events.on('error', onError)
@ -162,10 +162,9 @@ describe('Events Database Replication', function () {
await db1.add(expected[4])
await db1.add(expected[5])
await db1.add(expected[6])
await db1.add(expected[7])
expectedEntryHash = await db1.add(expected[7])
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
await db1.drop()
await db1.close()

View File

@ -12,7 +12,7 @@ import waitFor from '../../utils/wait-for.js'
const OpLog = { Log, Entry }
const keysPath = './testkeys'
describe('KeyValue Database Replication', function () {
describe('KeyValue-persisted Database Replication', function () {
this.timeout(30000)
let ipfs1, ipfs2
@ -74,15 +74,15 @@ describe('KeyValue Database Replication', function () {
})
it('replicates a database', async () => {
let connected = false
let updateCount = 0
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
++updateCount
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -93,8 +93,8 @@ describe('KeyValue Database Replication', function () {
kv2 = await KeyValuePersisted({ KeyValue, OpLog, Database, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
kv2.events.on('join', onConnected)
kv1.events.on('join', onConnected)
kv2.events.on('update', onUpdate)
kv2.events.on('error', onError)
kv1.events.on('error', onError)
@ -105,10 +105,9 @@ describe('KeyValue Database Replication', function () {
await kv1.del('hello')
await kv1.set('empty', '')
await kv1.del('empty')
await kv1.set('hello', 'friend3')
expectedEntryHash = await kv1.set('hello', 'friend3')
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
const value0 = await kv2.get('init')
deepStrictEqual(value0, true)
@ -142,15 +141,15 @@ describe('KeyValue Database Replication', function () {
})
it('loads the database after replication', async () => {
let updateCount = 0
let connected = false
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
++updateCount
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -161,8 +160,8 @@ describe('KeyValue Database Replication', function () {
kv2 = await KeyValuePersisted({ KeyValue, OpLog, Database, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
kv2.events.on('join', onConnected)
kv1.events.on('join', onConnected)
kv2.events.on('update', onUpdate)
kv2.events.on('error', onError)
kv1.events.on('error', onError)
@ -173,10 +172,9 @@ describe('KeyValue Database Replication', function () {
await kv1.del('hello')
await kv1.set('empty', '')
await kv1.del('empty')
await kv1.set('hello', 'friend3')
expectedEntryHash = await kv1.set('hello', 'friend3')
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
await kv1.close()
await kv2.close()

View File

@ -74,15 +74,15 @@ describe('KeyValue Database Replication', function () {
})
it('replicates a database', async () => {
let connected = false
let updateCount = 0
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
++updateCount
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -93,8 +93,8 @@ describe('KeyValue Database Replication', function () {
kv2 = await KeyValue({ OpLog, Database, ipfs: ipfs2, identity: testIdentity2, address: databaseId, accessController, directory: './orbitdb2' })
kv2.events.on('join', onConnected)
kv1.events.on('join', onConnected)
kv2.events.on('update', onUpdate)
kv2.events.on('error', onError)
kv1.events.on('error', onError)
@ -105,10 +105,9 @@ describe('KeyValue Database Replication', function () {
await kv1.del('hello')
await kv1.set('empty', '')
await kv1.del('empty')
await kv1.set('hello', 'friend3')
expectedEntryHash = await kv1.set('hello', 'friend3')
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
const value0 = await kv2.get('init')
deepStrictEqual(value0, true)
@ -142,15 +141,15 @@ describe('KeyValue Database Replication', function () {
})
it('loads the database after replication', async () => {
let updateCount = 0
let connected = false
let replicated = false
let expectedEntryHash = null
const onConnected = async (peerId, heads) => {
connected = true
const onConnected = (peerId, heads) => {
replicated = expectedEntryHash !== null && heads.map(e => e.hash).includes(expectedEntryHash)
}
const onUpdate = (entry) => {
++updateCount
replicated = expectedEntryHash !== null && entry.hash === expectedEntryHash
}
const onError = (err) => {
@ -173,10 +172,9 @@ describe('KeyValue Database Replication', function () {
await kv1.del('hello')
await kv1.set('empty', '')
await kv1.del('empty')
await kv1.set('hello', 'friend3')
expectedEntryHash = await kv1.set('hello', 'friend3')
await waitFor(() => connected, () => true)
await waitFor(() => updateCount > 0, () => true)
await waitFor(() => replicated, () => true)
await kv1.close()
await kv2.close()

View File

@ -7,7 +7,14 @@ import testKeysPath from '../fixtures/test-keys-path.js'
const keysPath = './testkeys'
const isBrowser = () => typeof window !== 'undefined'
describe('Log - References', function () {
if (isBrowser()) {
// Skip these tests when running in the browser since they take a long time
return
}
this.timeout(60000)
let keystore

View File

@ -63,7 +63,7 @@ describe('Log - Replication', function () {
})
describe('replicates logs deterministically', async function () {
const amount = 128 + 1
const amount = 32 + 1
const logId = 'A'
let log1, log2, input1, input2
@ -156,10 +156,12 @@ describe('Log - Replication', function () {
strictEqual(values3[1].payload, 'B1')
strictEqual(values3[2].payload, 'A2')
strictEqual(values3[3].payload, 'B2')
strictEqual(values3[99].payload, 'B50')
strictEqual(values3[100].payload, 'A51')
strictEqual(values3[198].payload, 'A100')
strictEqual(values3[199].payload, 'B100')
strictEqual(values3[18].payload, 'A10')
strictEqual(values3[19].payload, 'B10')
strictEqual(values3[30].payload, 'A16')
strictEqual(values3[31].payload, 'B16')
strictEqual(values3[62].payload, 'A32')
strictEqual(values3[63].payload, 'B32')
})
})
})

View File

@ -137,7 +137,7 @@ describe('orbit-db - Multiple Databases', function () {
})
it('replicates multiple open databases', async () => {
const entryCount = 32
const entryCount = 10
// Write entries to each database
console.log('Writing to databases')
@ -169,7 +169,9 @@ describe('orbit-db - Multiple Databases', function () {
console.log('Waiting for replication to finish')
await waitFor(() => allReplicated(), () => true)
await waitFor(async () => await allReplicated(), () => true, 2000)
console.log('Replication finished')
for (let i = 0; i < databaseInterfaces.length; i++) {
const db = remoteDatabases[i]

View File

@ -141,9 +141,11 @@ describe('Sync protocol', function () {
log2 = await Log(testIdentity2, { logId: 'synclog1', entryStorage: entryStorage2 })
const onSynced = async (bytes) => {
syncedHead = await Entry.decode(bytes)
await log2.joinEntry(syncedHead)
syncedEventFired = true
const entry = await Entry.decode(bytes)
if (await log2.joinEntry(entry)) {
syncedHead = entry
syncedEventFired = true
}
}
const onJoin = (peerId, heads) => {
@ -154,7 +156,9 @@ describe('Sync protocol', function () {
sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced: () => {} })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
sync1.events.on('join', onJoin)
sync2.events.on('join', onJoin)
await waitFor(() => joinEventFired, () => true)
await waitFor(() => syncedEventFired, () => true)
@ -177,13 +181,66 @@ describe('Sync protocol', function () {
strictEqual(sync2.peers.has(String(peerId1)), true)
strictEqual(sync1.peers.has(String(peerId2)), true)
})
})
describe('Eventual Consistency', () => {
let sync1, sync2
let log1, log2
let joinEventFired = false
let syncedHead
before(async () => {
const entryStorage1 = await ComposedStorage(
await LRUStorage({ size: 1000 }),
await IPFSBlockStorage({ ipfs: ipfs1, pin: true })
)
const entryStorage2 = await ComposedStorage(
await LRUStorage({ size: 1000 }),
await IPFSBlockStorage({ ipfs: ipfs2, pin: true })
)
log1 = await Log(testIdentity1, { logId: 'synclog7', entryStorage: entryStorage1 })
log2 = await Log(testIdentity2, { logId: 'synclog7', entryStorage: entryStorage2 })
const onSynced = async (bytes) => {
const entry = await Entry.decode(bytes)
if (await log2.joinEntry(entry)) {
syncedHead = entry
}
}
const onJoin = (peerId, heads) => {
joinEventFired = true
}
sync1 = await Sync({ ipfs: ipfs1, log: log1, onSynced: () => {} })
sync2 = await Sync({ ipfs: ipfs2, log: log2, onSynced })
sync1.events.on('join', onJoin)
sync2.events.on('join', onJoin)
await waitFor(() => joinEventFired, () => true)
})
after(async () => {
if (sync1) {
await sync1.stop()
}
if (sync2) {
await sync2.stop()
}
})
it('is eventually consistent', async () => {
await sync1.add(await log1.append('hello2'))
await sync1.add(await log1.append('hello3'))
await sync1.add(await log1.append('hello4'))
const e2 = await log1.append('hello2')
const e3 = await log1.append('hello3')
const e4 = await log1.append('hello4')
const expected = await log1.append('hello5')
await sync1.add(e3)
await sync1.add(e2)
await sync1.add(e4)
await sync1.add(expected)
await waitFor(() => Entry.isEqual(expected, syncedHead), () => true)
@ -202,6 +259,13 @@ describe('Sync protocol', function () {
all2.unshift(item)
}
deepStrictEqual(all1.map(e => e.payload), [
'hello2',
'hello3',
'hello4',
'hello5'
])
deepStrictEqual(all1, all2)
})
})
@ -481,6 +545,51 @@ describe('Sync protocol', function () {
})
})
describe('Timeouts', () => {
let sync1, sync2
let log1, log2
const timeoutTime = 1 // 1 millisecond
before(async () => {
log1 = await Log(testIdentity1, { logId: 'synclog5' })
log2 = await Log(testIdentity2, { logId: 'synclog5' })
sync1 = await Sync({ ipfs: ipfs1, log: log1, timeout: timeoutTime })
sync2 = await Sync({ ipfs: ipfs2, log: log2, start: false, timeout: timeoutTime })
await log1.append('hello1')
})
after(async () => {
if (sync1) {
await sync1.stop()
}
if (sync2) {
await sync2.stop()
}
})
it('emits an error when connecting to peer was cancelled due to timeout', async () => {
let err = null
const onError = (error) => {
err = error
}
sync1.events.on('error', onError)
sync2.events.on('error', onError)
await sync2.start()
await waitFor(() => err !== null, () => true)
notStrictEqual(err, null)
strictEqual(err.type, 'aborted')
strictEqual(err.message, 'The operation was aborted')
})
})
describe('Events', () => {
let sync1, sync2
let joinEventFired = false