Daniel Holmgren 7f008c05a0
Lexicon refactor (#658)
* remove return in test

* couple of fixups in other packages

* Add dummy checks to declaration and follow app migrations, remove paranoid join

* update db nsid migration

* Ensure there are writes in follow app migration

* Add dummy check to votes-to-likes app migration, tidy

* Ensure there are writes in vote-to-like app migration

* update migration name

* pr feedback

* count utf8 & grapheme length

* add maxUtf8

* switch max semantics

* plural

* update post schema

* added bytes & cid refs

* add ipld<>json

* fixing up a couple of things

* Add app.bsky.richtext.facet, replace post entities with facets

* plural actors

* wip

* Setup backlinks table on pds

* wip

* send & receive cids/bytes with xrpc

* Track backlinks when indexing records on pds

* handle ipld vals in xrpc server

* added cids & bytes to codegen

* In createRecord, add deletions to avoid duplicate likes/follows/reposts

* Tests and fixes for prevention of dupe follows, likes, reposts

* Backlink migration tidy

* cleanup dag json parser

* Fix dupe backlink inserts

* Tidy

* blob refs + codegen

* Make profile displayName optional

* Test view and updateProfile for empty display name

* working into pds

* Make aggregate counts optional on post and profile views

* Make viewer state optional on post view for consistency

* Remove deprecated myState field on profile view

* Tidy repo method descriptions

* tests & types & fixes

* Implementation and tests for putRecord

* Remove updateProfile method

* Update repo service so that head can be taken for update externally

* Lex updates for compare-and-swap records/commits

* Add error to lex for bad repo compare-and-swaps

* Improve update-at-head thru repo service

* common package

* Implement and test compare-and-swaps on repo write methods

* Use lex discriminator for applyWrites

* Remove post entity/facet index

* Update lex descriptions to clarify repo write semantics

* Make deleteRecord idempotent w/ tests

* cleanup

* fix things up

* adding more formats

* tests

* updating schema

* Only generate tid rkeys on pds, support literal rkeys on client

* Add backlink indexes

* Update format of post embed views, fix external uri validation

* fixing up tests

* Include embeds on record embeds

* cleanup

* Notify users when they are quoted

* Remove determineRkey indirection

* fix api tests

* support concatenated cbor

* integrating to server

* re-enable tests

* fix up tests

* Thread compare-and-swaps down into repo service rather than use pinned storage

* Tidy

* Update packages/common/tests/ipld-multi.test.ts

Co-authored-by: devin ivy <devinivy@gmail.com>

* Update packages/lexicon/src/validators/formats.ts

Co-authored-by: devin ivy <devinivy@gmail.com>

* pr feedback

* pr feedback

* Add postgres-specific migration path for missing profile display names

* Tidy/clarify deep embeds

* Tidy

* rm unused escape

* decrease crud race count

* update subscribeRepos lexicon

* Fix applyWrite lexicon re: collection fields

* sign post event type

* update cids & bytes json encoding

* update lex blob & cid-link types

* updated codegen & pds

* number -> float

* missed a couple

* remove old image constraints

* pr feedback + descripts

* no hardcoded port numbers

* remove separate tooLarge evt

* fix dumb build error

* fixing up lex + xrpc server

* better parsing of message types

* dont mutate body in subscription

* bugfix in subscription

* rm commented out code

* init feature branch

* undo

* Remove old lexicons

* Remove creator from profile view

* wip

* rework seqs

* fixed up tests

* bug fixing

* sequence handles & notify in dbTxn

* tidy

* update lex to include times

* test syncing handle changes

* one more fix

* handle too big evts

* dont thread sequencer through everything

* Split common into server vs web-friendly versions

* Make lexicon, identifier web-safe using common-web

* Switch api package to be a browser build, fix identifier package for browser bundling

* Fix pds and repo for lexicon package changes, tidy

* Make common-web a browser build, tidy

* fixing up deps

* fix up test

* turn off caching in actions

* Standardize repo write interfaces around repo input

* Update repo write endpoints for repo input field

* Remove scene follows during app migration

* API package updates (#712)

* Add bsky agent and various sugars to the api package

* Add richtext library to api package

* Update richtext to use facets and deprecate entities

* Update richtext to use utf8 indices

* Richtext converts deprecated entity indices from utf16 locations to utf8 locations

* Add note about encodings in the lexicon

* Add RichText facet detection

* Remove dead code

* Add deprecation notices to lexicons

* Usability improvements to RichText

* Update the api package readme

* Add RichText#detectFacetsWithoutResolution

* Add upsertProfile to bsky-agent

* Update packages/pds/src/api/com/atproto/repo/applyWrites.ts

Co-authored-by: devin ivy <devinivy@gmail.com>

* pr feedback

* fix flaky timing streaming tests

* simplify emptyPromise

* fixed up open handles

* fix missed repo syntax

* fix error in test from fkey constraint

* fix another api agent bug

* Embed consistency, add complex record embed

* Tidy embed lex descriptions

* rename pg schemas

* use swc for jest

* fix up deps

* cleanup

* Update pds indexing, views, tests for complex record embeds

* fixing up profile view semantics

* wip

* update snaps

* Rename embed.complexRecord to embed.recordWithMedia

* Tidy around record w/ media embeds

* Add grapheme utilities to api RichText (#720)

Co-authored-by: dholms <dtholmgren@gmail.com>

* Fix: app.bsky.feed.getPostThread#... to app.bsky.feed.defs#... (#726)

* Update bskyagent to use repo param

* Minor typing fix

* Add exports to api package: blobref & lex/json converters (#727)

* Add exports to api package: BlobRef & lex/json converters

* Add an example react-native fetch handler

* Switch all lingering references of recordRef to strongRef

* Update lexicon for richtext facets to have multiple features, byte slice rather than text slice

* Implement multi-feature richtext facets on pds

* Update api package to use updated richtext facets

* Minor fixes to admin repo/record views

* Fix app migration exports, remove old app migration

* Fix: sort richtext facets so they can render correctly

* Disable app migration dummy checks that don't work on live deploy

* Optimize lex de/serialization using simple checks

* Tidy comment typos

* App migration to cleanup notifications for likes, follows, old scene notifs

* Fix notification reason for change from vote to like

---------

Co-authored-by: Devin Ivy <devinivy@gmail.com>
Co-authored-by: Paul Frazee <pfrazee@gmail.com>
2023-03-31 13:34:51 -04:00

170 lines
4.4 KiB
TypeScript

import * as http from 'http'
import { once } from 'events'
import { AddressInfo } from 'net'
import { WebSocket } from 'ws'
import { XRPCError } from '@atproto/xrpc'
import {
ErrorFrame,
Frame,
MessageFrame,
XrpcStreamServer,
byFrame,
byMessage,
} from '../src'
describe('Stream', () => {
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 * Used by the tests below to space out frames and to give the server time
 * to react before asserting.
 */
const wait = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => {
    setTimeout(resolve, ms)
  })
// The handler yields three message frames and returns; the client is expected
// to receive exactly those frames, in order, and then see the stream end.
it('streams message and info frames.', async () => {
  const httpServer = http.createServer()
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      await wait(1)
      yield new MessageFrame(1)
      await wait(1)
      yield new MessageFrame(2)
      await wait(1)
      yield new MessageFrame(3)
      return
    },
  })
  // Listen on an OS-assigned port so no port number is hardcoded.
  await once(httpServer.listen(), 'listening')
  try {
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)

    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
    }

    expect(frames).toEqual([
      new MessageFrame(1),
      new MessageFrame(2),
      new MessageFrame(3),
    ])
  } finally {
    // Release the listening socket even when an assertion above throws,
    // otherwise a failing test leaves an open handle behind.
    httpServer.close()
  }
})
// The handler yields an error frame part-way through. The test expects the
// client to receive the frames up to and including the error frame, and the
// generator not to be resumed past the point where it yielded the error.
it('kills handler and closes on error frame.', async () => {
  // Set by the handler only if it is resumed after yielding the error frame.
  let proceededAfterError = false
  const httpServer = http.createServer()
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      await wait(1)
      yield new MessageFrame(1)
      await wait(1)
      yield new MessageFrame(2)
      await wait(1)
      yield new ErrorFrame({ error: 'BadOops' })
      proceededAfterError = true
      await wait(1)
      yield new MessageFrame(3)
      return
    },
  })
  // Listen on an OS-assigned port so no port number is hardcoded.
  await once(httpServer.listen(), 'listening')
  try {
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)

    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
    }

    await wait(5) // Ensure handler hasn't kept running
    expect(proceededAfterError).toEqual(false)

    expect(frames).toEqual([
      new MessageFrame(1),
      new MessageFrame(2),
      new ErrorFrame({ error: 'BadOops' }),
    ])
  } finally {
    // Release the listening socket even when an assertion above throws,
    // otherwise a failing test leaves an open handle behind.
    httpServer.close()
  }
})
// The handler emits an unbounded sequence of message frames. The client
// terminates its socket once it sees the frame whose body is 3, and the test
// then checks that the handler's counter stops advancing.
it('kills handler and closes client disconnect.', async () => {
  const httpServer = http.createServer()
  // Shared counter: incremented by the handler for every frame it yields.
  let i = 1
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      while (true) {
        await wait(0)
        yield new MessageFrame(i++)
      }
    },
  })
  // Listen on an OS-assigned port so no port number is hardcoded.
  await once(httpServer.listen(), 'listening')
  try {
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)

    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
      if (frame.body === 3) ws.terminate()
    }

    // Grace period to let close take place on the server
    await wait(5)
    // Ensure handler hasn't kept running
    const currentCount = i
    await wait(5)
    expect(i).toBe(currentCount)
  } finally {
    // Release the listening socket even when an assertion above throws,
    // otherwise a failing test leaves an open handle behind.
    httpServer.close()
  }
})
describe('byMessage()', () => {
  // The handler yields an error frame after two message frames. This test
  // expects iterating with byMessage() to deliver the two message frames and
  // then throw an XRPCError carrying the error frame's `error` and `message`.
  it('kills handler and closes client disconnect on error frame.', async () => {
    const httpServer = http.createServer()
    const server = new XrpcStreamServer({
      server: httpServer,
      handler: async function* () {
        await wait(1)
        yield new MessageFrame(1)
        await wait(1)
        yield new MessageFrame(2)
        await wait(1)
        yield new ErrorFrame({
          error: 'BadOops',
          message: 'That was a bad one',
        })
        await wait(1)
        yield new MessageFrame(3)
        return
      },
    })
    // Listen on an OS-assigned port so no port number is hardcoded.
    await once(httpServer.listen(), 'listening')
    try {
      const { port } = server.wss.address() as AddressInfo
      const ws = new WebSocket(`ws://localhost:${port}`)

      const frames: Frame[] = []
      // Captured rather than rethrown so it can be inspected below.
      let error: unknown
      try {
        for await (const frame of byMessage(ws)) {
          frames.push(frame)
        }
      } catch (err) {
        error = err
      }

      expect(ws.readyState).toEqual(ws.CLOSING)
      expect(frames).toEqual([new MessageFrame(1), new MessageFrame(2)])
      expect(error).toBeInstanceOf(XRPCError)
      // The instanceof guard narrows `error` for the type checker; the
      // assertion above already fails the test if it is not an XRPCError.
      if (error instanceof XRPCError) {
        expect(error.error).toEqual('BadOops')
        expect(error.message).toEqual('That was a bad one')
      }
    } finally {
      // Release the listening socket even when an assertion above throws,
      // otherwise a failing test leaves an open handle behind.
      httpServer.close()
    }
  })
})
})