7f008c05a0
* remove return in test * couple of fixups in other pacakges * Add dummy checks to declaration and follow app migrations, remove paranoid join * update db nsid migration * Ensure there are writes in follow app migration * Add dumy check to votes-to-likes app migration, tidy * Ensure there are writes in vote-to-like app migration * update migration name * pr feedback * count utf8 & grapheme length * add maxUtf8 * siwtch max semantics * plural * update post schema * added bytes & cid refs * add ipld<>json * fixin up a could tings * Add app.bsky.richtext.facet, replace post entities with facets * plural actors * wip * Setup backlinks table on pds * wip * send & recieve cids/bytes with xrpc * Track backlinks when indexing records on pds * handle ipld vals in xrpc server * added cids & bytes to codegen * In createRecord, add deletions to avoid duplicate likes/follows/reposts * Tests and fixes for prevention of dupe follows, likes, reposts * Backlink migration tidy * cleanup dag json parser * Fix dupe backlink inserts * Tidy * blob refs + codegen * Make profile displayName optional * Test view and updateProfile for empty display name * working into pds * Make aggregate counts optional on post and profile views * Make viewer state optional on post view for consistency * Remove deprecated myState field on profile view * Tidy repo method descriptions * tests & types & fixes * Implementation and tests for putRecord * Remove updateProfile method * Update repo service so that head can be taken for update externally * Lex updates for compare-and-swap records/commits * Add error to lex for bad repo compare-and-swaps * Improve update-at-head thru repo service * common package * Implement and test compare-and-swaps on repo write methods * Use lex discriminator for applyWrites * Remove post entity/facet index * Update lex descriptions to clarify repo write semantics * Make deleteRecord idempotent w/ tests * cleanup * fix things up * adding more formats * tests * updating schema * Only generate tid rkeys on pds, support literal rkeys on client * Add backlink indexes * Update format of post embed views, fix external uri validation * fixing up tests * Include embeds on record embeds * cleanup * Notify users when they are quoted * Remove determineRkey indirection * fix api tests * support concatenated cbor * integrating to server * re-enable tests * fix up tests * Thread compare-and-swaps down into repo service rather than use pinned storage * Tidy * Update packages/common/tests/ipld-multi.test.ts Co-authored-by: devin ivy <devinivy@gmail.com> * Update packages/lexicon/src/validators/formats.ts Co-authored-by: devin ivy <devinivy@gmail.com> * pr feedback * pr feedback * Add postgres-specific migration path for missing profile display names * Tidy/clarify deep embeds * Tidy * rm unused escape * decrease crud race count * update subscribeRepos lexicon * Fix applyWrite lexicon re: collection fields * sign post event type * update cids & bytes json encoding * update lex blob & cid-link types * updated codegen & pds * number -> float * missed a couple * remove old image constraints * pr feedback + descripts * no hardcoded port numbers * remove separate tooLarge evt * fix dumb build error * fixin gup lex + xrpc server * better parsing of message types * dont mutate body in subscription * bugfix in subscription * rm commented out code * init feature branch * undo * Remove old lexicons * Remove creator from profile view * wip * rework seqs * fixed up tests * bug fixing * sequence handles & notify in dbTxn * tidy * update lex to include times * test syncing handle changes * one more fix * handle too big evts * dont thread sequencer through everything * Split common into server vs web-friendly versions * Make lexicon, identifier web-safe using common-web * Switch api package to be a browser build, fix identifier package for browser bundling * Fix pds and repo for lexicon package changes, tidy * Make common-web a browser build, tidy * fixing up deps * fix up test * turn off caching in actions * Standardize repo write interfaces around repo input * Update repo write endpoints for repo input field * Remove scene follows during app migration * API package updates (#712) * Add bsky agent and various sugars to the api package * Add richtext library to api package * Update richtext to use facets and deprecate entities * Update richtext to use utf8 indices * Richtext converts deprecated entity indices from utf16 locations to utf8 locations * Add note about encodings in the lexicon * Add RichText facet detection * Remove dead code * Add deprecation notices to lexicons * Usability improvements to RichText * Update the api package readme * Add RichText#detectFacetsWithoutResolution * Add upsertProfile to bsky-agent * Update packages/pds/src/api/com/atproto/repo/applyWrites.ts Co-authored-by: devin ivy <devinivy@gmail.com> * pr feedback * fix flaky timing streaming tests * simplify emptyPromise * fixed up open handles * fix missed repo syntax * fix error in test from fkey constraint * fix another api agent bug * Embed consistency, add complex record embed * Tidy embed lex descriptions * rename pg schemas * use swc for jest * fix up deps * cleanup * Update pds indexing, views, tests for complex record embeds * fixing up profile view semantics * wip * update snaps * Rename embed.complexRecord to embed.recordWithMedia * Tidy aroud record w/ media embeds * Add grapheme utilities to api RichText (#720) Co-authored-by: dholms <dtholmgren@gmail.com> * Fix: app.bsky.feed.getPostThread#... to app.bsky.feed.defs#... (#726) * Update bskyagent to use repo param * Minor typing fix * Add exports to api package: blobref & lex/json converters (#727) * Add exports to api package: BlobRef & lex/json converters * Add an example react-native fetch handler * Switch all lingering references of recordRef to strongRef * Update lexicon for richtext facets to have multiple features, byte slice rather than text slice * Implement multi-feature richtext facets on pds * Update api package to use updated richtext facets * Minor fixes to admin repo/record views * Fix app migration exports, remove old app migration * Fix: sort richtext facets so they can render correctly * Disable app migration dummy checks that don't work on live deploy * Optimize lex de/serialization using simple checks * Tidy comment typos * App migration to cleanup notifications for likes, follows, old scene notifs * Fix notification reason for change from vote to like --------- Co-authored-by: Devin Ivy <devinivy@gmail.com> Co-authored-by: Paul Frazee <pfrazee@gmail.com>
170 lines
4.4 KiB
TypeScript
170 lines
4.4 KiB
TypeScript
import * as http from 'http'
|
|
import { once } from 'events'
|
|
import { AddressInfo } from 'net'
|
|
import { WebSocket } from 'ws'
|
|
import { XRPCError } from '@atproto/xrpc'
|
|
import {
|
|
ErrorFrame,
|
|
Frame,
|
|
MessageFrame,
|
|
XrpcStreamServer,
|
|
byFrame,
|
|
byMessage,
|
|
} from '../src'
|
|
|
|
describe('Stream', () => {
|
|
const wait = (ms) => new Promise((res) => setTimeout(res, ms))
|
|
it('streams message and info frames.', async () => {
|
|
const httpServer = http.createServer()
|
|
const server = new XrpcStreamServer({
|
|
server: httpServer,
|
|
handler: async function* () {
|
|
await wait(1)
|
|
yield new MessageFrame(1)
|
|
await wait(1)
|
|
yield new MessageFrame(2)
|
|
await wait(1)
|
|
yield new MessageFrame(3)
|
|
return
|
|
},
|
|
})
|
|
|
|
await once(httpServer.listen(), 'listening')
|
|
const { port } = server.wss.address() as AddressInfo
|
|
|
|
const ws = new WebSocket(`ws://localhost:${port}`)
|
|
const frames: Frame[] = []
|
|
for await (const frame of byFrame(ws)) {
|
|
frames.push(frame)
|
|
}
|
|
|
|
expect(frames).toEqual([
|
|
new MessageFrame(1),
|
|
new MessageFrame(2),
|
|
new MessageFrame(3),
|
|
])
|
|
|
|
httpServer.close()
|
|
})
|
|
|
|
it('kills handler and closes on error frame.', async () => {
|
|
let proceededAfterError = false
|
|
const httpServer = http.createServer()
|
|
const server = new XrpcStreamServer({
|
|
server: httpServer,
|
|
handler: async function* () {
|
|
await wait(1)
|
|
yield new MessageFrame(1)
|
|
await wait(1)
|
|
yield new MessageFrame(2)
|
|
await wait(1)
|
|
yield new ErrorFrame({ error: 'BadOops' })
|
|
proceededAfterError = true
|
|
await wait(1)
|
|
yield new MessageFrame(3)
|
|
return
|
|
},
|
|
})
|
|
|
|
await once(httpServer.listen(), 'listening')
|
|
const { port } = server.wss.address() as AddressInfo
|
|
|
|
const ws = new WebSocket(`ws://localhost:${port}`)
|
|
const frames: Frame[] = []
|
|
for await (const frame of byFrame(ws)) {
|
|
frames.push(frame)
|
|
}
|
|
|
|
await wait(5) // Ensure handler hasn't kept running
|
|
expect(proceededAfterError).toEqual(false)
|
|
|
|
expect(frames).toEqual([
|
|
new MessageFrame(1),
|
|
new MessageFrame(2),
|
|
new ErrorFrame({ error: 'BadOops' }),
|
|
])
|
|
|
|
httpServer.close()
|
|
})
|
|
|
|
it('kills handler and closes client disconnect.', async () => {
|
|
const httpServer = http.createServer()
|
|
let i = 1
|
|
const server = new XrpcStreamServer({
|
|
server: httpServer,
|
|
handler: async function* () {
|
|
while (true) {
|
|
await wait(0)
|
|
yield new MessageFrame(i++)
|
|
}
|
|
},
|
|
})
|
|
|
|
await once(httpServer.listen(), 'listening')
|
|
const { port } = server.wss.address() as AddressInfo
|
|
|
|
const ws = new WebSocket(`ws://localhost:${port}`)
|
|
const frames: Frame[] = []
|
|
for await (const frame of byFrame(ws)) {
|
|
frames.push(frame)
|
|
if (frame.body === 3) ws.terminate()
|
|
}
|
|
|
|
// Grace period to let close take place on the server
|
|
await wait(5)
|
|
// Ensure handler hasn't kept running
|
|
const currentCount = i
|
|
await wait(5)
|
|
expect(i).toBe(currentCount)
|
|
|
|
httpServer.close()
|
|
})
|
|
|
|
describe('byMessage()', () => {
|
|
it('kills handler and closes client disconnect on error frame.', async () => {
|
|
const httpServer = http.createServer()
|
|
const server = new XrpcStreamServer({
|
|
server: httpServer,
|
|
handler: async function* () {
|
|
await wait(1)
|
|
yield new MessageFrame(1)
|
|
await wait(1)
|
|
yield new MessageFrame(2)
|
|
await wait(1)
|
|
yield new ErrorFrame({
|
|
error: 'BadOops',
|
|
message: 'That was a bad one',
|
|
})
|
|
await wait(1)
|
|
yield new MessageFrame(3)
|
|
return
|
|
},
|
|
})
|
|
await once(httpServer.listen(), 'listening')
|
|
const { port } = server.wss.address() as AddressInfo
|
|
|
|
const ws = new WebSocket(`ws://localhost:${port}`)
|
|
const frames: Frame[] = []
|
|
|
|
let error
|
|
try {
|
|
for await (const frame of byMessage(ws)) {
|
|
frames.push(frame)
|
|
}
|
|
} catch (err) {
|
|
error = err
|
|
}
|
|
|
|
expect(ws.readyState).toEqual(ws.CLOSING)
|
|
expect(frames).toEqual([new MessageFrame(1), new MessageFrame(2)])
|
|
expect(error).toBeInstanceOf(XRPCError)
|
|
if (error instanceof XRPCError) {
|
|
expect(error.error).toEqual('BadOops')
|
|
expect(error.message).toEqual('That was a bad one')
|
|
}
|
|
|
|
httpServer.close()
|
|
})
|
|
})
|
|
})
|