atproto/packages/sync/tests/firehose.test.ts
Daniel Holmgren b15dec2f4f
Atproto sync package (#2752)
* first pass/port

* reworking

* authenticated commit parsing

* authenticate identity evts

* some testing

* tidy & add firehose to queue

* error handling

* fix test

* refactor sync queue + some tests

* fix race in sync queue

* rm firehose from syncqueue

* add tests for queue utils

* README

* lint readme

* filter before parsing

* pr feedback

* small fix

* changesets

* fix type

* Rework dataplane subscription (#2766)

* working sync package into appview subscription

* add restart method to subscription for tests

* fix another test

* tidy subscription utils/files

* remove dupe property

* tidy after merge

* fix start cursor on subscription

* tweak process full subscription logic

* fixes
2024-09-04 20:18:16 -05:00

181 lines
5.3 KiB
TypeScript

import {
mockResolvers,
SeedClient,
TestNetworkNoAppView,
} from '@atproto/dev-env'
import { Firehose, FirehoseOptions, MemoryRunner } from '../src'
import { IdResolver } from '@atproto/identity'
import { Create, Event } from '../src/events'
import { createDeferrable, wait } from '@atproto/common'
describe('firehose', () => {
let network: TestNetworkNoAppView
let sc: SeedClient
let idResolver: IdResolver
beforeAll(async () => {
network = await TestNetworkNoAppView.create({
dbPostgresSchema: 'sync_firehose',
})
idResolver = new IdResolver({ plcUrl: network.plc.url })
mockResolvers(idResolver, network.pds)
sc = network.getSeedClient()
})
afterAll(async () => {
await network.close()
})
const createAndReadFirehose = async (
count: number,
opts: Partial<FirehoseOptions> = {},
addRandomWait = false,
): Promise<Event[]> => {
const defer = createDeferrable()
const evts: Event[] = []
const firehose = new Firehose({
idResolver,
service: network.pds.url.replace('http', 'ws'),
handleEvent: async (evt) => {
if (addRandomWait) {
const time = Math.floor(Math.random()) * 20
await wait(time)
}
evts.push(evt)
if (evts.length >= count) {
defer.resolve()
}
},
onError: (err) => {
throw err
},
...opts,
})
firehose.start()
await defer.complete
await firehose.destroy()
return evts
}
let alice: string
it('reads events from firehose', async () => {
const evtsPromise = createAndReadFirehose(5)
await wait(10) // give the websocket just a second to spin up
const aliceRes = await sc.createAccount('alice', {
handle: 'alice.test',
email: 'alice@test.com',
password: 'alice-pass',
})
alice = aliceRes.did
await sc.post(alice, 'one')
await sc.post(alice, 'two')
await sc.post(alice, 'three')
const evts = await evtsPromise
expect(evts.length).toBe(5)
expect(evts.at(0)).toMatchObject({
event: 'identity',
did: alice,
handle: aliceRes.handle,
didDocument: {
id: alice,
},
})
expect(evts.at(1)).toMatchObject({
event: 'account',
did: alice,
active: true,
status: undefined,
})
expect(evts.at(2)).toMatchObject({
event: 'create',
did: alice,
collection: 'app.bsky.feed.post',
record: {
text: 'one',
},
})
expect(evts.at(3)).toMatchObject({
event: 'create',
did: alice,
collection: 'app.bsky.feed.post',
record: {
text: 'two',
},
})
expect(evts.at(4)).toMatchObject({
event: 'create',
did: alice,
collection: 'app.bsky.feed.post',
record: {
text: 'three',
},
})
})
it('does not naively pass through invalid handle evts', async () => {
const evtsPromise = createAndReadFirehose(1)
await wait(10) // give the websocket just a second to spin up
await network.pds.ctx.sequencer.sequenceIdentityEvt(
alice,
'bad-handle.test',
)
const evts = await evtsPromise
expect(evts.at(0)).toMatchObject({ handle: 'alice.test' })
})
it('processes events through the sync queue', async () => {
const currCursor = await network.pds.ctx.sequencer.curr()
const runner = new MemoryRunner({
startCursor: currCursor ?? undefined,
})
const evtsPromise = createAndReadFirehose(20, { runner }, true)
const createAndPost = async (name: string) => {
const user = await sc.createAccount('name', {
handle: `${name}.test`,
email: `${name}@example.com`,
password: `${name}-pass`,
})
const did = user.did
const post1 = await sc.post(did, 'one')
const post2 = await sc.post(did, 'two')
const post3 = await sc.post(did, 'three')
return {
did,
post1: post1.ref.uriStr,
post2: post2.ref.uriStr,
post3: post3.ref.uriStr,
}
}
const res = await Promise.all([
createAndPost('user1'),
createAndPost('user2'),
createAndPost('user3'),
createAndPost('user4'),
])
const evts = await evtsPromise
const user1Evts = evts.filter((e) => e.did === res[0].did)
const user2Evts = evts.filter((e) => e.did === res[1].did)
const user3Evts = evts.filter((e) => e.did === res[2].did)
const user4Evts = evts.filter((e) => e.did === res[3].did)
const EVT_ORDER = ['identity', 'account', 'create', 'create', 'create']
expect(user1Evts.map((e) => e.event)).toEqual(EVT_ORDER)
expect(user2Evts.map((e) => e.event)).toEqual(EVT_ORDER)
expect(user3Evts.map((e) => e.event)).toEqual(EVT_ORDER)
expect(user4Evts.map((e) => e.event)).toEqual(EVT_ORDER)
expect(
user1Evts.slice(2, 5).map((e) => (e as Create).uri.toString()),
).toEqual([res[0].post1, res[0].post2, res[0].post3])
expect(
user2Evts.slice(2, 5).map((e) => (e as Create).uri.toString()),
).toEqual([res[1].post1, res[1].post2, res[1].post3])
expect(
user3Evts.slice(2, 5).map((e) => (e as Create).uri.toString()),
).toEqual([res[2].post1, res[2].post2, res[2].post3])
expect(
user4Evts.slice(2, 5).map((e) => (e as Create).uri.toString()),
).toEqual([res[3].post1, res[3].post2, res[3].post3])
})
})