b15dec2f4f
* first pass/port * reworking * authenticated commit parsing * authenticate identity evts * some testing * tidy & add firehose to queue * error handling * fix test * refactor sync queue + some tests * fix race in sync queue * rm firehose from syncqueue * add tests for queue utils * README * lint readme * filter before parsing * pr feedback * small fix * changesets * fix type * Rework dataplane subscription (#2766) * working sync package into appview subscription * add restart method to subscription for tests * fix another test * tidy subscription utils/files * remove dupe property * tidy after merge * fix start cursor on subscription * tweak process full subscription logic * fixes
827 lines
25 KiB
TypeScript
827 lines
25 KiB
TypeScript
import { sql } from 'kysely'
|
|
import { CID } from 'multiformats/cid'
|
|
import { cidForCbor, TID } from '@atproto/common'
|
|
import { repoPrepare } from '@atproto/pds'
|
|
import { WriteOpAction } from '@atproto/repo'
|
|
import { AtUri } from '@atproto/syntax'
|
|
import {
|
|
AppBskyActorProfile,
|
|
AppBskyFeedPost,
|
|
AppBskyFeedLike,
|
|
AppBskyFeedRepost,
|
|
AppBskyGraphFollow,
|
|
AtpAgent,
|
|
} from '@atproto/api'
|
|
import { TestNetwork, SeedClient, usersSeed, basicSeed } from '@atproto/dev-env'
|
|
import { forSnapshot } from '../_util'
|
|
import { ids } from '../../src/lexicon/lexicons'
|
|
import { Database } from '../../src/data-plane/server/db'
|
|
|
|
describe('indexing', () => {
|
|
let network: TestNetwork
|
|
let agent: AtpAgent
|
|
let pdsAgent: AtpAgent
|
|
let sc: SeedClient
|
|
let db: Database
|
|
|
|
// Suite setup: boots an isolated test network, seeds the user accounts, lets
// the subscription drain once and then stops it, so that every test below
// drives the indexing service directly.
beforeAll(async () => {
  network = await TestNetwork.create({ dbPostgresSchema: 'bsky_indexing' })
  const { bsky, pds } = network
  agent = bsky.getClient()
  pdsAgent = pds.getClient()
  sc = network.getSeedClient()
  db = bsky.db
  await usersSeed(sc)
  // Data in tests is not processed from subscription
  await network.processAll()
  await bsky.sub.destroy()
})

// Suite teardown: closes the test network created in beforeAll.
afterAll(async () => {
  await network.close()
})
|
|
|
|
// Walks one mention post through create, update and delete on the indexing
// service. After each step the served thread is checked and the notification
// rows whose recordUri is the post are collected for a final snapshot.
it('indexes posts.', async () => {
  const { alice, bob, carol } = sc.dids
  const { indexingSvc } = network.bsky.sub
  const createdAt = new Date().toISOString()

  // Builds a post whose first `byteEnd` bytes are marked up as a mention of `did`.
  const mentionPost = (text: string, byteEnd: number, did: string) =>
    ({
      $type: ids.AppBskyFeedPost,
      text,
      facets: [
        {
          index: { byteStart: 0, byteEnd },
          features: [{ $type: `${ids.AppBskyRichtextFacet}#mention`, did }],
        },
      ],
      createdAt,
    }) as AppBskyFeedPost.Record

  const createRecord = await prepareCreate({
    did: alice,
    collection: ids.AppBskyFeedPost,
    record: mentionPost('@bob.test how are you?', 9, bob),
  })
  const [uri] = createRecord
  const updateRecord = await prepareUpdate({
    did: alice,
    collection: ids.AppBskyFeedPost,
    rkey: uri.rkey,
    record: mentionPost('@carol.test how are you?', 11, carol),
  })
  const deleteRecord = prepareDelete({
    did: alice,
    collection: ids.AppBskyFeedPost,
    rkey: uri.rkey,
  })

  // Requests the post's thread from the appview, authenticated as alice.
  const fetchThread = async () =>
    agent.api.app.bsky.feed.getPostThread(
      { uri: uri.toString() },
      {
        headers: await network.serviceHeaders(
          alice,
          ids.AppBskyFeedGetPostThread,
        ),
      },
    )

  // Create
  await indexingSvc.indexRecord(...createRecord)
  const afterCreate = await fetchThread()
  expect(forSnapshot(afterCreate.data)).toMatchSnapshot()
  const createNotifications = await getNotifications(db, uri)

  // Update
  await indexingSvc.indexRecord(...updateRecord)
  const afterUpdate = await fetchThread()
  expect(forSnapshot(afterUpdate.data)).toMatchSnapshot()
  const updateNotifications = await getNotifications(db, uri)

  // Delete
  await indexingSvc.deleteRecord(...deleteRecord)
  await expect(fetchThread()).rejects.toThrow(/Post not found:/)
  const deleteNotifications = await getNotifications(db, uri)

  expect(
    forSnapshot({
      createNotifications,
      updateNotifications,
      deleteNotifications,
    }),
  ).toMatchSnapshot()
})
|
|
|
|
// Walks dan's profile record through create, update and delete on the
// indexing service, snapshotting the profile view served to alice each time.
it('indexes profiles.', async () => {
  const { alice, dan } = sc.dids
  const { indexingSvc } = network.bsky.sub
  const collection = ids.AppBskyActorProfile

  const profileRecord = (displayName: string) =>
    ({ $type: ids.AppBskyActorProfile, displayName }) as AppBskyActorProfile.Record

  const createRecord = await prepareCreate({
    did: dan,
    collection,
    rkey: 'self',
    record: profileRecord('dan'),
  })
  const [uri] = createRecord
  const updateRecord = await prepareUpdate({
    did: dan,
    collection,
    rkey: uri.rkey,
    record: profileRecord('danny'),
  })
  const deleteRecord = prepareDelete({ did: dan, collection, rkey: uri.rkey })

  // Requests dan's profile view from the appview, authenticated as alice.
  const fetchProfile = async () => {
    const res = await agent.api.app.bsky.actor.getProfile(
      { actor: dan },
      {
        headers: await network.serviceHeaders(
          alice,
          ids.AppBskyActorGetProfile,
        ),
      },
    )
    return res.data
  }

  // Create
  await indexingSvc.indexRecord(...createRecord)
  expect(forSnapshot(await fetchProfile())).toMatchSnapshot()

  // Update
  await indexingSvc.indexRecord(...updateRecord)
  expect(forSnapshot(await fetchProfile())).toMatchSnapshot()

  // Delete
  await indexingSvc.deleteRecord(...deleteRecord)
  expect(forSnapshot(await fetchProfile())).toMatchSnapshot()
})
|
|
|
|
// Indexes a reply, a like and a repost before the post they refer to, then
// checks that the post_agg row for that post still ends up with the expected
// counts.
it('handles post aggregations out of order.', async () => {
  const { alice, bob } = sc.dids
  const { indexingSvc, background } = network.bsky.sub
  const createdAt = new Date().toISOString()

  const originalPost = await prepareCreate({
    did: alice,
    collection: ids.AppBskyFeedPost,
    record: {
      $type: ids.AppBskyFeedPost,
      text: 'original post',
      createdAt,
    } as AppBskyFeedPost.Record,
  })
  const [originalUri, originalCid] = originalPost
  const originalPostRef = {
    uri: originalUri.toString(),
    cid: originalCid.toString(),
  }
  const reply = await prepareCreate({
    did: bob,
    collection: ids.AppBskyFeedPost,
    record: {
      $type: ids.AppBskyFeedPost,
      text: 'reply post',
      reply: { root: originalPostRef, parent: originalPostRef },
      createdAt,
    } as AppBskyFeedPost.Record,
  })
  const like = await prepareCreate({
    did: bob,
    collection: ids.AppBskyFeedLike,
    record: {
      $type: ids.AppBskyFeedLike,
      subject: originalPostRef,
      createdAt,
    } as AppBskyFeedLike.Record,
  })
  const repost = await prepareCreate({
    did: bob,
    collection: ids.AppBskyFeedRepost,
    record: {
      $type: ids.AppBskyFeedRepost,
      subject: originalPostRef,
      createdAt,
    } as AppBskyFeedRepost.Record,
  })

  // reply, like, and repost indexed prior to the original post
  const indexOrder = [reply, like, repost, originalPost]
  for (const record of indexOrder) {
    await indexingSvc.indexRecord(...record)
  }
  await background.processAll()

  const agg = await db.db
    .selectFrom('post_agg')
    .selectAll()
    .where('uri', '=', originalPostRef.uri)
    .executeTakeFirst()
  expect(agg).toEqual({
    uri: originalPostRef.uri,
    replyCount: 1,
    repostCount: 1,
    likeCount: 1,
    quoteCount: 0,
  })

  // Cleanup: unindex everything in the same order it was indexed.
  for (const [uri] of indexOrder) {
    const deletion = prepareDelete({
      did: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    })
    await indexingSvc.deleteRecord(...deletion)
  }
})
|
|
|
|
// Bob and alice each like and repost bob's post; bob's notification list must
// then contain exactly two entries, none of them authored by bob himself.
it('does not notify user of own like or repost', async () => {
  const { alice, bob } = sc.dids
  const { indexingSvc, background } = network.bsky.sub
  const createdAt = new Date().toISOString()

  const originalPost = await prepareCreate({
    did: bob,
    collection: ids.AppBskyFeedPost,
    record: {
      $type: ids.AppBskyFeedPost,
      text: 'original post',
      createdAt,
    } as AppBskyFeedPost.Record,
  })
  const originalPostRef = {
    uri: originalPost[0].toString(),
    cid: originalPost[1].toString(),
  }

  // Prepares a like or repost (selected by `collection`) of the original post.
  const react = (did: string, collection: string) =>
    prepareCreate({
      did,
      collection,
      record: { $type: collection, subject: originalPostRef, createdAt } as
        | AppBskyFeedLike.Record
        | AppBskyFeedRepost.Record,
    })

  // own actions
  const ownLike = await react(bob, ids.AppBskyFeedLike)
  const ownRepost = await react(bob, ids.AppBskyFeedRepost)

  // other actions
  const aliceLike = await react(alice, ids.AppBskyFeedLike)
  const aliceRepost = await react(alice, ids.AppBskyFeedRepost)

  const reactions = [ownLike, ownRepost, aliceLike, aliceRepost]
  for (const record of [originalPost, ...reactions]) {
    await indexingSvc.indexRecord(...record)
  }
  await background.processAll()

  const res = await agent.api.app.bsky.notification.listNotifications(
    {},
    {
      headers: await network.serviceHeaders(
        bob,
        ids.AppBskyNotificationListNotifications,
      ),
    },
  )
  const { notifications } = res.data

  expect(notifications).toHaveLength(2)
  expect(
    notifications.every((n) => {
      return n.author.did !== sc.dids.bob
    }),
  ).toBeTruthy()

  // Cleanup: reactions first, the post they point at last.
  for (const [uri] of [...reactions, originalPost]) {
    const deletion = prepareDelete({
      did: uri.host,
      collection: uri.collection,
      rkey: uri.rkey,
    })
    await indexingSvc.deleteRecord(...deletion)
  }
})
|
|
|
|
// Indexes a follow whose subject is a DID used nowhere else in this test, and
// checks that a profile_agg row with one follower is created for it anyway.
it('handles profile aggregations out of order.', async () => {
  const { indexingSvc, background } = network.bsky.sub
  const unknownDid = 'did:example:unknown'
  const createdAt = new Date().toISOString()

  const follow = await prepareCreate({
    did: sc.dids.bob,
    collection: ids.AppBskyGraphFollow,
    record: {
      $type: ids.AppBskyGraphFollow,
      subject: unknownDid,
      createdAt,
    } as AppBskyGraphFollow.Record,
  })
  await indexingSvc.indexRecord(...follow)
  await background.processAll()

  const agg = await db.db
    .selectFrom('profile_agg')
    .select(['did', 'followersCount'])
    .where('did', '=', unknownDid)
    .executeTakeFirst()
  expect(agg).toEqual({ did: unknownDid, followersCount: 1 })

  // Cleanup
  const [followUri] = follow
  const deletion = prepareDelete({
    did: followUri.host,
    collection: followUri.collection,
    rkey: followUri.rkey,
  })
  await indexingSvc.deleteRecord(...deletion)
})
|
|
|
|
// Tests for indexingSvc.indexRepo, which is always invoked here with alice's
// DID and the latest commit CID reported by her PDS.
describe('indexRepo', () => {
  // Fetches alice's profile, author feed and follows from the appview,
  // authenticated as alice. `sc` is only assigned in the outer beforeAll, so
  // the DID is read lazily inside the helper.
  const fetchAliceViews = async () => {
    const { alice } = sc.dids
    const { data: profile } = await agent.api.app.bsky.actor.getProfile(
      { actor: alice },
      {
        headers: await network.serviceHeaders(
          alice,
          ids.AppBskyActorGetProfile,
        ),
      },
    )
    const { data: feed } = await agent.api.app.bsky.feed.getAuthorFeed(
      { actor: alice },
      {
        headers: await network.serviceHeaders(
          alice,
          ids.AppBskyFeedGetAuthorFeed,
        ),
      },
    )
    const { data: follows } = await agent.api.app.bsky.graph.getFollows(
      { actor: alice },
      {
        headers: await network.serviceHeaders(
          alice,
          ids.AppBskyGraphGetFollows,
        ),
      },
    )
    return { profile, feed, follows }
  }

  // Looks up alice's latest commit on the PDS and indexes her repo at it.
  const reindexAlice = async () => {
    const { alice } = sc.dids
    const { data: commit } =
      await pdsAgent.api.com.atproto.sync.getLatestCommit({ did: alice })
    await network.bsky.sub.indexingSvc.indexRepo(alice, commit.cid)
  }

  // Restarts the subscription just long enough to process the basic seed,
  // then stops it again and flushes the background queue.
  beforeAll(async () => {
    await network.bsky.sub.restart()
    await basicSeed(sc, false)
    await network.processAll()
    await network.bsky.sub.destroy()
    await network.bsky.sub.background.processAll()
  })

  it('preserves indexes when no record changes.', async () => {
    // Mark originals
    const before = await fetchAliceViews()
    // Index
    await reindexAlice()
    await network.bsky.sub.background.processAll()
    // Check
    const after = await fetchAliceViews()
    expect(
      forSnapshot([before.profile, before.feed, before.follows]),
    ).toEqual(forSnapshot([after.profile, after.feed, after.follows]))
  })

  it('updates indexes when records change.', async () => {
    const { alice, carol } = sc.dids
    // Update profile
    await pdsAgent.api.com.atproto.repo.putRecord(
      {
        repo: alice,
        collection: ids.AppBskyActorProfile,
        rkey: 'self',
        record: { description: 'freshening things up' },
      },
      { headers: sc.getHeaders(alice), encoding: 'application/json' },
    )
    // Add post
    const newPost = await sc.post(alice, 'fresh post!')
    // Remove a follow
    const removedFollow = sc.follows[alice][carol]
    await pdsAgent.api.app.bsky.graph.follow.delete(
      { repo: alice, rkey: removedFollow.uri.rkey },
      sc.getHeaders(alice),
    )
    // Index
    await reindexAlice()
    await network.bsky.sub.background.processAll()
    // Check
    const { profile, feed, follows } = await fetchAliceViews()
    const [latest] = feed.feed
    expect(profile.description).toEqual('freshening things up')
    expect(latest.post.uri).toEqual(newPost.ref.uriStr)
    expect(latest.post.cid).toEqual(newPost.ref.cidStr)
    expect(follows.follows.map(({ did }) => did)).not.toContain(carol)
    expect(forSnapshot([profile, feed, follows])).toMatchSnapshot()
  })

  it('skips invalid records.', async () => {
    const { alice } = sc.dids
    const { accountManager, actorStore, sequencer } = network.pds.ctx
    // Create a good and a bad post record
    const writes = await Promise.all([
      repoPrepare.prepareCreate({
        did: alice,
        collection: ids.AppBskyFeedPost,
        record: { text: 'valid', createdAt: new Date().toISOString() },
      }),
      repoPrepare.prepareCreate({
        did: alice,
        collection: ids.AppBskyFeedPost,
        record: { text: 0 },
        validate: false,
      }),
    ])
    const [goodWrite, badWrite] = writes
    // Write both records straight into alice's repo on the PDS, then publish
    // the resulting commit.
    const writeCommit = await actorStore.transact(alice, (store) =>
      store.repo.processWrites(writes),
    )
    await accountManager.updateRepoRoot(
      alice,
      writeCommit.cid,
      writeCommit.rev,
    )
    await sequencer.sequenceCommit(alice, writeCommit, writes)
    // Index
    await reindexAlice()
    // Check
    const fetchThread = async (uri: string) =>
      agent.api.app.bsky.feed.getPostThread(
        { uri, depth: 0 },
        {
          headers: await network.serviceHeaders(
            alice,
            ids.AppBskyFeedGetPostThread,
          ),
        },
      )
    await expect(fetchThread(goodWrite.uri.toString())).resolves.toBeDefined()
    await expect(fetchThread(badWrite.uri.toString())).rejects.toThrow(
      'Post not found',
    )
  })
})
|
|
|
|
// Tests for indexingSvc.indexHandle(did, timestamp, force?).
describe('indexHandle', () => {
  /**
   * Resolves the handle the appview currently serves for `did`, read from its
   * profile view while authenticated as alice. Rejects when the appview has no
   * profile for the DID.
   */
  const getIndexedHandle = async (did: string): Promise<string> => {
    const res = await agent.api.app.bsky.actor.getProfile(
      { actor: did },
      {
        headers: await network.serviceHeaders(
          sc.dids.alice,
          ids.AppBskyActorGetProfile,
        ),
      },
    )
    return res.data.handle
  }

  it('indexes handle for a fresh did', async () => {
    const now = new Date().toISOString()
    const sessionAgent = new AtpAgent({ service: network.pds.url })
    const {
      data: { did },
    } = await sessionAgent.createAccount({
      email: 'did1@test.com',
      handle: 'did1.test',
      password: 'password',
    })
    // The subscription is stopped, so the new account is not served by the
    // appview until its handle is indexed by hand.
    await expect(getIndexedHandle(did)).rejects.toThrow('Profile not found')
    await network.bsky.sub.indexingSvc.indexHandle(did, now)
    await expect(getIndexedHandle(did)).resolves.toEqual('did1.test')
  })

  it('reindexes handle for existing did when forced', async () => {
    const now = new Date().toISOString()
    const sessionAgent = network.pds.getClient()
    const {
      data: { did },
    } = await sessionAgent.createAccount({
      email: 'did2@test.com',
      handle: 'did2.test',
      password: 'password',
    })
    await network.bsky.sub.indexingSvc.indexHandle(did, now)
    await expect(getIndexedHandle(did)).resolves.toEqual('did2.test')
    await sessionAgent.com.atproto.identity.updateHandle({
      handle: 'did2-updated.test',
    })
    await network.bsky.sub.indexingSvc.indexHandle(did, now)
    await expect(getIndexedHandle(did)).resolves.toEqual('did2.test') // Didn't update, not forced
    await network.bsky.sub.indexingSvc.indexHandle(did, now, true)
    await expect(getIndexedHandle(did)).resolves.toEqual('did2-updated.test')
  })

  it('handles profile aggregations out of order', async () => {
    const now = new Date().toISOString()
    // Named sessionAgent, as in the tests above, so that it does not shadow
    // the suite-level appview `agent`.
    const sessionAgent = network.pds.getClient()
    await sessionAgent.createAccount({
      email: 'did3@test.com',
      handle: 'did3.test',
      password: 'password',
    })
    const did = sessionAgent.accountDid
    const follow = await prepareCreate({
      did: sc.dids.bob,
      collection: ids.AppBskyGraphFollow,
      record: {
        $type: ids.AppBskyGraphFollow,
        subject: did,
        createdAt: now,
      } as AppBskyGraphFollow.Record,
    })
    // The follow is indexed before the followed account's handle.
    await network.bsky.sub.indexingSvc.indexRecord(...follow)
    await network.bsky.sub.indexingSvc.indexHandle(did, now)
    await network.bsky.sub.background.processAll()
    const agg = await db.db
      .selectFrom('profile_agg')
      .select(['did', 'followersCount'])
      .where('did', '=', did)
      .executeTakeFirst()
    expect(agg).toEqual({
      did,
      followersCount: 1,
    })
  })
})
|
|
|
|
// Tests for indexingSvc.deleteActor, run against alice's account before and
// after it is deleted on the PDS.
describe('deleteActor', () => {
  // Requests alice's profile from the appview, authenticated as bob. `sc` is
  // only assigned in the outer beforeAll, so DIDs are read lazily in here.
  const fetchAliceProfile = async () =>
    agent.api.app.bsky.actor.getProfile(
      { actor: sc.dids.alice },
      {
        headers: await network.serviceHeaders(
          sc.dids.bob,
          ids.AppBskyActorGetProfile,
        ),
      },
    )

  it('does not unindex actor when they are still being hosted by their pds', async () => {
    const { data: profileBefore } = await fetchAliceProfile()
    // Attempt indexing tombstone
    await network.bsky.sub.indexingSvc.deleteActor(sc.dids.alice)
    const { data: profileAfter } = await fetchAliceProfile()
    expect(profileAfter).toEqual(profileBefore)
  })

  it('unindexes actor when they are no longer hosted by their pds', async () => {
    const { alice } = sc.dids
    await expect(fetchAliceProfile()).resolves.toBeDefined()
    // Delete account on pds
    const { accountManager, backgroundQueue } = network.pds.ctx
    const token = await accountManager.createEmailToken(
      alice,
      'delete_account',
    )
    await pdsAgent.api.com.atproto.server.deleteAccount({
      token,
      did: alice,
      password: sc.accounts[alice].password,
    })
    await backgroundQueue.processAll()
    // Index tombstone
    await network.bsky.sub.indexingSvc.deleteActor(alice)
    await expect(fetchAliceProfile()).rejects.toThrow('Profile not found')
  })
})
|
|
|
|
// Loads all notification rows whose recordUri is `uri`, ordered by sortAt.
// A constant 0 is selected as `id` so that row ids do not affect comparisons.
async function getNotifications(db: Database, uri: AtUri) {
  const recordUri = uri.toString()
  const rows = await db.db
    .selectFrom('notification')
    .selectAll()
    .select(sql`0`.as('id')) // Ignore notification ids in comparisons
    .where('recordUri', '=', recordUri)
    .orderBy('sortAt')
    .execute()
  return rows
}
|
|
})
|
|
|
|
/**
 * Builds the tuple that the tests spread into `indexingSvc.indexRecord(...)`
 * for a record creation.
 *
 * @param opts.did repo the record belongs to
 * @param opts.collection collection NSID of the record
 * @param opts.rkey record key; a fresh TID is generated when omitted
 * @param opts.record record body; its CID is computed with `cidForCbor`
 * @param opts.timestamp timestamp string; defaults to the current time (ISO)
 * @returns `[uri, cid, record, WriteOpAction.Create, timestamp]`
 */
async function prepareCreate(opts: {
  did: string
  collection: string
  rkey?: string
  record: unknown
  timestamp?: string
}): Promise<[AtUri, CID, unknown, WriteOpAction.Create, string]> {
  const { did, collection, record, timestamp } = opts
  const uri = AtUri.make(did, collection, opts.rkey ?? TID.nextStr())
  const cid = await cidForCbor(record)
  const indexedAt = timestamp ?? new Date().toISOString()
  return [uri, cid, record, WriteOpAction.Create, indexedAt]
}
|
|
|
|
/**
 * Builds the tuple that the tests spread into `indexingSvc.indexRecord(...)`
 * for an update of an existing record.
 *
 * @param opts.did repo the record belongs to
 * @param opts.collection collection NSID of the record
 * @param opts.rkey record key of the record being updated
 * @param opts.record new record body; its CID is computed with `cidForCbor`
 * @param opts.timestamp timestamp string; defaults to the current time (ISO)
 * @returns `[uri, cid, record, WriteOpAction.Update, timestamp]`
 */
async function prepareUpdate(opts: {
  did: string
  collection: string
  rkey: string
  record: unknown
  timestamp?: string
}): Promise<[AtUri, CID, unknown, WriteOpAction.Update, string]> {
  const { did, collection, rkey, record, timestamp } = opts
  const uri = AtUri.make(did, collection, rkey)
  const cid = await cidForCbor(record)
  const indexedAt = timestamp ?? new Date().toISOString()
  return [uri, cid, record, WriteOpAction.Update, indexedAt]
}
|
|
|
|
/**
 * Builds the single-element tuple that the tests spread into
 * `indexingSvc.deleteRecord(...)`: the URI of the record to unindex.
 */
function prepareDelete(opts: {
  did: string
  collection: string
  rkey: string
}): [AtUri] {
  const { did, collection, rkey } = opts
  const uri = AtUri.make(did, collection, rkey)
  return [uri]
}
|