Catch missed txn rollbacks ()

* catch missed rollbacks

* change approach

* revert back to simple way

* test

* add to bsky package as well

* improve handler & tests
This commit is contained in:
Daniel Holmgren 2023-05-26 11:04:26 -05:00 committed by GitHub
parent 7a7c9c75af
commit 03a74b4259
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 2 deletions
packages
bsky
pds

@ -62,7 +62,14 @@ export class Database {
.transaction()
.execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg)
const txRes = await fn(dbTxn).finally(() => leakyTxPlugin.endTx())
const txRes = await fn(dbTxn)
.catch(async (err) => {
leakyTxPlugin.endTx()
// ensure that all in-flight queries are flushed & the connection is open
await dbTxn.db.getExecutor().provideConnection(async () => {})
throw err
})
.finally(() => leakyTxPlugin.endTx())
return txRes
})
return res

@ -125,6 +125,43 @@ describe('db', () => {
expect(res.length).toBe(0)
})
it('ensures all inflight querys are rolled back', async () => {
let promise: Promise<unknown> | undefined = undefined
const names: string[] = []
try {
await db.transaction(async (dbTxn) => {
const queries: Promise<unknown>[] = []
for (let i = 0; i < 20; i++) {
const name = `user${i}`
const query = dbTxn.db
.insertInto('actor')
.values({
handle: name,
did: name,
indexedAt: 'bad-date',
})
.execute()
names.push(name)
queries.push(query)
}
promise = Promise.allSettled(queries)
throw new Error()
})
} catch (err) {
expect(err).toBeDefined()
}
if (promise) {
await promise
}
const res = await db.db
.selectFrom('actor')
.selectAll()
.where('did', 'in', names)
.execute()
expect(res.length).toBe(0)
})
})
describe('Leader', () => {

@ -179,7 +179,14 @@ export class Database {
.transaction()
.execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg, this.channels)
const txRes = await fn(dbTxn).finally(() => leakyTxPlugin.endTx())
const txRes = await fn(dbTxn)
.catch(async (err) => {
leakyTxPlugin.endTx()
// ensure that all in-flight queries are flushed & the connection is open
await dbTxn.db.getExecutor().provideConnection(async () => {})
throw err
})
.finally(() => leakyTxPlugin.endTx())
txMsgs = dbTxn.txChannelMsgs
return { txRes, dbTxn }
})

@ -129,6 +129,43 @@ describe('db', () => {
expect(res.length).toBe(0)
})
it('ensures all inflight querys are rolled back', async () => {
let promise: Promise<unknown> | undefined = undefined
const names: string[] = []
try {
await db.transaction(async (dbTxn) => {
const queries: Promise<unknown>[] = []
for (let i = 0; i < 20; i++) {
const name = `user${i}`
const query = dbTxn.db
.insertInto('repo_root')
.values({
root: name,
did: name,
indexedAt: 'bad-date',
})
.execute()
names.push(name)
queries.push(query)
}
promise = Promise.allSettled(queries)
throw new Error()
})
} catch (err) {
expect(err).toBeDefined()
}
if (promise) {
await promise
}
const res = await db.db
.selectFrom('repo_root')
.selectAll()
.where('did', 'in', names)
.execute()
expect(res.length).toBe(0)
})
})
describe('Leader', () => {