import * as streams from '../src/streams'
import { PassThrough, Readable } from 'node:stream'

describe('streams', () => {
  describe('forwardStreamErrors', () => {
    it('forwards errors through a set of streams', () => {
      const streamA = new PassThrough()
      const streamB = new PassThrough()
      let streamBError: Error | null = null
      const err = new Error('foo')

      streamB.on('error', (err) => {
        streamBError = err
      })

      streams.forwardStreamErrors(streamA, streamB)

      streamA.emit('error', err)

      expect(streamBError).toBe(err)
    })
  })

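  // cloneStream should yield a second stream that mirrors both the data and
  // the error events of its source.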
  describe('cloneStream', () => {
    it('should clone stream', () => {
      const stream = new PassThrough()
      let clonedError: Error | undefined
      let clonedData: string | undefined

      const cloned = streams.cloneStream(stream)

      cloned.on('data', (data) => {
        clonedData = String(data)
      })
      cloned.on('error', (err) => {
        clonedError = err
      })

      stream.emit('data', 'foo')
      stream.emit('error', new Error('foo error'))

      expect(clonedData).toEqual('foo')
      expect(clonedError?.message).toEqual('foo error')
    })
  })

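  // streamSize presumably consumes the stream and resolves with the total
  // size of its contents.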
  describe('streamSize', () => {
    it('reads entire stream and computes size', async () => {
      const stream = Readable.from(['f', 'o', 'o'])

      const size = await streams.streamSize(stream)

      expect(size).toBe(3)
    })

    it('returns 0 for empty streams', async () => {
      const stream = Readable.from([])
      const size = await streams.streamSize(stream)

      expect(size).toBe(0)
    })
  })

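  // streamToNodeBuffer collects the Uint8Array chunks of a stream or async
  // iterable into a single Node.js Buffer, and is expected to reject on
  // chunks that are not bytes.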
  describe('streamToNodeBuffer', () => {
    it('converts stream to byte array', async () => {
      const stream = Readable.from(Buffer.from('foo'))
      const bytes = await streams.streamToNodeBuffer(stream)

      expect(bytes[0]).toBe('f'.charCodeAt(0))
      expect(bytes[1]).toBe('o'.charCodeAt(0))
      expect(bytes[2]).toBe('o'.charCodeAt(0))
      expect(bytes.length).toBe(3)
    })

    it('converts async iterable to byte array', async () => {
      const iterable = (async function* () {
        yield Buffer.from('b')
        yield Buffer.from('a')
        yield new Uint8Array(['r'.charCodeAt(0)])
      })()
      const bytes = await streams.streamToNodeBuffer(iterable)

      expect(bytes[0]).toBe('b'.charCodeAt(0))
      expect(bytes[1]).toBe('a'.charCodeAt(0))
      expect(bytes[2]).toBe('r'.charCodeAt(0))
      expect(bytes.length).toBe(3)
    })

    it('throws error for non Uint8Array chunks', async () => {
      const iterable: AsyncIterable<any> = (async function* () {
        yield Buffer.from('b')
        yield Buffer.from('a')
        yield 'r'
      })()

      await expect(streams.streamToNodeBuffer(iterable)).rejects.toThrow(
        'expected Uint8Array',
      )
    })
  })

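  // byteIterableToStream presumably wraps an AsyncIterable of Uint8Array
  // chunks in a Readable stream.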
  describe('byteIterableToStream', () => {
    it('converts byte iterable to stream', async () => {
      const iterable: AsyncIterable<Uint8Array> = {
        async *[Symbol.asyncIterator]() {
          yield new Uint8Array([0xa, 0xb])
        },
      }

      const stream = streams.byteIterableToStream(iterable)

      for await (const chunk of stream) {
        expect(chunk[0]).toBe(0xa)
        expect(chunk[1]).toBe(0xb)
      }
    })
  })

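  // bytesToStream should expose a single byte array as a readable stream.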
  describe('bytesToStream', () => {
    it('converts byte array to readable stream', async () => {
      const bytes = new Uint8Array([0xa, 0xb])
      const stream = streams.bytesToStream(bytes)

      for await (const chunk of stream) {
        expect(chunk[0]).toBe(0xa)
        expect(chunk[1]).toBe(0xb)
      }
    })
  })

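  // MaxSizeChecker appears to be a pass-through that tracks totalSize and
  // destroys itself with the error produced by the given factory once the
  // size limit (1 byte here) is exceeded.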
  describe('MaxSizeChecker', () => {
    it('destroys once max size is met', async () => {
      const stream = new Readable()
      const err = new Error('foo')
      const checker = new streams.MaxSizeChecker(1, () => err)
      let lastError: Error | undefined

      const outStream = stream.pipe(checker)

      outStream.on('error', (err) => {
        lastError = err
      })

      const waitForStream = new Promise<void>((resolve) => {
        stream.on('end', () => {
          resolve()
        })
      })

      expect(checker.totalSize).toBe(0)

      stream.push(new Uint8Array([0xa]))
      stream.push(new Uint8Array([0xb]))
      stream.push(null)

      await waitForStream

      expect(checker.totalSize).toBe(2)
      expect(checker.destroyed).toBe(true)
      expect(lastError).toBe(err)
    })
  })
})