Stop sequencer listener in outbox when subscription aborts ()

* Update types to allow for abort signal in xrpc subscriptions

* Support abort signal in xrpc-server subscriptions

* Stop sequencer listener in outbox when subscription aborts
This commit is contained in:
devin ivy 2023-04-17 11:22:03 -07:00 committed by GitHub
parent d8b50c73e4
commit 6a39ebf9f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 107 additions and 8 deletions
packages
bsky/src/lexicon
lex-cli/src/codegen
pds
src
api/com/atproto/sync
lexicon/types/com/atproto
sequencer
tests/sync
xrpc-server/src

@ -21,6 +21,7 @@ import * as ComAtprotoAdminResolveModerationReports from './types/com/atproto/ad
import * as ComAtprotoAdminReverseModerationAction from './types/com/atproto/admin/reverseModerationAction'
import * as ComAtprotoAdminSearchRepos from './types/com/atproto/admin/searchRepos'
import * as ComAtprotoAdminTakeModerationAction from './types/com/atproto/admin/takeModerationAction'
import * as ComAtprotoAdminUpdateAccountEmail from './types/com/atproto/admin/updateAccountEmail'
import * as ComAtprotoAdminUpdateAccountHandle from './types/com/atproto/admin/updateAccountHandle'
import * as ComAtprotoIdentityResolveHandle from './types/com/atproto/identity/resolveHandle'
import * as ComAtprotoIdentityUpdateHandle from './types/com/atproto/identity/updateHandle'
@ -257,6 +258,16 @@ export class AdminNS {
return this._server.xrpc.method(nsid, cfg)
}
updateAccountEmail<AV extends AuthVerifier>(
cfg: ConfigOf<
AV,
ComAtprotoAdminUpdateAccountEmail.Handler<ExtractAuth<AV>>
>,
) {
const nsid = 'com.atproto.admin.updateAccountEmail' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
updateAccountHandle<AV extends AuthVerifier>(
cfg: ConfigOf<
AV,

@ -1063,6 +1063,33 @@ export const schemaDict = {
},
},
},
ComAtprotoAdminUpdateAccountEmail: {
lexicon: 1,
id: 'com.atproto.admin.updateAccountEmail',
defs: {
main: {
type: 'procedure',
description: "Administrative action to update an account's email",
input: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['account', 'email'],
properties: {
account: {
type: 'string',
format: 'at-identifier',
description: 'The handle or DID of the repo.',
},
email: {
type: 'string',
},
},
},
},
},
},
},
ComAtprotoAdminUpdateAccountHandle: {
lexicon: 1,
id: 'com.atproto.admin.updateAccountHandle',
@ -4796,6 +4823,7 @@ export const ids = {
'com.atproto.admin.reverseModerationAction',
ComAtprotoAdminSearchRepos: 'com.atproto.admin.searchRepos',
ComAtprotoAdminTakeModerationAction: 'com.atproto.admin.takeModerationAction',
ComAtprotoAdminUpdateAccountEmail: 'com.atproto.admin.updateAccountEmail',
ComAtprotoAdminUpdateAccountHandle: 'com.atproto.admin.updateAccountHandle',
ComAtprotoIdentityResolveHandle: 'com.atproto.identity.resolveHandle',
ComAtprotoIdentityUpdateHandle: 'com.atproto.identity.updateHandle',

@ -0,0 +1,37 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import { ValidationResult, BlobRef } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { CID } from 'multiformats/cid'
import { HandlerAuth } from '@atproto/xrpc-server'
export interface QueryParams {}
export interface InputSchema {
/** The handle or DID of the repo. */
account: string
email: string
[k: string]: unknown
}
export interface HandlerInput {
encoding: 'application/json'
body: InputSchema
}
export interface HandlerError {
status: number
message?: string
}
export type HandlerOutput = HandlerError | void
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput

@ -24,6 +24,7 @@ export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<HandlerOutput>
export interface Labels {

@ -26,6 +26,7 @@ export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<HandlerOutput>
export interface Commit {

@ -530,6 +530,7 @@ function genServerXrpcStreaming(
auth: HA
params: QueryParams
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<HandlerOutput>`,
})
}

@ -4,7 +4,7 @@ import Outbox from '../../../../sequencer/outbox'
import { InvalidRequestError } from '@atproto/xrpc-server'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.subscribeRepos(async function* ({ params }) {
server.com.atproto.sync.subscribeRepos(async function* ({ params, signal }) {
const { cursor } = params
const outbox = new Outbox(ctx.sequencer, {
maxBufferSize: ctx.cfg.maxSubscriptionBuffer,
@ -30,7 +30,7 @@ export default function (server: Server, ctx: AppContext) {
}
}
for await (const evt of outbox.events(cursor, backfillTime)) {
for await (const evt of outbox.events(cursor, backfillTime, signal)) {
if (evt.type === 'commit') {
yield {
$type: '#commit',

@ -24,6 +24,7 @@ export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<HandlerOutput>
export interface Labels {

@ -26,6 +26,7 @@ export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<HandlerOutput>
export interface Commit {

@ -30,6 +30,7 @@ export class Outbox {
async *events(
backfillCursor?: number,
backFillTime?: string,
signal?: AbortSignal,
): AsyncGenerator<SeqEvt> {
// catch up as much as we can
if (backfillCursor !== undefined) {
@ -43,13 +44,21 @@ export class Outbox {
}
// streams updates from sequencer, but buffers them for cutover as it makes a last request
this.sequencer.on('events', (evts) => {
const addToBuffer = (evts) => {
if (this.caughtUp) {
this.outBuffer.pushMany(evts)
} else {
this.cutoverBuffer = [...this.cutoverBuffer, ...evts]
}
})
}
if (!signal?.aborted) {
this.sequencer.on('events', addToBuffer)
}
signal?.addEventListener('abort', () =>
this.sequencer.off('events', addToBuffer),
)
const cutover = async () => {
// only need to perform cutover if we've been backfilling

@ -224,6 +224,9 @@ describe('repo subscribe repos', () => {
ws.terminate()
expect(evts.length).toBe(40)
await wait(100) // Let cleanup occur on server
expect(ctx.sequencer.listeners('events').length).toEqual(0)
})
it('backfills only from provided cursor', async () => {

@ -260,7 +260,7 @@ export class Server {
nsid,
new XrpcStreamServer({
noServer: true,
handler: async function* (req) {
handler: async function* (req, signal) {
try {
// authenticate request
const auth = await config.auth?.({ req })
@ -275,7 +275,7 @@ export class Server {
throw new InvalidRequestError(String(e))
}
// stream
const items = config.handler({ req, params, auth })
const items = config.handler({ req, params, auth, signal })
for await (const item of items) {
if (item instanceof Frame) {
yield item

@ -12,8 +12,12 @@ export class XrpcStreamServer {
this.wss.on('connection', async (socket, req) => {
socket.on('error', (err) => logger.error(err, 'websocket error'))
try {
const iterator = unwrapIterator(handler(req, socket, this))
socket.once('close', () => iterator.return?.())
const ac = new AbortController()
const iterator = unwrapIterator(handler(req, ac.signal, socket, this))
socket.once('close', () => {
iterator.return?.()
ac.abort()
})
const safeFrames = wrapIterator(iterator)
for await (const frame of safeFrames) {
if (frame instanceof ErrorFrame) {
@ -43,6 +47,7 @@ export class XrpcStreamServer {
export type Handler = (
req: IncomingMessage,
signal: AbortSignal,
socket: WebSocket,
server: XrpcStreamServer,
) => AsyncIterable<Frame>

@ -61,6 +61,7 @@ export type XRPCStreamHandler = (ctx: {
auth: HandlerAuth | undefined
params: Params
req: IncomingMessage
signal: AbortSignal
}) => AsyncIterable<unknown>
export type AuthOutput = HandlerAuth | HandlerError