Feature branch: streaming repo sync ()

* Scaffold xrpc-stream

* Write and test stream frames

* Write and test stream server

* Test bad stream frame parsing cases

* Proof of concept streaming on xrpc-server

* Test bad streaming endpoint

* Make frame stream to fix buffering frame bytes together

* reorg

* some lex-cli things

* better codegen for subscriptions

* scaffolding repo subscriptions

* wip

* remove repo ops

* setup notify/listen for db

* end pool to fix hanging test

* small comment in test

* basic sequencer

* some refactoring

* switch to event emitter

* reconnect on listener error

* rename notifyClient

* remove payload on channels

* pr feedback

* subscribeRepo outbox

* some cleanup

* wip

* wip

* bugfixin

* only send msgs after tx is committed

* better handle event-emitter -> generator

* max buffer size

* cleanup

* Sync-up xrpc subscriptions with current iteration of the spec

* Allow missing encoding for subscriptions

* track sequencedAt & eventType as well

* Resolve streaming type codes, streaming tests

* Fix interface for hooking into http server for streaming, fix stream routing

* Minor reorg how streaming is enabled in xrpc-server

* Server codegen for xrpc subscriptions, supporting types in xrpc-server

* fix up buffer overloading

* Rename data frame to message frame, rename subscription lex output to message

* Move body param first for streaming frames

* Tidy

* clean up evt types

* buff up tests

* missed merge conflict

* new schema

* blobs on subscriptions

* rm genned client subscription methods

* backfill limits

* testing subscription route & quick outbox bugfix

* fix up migration

* cascade on delete

* comments & naming

* fix dev env

* delete seqs on account deletion

* tidy

* fixing things up with db notify system for schemas

* fix duplicates in outbox

* tidy

* Break out of stream handler when socket closes

* fixing up some timing issues

* tidy

* terminate ws after using

* bump up timer on async reader

* fixing up NOTIFY in txns

* pr feedback

* pr bugfixes

* make order asc explicit

* bringing tests up to speed w atpagent

* bump up max listeners on sequencer

* increase timeouts for reading from generators

---------

Co-authored-by: Devin Ivy <devinivy@gmail.com>
This commit is contained in:
Daniel Holmgren 2023-02-09 16:17:06 -06:00 committed by GitHub
parent 5554d80d20
commit 7d92eb0513
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
64 changed files with 2819 additions and 224 deletions

@ -0,0 +1,53 @@
{
"lexicon": 1,
"id": "com.atproto.sync.subscribeAllRepos",
"defs": {
"main": {
"type": "subscription",
"description": "Subscribe to repo updates",
"parameters": {
"type": "params",
"properties": {
"backfillFrom": {
"type": "datetime",
"description": "The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps."
}
}
},
"message": {
"schema": {
"type": "union",
"refs": ["#repoAppend", "#repoRebase"]
},
"codes": {
"#repoAppend": 0,
"#repoRebase": 1
}
}
},
"repoAppend": {
"type": "object",
"required": ["time", "repo", "commit", "blocks", "blobs"],
"properties": {
"time": {"type": "datetime"},
"repo": {"type": "string"},
"commit": {"type": "string"},
"prev": {"type": "string"},
"blocks": {"type": "unknown"},
"blobs": {
"type": "array",
"items": {"type": "string"}
}
}
},
"repoRebase": {
"type": "object",
"required": ["time", "repo", "commit"],
"properties": {
"time": {"type": "datetime"},
"repo": {"type": "string"},
"commit": {"type": "string"}
}
}
}
}

@ -53,6 +53,7 @@ import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile'
import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles'
import * as AppBskyActorGetSuggestions from './types/app/bsky/actor/getSuggestions'
@ -137,6 +138,7 @@ export * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
export * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
export * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
export * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
export * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
export * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile'
export * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles'
export * as AppBskyActorGetSuggestions from './types/app/bsky/actor/getSuggestions'

@ -2173,6 +2173,81 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncSubscribeAllRepos: {
lexicon: 1,
id: 'com.atproto.sync.subscribeAllRepos',
defs: {
main: {
type: 'subscription',
description: 'Subscribe to repo updates',
parameters: {
type: 'params',
properties: {
backfillFrom: {
type: 'datetime',
description:
'The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps.',
},
},
},
message: {
schema: {
type: 'union',
refs: [
'lex:com.atproto.sync.subscribeAllRepos#repoAppend',
'lex:com.atproto.sync.subscribeAllRepos#repoRebase',
],
},
codes: {
'lex:com.atproto.sync.subscribeAllRepos#repoAppend': 0,
'lex:com.atproto.sync.subscribeAllRepos#repoRebase': 1,
},
},
},
repoAppend: {
type: 'object',
required: ['time', 'repo', 'commit', 'blocks', 'blobs'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
prev: {
type: 'string',
},
blocks: {
type: 'unknown',
},
blobs: {
type: 'array',
items: {
type: 'string',
},
},
},
},
repoRebase: {
type: 'object',
required: ['time', 'repo', 'commit'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
},
},
},
},
AppBskyActorGetProfile: {
lexicon: 1,
id: 'app.bsky.actor.getProfile',
@ -4119,6 +4194,7 @@ export const ids = {
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncSubscribeAllRepos: 'com.atproto.sync.subscribeAllRepos',
AppBskyActorGetProfile: 'app.bsky.actor.getProfile',
AppBskyActorGetProfiles: 'app.bsky.actor.getProfiles',
AppBskyActorGetSuggestions: 'app.bsky.actor.getSuggestions',

@ -0,0 +1,48 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
import { ValidationResult } from '@atproto/lexicon'
import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'
export interface RepoAppend {
time: string
repo: string
commit: string
prev?: string
blocks: {}
blobs: string[]
[k: string]: unknown
}
export function isRepoAppend(v: unknown): v is RepoAppend {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoAppend'
)
}
export function validateRepoAppend(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoAppend', v)
}
export interface RepoRebase {
time: string
repo: string
commit: string
[k: string]: unknown
}
export function isRepoRebase(v: unknown): v is RepoRebase {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoRebase'
)
}
export function validateRepoRebase(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoRebase', v)
}

@ -1,3 +1,23 @@
import { wait } from './util'
// reads values from a generator into a list
// NOTE: does not signal generator to close. it *will* continue to produce values
export const readFromGenerator = async <T>(
gen: AsyncGenerator<T>,
maxLength = Number.MAX_SAFE_INTEGER,
timeout = 2000,
): Promise<T[]> => {
const evts: T[] = []
while (evts.length < maxLength) {
const maybeEvt = await Promise.race([gen.next(), wait(timeout)])
if (!maybeEvt) break
const evt = maybeEvt as IteratorResult<T>
if (evt.done) break
evts.push(evt.value)
}
return evts
}
export type Deferrable = {
resolve: () => void
complete: Promise<void>
@ -22,3 +42,57 @@ export const createDeferrables = (count: number): Deferrable[] => {
export const allComplete = async (deferrables: Deferrable[]): Promise<void> => {
await Promise.all(deferrables.map((d) => d.complete))
}
export class AsyncBuffer<T> {
private buffer: T[] = []
private promise: Promise<void>
private resolve: () => void
constructor(public maxSize?: number) {
this.resetPromise()
}
get curr(): T[] {
return this.buffer
}
get size(): number {
return this.buffer.length
}
resetPromise() {
this.promise = new Promise<void>((r) => (this.resolve = r))
}
push(item: T) {
this.buffer.push(item)
this.resolve()
}
pushMany(items: T[]) {
items.forEach((i) => this.buffer.push(i))
this.resolve()
}
async *events(): AsyncGenerator<T> {
while (true) {
await this.promise
if (this.maxSize && this.size > this.maxSize) {
throw new AsyncBufferFullError(this.maxSize)
}
const [first, ...rest] = this.buffer
if (first) {
this.buffer = rest
yield first
} else {
this.resetPromise()
}
}
}
}
export class AsyncBufferFullError extends Error {
constructor(maxSize: number) {
super(`ReachedMaxBufferSize: ${maxSize}`)
}
}

@ -8,3 +8,4 @@ export * from './ipld'
export * from './logger'
export * from './types'
export * from './streams'
export * from './times'

@ -0,0 +1,4 @@
export const SECOND = 1000
export const MINUTE = SECOND * 60
export const HOUR = MINUTE * 60
export const DAY = HOUR * 24

@ -0,0 +1,22 @@
import { readFromGenerator, wait } from '../src'
describe('async', () => {
describe('readFromGenerator', () => {
async function* waitToYield(time: number) {
for (let i = 0; i < 5; i++) {
await wait(time)
yield true
}
}
it('reads from generator with timeout', async () => {
const read = await readFromGenerator(waitToYield(100), undefined, 101)
expect(read).toEqual([true, true, true, true, true])
})
it('stops reading at timeout', async () => {
const read = await readFromGenerator(waitToYield(100), undefined, 99)
expect(read).toEqual([])
})
})
})

@ -10,6 +10,7 @@ import * as plc from '@atproto/plc'
import * as crypto from '@atproto/crypto'
import AtpAgent from '@atproto/api'
import { ServerType, ServerConfig, StartParams } from './types.js'
import { HOUR } from '@atproto/common'
interface Startable {
start(): Promise<http.Server>
@ -102,6 +103,8 @@ export class DevEnvServer {
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8',
privacyPolicyUrl: 'https://example.com/privacy',
termsOfServiceUrl: 'https://example.com/tos',
maxSubscriptionBuffer: 200,
repoBackfillLimitMs: HOUR,
}),
})
await startServer(pds)

@ -275,13 +275,14 @@ function genNamespaceCls(file: SourceFile, ns: DefTreeNode) {
if (userType.def.type !== 'query' && userType.def.type !== 'procedure') {
continue
}
const isGetReq = userType.def.type === 'query'
const moduleName = toTitleCase(userType.nsid)
const name = toCamelCase(NSID.parse(userType.nsid).name || '')
const method = cls.addMethod({
name,
returnType: `Promise<${moduleName}.Response>`,
})
if (userType.def.type === 'query') {
if (isGetReq) {
method.addParameter({
name: 'params?',
type: `${moduleName}.QueryParams`,
@ -299,7 +300,7 @@ function genNamespaceCls(file: SourceFile, ns: DefTreeNode) {
method.setBodyText(
[
`return this._service.xrpc`,
userType.def.type === 'query'
isGetReq
? `.call('${userType.nsid}', params, undefined, opts)`
: `.call('${userType.nsid}', opts?.qp, data, opts)`,
` .catch((e) => {`,
@ -469,7 +470,11 @@ const lexiconTs = (project, lexicons: Lexicons, lexiconDoc: LexiconDoc) =>
const imports: Set<string> = new Set()
const main = lexiconDoc.defs.main
if (main?.type === 'query' || main?.type === 'procedure') {
if (
main?.type === 'query' ||
main?.type === 'subscription' ||
main?.type === 'procedure'
) {
//= import {Headers, XRPCError} from '@atproto/xrpc'
const xrpcImport = file.addImportDeclaration({
moduleSpecifier: '@atproto/xrpc',
@ -510,6 +515,8 @@ const lexiconTs = (project, lexicons: Lexicons, lexiconDoc: LexiconDoc) =>
genXrpcInput(file, imports, lexicons, lexUri)
genXrpcOutput(file, imports, lexicons, lexUri)
genClientXrpcCommon(file, lexicons, lexUri)
} else if (def.type === 'subscription') {
continue
} else if (def.type === 'record') {
genClientRecord(file, imports, lexicons, lexUri)
} else {
@ -563,7 +570,7 @@ function genClientXrpcCommon(
res.addProperty({ name: 'success', type: 'boolean' })
res.addProperty({ name: 'headers', type: 'Headers' })
if (def.output?.schema) {
if (def.output.encoding.includes(',')) {
if (def.output.encoding?.includes(',')) {
res.addProperty({ name: 'data', type: 'OutputSchema | Uint8Array' })
} else {
res.addProperty({ name: 'data', type: 'OutputSchema' })

@ -10,6 +10,7 @@ import {
LexXrpcProcedure,
LexXrpcQuery,
LexToken,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { toCamelCase, toTitleCase, toScreamingSnakeCase } from './util'
@ -244,6 +245,7 @@ export function genXrpcParams(
) {
const def = lexicons.getDefOrThrow(lexUri, [
'query',
'subscription',
'procedure',
]) as LexXrpcQuery
@ -328,23 +330,20 @@ export function genXrpcOutput(
) {
const def = lexicons.getDefOrThrow(lexUri, [
'query',
'subscription',
'procedure',
]) as LexXrpcQuery
]) as LexXrpcQuery | LexXrpcSubscription | LexXrpcProcedure
if (def.output?.schema) {
if (
def.output.schema.type === 'ref' ||
def.output.schema.type === 'union'
) {
const schema =
def.type === 'subscription' ? def.message?.schema : def.output?.schema
if (schema) {
if (schema.type === 'ref' || schema.type === 'union') {
//= export type OutputSchema = ...
const refs =
def.output.schema.type === 'union'
? def.output.schema.refs
: [def.output.schema.ref]
const refs = schema.type === 'union' ? schema.refs : [schema.ref]
const types = refs.map((ref) =>
refToType(ref, stripScheme(stripHash(lexUri)), imports),
)
if (def.output.schema.type === 'union' && !def.output.schema.closed) {
if (schema.type === 'union' && !schema.closed) {
types.push('{$type: string; [k: string]: unknown}')
}
file.addTypeAlias({
@ -354,7 +353,7 @@ export function genXrpcOutput(
})
} else {
//= export interface OutputSchema {...}
genObject(file, imports, lexUri, def.output.schema, `OutputSchema`)
genObject(file, imports, lexUri, schema, `OutputSchema`)
}
}
}

@ -10,6 +10,7 @@ import {
LexXrpcProcedure,
LexXrpcQuery,
LexRecord,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { NSID } from '@atproto/nsid'
import { gen, lexiconsTs, utilTs } from './common'
@ -60,23 +61,24 @@ const indexTs = (
) =>
gen(project, '/index.ts', async (file) => {
//= import {createServer as createXrpcServer, Server as XrpcServer} from '@atproto/xrpc-server'
const xrpcImport = file.addImportDeclaration({
file.addImportDeclaration({
moduleSpecifier: '@atproto/xrpc-server',
})
xrpcImport.addNamedImport({
name: 'createServer',
alias: 'createXrpcServer',
})
xrpcImport.addNamedImport({
name: 'Server',
alias: 'XrpcServer',
})
xrpcImport.addNamedImport({
name: 'Options',
alias: 'XrpcOptions',
})
xrpcImport.addNamedImport({
name: 'AuthVerifier',
namedImports: [
{
name: 'createServer',
alias: 'createXrpcServer',
},
{
name: 'Server',
alias: 'XrpcServer',
},
{
name: 'Options',
alias: 'XrpcOptions',
},
{ name: 'AuthVerifier' },
{ name: 'StreamAuthVerifier' },
],
})
//= import {schemas} from './lexicons'
file
@ -91,6 +93,7 @@ const indexTs = (
for (const lexiconDoc of lexiconDocs) {
if (
lexiconDoc.defs.main?.type !== 'query' &&
lexiconDoc.defs.main?.type !== 'subscription' &&
lexiconDoc.defs.main?.type !== 'procedure'
) {
continue
@ -192,7 +195,9 @@ const indexTs = (
file.addTypeAlias({
name: 'ExtractAuth',
typeParameters: [{ name: 'AV', constraint: 'AuthVerifier' }],
typeParameters: [
{ name: 'AV', constraint: 'AuthVerifier | StreamAuthVerifier' },
],
type: `Extract<
Awaited<ReturnType<AV>>,
{ credentials: unknown }
@ -243,25 +248,36 @@ function genNamespaceCls(file: SourceFile, ns: DefTreeNode) {
// methods
for (const userType of ns.userTypes) {
if (userType.def.type !== 'query' && userType.def.type !== 'procedure') {
if (
userType.def.type !== 'query' &&
userType.def.type !== 'subscription' &&
userType.def.type !== 'procedure'
) {
continue
}
const moduleName = toTitleCase(userType.nsid)
const name = toCamelCase(NSID.parse(userType.nsid).name || '')
const isSubscription = userType.def.type === 'subscription'
const method = cls.addMethod({
name,
typeParameters: [{ name: 'AV', constraint: 'AuthVerifier' }],
typeParameters: [
{
name: 'AV',
constraint: isSubscription ? 'StreamAuthVerifier' : 'AuthVerifier',
},
],
})
method.addParameter({
name: 'cfg',
type: `ConfigOf<AV, ${moduleName}.Handler<ExtractAuth<AV>>>`,
})
const methodType = isSubscription ? 'streamMethod' : 'method'
method.setBodyText(
[
// Placing schema on separate line, since the following one was being formatted
// into multiple lines and causing the ts-ignore to ignore the wrong line.
`const nsid = '${userType.nsid}' // @ts-ignore`,
`return this._server.xrpc.method(nsid, cfg)`,
`return this._server.xrpc.${methodType}(nsid, cfg)`,
].join('\n'),
)
}
@ -328,7 +344,11 @@ const lexiconTs = (project, lexicons: Lexicons, lexiconDoc: LexiconDoc) =>
genXrpcParams(file, lexicons, lexUri)
genXrpcInput(file, imports, lexicons, lexUri)
genXrpcOutput(file, imports, lexicons, lexUri)
genServerXrpcCommon(file, lexicons, lexUri)
genServerXrpcMethod(file, lexicons, lexUri)
} else if (def.type === 'subscription') {
genXrpcParams(file, lexicons, lexUri)
genXrpcOutput(file, imports, lexicons, lexUri)
genServerXrpcStreaming(file, lexicons, lexUri)
} else if (def.type === 'record') {
genServerRecord(file, imports, lexicons, lexUri)
} else {
@ -342,7 +362,7 @@ const lexiconTs = (project, lexicons: Lexicons, lexiconDoc: LexiconDoc) =>
},
)
function genServerXrpcCommon(
function genServerXrpcMethod(
file: SourceFile,
lexicons: Lexicons,
lexUri: string,
@ -461,6 +481,66 @@ function genServerXrpcCommon(
})
}
function genServerXrpcStreaming(
file: SourceFile,
lexicons: Lexicons,
lexUri: string,
) {
const def = lexicons.getDefOrThrow(lexUri, [
'subscription',
]) as LexXrpcSubscription
file.addImportDeclaration({
moduleSpecifier: '@atproto/xrpc-server',
namedImports: [
{ name: 'HandlerAuth' },
{ name: 'InfoFrame' },
{ name: 'ErrorFrame' },
],
})
file.addImportDeclaration({
moduleSpecifier: 'http',
namedImports: [{ name: 'IncomingMessage' }],
})
// export type HandlerError = ...
file.addTypeAlias({
name: 'HandlerError',
isExported: true,
type: `ErrorFrame<${arrayToUnion(def.errors?.map((e) => e.name))}>`,
})
// export type HandlerInfo = ...
file.addTypeAlias({
name: 'HandlerInfo',
isExported: true,
type: `InfoFrame<${arrayToUnion(def.infos?.map((e) => e.name))}>`,
})
// export type HandlerOutput = ...
file.addTypeAlias({
isExported: true,
name: 'HandlerOutput',
type: `HandlerInfo | HandlerError | ${
def.message?.schema ? 'OutputSchema' : 'void'
}`,
})
file.addTypeAlias({
name: 'Handler',
isExported: true,
typeParameters: [
{ name: 'HA', constraint: 'HandlerAuth', default: 'never' },
],
type: `(ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
}) => AsyncIterable<HandlerOutput>`,
})
}
function genServerRecord(
file: SourceFile,
imports: Set<string>,
@ -474,3 +554,10 @@ function genServerRecord(
//= export function isRecord(v: unknown): v is Record {...}
genObjHelpers(file, lexUri, 'Record')
}
function arrayToUnion(arr?: string[]) {
if (!arr?.length) {
return 'never'
}
return arr.map((item) => `'${item}'`).join(' | ')
}

@ -13,6 +13,7 @@ import {
ValidationError,
isObj,
hasProp,
LexXrpcSubscription,
} from './types'
import {
assertValidRecord,
@ -66,7 +67,7 @@ export class Lexicons {
// WARNING
// mutates the object
// -prf
resolveRefUris(validatedDoc, uri)
resolveRefUris(validatedDoc, uri, true)
this.docs.set(uri, validatedDoc)
for (const [defUri, def] of iterDefs(validatedDoc)) {
@ -166,8 +167,16 @@ export class Lexicons {
*/
assertValidXrpcParams(lexUri: string, value: unknown) {
lexUri = toLexUri(lexUri)
const def = this.getDefOrThrow(lexUri, ['query', 'procedure'])
assertValidXrpcParams(this, def as LexXrpcProcedure | LexXrpcQuery, value)
const def = this.getDefOrThrow(lexUri, [
'query',
'procedure',
'subscription',
])
assertValidXrpcParams(
this,
def as LexXrpcProcedure | LexXrpcQuery | LexXrpcSubscription,
value,
)
}
/**
@ -187,6 +196,14 @@ export class Lexicons {
const def = this.getDefOrThrow(lexUri, ['query', 'procedure'])
assertValidXrpcOutput(this, def as LexXrpcProcedure | LexXrpcQuery, value)
}
/**
* Resolve a lex uri given a ref
*/
resolveLexUri(lexUri: string, ref: string) {
lexUri = toLexUri(lexUri)
return toLexUri(ref, lexUri)
}
}
function* iterDefs(doc: LexiconDoc): Generator<[string, LexUserType]> {
@ -201,7 +218,7 @@ function* iterDefs(doc: LexiconDoc): Generator<[string, LexUserType]> {
// WARNING
// this method mutates objects
// -prf
function resolveRefUris(obj: any, baseUri: string): any {
function resolveRefUris(obj: any, baseUri: string, root?: boolean): any {
for (const k in obj) {
if (obj.type === 'ref') {
obj.ref = toLexUri(obj.ref, baseUri)
@ -220,5 +237,16 @@ function resolveRefUris(obj: any, baseUri: string): any {
obj[k] = resolveRefUris(obj[k], baseUri)
}
}
if (root && obj?.defs?.main?.type === 'subscription') {
const sub = obj.defs.main as LexXrpcSubscription
if (sub.message?.codes) {
sub.message.codes = Object.entries(sub.message.codes).reduce(
(acc, [key, code]) => {
return Object.assign(acc, { [toLexUri(key, baseUri)]: code })
},
{} as Record<string, number>,
)
}
}
return obj
}

@ -186,6 +186,15 @@ export const lexXrpcBody = z.object({
})
export type LexXrpcBody = z.infer<typeof lexXrpcBody>
export const lexXrpcSubscriptionMessage = z.object({
description: z.string().optional(),
schema: z.union([lexRefVariant, lexObject]).optional(),
codes: z.record(z.number().int()).optional(),
})
export type LexXrpcSubscriptionMessage = z.infer<
typeof lexXrpcSubscriptionMessage
>
export const lexXrpcError = z.object({
name: z.string(),
description: z.string().optional(),
@ -211,6 +220,16 @@ export const lexXrpcProcedure = z.object({
})
export type LexXrpcProcedure = z.infer<typeof lexXrpcProcedure>
export const lexXrpcSubscription = z.object({
type: z.literal('subscription'),
description: z.string().optional(),
parameters: lexXrpcParameters.optional(),
message: lexXrpcSubscriptionMessage.optional(),
infos: lexXrpcError.array().optional(),
errors: lexXrpcError.array().optional(),
})
export type LexXrpcSubscription = z.infer<typeof lexXrpcSubscription>
// database
// =
@ -230,6 +249,7 @@ export const lexUserType = z.union([
lexXrpcQuery,
lexXrpcProcedure,
lexXrpcSubscription,
lexBlob,
lexImage,
@ -266,11 +286,12 @@ export const lexiconDoc = z
defId !== 'main' &&
(def.type === 'record' ||
def.type === 'procedure' ||
def.type === 'query')
def.type === 'query' ||
def.type === 'subscription')
) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message: `Records, procedures, and queries must be the main definition.`,
message: `Records, procedures, queries, and subscriptions must be the main definition.`,
})
}
}

@ -1,5 +1,10 @@
import { Lexicons } from './lexicons'
import { LexRecord, LexXrpcProcedure, LexXrpcQuery } from './types'
import {
LexRecord,
LexXrpcProcedure,
LexXrpcQuery,
LexXrpcSubscription,
} from './types'
import { assertValidOneOf } from './util'
import * as ComplexValidators from './validators/complex'
@ -16,7 +21,7 @@ export function assertValidRecord(
export function assertValidXrpcParams(
lexicons: Lexicons,
def: LexXrpcProcedure | LexXrpcQuery,
def: LexXrpcProcedure | LexXrpcQuery | LexXrpcSubscription,
value: unknown,
) {
if (def.parameters) {

@ -1,93 +0,0 @@
import { CID } from 'multiformats/cid'
import * as repo from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../lexicon'
import SqlRepoStorage from '../../../sql-repo-storage'
import AppContext from '../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getHead(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const root = await storage.getHead()
if (root === null) {
throw new InvalidRequestError(`Could not find root for DID: ${did}`)
}
return {
encoding: 'application/json',
body: { root: root.toString() },
}
})
server.com.atproto.sync.getCommitPath(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const earliest = params.earliest ? CID.parse(params.earliest) : null
const latest = params.latest
? CID.parse(params.latest)
: await storage.getHead()
if (latest === null) {
throw new InvalidRequestError(`Could not find root for DID: ${did}`)
}
const commitPath = await storage.getCommitPath(latest, earliest)
if (commitPath === null) {
throw new InvalidRequestError(
`Could not find a valid commit path from ${latest.toString()} to ${earliest?.toString()}`,
)
}
const commits = commitPath.map((c) => c.toString())
return {
encoding: 'application/json',
body: { commits },
}
})
server.com.atproto.sync.getRepo(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const earliest = params.earliest ? CID.parse(params.earliest) : null
const latest = params.latest
? CID.parse(params.latest)
: await storage.getHead()
if (latest === null) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const diff = await repo.getDiff(storage, latest, earliest)
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(diff),
}
})
server.com.atproto.sync.getCheckout(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const checkout = await repo.getCheckout(storage, commit)
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(checkout),
}
})
server.com.atproto.sync.getRecord(async ({ params }) => {
const { did, collection, rkey } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const proof = await repo.getRecords(storage, commit, [{ collection, rkey }])
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(proof),
}
})
}

@ -0,0 +1,24 @@
import { CID } from 'multiformats/cid'
import * as repo from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getCheckout(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const checkout = await repo.getCheckout(storage, commit)
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(checkout),
}
})
}

@ -0,0 +1,30 @@
import { CID } from 'multiformats/cid'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getCommitPath(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const earliest = params.earliest ? CID.parse(params.earliest) : null
const latest = params.latest
? CID.parse(params.latest)
: await storage.getHead()
if (latest === null) {
throw new InvalidRequestError(`Could not find root for DID: ${did}`)
}
const commitPath = await storage.getCommitPath(latest, earliest)
if (commitPath === null) {
throw new InvalidRequestError(
`Could not find a valid commit path from ${latest.toString()} to ${earliest?.toString()}`,
)
}
const commits = commitPath.map((c) => c.toString())
return {
encoding: 'application/json',
body: { commits },
}
})
}

@ -0,0 +1,19 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getHead(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const root = await storage.getHead()
if (root === null) {
throw new InvalidRequestError(`Could not find root for DID: ${did}`)
}
return {
encoding: 'application/json',
body: { root: root.toString() },
}
})
}

@ -0,0 +1,24 @@
import { CID } from 'multiformats/cid'
import * as repo from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getRecord(async ({ params }) => {
const { did, collection, rkey } = params
const storage = new SqlRepoStorage(ctx.db, did)
const commit = params.commit
? CID.parse(params.commit)
: await storage.getHead()
if (!commit) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const proof = await repo.getRecords(storage, commit, [{ collection, rkey }])
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(proof),
}
})
}

@ -0,0 +1,25 @@
import { CID } from 'multiformats/cid'
import * as repo from '@atproto/repo'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.getRepo(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const earliest = params.earliest ? CID.parse(params.earliest) : null
const latest = params.latest
? CID.parse(params.latest)
: await storage.getHead()
if (latest === null) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const diff = await repo.getDiff(storage, latest, earliest)
return {
encoding: 'application/vnd.ipld.car',
body: Buffer.from(diff),
}
})
}

@ -0,0 +1,17 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import getCheckout from './getCheckout'
import getCommitPath from './getCommitPath'
import getHead from './getHead'
import getRecord from './getRecord'
import getRepo from './getRepo'
import subscribe from './subscribeAllRepos'
export default function (server: Server, ctx: AppContext) {
getCheckout(server, ctx)
getCommitPath(server, ctx)
getHead(server, ctx)
getRecord(server, ctx)
getRepo(server, ctx)
subscribe(server, ctx)
}

@ -0,0 +1,28 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import Outbox from '../../../../sequencer/outbox'
import { RepoAppend } from '../../../../lexicon/types/com/atproto/sync/subscribeAllRepos'
import { InvalidRequestError } from '@atproto/xrpc-server'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.subscribeAllRepos(async function* ({ params }) {
const outbox = new Outbox(ctx.sequencer, {
maxBufferSize: ctx.cfg.maxSubscriptionBuffer,
})
const backfillFrom = params.backfillFrom
if (backfillFrom) {
const now = Date.now()
const backfillUnix = new Date(backfillFrom).getTime()
if (isNaN(backfillUnix)) {
throw new InvalidRequestError('Invalid "backfillFrom"')
}
if (now - backfillUnix > ctx.cfg.repoBackfillLimitMs) {
throw new InvalidRequestError('Backfill request too long')
}
}
for await (const evt of outbox.events(backfillFrom)) {
const { time, repo, commit, prev, blocks, blobs } = evt
yield { time, repo, commit, prev, blocks, blobs } as RepoAppend
}
})
}

@ -1,3 +1,5 @@
import { DAY } from '@atproto/common'
export interface ServerConfigValues {
debugMode?: boolean
version: string
@ -37,6 +39,9 @@ export interface ServerConfigValues {
appUrlPasswordReset: string
emailSmtpUrl?: string
emailNoReplyAddress: string
maxSubscriptionBuffer: number
repoBackfillLimitMs: number
}
export class ServerConfig {
@ -61,7 +66,7 @@ export class ServerConfig {
} else {
scheme = hostname === 'localhost' ? 'http' : 'https'
}
const envPort = parseInt(process.env.PORT || '')
const envPort = parseInt(process.env.PORT || '', 10)
const port = isNaN(envPort) ? 2583 : envPort
const jwtSecret = process.env.JWT_SECRET || 'jwt_secret'
@ -112,6 +117,12 @@ export class ServerConfig {
const dbPostgresUrl = process.env.DB_POSTGRES_URL
const dbPostgresSchema = process.env.DB_POSTGRES_SCHEMA
const maxBuffer = parseInt(process.env.MAX_SUBSCRIPTION_BUFFER || '', 10)
const maxSubscriptionBuffer = isNaN(maxBuffer) ? 500 : maxBuffer
const backfillLimit = parseInt(process.env.REPO_BACKFILL_LIMIT_MS || '', 10)
const repoBackfillLimitMs = isNaN(backfillLimit) ? DAY : backfillLimit
return new ServerConfig({
debugMode,
version,
@ -140,6 +151,8 @@ export class ServerConfig {
appUrlPasswordReset,
emailSmtpUrl,
emailNoReplyAddress,
maxSubscriptionBuffer,
repoBackfillLimitMs,
...overrides,
})
}
@ -281,4 +294,12 @@ export class ServerConfig {
get emailNoReplyAddress() {
return this.cfg.emailNoReplyAddress
}
get maxSubscriptionBuffer() {
return this.cfg.maxSubscriptionBuffer
}
get repoBackfillLimitMs() {
return this.cfg.repoBackfillLimitMs
}
}

@ -8,6 +8,7 @@ import { BlobStore } from '@atproto/repo'
import { ImageUriBuilder } from './image/uri'
import { Services } from './services'
import { MessageQueue } from './event-stream/types'
import Sequencer from './sequencer'
export class AppContext {
constructor(
@ -21,6 +22,7 @@ export class AppContext {
mailer: ServerMailer
services: Services
messageQueue: MessageQueue
sequencer: Sequencer
},
) {}
@ -76,6 +78,10 @@ export class AppContext {
return this.opts.messageQueue
}
get sequencer(): Sequencer {
return this.opts.sequencer
}
get plcClient(): plc.PlcClient {
return new plc.PlcClient(this.cfg.didPlcUrl)
}

@ -26,6 +26,7 @@ import * as messageQueue from './tables/message-queue'
import * as messageQueueCursor from './tables/message-queue-cursor'
import * as moderation from './tables/moderation'
import * as mute from './tables/mute'
import * as repoSeq from './tables/repo-seq'
export type DatabaseSchemaType = user.PartialDB &
didHandle.PartialDB &
@ -55,7 +56,8 @@ export type DatabaseSchemaType = user.PartialDB &
messageQueue.PartialDB &
messageQueueCursor.PartialDB &
moderation.PartialDB &
mute.PartialDB
mute.PartialDB &
repoSeq.PartialDB
export type DatabaseSchema = Kysely<DatabaseSchemaType>

@ -1,11 +1,7 @@
import assert from 'assert'
import { Kysely, SqliteDialect, PostgresDialect, Migrator } from 'kysely'
import { Kysely, SqliteDialect, PostgresDialect, Migrator, sql } from 'kysely'
import SqliteDB from 'better-sqlite3'
import {
Pool as PgPool,
PoolClient as PgPoolClient,
types as pgTypes,
} from 'pg'
import { Pool as PgPool, Client as PgClient, types as pgTypes } from 'pg'
import EventEmitter from 'events'
import TypedEmitter from 'typed-emitter'
import DatabaseSchema, { DatabaseSchemaType } from './database-schema'
@ -15,18 +11,26 @@ import { CtxMigrationProvider } from './migrations/provider'
import { dbLogger as log } from '../logger'
export class Database {
channels: Channels = {
repo_seq: new EventEmitter() as ChannelEmitter,
}
txChannelMsgs: ChannelMsg[] = []
channels: Channels
migrator: Migrator
private channelClient: PgPoolClient | null = null
destroyed = false
constructor(public db: DatabaseSchema, public cfg: DialectConfig) {
private channelClient: PgClient | null = null
constructor(
public db: DatabaseSchema,
public cfg: DialectConfig,
channels?: Channels,
) {
this.migrator = new Migrator({
db,
migrationTableSchema: cfg.dialect === 'pg' ? cfg.schema : undefined,
provider: new CtxMigrationProvider(migrations, cfg.dialect),
})
this.channels = channels || {
repo_seq: new EventEmitter() as ChannelEmitter,
}
}
static sqlite(location: string): Database {
@ -39,9 +43,8 @@ export class Database {
}
static postgres(opts: PgOptions): Database {
const { schema } = opts
const pool =
'pool' in opts ? opts.pool : new PgPool({ connectionString: opts.url })
const { schema, url } = opts
const pool = opts.pool ?? new PgPool({ connectionString: url })
// Select count(*) and other pg bigints as js integer
pgTypes.setTypeParser(pgTypes.builtins.INT8, (n) => parseInt(n, 10))
@ -63,7 +66,7 @@ export class Database {
dialect: new PostgresDialect({ pool }),
})
return new Database(db, { dialect: 'pg', pool, schema })
return new Database(db, { dialect: 'pg', pool, schema, url })
}
static memory(): Database {
@ -72,10 +75,14 @@ export class Database {
async startListeningToChannels() {
if (this.cfg.dialect !== 'pg') return
this.channelClient = await this.cfg.pool.connect()
await this.channelClient.query(`LISTEN repo_seq`)
if (this.channelClient) return
this.channelClient = new PgClient(this.cfg.url)
await this.channelClient.connect()
await this.channelClient.query(
`LISTEN ${this.getSchemaChannel('repo_seq')}`,
)
this.channelClient.on('notification', (msg) => {
const channel = this.channels[msg.channel]
const channel = this.channels[this.normalizeSchemaChannel(msg.channel)]
if (channel) {
channel.emit('message')
}
@ -87,12 +94,49 @@ export class Database {
})
}
notify(channel: keyof Channels) {
async notify(channel: keyof Channels) {
if (channel !== 'repo_seq') {
throw new Error(`attempted sending on unavailable channel: ${channel}`)
}
// hardcoded b/c of type system & we only have one msg type
const message: ChannelMsg = 'repo_seq'
// if in a sqlite tx, we buffer the notification until the tx successfully commits
if (this.isTransaction && this.dialect === 'sqlite') {
// no duplicate notifies in a tx per Postgres semantics
if (!this.txChannelMsgs.includes(message)) {
this.txChannelMsgs.push(message)
}
} else {
await this.sendChannelMsg(message)
}
}
private getSchemaChannel(channel: string) {
if (this.cfg.dialect === 'pg' && this.cfg.schema) {
return this.cfg.schema + '_' + channel
} else {
return channel
}
}
private normalizeSchemaChannel(schemaChannel: string): string {
if (this.cfg.dialect === 'pg' && this.cfg.schema) {
const prefix = this.cfg.schema + '_'
if (schemaChannel.startsWith(prefix)) {
return schemaChannel.slice(prefix.length)
} else {
return schemaChannel
}
} else {
return schemaChannel
}
}
private async sendChannelMsg(channel: ChannelMsg) {
if (this.cfg.dialect === 'pg') {
this.cfg.pool.query(`NOTIFY ${channel}`)
const { ref } = this.db.dynamic
await sql`NOTIFY ${ref(this.getSchemaChannel(channel))}`.execute(this.db)
} else {
const emitter = this.channels[channel]
if (emitter) {
@ -102,10 +146,17 @@ export class Database {
}
async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
return await this.db.transaction().execute((txn) => {
const dbTxn = new Database(txn, this.cfg)
return fn(dbTxn)
let txMsgs: ChannelMsg[] = []
const res = await this.db.transaction().execute(async (txn) => {
const dbTxn = new Database(txn, this.cfg, this.channels)
const txRes = await fn(dbTxn)
txMsgs = dbTxn.txChannelMsgs
return txRes
})
txMsgs.forEach((msg) => {
this.sendChannelMsg(msg)
})
return res
}
get schema(): string | undefined {
@ -125,13 +176,12 @@ export class Database {
}
async close(): Promise<void> {
this.channelClient?.removeAllListeners()
this.channelClient?.release()
// @TODO investigate
// if (this.cfg.dialect === 'pg') {
// await this.cfg.pool.end()
// }
if (this.destroyed) return
if (this.channelClient) {
await this.channelClient.end()
}
await this.db.destroy()
this.destroyed = true
}
async migrateToOrThrow(migration: string) {
@ -172,6 +222,7 @@ export type DialectConfig = PgConfig | SqliteConfig
export type PgConfig = {
dialect: 'pg'
pool: PgPool
url: string
schema?: string
}
@ -182,9 +233,11 @@ export type SqliteConfig = {
// Can use with typeof to get types for partial queries
export const dbType = new Kysely<DatabaseSchema>({ dialect: dummyDialect })
type PgOptions =
| { url: string; schema?: string }
| { pool: PgPool; schema?: string }
type PgOptions = {
url: string
pool?: PgPool
schema?: string
}
type ChannelEvents = {
message: () => void
@ -192,6 +245,8 @@ type ChannelEvents = {
type ChannelEmitter = TypedEmitter<ChannelEvents>
type ChannelMsg = 'repo_seq'
type Channels = {
repo_seq: ChannelEmitter
}

@ -0,0 +1,41 @@
import { Kysely } from 'kysely'
import { Dialect } from '..'
const repoSeqTable = 'repo_seq'
const repoSeqDidIndex = 'repo_seq_did_index'
const repoSeqCommitIndex = 'repo_seq_commit_index'
export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
let builder = db.schema.createTable(repoSeqTable)
if (dialect === 'pg') {
builder = builder.addColumn('seq', 'serial', (col) => col.primaryKey())
} else {
builder = builder.addColumn('seq', 'integer', (col) =>
col.autoIncrement().primaryKey(),
)
}
await builder
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('commit', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.execute()
await db.schema
.createIndex(repoSeqDidIndex)
.on(repoSeqTable)
.column('did')
.execute()
await db.schema
.createIndex(repoSeqCommitIndex)
.on(repoSeqTable)
.column('commit')
.execute()
}
export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropIndex(repoSeqCommitIndex).execute()
await db.schema.dropIndex(repoSeqDidIndex).execute()
await db.schema.dropTable(repoSeqTable).execute()
}

@ -16,3 +16,4 @@ export * as _20230201T200606704Z from './20230201T200606704Z-repo-sync-data-pt2'
export * as _20230202T170426672Z from './20230202T170426672Z-user-partitioned-cids'
export * as _20230202T170435937Z from './20230202T170435937Z-delete-account-token'
export * as _20230202T172831900Z from './20230202T172831900Z-moderation-subject-blob'
export * as _20230202T213952826Z from './20230202T213952826Z-repo-seq'

@ -0,0 +1,13 @@
import { Generated } from 'kysely'
export interface RepoSeq {
seq: Generated<number>
did: string
commit: string
eventType: 'repo_append'
sequencedAt: string
}
export const tableName = 'repo_seq'
export type PartialDB = { [tableName]: RepoSeq }

@ -26,6 +26,7 @@ import { BlobDiskCache, ImageProcessingServer } from './image/server'
import { createServices } from './services'
import { createHttpTerminator, HttpTerminator } from 'http-terminator'
import AppContext from './context'
import Sequencer from './sequencer'
import {
ImageInvalidator,
ImageProcessingServerInvalidator,
@ -65,6 +66,7 @@ export class PDS {
})
const messageQueue = new SqlMessageQueue('pds', db)
const sequencer = new Sequencer(db)
const mailTransport =
config.emailSmtpUrl !== undefined
@ -121,6 +123,7 @@ export class PDS {
cfg: config,
auth,
messageQueue,
sequencer,
services,
mailer,
imgUriBuilder,
@ -146,6 +149,8 @@ export class PDS {
}
async start(): Promise<http.Server> {
await this.ctx.sequencer.start()
await this.ctx.db.startListeningToChannels()
const server = this.app.listen(this.ctx.cfg.port)
this.server = server
this.terminator = createHttpTerminator({ server })

@ -6,6 +6,7 @@ import {
Server as XrpcServer,
Options as XrpcOptions,
AuthVerifier,
StreamAuthVerifier,
} from '@atproto/xrpc-server'
import { schemas } from './lexicons'
import * as ComAtprotoAccountCreate from './types/com/atproto/account/create'
@ -45,6 +46,7 @@ import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
import * as AppBskyActorGetProfile from './types/app/bsky/actor/getProfile'
import * as AppBskyActorGetProfiles from './types/app/bsky/actor/getProfiles'
import * as AppBskyActorGetSuggestions from './types/app/bsky/actor/getSuggestions'
@ -494,6 +496,13 @@ export class SyncNS {
const nsid = 'com.atproto.sync.getRepo' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
subscribeAllRepos<AV extends StreamAuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncSubscribeAllRepos.Handler<ExtractAuth<AV>>>,
) {
const nsid = 'com.atproto.sync.subscribeAllRepos' // @ts-ignore
return this._server.xrpc.streamMethod(nsid, cfg)
}
}
export class AppNS {
@ -720,7 +729,7 @@ type ConfigOf<Auth, Handler> =
auth?: Auth
handler: Handler
}
type ExtractAuth<AV extends AuthVerifier> = Extract<
type ExtractAuth<AV extends AuthVerifier | StreamAuthVerifier> = Extract<
Awaited<ReturnType<AV>>,
{ credentials: unknown }
>

@ -2173,6 +2173,81 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncSubscribeAllRepos: {
lexicon: 1,
id: 'com.atproto.sync.subscribeAllRepos',
defs: {
main: {
type: 'subscription',
description: 'Subscribe to repo updates',
parameters: {
type: 'params',
properties: {
backfillFrom: {
type: 'datetime',
description:
'The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps.',
},
},
},
message: {
schema: {
type: 'union',
refs: [
'lex:com.atproto.sync.subscribeAllRepos#repoAppend',
'lex:com.atproto.sync.subscribeAllRepos#repoRebase',
],
},
codes: {
'lex:com.atproto.sync.subscribeAllRepos#repoAppend': 0,
'lex:com.atproto.sync.subscribeAllRepos#repoRebase': 1,
},
},
},
repoAppend: {
type: 'object',
required: ['time', 'repo', 'commit', 'blocks', 'blobs'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
prev: {
type: 'string',
},
blocks: {
type: 'unknown',
},
blobs: {
type: 'array',
items: {
type: 'string',
},
},
},
},
repoRebase: {
type: 'object',
required: ['time', 'repo', 'commit'],
properties: {
time: {
type: 'datetime',
},
repo: {
type: 'string',
},
commit: {
type: 'string',
},
},
},
},
},
AppBskyActorGetProfile: {
lexicon: 1,
id: 'app.bsky.actor.getProfile',
@ -4119,6 +4194,7 @@ export const ids = {
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncSubscribeAllRepos: 'com.atproto.sync.subscribeAllRepos',
AppBskyActorGetProfile: 'app.bsky.actor.getProfile',
AppBskyActorGetProfiles: 'app.bsky.actor.getProfiles',
AppBskyActorGetSuggestions: 'app.bsky.actor.getSuggestions',

@ -0,0 +1,67 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { ValidationResult } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { HandlerAuth, InfoFrame, ErrorFrame } from '@atproto/xrpc-server'
import { IncomingMessage } from 'http'
export interface QueryParams {
/** The last known event to backfill from. Does not dedupe as there may be an overlap in timestamps. */
backfillFrom?: string
}
export type OutputSchema =
| RepoAppend
| RepoRebase
| { $type: string; [k: string]: unknown }
export type HandlerError = ErrorFrame<never>
export type HandlerInfo = InfoFrame<never>
export type HandlerOutput = HandlerInfo | HandlerError | OutputSchema
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
req: IncomingMessage
}) => AsyncIterable<HandlerOutput>
export interface RepoAppend {
time: string
repo: string
commit: string
prev?: string
blocks: {}
blobs: string[]
[k: string]: unknown
}
export function isRepoAppend(v: unknown): v is RepoAppend {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoAppend'
)
}
export function validateRepoAppend(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoAppend', v)
}
export interface RepoRebase {
time: string
repo: string
commit: string
[k: string]: unknown
}
export function isRepoRebase(v: unknown): v is RepoRebase {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.sync.subscribeAllRepos#repoRebase'
)
}
export function validateRepoRebase(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.sync.subscribeAllRepos#repoRebase', v)
}

@ -5,6 +5,7 @@ import * as jwt from 'jsonwebtoken'
import { parseBasicAuth } from './auth'
export const dbLogger = subsystemLogger('pds:db')
export const seqLogger = subsystemLogger('pds:sequencer')
export const mailerLogger = subsystemLogger('pds:mailer')
export const httpLogger = subsystemLogger('pds')

@ -0,0 +1,184 @@
import { BlockMap, writeCar } from '@atproto/repo'
import EventEmitter from 'events'
import TypedEmitter from 'typed-emitter'
import { CID } from 'multiformats/cid'
import Database from '../db'
import { seqLogger as log } from '../logger'
export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
polling = false
queued = false
constructor(public db: Database, public lastSeen?: number) {
super()
// note: this does not err when surpassed, just prints a warning to stderr
this.setMaxListeners(100)
}
async start() {
const found = await this.db.db
.selectFrom('repo_seq')
.selectAll()
.orderBy('seq', 'desc')
.limit(1)
.executeTakeFirst()
if (found) {
this.lastSeen = found.seq
}
this.db.channels.repo_seq.addListener('message', () => {
if (this.polling) {
this.queued = true
} else {
this.polling = true
this.pollDb()
}
})
}
async requestSeqRange(opts: {
earliestSeq?: number
earliestTime?: string
limit?: number
}): Promise<RepoAppendEvent[]> {
const { earliestSeq, earliestTime, limit } = opts
let seqQb = this.db.db
.selectFrom('repo_seq')
.selectAll()
.orderBy('seq', 'asc')
if (earliestSeq !== undefined) {
seqQb = seqQb.where('seq', '>', earliestSeq)
}
if (earliestTime !== undefined) {
seqQb = seqQb.where('sequencedAt', '>=', earliestTime)
}
if (limit !== undefined) {
seqQb = seqQb.limit(limit)
}
const getEvents = this.db.db
.selectFrom(seqQb.as('repo_seq'))
.leftJoin('repo_commit_history', (join) =>
join
.onRef('repo_commit_history.creator', '=', 'repo_seq.did')
.onRef('repo_commit_history.commit', '=', 'repo_seq.commit'),
)
.select([
'repo_seq.seq as seq',
'repo_seq.did as did',
'repo_seq.commit as commit',
'repo_seq.sequencedAt as sequencedAt',
'repo_commit_history.prev as prev',
])
.orderBy('seq', 'asc')
.execute()
const getBlocks = this.db.db
.selectFrom(seqQb.as('repo_seq'))
.innerJoin('repo_commit_block', (join) =>
join
.onRef('repo_commit_block.creator', '=', 'repo_seq.did')
.onRef('repo_commit_block.commit', '=', 'repo_seq.commit'),
)
.innerJoin('ipld_block', (join) =>
join
.onRef('ipld_block.cid', '=', 'repo_commit_block.block')
.onRef('ipld_block.creator', '=', 'repo_commit_block.creator'),
)
.select([
'repo_seq.seq as seq',
'ipld_block.cid as cid',
'ipld_block.content as content',
])
.execute()
const getBlobs = this.db.db
.selectFrom(seqQb.as('repo_seq'))
.innerJoin('repo_blob', (join) =>
join
.onRef('repo_blob.did', '=', 'repo_blob.did')
.onRef('repo_blob.commit', '=', 'repo_seq.commit'),
)
.select(['repo_seq.seq as seq', 'repo_blob.cid as cid'])
.execute()
const [events, blocks, blobs] = await Promise.all([
getEvents,
getBlocks,
getBlobs,
])
const blocksBySeq = blocks.reduce((acc, cur) => {
acc[cur.seq] ??= new BlockMap()
acc[cur.seq].set(CID.parse(cur.cid), cur.content)
return acc
}, {} as Record<number, BlockMap>)
const blobsBySeq = blobs.reduce((acc, cur) => {
acc[cur.seq] ??= []
acc[cur.seq].push(cur.cid)
return acc
}, {} as Record<number, string[]>)
return Promise.all(
events.map(async (evt) => {
const commit = CID.parse(evt.commit)
const carSlice = await writeCar(commit, async (car) => {
const blocks = blocksBySeq[evt.seq]
if (blocks) {
for (const block of blocks.entries()) {
await car.put(block)
}
}
})
const blobs = blobsBySeq[evt.seq] || []
return {
seq: evt.seq,
time: evt.sequencedAt,
repo: evt.did,
commit: evt.commit,
prev: evt.prev || undefined,
blocks: carSlice,
blobs,
} as RepoAppendEvent
}),
)
}
async pollDb() {
try {
const evts = await this.requestSeqRange({ earliestSeq: this.lastSeen })
if (evts.length > 0) {
this.lastSeen = evts[evts.length - 1].seq
this.emit('events', evts)
}
} catch (err) {
log.error({ err, lastSeen: this.lastSeen }, 'sequencer failed to poll db')
} finally {
// check if we should continue polling
if (this.queued === false) {
this.polling = false
} else {
this.queued = false
this.pollDb()
}
}
}
}
export type RepoAppendEvent = {
seq: number
time: string
repo: string
commit: string
prev?: string
blocks: Uint8Array
blobs: string[]
}
type SequencerEvents = {
events: (evts: RepoAppendEvent[]) => void
}
export type SequencerEmitter = TypedEmitter<SequencerEvents>
export default Sequencer

@ -0,0 +1,112 @@
import { AsyncBuffer, AsyncBufferFullError } from '@atproto/common'
import Sequencer, { RepoAppendEvent } from '.'
export type OutboxOpts = {
maxBufferSize: number
}
export class Outbox {
caughtUp = false
lastSeen = -1
cutoverBuffer: RepoAppendEvent[]
outBuffer: AsyncBuffer<RepoAppendEvent>
constructor(public sequencer: Sequencer, opts: Partial<OutboxOpts> = {}) {
const { maxBufferSize = 500 } = opts
this.cutoverBuffer = []
this.outBuffer = new AsyncBuffer<RepoAppendEvent>(maxBufferSize)
}
// event stream occurs in 3 phases
// 1. backfill events: events that have been added to the DB since the last time a connection was open.
// The outbox is not yet listening for new events from the sequencer
// 2. cutover: the outbox has caught up with where the sequencer purports to be,
// but the sequencer might already be halfway through sending out a round of updates.
// Therefore, we start accepting the sequencer's events in a buffer, while making our own request to the
// database to ensure we're caught up. We then dedupe the query & the buffer & stream the events in order
// 3. streaming: we're all caught up on historic state, so the sequencer outputs events and we
// immediately yield them
async *events(backfillFrom?: string): AsyncGenerator<RepoAppendEvent> {
// catch up as much as we can
if (backfillFrom) {
for await (const evt of this.getBackfill(backfillFrom)) {
this.lastSeen = evt.seq
yield evt
}
} else {
// if not backfill, we don't need to cutover, just start streaming
this.caughtUp = true
}
// streams updates from sequencer, but buffers them for cutover as it makes a last request
this.sequencer.on('events', (evts) => {
if (this.caughtUp) {
this.outBuffer.pushMany(evts)
} else {
this.cutoverBuffer = [...this.cutoverBuffer, ...evts]
}
})
const cutover = async () => {
// only need to perform cutover if we've been backfilling
if (backfillFrom) {
const cutoverEvts = await this.sequencer.requestSeqRange({
earliestSeq: this.lastSeen,
earliestTime: backfillFrom,
})
this.outBuffer.pushMany(cutoverEvts)
// dont worry about dupes, we ensure order on yield
this.outBuffer.pushMany(this.cutoverBuffer)
this.caughtUp = true
this.cutoverBuffer = []
} else {
this.caughtUp = true
}
}
cutover()
while (true) {
try {
for await (const evt of this.outBuffer.events()) {
if (evt.seq > this.lastSeen) {
this.lastSeen = evt.seq
yield evt
}
}
} catch (err) {
if (err instanceof AsyncBufferFullError) {
throw new StreamConsumerTooSlowError(err)
} else {
throw err
}
}
}
}
// yields only historical events
async *getBackfill(startTime?: string) {
while (true) {
const evts = await this.sequencer.requestSeqRange({
earliestTime: startTime,
earliestSeq: this.lastSeen,
limit: 50,
})
for (const evt of evts) {
yield evt
}
// we requested 50, if we get less than that then we know we're caught up & can do cutover
if (evts.length < 50) {
break
}
}
}
}
export class StreamConsumerTooSlowError extends Error {
constructor(bufferErr: AsyncBufferFullError) {
super(`Stream consumer too slow: ${bufferErr.message}`)
}
}
export default Outbox

@ -122,7 +122,23 @@ export class RepoService {
commit: CID,
writes: PreparedWrite[],
) {
await this.blobs.processWriteBlobs(did, commit, writes)
await Promise.all([
this.blobs.processWriteBlobs(did, commit, writes),
this.sequenceWrite(did, commit),
])
}
async sequenceWrite(did: string, commit: CID) {
await this.db.db
.insertInto('repo_seq')
.values({
did,
commit: commit.toString(),
eventType: 'repo_append',
sequencedAt: new Date().toISOString(),
})
.execute()
await this.db.notify('repo_seq')
}
async deleteRepo(did: string) {
@ -139,6 +155,7 @@ export class RepoService {
.where('creator', '=', did)
.execute(),
this.db.db.deleteFrom('repo_root').where('did', '=', did).execute(),
this.db.db.deleteFrom('repo_seq').where('did', '=', did).execute(),
this.blobs.deleteForUser(did),
])
}

@ -11,6 +11,7 @@ import { PDS, ServerConfig, Database, MemoryBlobStore } from '../src/index'
import { Main as FeedViewPost } from '../src/lexicon/types/app/bsky/feed/feedViewPost'
import DiskBlobStore from '../src/storage/disk-blobstore'
import AppContext from '../src/context'
import { HOUR } from '@atproto/common'
const ADMIN_PASSWORD = 'admin-pass'
@ -72,6 +73,8 @@ export const runTestServer = async (
dbPostgresUrl: process.env.DB_POSTGRES_URL,
blobstoreLocation: `${blobstoreLoc}/blobs`,
blobstoreTmp: `${blobstoreLoc}/tmp`,
maxSubscriptionBuffer: 200,
repoBackfillLimitMs: HOUR,
...params,
})

@ -23,6 +23,8 @@ import { PostEmbedExternal } from '../src/db/tables/post-embed-external'
import { RepoCommitHistory } from '../src/db/tables/repo-commit-history'
import { RepoCommitBlock } from '../src/db/tables/repo-commit-block'
import { Record } from '../src/db/tables/record'
import { RepoSeq } from '../src/db/tables/repo-seq'
import { Selectable } from 'kysely'
describe('account deletion', () => {
let agent: AtpAgent
@ -142,6 +144,9 @@ describe('account deletion', () => {
expect(updatedDbContents.blocks).toEqual(
initialDbContents.blocks.filter((row) => row.creator !== carol.did),
)
expect(updatedDbContents.seqs).toEqual(
initialDbContents.seqs.filter((row) => row.did !== carol.did),
)
expect(updatedDbContents.commitBlocks).toEqual(
initialDbContents.commitBlocks.filter((row) => row.creator !== carol.did),
)
@ -253,6 +258,7 @@ type DbContents = {
roots: RepoRoot[]
users: User[]
blocks: IpldBlock[]
seqs: Selectable<RepoSeq>[]
commitHistories: RepoCommitHistory[]
commitBlocks: RepoCommitBlock[]
records: Record[]
@ -272,6 +278,7 @@ const getDbContents = async (db: Database): Promise<DbContents> => {
roots,
users,
blocks,
seqs,
commitHistories,
commitBlocks,
records,
@ -293,6 +300,7 @@ const getDbContents = async (db: Database): Promise<DbContents> => {
.orderBy('cid')
.selectAll()
.execute(),
db.db.selectFrom('repo_seq').orderBy('seq').selectAll().execute(),
db.db
.selectFrom('repo_commit_history')
.orderBy('creator')
@ -302,6 +310,7 @@ const getDbContents = async (db: Database): Promise<DbContents> => {
db.db
.selectFrom('repo_commit_block')
.orderBy('creator')
.orderBy('commit')
.orderBy('block')
.selectAll()
.execute(),
@ -334,6 +343,7 @@ const getDbContents = async (db: Database): Promise<DbContents> => {
roots,
users,
blocks,
seqs,
commitHistories,
commitBlocks,
records,

@ -1,7 +1,7 @@
import { allComplete, createDeferrables } from '@atproto/common'
import { allComplete, createDeferrables, wait } from '@atproto/common'
import { Database } from '../src'
describe('db', () => {
describe('db notify', () => {
let dbOne: Database
let dbTwo: Database
@ -61,11 +61,49 @@ describe('db', () => {
})
for (let i = 0; i < sendCount; i++) {
dbTwo.notify('repo_seq')
await dbTwo.notify('repo_seq')
}
await allComplete(deferrables)
expect(receivedOne).toBe(sendCount)
expect(receivedTwo).toBe(sendCount)
})
it('bundles within txs', async () => {
const sendCount = 5
let receivedCount = 0
dbOne.channels.repo_seq.addListener('message', () => {
receivedCount++
})
await dbTwo.transaction(async (dbTx) => {
for (let i = 0; i < sendCount; i++) {
await dbTx.notify('repo_seq')
}
})
await wait(200)
expect(receivedCount).toBe(1)
})
it('does not send on failed tx', async () => {
let received = false
dbOne.channels.repo_seq.addListener('message', () => {
received = true
})
const fakeErr = new Error('test')
try {
await dbTwo.transaction(async (dbTx) => {
await dbTx.notify('repo_seq')
throw fakeErr
})
} catch (err) {
if (err !== fakeErr) {
throw err
}
}
await wait(200)
expect(received).toBeFalsy()
})
})

@ -11,6 +11,7 @@ describe('event stream concurrency', () => {
const dbPostgresUrl = process.env.DB_POSTGRES_URL
db = dbPostgresUrl
? Database.postgres({
url: dbPostgresUrl,
pool: new Pool({ connectionString: dbPostgresUrl, max: 10 }),
schema: 'event_stream_concurrency',
})

@ -0,0 +1,181 @@
import AtpAgent from '@atproto/api'
import { randomStr } from '@atproto/crypto'
import { readFromGenerator, wait } from '@atproto/common'
import Sequencer, { RepoAppendEvent } from '../src/sequencer'
import Outbox, { StreamConsumerTooSlowError } from '../src/sequencer/outbox'
import { Database } from '../src'
import { SeedClient } from './seeds/client'
import userSeed from './seeds/users'
import { CloseFn, runTestServer } from './_util'
describe('sequencer', () => {
let db: Database
let sequencer: Sequencer
let close: CloseFn
let sc: SeedClient
let alice: string
let bob: string
let totalEvts
const timeBeforeWrites = new Date().toISOString()
let lastSeen: string
beforeAll(async () => {
const server = await runTestServer({
dbPostgresSchema: 'sequencer',
})
close = server.close
db = server.ctx.db
sequencer = server.ctx.sequencer
const agent = new AtpAgent({ service: server.url })
sc = new SeedClient(agent)
await userSeed(sc)
alice = sc.dids.alice
bob = sc.dids.bob
// 6 events in userSeed
totalEvts = 6
})
afterAll(async () => {
await close()
})
const randomPost = async (by: string) => sc.post(by, randomStr(8, 'base32'))
const createPosts = async (count: number): Promise<void> => {
const promises: Promise<unknown>[] = []
for (let i = 0; i < count; i++) {
if (i % 2 === 0) {
promises.push(randomPost(alice))
} else {
promises.push(randomPost(bob))
}
await Promise.all(promises)
}
}
const loadFromDb = (lastSeen: string) => {
return db.db
.selectFrom('repo_seq')
.selectAll()
.where('repo_seq.sequencedAt', '>=', lastSeen)
.orderBy('repo_seq.seq', 'asc')
.execute()
}
const evtToDbRow = (e: RepoAppendEvent) => ({
seq: e.seq,
did: e.repo,
commit: e.commit,
eventType: 'repo_append',
sequencedAt: e.time,
})
it('sends to outbox', async () => {
const count = 20
totalEvts += count
await createPosts(count)
const outbox = new Outbox(sequencer)
const evts = await readFromGenerator(outbox.events(timeBeforeWrites))
expect(evts.length).toBe(totalEvts)
const fromDb = await loadFromDb(timeBeforeWrites)
expect(evts.map(evtToDbRow)).toEqual(fromDb)
lastSeen = evts.at(-1)?.time ?? lastSeen
})
it('handles cut over', async () => {
const count = 20
totalEvts += count
const outbox = new Outbox(sequencer)
const [evts] = await Promise.all([
readFromGenerator(outbox.events(timeBeforeWrites)),
createPosts(count),
])
expect(evts.length).toBe(totalEvts)
const fromDb = await loadFromDb(timeBeforeWrites)
expect(evts.map(evtToDbRow)).toEqual(fromDb)
lastSeen = evts.at(-1)?.time ?? lastSeen
})
it('only gets events after (inclusive) lastSeen', async () => {
const count = 20
totalEvts += count
const outbox = new Outbox(sequencer)
const gen = outbox.events(lastSeen)
const [evts] = await Promise.all([
readFromGenerator(gen),
createPosts(count),
])
// +1 because we send the lastSeen date as well
expect(evts.length).toBe(count + 1)
const fromDb = await loadFromDb(lastSeen)
expect(evts.map(evtToDbRow)).toEqual(fromDb)
lastSeen = evts.at(-1)?.time ?? lastSeen
})
it('buffers events that are not being read', async () => {
const count = 20
totalEvts += count
const outbox = new Outbox(sequencer)
const evtGenerator = outbox.events(lastSeen)
// read enough to start streaming then wait so that the rest go into the buffer,
// then stream out from buffer
const [firstPart] = await Promise.all([
readFromGenerator(evtGenerator, 5),
createPosts(count),
])
const secondPart = await readFromGenerator(evtGenerator)
const evts = [...firstPart, ...secondPart]
expect(evts.length).toBe(count + 1)
const fromDb = await loadFromDb(lastSeen)
expect(evts.map(evtToDbRow)).toEqual(fromDb)
lastSeen = evts.at(-1)?.time ?? lastSeen
})
it('errors when buffer is overloaded', async () => {
const count = 20
totalEvts += count
const outbox = new Outbox(sequencer, { maxBufferSize: 5 })
const evtGenerator = outbox.events(lastSeen)
// read enough to start streaming then wait to stream rest until buffer is overloaded
const overloadBuffer = async () => {
await Promise.all([
readFromGenerator(evtGenerator, 5),
createPosts(count),
])
await wait(500)
await readFromGenerator(evtGenerator)
}
await expect(overloadBuffer).rejects.toThrow(StreamConsumerTooSlowError)
const fromDb = await loadFromDb(lastSeen)
lastSeen = fromDb.at(-1)?.sequencedAt ?? lastSeen
})
it('handles many open connections', async () => {
const count = 20
const outboxes: Outbox[] = []
for (let i = 0; i < 50; i++) {
outboxes.push(new Outbox(sequencer))
}
const readOutboxes = Promise.all(
outboxes.map((o) => readFromGenerator(o.events(lastSeen))),
)
const [results] = await Promise.all([readOutboxes, createPosts(count)])
const fromDb = await loadFromDb(lastSeen)
for (let i = 0; i < 50; i++) {
const evts = results[i]
expect(evts.length).toBe(count + 1)
expect(evts.map(evtToDbRow)).toEqual(fromDb)
}
lastSeen = results[0].at(-1)?.time ?? lastSeen
})
})

@ -54,6 +54,7 @@ describe('server', () => {
const axiosError = err as AxiosError
expect(axiosError.response?.status).toEqual(500)
expect(axiosError.response?.data).toEqual({
error: 'InternalServerError',
message: 'Internal Server Error',
})
}
@ -91,7 +92,7 @@ describe('server', () => {
})
it('healthcheck fails when database is unavailable.', async () => {
await db.db.destroy()
await db.close()
let error: AxiosError
try {
await axios.get(`${server.url}/xrpc/_health`)

@ -0,0 +1,212 @@
import AtpAgent from '@atproto/api'
import { HOUR, MINUTE, readFromGenerator, wait } from '@atproto/common'
import { randomStr } from '@atproto/crypto'
import { DidResolver } from '@atproto/did-resolver'
import * as repo from '@atproto/repo'
import { MemoryBlockstore } from '@atproto/repo'
import { byFrame, ErrorFrame } from '@atproto/xrpc-server'
import { WebSocket } from 'ws'
import { RepoAppend } from '../../src/lexicon/types/com/atproto/sync/subscribeAllRepos'
import { Database } from '../../src'
import { SeedClient } from '../seeds/client'
import basicSeed from '../seeds/basic'
import { CloseFn, runTestServer } from '../_util'
describe('repo subscribe all repos', () => {
let serverHost: string
let db: Database
let didResolver: DidResolver
let agent: AtpAgent
let sc: SeedClient
let alice: string
let bob: string
let carol: string
let dan: string
const timeAtStart = new Date().toISOString()
let close: CloseFn
beforeAll(async () => {
const server = await runTestServer({
dbPostgresSchema: 'repo_subscribe_all_repos',
})
serverHost = server.url.replace('http://', '')
db = server.ctx.db
close = server.close
agent = new AtpAgent({ service: server.url })
sc = new SeedClient(agent)
await basicSeed(sc)
alice = sc.dids.alice
bob = sc.dids.bob
carol = sc.dids.carol
dan = sc.dids.dan
didResolver = new DidResolver({ plcUrl: server.ctx.cfg.didPlcUrl })
})
afterAll(async () => {
await close()
})
const getRepo = async (did: string) => {
const car = await agent.api.com.atproto.sync.getRepo({ did })
const storage = new MemoryBlockstore()
const synced = await repo.loadFullRepo(
storage,
new Uint8Array(car.data),
didResolver,
)
return repo.Repo.load(storage, synced.root)
}
const verifyRepo = async (did: string, evts: RepoAppend[]) => {
const didRepo = await getRepo(did)
const commits = await didRepo.storage.getCommits(didRepo.cid, null)
if (!commits) {
return expect(commits !== null)
}
expect(evts.length).toBe(commits.length)
for (let i = 0; i < commits.length; i++) {
const commit = commits[i]
const evt = evts[i]
expect(evt.repo).toEqual(did)
expect(evt.commit).toEqual(commit.commit.toString())
expect(evt.prev).toEqual(commits[i - 1]?.commit?.toString())
const car = await repo.readCar(evt.blocks as Uint8Array)
expect(car.root.equals(commit.commit))
expect(car.blocks.equals(commit.blocks))
}
}
const randomPost = async (by: string) => sc.post(by, randomStr(8, 'base32'))
const makePosts = async () => {
const promises: Promise<unknown>[] = []
for (let i = 0; i < 10; i++) {
promises.push(randomPost(alice))
promises.push(randomPost(bob))
promises.push(randomPost(carol))
promises.push(randomPost(dan))
}
await Promise.all(promises)
}
it('sync backfilled events', async () => {
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos?backfillFrom=${timeAtStart}`,
)
const gen = byFrame(ws)
const evts = await readFromGenerator(gen)
ws.terminate()
const byUser = evts.reduce((acc, cur) => {
const evt = cur.body as RepoAppend
acc[evt.repo] ??= []
acc[evt.repo].push(evt)
return acc
}, {} as Record<string, RepoAppend[]>)
await verifyRepo(alice, byUser[alice])
await verifyRepo(bob, byUser[bob])
await verifyRepo(carol, byUser[carol])
await verifyRepo(dan, byUser[dan])
})
it('syncs new events', async () => {
const readAfterDelay = async () => {
await wait(200) // wait just a hair so that we catch it during cutover
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos?backfillFrom=${timeAtStart}`,
)
const evts = await readFromGenerator(byFrame(ws))
ws.terminate()
return evts
}
const [evts] = await Promise.all([readAfterDelay(), makePosts()])
const byUser = evts.reduce((acc, cur) => {
const evt = cur.body as RepoAppend
acc[evt.repo] ??= []
acc[evt.repo].push(evt)
return acc
}, {} as Record<string, RepoAppend[]>)
await verifyRepo(alice, byUser[alice])
await verifyRepo(bob, byUser[bob])
await verifyRepo(carol, byUser[carol])
await verifyRepo(dan, byUser[dan])
})
it('handles no backfill & backfill in future', async () => {
const wsNoBackfill = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos`,
)
const FUTURE = new Date(Date.now() + 100000).toISOString()
const wsFuture = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos?backfillFrom=${FUTURE}`,
)
const makePostsAfterWait = async () => {
// give them just a second to get subscriptions set up
await wait(200)
await makePosts()
}
const [noBackfill, future] = await Promise.all([
// give these generators a little bit more leeway time
readFromGenerator(byFrame(wsNoBackfill)),
readFromGenerator(byFrame(wsFuture)),
makePostsAfterWait(),
])
wsNoBackfill.terminate()
wsFuture.terminate()
expect(future.length).toBe(40)
expect(noBackfill.length).toBe(40)
expect(noBackfill).toEqual(future)
})
it('backfills only from provided time', async () => {
const seqs = await db.db
.selectFrom('repo_seq')
.selectAll()
.orderBy('seq', 'asc')
.execute()
let midPoint = Math.floor(seqs.length / 2)
let midPointTime = seqs[midPoint].sequencedAt
// ensure we get the earliest seq with the same timestamp as the midpoint
while (seqs[midPoint - 1].sequencedAt === midPointTime) {
midPoint = midPoint - 1
midPointTime = seqs[midPoint].sequencedAt
}
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos?backfillFrom=${midPointTime}`,
)
const evts = await readFromGenerator(byFrame(ws))
ws.terminate()
const seqSlice = seqs.slice(midPoint)
expect(evts.length).toBe(seqSlice.length)
for (let i = 0; i < evts.length; i++) {
const evt = evts[i].body as RepoAppend
const seq = seqSlice[i]
expect(evt.time).toEqual(seq.sequencedAt)
expect(evt.commit).toEqual(seq.commit)
expect(evt.repo).toEqual(seq.did)
}
})
it('errors on too old of a backfill', async () => {
const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString()
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeAllRepos?backfillFrom=${overAnHourAgo}`,
)
const frames = await readFromGenerator(byFrame(ws))
ws.terminate()
expect(frames.length).toBe(1)
expect(frames[0]).toBeInstanceOf(ErrorFrame)
})
})

@ -6,7 +6,7 @@ import * as repo from '@atproto/repo'
import { collapseWriteLog, MemoryBlockstore } from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import { CID } from 'multiformats/cid'
import { CloseFn, runTestServer } from './_util'
import { CloseFn, runTestServer } from '../_util'
describe('repo sync', () => {
let agent: AtpAgent

@ -21,15 +21,19 @@
"dependencies": {
"@atproto/common": "*",
"@atproto/lexicon": "*",
"cbor-x": "^1.5.1",
"express": "^4.17.2",
"http-errors": "^2.0.0",
"mime-types": "^2.1.35",
"uint8arrays": "3.0.0",
"ws": "^8.12.0",
"zod": "^3.14.2"
},
"devDependencies": {
"@atproto/crypto": "*",
"@atproto/xrpc": "*",
"@types/express": "^4.17.13",
"@types/http-errors": "^2.0.1"
"@types/http-errors": "^2.0.1",
"@types/ws": "^8.5.4"
}
}

@ -1,2 +1,3 @@
export * from './types'
export * from './server'
export * from './stream'

@ -4,7 +4,13 @@ import express, {
NextFunction,
RequestHandler,
} from 'express'
import { Lexicons, LexXrpcProcedure, LexXrpcQuery } from '@atproto/lexicon'
import {
Lexicons,
LexXrpcProcedure,
LexXrpcQuery,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { ErrorFrame, Frame, MessageFrame, XrpcStreamServer } from './stream'
import {
XRPCHandler,
XRPCError,
@ -18,8 +24,15 @@ import {
AuthVerifier,
isHandlerError,
Options,
XRPCStreamHandlerConfig,
XRPCStreamHandler,
} from './types'
import { decodeQueryParams, validateInput, validateOutput } from './util'
import {
decodeQueryParams,
getQueryParams,
validateInput,
validateOutput,
} from './util'
import log from './logger'
export function createServer(lexicons?: unknown[], options?: Options) {
@ -27,8 +40,9 @@ export function createServer(lexicons?: unknown[], options?: Options) {
}
export class Server {
router = express.Router()
router = express()
routes = express.Router()
subscriptions = new Map<string, XrpcStreamServer>()
lex = new Lexicons()
options: Options
middleware: Record<'json' | 'text', RequestHandler>
@ -40,6 +54,9 @@ export class Server {
this.router.use(this.routes)
this.router.use('/xrpc/:methodId', this.catchall.bind(this))
this.router.use(errorMiddleware)
this.router.once('mount', (app: express.Application) => {
this.enableStreamingOnListen(app)
})
this.options = opts ?? {}
this.middleware = {
json: express.json({ limit: opts?.payload?.jsonLimit }),
@ -58,10 +75,32 @@ export class Server {
const config =
typeof configOrFn === 'function' ? { handler: configOrFn } : configOrFn
const def = this.lex.getDef(nsid)
if (!def || (def.type !== 'query' && def.type !== 'procedure')) {
if (def?.type === 'query' || def?.type === 'procedure') {
this.addRoute(nsid, def, config)
} else {
throw new Error(`Lex def for ${nsid} is not a query or a procedure`)
}
this.addRoute(nsid, def, config)
}
streamMethod(
nsid: string,
configOrFn: XRPCStreamHandlerConfig | XRPCStreamHandler,
) {
this.addStreamMethod(nsid, configOrFn)
}
addStreamMethod(
nsid: string,
configOrFn: XRPCStreamHandlerConfig | XRPCStreamHandler,
) {
const config =
typeof configOrFn === 'function' ? { handler: configOrFn } : configOrFn
const def = this.lex.getDef(nsid)
if (def?.type === 'subscription') {
this.addSubscription(nsid, def, config)
} else {
throw new Error(`Lex def for ${nsid} is not a subscription`)
}
}
// schemas
@ -200,6 +239,85 @@ export class Server {
}
}
}
protected async addSubscription(
nsid: string,
def: LexXrpcSubscription,
config: XRPCStreamHandlerConfig,
) {
const assertValidXrpcParams = (params: unknown) =>
this.lex.assertValidXrpcParams(nsid, params)
const resolveLexUri = (ref: string) => this.lex.resolveLexUri(nsid, ref)
this.subscriptions.set(
nsid,
new XrpcStreamServer({
noServer: true,
handler: async function* (req) {
try {
// authenticate request
const auth = await config.auth?.({ req })
if (isHandlerError(auth)) {
throw XRPCError.fromError(auth)
}
// validate request
const params = decodeQueryParams(def, getQueryParams(req.url))
try {
assertValidXrpcParams(params)
} catch (e) {
throw new InvalidRequestError(String(e))
}
// stream
const items = config.handler({ req, params, auth })
for await (const item of items) {
if (item instanceof Frame) {
yield item
} else if (
typeof item?.['$type'] === 'string' &&
def.message?.codes
) {
const typeUri = resolveLexUri(item['$type'])
if (def.message.codes[typeUri] !== undefined) {
const code = def.message.codes[typeUri]
const clone = { ...item }
delete clone['$type']
yield new MessageFrame(clone, { type: code })
} else {
yield new MessageFrame(item)
}
} else {
yield new MessageFrame(item)
}
}
} catch (err) {
const xrpcErrPayload = XRPCError.fromError(err).payload
yield new ErrorFrame({
error: xrpcErrPayload.error ?? 'Unknown',
message: xrpcErrPayload.message,
})
}
},
}),
)
}
private enableStreamingOnListen(app: express.Application) {
const _listen = app.listen
app.listen = (...args) => {
// @ts-ignore the args spread
const httpServer = _listen.call(app, ...args)
httpServer.on('upgrade', (req, socket, head) => {
const url = new URL(req.url || '', 'http://x')
const sub = url.pathname.startsWith('/xrpc/')
? this.subscriptions.get(url.pathname.replace('/xrpc/', ''))
: undefined
if (!sub) return socket.destroy()
sub.wss.handleUpgrade(req, socket, head, (ws) =>
sub.wss.emit('connection', ws, req),
)
})
return httpServer
}
}
}
function isHandlerSuccess(v: HandlerOutput): v is HandlerSuccess {

@ -0,0 +1,118 @@
import * as cborx from 'cbor-x'
import * as uint8arrays from 'uint8arrays'
import {
frameHeader,
FrameHeader,
FrameType,
InfoFrameHeader,
MessageFrameHeader,
ErrorFrameHeader,
infoFrameBody,
InfoFrameBody,
ErrorFrameBody,
errorFrameBody,
} from './types'
export abstract class Frame {
header: FrameHeader
body: unknown
get op(): FrameType {
return this.header.op
}
toBytes(): Uint8Array {
return uint8arrays.concat([
cborx.encode(this.header),
cborx.encode(this.body),
])
}
static fromBytes(bytes: Uint8Array) {
let i = 0
let header: unknown
let body: unknown = kUnset
cborx.decodeMultiple(bytes, (item) => {
if (i === 0) {
header = item
} else if (i === 1) {
body = item
} else {
throw new Error('Too many CBOR data items in frame')
}
i++
})
const parsedHeader = frameHeader.safeParse(header)
if (!parsedHeader.success) {
throw new Error(`Invalid frame header: ${parsedHeader.error.message}`)
}
if (body === kUnset) {
throw new Error('Missing frame body')
}
const frameOp = parsedHeader.data.op
if (frameOp === FrameType.Message) {
return new MessageFrame(body, {
type: parsedHeader.data.t,
})
} else if (frameOp === FrameType.Info) {
const parsedBody = infoFrameBody.safeParse(body)
if (!parsedBody.success) {
throw new Error(`Invalid info frame body: ${parsedBody.error.message}`)
}
return new InfoFrame(parsedBody.data)
} else if (frameOp === FrameType.Error) {
const parsedBody = errorFrameBody.safeParse(body)
if (!parsedBody.success) {
throw new Error(`Invalid error frame body: ${parsedBody.error.message}`)
}
return new ErrorFrame(parsedBody.data)
} else {
const exhaustiveCheck: never = frameOp
throw new Error(`Unknown frame op: ${exhaustiveCheck}`)
}
}
}
export class MessageFrame<T = Record<string, unknown>> extends Frame {
header: MessageFrameHeader
body: T
constructor(body: T, opts?: { type?: number }) {
super()
this.header = { op: FrameType.Message, t: opts?.type }
this.body = body
}
get type() {
return this.header.t
}
}
export class InfoFrame<T extends string = string> extends Frame {
header: InfoFrameHeader
body: InfoFrameBody<T>
constructor(body: InfoFrameBody<T>) {
super()
this.header = { op: FrameType.Info }
this.body = body
}
get code() {
return this.body.info
}
get message() {
return this.body.message
}
}
export class ErrorFrame<T extends string = string> extends Frame {
header: ErrorFrameHeader
body: ErrorFrameBody<T>
constructor(body: ErrorFrameBody<T>) {
super()
this.header = { op: FrameType.Error }
this.body = body
}
get code() {
return this.body.error
}
get message() {
return this.body.message
}
}
const kUnset = Symbol('unset')

@ -0,0 +1,4 @@
export * from './types'
export * from './frames'
export * from './stream'
export * from './server'

@ -0,0 +1,5 @@
import { subsystemLogger } from '@atproto/common'
export const logger = subsystemLogger('xrpc-stream')
export default logger

@ -0,0 +1,74 @@
import { IncomingMessage } from 'http'
import { WebSocketServer, ServerOptions, WebSocket } from 'ws'
import { ErrorFrame, Frame } from './frames'
import logger from './logger'
export class XrpcStreamServer {
wss: WebSocketServer
constructor(opts: ServerOptions & { handler: Handler }) {
const { handler, ...serverOpts } = opts
this.wss = new WebSocketServer(serverOpts)
this.wss.on('connection', async (socket, req) => {
socket.on('error', (err) => logger.error(err, 'websocket error'))
try {
const iterator = unwrapIterator(handler(req, socket, this))
socket.once('close', () => iterator.return?.())
const safeFrames = wrapIterator(iterator)
for await (const frame of safeFrames) {
if (frame instanceof ErrorFrame) {
await new Promise((res, rej) => {
socket.send(frame.toBytes(), { binary: true }, (err) => {
if (err) return rej(err)
res(undefined)
})
})
throw new DisconnectError(CloseCode.Policy, frame.body.error)
} else {
socket.send(frame.toBytes(), { binary: true })
}
}
} catch (err) {
if (err instanceof DisconnectError) {
return socket.close(err.wsCode, err.xrpcCode)
} else {
logger.error(err, 'websocket server error')
return socket.terminate()
}
}
socket.close(CloseCode.Normal)
})
}
}
export type Handler = (
req: IncomingMessage,
socket: WebSocket,
server: XrpcStreamServer,
) => AsyncIterable<Frame>
export class DisconnectError extends Error {
constructor(
public wsCode: CloseCode = CloseCode.Policy,
public xrpcCode?: string,
) {
super()
}
}
// https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
export enum CloseCode {
Normal = 1000,
Policy = 1008,
}
function unwrapIterator<T>(iterable: AsyncIterable<T>): AsyncIterator<T> {
return iterable[Symbol.asyncIterator]()
}
function wrapIterator<T>(iterator: AsyncIterator<T>): AsyncIterable<T> {
return {
[Symbol.asyncIterator]() {
return iterator
},
}
}

@ -0,0 +1,13 @@
import { DuplexOptions } from 'stream'
import { createWebSocketStream, WebSocket } from 'ws'
import { Frame } from './frames'
export async function* byFrame(ws: WebSocket, options?: DuplexOptions) {
const wsStream = createWebSocketStream(ws, {
...options,
readableObjectMode: true, // Ensures frame bytes don't get buffered/combined together
})
for await (const chunk of wsStream) {
yield Frame.fromBytes(chunk)
}
}

@ -0,0 +1,44 @@
import { z } from 'zod'
export enum FrameType {
Message = 1,
Info = 2,
Error = -1,
}
export const messageFrameHeader = z.object({
op: z.literal(FrameType.Message), // Frame op
t: z.number().int().optional(), // Message body type discriminator
})
export type MessageFrameHeader = z.infer<typeof messageFrameHeader>
export const infoFrameHeader = z.object({
op: z.literal(FrameType.Info),
})
export const infoFrameBody = z.object({
info: z.string(), // Info code
message: z.string().optional(), // Info message
})
export type InfoFrameHeader = z.infer<typeof infoFrameHeader>
export type InfoFrameBody<T extends string = string> = { info: T } & z.infer<
typeof infoFrameBody
>
export const errorFrameHeader = z.object({
op: z.literal(FrameType.Error),
})
export const errorFrameBody = z.object({
error: z.string(), // Error code
message: z.string().optional(), // Error message
})
export type ErrorFrameHeader = z.infer<typeof errorFrameHeader>
export type ErrorFrameBody<T extends string = string> = { error: T } & z.infer<
typeof errorFrameBody
>
export const frameHeader = z.union([
messageFrameHeader,
infoFrameHeader,
errorFrameHeader,
])
export type FrameHeader = z.infer<typeof frameHeader>

@ -1,7 +1,12 @@
import { IncomingMessage } from 'http'
import express from 'express'
import { isHttpError } from 'http-errors'
import zod from 'zod'
import { ResponseType, ResponseTypeStrings } from '@atproto/xrpc'
import {
ResponseType,
ResponseTypeStrings,
ResponseTypeNames,
} from '@atproto/xrpc'
export type Options = {
validateResponse?: boolean
@ -52,6 +57,12 @@ export type XRPCHandler = (ctx: {
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput | undefined
export type XRPCStreamHandler = (ctx: {
auth: HandlerAuth | undefined
params: Params
req: IncomingMessage
}) => AsyncIterable<unknown>
export type AuthOutput = HandlerAuth | HandlerError
export type AuthVerifier = (ctx: {
@ -59,11 +70,20 @@ export type AuthVerifier = (ctx: {
res: express.Response
}) => Promise<AuthOutput> | AuthOutput
export type StreamAuthVerifier = (ctx: {
req: IncomingMessage
}) => Promise<AuthOutput> | AuthOutput
export type XRPCHandlerConfig = {
auth?: AuthVerifier
handler: XRPCHandler
}
export type XRPCStreamHandlerConfig = {
auth?: StreamAuthVerifier
handler: XRPCStreamHandler
}
export class XRPCError extends Error {
constructor(
public type: ResponseType,
@ -75,7 +95,7 @@ export class XRPCError extends Error {
get payload() {
return {
error: this.customErrorName,
error: this.customErrorName ?? this.typeName,
message:
this.type === ResponseType.InternalServerError
? this.typeStr // Do not respond with error details for 500s
@ -83,6 +103,10 @@ export class XRPCError extends Error {
}
}
get typeName(): string | undefined {
return ResponseTypeNames[this.type]
}
get typeStr(): string | undefined {
return ResponseTypeStrings[this.type]
}

@ -2,7 +2,12 @@ import { Readable, Transform } from 'stream'
import { createDeflate, createGunzip } from 'zlib'
import express from 'express'
import mime from 'mime-types'
import { Lexicons, LexXrpcProcedure, LexXrpcQuery } from '@atproto/lexicon'
import {
Lexicons,
LexXrpcProcedure,
LexXrpcQuery,
LexXrpcSubscription,
} from '@atproto/lexicon'
import { forwardStreamErrors, MaxSizeChecker } from '@atproto/common'
import {
UndecodedParams,
@ -17,7 +22,7 @@ import {
} from './types'
export function decodeQueryParams(
def: LexXrpcProcedure | LexXrpcQuery,
def: LexXrpcProcedure | LexXrpcQuery | LexXrpcSubscription,
params: UndecodedParams,
): Params {
const decoded: Params = {}
@ -59,6 +64,18 @@ export function decodeQueryParam(
}
}
export function getQueryParams(url = ''): Record<string, string | string[]> {
const { searchParams } = new URL(url ?? '', 'http://x')
const result: Record<string, string | string[]> = {}
for (const key of searchParams.keys()) {
result[key] = searchParams.getAll(key)
if (result[key].length === 1) {
result[key] = result[key][0]
}
}
return result
}
export function validateInput(
nsid: string,
def: LexXrpcProcedure | LexXrpcQuery,

@ -1,6 +1,7 @@
import * as http from 'http'
import express from 'express'
import * as xrpc from '../src/index'
import * as xrpc from '../src'
import { AuthRequiredError } from '../src'
export async function createServer(
port: number,
@ -18,3 +19,37 @@ export async function closeServer(httpServer: http.Server) {
httpServer.close(() => r(undefined))
})
}
export function createBasicAuth(allowed: {
username: string
password: string
}) {
return function (ctx: { req: http.IncomingMessage }) {
const header = ctx.req.headers.authorization ?? ''
if (!header.startsWith('Basic ')) {
throw new AuthRequiredError()
}
const original = header.replace('Basic ', '')
const [username, password] = Buffer.from(original, 'base64')
.toString()
.split(':')
if (username !== allowed.username || password !== allowed.password) {
throw new AuthRequiredError()
}
return {
credentials: { username },
artifacts: { original },
}
}
}
export function basicAuthHeaders(creds: {
username: string
password: string
}) {
return {
authorization:
'Basic ' +
Buffer.from(`${creds.username}:${creds.password}`).toString('base64'),
}
}

@ -1,9 +1,12 @@
import * as http from 'http'
import express from 'express'
import xrpc, { XRPCError } from '@atproto/xrpc'
import { createServer, closeServer } from './_util'
import * as xrpcServer from '../src'
import { AuthRequiredError } from '../src'
import {
createServer,
closeServer,
createBasicAuth,
basicAuthHeaders,
} from './_util'
const LEXICONS = [
{
@ -125,31 +128,3 @@ describe('Auth', () => {
})
})
})
function createBasicAuth(allowed: { username: string; password: string }) {
return function (ctx: { req: express.Request }) {
const header = ctx.req.headers.authorization ?? ''
if (!header.startsWith('Basic ')) {
throw new AuthRequiredError()
}
const original = header.replace('Basic ', '')
const [username, password] = Buffer.from(original, 'base64')
.toString()
.split(':')
if (username !== allowed.username || password !== allowed.password) {
throw new AuthRequiredError()
}
return {
credentials: { username },
artifacts: { original },
}
}
}
function basicAuthHeaders(creds: { username: string; password: string }) {
return {
authorization:
'Basic ' +
Buffer.from(`${creds.username}:${creds.password}`).toString('base64'),
}
}

@ -0,0 +1,179 @@
import * as cborx from 'cbor-x'
import * as uint8arrays from 'uint8arrays'
import { MessageFrame, ErrorFrame, Frame, FrameType, InfoFrame } from '../src'
describe('Frames', () => {
it('creates and parses message frame.', async () => {
const messageFrame = new MessageFrame({ a: 'b', c: [1, 2, 3] }, { type: 2 })
expect(messageFrame.header).toEqual({
op: FrameType.Message,
t: 2,
})
expect(messageFrame.op).toEqual(FrameType.Message)
expect(messageFrame.type).toEqual(2)
expect(messageFrame.body).toEqual({ a: 'b', c: [1, 2, 3] })
const bytes = messageFrame.toBytes()
expect(
uint8arrays.equals(
bytes,
new Uint8Array([
/*header*/ 185, 0, 2, 98, 111, 112, 1, 97, 116, 2, /*body*/ 185, 0, 2,
97, 97, 97, 98, 97, 99, 131, 1, 2, 3,
]),
),
).toEqual(true)
const parsedFrame = Frame.fromBytes(bytes)
if (!(parsedFrame instanceof MessageFrame)) {
throw new Error('Did not parse as message frame')
}
expect(parsedFrame.header).toEqual(messageFrame.header)
expect(parsedFrame.op).toEqual(messageFrame.op)
expect(parsedFrame.type).toEqual(messageFrame.type)
expect(parsedFrame.body).toEqual(messageFrame.body)
})
it('creates and parses info frame.', async () => {
const infoFrame = new InfoFrame({
info: 'SomeOccurrence',
message: 'X occurred',
})
expect(infoFrame.header).toEqual({ op: FrameType.Info })
expect(infoFrame.op).toEqual(FrameType.Info)
expect(infoFrame.code).toEqual('SomeOccurrence')
expect(infoFrame.message).toEqual('X occurred')
expect(infoFrame.body).toEqual({
info: 'SomeOccurrence',
message: 'X occurred',
})
const bytes = infoFrame.toBytes()
expect(
uint8arrays.equals(
bytes,
new Uint8Array([
/*header*/ 185, 0, 1, 98, 111, 112, 2, /*body*/ 185, 0, 2, 100, 105,
110, 102, 111, 110, 83, 111, 109, 101, 79, 99, 99, 117, 114, 114, 101,
110, 99, 101, 103, 109, 101, 115, 115, 97, 103, 101, 106, 88, 32, 111,
99, 99, 117, 114, 114, 101, 100,
]),
),
).toEqual(true)
const parsedFrame = Frame.fromBytes(bytes)
if (!(parsedFrame instanceof InfoFrame)) {
throw new Error('Did not parse as info frame')
}
expect(parsedFrame.header).toEqual(infoFrame.header)
expect(parsedFrame.op).toEqual(infoFrame.op)
expect(parsedFrame.code).toEqual(infoFrame.code)
expect(parsedFrame.message).toEqual(infoFrame.message)
expect(parsedFrame.body).toEqual(infoFrame.body)
})
it('creates and parses error frame.', async () => {
const errorFrame = new ErrorFrame({
error: 'BigOops',
message: 'Something went awry',
})
expect(errorFrame.header).toEqual({ op: FrameType.Error })
expect(errorFrame.op).toEqual(FrameType.Error)
expect(errorFrame.code).toEqual('BigOops')
expect(errorFrame.message).toEqual('Something went awry')
expect(errorFrame.body).toEqual({
error: 'BigOops',
message: 'Something went awry',
})
const bytes = errorFrame.toBytes()
expect(
uint8arrays.equals(
bytes,
new Uint8Array([
/*header*/ 185, 0, 1, 98, 111, 112, 32, /*body*/ 185, 0, 2, 101, 101,
114, 114, 111, 114, 103, 66, 105, 103, 79, 111, 112, 115, 103, 109,
101, 115, 115, 97, 103, 101, 115, 83, 111, 109, 101, 116, 104, 105,
110, 103, 32, 119, 101, 110, 116, 32, 97, 119, 114, 121,
]),
),
).toEqual(true)
const parsedFrame = Frame.fromBytes(bytes)
if (!(parsedFrame instanceof ErrorFrame)) {
throw new Error('Did not parse as error frame')
}
expect(parsedFrame.header).toEqual(errorFrame.header)
expect(parsedFrame.op).toEqual(errorFrame.op)
expect(parsedFrame.code).toEqual(errorFrame.code)
expect(parsedFrame.message).toEqual(errorFrame.message)
expect(parsedFrame.body).toEqual(errorFrame.body)
})
it('parsing fails when frame is not CBOR.', async () => {
const bytes = Buffer.from('some utf8 bytes')
const emptyBytes = Buffer.from('')
expect(() => Frame.fromBytes(bytes)).toThrow('Unexpected end of CBOR data')
expect(() => Frame.fromBytes(emptyBytes)).toThrow(
'Unexpected end of CBOR data',
)
})
it('parsing fails when frame header is malformed.', async () => {
const bytes = uint8arrays.concat([
cborx.encode({ op: -2 }), // Unknown op
cborx.encode({ a: 'b', c: [1, 2, 3] }),
])
expect(() => Frame.fromBytes(bytes)).toThrow('Invalid frame header:')
})
it('parsing fails when frame is missing body.', async () => {
const messageFrame = new MessageFrame({ a: 'b', c: [1, 2, 3] }, { type: 2 })
const headerBytes = cborx.encode(messageFrame.header)
expect(() => Frame.fromBytes(headerBytes)).toThrow('Missing frame body')
})
it('parsing fails when frame has too many data items.', async () => {
const messageFrame = new MessageFrame({ a: 'b', c: [1, 2, 3] }, { type: 2 })
const bytes = uint8arrays.concat([
messageFrame.toBytes(),
cborx.encode({ d: 'e', f: [4, 5, 6] }),
])
expect(() => Frame.fromBytes(bytes)).toThrow(
'Too many CBOR data items in frame',
)
})
it('parsing fails when info frame has invalid body.', async () => {
const infoFrame = new InfoFrame({ info: 'SomeOccurrence' })
const bytes = uint8arrays.concat([
cborx.encode(infoFrame.header),
cborx.encode({ blah: 1 }),
])
expect(() => Frame.fromBytes(bytes)).toThrow('Invalid info frame body:')
})
it('parsing fails when error frame has invalid body.', async () => {
const errorFrame = new ErrorFrame({ error: 'BadOops' })
const bytes = uint8arrays.concat([
cborx.encode(errorFrame.header),
cborx.encode({ blah: 1 }),
])
expect(() => Frame.fromBytes(bytes)).toThrow('Invalid error frame body:')
})
})

@ -0,0 +1,124 @@
import * as http from 'http'
import { once } from 'events'
import { AddressInfo } from 'net'
import { WebSocket } from 'ws'
import {
ErrorFrame,
Frame,
InfoFrame,
MessageFrame,
XrpcStreamServer,
byFrame,
} from '../src'
describe('Stream', () => {
const wait = (ms) => new Promise((res) => setTimeout(res, ms))
it('streams message and info frames.', async () => {
const httpServer = http.createServer()
const server = new XrpcStreamServer({
server: httpServer,
handler: async function* () {
await wait(1)
yield new MessageFrame(1)
await wait(1)
yield new MessageFrame(2)
await wait(1)
yield new InfoFrame({ info: 'SomeDiagnostic' })
await wait(1)
yield new MessageFrame(3)
return
},
})
await once(httpServer.listen(), 'listening')
const { port } = server.wss.address() as AddressInfo
const ws = new WebSocket(`ws://localhost:${port}`)
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new MessageFrame(1),
new MessageFrame(2),
new InfoFrame({ info: 'SomeDiagnostic' }),
new MessageFrame(3),
])
httpServer.close()
})
it('kills handler and closes on error frame.', async () => {
let proceededAfterError = false
const httpServer = http.createServer()
const server = new XrpcStreamServer({
server: httpServer,
handler: async function* () {
await wait(1)
yield new MessageFrame(1)
await wait(1)
yield new MessageFrame(2)
await wait(1)
yield new ErrorFrame({ error: 'BadOops' })
proceededAfterError = true
await wait(1)
yield new MessageFrame(3)
return
},
})
await once(httpServer.listen(), 'listening')
const { port } = server.wss.address() as AddressInfo
const ws = new WebSocket(`ws://localhost:${port}`)
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
await wait(5) // Ensure handler hasn't kept running
expect(proceededAfterError).toEqual(false)
expect(frames).toEqual([
new MessageFrame(1),
new MessageFrame(2),
new ErrorFrame({ error: 'BadOops' }),
])
httpServer.close()
})
it('kills handler and closes client disconnect.', async () => {
const httpServer = http.createServer()
let i = 1
const server = new XrpcStreamServer({
server: httpServer,
handler: async function* () {
while (true) {
await wait(0)
yield new MessageFrame(i++)
}
},
})
await once(httpServer.listen(), 'listening')
const { port } = server.wss.address() as AddressInfo
const ws = new WebSocket(`ws://localhost:${port}`)
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
if (frame.body === 3) ws.terminate()
}
// Grace period to let close take place on the server
await wait(5)
// Ensure handler hasn't kept running
const currentCount = i
await wait(5)
expect(i).toBe(currentCount)
httpServer.close()
})
})

@ -0,0 +1,232 @@
import * as http from 'http'
import { WebSocket, createWebSocketStream } from 'ws'
import { byFrame, MessageFrame, ErrorFrame, Frame } from '../src'
import {
createServer,
closeServer,
createBasicAuth,
basicAuthHeaders,
} from './_util'
import * as xrpcServer from '../src'
const LEXICONS = [
{
lexicon: 1,
id: 'io.example.stream1',
defs: {
main: {
type: 'subscription',
parameters: {
type: 'params',
required: ['countdown'],
properties: {
countdown: { type: 'integer' },
},
},
message: {
schema: {
type: 'object',
required: ['count'],
properties: { count: { type: 'integer' } },
},
},
},
},
},
{
lexicon: 1,
id: 'io.example.stream2',
defs: {
main: {
type: 'subscription',
parameters: {
type: 'params',
required: ['countdown'],
properties: {
countdown: { type: 'integer' },
},
},
message: {
schema: {
type: 'union',
refs: ['#even', '#odd'],
},
codes: {
'#even': 0,
'io.example.stream2#odd': 1,
},
},
},
even: {
type: 'object',
required: ['count'],
properties: { count: { type: 'integer' } },
},
odd: {
type: 'object',
required: ['count'],
properties: { count: { type: 'integer' } },
},
},
},
{
lexicon: 1,
id: 'io.example.streamAuth',
defs: {
main: {
type: 'subscription',
},
},
},
]
describe('Subscriptions', () => {
let s: http.Server
const server = xrpcServer.createServer(LEXICONS)
server.streamMethod('io.example.stream1', async function* ({ params }) {
const countdown = Number(params.countdown ?? 0)
for (let i = countdown; i >= 0; i--) {
yield { count: i }
}
})
server.streamMethod('io.example.stream2', async function* ({ params }) {
const countdown = Number(params.countdown ?? 0)
for (let i = countdown; i >= 0; i--) {
yield {
$type:
i % 2 === 0 ? 'io.example.stream2#even' : 'io.example.stream2#odd',
count: i,
}
}
yield { $type: 'io.example.stream2#done' }
})
server.streamMethod('io.example.streamAuth', {
auth: createBasicAuth({ username: 'admin', password: 'password' }),
handler: async function* ({ auth }) {
yield auth
},
})
beforeAll(async () => {
s = await createServer(8895, server)
})
afterAll(async () => {
await closeServer(s)
})
it('streams messages', async () => {
const ws = new WebSocket(
'ws://localhost:8895/xrpc/io.example.stream1?countdown=5',
)
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new MessageFrame({ count: 5 }),
new MessageFrame({ count: 4 }),
new MessageFrame({ count: 3 }),
new MessageFrame({ count: 2 }),
new MessageFrame({ count: 1 }),
new MessageFrame({ count: 0 }),
])
})
it('streams messages in a union', async () => {
const ws = new WebSocket(
'ws://localhost:8895/xrpc/io.example.stream2?countdown=5',
)
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new MessageFrame({ count: 5 }, { type: 1 }),
new MessageFrame({ count: 4 }, { type: 0 }),
new MessageFrame({ count: 3 }, { type: 1 }),
new MessageFrame({ count: 2 }, { type: 0 }),
new MessageFrame({ count: 1 }, { type: 1 }),
new MessageFrame({ count: 0 }, { type: 0 }),
new MessageFrame({ $type: 'io.example.stream2#done' }),
])
})
it('resolves auth into handler', async () => {
const ws = new WebSocket('ws://localhost:8895/xrpc/io.example.streamAuth', {
headers: basicAuthHeaders({
username: 'admin',
password: 'password',
}),
})
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new MessageFrame({
credentials: {
username: 'admin',
},
artifacts: {
original: 'YWRtaW46cGFzc3dvcmQ=',
},
}),
])
})
it('errors immediately on bad parameter', async () => {
const ws = new WebSocket('ws://localhost:8895/xrpc/io.example.stream1')
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new ErrorFrame({
error: 'InvalidRequest',
message: 'Error: Params must have the property "countdown"',
}),
])
})
it('errors immediately on bad auth', async () => {
const ws = new WebSocket('ws://localhost:8895/xrpc/io.example.streamAuth', {
headers: basicAuthHeaders({
username: 'bad',
password: 'wrong',
}),
})
const frames: Frame[] = []
for await (const frame of byFrame(ws)) {
frames.push(frame)
}
expect(frames).toEqual([
new ErrorFrame({
error: 'AuthenticationRequired',
message: 'Authentication Required',
}),
])
})
it('does not websocket upgrade at bad endpoint', async () => {
const ws = new WebSocket('ws://localhost:8895/xrpc/does.not.exist')
const drainStream = async () => {
for await (const bytes of createWebSocketStream(ws)) {
bytes // drain
}
}
expect(drainStream).rejects.toThrow('ECONNREFUSED')
})
})

@ -38,6 +38,6 @@
{ "path": "./packages/nsid/tsconfig.build.json" },
{ "path": "./packages/uri/tsconfig.build.json" },
{ "path": "./packages/xrpc/tsconfig.build.json" },
{ "path": "./packages/xrpc-server/tsconfig.build.json" },
{ "path": "./packages/xrpc-server/tsconfig.build.json" }
]
}
}

@ -3208,6 +3208,36 @@
resolved "https://registry.npmjs.org/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz"
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
"@cbor-extract/cbor-extract-darwin-arm64@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-darwin-arm64/-/cbor-extract-darwin-arm64-2.1.1.tgz#5721f6dd3feae0b96d23122853ce977e0671b7a6"
integrity sha512-blVBy5MXz6m36Vx0DfLd7PChOQKEs8lK2bD1WJn/vVgG4FXZiZmZb2GECHFvVPA5T7OnODd9xZiL3nMCv6QUhA==
"@cbor-extract/cbor-extract-darwin-x64@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-darwin-x64/-/cbor-extract-darwin-x64-2.1.1.tgz#c25e7d0133950d87d101d7b3afafea8d50d83f5f"
integrity sha512-h6KFOzqk8jXTvkOftyRIWGrd7sKQzQv2jVdTL9nKSf3D2drCvQB/LHUxAOpPXo3pv2clDtKs3xnHalpEh3rDsw==
"@cbor-extract/cbor-extract-linux-arm64@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-linux-arm64/-/cbor-extract-linux-arm64-2.1.1.tgz#48f78e7d8f0fcc84ed074b6bfa6d15dd83187c63"
integrity sha512-SxAaRcYf8S0QHaMc7gvRSiTSr7nUYMqbUdErBEu+HYA4Q6UNydx1VwFE68hGcp1qvxcy9yT5U7gA+a5XikfwSQ==
"@cbor-extract/cbor-extract-linux-arm@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-linux-arm/-/cbor-extract-linux-arm-2.1.1.tgz#7507d346389cb682e44fab8fae9534edd52e2e41"
integrity sha512-ds0uikdcIGUjPyraV4oJqyVE5gl/qYBpa/Wnh6l6xLE2lj/hwnjT2XcZCChdXwW/YFZ1LUHs6waoYN8PmK0nKQ==
"@cbor-extract/cbor-extract-linux-x64@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-linux-x64/-/cbor-extract-linux-x64-2.1.1.tgz#b7c1d2be61c58ec18d58afbad52411ded63cd4cd"
integrity sha512-GVK+8fNIE9lJQHAlhOROYiI0Yd4bAZ4u++C2ZjlkS3YmO6hi+FUxe6Dqm+OKWTcMpL/l71N6CQAmaRcb4zyJuA==
"@cbor-extract/cbor-extract-win32-x64@2.1.1":
version "2.1.1"
resolved "https://registry.yarnpkg.com/@cbor-extract/cbor-extract-win32-x64/-/cbor-extract-win32-x64-2.1.1.tgz#21b11a1a3f18c3e7d62fd5f87438b7ed2c64c1f7"
integrity sha512-2Niq1C41dCRIDeD8LddiH+mxGlO7HJ612Ll3D/E73ZWBmycued+8ghTr/Ho3CMOWPUEr08XtyBMVXAjqF+TcKw==
"@cspotcode/source-map-support@^0.8.0":
version "0.8.1"
resolved "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.8.1.tgz"
@ -4707,6 +4737,13 @@
resolved "https://registry.npmjs.org/@types/stack-utils/-/stack-utils-2.0.1.tgz"
integrity sha512-Hl219/BT5fLAaz6NDkSuhzasy49dwQS/DSdu4MdggFB8zcXv7vflBI3xp7FEmkmdDkBUI2bPUNeMttp2knYdxw==
"@types/ws@^8.5.4":
version "8.5.4"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.4.tgz#bb10e36116d6e570dd943735f86c933c1587b8a5"
integrity sha512-zdQDHKUgcX/zBc4GrwsE/7dVdAD8JR4EuiAXiiUhhfyIJXXb2+PrGshFyeXWQPMmmZ2XxgaqclgpIC7eTXc1mg==
dependencies:
"@types/node" "*"
"@types/yargs-parser@*":
version "21.0.0"
resolved "https://registry.npmjs.org/@types/yargs-parser/-/yargs-parser-21.0.0.tgz"
@ -5456,6 +5493,27 @@ caseless@~0.12.0:
resolved "https://registry.npmjs.org/caseless/-/caseless-0.12.0.tgz"
integrity sha512-4tYFyifaFfGacoiObjJegolkwSU4xQNGbVgUiNYVUxbQ2x2lUsFvY4hVgVzGiIe6WLOPqycWXA40l+PWsxthUw==
cbor-extract@^2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/cbor-extract/-/cbor-extract-2.1.1.tgz#f154b31529fdb6b7c70fb3ca448f44eda96a1b42"
integrity sha512-1UX977+L+zOJHsp0mWFG13GLwO6ucKgSmSW6JTl8B9GUvACvHeIVpFqhU92299Z6PfD09aTXDell5p+lp1rUFA==
dependencies:
node-gyp-build-optional-packages "5.0.3"
optionalDependencies:
"@cbor-extract/cbor-extract-darwin-arm64" "2.1.1"
"@cbor-extract/cbor-extract-darwin-x64" "2.1.1"
"@cbor-extract/cbor-extract-linux-arm" "2.1.1"
"@cbor-extract/cbor-extract-linux-arm64" "2.1.1"
"@cbor-extract/cbor-extract-linux-x64" "2.1.1"
"@cbor-extract/cbor-extract-win32-x64" "2.1.1"
cbor-x@^1.5.1:
version "1.5.1"
resolved "https://registry.yarnpkg.com/cbor-x/-/cbor-x-1.5.1.tgz#d2b0915c556c8ca294bebb4eac7d602218fd63c0"
integrity sha512-/vAkC4tiKCQCm5en4sA+mpKmjwY6Xxp1LO+BgZCNhp+Zow3pomyUHeBOK5EDp0mDaE36jw39l5eLHsoF3M1Lmg==
optionalDependencies:
cbor-extract "^2.1.1"
cborg@^1.6.0:
version "1.9.5"
resolved "https://registry.npmjs.org/cborg/-/cborg-1.9.5.tgz"
@ -8956,6 +9014,11 @@ node-fetch@^2.6.1, node-fetch@^2.6.7:
dependencies:
whatwg-url "^5.0.0"
node-gyp-build-optional-packages@5.0.3:
version "5.0.3"
resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.3.tgz#92a89d400352c44ad3975010368072b41ad66c17"
integrity sha512-k75jcVzk5wnnc/FMxsf4udAoTEUv2jY3ycfdSd3yWu6Cnd1oee6/CfZJApyscA4FJOmdoixWwiwOyf16RzD5JA==
node-gyp@^5.0.2:
version "5.1.1"
resolved "https://registry.npmjs.org/node-gyp/-/node-gyp-5.1.1.tgz"
@ -11521,6 +11584,11 @@ write-pkg@^4.0.0:
type-fest "^0.4.1"
write-json-file "^3.2.0"
ws@^8.12.0:
version "8.12.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.12.0.tgz#485074cc392689da78e1828a9ff23585e06cddd8"
integrity sha512-kU62emKIdKVeEIOIKVegvqpXMSTAMLJozpHZaJNDYqBjzlSYXQGviYwN1osDLJ9av68qHd4a2oSjd7yD4pacig==
xtend@^4.0.0, xtend@~4.0.1:
version "4.0.2"
resolved "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz"