atproto/packages/pds/tests/sync/invertible-ops.test.ts
Daniel Holmgren 7e3678c089
Send prevs on firehose (#3449)
* schema

* reset rate limit codegen

* codegen

* send prev cids on firehose

* fix test

* fix some test compiler errors & add experimental note

* fix linting

* build branch

* add prevData to commit event

* fix cbor undefined err

* add sibling proofs to relevant blocks

* bump depth of obj in test

* fix bug on right sibling proof & add some tests

* another test

* refactor proof construction

* more tests

* factor into fixtures

* fix styles in json

* lint: import ordering

* pr feedback

* add invertible op test

* remove prev from outgoing events

* return to original proof construction

* dont build branch

* changeset
2025-02-21 15:01:08 -06:00

105 lines
2.8 KiB
TypeScript

import { CID } from 'multiformats/cid'
import { AtUri } from '@atproto/api'
import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env'
import * as repo from '@atproto/repo'
import { Subscription } from '@atproto/xrpc-server'
import {
OutputSchema as SubscribeReposOutput,
RepoOp,
isCommit,
} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos'
import basicSeed from '../seeds/basic'
describe('invertible ops', () => {
let network: TestNetworkNoAppView
let sc: SeedClient
beforeAll(async () => {
network = await TestNetworkNoAppView.create({
dbPostgresSchema: 'repo_invertible_ops',
})
sc = network.getSeedClient()
await basicSeed(sc)
let posts: AtUri[] = []
for (let i = 0; i < 20; i++) {
const [aliceRef, bobRef, carolRef, danRef] = await Promise.all([
sc.post(sc.dids.alice, 'test'),
sc.post(sc.dids.bob, 'test'),
sc.post(sc.dids.carol, 'test'),
sc.post(sc.dids.dan, 'test'),
])
posts = [
...posts,
aliceRef.ref.uri,
bobRef.ref.uri,
carolRef.ref.uri,
danRef.ref.uri,
]
}
for (const post of posts) {
await sc.deletePost(post.hostname, post)
}
await network.processAll()
})
afterAll(async () => {
await network.close()
})
it('works', async () => {
const currSeq = (await network.pds.ctx.sequencer.curr()) ?? 0
const sub = new Subscription({
service: network.pds.url.replace('http://', 'ws://'),
method: 'com.atproto.sync.subscribeRepos',
validate: (value: unknown): SubscribeReposOutput => {
return value as any
},
getParams: () => {
return { cursor: 0 }
},
})
for await (const evt of sub) {
if (!isCommit(evt)) {
continue
}
const prevData = evt.prevData as CID | undefined
if (!prevData) {
continue
}
const { blocks, root } = await repo.readCarWithRoot(
evt.blocks as Uint8Array,
)
const storage = new repo.MemoryBlockstore(blocks)
const slice = await repo.Repo.load(storage, root)
let data = slice.data
const ops = evt.ops as RepoOp[]
for (const op of ops) {
if (op.action === 'create') {
data = await data.delete(op.path)
} else if (op.action === 'update') {
if (!op.prev) throw new Error('missing prev')
data = await data.update(op.path, op.prev)
} else if (op.action === 'delete') {
if (!op.prev) throw new Error('missing prev')
data = await data.add(op.path, op.prev)
} else {
throw new Error('unknown action')
}
}
const invertedRoot = await data.getPointer()
expect(invertedRoot.equals(prevData)).toBe(true)
if (evt.seq >= currSeq) {
break
}
}
})
})