Handle client abort on image upload ()

This commit is contained in:
devin ivy 2023-02-03 11:19:45 -05:00 committed by GitHub
parent 55234b4b0d
commit 216e434c73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 9 deletions
packages/pds

@ -1,4 +1,5 @@
import { Readable } from 'stream'
import { pipeline } from 'stream/promises'
import sharp from 'sharp'
import { errHasMsg, forwardStreamErrors } from '@atproto/common'
import { formatsToMimes, ImageInfo, Options } from './util'
@ -51,14 +52,14 @@ export async function resize(
export async function maybeGetInfo(
stream: Readable,
): Promise<ImageInfo | null> {
const processor = sharp()
forwardStreamErrors(stream, processor)
stream.pipe(processor)
let metadata
let metadata: sharp.Metadata
try {
metadata = await processor.metadata()
const processor = sharp()
const [result] = await Promise.all([
processor.metadata(),
pipeline(stream, processor), // Handles error propagation
])
metadata = result
} catch (err) {
if (errHasMsg(err, 'Input buffer contains unsupported image format')) {
return null

@ -1,7 +1,7 @@
import fs from 'fs/promises'
import { gzipSync } from 'zlib'
import AtpApi, { ServiceClient as AtpServiceClient } from '@atproto/api'
import { CloseFn, runTestServer } from './_util'
import { CloseFn, runTestServer, TestServerInfo } from './_util'
import { CID } from 'multiformats/cid'
import { Database, ServerConfig } from '../src'
import DiskBlobStore from '../src/storage/disk-blobstore'
@ -24,6 +24,7 @@ const bob = {
}
describe('file uploads', () => {
let server: TestServerInfo
let client: AtpServiceClient
let aliceClient: AtpServiceClient
let bobClient: AtpServiceClient
@ -34,7 +35,7 @@ describe('file uploads', () => {
let close: CloseFn
beforeAll(async () => {
const server = await runTestServer({
server = await runTestServer({
dbPostgresSchema: 'file_uploads',
})
blobstore = server.ctx.blobstore as DiskBlobStore
@ -71,6 +72,30 @@ describe('file uploads', () => {
let smallCid: CID
let smallFile: Uint8Array
it('handles client abort', async () => {
const abortController = new AbortController()
const _putTemp = server.ctx.blobstore.putTemp
server.ctx.blobstore.putTemp = function (...args) {
// Abort just as processing blob in packages/pds/src/services/repo/blobs.ts
process.nextTick(() => abortController.abort())
return _putTemp.call(this, ...args)
}
const response = fetch(`${server.url}/xrpc/com.atproto.blob.upload`, {
method: 'post',
body: Buffer.alloc(5000000), // Enough bytes to get some chunking going on
signal: abortController.signal,
headers: {
'content-type': 'image/jpeg',
authorization: aliceClient.xrpc.headers.authorization,
},
})
await expect(response).rejects.toThrow('operation was aborted')
// Cleanup
server.ctx.blobstore.putTemp = _putTemp
// This test would fail from an uncaught exception: this grace period gives time for that to surface
await new Promise((res) => setTimeout(res, 10))
})
it('uploads files', async () => {
smallFile = await fs.readFile('tests/image/fixtures/key-portrait-small.jpg')
const res = await aliceClient.com.atproto.blob.upload(smallFile, {