atproto/packages/pds/tests/proxied/proxy-catchall.test.ts
Matthieu Sieben 9ffeb5216a
Allow retrying proxied requests (#2850)
* Allow retrying proxied requests

* fix tests

* remove un-necessary code

* Only retry on socket errors
2024-10-31 00:21:05 -04:00

256 lines
6.1 KiB
TypeScript

import AtpAgent from '@atproto/api'
import { Keypair } from '@atproto/crypto'
import { TestNetworkNoAppView } from '@atproto/dev-env'
import { LexiconDoc } from '@atproto/lexicon'
import * as plc from '@did-plc/lib'
import express from 'express'
import { once } from 'node:events'
import http from 'node:http'
import { AddressInfo } from 'node:net'
import { setTimeout as sleep } from 'node:timers/promises'
const lexicons = [
{
lexicon: 1,
id: 'com.example.ok',
defs: {
main: {
type: 'query',
output: {
encoding: 'application/json',
schema: { type: 'object', properties: { foo: { type: 'string' } } },
},
},
},
},
{
lexicon: 1,
id: 'com.example.slow',
defs: {
main: {
type: 'query',
output: {
encoding: 'application/json',
schema: { type: 'object', properties: { foo: { type: 'string' } } },
},
},
},
},
{
lexicon: 1,
id: 'com.example.abort',
defs: {
main: {
type: 'query',
output: {
encoding: 'application/json',
schema: { type: 'object', properties: { foo: { type: 'string' } } },
},
},
},
},
{
lexicon: 1,
id: 'com.example.error',
defs: {
main: {
type: 'query',
output: {
encoding: 'application/json',
schema: { type: 'object', properties: { foo: { type: 'string' } } },
},
},
},
},
] as const satisfies LexiconDoc[]
describe('proxy header', () => {
let network: TestNetworkNoAppView
let alice: AtpAgent
let proxyServer: ProxyServer
beforeAll(async () => {
network = await TestNetworkNoAppView.create({
dbPostgresSchema: 'proxy_catchall',
})
const serviceId = 'proxy_test'
proxyServer = await ProxyServer.create(
network.pds.ctx.plcClient,
network.pds.ctx.plcRotationKey,
serviceId,
)
alice = network.pds.getClient().withProxy(serviceId, proxyServer.did)
for (const lex of lexicons) alice.lex.add(lex)
await alice.createAccount({
email: 'alice@test.com',
handle: 'alice.test',
password: 'alice-pass',
})
await network.processAll()
})
afterAll(async () => {
await proxyServer?.close()
await network?.close()
})
it('rejects when upstream unavailable', async () => {
const serviceId = 'foo_bar'
const proxyServer = await ProxyServer.create(
network.pds.ctx.plcClient,
network.pds.ctx.plcRotationKey,
serviceId,
)
// Make sure the service is not available
await proxyServer.close()
const client = alice.withProxy(serviceId, proxyServer.did)
for (const lex of lexicons) client.lex.add(lex)
await expect(client.call('com.example.ok')).rejects.toThrow(
'Upstream service unreachable',
)
})
it('successfully proxies requests', async () => {
await expect(alice.call('com.example.ok')).resolves.toMatchObject({
data: { foo: 'ok' },
success: true,
})
})
it('handles cancelled upstream requests', async () => {
await expect(alice.call('com.example.abort')).rejects.toThrow('terminated')
})
it('handles failing upstream requests', async () => {
await expect(alice.call('com.example.error')).rejects.toThrowError(
expect.objectContaining({
status: 502,
error: 'FooBar',
message: 'My message',
}),
)
})
it('handles cancelled downstream requests', async () => {
const ac = new AbortController()
setTimeout(() => ac.abort(), 20)
await expect(
alice.call('com.example.slow', {}, undefined, { signal: ac.signal }),
).rejects.toThrow('This operation was aborted')
await expect(alice.call('com.example.slow')).resolves.toMatchObject({
data: { foo: 'slow' },
success: true,
})
})
})
class ProxyServer {
constructor(
private server: http.Server,
public did: string,
) {}
static async create(
plcClient: plc.Client,
keypair: Keypair,
serviceId: string,
): Promise<ProxyServer> {
const app = express()
app.get('/xrpc/com.example.ok', (req, res) => {
res.status(200)
res.setHeader('content-type', 'application/json')
res.send('{"foo":"ok"}')
})
app.get('/xrpc/com.example.slow', async (req, res) => {
const wait = async (ms: number) => {
if (res.destroyed) return
const ac = new AbortController()
const abort = () => ac.abort()
res.on('close', abort)
try {
await sleep(ms, undefined, { signal: ac.signal })
} finally {
res.off('close', abort)
}
}
await wait(50)
res.status(200)
res.setHeader('content-type', 'application/json')
res.flushHeaders()
await wait(50)
for (const char of '{"foo":"slow"}') {
res.write(char)
await wait(10)
}
res.end()
})
app.get('/xrpc/com.example.abort', async (req, res) => {
res.status(200)
res.setHeader('content-type', 'application/json')
res.write('{"foo"')
await sleep(50)
res.destroy(new Error('abort'))
})
app.get('/xrpc/com.example.error', async (req, res) => {
res.status(500).json({ error: 'FooBar', message: 'My message' })
})
const server = app.listen(0)
server.keepAliveTimeout = 30 * 1000
server.headersTimeout = 35 * 1000
await once(server, 'listening')
const { port } = server.address() as AddressInfo
const plcOp = await plc.signOperation(
{
type: 'plc_operation',
rotationKeys: [keypair.did()],
alsoKnownAs: [],
verificationMethods: {},
services: {
[serviceId]: {
type: 'TestAtprotoService',
endpoint: `http://localhost:${port}`,
},
},
prev: null,
},
keypair,
)
const did = await plc.didForCreateOp(plcOp)
await plcClient.sendOperation(did, plcOp)
return new ProxyServer(server, did)
}
async close() {
await new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err)
else resolve()
})
})
}
}