Matthieu Sieben 61dc0d60e1
Add linting rule to sort imports (#3220)
* Add linting rule to sort imports

* remove spacing between import groups

* changeset

* changeset

* prettier config fine tuning

* forbid use of deprecated imports

* tidy
2025-02-05 15:06:58 +01:00

170 lines
4.4 KiB
TypeScript

import { once } from 'node:events'
import * as http from 'node:http'
import { AddressInfo } from 'node:net'
import { WebSocket } from 'ws'
import { XRPCError } from '@atproto/xrpc'
import {
ErrorFrame,
Frame,
MessageFrame,
XrpcStreamServer,
byFrame,
byMessage,
} from '../src'
describe('Stream', () => {
/**
 * Returns a promise that resolves (with no value) once a timer of `ms`
 * milliseconds fires. The tests use it to space out yielded frames and to
 * give the server a moment to react before asserting.
 *
 * @param ms - Delay passed straight through to `setTimeout`.
 */
const wait = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))
// The handler yields three message frames and then returns. The client
// collects every frame produced by `byFrame` and expects exactly those three,
// in order.
it('streams message and info frames.', async () => {
  const httpServer = http.createServer()
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      await wait(1)
      yield new MessageFrame(1)
      await wait(1)
      yield new MessageFrame(2)
      await wait(1)
      yield new MessageFrame(3)
      return
    },
  })
  await once(httpServer.listen(), 'listening')
  try {
    // `listen()` was called without a port, so read the assigned one back.
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)
    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
    }
    expect(frames).toEqual([
      new MessageFrame(1),
      new MessageFrame(2),
      new MessageFrame(3),
    ])
  } finally {
    // Stop listening even when the loop or an assertion above throws, so a
    // failing test does not leave the HTTP server open.
    httpServer.close()
  }
})
// The handler yields two messages, then an error frame, then tries to carry
// on. The test expects the client to see the frames up to and including the
// error frame, and expects the statement right after the error `yield` never
// to run.
it('kills handler and closes on error frame.', async () => {
  let proceededAfterError = false
  const httpServer = http.createServer()
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      await wait(1)
      yield new MessageFrame(1)
      await wait(1)
      yield new MessageFrame(2)
      await wait(1)
      yield new ErrorFrame({ error: 'BadOops' })
      // Only reached if the generator is resumed after the error frame.
      proceededAfterError = true
      await wait(1)
      yield new MessageFrame(3)
      return
    },
  })
  await once(httpServer.listen(), 'listening')
  try {
    // `listen()` was called without a port, so read the assigned one back.
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)
    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
    }
    await wait(5) // Ensure handler hasn't kept running
    expect(proceededAfterError).toEqual(false)
    expect(frames).toEqual([
      new MessageFrame(1),
      new MessageFrame(2),
      new ErrorFrame({ error: 'BadOops' }),
    ])
  } finally {
    // Stop listening even when the loop or an assertion above throws, so a
    // failing test does not leave the HTTP server open.
    httpServer.close()
  }
})
// The handler yields an ever-increasing counter without end. The client
// terminates its socket after receiving the frame whose body is 3; the test
// then checks that the counter stops advancing, i.e. the handler is no longer
// being pulled.
it('kills handler and closes client disconnect.', async () => {
  const httpServer = http.createServer()
  let i = 1
  const server = new XrpcStreamServer({
    server: httpServer,
    handler: async function* () {
      while (true) {
        await wait(0)
        yield new MessageFrame(i++)
      }
    },
  })
  await once(httpServer.listen(), 'listening')
  try {
    // `listen()` was called without a port, so read the assigned one back.
    const { port } = server.wss.address() as AddressInfo
    const ws = new WebSocket(`ws://localhost:${port}`)
    const frames: Frame[] = []
    for await (const frame of byFrame(ws)) {
      frames.push(frame)
      if (frame.body === 3) ws.terminate()
    }
    // Grace period to let close take place on the server
    await wait(5)
    // Ensure handler hasn't kept running: sample the counter, wait, and
    // compare.
    const currentCount = i
    await wait(5)
    expect(i).toBe(currentCount)
  } finally {
    // Stop listening even when the loop or an assertion above throws, so a
    // failing test does not leave the HTTP server open.
    httpServer.close()
  }
})
describe('byMessage()', () => {
  // The handler yields two messages and then an error frame. Iterating with
  // `byMessage` is expected to deliver the two messages and then throw an
  // XRPCError carrying the error frame's `error` and `message` fields.
  it('kills handler and closes client disconnect on error frame.', async () => {
    const httpServer = http.createServer()
    const server = new XrpcStreamServer({
      server: httpServer,
      handler: async function* () {
        await wait(1)
        yield new MessageFrame(1)
        await wait(1)
        yield new MessageFrame(2)
        await wait(1)
        yield new ErrorFrame({
          error: 'BadOops',
          message: 'That was a bad one',
        })
        await wait(1)
        yield new MessageFrame(3)
        return
      },
    })
    await once(httpServer.listen(), 'listening')
    try {
      // `listen()` was called without a port, so read the assigned one back.
      const { port } = server.wss.address() as AddressInfo
      const ws = new WebSocket(`ws://localhost:${port}`)
      const frames: Frame[] = []
      // Typed as `unknown` so it has to be narrowed before its fields are read.
      let error: unknown
      try {
        for await (const frame of byMessage(ws)) {
          frames.push(frame)
        }
      } catch (err) {
        error = err
      }
      expect(ws.readyState).toEqual(ws.CLOSING)
      expect(frames).toEqual([new MessageFrame(1), new MessageFrame(2)])
      expect(error).toBeInstanceOf(XRPCError)
      // The `instanceof` check narrows `error` for the compiler; the
      // assertion above has already failed the test if it is not an XRPCError.
      if (error instanceof XRPCError) {
        expect(error.error).toEqual('BadOops')
        expect(error.message).toEqual('That was a bad one')
      }
    } finally {
      // Stop listening even when an assertion above throws, so a failing
      // test does not leave the HTTP server open.
      httpServer.close()
    }
  })
})
})