* Add linting rule to sort imports * remove spacing between import groups * changeset * changeset * prettier config fine tuning * forbid use of deprecated imports * tidy
		
			
				
	
	
		
			170 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
			
		
		
	
	
			170 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			TypeScript
		
	
	
	
	
	
| import { once } from 'node:events'
 | |
| import * as http from 'node:http'
 | |
| import { AddressInfo } from 'node:net'
 | |
| import { WebSocket } from 'ws'
 | |
| import { XRPCError } from '@atproto/xrpc'
 | |
| import {
 | |
|   ErrorFrame,
 | |
|   Frame,
 | |
|   MessageFrame,
 | |
|   XrpcStreamServer,
 | |
|   byFrame,
 | |
|   byMessage,
 | |
| } from '../src'
 | |
| 
 | |
| describe('Stream', () => {
 | |
|   const wait = (ms) => new Promise((res) => setTimeout(res, ms))
 | |
|   it('streams message and info frames.', async () => {
 | |
|     const httpServer = http.createServer()
 | |
|     const server = new XrpcStreamServer({
 | |
|       server: httpServer,
 | |
|       handler: async function* () {
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(1)
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(2)
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(3)
 | |
|         return
 | |
|       },
 | |
|     })
 | |
| 
 | |
|     await once(httpServer.listen(), 'listening')
 | |
|     const { port } = server.wss.address() as AddressInfo
 | |
| 
 | |
|     const ws = new WebSocket(`ws://localhost:${port}`)
 | |
|     const frames: Frame[] = []
 | |
|     for await (const frame of byFrame(ws)) {
 | |
|       frames.push(frame)
 | |
|     }
 | |
| 
 | |
|     expect(frames).toEqual([
 | |
|       new MessageFrame(1),
 | |
|       new MessageFrame(2),
 | |
|       new MessageFrame(3),
 | |
|     ])
 | |
| 
 | |
|     httpServer.close()
 | |
|   })
 | |
| 
 | |
|   it('kills handler and closes on error frame.', async () => {
 | |
|     let proceededAfterError = false
 | |
|     const httpServer = http.createServer()
 | |
|     const server = new XrpcStreamServer({
 | |
|       server: httpServer,
 | |
|       handler: async function* () {
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(1)
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(2)
 | |
|         await wait(1)
 | |
|         yield new ErrorFrame({ error: 'BadOops' })
 | |
|         proceededAfterError = true
 | |
|         await wait(1)
 | |
|         yield new MessageFrame(3)
 | |
|         return
 | |
|       },
 | |
|     })
 | |
| 
 | |
|     await once(httpServer.listen(), 'listening')
 | |
|     const { port } = server.wss.address() as AddressInfo
 | |
| 
 | |
|     const ws = new WebSocket(`ws://localhost:${port}`)
 | |
|     const frames: Frame[] = []
 | |
|     for await (const frame of byFrame(ws)) {
 | |
|       frames.push(frame)
 | |
|     }
 | |
| 
 | |
|     await wait(5) // Ensure handler hasn't kept running
 | |
|     expect(proceededAfterError).toEqual(false)
 | |
| 
 | |
|     expect(frames).toEqual([
 | |
|       new MessageFrame(1),
 | |
|       new MessageFrame(2),
 | |
|       new ErrorFrame({ error: 'BadOops' }),
 | |
|     ])
 | |
| 
 | |
|     httpServer.close()
 | |
|   })
 | |
| 
 | |
|   it('kills handler and closes client disconnect.', async () => {
 | |
|     const httpServer = http.createServer()
 | |
|     let i = 1
 | |
|     const server = new XrpcStreamServer({
 | |
|       server: httpServer,
 | |
|       handler: async function* () {
 | |
|         while (true) {
 | |
|           await wait(0)
 | |
|           yield new MessageFrame(i++)
 | |
|         }
 | |
|       },
 | |
|     })
 | |
| 
 | |
|     await once(httpServer.listen(), 'listening')
 | |
|     const { port } = server.wss.address() as AddressInfo
 | |
| 
 | |
|     const ws = new WebSocket(`ws://localhost:${port}`)
 | |
|     const frames: Frame[] = []
 | |
|     for await (const frame of byFrame(ws)) {
 | |
|       frames.push(frame)
 | |
|       if (frame.body === 3) ws.terminate()
 | |
|     }
 | |
| 
 | |
|     // Grace period to let close take place on the server
 | |
|     await wait(5)
 | |
|     // Ensure handler hasn't kept running
 | |
|     const currentCount = i
 | |
|     await wait(5)
 | |
|     expect(i).toBe(currentCount)
 | |
| 
 | |
|     httpServer.close()
 | |
|   })
 | |
| 
 | |
|   describe('byMessage()', () => {
 | |
|     it('kills handler and closes client disconnect on error frame.', async () => {
 | |
|       const httpServer = http.createServer()
 | |
|       const server = new XrpcStreamServer({
 | |
|         server: httpServer,
 | |
|         handler: async function* () {
 | |
|           await wait(1)
 | |
|           yield new MessageFrame(1)
 | |
|           await wait(1)
 | |
|           yield new MessageFrame(2)
 | |
|           await wait(1)
 | |
|           yield new ErrorFrame({
 | |
|             error: 'BadOops',
 | |
|             message: 'That was a bad one',
 | |
|           })
 | |
|           await wait(1)
 | |
|           yield new MessageFrame(3)
 | |
|           return
 | |
|         },
 | |
|       })
 | |
|       await once(httpServer.listen(), 'listening')
 | |
|       const { port } = server.wss.address() as AddressInfo
 | |
| 
 | |
|       const ws = new WebSocket(`ws://localhost:${port}`)
 | |
|       const frames: Frame[] = []
 | |
| 
 | |
|       let error
 | |
|       try {
 | |
|         for await (const frame of byMessage(ws)) {
 | |
|           frames.push(frame)
 | |
|         }
 | |
|       } catch (err) {
 | |
|         error = err
 | |
|       }
 | |
| 
 | |
|       expect(ws.readyState).toEqual(ws.CLOSING)
 | |
|       expect(frames).toEqual([new MessageFrame(1), new MessageFrame(2)])
 | |
|       expect(error).toBeInstanceOf(XRPCError)
 | |
|       if (error instanceof XRPCError) {
 | |
|         expect(error.error).toEqual('BadOops')
 | |
|         expect(error.message).toEqual('That was a bad one')
 | |
|       }
 | |
| 
 | |
|       httpServer.close()
 | |
|     })
 | |
|   })
 | |
| })
 |