f689bd51a2
* refactor(crypto): remove circular dependency * refactor(crypto): expose compress/decompress as part of the DidKeyPlugin interface * fix(crypto): remove import from private file * refactor: isolate tsconfig * fix: remove unused bench file * chore(repo): remove unused deps * fix(ozone): properly list dependencies * fix(services): do lint js files * fix(services/pds): remove unused deps * chore(pds): remove bench * chore(dev-env): remove unused deps * chore(api): remove bench * remove unused babel.config.js files * fix: remove .ts extension from import * fix(pds): remove imports of src files * fix(tsconfig): properly list all projects * fix(dev-env): remove imports of src files * fix(bsky): remove direct import to crypto src * fix(api): remove imports to api internals * chore(build): prevent bundling of built output * chore(dev): add "dev" script to build in watch mode * chore(deps): move ts-node dependency where it is actually used * fix(deps): add dev-env as project dependency * fix(xrpc-server): properly type kexicon * fix(bsky): improve typings * fix(pds): fully type formatRecordEmbedInternal return value * fix(repo): remove imports from @ipld/car/api * feat(dev-env): re-export BskyIngester * fix: properly lint & type jest config & test files * fix(ci): test after build * fix(types): use NodeJS.Timeout instead of NodeJS.Timer * fix(bsky): make types exportable * fix(ozone): make types exportable * fix(xrpc-server): make types exportable * fix(xprc-server): make code compliant with "node" types * fix(xrpc-server): avoid accessing properties of unknown * chore(deps): update @types/node * feat(tsconfig): narrow down available types depending on the package's target environment * fix(pds): remove unused prop * fix(bsync): Database's migrator not always initialized * fix(dev-env): remove unreachable code * fix(xrpc-server): remove unused import * fix(xrpc-server): mark header property as abstract * fix(pds): initialize LeakyTxPlugin's txOver property * fix(bsky): 
initialize LeakyTxPlugin's txOver property * fix(bsky): remove unused migrator from DatabaseCoordinator * fix(bsky): Properly initialize LabelService's cache property * fix(ozone): Database's migrator not initialized * fix(ozone): initialize LeakyTxPlugin's txOver property * fix(crypto): ignore unused variable error * feat(tsconfig): use stricter rules * feat(tsconfig): enable useDefineForClassFields * feat(xrpc-server): add support for brotli incoming payload * fix(xrpc-server): properly parse & process content-encoding * fix(common:stream): always call cb in _transform * tidy/fix tests and service entrypoints * Revert "fix(xrpc-server): properly parse & process content-encoding" This reverts commit 2b1c66e153820d3e128fc839fcc1834d52a66686. * Revert "feat(xrpc-server): add support for brotli incoming payload" This reverts commit e710c21e6118214ddf215b0515e68cb87299a952. * remove special node env for tests (defaults to jest val of "test") * kill mute sync handler on disconnect * work around connect-es bug w/ request aborts * style(crypto): rename imports from uint8arrays * fix update package-lock * fix lint * force hbs files to be bundled as cjs * fix: use concurrently instead of npm-run-all npm-run-all seems not to be maintained anymore. Additionally, concurrently better forwards signals to child processes. * remove concurrently alltogether * ignore sqlite files in services/pds * fix verify * fix verify * tidy, fix verify * fix blob diversion test * build rework changeset --------- Co-authored-by: Devin Ivy <devinivy@gmail.com>
228 lines
6.6 KiB
TypeScript
228 lines
6.6 KiB
TypeScript
import { EXAMPLE_LABELER, TestNetwork } from '@atproto/dev-env'
|
|
import { readFromGenerator, wait } from '@atproto/common'
|
|
import { LabelsEvt, Sequencer } from '../src/sequencer'
|
|
import Outbox from '../src/sequencer/outbox'
|
|
import { randomStr } from '@atproto/crypto'
|
|
import { Label } from '../src/lexicon/types/com/atproto/label/defs'
|
|
|
|
describe('sequencer', () => {
|
|
// Shared fixtures, created once for the whole suite in `beforeAll`.
let network: TestNetwork
let sequencer: Sequencer

// Running total of events the tests expect to exist; tests that replay from
// the start (cursor -1) compare the number of received events against it.
let totalEvts = 0
// Sequence number of the last event consumed by the previous test; later
// tests pass it as the cursor so that they only see newer events.
let lastSeen: number

beforeAll(async () => {
  network = await TestNetwork.create({ dbPostgresSchema: 'ozone_sequencer' })
  // NOTE(review): the reason for the suppression below is not recorded in this
  // file - presumably a type mismatch between the `Sequencer` imported from
  // ../src and the one exposed by dev-env; confirm.
  // @ts-expect-error
  sequencer = network.ozone.ctx.sequencer
})

// Returning the promise lets jest wait for the network to shut down.
afterAll(() => network.close())
|
|
|
|
// Loads every row of the `label` table whose `id` is strictly greater than
// `lastSeen`, ordered by ascending `id`. The tests use the result as the
// expected value for what an outbox emitted.
const loadFromDb = (lastSeen: number) => {
  const newerRows = sequencer.db.db
    .selectFrom('label')
    .selectAll()
    .where('id', '>', lastSeen)
    .orderBy('id', 'asc')
  return newerRows.execute()
}
|
|
|
|
// Converts a sequenced labels event into the shape the tests expect a `label`
// table row to have, so that emitted events can be compared with `toEqual`
// against rows returned by `loadFromDb`.
const evtToDbRow = (e: LabelsEvt) => {
  // Only the first label of the event is used; `ver` is dropped from the row.
  const firstLabel = e.labels[0]
  const { ver: _ver, ...label } = firstLabel
  const { cid, sig } = label
  return {
    id: e.seq,
    ...label,
    // normalise optional fields to the values compared against the db rows
    neg: Boolean(label.neg),
    cid: cid || '',
    exp: null,
    sig: sig ? Buffer.from(sig) : null,
    signingKeyId: network.ozone.ctx.signingKeyId,
  }
}
|
|
|
|
// Builds the "is done" predicate handed to `readFromGenerator`. The predicate
// resolves to true when `outbox.sequencer.curr()` yields null, or when
// `outbox.lastSeen` has reached the value `curr()` returned (presumably the
// latest sequence number - confirm against Sequencer).
const caughtUp =
  (outbox: Outbox): (() => Promise<boolean>) =>
  async () => {
    const latest = await outbox.sequencer.curr()
    return latest === null || outbox.lastSeen >= (latest ?? 0)
  }
|
|
|
|
// Creates `count` 'spam' labels from EXAMPLE_LABELER, each on a freshly
// generated `did:example:` subject, and returns them in creation order.
// Labels are written one at a time, each inside its own db transaction, and
// the next one is only started once the previous write has been awaited.
const createLabels = async (count: number): Promise<Label[]> => {
  const created: Label[] = []
  while (created.length < count) {
    const subject = `did:example:${randomStr(10, 'base32')}`
    const label = {
      src: EXAMPLE_LABELER,
      uri: subject,
      val: 'spam',
      neg: false,
      cts: new Date().toISOString(),
    }
    const { ctx } = network.ozone
    await ctx.db.transaction((dbTxn) =>
      ctx.modService(dbTxn).createLabels([label]),
    )
    created.push(label)
  }
  return created
}
|
|
|
|
// Labels that exist before an outbox is opened are all delivered when the
// stream is read with cursor -1.
it('sends to outbox', async () => {
  const batchSize = 20
  totalEvts += batchSize
  await createLabels(batchSize)

  const outbox = new Outbox(sequencer)
  const stream = outbox.events(-1)
  const received = await readFromGenerator(stream, caughtUp(outbox))
  expect(received.length).toBe(totalEvts)

  // what was streamed must match what is stored
  const expectedRows = await loadFromDb(-1)
  const receivedRows = received.map(evtToDbRow)
  expect(receivedRows).toEqual(expectedRows)

  // remember where this test stopped so later tests can resume from there
  const lastEvt = received.at(-1)
  lastSeen = lastEvt?.seq ?? lastSeen
})
|
|
|
|
// Negating two of five freshly created labels: the stream after the previous
// cursor is expected to hold five events, the last two being the negations.
// NOTE(review): only `count` events are expected although `count + 2` labels
// are written - apparently a negation replaces the original row instead of
// adding one; confirm against modService.createLabels.
it('sequences negative labels', async () => {
  const count = 5
  totalEvts += count
  const created = await createLabels(count)

  const toNegate = created.slice(0, 2).map((original) => ({
    ...original,
    neg: true,
    cts: new Date().toISOString(),
  }))
  const modService = network.ozone.ctx.modService(network.ozone.ctx.db)
  await modService.createLabels(toNegate)

  const outbox = new Outbox(sequencer)
  const received = await readFromGenerator(
    outbox.events(lastSeen),
    caughtUp(outbox),
  )
  expect(received.length).toBe(count)

  const expectedRows = await loadFromDb(lastSeen)
  expect(received.map(evtToDbRow)).toEqual(expectedRows)

  // the negations are expected at positions 3 and 4, in the order submitted
  toNegate.forEach((negation, i) => {
    const emitted = received[3 + i].labels[0]
    expect(emitted.uri).toEqual(negation.uri)
    expect(emitted.neg).toBe(true)
  })

  lastSeen = received.at(-1)?.seq ?? lastSeen
})
|
|
|
|
// Reads the stream from cursor -1 while new labels are still being created,
// and expects every event - the older ones and the concurrently created
// ones - to be delivered.
it('handles cut over', async () => {
  const batchSize = 20
  totalEvts += batchSize

  const outbox = new Outbox(sequencer)
  // creation is started first and deliberately not awaited yet, so that it
  // overlaps with the read below
  const creating = createLabels(batchSize)
  const reading = readFromGenerator(
    outbox.events(-1),
    caughtUp(outbox),
    creating,
  )
  const [received] = await Promise.all([reading, creating])
  expect(received.length).toBe(totalEvts)

  const expectedRows = await loadFromDb(-1)
  expect(received.map(evtToDbRow)).toEqual(expectedRows)

  lastSeen = received.at(-1)?.seq ?? lastSeen
})
|
|
|
|
// An outbox opened at the cursor left by the previous test must yield only the
// labels created from then on.
it('only gets events after cursor', async () => {
  const count = 20
  totalEvts += count
  const outbox = new Outbox(sequencer)
  // started before reading and awaited together with it, so that creation
  // overlaps with the read
  const createPromise = createLabels(count)
  const [evts] = await Promise.all([
    readFromGenerator(
      outbox.events(lastSeen),
      caughtUp(outbox),
      createPromise,
    ),
    createPromise,
  ])

  // exactly the `count` newly created events are expected: the event at the
  // `lastSeen` cursor itself is not sent again
  expect(evts.length).toBe(count)

  // `loadFromDb` selects rows with id > lastSeen, mirroring the cursor
  const fromDb = await loadFromDb(lastSeen)
  expect(evts.map(evtToDbRow)).toEqual(fromDb)

  lastSeen = evts.at(-1)?.seq ?? lastSeen
})
|
|
|
|
// Reads a first chunk of the stream, lets label creation finish while nothing
// is being read, then drains the rest; both chunks together must contain every
// new event.
it('buffers events that are not being read', async () => {
  const batchSize = 20
  totalEvts += batchSize

  const outbox = new Outbox(sequencer)
  const creating = createLabels(batchSize)
  const stream = outbox.events(lastSeen)

  // read enough to start streaming (the trailing 5 is presumably a cap on how
  // many events this call reads - confirm against readFromGenerator), then
  // wait so that the rest go into the buffer
  const firstRead = readFromGenerator(stream, caughtUp(outbox), creating, 5)
  const [firstPart] = await Promise.all([firstRead, creating])

  // then stream out from the buffer
  const secondPart = await readFromGenerator(
    stream,
    caughtUp(outbox),
    creating,
  )

  const received = firstPart.concat(secondPart)
  expect(received.length).toBe(batchSize)

  const expectedRows = await loadFromDb(lastSeen)
  expect(received.map(evtToDbRow)).toEqual(expectedRows)

  lastSeen = received.at(-1)?.seq ?? lastSeen
})
|
|
|
|
// An outbox limited to `maxBufferSize: 5` is expected to fail with
// 'Stream consumer too slow' when its consumer pauses while 20 labels are
// being produced.
it('errors when buffer is overloaded', async () => {
  const batchSize = 20
  totalEvts += batchSize

  const outbox = new Outbox(sequencer, { maxBufferSize: 5 })
  const stream = outbox.events(lastSeen)
  const creating = createLabels(batchSize)

  // read enough to start streaming, then hold off reading the rest until the
  // buffer is overloaded
  const overloadBuffer = async () => {
    const firstRead = readFromGenerator(stream, caughtUp(outbox), creating, 5)
    await Promise.all([firstRead, creating])
    await wait(500)
    await readFromGenerator(stream, caughtUp(outbox), creating)
  }
  await expect(overloadBuffer).rejects.toThrow('Stream consumer too slow')

  await creating

  // this outbox did not deliver the events, so the cursor for the following
  // tests is taken from the stored rows instead
  const storedRows = await loadFromDb(lastSeen)
  const newestRow = storedRows.at(-1)
  lastSeen = newestRow?.id ?? lastSeen
})
|
|
|
|
// Fifty outboxes reading concurrently from the same cursor must each receive
// the complete batch of newly created labels.
it('handles many open connections', async () => {
  const count = 20
  // keep the suite-wide counter in step with what has been created, as every
  // other test does, so that a test added after this one can still rely on it
  totalEvts += count

  const connectionCount = 50
  const outboxes: Outbox[] = Array.from(
    { length: connectionCount },
    () => new Outbox(sequencer),
  )

  // creation is started first and awaited together with the reads so that it
  // overlaps with them
  const createPromise = createLabels(count)
  const readOutboxes = Promise.all(
    outboxes.map((o) =>
      readFromGenerator(o.events(lastSeen), caughtUp(o), createPromise),
    ),
  )
  const [results] = await Promise.all([readOutboxes, createPromise])

  const fromDb = await loadFromDb(lastSeen)
  for (const result of results) {
    expect(result.length).toBe(count)
    expect(result.map(evtToDbRow)).toEqual(fromDb)
  }

  lastSeen = results[0].at(-1)?.seq ?? lastSeen
})
|
|
})
|