Daniel Holmgren 255d5ea1f0
Account deactivation (#2531)
* subscribe repos lex: new #account event type

* lex: getAccountStatus endpoint

* lex: add account status errors to sync methods

* tweak type of token union

* fix getAccountStatus parameter name

* codegen

* Account -> Repo

* re-codegen

* update errors in sync methods

* add getRepoStatus route

* add account events to sequencer

* emit account evts

* fix test + small bugfixin

* handle evt on bsky side

* codegen

* loggable message

* schema tweaks

* build errors & tidy

* tidy account deactivation tests

* more subscribe repos tests

* identity evt tests + tidy

* return optional did doc on identity evts

* update identity evt

* update impl for identity evt handles

* add description to handle field

* status on listRepos & notate deprecated firehose events

* return status on listRepos

* pr feedback

* tests

* protos

* add status to session schemas

* codegen

* revamp auth-verifier

* thread through agent

* fix tests

* fix account deletion test

* fix more tests

* fix type error in repo.test

* bsky index account evts

* apply in appview

* add to admin route

* add deactivatedAt to ozone views

* pds tests

* appview tests

* ozone test

* fix bsky test

* fix agent test

* fix indexing test

* tweak session schemas

* update session routes & clean up status handling in pds & agent

* changeset

* patch pds test

* rm active from knownValues

---------

Co-authored-by: bryan newbold <bnewbold@robocracy.org>
2024-05-31 16:39:21 -05:00

189 lines
5.1 KiB
TypeScript

import { sql } from 'kysely'
import { wait } from '@atproto/common'
import { TestNetwork } from '@atproto/dev-env'
import { Database } from '../../src'
describe('db', () => {
let network: TestNetwork
let db: Database
beforeAll(async () => {
network = await TestNetwork.create({
dbPostgresSchema: 'bsky_db',
})
db = network.bsky.db
})
afterAll(async () => {
await network.close()
})
it('handles client errors without crashing.', async () => {
const tryKillConnection = db.transaction(async (dbTxn) => {
const result = await sql`select pg_backend_pid() as pid;`.execute(
dbTxn.db,
)
const pid = result.rows[0]?.['pid'] as number
await sql`select pg_terminate_backend(${pid});`.execute(db.db)
await sql`select 1;`.execute(dbTxn.db)
})
// This should throw, but no unhandled error
await expect(tryKillConnection).rejects.toThrow()
})
it('handles pool errors without crashing.', async () => {
const conn1 = await db.pool.connect()
const conn2 = await db.pool.connect()
const result = await conn1.query('select pg_backend_pid() as pid;')
const conn1pid: number = result.rows[0].pid
conn1.release()
await wait(100) // let release apply, conn is now idle on pool.
await conn2.query(`select pg_terminate_backend(${conn1pid});`)
conn2.release()
})
describe('transaction()', () => {
it('commits changes', async () => {
const result = await db.transaction(async (dbTxn) => {
return await dbTxn.db
.insertInto('actor')
.values({
did: 'x',
handle: 'x',
indexedAt: 'bad-date',
})
.returning('did')
.executeTakeFirst()
})
if (!result) {
return expect(result).toBeTruthy()
}
expect(result.did).toEqual('x')
const row = await db.db
.selectFrom('actor')
.select(['did', 'handle', 'indexedAt'])
.where('did', '=', 'x')
.executeTakeFirst()
expect(row).toEqual({
did: 'x',
handle: 'x',
indexedAt: 'bad-date',
})
})
it('rolls-back changes on failure', async () => {
const promise = db.transaction(async (dbTxn) => {
await dbTxn.db
.insertInto('actor')
.values({
did: 'y',
handle: 'y',
indexedAt: 'bad-date',
})
.returning('did')
.executeTakeFirst()
throw new Error('Oops!')
})
await expect(promise).rejects.toThrow('Oops!')
const row = await db.db
.selectFrom('actor')
.selectAll()
.where('did', '=', 'y')
.executeTakeFirst()
expect(row).toBeUndefined()
})
it('indicates isTransaction', async () => {
expect(db.isTransaction).toEqual(false)
await db.transaction(async (dbTxn) => {
expect(db.isTransaction).toEqual(false)
expect(dbTxn.isTransaction).toEqual(true)
})
expect(db.isTransaction).toEqual(false)
})
it('asserts transaction', async () => {
expect(() => db.assertTransaction()).toThrow('Transaction required')
await db.transaction(async (dbTxn) => {
expect(() => dbTxn.assertTransaction()).not.toThrow()
})
})
it('does not allow leaky transactions', async () => {
let leakedTx: Database | undefined
const tx = db.transaction(async (dbTxn) => {
leakedTx = dbTxn
await dbTxn.db
.insertInto('actor')
.values({ handle: 'a', did: 'a', indexedAt: 'bad-date' })
.execute()
throw new Error('test tx failed')
})
await expect(tx).rejects.toThrow('test tx failed')
const attempt = leakedTx?.db
.insertInto('actor')
.values({ handle: 'b', did: 'b', indexedAt: 'bad-date' })
.execute()
await expect(attempt).rejects.toThrow('tx already failed')
const res = await db.db
.selectFrom('actor')
.selectAll()
.where('did', 'in', ['a', 'b'])
.execute()
expect(res.length).toBe(0)
})
it('ensures all inflight queries are rolled back', async () => {
let promise: Promise<unknown> | undefined = undefined
const names: string[] = []
try {
await db.transaction(async (dbTxn) => {
const queries: Promise<unknown>[] = []
for (let i = 0; i < 20; i++) {
const name = `user${i}`
const query = dbTxn.db
.insertInto('actor')
.values({
handle: name,
did: name,
indexedAt: 'bad-date',
})
.execute()
names.push(name)
queries.push(query)
}
promise = Promise.allSettled(queries)
throw new Error()
})
} catch (err) {
expect(err).toBeDefined()
}
if (promise) {
await promise
}
const res = await db.db
.selectFrom('actor')
.selectAll()
.where('did', 'in', names)
.execute()
expect(res.length).toBe(0)
})
})
})