atproto/packages/pds/tests/sync/subscribe-repos.test.ts
Matthieu Sieben b934b396b1
Client SDK rework (#2483)
* feat(api): support creation of oauth based AtpAgents

* oauth: misc fixes for confidential clients

* fix(xrpc): remove ReadableStream.from polyfill

* OAuth docs tweaks (#2679)

* OAuth: clarification about client_name being shown

* OAuth: re-write handle resolution privacy concern

* avoid relying on ReadableStream.from in xrpc-server tests

* feat(oauth-types): expose "ALLOW_UNSECURE_ORIGINS" constant

* feat(handle-resolver): expose "AtprotoIdentityDidMethods" type

* fix(oauth-client): ensure that the oauth metadata document contains client_id_metadata_document_supported

* fix(oauth-types): prevent unknown query string in loopback client id

* fix(identity-resolver): check that handle is in did doc's "alsoKnownAs"

* feat(oauth-client:oauth-resolver): allow logging in using either the PDS URL or Entryway URL

* fix(oauth-client): return better error in case of invalid "oauth-protected-resource" status code

* refactor(did): group atproto specific checks in own

* feat(api): relax typing of "appLabelers" and "labelers" AtpClient properties

* allow any did as labeller (for tests mainly)

* fix(api): allow to override "atproto-proxy" on a per-request basis

* remove release candidate versions from changelog

* update changeset for api and xrpc packages

* Add missing changeset

* revert RC versions

* Proper wording in OAUTH.md api example

* remove "pre" changeset file

* xrpc: restore original behavior of setHeader and unsetHeader

* docs: add comment for XrpcClient's constructor arg

* feat(api): expose "schemas" publicly

* feat(api): allow customizing the whatwg fetch function of the AtpAgent

* docs(api): improve migration docs

* docs: change reference to BskyAgent to AtpAgent

* docs: mention the breaking change regarding setSessionPersistHandler

* fix(api): better split AtpClient concerns

* fix(xrpc): remove unused import

* refactor(api): simplify class hierarchy by removing AtpClient

* fix(api): mock proper method for facets detection

* restore ability to restore session asynchronously

* feat(api): allow instantiating Agent with same argument as super class

* docs(api): properly extend Agent class

* style(xrpc): var name

* docs(api): remove "async" to header getter

---------

Co-authored-by: Devin Ivy <devinivy@gmail.com>
Co-authored-by: bnewbold <bnewbold@robocracy.org>
Co-authored-by: Hailey <me@haileyok.com>
2024-08-12 19:57:21 +02:00

616 lines
18 KiB
TypeScript

import { TestNetworkNoAppView, SeedClient } from '@atproto/dev-env'
import { AtpAgent } from '@atproto/api'
import {
cborDecode,
HOUR,
MINUTE,
readFromGenerator,
wait,
} from '@atproto/common'
import { randomStr } from '@atproto/crypto'
import * as repo from '@atproto/repo'
import { readCar } from '@atproto/repo'
import { byFrame, ErrorFrame, Frame, MessageFrame } from '@atproto/xrpc-server'
import { WebSocket } from 'ws'
import {
Commit as CommitEvt,
Handle as HandleEvt,
Tombstone as TombstoneEvt,
Account as AccountEvt,
Identity as IdentityEvt,
} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos'
import { AppContext } from '../../src'
import basicSeed from '../seeds/basic'
import { CID } from 'multiformats/cid'
import { AccountStatus } from '../../src/account-manager'
describe('repo subscribe repos', () => {
// Shared fixtures for every test in this suite; all of them are assigned in beforeAll.
// PDS URL with the 'http://' prefix stripped; used to build the ws:// subscription URLs.
let serverHost: string
// Test network that owns the PDS under test.
let network: TestNetworkNoAppView
// App context of the PDS (network.pds.ctx); used to reach the sequencer and account manager.
let ctx: AppContext
// Client returned by network.pds.getClient().
let agent: AtpAgent
// Seed client returned by network.getSeedClient(); populated by the basic seed.
let sc: SeedClient
// DIDs of the seeded accounts, read from sc.dids.
let alice: string
let bob: string
let carol: string
let dan: string
// Boots a PDS-only test network with a one hour backfill limit, seeds it with
// the basic seed and caches the handles the tests need.
beforeAll(async () => {
  network = await TestNetworkNoAppView.create({
    dbPostgresSchema: 'repo_subscribe_repos',
    pds: { repoBackfillLimitMs: HOUR },
  })
  const { pds } = network
  serverHost = pds.url.replace('http://', '')
  // @ts-expect-error Error due to circular dependency with the dev-env package
  ctx = pds.ctx
  agent = pds.getClient()
  sc = network.getSeedClient()
  await basicSeed(sc)
  const { dids } = sc
  alice = dids.alice
  bob = dids.bob
  carol = dids.carol
  dan = dids.dan
})
// Shuts the test network down once the whole suite has run.
afterAll(async () => {
  await network.close()
})
// Downloads the repo CAR for `did` through com.atproto.sync.getRepo and
// verifies it against the signing key held in the actor store.
const getRepo = async (did: string): Promise<repo.VerifiedRepo> => {
  const { data: carBytes } = await agent.api.com.atproto.sync.getRepo({ did })
  const { blocks, root } = await repo.readCarWithRoot(carBytes)
  const keypair = await network.pds.ctx.actorStore.keypair(did)
  return repo.verifyRepo(blocks, root, did, keypair.did())
}
// Collects the bodies of every #commit, #handle and #tombstone message frame
// that refers to `userDid`. Other frame types are ignored.
const getAllEvents = (userDid: string, frames: Frame[]) => {
  // True when the message is one of the three tracked types and targets userDid.
  const isAboutUser = (frame: MessageFrame): boolean => {
    switch (frame.header.t) {
      case '#commit':
        return (frame.body as CommitEvt).repo === userDid
      case '#handle':
        return (frame.body as HandleEvt).did === userDid
      case '#tombstone':
        return (frame.body as TombstoneEvt).did === userDid
      default:
        return false
    }
  }
  return frames.flatMap((frame): unknown[] =>
    frame instanceof MessageFrame && isAboutUser(frame) ? [frame.body] : [],
  )
}
// Returns, in order, the bodies of all message frames whose header type equals `type`.
const getEventType = <T>(frames: Frame[], type: string): T[] => {
  return frames.flatMap((frame) =>
    frame instanceof MessageFrame && frame.header.t === type
      ? [frame.body]
      : [],
  )
}
// Typed shortcuts over getEventType, one per event type used by the tests.
const getAccountEvts = (frames: Frame[]): AccountEvt[] =>
  getEventType<AccountEvt>(frames, '#account')
const getIdentityEvts = (frames: Frame[]): IdentityEvt[] =>
  getEventType<IdentityEvt>(frames, '#identity')
const getHandleEvts = (frames: Frame[]): HandleEvt[] =>
  getEventType<HandleEvt>(frames, '#handle')
const getTombstoneEvts = (frames: Frame[]): TombstoneEvt[] =>
  getEventType<TombstoneEvt>(frames, '#tombstone')
const getCommitEvents = (frames: Frame[]): CommitEvt[] =>
  getEventType<CommitEvt>(frames, '#commit')
// Asserts that an #identity event targets `did`, carries the expected handle
// (or none when `handle` is omitted) and has a numeric seq and string time.
const verifyIdentityEvent = (
  evt: IdentityEvt,
  did: string,
  handle?: string,
) => {
  const { seq, time } = evt
  expect(typeof seq).toBe('number')
  expect(evt.did).toBe(did)
  expect(typeof time).toBe('string')
  expect(evt.handle).toEqual(handle)
}
// Asserts that a #handle event targets `did`, carries `handle`, and has a
// numeric seq and a string time.
const verifyHandleEvent = (evt: HandleEvt, did: string, handle: string) => {
  const { seq, time } = evt
  expect(typeof seq).toBe('number')
  expect(evt.did).toBe(did)
  expect(evt.handle).toBe(handle)
  expect(typeof time).toBe('string')
}
// Asserts that an #account event targets `did` with the expected `active`
// flag and `status` (undefined when omitted), plus a numeric seq and string time.
const verifyAccountEvent = (
  evt: AccountEvt,
  did: string,
  active: boolean,
  status?: AccountStatus,
) => {
  const { seq, time } = evt
  expect(typeof seq).toBe('number')
  expect(evt.did).toBe(did)
  expect(typeof time).toBe('string')
  expect(evt.active).toBe(active)
  expect(evt.status).toBe(status)
}
// Asserts that `evt` looks like a #tombstone event for `did`: matching did,
// string time and numeric seq. `evt` is accepted as `unknown` because callers
// also pass untyped frame bodies, so it is narrowed to a record before any
// property is read (element access on `unknown` does not type-check).
// A missing or non-object `evt` fails the assertions rather than throwing.
const verifyTombstoneEvent = (evt: unknown, did: string) => {
  const fields: Record<string, unknown> =
    typeof evt === 'object' && evt !== null
      ? (evt as Record<string, unknown>)
      : {}
  expect(fields.did).toBe(did)
  expect(typeof fields.time).toBe('string')
  expect(typeof fields.seq).toBe('number')
}
// Verifies, one seeded account at a time, that the commit events found in
// `frames` for that account reproduce the account's repo.
const verifyCommitEvents = async (frames: Frame[]) => {
  const commits = getCommitEvents(frames)
  for (const did of [alice, bob, carol, dan]) {
    const ownCommits = commits.filter((commit) => commit.repo === did)
    await verifyRepo(did, ownCommits)
  }
}
// Checks that the commit events streamed for `did` describe the same repo that
// com.atproto.sync.getRepo serves:
//  1. replays every op to build a collection -> rkey -> CID map and compares it
//     with the records of the repo fetched over RPC;
//  2. verifies a repo built from the streamed blocks at the last streamed
//     commit and compares its records, in order, with the RPC repo.
// Throws 'no last commit' when `evts` is empty.
const verifyRepo = async (did: string, evts: CommitEvt[]) => {
  const fromRpc = await getRepo(did)
  const contents: Record<string, Record<string, CID>> = {}
  const allBlocks = new repo.BlockMap()
  for (const evt of evts) {
    const car = await readCar(evt.blocks)
    allBlocks.addMap(car.blocks)
    for (const op of evt.ops) {
      const { collection, rkey } = repo.parseDataKey(op.path)
      if (op.action === 'delete') {
        // The collection may never have been written within the streamed
        // window; deleting from it must not throw.
        const records = contents[collection]
        if (records) {
          delete records[rkey]
        }
      } else if (op.cid) {
        contents[collection] ??= {}
        // Plain assignment: a later write (e.g. an update) must replace the
        // CID recorded by an earlier one so the map reflects the final state.
        contents[collection][rkey] = op.cid
      }
    }
  }
  for (const write of fromRpc.creates) {
    // Optional chaining turns a missing record into a failed assertion
    // instead of a TypeError.
    expect(
      contents[write.collection]?.[write.rkey]?.equals(write.cid),
    ).toBe(true)
  }
  const lastCommit = evts.at(-1)?.commit
  if (!lastCommit) {
    throw new Error('no last commit')
  }
  const signingKey = await network.pds.ctx.actorStore.keypair(did)
  const fromStream = await repo.verifyRepo(
    allBlocks,
    lastCommit,
    did,
    signingKey.did(),
  )
  const fromRpcOps = fromRpc.creates
  const fromStreamOps = fromStream.creates
  expect(fromStreamOps.length).toEqual(fromRpcOps.length)
  for (let i = 0; i < fromRpcOps.length; i++) {
    expect(fromStreamOps[i].collection).toEqual(fromRpcOps[i].collection)
    expect(fromStreamOps[i].rkey).toEqual(fromRpcOps[i].rkey)
    expect(fromStreamOps[i].cid).toEqual(fromRpcOps[i].cid)
  }
}
// Publishes a post with 8 random base32 characters as `by` via the seed client.
const randomPost = (by: string) => {
  const text = randomStr(8, 'base32')
  return sc.post(by, text)
}
// Creates 40 posts: ten rounds in which alice, bob, carol and dan each post
// once concurrently; a round only starts after the previous one has finished.
const makePosts = async () => {
  const authors = [alice, bob, carol, dan]
  for (let round = 0; round < 10; round += 1) {
    await Promise.all(authors.map((did) => randomPost(did)))
  }
}
// Reads from `gen` via readFromGenerator until the last item read is either an
// ErrorFrame or a frame whose body.seq equals the sequencer's current seq.
// `waitFor` is forwarded to readFromGenerator unchanged.
const readTillCaughtUp = async <T>(
  gen: AsyncGenerator<T>,
  waitFor?: Promise<unknown>,
) => {
  const caughtUp = async (last: any) => {
    // Nothing has been read yet
    if (last === undefined) {
      return false
    }
    // An error frame ends the stream
    if (last instanceof ErrorFrame) {
      return true
    }
    const currentSeq = await ctx.sequencer.curr()
    return last.body.seq === currentSeq
  }
  return readFromGenerator(gen, caughtUp, waitFor)
}
// Subscribes with cursor=-1, reads until the stream reaches the sequencer's
// current seq, then checks the commits plus one account and one identity event
// per seeded user, in seeding order.
it('sync backfilled events', async () => {
  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  await verifyCommitEvents(frames)

  const seeded: [string, string][] = [
    [alice, 'alice.test'],
    [bob, 'bob.test'],
    [carol, 'carol.test'],
    [dan, 'dan.test'],
  ]
  const accountEvts = getAccountEvts(frames)
  expect(accountEvts.length).toBe(4)
  seeded.forEach(([did], idx) => {
    verifyAccountEvent(accountEvts[idx], did, true)
  })
  const identityEvts = getIdentityEvts(frames)
  expect(identityEvts.length).toBe(4)
  seeded.forEach(([did, handle], idx) => {
    verifyIdentityEvent(identityEvts[idx], did, handle)
  })
})
// Starts posting first and opens the subscription shortly afterwards, so the
// read overlaps with events that are still being produced.
it('syncs new events', async () => {
  const posting = makePosts()
  const collectFrames = async () => {
    // wait just a hair so that we catch it during cutover
    await wait(200)
    const socket = new WebSocket(
      `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
    )
    const frames = await readTillCaughtUp(byFrame(socket), posting)
    socket.terminate()
    return frames
  }
  const reading = collectFrames()
  const [frames] = await Promise.all([reading, posting])
  await verifyCommitEvents(frames)
})
// Subscribes without a cursor, then posts: exactly the 40 new posts are
// expected, and the sequencer must have no 'events' listeners left afterwards.
it('handles no backfill', async () => {
  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos`,
  )
  // hold off for 200ms so the subscription is set up before posting starts
  const posting = (async () => {
    await wait(200)
    await makePosts()
  })()
  const reading = readTillCaughtUp(byFrame(socket), posting)
  const [frames] = await Promise.all([reading, posting])
  socket.terminate()
  expect(frames.length).toBe(40)
  await wait(100) // Let cleanup occur on server
  const remainingListeners = ctx.sequencer.listeners('events')
  expect(remainingListeners.length).toEqual(0)
})
// Uses the seq of the middle repo_seq row as cursor and expects the stream to
// contain exactly the rows that come after it, in order.
it('backfills only from provided cursor', async () => {
  const seqRows = await ctx.sequencer.db.db
    .selectFrom('repo_seq')
    .selectAll()
    .orderBy('seq', 'asc')
    .execute()
  const midIdx = Math.floor(seqRows.length / 2)
  const cursor = seqRows[midIdx].seq
  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${cursor}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  // the row at the cursor itself is not expected to be re-sent
  const expectedRows = seqRows.slice(midIdx + 1)
  expect(frames.length).toBe(expectedRows.length)
  frames.forEach((frame, idx) => {
    const evt = frame.body as CommitEvt
    const row = expectedRows[idx]
    const decoded = cborDecode(row.event) as { commit: CID }
    expect(evt.time).toEqual(row.sequencedAt)
    expect(evt.commit.equals(decoded.commit)).toBeTruthy()
    expect(evt.repo).toEqual(row.did)
  })
})
// Three handle updates (the last one repeats bob's) must show up as three
// #handle and three #identity events among the six most recent frames.
it('syncs handle changes', async () => {
  await sc.updateHandle(alice, 'alice2.test')
  await sc.updateHandle(bob, 'bob2.test')
  await sc.updateHandle(bob, 'bob2.test') // idempotent update re-sends
  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  await verifyCommitEvents(frames)

  const updates: [string, string][] = [
    [alice, 'alice2.test'],
    [bob, 'bob2.test'],
    [bob, 'bob2.test'],
  ]
  const handleEvts = getHandleEvts(frames.slice(-6))
  expect(handleEvts.length).toBe(3)
  updates.forEach(([did, handle], idx) => {
    verifyHandleEvent(handleEvts[idx], did, handle)
  })
  const identityEvts = getIdentityEvts(frames.slice(-6))
  expect(identityEvts.length).toBe(3)
  updates.forEach(([did, handle], idx) => {
    verifyIdentityEvent(identityEvts[idx], did, handle)
  })
})
// Repeats the handle update that 'syncs handle changes' already applied to bob
// while a cursor-less subscription is being opened, and expects a #handle
// event for it among the last two frames read.
it('resends handle events on idempotent updates', async () => {
  const update = sc.updateHandle(bob, 'bob2.test')
  const ws = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos`,
  )
  try {
    const evts = await readTillCaughtUp(byFrame(ws), update)
    const handleEvts = getHandleEvts(evts.slice(-2))
    // Fail with a readable assertion when no handle event arrived, rather than
    // with a TypeError from reading properties of `undefined` below.
    expect(handleEvts.length).toBeGreaterThan(0)
    verifyHandleEvent(handleEvts[0], bob, 'bob2.test')
  } finally {
    // Release the socket even when reading or an assertion throws.
    ws.terminate()
  }
})
// Deactivates and reactivates alice, takes bob down and restores him, then
// checks the four resulting #account events in order.
it('syncs account events', async () => {
  // Applies or lifts an admin takedown on the given account.
  const setTakedown = (did: string, applied: boolean) =>
    agent.api.com.atproto.admin.updateSubjectStatus(
      {
        subject: {
          $type: 'com.atproto.admin.defs#repoRef',
          did,
        },
        takedown: { applied },
      },
      {
        encoding: 'application/json',
        headers: network.pds.adminAuthHeaders(),
      },
    )

  // deactivate then reactivate alice
  await agent.api.com.atproto.server.deactivateAccount(
    {},
    {
      encoding: 'application/json',
      headers: sc.getHeaders(alice),
    },
  )
  await agent.api.com.atproto.server.activateAccount(undefined, {
    headers: sc.getHeaders(alice),
  })
  // takedown then restore bob
  await setTakedown(bob, true)
  await setTakedown(bob, false)

  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  // @NOTE requires a larger slice because of over-emission on activateAccount - see note on route
  const recent = frames.slice(-6)
  const accountEvts = getAccountEvts(recent)
  expect(accountEvts.length).toBe(4)
  verifyAccountEvent(accountEvts[0], alice, false, AccountStatus.Deactivated)
  verifyAccountEvent(accountEvts[1], alice, true)
  verifyAccountEvent(accountEvts[2], bob, false, AccountStatus.Takendown)
  verifyAccountEvent(accountEvts[3], bob, true)
})
// Runs deactivate -> takedown -> restore -> activate against alice alone and
// checks the status reported by each of the four resulting #account events.
it('syncs interleaved account events', async () => {
  // Applies or lifts an admin takedown on alice.
  const setAliceTakedown = (applied: boolean) =>
    agent.api.com.atproto.admin.updateSubjectStatus(
      {
        subject: {
          $type: 'com.atproto.admin.defs#repoRef',
          did: alice,
        },
        takedown: { applied },
      },
      {
        encoding: 'application/json',
        headers: network.pds.adminAuthHeaders(),
      },
    )

  // deactivate
  await agent.api.com.atproto.server.deactivateAccount(
    {},
    {
      encoding: 'application/json',
      headers: sc.getHeaders(alice),
    },
  )
  // takedown, then restore
  await setAliceTakedown(true)
  await setAliceTakedown(false)
  // activate
  await agent.api.com.atproto.server.activateAccount(undefined, {
    headers: sc.getHeaders(alice),
  })

  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  // @NOTE requires a larger slice because of over-emission on activateAccount - see note on route
  const recent = frames.slice(-6)
  const accountEvts = getAccountEvts(recent)
  expect(accountEvts.length).toBe(4)
  verifyAccountEvent(accountEvts[0], alice, false, AccountStatus.Deactivated)
  verifyAccountEvent(accountEvts[1], alice, false, AccountStatus.Takendown)
  verifyAccountEvent(accountEvts[2], alice, false, AccountStatus.Deactivated)
  verifyAccountEvent(accountEvts[3], alice, true)
})
// Creates two throwaway accounts, deletes one through the user route (email
// token) and one through the admin route, then expects a #tombstone and a
// deleted #account event for each among the four most recent frames.
it('syncs tombstones', async () => {
  const { did: baddie1 } = await sc.createAccount('baddie1.test', {
    email: 'baddie1@test.com',
    handle: 'baddie1.test',
    password: 'baddie1-pass',
  })
  const { did: baddie2 } = await sc.createAccount('baddie2.test', {
    email: 'baddie2@test.com',
    handle: 'baddie2.test',
    password: 'baddie2-pass',
  })

  // user-initiated deletion of baddie1
  const deleteToken = await ctx.accountManager.createEmailToken(
    baddie1,
    'delete_account',
  )
  await agent.api.com.atproto.server.deleteAccount({
    did: baddie1,
    password: 'baddie1-pass',
    token: deleteToken,
  })
  // admin deletion of baddie2
  await agent.api.com.atproto.admin.deleteAccount(
    { did: baddie2 },
    {
      encoding: 'application/json',
      headers: network.pds.adminAuthHeaders(),
    },
  )

  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()

  const deleted = [baddie1, baddie2]
  const tombstoneEvts = getTombstoneEvts(frames.slice(-4))
  expect(tombstoneEvts.length).toBe(2)
  deleted.forEach((did, idx) => {
    verifyTombstoneEvent(tombstoneEvts[idx], did)
  })
  const accountEvts = getAccountEvts(frames.slice(-4))
  expect(accountEvts.length).toBe(2)
  deleted.forEach((did, idx) => {
    verifyAccountEvent(accountEvts[idx], did, false, AccountStatus.Deleted)
  })
})
// After an account that posted and changed its handle is deleted, the only
// commit/handle/tombstone event still streamed for it must be the tombstone.
it('account deletions invalidate all seq ops', async () => {
  const { did: baddie3 } = await sc.createAccount('baddie3', {
    email: 'baddie3@test.com',
    handle: 'baddie3.test',
    password: 'baddie3-pass',
  })
  // produce a commit and a handle event for the account before deleting it
  await randomPost(baddie3)
  await sc.updateHandle(baddie3, 'baddie3-update.test')

  const token = await network.pds.ctx.accountManager.createEmailToken(
    baddie3,
    'delete_account',
  )
  const { password } = sc.accounts[baddie3]
  await agent.api.com.atproto.server.deleteAccount({
    token,
    did: baddie3,
    password,
  })

  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  const remaining = getAllEvents(baddie3, frames)
  expect(remaining.length).toBe(1)
  verifyTombstoneEvent(remaining[0], baddie3)
})
// Backdates sequencedAt on every existing repo_seq row to just over an hour
// ago (the suite's repoBackfillLimitMs is HOUR), adds 40 fresh posts, and
// expects an '#info' frame named 'OutdatedCursor' followed by those 40 events.
it('sends info frame on out of date cursor', async () => {
  const staleTime = new Date(Date.now() - (HOUR + MINUTE)).toISOString()
  await ctx.sequencer.db.db
    .updateTable('repo_seq')
    .set({ sequencedAt: staleTime })
    .execute()
  await makePosts()

  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  const [info, ...evts] = frames
  if (!(info instanceof MessageFrame)) {
    throw new Error('Expected first frame to be a MessageFrame')
  }
  expect(info.header.t).toBe('#info')
  const infoBody = info.body as Record<string, unknown>
  expect(infoBody.name).toEqual('OutdatedCursor')
  expect(evts.length).toBe(40)
})
// A cursor of 100000 must yield exactly one frame: an ErrorFrame whose error
// is 'FutureCursor'.
it('errors on future cursor', async () => {
  const socket = new WebSocket(
    `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${100000}`,
  )
  const frames = await readTillCaughtUp(byFrame(socket))
  socket.terminate()
  expect(frames.length).toBe(1)
  const [first] = frames
  if (first instanceof ErrorFrame) {
    expect(first.body.error).toBe('FutureCursor')
  } else {
    throw new Error('Expected ErrorFrame')
  }
})
})