b15dec2f4f
* first pass/port * reworking * authenticated commit parsing * authenticate identity evts * some testing * tidy & add firehose to queue * error handling * fix test * refactor sync queue + some tests * fix race in sync queue * rm firehose from syncqueue * add tests for queue utils * README * lint readme * filter before parsing * pr feedback * small fix * changesets * fix type * Rework dataplane subscription (#2766) * working sync package into appview subscription * add restart method to subscription for tests * fix another test * tidy subscription utils/files * remove dupe property * tidy after merge * fix start cursor on subscription * tweak process full subscription logic * fixes
128 lines
4.1 KiB
TypeScript
128 lines
4.1 KiB
TypeScript
import { AtpAgent } from '@atproto/api'
|
|
import { cborDecode, cborEncode } from '@atproto/common'
|
|
import { DatabaseSchemaType } from '../../src/data-plane/server/db/database-schema'
|
|
import { SeedClient, TestNetwork, basicSeed } from '@atproto/dev-env'
|
|
import { PreparedWrite, sequencer } from '@atproto/pds'
|
|
import { CommitData } from '@atproto/repo'
|
|
import { ids } from '../../src/lexicon/lexicons'
|
|
import { forSnapshot } from '../_util'
|
|
|
|
// Type of the `db` handle exposed by the test network's bsky service
// (`network.bsky.db`); accepted by `dumpTable` below.
type Database = TestNetwork['bsky']['db']
|
|
|
|
/**
 * Integration tests for the bsky sync subscription, run against a full
 * `TestNetwork` seeded with `basicSeed`.
 */
describe('sync', () => {
  let network: TestNetwork
  let pdsAgent: AtpAgent
  let sc: SeedClient

  // Spin up one shared network for the whole suite and seed it.
  beforeAll(async () => {
    network = await TestNetwork.create({
      dbPostgresSchema: 'bsky_subscription_repo',
    })
    pdsAgent = network.pds.getClient()
    sc = network.getSeedClient()
    await basicSeed(sc)
  })

  afterAll(async () => {
    await network.close()
  })

  it('indexes permit history being replayed.', async () => {
    const { db } = network.bsky

    // Generate some modifications and dupes
    const { alice, bob, carol, dan } = sc.dids
    await sc.follow(alice, bob)
    await sc.follow(carol, alice)
    await sc.follow(bob, alice)
    await sc.follow(dan, bob)
    await sc.like(dan, sc.posts[alice][1].ref) // Identical
    await sc.like(alice, sc.posts[carol][0].ref) // Identical
    await updateProfile(pdsAgent, alice, { displayName: 'ali!' })
    await updateProfile(pdsAgent, bob, { displayName: 'robert!' })

    await network.processAll()

    // Table comparator: dumps each table under a stable ordering so that two
    // dumps can be compared structurally.
    const getTableDump = async () => {
      const [actor, post, profile, like, follow, dupes] = await Promise.all([
        dumpTable(db, 'actor', ['did']),
        dumpTable(db, 'post', ['uri']),
        dumpTable(db, 'profile', ['uri']),
        dumpTable(db, 'like', ['creator', 'subject']),
        dumpTable(db, 'follow', ['creator', 'subjectDid']),
        dumpTable(db, 'duplicate_record', ['uri']),
      ])
      return { actor, post, profile, like, follow, dupes }
    }

    // Mark originals
    const originalTableDump = await getTableDump()

    // Reprocess repos via sync subscription, on top of existing indices
    await network.bsky.sub.restart()
    await network.processAll()

    // Permissive of indexedAt times changing
    expect(forSnapshot(await getTableDump())).toEqual(
      forSnapshot(originalTableDump),
    )
  })

  it('indexes actor when commit is unprocessable.', async () => {
    // mock sequencing to create an unprocessable commit event
    const sequenceCommitOrig = network.pds.ctx.sequencer.sequenceCommit
    network.pds.ctx.sequencer.sequenceCommit = async function (
      did: string,
      commitData: CommitData,
      writes: PreparedWrite[],
    ) {
      const seqEvt = await sequencer.formatSeqCommit(did, commitData, writes)
      const evt = cborDecode(seqEvt.event) as sequencer.CommitEvt
      evt.blocks = new Uint8Array() // bad blocks
      seqEvt.event = cborEncode(evt)
      return await network.pds.ctx.sequencer.sequenceEvt(seqEvt)
    }
    try {
      // create account and index the initial commit event
      await sc.createAccount('jack', {
        handle: 'jack.test',
        email: 'jack@test.com',
        password: 'password',
      })
      await network.processAll()
      // confirm jack was indexed as an actor despite the bad event
      const actors = await dumpTable(network.bsky.db, 'actor', ['did'])
      expect(actors.map((a) => a.handle)).toContain('jack.test')
    } finally {
      // Always undo the mock, even when a step above throws, so the shared
      // network does not keep emitting corrupted commit events.
      network.pds.ctx.sequencer.sequenceCommit = sequenceCommitOrig
    }
  })

  /**
   * Writes `record` as the `self` profile record of `did` through the PDS
   * `putRecord` endpoint, authenticating with the seed client's headers.
   */
  async function updateProfile(
    agent: AtpAgent,
    did: string,
    record: Record<string, unknown>,
  ) {
    return await agent.api.com.atproto.repo.putRecord(
      {
        repo: did,
        collection: ids.AppBskyActorProfile,
        rkey: 'self',
        record,
      },
      { headers: sc.getHeaders(did), encoding: 'application/json' },
    )
  }
})
|
|
|
|
/**
 * Selects every row of `tableName`, ordered by each column in `pkeys` in the
 * order given, and returns the executed query's result.
 *
 * @param db - database handle whose `db` property is the query builder
 * @param tableName - table to read
 * @param pkeys - columns to order by, applied left to right
 */
async function dumpTable<T extends keyof DatabaseSchemaType>(
  db: Database,
  tableName: T,
  pkeys: (keyof DatabaseSchemaType[T] & string)[],
) {
  const { ref: columnRef } = db.db.dynamic
  let query = db.db.selectFrom(tableName).selectAll()
  // Append one ORDER BY term per key so the dump has a stable row order.
  for (const column of pkeys) {
    query = query.orderBy(columnRef(column))
  }
  return await query.execute()
}
|