Daniel Holmgren b15dec2f4f
Atproto sync package (#2752)
* first pass/port

* reworking

* authenticated commit parsing

* authenticate identity evts

* some testing

* tidy & add firehose to queue

* error handling

* fix test

* refactor sync queue + some tests

* fix race in sync queue

* rm firehose from syncqueue

* add tests for queue utils

* README

* lint readme

* filter before parsing

* pr feedback

* small fix

* changesets

* fix type

* Rework dataplane subscription (#2766)

* working sync package into appview subscription

* add restart method to subscription for tests

* fix another test

* tidy subscription utils/files

* remove dupe property

* tidy after merge

* fix start cursor on subscription

* tweak process full subscription logic

* fixes
2024-09-04 20:18:16 -05:00

827 lines
25 KiB
TypeScript

import { sql } from 'kysely'
import { CID } from 'multiformats/cid'
import { cidForCbor, TID } from '@atproto/common'
import { repoPrepare } from '@atproto/pds'
import { WriteOpAction } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
import {
AppBskyActorProfile,
AppBskyFeedPost,
AppBskyFeedLike,
AppBskyFeedRepost,
AppBskyGraphFollow,
AtpAgent,
} from '@atproto/api'
import { TestNetwork, SeedClient, usersSeed, basicSeed } from '@atproto/dev-env'
import { forSnapshot } from '../_util'
import { ids } from '../../src/lexicon/lexicons'
import { Database } from '../../src/data-plane/server/db'
describe('indexing', () => {
let network: TestNetwork
let agent: AtpAgent
let pdsAgent: AtpAgent
let sc: SeedClient
let db: Database
beforeAll(async () => {
network = await TestNetwork.create({
dbPostgresSchema: 'bsky_indexing',
})
agent = network.bsky.getClient()
pdsAgent = network.pds.getClient()
sc = network.getSeedClient()
db = network.bsky.db
await usersSeed(sc)
// Data in tests is not processed from subscription
await network.processAll()
await network.bsky.sub.destroy()
})
afterAll(async () => {
await network.close()
})
it('indexes posts.', async () => {
const createdAt = new Date().toISOString()
const createRecord = await prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
record: {
$type: ids.AppBskyFeedPost,
text: '@bob.test how are you?',
facets: [
{
index: { byteStart: 0, byteEnd: 9 },
features: [
{
$type: `${ids.AppBskyRichtextFacet}#mention`,
did: sc.dids.bob,
},
],
},
],
createdAt,
} as AppBskyFeedPost.Record,
})
const [uri] = createRecord
const updateRecord = await prepareUpdate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
rkey: uri.rkey,
record: {
$type: ids.AppBskyFeedPost,
text: '@carol.test how are you?',
facets: [
{
index: { byteStart: 0, byteEnd: 11 },
features: [
{
$type: `${ids.AppBskyRichtextFacet}#mention`,
did: sc.dids.carol,
},
],
},
],
createdAt,
} as AppBskyFeedPost.Record,
})
const deleteRecord = prepareDelete({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
rkey: uri.rkey,
})
// Create
await network.bsky.sub.indexingSvc.indexRecord(...createRecord)
const getAfterCreate = await agent.api.app.bsky.feed.getPostThread(
{ uri: uri.toString() },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetPostThread,
),
},
)
expect(forSnapshot(getAfterCreate.data)).toMatchSnapshot()
const createNotifications = await getNotifications(db, uri)
// Update
await network.bsky.sub.indexingSvc.indexRecord(...updateRecord)
const getAfterUpdate = await agent.api.app.bsky.feed.getPostThread(
{ uri: uri.toString() },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetPostThread,
),
},
)
expect(forSnapshot(getAfterUpdate.data)).toMatchSnapshot()
const updateNotifications = await getNotifications(db, uri)
// Delete
await network.bsky.sub.indexingSvc.deleteRecord(...deleteRecord)
const getAfterDelete = agent.api.app.bsky.feed.getPostThread(
{ uri: uri.toString() },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetPostThread,
),
},
)
await expect(getAfterDelete).rejects.toThrow(/Post not found:/)
const deleteNotifications = await getNotifications(db, uri)
expect(
forSnapshot({
createNotifications,
updateNotifications,
deleteNotifications,
}),
).toMatchSnapshot()
})
it('indexes profiles.', async () => {
const createRecord = await prepareCreate({
did: sc.dids.dan,
collection: ids.AppBskyActorProfile,
rkey: 'self',
record: {
$type: ids.AppBskyActorProfile,
displayName: 'dan',
} as AppBskyActorProfile.Record,
})
const [uri] = createRecord
const updateRecord = await prepareUpdate({
did: sc.dids.dan,
collection: ids.AppBskyActorProfile,
rkey: uri.rkey,
record: {
$type: ids.AppBskyActorProfile,
displayName: 'danny',
} as AppBskyActorProfile.Record,
})
const deleteRecord = prepareDelete({
did: sc.dids.dan,
collection: ids.AppBskyActorProfile,
rkey: uri.rkey,
})
// Create
await network.bsky.sub.indexingSvc.indexRecord(...createRecord)
const getAfterCreate = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.dan },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
expect(forSnapshot(getAfterCreate.data)).toMatchSnapshot()
// Update
await network.bsky.sub.indexingSvc.indexRecord(...updateRecord)
const getAfterUpdate = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.dan },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
expect(forSnapshot(getAfterUpdate.data)).toMatchSnapshot()
// Delete
await network.bsky.sub.indexingSvc.deleteRecord(...deleteRecord)
const getAfterDelete = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.dan },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
expect(forSnapshot(getAfterDelete.data)).toMatchSnapshot()
})
it('handles post aggregations out of order.', async () => {
const createdAt = new Date().toISOString()
const originalPost = await prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
record: {
$type: ids.AppBskyFeedPost,
text: 'original post',
createdAt,
} as AppBskyFeedPost.Record,
})
const originalPostRef = {
uri: originalPost[0].toString(),
cid: originalPost[1].toString(),
}
const reply = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedPost,
record: {
$type: ids.AppBskyFeedPost,
text: 'reply post',
reply: {
root: originalPostRef,
parent: originalPostRef,
},
createdAt,
} as AppBskyFeedPost.Record,
})
const like = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedLike,
record: {
$type: ids.AppBskyFeedLike,
subject: originalPostRef,
createdAt,
} as AppBskyFeedLike.Record,
})
const repost = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedRepost,
record: {
$type: ids.AppBskyFeedRepost,
subject: originalPostRef,
createdAt,
} as AppBskyFeedRepost.Record,
})
// reply, like, and repost indexed orior to the original post
await network.bsky.sub.indexingSvc.indexRecord(...reply)
await network.bsky.sub.indexingSvc.indexRecord(...like)
await network.bsky.sub.indexingSvc.indexRecord(...repost)
await network.bsky.sub.indexingSvc.indexRecord(...originalPost)
await network.bsky.sub.background.processAll()
const agg = await db.db
.selectFrom('post_agg')
.selectAll()
.where('uri', '=', originalPostRef.uri)
.executeTakeFirst()
expect(agg).toEqual({
uri: originalPostRef.uri,
replyCount: 1,
repostCount: 1,
likeCount: 1,
quoteCount: 0,
})
// Cleanup
const del = (uri: AtUri) => {
return prepareDelete({
did: uri.host,
collection: uri.collection,
rkey: uri.rkey,
})
}
await network.bsky.sub.indexingSvc.deleteRecord(...del(reply[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(like[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(repost[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(originalPost[0]))
})
it('does not notify user of own like or repost', async () => {
const createdAt = new Date().toISOString()
const originalPost = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedPost,
record: {
$type: ids.AppBskyFeedPost,
text: 'original post',
createdAt,
} as AppBskyFeedPost.Record,
})
const originalPostRef = {
uri: originalPost[0].toString(),
cid: originalPost[1].toString(),
}
// own actions
const ownLike = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedLike,
record: {
$type: ids.AppBskyFeedLike,
subject: originalPostRef,
createdAt,
} as AppBskyFeedLike.Record,
})
const ownRepost = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyFeedRepost,
record: {
$type: ids.AppBskyFeedRepost,
subject: originalPostRef,
createdAt,
} as AppBskyFeedRepost.Record,
})
// other actions
const aliceLike = await prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedLike,
record: {
$type: ids.AppBskyFeedLike,
subject: originalPostRef,
createdAt,
} as AppBskyFeedLike.Record,
})
const aliceRepost = await prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedRepost,
record: {
$type: ids.AppBskyFeedRepost,
subject: originalPostRef,
createdAt,
} as AppBskyFeedRepost.Record,
})
await network.bsky.sub.indexingSvc.indexRecord(...originalPost)
await network.bsky.sub.indexingSvc.indexRecord(...ownLike)
await network.bsky.sub.indexingSvc.indexRecord(...ownRepost)
await network.bsky.sub.indexingSvc.indexRecord(...aliceLike)
await network.bsky.sub.indexingSvc.indexRecord(...aliceRepost)
await network.bsky.sub.background.processAll()
const {
data: { notifications },
} = await agent.api.app.bsky.notification.listNotifications(
{},
{
headers: await network.serviceHeaders(
sc.dids.bob,
ids.AppBskyNotificationListNotifications,
),
},
)
expect(notifications).toHaveLength(2)
expect(
notifications.every((n) => {
return n.author.did !== sc.dids.bob
}),
).toBeTruthy()
// Cleanup
const del = (uri: AtUri) => {
return prepareDelete({
did: uri.host,
collection: uri.collection,
rkey: uri.rkey,
})
}
await network.bsky.sub.indexingSvc.deleteRecord(...del(ownLike[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(ownRepost[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(aliceLike[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(aliceRepost[0]))
await network.bsky.sub.indexingSvc.deleteRecord(...del(originalPost[0]))
})
it('handles profile aggregations out of order.', async () => {
const createdAt = new Date().toISOString()
const unknownDid = 'did:example:unknown'
const follow = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyGraphFollow,
record: {
$type: ids.AppBskyGraphFollow,
subject: unknownDid,
createdAt,
} as AppBskyGraphFollow.Record,
})
await network.bsky.sub.indexingSvc.indexRecord(...follow)
await network.bsky.sub.background.processAll()
const agg = await db.db
.selectFrom('profile_agg')
.select(['did', 'followersCount'])
.where('did', '=', unknownDid)
.executeTakeFirst()
expect(agg).toEqual({
did: unknownDid,
followersCount: 1,
})
// Cleanup
const del = (uri: AtUri) => {
return prepareDelete({
did: uri.host,
collection: uri.collection,
rkey: uri.rkey,
})
}
await network.bsky.sub.indexingSvc.deleteRecord(...del(follow[0]))
})
describe('indexRepo', () => {
beforeAll(async () => {
await network.bsky.sub.restart()
await basicSeed(sc, false)
await network.processAll()
await network.bsky.sub.destroy()
await network.bsky.sub.background.processAll()
})
it('preserves indexes when no record changes.', async () => {
// Mark originals
const { data: origProfile } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
const { data: origFeed } = await agent.api.app.bsky.feed.getAuthorFeed(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetAuthorFeed,
),
},
)
const { data: origFollows } = await agent.api.app.bsky.graph.getFollows(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyGraphGetFollows,
),
},
)
// Index
const { data: commit } =
await pdsAgent.api.com.atproto.sync.getLatestCommit({
did: sc.dids.alice,
})
await network.bsky.sub.indexingSvc.indexRepo(sc.dids.alice, commit.cid)
await network.bsky.sub.background.processAll()
// Check
const { data: profile } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
const { data: feed } = await agent.api.app.bsky.feed.getAuthorFeed(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetAuthorFeed,
),
},
)
const { data: follows } = await agent.api.app.bsky.graph.getFollows(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyGraphGetFollows,
),
},
)
expect(forSnapshot([origProfile, origFeed, origFollows])).toEqual(
forSnapshot([profile, feed, follows]),
)
})
it('updates indexes when records change.', async () => {
// Update profile
await pdsAgent.api.com.atproto.repo.putRecord(
{
repo: sc.dids.alice,
collection: ids.AppBskyActorProfile,
rkey: 'self',
record: { description: 'freshening things up' },
},
{ headers: sc.getHeaders(sc.dids.alice), encoding: 'application/json' },
)
// Add post
const newPost = await sc.post(sc.dids.alice, 'fresh post!')
// Remove a follow
const removedFollow = sc.follows[sc.dids.alice][sc.dids.carol]
await pdsAgent.api.app.bsky.graph.follow.delete(
{ repo: sc.dids.alice, rkey: removedFollow.uri.rkey },
sc.getHeaders(sc.dids.alice),
)
// Index
const { data: commit } =
await pdsAgent.api.com.atproto.sync.getLatestCommit({
did: sc.dids.alice,
})
await network.bsky.sub.indexingSvc.indexRepo(sc.dids.alice, commit.cid)
await network.bsky.sub.background.processAll()
// Check
const { data: profile } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
const { data: feed } = await agent.api.app.bsky.feed.getAuthorFeed(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetAuthorFeed,
),
},
)
const { data: follows } = await agent.api.app.bsky.graph.getFollows(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyGraphGetFollows,
),
},
)
expect(profile.description).toEqual('freshening things up')
expect(feed.feed[0].post.uri).toEqual(newPost.ref.uriStr)
expect(feed.feed[0].post.cid).toEqual(newPost.ref.cidStr)
expect(follows.follows.map(({ did }) => did)).not.toContain(sc.dids.carol)
expect(forSnapshot([profile, feed, follows])).toMatchSnapshot()
})
it('skips invalid records.', async () => {
const { accountManager } = network.pds.ctx
// const { db: pdsDb, services: pdsServices } = network.pds.ctx
// Create a good and a bad post record
const writes = await Promise.all([
repoPrepare.prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
record: { text: 'valid', createdAt: new Date().toISOString() },
}),
repoPrepare.prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
record: { text: 0 },
validate: false,
}),
])
const writeCommit = await network.pds.ctx.actorStore.transact(
sc.dids.alice,
(store) => store.repo.processWrites(writes),
)
await accountManager.updateRepoRoot(
sc.dids.alice,
writeCommit.cid,
writeCommit.rev,
)
await network.pds.ctx.sequencer.sequenceCommit(
sc.dids.alice,
writeCommit,
writes,
)
// Index
const { data: commit } =
await pdsAgent.api.com.atproto.sync.getLatestCommit({
did: sc.dids.alice,
})
await network.bsky.sub.indexingSvc.indexRepo(sc.dids.alice, commit.cid)
// Check
const getGoodPost = agent.api.app.bsky.feed.getPostThread(
{ uri: writes[0].uri.toString(), depth: 0 },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetPostThread,
),
},
)
await expect(getGoodPost).resolves.toBeDefined()
const getBadPost = agent.api.app.bsky.feed.getPostThread(
{ uri: writes[1].uri.toString(), depth: 0 },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyFeedGetPostThread,
),
},
)
await expect(getBadPost).rejects.toThrow('Post not found')
})
})
describe('indexHandle', () => {
const getIndexedHandle = async (did) => {
const res = await agent.api.app.bsky.actor.getProfile(
{ actor: did },
{
headers: await network.serviceHeaders(
sc.dids.alice,
ids.AppBskyActorGetProfile,
),
},
)
return res.data.handle
}
it('indexes handle for a fresh did', async () => {
const now = new Date().toISOString()
const sessionAgent = new AtpAgent({ service: network.pds.url })
const {
data: { did },
} = await sessionAgent.createAccount({
email: 'did1@test.com',
handle: 'did1.test',
password: 'password',
})
await expect(getIndexedHandle(did)).rejects.toThrow('Profile not found')
await network.bsky.sub.indexingSvc.indexHandle(did, now)
await expect(getIndexedHandle(did)).resolves.toEqual('did1.test')
})
it('reindexes handle for existing did when forced', async () => {
const now = new Date().toISOString()
const sessionAgent = network.pds.getClient()
const {
data: { did },
} = await sessionAgent.createAccount({
email: 'did2@test.com',
handle: 'did2.test',
password: 'password',
})
await network.bsky.sub.indexingSvc.indexHandle(did, now)
await expect(getIndexedHandle(did)).resolves.toEqual('did2.test')
await sessionAgent.com.atproto.identity.updateHandle({
handle: 'did2-updated.test',
})
await network.bsky.sub.indexingSvc.indexHandle(did, now)
await expect(getIndexedHandle(did)).resolves.toEqual('did2.test') // Didn't update, not forced
await network.bsky.sub.indexingSvc.indexHandle(did, now, true)
await expect(getIndexedHandle(did)).resolves.toEqual('did2-updated.test')
})
it('handles profile aggregations out of order', async () => {
const now = new Date().toISOString()
const agent = network.pds.getClient()
await agent.createAccount({
email: 'did3@test.com',
handle: 'did3.test',
password: 'password',
})
const did = agent.accountDid
const follow = await prepareCreate({
did: sc.dids.bob,
collection: ids.AppBskyGraphFollow,
record: {
$type: ids.AppBskyGraphFollow,
subject: did,
createdAt: now,
} as AppBskyGraphFollow.Record,
})
await network.bsky.sub.indexingSvc.indexRecord(...follow)
await network.bsky.sub.indexingSvc.indexHandle(did, now)
await network.bsky.sub.background.processAll()
const agg = await db.db
.selectFrom('profile_agg')
.select(['did', 'followersCount'])
.where('did', '=', did)
.executeTakeFirst()
expect(agg).toEqual({
did,
followersCount: 1,
})
})
})
describe('deleteActor', () => {
it('does not unindex actor when they are still being hosted by their pds', async () => {
const { data: profileBefore } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.bob,
ids.AppBskyActorGetProfile,
),
},
)
// Attempt indexing tombstone
await network.bsky.sub.indexingSvc.deleteActor(sc.dids.alice)
const { data: profileAfter } = await agent.api.app.bsky.actor.getProfile(
{ actor: sc.dids.alice },
{
headers: await network.serviceHeaders(
sc.dids.bob,
ids.AppBskyActorGetProfile,
),
},
)
expect(profileAfter).toEqual(profileBefore)
})
it('unindexes actor when they are no longer hosted by their pds', async () => {
const { alice } = sc.dids
const getProfileBefore = agent.api.app.bsky.actor.getProfile(
{ actor: alice },
{
headers: await network.serviceHeaders(
sc.dids.bob,
ids.AppBskyActorGetProfile,
),
},
)
await expect(getProfileBefore).resolves.toBeDefined()
// Delete account on pds
const token = await network.pds.ctx.accountManager.createEmailToken(
alice,
'delete_account',
)
await pdsAgent.api.com.atproto.server.deleteAccount({
token,
did: alice,
password: sc.accounts[alice].password,
})
await network.pds.ctx.backgroundQueue.processAll()
// Index tombstone
await network.bsky.sub.indexingSvc.deleteActor(alice)
const getProfileAfter = agent.api.app.bsky.actor.getProfile(
{ actor: alice },
{
headers: await network.serviceHeaders(
sc.dids.bob,
ids.AppBskyActorGetProfile,
),
},
)
await expect(getProfileAfter).rejects.toThrow('Profile not found')
})
})
async function getNotifications(db: Database, uri: AtUri) {
return await db.db
.selectFrom('notification')
.selectAll()
.select(sql`0`.as('id')) // Ignore notification ids in comparisons
.where('recordUri', '=', uri.toString())
.orderBy('sortAt')
.execute()
}
})
async function prepareCreate(opts: {
did: string
collection: string
rkey?: string
record: unknown
timestamp?: string
}): Promise<[AtUri, CID, unknown, WriteOpAction.Create, string]> {
const rkey = opts.rkey ?? TID.nextStr()
return [
AtUri.make(opts.did, opts.collection, rkey),
await cidForCbor(opts.record),
opts.record,
WriteOpAction.Create,
opts.timestamp ?? new Date().toISOString(),
]
}
async function prepareUpdate(opts: {
did: string
collection: string
rkey: string
record: unknown
timestamp?: string
}): Promise<[AtUri, CID, unknown, WriteOpAction.Update, string]> {
return [
AtUri.make(opts.did, opts.collection, opts.rkey),
await cidForCbor(opts.record),
opts.record,
WriteOpAction.Update,
opts.timestamp ?? new Date().toISOString(),
]
}
function prepareDelete(opts: {
did: string
collection: string
rkey: string
}): [AtUri] {
return [AtUri.make(opts.did, opts.collection, opts.rkey)]
}