255d5ea1f0
* subscribe repos lex: new #account event type * lex: getAccountStatus endpoint * lex: add account status errors to sync methods * tweak type of token union * fix getAccountStatus parameter name * codegen * Account -> Repo * re-codegen * update errors in sync methods * add getRepoStatus route * add account events to sequencer * emit account evts * fix test + small bugfixin * handle evt on bsky side * codegen * loggable message * schema tweaks * build errors & tidy * tidy account deactivation tests * more subscribe repos tests * identity evt tests + tidy * return optional did doc on identity evts * update identity evt * update impl for identity evt handles * add description to handle field * status on listRepos & notate deprecated firehose events * return status on listRepos * pr feedback * tests * protos * add status to session schemas * codegen * revamp auth-verifier * thread through agent * fix tests * fix account deletion test * fix more tests * fix type error in repo.test * bsky index account evts * apply in appview * add to admin route * add deactivatedAt to ozone views * pds tests * appview tests * ozone test * fix bsky test * fix agent test * fix indexing test * tweak session schemas * update session routes & clean up status handling in pds & agent * changeset * patch pds test * rm active from knownValues --------- Co-authored-by: bryan newbold <bnewbold@robocracy.org>
189 lines
5.1 KiB
TypeScript
189 lines
5.1 KiB
TypeScript
import { sql } from 'kysely'
|
|
import { wait } from '@atproto/common'
|
|
import { TestNetwork } from '@atproto/dev-env'
|
|
import { Database } from '../../src'
|
|
|
|
describe('db', () => {
|
|
let network: TestNetwork
|
|
let db: Database
|
|
|
|
beforeAll(async () => {
|
|
network = await TestNetwork.create({
|
|
dbPostgresSchema: 'bsky_db',
|
|
})
|
|
db = network.bsky.db
|
|
})
|
|
|
|
afterAll(async () => {
|
|
await network.close()
|
|
})
|
|
|
|
it('handles client errors without crashing.', async () => {
|
|
const tryKillConnection = db.transaction(async (dbTxn) => {
|
|
const result = await sql`select pg_backend_pid() as pid;`.execute(
|
|
dbTxn.db,
|
|
)
|
|
const pid = result.rows[0]?.['pid'] as number
|
|
await sql`select pg_terminate_backend(${pid});`.execute(db.db)
|
|
await sql`select 1;`.execute(dbTxn.db)
|
|
})
|
|
// This should throw, but no unhandled error
|
|
await expect(tryKillConnection).rejects.toThrow()
|
|
})
|
|
|
|
it('handles pool errors without crashing.', async () => {
|
|
const conn1 = await db.pool.connect()
|
|
const conn2 = await db.pool.connect()
|
|
const result = await conn1.query('select pg_backend_pid() as pid;')
|
|
const conn1pid: number = result.rows[0].pid
|
|
conn1.release()
|
|
await wait(100) // let release apply, conn is now idle on pool.
|
|
await conn2.query(`select pg_terminate_backend(${conn1pid});`)
|
|
conn2.release()
|
|
})
|
|
|
|
describe('transaction()', () => {
|
|
it('commits changes', async () => {
|
|
const result = await db.transaction(async (dbTxn) => {
|
|
return await dbTxn.db
|
|
.insertInto('actor')
|
|
.values({
|
|
did: 'x',
|
|
handle: 'x',
|
|
indexedAt: 'bad-date',
|
|
})
|
|
.returning('did')
|
|
.executeTakeFirst()
|
|
})
|
|
|
|
if (!result) {
|
|
return expect(result).toBeTruthy()
|
|
}
|
|
|
|
expect(result.did).toEqual('x')
|
|
|
|
const row = await db.db
|
|
.selectFrom('actor')
|
|
.select(['did', 'handle', 'indexedAt'])
|
|
.where('did', '=', 'x')
|
|
.executeTakeFirst()
|
|
|
|
expect(row).toEqual({
|
|
did: 'x',
|
|
handle: 'x',
|
|
indexedAt: 'bad-date',
|
|
})
|
|
})
|
|
|
|
it('rolls-back changes on failure', async () => {
|
|
const promise = db.transaction(async (dbTxn) => {
|
|
await dbTxn.db
|
|
.insertInto('actor')
|
|
.values({
|
|
did: 'y',
|
|
handle: 'y',
|
|
indexedAt: 'bad-date',
|
|
})
|
|
.returning('did')
|
|
.executeTakeFirst()
|
|
|
|
throw new Error('Oops!')
|
|
})
|
|
|
|
await expect(promise).rejects.toThrow('Oops!')
|
|
|
|
const row = await db.db
|
|
.selectFrom('actor')
|
|
.selectAll()
|
|
.where('did', '=', 'y')
|
|
.executeTakeFirst()
|
|
|
|
expect(row).toBeUndefined()
|
|
})
|
|
|
|
it('indicates isTransaction', async () => {
|
|
expect(db.isTransaction).toEqual(false)
|
|
|
|
await db.transaction(async (dbTxn) => {
|
|
expect(db.isTransaction).toEqual(false)
|
|
expect(dbTxn.isTransaction).toEqual(true)
|
|
})
|
|
|
|
expect(db.isTransaction).toEqual(false)
|
|
})
|
|
|
|
it('asserts transaction', async () => {
|
|
expect(() => db.assertTransaction()).toThrow('Transaction required')
|
|
|
|
await db.transaction(async (dbTxn) => {
|
|
expect(() => dbTxn.assertTransaction()).not.toThrow()
|
|
})
|
|
})
|
|
|
|
it('does not allow leaky transactions', async () => {
|
|
let leakedTx: Database | undefined
|
|
|
|
const tx = db.transaction(async (dbTxn) => {
|
|
leakedTx = dbTxn
|
|
await dbTxn.db
|
|
.insertInto('actor')
|
|
.values({ handle: 'a', did: 'a', indexedAt: 'bad-date' })
|
|
.execute()
|
|
throw new Error('test tx failed')
|
|
})
|
|
await expect(tx).rejects.toThrow('test tx failed')
|
|
|
|
const attempt = leakedTx?.db
|
|
.insertInto('actor')
|
|
.values({ handle: 'b', did: 'b', indexedAt: 'bad-date' })
|
|
.execute()
|
|
await expect(attempt).rejects.toThrow('tx already failed')
|
|
|
|
const res = await db.db
|
|
.selectFrom('actor')
|
|
.selectAll()
|
|
.where('did', 'in', ['a', 'b'])
|
|
.execute()
|
|
|
|
expect(res.length).toBe(0)
|
|
})
|
|
|
|
it('ensures all inflight queries are rolled back', async () => {
|
|
let promise: Promise<unknown> | undefined = undefined
|
|
const names: string[] = []
|
|
try {
|
|
await db.transaction(async (dbTxn) => {
|
|
const queries: Promise<unknown>[] = []
|
|
for (let i = 0; i < 20; i++) {
|
|
const name = `user${i}`
|
|
const query = dbTxn.db
|
|
.insertInto('actor')
|
|
.values({
|
|
handle: name,
|
|
did: name,
|
|
indexedAt: 'bad-date',
|
|
})
|
|
.execute()
|
|
names.push(name)
|
|
queries.push(query)
|
|
}
|
|
promise = Promise.allSettled(queries)
|
|
throw new Error()
|
|
})
|
|
} catch (err) {
|
|
expect(err).toBeDefined()
|
|
}
|
|
if (promise) {
|
|
await promise
|
|
}
|
|
|
|
const res = await db.db
|
|
.selectFrom('actor')
|
|
.selectAll()
|
|
.where('did', 'in', names)
|
|
.execute()
|
|
expect(res.length).toBe(0)
|
|
})
|
|
})
|
|
})
|