* schema * reset rate limit codegen * codegen * send prev cids on firehose * fix test * fix some test compiler errors & add experimental note * fix linting * build branch * add prevData to commit event * fix cbor undefined err * add sibling proofs to relevant blocks * bump depth of obj in test * fix bug on right sibling proof & add some tests * another test * refactor proof construction * more tests * factor into fixtures * fix styles in json * lint: import ordering * pr feedback * add invertible op test * remove prev from outgoing events * return to original proof construction * dont build branch * changeset
105 lines
2.8 KiB
TypeScript
105 lines
2.8 KiB
TypeScript
import { CID } from 'multiformats/cid'
|
|
import { AtUri } from '@atproto/api'
|
|
import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env'
|
|
import * as repo from '@atproto/repo'
|
|
import { Subscription } from '@atproto/xrpc-server'
|
|
import {
|
|
OutputSchema as SubscribeReposOutput,
|
|
RepoOp,
|
|
isCommit,
|
|
} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos'
|
|
import basicSeed from '../seeds/basic'
|
|
|
|
describe('invertible ops', () => {
|
|
let network: TestNetworkNoAppView
|
|
let sc: SeedClient
|
|
|
|
beforeAll(async () => {
|
|
network = await TestNetworkNoAppView.create({
|
|
dbPostgresSchema: 'repo_invertible_ops',
|
|
})
|
|
sc = network.getSeedClient()
|
|
await basicSeed(sc)
|
|
|
|
let posts: AtUri[] = []
|
|
for (let i = 0; i < 20; i++) {
|
|
const [aliceRef, bobRef, carolRef, danRef] = await Promise.all([
|
|
sc.post(sc.dids.alice, 'test'),
|
|
sc.post(sc.dids.bob, 'test'),
|
|
sc.post(sc.dids.carol, 'test'),
|
|
sc.post(sc.dids.dan, 'test'),
|
|
])
|
|
posts = [
|
|
...posts,
|
|
aliceRef.ref.uri,
|
|
bobRef.ref.uri,
|
|
carolRef.ref.uri,
|
|
danRef.ref.uri,
|
|
]
|
|
}
|
|
for (const post of posts) {
|
|
await sc.deletePost(post.hostname, post)
|
|
}
|
|
|
|
await network.processAll()
|
|
})
|
|
|
|
afterAll(async () => {
|
|
await network.close()
|
|
})
|
|
|
|
it('works', async () => {
|
|
const currSeq = (await network.pds.ctx.sequencer.curr()) ?? 0
|
|
|
|
const sub = new Subscription({
|
|
service: network.pds.url.replace('http://', 'ws://'),
|
|
method: 'com.atproto.sync.subscribeRepos',
|
|
validate: (value: unknown): SubscribeReposOutput => {
|
|
return value as any
|
|
},
|
|
getParams: () => {
|
|
return { cursor: 0 }
|
|
},
|
|
})
|
|
|
|
for await (const evt of sub) {
|
|
if (!isCommit(evt)) {
|
|
continue
|
|
}
|
|
const prevData = evt.prevData as CID | undefined
|
|
if (!prevData) {
|
|
continue
|
|
}
|
|
|
|
const { blocks, root } = await repo.readCarWithRoot(
|
|
evt.blocks as Uint8Array,
|
|
)
|
|
const storage = new repo.MemoryBlockstore(blocks)
|
|
const slice = await repo.Repo.load(storage, root)
|
|
|
|
let data = slice.data
|
|
const ops = evt.ops as RepoOp[]
|
|
for (const op of ops) {
|
|
if (op.action === 'create') {
|
|
data = await data.delete(op.path)
|
|
} else if (op.action === 'update') {
|
|
if (!op.prev) throw new Error('missing prev')
|
|
data = await data.update(op.path, op.prev)
|
|
} else if (op.action === 'delete') {
|
|
if (!op.prev) throw new Error('missing prev')
|
|
data = await data.add(op.path, op.prev)
|
|
} else {
|
|
throw new Error('unknown action')
|
|
}
|
|
}
|
|
|
|
const invertedRoot = await data.getPointer()
|
|
expect(invertedRoot.equals(prevData)).toBe(true)
|
|
|
|
if (evt.seq >= currSeq) {
|
|
break
|
|
}
|
|
}
|
|
})
|
|
})
|