File uploads & profile image support ()

* Setup interface for image processing

* Implement getInfo() on SharpImageProcessor

* Reorganize pds image processing code

* Implement initial resize() on SharpImageProcessor

* Test sharp image processor, apply a couple fixes

* Tidy

* wip

* improve repo processing & add blbos

* more blob processing

* work into routes & config

* testing out w profiles

* Implement initial pds image uri builder/signer/verifier

* pr fixup & tests

* streaming

* better streaming interface

* s3 interface

* fix s3 prefixes

* readable streams as xrpc inputs

* more tests + cleaning up apis

* tests for failed references

* Initial implementation of image processing server

* Update node types for node v18

* fix clone issue

* add getStream to blobstore

* fixing up tests & dev-env

* get img info on upload

* integrating img processing

* hex for img uri & fixing build errors

* improve streams

* hook up uris to getProfile

* pr feedback on tmp

* test on getprofile

* testing avatars

* fix snapshot after hmac changed

* quick cleanup

Co-authored-by: Devin Ivy <devinivy@gmail.com>
This commit is contained in:
Daniel Holmgren 2022-12-07 18:08:29 -06:00 committed by GitHub
parent d19fbe5473
commit d0b7497a08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 2635 additions and 407 deletions

@ -29,6 +29,7 @@
"type": "string",
"maxLength": 256
},
"avatar": { "type": "string" },
"followersCount": {"type": "integer"},
"followsCount": {"type": "integer"},
"membersCount": {"type": "integer"},

@ -16,6 +16,13 @@
"description": {
"type": "string",
"maxLength": 256
},
"avatar": {
"type": "image",
"accept": ["image/png", "image/jpeg"],
"maxWidth": 500,
"maxHeight": 500,
"maxSize": 100000
}
}
}

@ -18,6 +18,13 @@
"description": {
"type": "string",
"maxLength": 256
},
"avatar": {
"type": "image",
"accept": ["image/png", "image/jpeg"],
"maxWidth": 500,
"maxHeight": 500,
"maxSize": 100000
}
}
}

@ -0,0 +1,23 @@
{
"lexicon": 1,
"id": "com.atproto.data.uploadFile",
"defs": {
"main": {
"type": "procedure",
"description": "Upload a new file to be added to repo in a later request.",
"input": {
"encoding": "*/*"
},
"output": {
"encoding": "application/json",
"schema": {
"type": "object",
"required": ["cid"],
"properties": {
"cid": {"type": "string"}
}
}
}
}
}
}

@ -12,6 +12,7 @@ import * as ComAtprotoAccountDelete from './types/com/atproto/account/delete'
import * as ComAtprotoAccountGet from './types/com/atproto/account/get'
import * as ComAtprotoAccountRequestPasswordReset from './types/com/atproto/account/requestPasswordReset'
import * as ComAtprotoAccountResetPassword from './types/com/atproto/account/resetPassword'
import * as ComAtprotoDataUploadFile from './types/com/atproto/data/uploadFile'
import * as ComAtprotoHandleResolve from './types/com/atproto/handle/resolve'
import * as ComAtprotoRepoBatchWrite from './types/com/atproto/repo/batchWrite'
import * as ComAtprotoRepoCreateRecord from './types/com/atproto/repo/createRecord'
@ -72,6 +73,7 @@ export * as ComAtprotoAccountDelete from './types/com/atproto/account/delete'
export * as ComAtprotoAccountGet from './types/com/atproto/account/get'
export * as ComAtprotoAccountRequestPasswordReset from './types/com/atproto/account/requestPasswordReset'
export * as ComAtprotoAccountResetPassword from './types/com/atproto/account/resetPassword'
export * as ComAtprotoDataUploadFile from './types/com/atproto/data/uploadFile'
export * as ComAtprotoHandleResolve from './types/com/atproto/handle/resolve'
export * as ComAtprotoRepoBatchWrite from './types/com/atproto/repo/batchWrite'
export * as ComAtprotoRepoCreateRecord from './types/com/atproto/repo/createRecord'
@ -181,6 +183,7 @@ export class ComNS {
export class AtprotoNS {
_service: ServiceClient
account: AccountNS
data: DataNS
handle: HandleNS
repo: RepoNS
server: ServerNS
@ -190,6 +193,7 @@ export class AtprotoNS {
constructor(service: ServiceClient) {
this._service = service
this.account = new AccountNS(service)
this.data = new DataNS(service)
this.handle = new HandleNS(service)
this.repo = new RepoNS(service)
this.server = new ServerNS(service)
@ -272,6 +276,25 @@ export class AccountNS {
}
}
export class DataNS {
_service: ServiceClient
constructor(service: ServiceClient) {
this._service = service
}
uploadFile(
data?: ComAtprotoDataUploadFile.InputSchema,
opts?: ComAtprotoDataUploadFile.CallOptions,
): Promise<ComAtprotoDataUploadFile.Response> {
return this._service.xrpc
.call('com.atproto.data.uploadFile', opts?.qp, data, opts)
.catch((e) => {
throw ComAtprotoDataUploadFile.toKnownErr(e)
})
}
}
export class HandleNS {
_service: ServiceClient

@ -182,6 +182,32 @@ export const lexicons: LexiconDoc[] = [
},
},
},
{
lexicon: 1,
id: 'com.atproto.data.uploadFile',
defs: {
main: {
type: 'procedure',
description:
'Upload a new file to be added to repo in a later request.',
input: {
encoding: '*/*',
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['cid'],
properties: {
cid: {
type: 'string',
},
},
},
},
},
},
},
{
lexicon: 1,
id: 'com.atproto.handle.resolve',
@ -987,6 +1013,9 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'string',
},
followersCount: {
type: 'integer',
},
@ -1121,6 +1150,13 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'image',
accept: ['image/png', 'image/jpeg'],
maxWidth: 500,
maxHeight: 500,
maxSize: 100000,
},
},
},
},
@ -1322,6 +1358,13 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'image',
accept: ['image/png', 'image/jpeg'],
maxWidth: 500,
maxHeight: 500,
maxSize: 100000,
},
},
},
},
@ -2914,6 +2957,7 @@ export const ids = {
ComAtprotoAccountRequestPasswordReset:
'com.atproto.account.requestPasswordReset',
ComAtprotoAccountResetPassword: 'com.atproto.account.resetPassword',
ComAtprotoDataUploadFile: 'com.atproto.data.uploadFile',
ComAtprotoHandleResolve: 'com.atproto.handle.resolve',
ComAtprotoRepoBatchWrite: 'com.atproto.repo.batchWrite',
ComAtprotoRepoCreateRecord: 'com.atproto.repo.createRecord',

@ -17,6 +17,7 @@ export interface OutputSchema {
creator: string
displayName?: string
description?: string
avatar?: string
followersCount: number
followsCount: number
membersCount: number

@ -4,5 +4,6 @@
export interface Record {
displayName: string
description?: string
avatar?: { cid: string; mimeType: string; [k: string]: unknown }
[k: string]: unknown
}

@ -9,6 +9,7 @@ export interface InputSchema {
did?: string
displayName?: string
description?: string
avatar?: { cid: string; mimeType: string; [k: string]: unknown }
[k: string]: unknown
}

@ -0,0 +1,31 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
export interface QueryParams {}
export type InputSchema = string | Uint8Array
export interface OutputSchema {
cid: string
[k: string]: unknown
}
export interface CallOptions {
headers?: Headers
qp?: QueryParams
encoding: '*/*'
}
export interface Response {
success: boolean
headers: Headers
data: OutputSchema
}
export function toKnownErr(e: any) {
if (e instanceof XRPCError) {
}
return e
}

@ -1,5 +1,5 @@
{
"name": "@atproto/aws-kms",
"name": "@atproto/aws",
"version": "0.0.1",
"main": "src/index.ts",
"license": "MIT",
@ -17,6 +17,7 @@
"dependencies": {
"@atproto/crypto": "*",
"@aws-sdk/client-kms": "^3.196.0",
"@aws-sdk/client-s3": "^3.224.0",
"@noble/secp256k1": "^1.7.0",
"key-encoder": "^2.0.3"
}

@ -0,0 +1,2 @@
export * from './kms'
export * from './s3'

95
packages/aws/src/s3.ts Normal file

@ -0,0 +1,95 @@
import * as aws from '@aws-sdk/client-s3'
import { BlobStore } from '@atproto/repo'
import { randomStr } from '@atproto/crypto'
import { CID } from 'multiformats/cid'
import stream from 'stream'
export type S3Config = { bucket: string } & Omit<
aws.S3ClientConfig,
'apiVersion'
>
export class S3BlobStore implements BlobStore {
private client: aws.S3
private bucket: string
constructor(cfg: S3Config) {
const { bucket, ...rest } = cfg
this.bucket = bucket
this.client = new aws.S3({
...rest,
apiVersion: '2006-03-01',
})
}
private genKey() {
return randomStr(32, 'base32')
}
private getTmpPath(key: string): string {
return `tmp/${key}`
}
private getStoredPath(cid: CID): string {
return `blocks/${cid.toString()}`
}
async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
const key = this.genKey()
await this.client.putObject({
Bucket: this.bucket,
Body: bytes,
Key: this.getTmpPath(key),
})
return key
}
async makePermanent(key: string, cid: CID): Promise<void> {
const tmpPath = this.getTmpPath(key)
await this.client.copyObject({
Bucket: this.bucket,
CopySource: `${this.bucket}/${tmpPath}`,
Key: this.getStoredPath(cid),
})
await this.client.deleteObject({
Bucket: this.bucket,
Key: tmpPath,
})
}
async putPermanent(
cid: CID,
bytes: Uint8Array | stream.Readable,
): Promise<void> {
await this.client.putObject({
Bucket: this.bucket,
Body: bytes,
Key: this.getStoredPath(cid),
})
}
private async getObject(cid: CID) {
const res = await this.client.getObject({
Bucket: this.bucket,
Key: this.getStoredPath(cid),
})
if (res.Body) {
return res.Body
} else {
throw new Error(`Could not get blob: ${cid.toString()}`)
}
}
async getBytes(cid: CID): Promise<Uint8Array> {
const res = await this.getObject(cid)
return res.transformToByteArray()
}
async getStream(cid: CID): Promise<stream.Readable> {
const res = await this.getObject(cid)
return res as stream.Readable
}
}
export default S3BlobStore

@ -1,6 +1,8 @@
import { CID } from 'multiformats/cid'
import * as Block from 'multiformats/block'
import * as rawCodec from 'multiformats/codecs/raw'
import { sha256 as blockHasher } from 'multiformats/hashes/sha2'
import * as mf from 'multiformats'
import * as blockCodec from '@ipld/dag-cbor'
export const valueToIpldBlock = async (
@ -13,6 +15,11 @@ export const valueToIpldBlock = async (
})
}
export const sha256RawToCid = (hash: Uint8Array): CID => {
const digest = mf.digest.create(blockHasher.code, hash)
return CID.createV1(rawCodec.code, digest)
}
export const cidForData = async (data: unknown): Promise<CID> => {
const block = await valueToIpldBlock(data)
return block.cid

@ -6,3 +6,4 @@ export * from './tid'
export * from './blocks'
export * from './logger'
export * from './types'
export * from './streams'

@ -0,0 +1,25 @@
import { Stream, Readable, PassThrough } from 'stream'
export const forwardStreamErrors = (...streams: Stream[]) => {
for (let i = 0; i < streams.length; ++i) {
const stream = streams[i]
const next = streams[i + 1]
if (next) {
stream.once('error', (err) => next.emit('error', err))
}
}
}
export const cloneStream = (stream: Readable): Readable => {
const passthrough = new PassThrough()
forwardStreamErrors(stream, passthrough)
return stream.pipe(passthrough)
}
export const streamSize = async (stream: Readable): Promise<number> => {
let size = 0
for await (const chunk of stream) {
size += chunk.length
}
return size
}

@ -63,3 +63,9 @@ export const asyncFilter = async <T>(
const results = await Promise.all(arr.map((t) => fn(t)))
return arr.filter((_, i) => results[i])
}
export const isErrnoException = (
err: unknown,
): err is NodeJS.ErrnoException => {
return !!err && 'code' in err
}

@ -1,5 +1,7 @@
import * as mf from 'multiformats/hashes/sha2'
import * as uint8arrays from 'uint8arrays'
import crypto from 'crypto'
import { Readable } from 'stream'
// takes either bytes of utf8 input
export const sha256 = async (
@ -10,3 +12,17 @@ export const sha256 = async (
const hash = await mf.sha256.digest(bytes)
return hash.digest
}
export const sha256Stream = async (stream: Readable): Promise<Uint8Array> => {
const hash = crypto.createHash('sha256')
try {
for await (const chunk of stream) {
hash.write(chunk)
}
} catch (err) {
hash.end()
throw err
}
hash.end()
return hash.read()
}

@ -1,7 +1,10 @@
import http from 'http'
import chalk from 'chalk'
import crytpo from 'crypto'
import PDSServer, { Database as PDSDatabase } from '@atproto/pds'
import PDSServer, {
Database as PDSDatabase,
MemoryBlobStore,
} from '@atproto/pds'
import * as plc from '@atproto/plc'
import * as crypto from '@atproto/crypto'
import AtpApi, { ServiceClient } from '@atproto/api'
@ -59,6 +62,8 @@ export class DevEnvServer {
await db.migrateToLatestOrThrow()
const keypair = await crypto.EcdsaKeypair.create()
const blobstore = new MemoryBlobStore()
const plcClient = new plc.PlcClient(this.env.plcUrl)
const serverDid = await plcClient.createDid(
keypair,
@ -68,7 +73,7 @@ export class DevEnvServer {
)
this.inst = await onServerReady(
PDSServer(db, keypair, {
PDSServer(db, blobstore, keypair, {
debugMode: true,
version: '0.0.0',
scheme: 'http',
@ -84,6 +89,9 @@ export class DevEnvServer {
emailNoReplyAddress: 'noreply@blueskyweb.xyz',
adminPassword: 'password',
inviteRequired: false,
imgUriSalt: '9dd04221f5755bce5f55f47464c27e1e',
imgUriKey:
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8',
privacyPolicyUrl: 'https://example.com/privacy',
termsOfServiceUrl: 'https://example.com/tos',
}).listener,

@ -279,6 +279,19 @@ const lexiconTs = (project, lexicons: Lexicons, lexiconDoc: LexiconDoc) =>
moduleSpecifier: 'express',
defaultImport: 'express',
})
const streamingInput =
main?.type === 'procedure' &&
main.input?.encoding &&
!main.input.schema
const streamingOutput = main.output?.encoding && !main.output.schema
if (streamingInput || streamingOutput) {
//= import stream from 'stream'
file.addImportDeclaration({
moduleSpecifier: 'stream',
defaultImport: 'stream',
})
}
}
for (const defId in lexiconDoc.defs) {
@ -333,13 +346,13 @@ function genServerXrpcCommon(
if (def.input.encoding.includes(',')) {
handlerInput.addProperty({
name: 'body',
type: 'InputSchema | Uint8Array',
type: 'InputSchema | stream.Readable',
})
} else {
handlerInput.addProperty({ name: 'body', type: 'InputSchema' })
}
} else if (def.input.encoding) {
handlerInput.addProperty({ name: 'body', type: 'Uint8Array' })
handlerInput.addProperty({ name: 'body', type: 'stream.Readable' })
}
} else {
file.addTypeAlias({
@ -370,13 +383,16 @@ function genServerXrpcCommon(
if (def.output.encoding.includes(',')) {
handlerSuccess.addProperty({
name: 'body',
type: 'OutputSchema | Uint8Array',
type: 'OutputSchema | Uint8Array | stream.Readable',
})
} else {
handlerSuccess.addProperty({ name: 'body', type: 'OutputSchema' })
}
} else if (def.output?.encoding) {
handlerSuccess.addProperty({ name: 'body', type: 'Uint8Array' })
handlerSuccess.addProperty({
name: 'body',
type: 'Uint8Array | stream.Readable',
})
}
}

@ -7,7 +7,7 @@ import * as locals from '../../../../locals'
import * as lexicons from '../../../../lexicon/lexicons'
import { TID } from '@atproto/common'
import { UserAlreadyExistsError } from '../../../../db'
import * as repoUtil from '../../../../util/repo'
import * as repo from '../../../../repo'
import ServerAuth from '../../../../auth'
export default function (server: Server) {
@ -97,7 +97,7 @@ export default function (server: Server) {
const userAuth = locals.getAuthstore(res, requester)
const sceneAuth = locals.getAuthstore(res, did)
const sceneWrites = await repoUtil.prepareCreates(did, [
const sceneWrites = await repo.prepareCreates(did, [
{
action: 'create',
collection: lexicons.ids.AppBskySystemDeclaration,
@ -133,7 +133,7 @@ export default function (server: Server) {
])
const [sceneDeclaration, creatorAssert, memberAssert] = sceneWrites
const userWrites = await repoUtil.prepareCreates(requester, [
const userWrites = await repo.prepareCreates(requester, [
{
action: 'create',
collection: lexicons.ids.AppBskyGraphConfirmation,
@ -169,9 +169,9 @@ export default function (server: Server) {
])
await Promise.all([
repoUtil.createRepo(dbTxn, did, sceneAuth, sceneWrites, now),
repoUtil.writeToRepo(dbTxn, requester, userAuth, userWrites, now),
repoUtil.indexWrites(dbTxn, [...sceneWrites, ...userWrites], now),
repo.createRepo(dbTxn, did, sceneAuth, sceneWrites, now),
repo.writeToRepo(dbTxn, requester, userAuth, userWrites, now),
repo.indexWrites(dbTxn, [...sceneWrites, ...userWrites], now),
])
return {

@ -4,13 +4,14 @@ import { countAll, actorWhereClause } from '../../../../db/util'
import * as locals from '../../../../locals'
import { getDeclarationSimple } from '../util'
import ServerAuth from '../../../../auth'
import { CID } from 'multiformats/cid'
export default function (server: Server) {
server.app.bsky.actor.getProfile({
auth: ServerAuth.verifier,
handler: async ({ auth, params, res }) => {
const { actor } = params
const { db } = locals.get(res)
const { db, imgUriBuilder } = locals.get(res)
const requester = auth.credentials.did
const { ref } = db.db.dynamic
@ -29,6 +30,7 @@ export default function (server: Server) {
'profile.uri as profileUri',
'profile.displayName as displayName',
'profile.description as description',
'profile.avatarCid as avatarCid',
db.db
.selectFrom('follow')
.whereRef('creator', '=', ref('did_handle.did'))
@ -72,6 +74,17 @@ export default function (server: Server) {
throw new InvalidRequestError(`Profile not found`)
}
const avatar = queryRes.avatarCid
? imgUriBuilder.getSignedUri({
cid: CID.parse(queryRes.avatarCid),
format: 'jpeg',
fit: 'cover',
height: 250,
width: 250,
min: true,
})
: undefined
return {
encoding: 'application/json',
body: {
@ -81,6 +94,7 @@ export default function (server: Server) {
creator: queryRes.owner || queryRes.did,
displayName: queryRes.displayName || undefined,
description: queryRes.description || undefined,
avatar: avatar,
followsCount: queryRes.followsCount,
followersCount: queryRes.followersCount,
membersCount: queryRes.membersCount,

@ -6,7 +6,7 @@ import { AtUri } from '@atproto/uri'
import { CID } from 'multiformats/cid'
import * as Profile from '../../../../lexicon/types/app/bsky/actor/profile'
import * as common from '@atproto/common'
import * as repoUtil from '../../../../util/repo'
import * as repo from '../../../../repo'
import ServerAuth from '../../../../auth'
const profileNsid = lexicons.ids.AppBskyActorProfile
@ -15,7 +15,7 @@ export default function (server: Server) {
server.app.bsky.actor.updateProfile({
auth: ServerAuth.verifier,
handler: async ({ auth, input, res }) => {
const { db } = locals.get(res)
const { db, blobstore } = locals.get(res)
const requester = auth.credentials.did
const did = input.body.did || requester
@ -45,12 +45,14 @@ export default function (server: Server) {
...current,
displayName: input.body.displayName || current.displayName,
description: input.body.description || current.description,
avatar: input.body.avatar || current.avatar,
}
} else {
updated = {
$type: profileNsid,
displayName: input.body.displayName,
description: input.body.description,
avatar: input.body.avatar,
}
}
updated = common.noUndefinedVals(updated)
@ -60,14 +62,21 @@ export default function (server: Server) {
)
}
const writes = await repoUtil.prepareWrites(did, {
const writes = await repo.prepareWrites(did, {
action: current ? 'update' : 'create',
collection: profileNsid,
rkey: 'self',
value: updated,
})
await repoUtil.writeToRepo(dbTxn, did, authStore, writes, now)
const commit = await repo.writeToRepo(
dbTxn,
did,
authStore,
writes,
now,
)
await repo.processWriteBlobs(dbTxn, blobstore, did, commit, writes)
const write = writes[0]
let profileCid: CID
@ -87,6 +96,7 @@ export default function (server: Server) {
cid: profileCid.toString(),
displayName: updated.displayName,
description: updated.description,
avatarCid: updated.avatar?.cid,
indexedAt: now,
})
.where('uri', '=', uri.toString())

@ -2,7 +2,7 @@ import { AtUri } from '@atproto/uri'
import * as lexicons from '../../../../lexicon/lexicons'
import { Server } from '../../../../lexicon'
import * as locals from '../../../../locals'
import * as repoUtil from '../../../../util/repo'
import * as repo from '../../../../repo'
import { TID } from '@atproto/common'
import { DeleteOp } from '@atproto/repo'
import ServerAuth from '../../../../auth'
@ -39,7 +39,7 @@ export default function (server: Server) {
return existingVotes[0].uri
}
const writes = await repoUtil.prepareWrites(requester, [
const writes = await repo.prepareWrites(requester, [
...existingVotes.map((vote): DeleteOp => {
const uri = new AtUri(vote.uri)
return {
@ -50,10 +50,10 @@ export default function (server: Server) {
}),
])
let create: repoUtil.PreparedCreate | undefined
let create: repo.PreparedCreate | undefined
if (direction !== 'none') {
create = await repoUtil.prepareCreate(requester, {
create = await repo.prepareCreate(requester, {
action: 'create',
collection: lexicons.ids.AppBskyFeedVote,
rkey: TID.nextStr(),
@ -66,8 +66,10 @@ export default function (server: Server) {
writes.push(create)
}
await repoUtil.writeToRepo(dbTxn, requester, authStore, writes, now)
await repoUtil.indexWrites(dbTxn, writes, now)
await Promise.all([
await repo.writeToRepo(dbTxn, requester, authStore, writes, now),
await repo.indexWrites(dbTxn, writes, now),
])
return create?.uri.toString()
})

@ -8,7 +8,7 @@ import { countAll } from '../../../db/util'
import { UserAlreadyExistsError } from '../../../db'
import { grantRefreshToken } from './util/auth'
import * as lexicons from '../../../lexicon/lexicons'
import * as repoUtil from '../../../util/repo'
import * as repo from '../../../repo'
import { cidForData } from '@atproto/common'
export default function (server: Server) {
@ -145,7 +145,7 @@ export default function (server: Server) {
.execute()
}
const write = await repoUtil.prepareCreate(did, {
const write = await repo.prepareCreate(did, {
action: 'create',
collection: lexicons.ids.AppBskySystemDeclaration,
rkey: 'self',
@ -154,8 +154,8 @@ export default function (server: Server) {
// Setup repo root
const authStore = locals.getAuthstore(res, did)
await repoUtil.createRepo(dbTxn, did, authStore, [write], now)
await repoUtil.indexWrites(dbTxn, [write], now)
await repo.createRepo(dbTxn, did, authStore, [write], now)
await repo.indexWrites(dbTxn, [write], now)
const declarationCid = await cidForData(declaration)
const access = auth.createAccessToken(did)

@ -0,0 +1,27 @@
import { Server } from '../../../lexicon'
import * as locals from '../../../locals'
import * as repo from '../../../repo'
import ServerAuth from '../../../auth'
export default function (server: Server) {
server.com.atproto.data.uploadFile({
auth: ServerAuth.verifier,
handler: async ({ input, res }) => {
const { db, blobstore } = locals.get(res)
const cid = await repo.addUntetheredBlob(
db,
blobstore,
input.encoding,
input.body,
)
return {
encoding: 'application/json',
body: {
cid: cid.toString(),
},
}
},
})
}

@ -1,18 +1,20 @@
import { Server } from '../../../lexicon'
import handles from './handles'
import session from './session'
import account from './account'
import data from './data'
import handles from './handles'
import invites from './invites'
import passwordReset from './password-reset'
import repo from './repo'
import session from './session'
import sync from './sync'
import invites from './invites'
export default function (server: Server) {
handles(server)
session(server)
account(server)
data(server)
handles(server)
invites(server)
passwordReset(server)
repo(server)
session(server)
sync(server)
invites(server)
}

@ -7,7 +7,7 @@ import { DeleteOp, RecordCreateOp } from '@atproto/repo'
import * as locals from '../../../locals'
import * as dbSchemas from '../../../db/schemas'
import { TID } from '@atproto/common'
import * as repoUtil from '../../../util/repo'
import * as repo from '../../../repo'
import ServerAuth from '../../../auth'
export default function (server: Server) {
@ -101,7 +101,7 @@ export default function (server: Server) {
handler: async ({ input, auth, res }) => {
const tx = input.body
const { did, validate } = tx
const { db } = locals.get(res)
const { db, blobstore } = locals.get(res)
const requester = auth.credentials.did
const authorized = await db.isUserControlledRepo(did, requester)
if (!authorized) {
@ -129,7 +129,7 @@ export default function (server: Server) {
}
}
const writes = await repoUtil.prepareWrites(
const writes = await repo.prepareWrites(
did,
tx.writes.map((write) => {
if (write.action === 'create') {
@ -149,10 +149,7 @@ export default function (server: Server) {
await db.transaction(async (dbTxn) => {
const now = new Date().toISOString()
await Promise.all([
repoUtil.writeToRepo(dbTxn, did, authStore, writes, now),
repoUtil.indexWrites(dbTxn, writes, now),
])
await repo.processWrites(dbTxn, did, authStore, blobstore, writes, now)
})
},
})
@ -163,7 +160,7 @@ export default function (server: Server) {
const { did, collection, record } = input.body
const validate =
typeof input.body.validate === 'boolean' ? input.body.validate : true
const { db } = locals.get(res)
const { db, blobstore } = locals.get(res)
const requester = auth.credentials.did
const authorized = await db.isUserControlledRepo(did, requester)
if (!authorized) {
@ -196,7 +193,7 @@ export default function (server: Server) {
}
const now = new Date().toISOString()
const write = await repoUtil.prepareCreate(did, {
const write = await repo.prepareCreate(did, {
action: 'create',
collection,
rkey,
@ -204,10 +201,7 @@ export default function (server: Server) {
})
await db.transaction(async (dbTxn) => {
await Promise.all([
repoUtil.writeToRepo(dbTxn, did, authStore, [write], now),
repoUtil.indexWrites(dbTxn, [write], now),
])
await repo.processWrites(dbTxn, did, authStore, blobstore, [write], now)
})
return {
@ -225,7 +219,7 @@ export default function (server: Server) {
auth: ServerAuth.verifier,
handler: async ({ input, auth, res }) => {
const { did, collection, rkey } = input.body
const { db } = locals.get(res)
const { db, blobstore } = locals.get(res)
const requester = auth.credentials.did
const authorized = await db.isUserControlledRepo(did, requester)
if (!authorized) {
@ -235,17 +229,14 @@ export default function (server: Server) {
const authStore = locals.getAuthstore(res, did)
const now = new Date().toISOString()
const write = await repoUtil.prepareWrites(did, {
const write = await repo.prepareWrites(did, {
action: 'delete',
collection,
rkey,
})
await db.transaction(async (dbTxn) => {
await Promise.all([
repoUtil.writeToRepo(dbTxn, did, authStore, write, now),
repoUtil.indexWrites(dbTxn, write, now),
])
await repo.processWrites(dbTxn, did, authStore, blobstore, write, now)
})
},
})

@ -3,6 +3,8 @@ import * as crypto from '@atproto/crypto'
import Database from './db'
import server from './index'
import { ServerConfig } from './config'
import { DiskBlobStore, MemoryBlobStore } from './storage'
import { BlobStore } from '@atproto/repo'
const run = async () => {
const env = process.env.ENV
@ -15,7 +17,10 @@ const run = async () => {
let db: Database
const keypair = await crypto.EcdsaKeypair.create()
const cfg = ServerConfig.readEnv({ recoveryKey: keypair.did() })
const cfg = ServerConfig.readEnv({
serverDid: keypair.did(),
recoveryKey: keypair.did(),
})
if (cfg.dbPostgresUrl) {
db = Database.postgres({
@ -30,7 +35,17 @@ const run = async () => {
await db.migrateToLatestOrThrow()
const { listener } = server(db, keypair, cfg)
let blobstore: BlobStore
if (cfg.blobstoreLocation) {
blobstore = await DiskBlobStore.create(
cfg.blobstoreLocation,
cfg.blobstoreTmp,
)
} else {
blobstore = new MemoryBlobStore()
}
const { listener } = server(db, blobstore, keypair, cfg)
listener.on('listening', () => {
console.log(`🌞 ATP Data server is running at ${cfg.origin}`)
})

@ -10,6 +10,9 @@ export interface ServerConfigValues {
dbPostgresUrl?: string
dbPostgresSchema?: string
blobstoreLocation?: string
blobstoreTmp?: string
jwtSecret: string
didPlcUrl: string
@ -27,6 +30,11 @@ export interface ServerConfigValues {
availableUserDomains: string[]
imgUriSalt: string
imgUriKey: string
imgUriEndpoint?: string
blobCacheLocation?: string
appUrlPasswordReset: string
emailSmtpUrl?: string
emailNoReplyAddress: string
@ -80,10 +88,21 @@ export class ServerConfig {
const blockstoreLocation = process.env.BLOCKSTORE_LOC
const databaseLocation = process.env.DATABASE_LOC
const blobstoreLocation = process.env.BLOBSTORE_LOC
const blobstoreTmp = process.env.BLOBSTORE_TMP
const availableUserDomains = process.env.AVAILABLE_USER_DOMAINS
? process.env.AVAILABLE_USER_DOMAINS.split(',')
: []
const imgUriSalt =
process.env.IMG_URI_SALT || '9dd04221f5755bce5f55f47464c27e1e'
const imgUriKey =
process.env.IMG_URI_KEY ||
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8'
const imgUriEndpoint = process.env.IMG_URI_ENDPOINT
const blobCacheLocation = process.env.BLOB_CACHE_LOC
const appUrlPasswordReset =
process.env.APP_URL_PASSWORD_RESET || 'app://password-reset'
@ -104,6 +123,8 @@ export class ServerConfig {
port,
dbPostgresUrl,
dbPostgresSchema,
blobstoreLocation,
blobstoreTmp,
jwtSecret,
recoveryKey,
didPlcUrl,
@ -115,6 +136,10 @@ export class ServerConfig {
blockstoreLocation,
databaseLocation,
availableUserDomains,
imgUriSalt,
imgUriKey,
imgUriEndpoint,
blobCacheLocation,
appUrlPasswordReset,
emailSmtpUrl,
emailNoReplyAddress,
@ -168,6 +193,14 @@ export class ServerConfig {
return this.cfg.dbPostgresSchema
}
get blobstoreLocation() {
return this.cfg.blobstoreLocation
}
get blobstoreTmp() {
return this.cfg.blobstoreTmp
}
get jwtSecret() {
return this.cfg.jwtSecret
}
@ -232,6 +265,22 @@ export class ServerConfig {
return this.cfg.availableUserDomains
}
get imgUriSalt() {
return this.cfg.imgUriSalt
}
get imgUriKey() {
return this.cfg.imgUriKey
}
get imgUriEndpoint() {
return this.cfg.imgUriEndpoint
}
get blobCacheLocation() {
return this.cfg.blobCacheLocation
}
get appUrlPasswordReset() {
return this.cfg.appUrlPasswordReset
}

@ -16,6 +16,8 @@ import * as vote from './tables/vote'
import * as repost from './tables/repost'
import * as trend from './tables/trend'
import * as follow from './tables/follow'
import * as blob from './tables/blob'
import * as repoBlob from './tables/repo-blob'
import * as messageQueue from './message-queue/tables/messageQueue'
import * as messageQueueCursor from './message-queue/tables/messageQueueCursor'
import * as sceneMemberCount from './message-queue/tables/sceneMemberCount'
@ -39,6 +41,8 @@ export type DatabaseSchema = user.PartialDB &
repost.PartialDB &
trend.PartialDB &
follow.PartialDB &
blob.PartialDB &
repoBlob.PartialDB &
messageQueue.PartialDB &
messageQueueCursor.PartialDB &
sceneMemberCount.PartialDB &

@ -11,7 +11,7 @@ import {
import { sql } from 'kysely'
import { dbLogger as log } from '../../logger'
import { AuthStore } from '@atproto/auth'
import * as repoUtil from '../../util/repo'
import * as repo from '../../repo'
import * as lexicons from '../../lexicon/lexicons'
import { TID } from '@atproto/common'
import { MessageQueue } from '../types'
@ -298,7 +298,7 @@ export class SqlMessageQueue implements MessageQueue {
// this is a "threshold vote" that makes the post trend
const sceneAuth = this.getAuthStore(scene.did)
const writes = await repoUtil.prepareWrites(scene.did, {
const writes = await repo.prepareWrites(scene.did, {
action: 'create',
collection: lexicons.ids.AppBskyFeedTrend,
rkey: TID.nextStr(),
@ -318,8 +318,8 @@ export class SqlMessageQueue implements MessageQueue {
.execute()
await Promise.all([
repoUtil.writeToRepo(db, scene.did, sceneAuth, writes, now),
repoUtil.indexWrites(db, writes, now),
repo.writeToRepo(db, scene.did, sceneAuth, writes, now),
repo.indexWrites(db, writes, now),
setTrendPosted,
])
}),

@ -0,0 +1,36 @@
import { Kysely } from 'kysely'
const blobTable = 'blob'
const repoBlobTable = 'repo_blob'
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable(blobTable)
.addColumn('cid', 'varchar', (col) => col.primaryKey())
.addColumn('mimeType', 'varchar', (col) => col.notNull())
.addColumn('size', 'integer', (col) => col.notNull())
.addColumn('tempKey', 'varchar')
.addColumn('width', 'integer')
.addColumn('height', 'integer')
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.execute()
await db.schema
.createTable(repoBlobTable)
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('recordUri', 'varchar', (col) => col.notNull())
.addColumn('commit', 'varchar', (col) => col.notNull())
.addColumn('did', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint(`${repoBlobTable}_pkey`, ['cid', 'recordUri'])
.execute()
await db.schema
.alterTable('profile')
.addColumn('avatarCid', 'varchar')
.execute()
}
export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.alterTable('profile').dropColumn('avatarCid').execute()
await db.schema.dropTable(repoBlobTable).execute()
await db.schema.dropTable(blobTable).execute()
}

@ -4,3 +4,4 @@
export * as _20221021T162202001Z from './20221021T162202001Z-init'
export * as _20221116T234458063Z from './20221116T234458063Z-duplicate-records'
export * as _20221202T212459280Z from './20221202T212459280Z-blobs'

@ -25,6 +25,7 @@ const insertFn = async (
creator: uri.host,
displayName: obj.displayName,
description: obj.description,
avatarCid: obj.avatar?.cid,
indexedAt: new Date().toISOString(),
})
.onConflict((oc) => oc.doNothing())

@ -0,0 +1,13 @@
export interface Blob {
cid: string
mimeType: string
size: number
tempKey: string | null
width: number | null
height: number | null
createdAt: string
}
export const tableName = 'blob'
export type PartialDB = { [tableName]: Blob }

@ -6,6 +6,7 @@ export interface Profile {
creator: string
displayName: string
description: string | null
avatarCid: string | null
indexedAt: string
}
export type PartialDB = { [tableName]: Profile }

@ -0,0 +1,10 @@
export interface RepoBlob {
cid: string
recordUri: string
commit: string
did: string
}
export const tableName = 'repo_blob'
export type PartialDB = { [tableName]: RepoBlob }

@ -0,0 +1,2 @@
export * from './sharp'
export type { Options, ImageInfo } from './util'

@ -2,25 +2,31 @@ import fs from 'fs/promises'
import fsSync from 'fs'
import os from 'os'
import path from 'path'
import { PassThrough, Readable } from 'stream'
import { Readable } from 'stream'
import express, { ErrorRequestHandler, NextFunction } from 'express'
import createError, { isHttpError } from 'http-errors'
import { BadPathError, ImageUriBuilder } from './uri'
import log from './logger'
import { resize } from './sharp'
import { forwardStreamErrors, formatsToMimes, Options } from './util'
import { formatsToMimes, Options } from './util'
import { BlobNotFoundError, BlobStore } from '@atproto/repo'
import {
cloneStream,
forwardStreamErrors,
isErrnoException,
} from '@atproto/common'
export class ImageProcessingServer {
app = express()
uriBuilder: ImageUriBuilder
constructor(
protected salt: Uint8Array,
protected key: Uint8Array,
protected storage: BlobStorage,
protected salt: string | Uint8Array,
protected key: string | Uint8Array,
protected storage: BlobStore,
public cache: BlobCache,
) {
this.uriBuilder = new ImageUriBuilder(salt, key)
this.uriBuilder = new ImageUriBuilder('', salt, key)
this.app.get('*', this.handler.bind(this))
this.app.use(errorMiddleware)
}
@ -52,7 +58,7 @@ export class ImageProcessingServer {
// Non-cached flow
const imageStream = await this.storage.get(options.fileId)
const imageStream = await this.storage.getStream(options.cid)
const processedImage = await resize(imageStream, options)
// Cache in the background
@ -104,33 +110,7 @@ function getMime(format: Options['format']) {
return mime
}
export interface BlobStorage {
get(fileId: string): Promise<Readable>
}
export class BlobNotFoundError extends Error {}
export class BlobDiskStorage implements BlobStorage {
constructor(public basePath: string) {
if (!path.isAbsolute(this.basePath)) {
throw new Error('Must provide an absolute path')
}
}
async get(fileId: string) {
try {
const handle = await fs.open(path.join(this.basePath, fileId), 'r')
return handle.createReadStream()
} catch (err) {
if (isErrnoException(err) && err.code === 'ENOENT') {
throw new BlobNotFoundError()
}
throw err
}
}
}
export interface BlobCache extends BlobStorage {
export interface BlobCache {
get(fileId: string): Promise<Readable & { size: number }>
put(fileId: string, stream: Readable): Promise<void>
clear(): Promise<void>
@ -179,13 +159,3 @@ export class BlobDiskCache implements BlobCache {
await fs.rm(this.tempDir, { recursive: true, force: true })
}
}
function cloneStream(stream: Readable) {
const passthrough = new PassThrough()
forwardStreamErrors(stream, passthrough)
return stream.pipe(passthrough)
}
function isErrnoException(err: unknown): err is NodeJS.ErrnoException {
return !!err && 'code' in err
}

@ -1,6 +1,7 @@
import { Readable } from 'stream'
import sharp from 'sharp'
import { formatsToMimes, forwardStreamErrors, ImageInfo, Options } from './util'
import { forwardStreamErrors } from '@atproto/common'
import { formatsToMimes, ImageInfo, Options } from './util'
export type { Options }

@ -1,11 +1,26 @@
import { createHmac } from 'crypto'
import * as uint8arrays from 'uint8arrays'
import { CID } from 'multiformats/cid'
import { Options } from './util'
export class ImageUriBuilder {
constructor(public salt: Uint8Array, public key: Uint8Array) {}
public endpoint: string
private salt: Uint8Array
private key: Uint8Array
getSignedPath(opts: Options & { fileId: string }) {
constructor(
endpoint: string,
salt: Uint8Array | string,
key: Uint8Array | string,
) {
this.endpoint = endpoint
this.salt =
typeof salt === 'string' ? uint8arrays.fromString(salt, 'hex') : salt
this.key =
typeof key === 'string' ? uint8arrays.fromString(key, 'hex') : key
}
getSignedPath(opts: Options & { cid: CID }): string {
const path = ImageUriBuilder.getPath(opts)
const saltedPath = uint8arrays.concat([
this.salt,
@ -15,9 +30,12 @@ export class ImageUriBuilder {
return `/${sig}${path}`
}
getVerifiedOptions(
path: string,
): Options & { fileId: string; signature: string } {
getSignedUri(opts: Options & { cid: CID }): string {
const path = this.getSignedPath(opts)
return this.endpoint + path
}
getVerifiedOptions(path: string): Options & { cid: CID; signature: string } {
if (path.at(0) !== '/') {
throw new BadPathError('Invalid path: does not start with a slash')
}
@ -42,7 +60,7 @@ export class ImageUriBuilder {
}
}
static getPath(opts: Options & { fileId: string }) {
static getPath(opts: Options & { cid: CID }) {
const fit = opts.fit === 'inside' ? 'fit' : 'fill' // fit default is 'cover'
const enlarge = opts.min === true ? 1 : 0 // min default is false
const resize = `rs:${fit}:${opts.width}:${opts.height}:${enlarge}:0` // final ':0' is for interop with imgproxy
@ -50,15 +68,14 @@ export class ImageUriBuilder {
opts.min && typeof opts.min === 'object' ? `mw:${opts.min.width}` : null
const minHeight =
opts.min && typeof opts.min === 'object' ? `mh:${opts.min.height}` : null
const enc = encodeURIComponent
return (
`/` +
[resize, minWidth, minHeight].filter(Boolean).join('/') +
`/plain/${enc(opts.fileId)}@${opts.format}`
`/plain/${opts.cid.toString()}@${opts.format}`
)
}
static getOptions(path: string): Options & { fileId: string } {
static getOptions(path: string): Options & { cid: CID } {
if (path.at(0) !== '/') {
throw new BadPathError('Invalid path: does not start with a slash')
}
@ -66,10 +83,10 @@ export class ImageUriBuilder {
if (parts.at(-2) !== 'plain') {
throw new BadPathError('Invalid path')
}
const fileIdAndFormat = parts.at(-1)
const [fileId, format, ...others] = fileIdAndFormat?.split('@') ?? []
if (!fileId || (format !== 'png' && format !== 'jpeg') || others.length) {
throw new BadPathError('Invalid path: bad fileId/format part')
const cidAndFormat = parts.at(-1)
const [cid, format, ...others] = cidAndFormat?.split('@') ?? []
if (!cid || (format !== 'png' && format !== 'jpeg') || others.length) {
throw new BadPathError('Invalid path: bad cid/format part')
}
const resizePart = parts.find((part) => part.startsWith('rs:'))
const minWidthPart = parts.find((part) => part.startsWith('mw:'))
@ -95,9 +112,8 @@ export class ImageUriBuilder {
) {
throw new BadPathError('Invalid path: bad min width/height param')
}
const dec = decodeURIComponent
return {
fileId: dec(fileId),
cid: CID.parse(cid),
format,
height: toInt(height),
width: toInt(width),

@ -1,4 +1,3 @@
import { Stream } from 'stream'
import { FormatEnum } from 'sharp'
export type Options = Dimensions & {
@ -21,16 +20,6 @@ export type ImageInfo = Dimensions & {
export type Dimensions = { height: number; width: number }
export function forwardStreamErrors(...streams: Stream[]) {
for (let i = 0; i < streams.length; ++i) {
const stream = streams[i]
const next = streams[i + 1]
if (next) {
stream.once('error', (err) => next.emit('error', err))
}
}
}
export const formatsToMimes: { [s in keyof FormatEnum]?: `image/${string}` } = {
jpg: 'image/jpeg',
jpeg: 'image/jpeg',

@ -18,15 +18,20 @@ import { Locals } from './locals'
import { ServerMailer } from './mailer'
import { createTransport } from 'nodemailer'
import SqlMessageQueue from './db/message-queue'
import { BlobStore } from '@atproto/repo'
import { ImageUriBuilder } from './image/uri'
import { BlobDiskCache, ImageProcessingServer } from './image/server'
export type { ServerConfigValues } from './config'
export { ServerConfig } from './config'
export { Database } from './db'
export { DiskBlobStore, MemoryBlobStore } from './storage'
export type App = express.Application
const runServer = (
db: Database,
blobstore: BlobStore,
keypair: auth.DidableKey,
cfg: ServerConfigValues,
) => {
@ -54,11 +59,32 @@ const runServer = (
app.use(cors())
app.use(loggerMiddleware)
let imgUriEndpoint = config.imgUriEndpoint
if (!imgUriEndpoint) {
const imgProcessingCache = new BlobDiskCache(config.blobCacheLocation)
const imgProcessingServer = new ImageProcessingServer(
config.imgUriSalt,
config.imgUriKey,
blobstore,
imgProcessingCache,
)
app.use('/image', imgProcessingServer.app)
imgUriEndpoint = `${config.publicUrl}/image`
}
const imgUriBuilder = new ImageUriBuilder(
imgUriEndpoint,
cfg.imgUriSalt,
cfg.imgUriKey,
)
const locals: Locals = {
logger: httpLogger,
db,
blobstore,
keypair,
auth,
imgUriBuilder,
config,
mailer,
}

@ -14,6 +14,7 @@ import * as ComAtprotoAccountDelete from './types/com/atproto/account/delete'
import * as ComAtprotoAccountGet from './types/com/atproto/account/get'
import * as ComAtprotoAccountRequestPasswordReset from './types/com/atproto/account/requestPasswordReset'
import * as ComAtprotoAccountResetPassword from './types/com/atproto/account/resetPassword'
import * as ComAtprotoDataUploadFile from './types/com/atproto/data/uploadFile'
import * as ComAtprotoHandleResolve from './types/com/atproto/handle/resolve'
import * as ComAtprotoRepoBatchWrite from './types/com/atproto/repo/batchWrite'
import * as ComAtprotoRepoCreateRecord from './types/com/atproto/repo/createRecord'
@ -89,6 +90,7 @@ export class ComNS {
export class AtprotoNS {
_server: Server
account: AccountNS
data: DataNS
handle: HandleNS
repo: RepoNS
server: ServerNS
@ -98,6 +100,7 @@ export class AtprotoNS {
constructor(server: Server) {
this._server = server
this.account = new AccountNS(server)
this.data = new DataNS(server)
this.handle = new HandleNS(server)
this.repo = new RepoNS(server)
this.server = new ServerNS(server)
@ -162,6 +165,21 @@ export class AccountNS {
}
}
export class DataNS {
_server: Server
constructor(server: Server) {
this._server = server
}
uploadFile<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoDataUploadFile.Handler<ExtractAuth<AV>>>,
) {
const nsid = 'com.atproto.data.uploadFile' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
}
export class HandleNS {
_server: Server

@ -182,6 +182,32 @@ export const lexicons: LexiconDoc[] = [
},
},
},
{
lexicon: 1,
id: 'com.atproto.data.uploadFile',
defs: {
main: {
type: 'procedure',
description:
'Upload a new file to be added to repo in a later request.',
input: {
encoding: '*/*',
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['cid'],
properties: {
cid: {
type: 'string',
},
},
},
},
},
},
},
{
lexicon: 1,
id: 'com.atproto.handle.resolve',
@ -987,6 +1013,9 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'string',
},
followersCount: {
type: 'integer',
},
@ -1121,6 +1150,13 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'image',
accept: ['image/png', 'image/jpeg'],
maxWidth: 500,
maxHeight: 500,
maxSize: 100000,
},
},
},
},
@ -1322,6 +1358,13 @@ export const lexicons: LexiconDoc[] = [
type: 'string',
maxLength: 256,
},
avatar: {
type: 'image',
accept: ['image/png', 'image/jpeg'],
maxWidth: 500,
maxHeight: 500,
maxSize: 100000,
},
},
},
},
@ -2914,6 +2957,7 @@ export const ids = {
ComAtprotoAccountRequestPasswordReset:
'com.atproto.account.requestPasswordReset',
ComAtprotoAccountResetPassword: 'com.atproto.account.resetPassword',
ComAtprotoDataUploadFile: 'com.atproto.data.uploadFile',
ComAtprotoHandleResolve: 'com.atproto.handle.resolve',
ComAtprotoRepoBatchWrite: 'com.atproto.repo.batchWrite',
ComAtprotoRepoCreateRecord: 'com.atproto.repo.createRecord',

@ -18,6 +18,7 @@ export interface OutputSchema {
creator: string
displayName?: string
description?: string
avatar?: string
followersCount: number
followsCount: number
membersCount: number

@ -4,5 +4,6 @@
export interface Record {
displayName: string
description?: string
avatar?: { cid: string; mimeType: string; [k: string]: unknown }
[k: string]: unknown
}

@ -10,6 +10,7 @@ export interface InputSchema {
did?: string
displayName?: string
description?: string
avatar?: { cid: string; mimeType: string; [k: string]: unknown }
[k: string]: unknown
}

@ -0,0 +1,39 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import stream from 'stream'
import { HandlerAuth } from '@atproto/xrpc-server'
export interface QueryParams {}
export type InputSchema = string | Uint8Array
export interface OutputSchema {
cid: string
[k: string]: unknown
}
export interface HandlerInput {
encoding: '*/*'
body: stream.Readable
}
export interface HandlerSuccess {
encoding: 'application/json'
body: OutputSchema
}
export interface HandlerError {
status: number
message?: string
}
export type HandlerOutput = HandlerError | HandlerSuccess
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput

@ -2,6 +2,7 @@
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import stream from 'stream'
import { HandlerAuth } from '@atproto/xrpc-server'
export interface QueryParams {
@ -16,7 +17,7 @@ export type HandlerInput = undefined
export interface HandlerSuccess {
encoding: 'application/cbor'
body: Uint8Array
body: Uint8Array | stream.Readable
}
export interface HandlerError {

@ -2,6 +2,7 @@
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import stream from 'stream'
import { HandlerAuth } from '@atproto/xrpc-server'
export interface QueryParams {
@ -13,7 +14,7 @@ export type InputSchema = string | Uint8Array
export interface HandlerInput {
encoding: 'application/cbor'
body: Uint8Array
body: stream.Readable
}
export interface HandlerError {

@ -7,12 +7,16 @@ import { ServerConfig } from './config'
import ServerAuth from './auth'
import { ServerMailer } from './mailer'
import { App } from '.'
import { BlobStore } from '@atproto/repo'
import { ImageUriBuilder } from './image/uri'
export type Locals = {
logger: pino.Logger
db: Database
blobstore: BlobStore
keypair: DidableKey
auth: ServerAuth
imgUriBuilder: ImageUriBuilder
config: ServerConfig
mailer: ServerMailer
}
@ -35,6 +39,14 @@ export const db = (res: HasLocals): Database => {
return db as Database
}
export const blobstore = (res: HasLocals): BlobStore => {
const blobstore = res.locals.blobstore
if (!blobstore) {
throw new Error('No BlobStore object attached to server')
}
return blobstore as BlobStore
}
export const keypair = (res: HasLocals): DidableKey => {
const keypair = res.locals.keypair
if (!keypair) {
@ -67,12 +79,22 @@ export const auth = (res: HasLocals): ServerAuth => {
return auth as ServerAuth
}
export const imgUriBuilder = (res: HasLocals): ImageUriBuilder => {
const imgUriBuilder = res.locals.imgUriBuilder
if (!imgUriBuilder) {
throw new Error('No ImageUriBuilder object attached to server')
}
return imgUriBuilder as ImageUriBuilder
}
export const getLocals = (res: HasLocals): Locals => {
return {
logger: logger(res),
db: db(res),
blobstore: blobstore(res),
keypair: keypair(res),
auth: auth(res),
imgUriBuilder: imgUriBuilder(res),
config: config(res),
mailer: mailer(res),
}

@ -0,0 +1,194 @@
import stream from 'stream'
import { CID } from 'multiformats/cid'
import { BlobStore } from '@atproto/repo'
import Database from '../db'
import { cloneStream, sha256RawToCid, streamSize } from '@atproto/common'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AtUri } from '@atproto/uri'
import { BlobRef, PreparedWrites } from './types'
import { Blob as BlobTable } from '../db/tables/blob'
import * as img from '../image'
import { sha256Stream } from '@atproto/crypto'
export const addUntetheredBlob = async (
dbTxn: Database,
blobstore: BlobStore,
mimeType: string,
blobStream: stream.Readable,
): Promise<CID> => {
const maybeGetImgInfo = async (
readable: stream.Readable,
): Promise<img.ImageInfo | null> => {
if (mimeType.startsWith('image')) {
return img.getInfo(readable)
} else {
return null
}
}
const [tempKey, size, sha256, imgInfo] = await Promise.all([
blobstore.putTemp(cloneStream(blobStream)),
streamSize(cloneStream(blobStream)),
sha256Stream(cloneStream(blobStream)),
maybeGetImgInfo(cloneStream(blobStream)),
])
const cid = sha256RawToCid(sha256)
await dbTxn.db
.insertInto('blob')
.values({
cid: cid.toString(),
mimeType,
size,
tempKey,
width: imgInfo?.width || null,
height: imgInfo?.height || null,
createdAt: new Date().toISOString(),
})
.execute()
return cid
}
export const processWriteBlobs = async (
dbTxn: Database,
blobstore: BlobStore,
did: string,
commit: CID,
writes: PreparedWrites,
) => {
const blobPromises: Promise<void>[] = []
for (const write of writes) {
if (write.action === 'create' || write.action === 'update') {
for (const blob of write.blobs) {
blobPromises.push(verifyBlobAndMakePermanent(dbTxn, blobstore, blob))
blobPromises.push(associateBlob(dbTxn, blob, write.uri, commit, did))
}
}
}
await Promise.all(blobPromises)
}
export const verifyBlob = (blob: BlobRef, found: BlobTable) => {
const throwInvalid = (msg: string) => {
throw new InvalidRequestError(msg, 'InvalidBlob')
}
if (blob.constraints.maxSize && found.size > blob.constraints.maxSize) {
throwInvalid(
`Blob too large. Expected ${blob.constraints.maxSize}. Got: ${found.size}`,
)
}
if (blob.mimeType !== found.mimeType) {
throwInvalid(
`Referenced Mimetype does not match stored blob. Expected: ${found.mimeType}, Got: ${blob.mimeType}`,
)
}
if (
blob.constraints.accept &&
!acceptedMime(blob.mimeType, blob.constraints.accept)
) {
throwInvalid(
`Referenced Mimetype is not accepted. Expected: ${blob.constraints.accept}, Got: ${blob.mimeType}`,
)
}
if (blob.constraints.type === 'image') {
if (!blob.mimeType.startsWith('image')) {
throwInvalid(`Expected an image, got ${blob.mimeType}`)
}
if (
blob.constraints.maxHeight &&
found.height &&
found.height > blob.constraints.maxHeight
) {
throwInvalid(
`Referenced image height is too large. Expected: ${blob.constraints.maxHeight}. Got: ${found.height}`,
)
}
if (
blob.constraints.maxWidth &&
found.width &&
found.width > blob.constraints.maxWidth
) {
throwInvalid(
`Referenced image width is too large. Expected: ${blob.constraints.maxWidth}. Got: ${found.width}`,
)
}
if (
blob.constraints.minHeight &&
found.height &&
found.height < blob.constraints.minHeight
) {
throwInvalid(
`Referenced image height is too small. Expected: ${blob.constraints.minHeight}. Got: ${found.height}`,
)
}
if (
blob.constraints.minWidth &&
found.width &&
found.width < blob.constraints.minWidth
) {
throwInvalid(
`Referenced image width is too small. Expected: ${blob.constraints.minWidth}. Got: ${found.width}`,
)
}
}
}
const acceptedMime = (mime: string, accepted: string[]): boolean => {
if (accepted.includes('*/*')) return true
return accepted.includes(mime)
}
export const verifyBlobAndMakePermanent = async (
dbTxn: Database,
blobstore: BlobStore,
blob: BlobRef,
): Promise<void> => {
const found = await dbTxn.db
.selectFrom('blob')
.selectAll()
.where('cid', '=', blob.cid.toString())
.executeTakeFirst()
if (!found) {
throw new InvalidRequestError(
`Could not found blob: ${blob.cid.toString()}`,
'BlobNotFound',
)
}
if (found.tempKey) {
verifyBlob(blob, found)
await blobstore.makePermanent(found.tempKey, blob.cid)
await dbTxn.db
.updateTable('blob')
.set({ tempKey: null })
.where('tempKey', '=', found.tempKey)
.execute()
}
}
export const associateBlob = async (
dbTxn: Database,
blob: BlobRef,
recordUri: AtUri,
commit: CID,
did: string,
): Promise<void> => {
await dbTxn.db
.insertInto('repo_blob')
.values({
cid: blob.cid.toString(),
recordUri: recordUri.toString(),
commit: commit.toString(),
did,
})
.onConflict((oc) => oc.doNothing())
.execute()
}
export class CidNotFound extends Error {
cid: CID
constructor(cid: CID) {
super(`cid not found: ${cid.toString()}`)
this.cid = cid
}
}

@ -0,0 +1,4 @@
export * from './blobs'
export * from './prepare'
export * from './process'
export * from './types'

@ -0,0 +1,105 @@
import { CID } from 'multiformats/cid'
import {
DeleteOp,
RecordCreateOp,
RecordUpdateOp,
RecordWriteOp,
} from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import { cidForData } from '@atproto/common'
import {
PreparedCreate,
PreparedUpdate,
PreparedDelete,
PreparedWrites,
BlobRef,
} from './types'
import { ids as lexIds } from '../lexicon/lexicons'
// @TODO do this dynamically off of schemas
export const blobsForWrite = (
write: RecordCreateOp | RecordUpdateOp,
): BlobRef[] => {
if (write.collection === lexIds.AppBskyActorProfile) {
if (write.value.avatar) {
return [
{
cid: CID.parse(write.value.avatar.cid),
mimeType: write.value.avatar.mimeType,
constraints: {
type: 'image',
accept: ['image/png', 'image/jpeg'],
maxWidth: 500,
maxHeight: 500,
maxSize: 100000,
},
},
]
}
}
return []
}
export const prepareCreate = async (
did: string,
write: RecordCreateOp,
): Promise<PreparedCreate> => {
write.value.$type = write.collection
return {
action: 'create',
uri: AtUri.make(did, write.collection, write.rkey),
cid: await cidForData(write.value),
op: write,
blobs: blobsForWrite(write),
}
}
export const prepareCreates = async (
did: string,
writes: RecordCreateOp[],
): Promise<PreparedCreate[]> => {
return Promise.all(writes.map((write) => prepareCreate(did, write)))
}
export const prepareUpdate = async (
did: string,
write: RecordUpdateOp,
): Promise<PreparedUpdate> => {
write.value.$type = write.collection
return {
action: 'update',
uri: AtUri.make(did, write.collection, write.rkey),
cid: await cidForData(write.value),
op: write,
blobs: blobsForWrite(write),
}
}
export const prepareDelete = (did: string, write: DeleteOp): PreparedDelete => {
return {
action: 'delete',
uri: AtUri.make(did, write.collection, write.rkey),
op: write,
}
}
export const prepareWrites = async (
did: string,
writes: RecordWriteOp | RecordWriteOp[],
): Promise<PreparedWrites> => {
const writesArr = Array.isArray(writes) ? writes : [writes]
return Promise.all(
writesArr.map((write) => {
if (write.action === 'create') {
return prepareCreate(did, write)
} else if (write.action === 'delete') {
return prepareDelete(did, write)
} else if (write.action === 'update') {
return prepareUpdate(did, write)
} else {
throw new Error(`Action not supported: ${write}`)
}
}),
)
}

@ -0,0 +1,93 @@
import { CID } from 'multiformats/cid'
import { BlobStore, Repo } from '@atproto/repo'
import * as auth from '@atproto/auth'
import Database from '../db'
import SqlBlockstore from '../sql-blockstore'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { PreparedCreate, PreparedWrites } from './types'
import { processWriteBlobs } from './blobs'
export const createRepo = async (
dbTxn: Database,
did: string,
authStore: auth.AuthStore,
writes: PreparedCreate[],
now: string,
) => {
dbTxn.assertTransaction()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const writeOps = writes.map((write) => write.op)
const repo = await Repo.create(blockstore, did, authStore, writeOps)
await dbTxn.db
.insertInto('repo_root')
.values({
did: did,
root: repo.cid.toString(),
indexedAt: now,
})
.execute()
}
export const processWrites = async (
dbTxn: Database,
did: string,
authStore: auth.AuthStore,
blobs: BlobStore,
writes: PreparedWrites,
now: string,
) => {
// make structural write to repo & send to indexing
// @TODO get commitCid first so we can do all db actions in tandem
const [commit] = await Promise.all([
writeToRepo(dbTxn, did, authStore, writes, now),
indexWrites(dbTxn, writes, now),
])
// make blobs permanent & associate w commit + recordUri in DB
await processWriteBlobs(dbTxn, blobs, did, commit, writes)
}
export const writeToRepo = async (
dbTxn: Database,
did: string,
authStore: auth.AuthStore,
writes: PreparedWrites,
now: string,
): Promise<CID> => {
dbTxn.assertTransaction()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const currRoot = await dbTxn.getRepoRoot(did, true)
if (!currRoot) {
throw new InvalidRequestError(
`${did} is not a registered repo on this server`,
)
}
const writeOps = writes.map((write) => write.op)
const repo = await Repo.load(blockstore, currRoot)
const updated = await repo
.stageUpdate(writeOps)
.createCommit(authStore, async (prev, curr) => {
const success = await dbTxn.updateRepoRoot(did, curr, prev, now)
if (!success) {
throw new Error('Repo root update failed, could not linearize')
}
return null
})
return updated.cid
}
export const indexWrites = async (
dbTxn: Database,
writes: PreparedWrites,
now: string,
) => {
dbTxn.assertTransaction()
await Promise.all(
writes.map(async (write) => {
if (write.action === 'create') {
await dbTxn.indexRecord(write.uri, write.cid, write.op.value, now)
} else if (write.action === 'delete') {
await dbTxn.deleteRecord(write.uri)
}
}),
)
}

@ -0,0 +1,55 @@
import { CID } from 'multiformats/cid'
import { DeleteOp, RecordCreateOp, RecordUpdateOp } from '@atproto/repo'
import { AtUri } from '@atproto/uri'
export type ImageConstraint = {
type: 'image'
accept?: string[]
maxHeight?: number
maxWidth?: number
minHeight?: number
minWidth?: number
maxSize?: number
}
export type RawBlobConstraint = {
type: 'blob'
accept?: string[]
maxSize?: number
}
export type BlobConstraint = RawBlobConstraint | ImageConstraint
export type BlobRef = {
cid: CID
mimeType: string
constraints: BlobConstraint
}
export type PreparedCreate = {
action: 'create'
uri: AtUri
cid: CID
op: RecordCreateOp
blobs: BlobRef[]
}
export type PreparedUpdate = {
action: 'update'
uri: AtUri
cid: CID
op: RecordUpdateOp
blobs: BlobRef[]
}
export type PreparedDelete = {
action: 'delete'
uri: AtUri
op: DeleteOp
}
export type PreparedWrites = (
| PreparedCreate
| PreparedUpdate
| PreparedDelete
)[]

@ -0,0 +1,121 @@
import fs from 'fs/promises'
import fsSync from 'fs'
import stream from 'stream'
import os from 'os'
import path from 'path'
import { CID } from 'multiformats/cid'
import { BlobNotFoundError, BlobStore } from '@atproto/repo'
import { randomStr } from '@atproto/crypto'
import { httpLogger as log } from '../logger'
import { isErrnoException } from '@atproto/common'
export class DiskBlobStore implements BlobStore {
location: string
tmpLocation: string
constructor(location: string, tmpLocation: string) {
this.location = location
this.tmpLocation = tmpLocation
}
static async create(
location: string,
tmpLocation?: string,
): Promise<DiskBlobStore> {
const tmp = tmpLocation || path.join(os.tmpdir(), 'atproto/blobs')
await Promise.all([
fs.mkdir(location, { recursive: true }),
fs.mkdir(tmp, { recursive: true }),
])
return new DiskBlobStore(location, tmp)
}
private genKey() {
return randomStr(32, 'base32')
}
getTmpPath(key: string): string {
return path.join(this.tmpLocation, key)
}
getStoredPath(cid: CID): string {
return path.join(this.location, cid.toString())
}
async hasTemp(key: string): Promise<boolean> {
return fileExists(this.getTmpPath(key))
}
async hasStored(cid: CID): Promise<boolean> {
return fileExists(this.getStoredPath(cid))
}
async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
const key = this.genKey()
await fs.writeFile(this.getTmpPath(key), bytes)
return key
}
async makePermanent(key: string, cid: CID): Promise<void> {
const tmpPath = this.getTmpPath(key)
const storedPath = this.getStoredPath(cid)
const alreadyHas = await this.hasStored(cid)
if (!alreadyHas) {
const data = await fs.readFile(tmpPath)
await fs.writeFile(storedPath, data)
}
try {
await fs.rm(tmpPath)
} catch (err) {
log.error({ err, tmpPath }, 'could not delete file from temp storage')
}
}
async putPermanent(
cid: CID,
bytes: Uint8Array | stream.Readable,
): Promise<void> {
await fs.writeFile(this.getStoredPath(cid), bytes)
}
async getBytes(cid: CID): Promise<Uint8Array> {
try {
return await fs.readFile(this.getStoredPath(cid))
} catch (err) {
if (isErrnoException(err) && err.code === 'ENOENT') {
throw new BlobNotFoundError()
}
throw err
}
}
async getStream(cid: CID): Promise<stream.Readable> {
try {
const handle = await fs.open(this.getStoredPath(cid), 'r')
return handle.createReadStream()
} catch (err) {
if (isErrnoException(err) && err.code === 'ENOENT') {
throw new BlobNotFoundError()
}
throw err
}
}
}
const fileExists = (location: string): Promise<boolean> => {
return new Promise((resolve, reject) => {
fsSync.stat(location, (err) => {
if (err) {
if (err.code === 'ENOENT') {
return resolve(false)
} else {
return reject(err)
}
} else {
resolve(true)
}
})
})
}
export default DiskBlobStore

@ -0,0 +1,2 @@
export * from './disk-blobstore'
export * from './memory-blobstore'

@ -0,0 +1,73 @@
import stream from 'stream'
import { CID } from 'multiformats/cid'
import { BlobNotFoundError, BlobStore } from '@atproto/repo'
import { randomStr } from '@atproto/crypto'
import { streamToArray } from '@atproto/common'
export class MemoryBlobStore implements BlobStore {
temp: Map<string, Uint8Array> = new Map()
blocks: Map<string, Uint8Array> = new Map()
constructor() {}
private genKey() {
return randomStr(32, 'base32')
}
async hasTemp(key: string): Promise<boolean> {
return this.temp.has(key)
}
async hasStored(cid: CID): Promise<boolean> {
return this.blocks.has(cid.toString())
}
async putTemp(bytes: Uint8Array | stream.Readable): Promise<string> {
const key = this.genKey()
let byteArray: Uint8Array
if (ArrayBuffer.isView(bytes)) {
byteArray = bytes
} else {
byteArray = await streamToArray(bytes)
}
this.temp.set(key, byteArray)
return key
}
async makePermanent(key: string, cid: CID): Promise<void> {
const value = this.temp.get(key)
if (!value) {
throw new BlobNotFoundError()
}
this.blocks.set(cid.toString(), value)
this.temp.delete(key)
}
async putPermanent(
cid: CID,
bytes: Uint8Array | stream.Readable,
): Promise<void> {
let byteArray: Uint8Array
if (ArrayBuffer.isView(bytes)) {
byteArray = bytes
} else {
byteArray = await streamToArray(bytes)
}
this.blocks.set(cid.toString(), byteArray)
}
async getBytes(cid: CID): Promise<Uint8Array> {
const value = this.blocks.get(cid.toString())
if (!value) {
throw new BlobNotFoundError()
}
return value
}
async getStream(cid: CID): Promise<stream.Readable> {
const bytes = await this.getBytes(cid)
return stream.Readable.from(bytes)
}
}
export default MemoryBlobStore

@ -1,179 +0,0 @@
import { CID } from 'multiformats/cid'
import {
DeleteOp,
RecordCreateOp,
RecordUpdateOp,
RecordWriteOp,
Repo,
} from '@atproto/repo'
import * as auth from '@atproto/auth'
import { AtUri } from '@atproto/uri'
import Database from '../db'
import SqlBlockstore from '../sql-blockstore'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { cidForData } from '@atproto/common'
export const createRepo = async (
dbTxn: Database,
did: string,
authStore: auth.AuthStore,
writes: PreparedCreate[],
now: string,
) => {
dbTxn.assertTransaction()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const writeOps = writes.map((write) => write.op)
const repo = await Repo.create(blockstore, did, authStore, writeOps)
await dbTxn.db
.insertInto('repo_root')
.values({
did: did,
root: repo.cid.toString(),
indexedAt: now,
})
.execute()
}
export const writeToRepo = async (
dbTxn: Database,
did: string,
authStore: auth.AuthStore,
writes: PreparedWrites,
now: string,
) => {
dbTxn.assertTransaction()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const currRoot = await dbTxn.getRepoRoot(did, true)
if (!currRoot) {
throw new InvalidRequestError(
`${did} is not a registered repo on this server`,
)
}
const writeOps = writes.map((write) => write.op)
const repo = await Repo.load(blockstore, currRoot)
await repo
.stageUpdate(writeOps)
.createCommit(authStore, async (prev, curr) => {
const success = await dbTxn.updateRepoRoot(did, curr, prev, now)
if (!success) {
throw new Error('Repo root update failed, could not linearize')
}
return null
})
}
export const indexWrites = async (
dbTxn: Database,
writes: PreparedWrites,
now: string,
) => {
dbTxn.assertTransaction()
await Promise.all(
writes.map(async (write) => {
if (write.action === 'create') {
await dbTxn.indexRecord(write.uri, write.cid, write.op.value, now)
} else if (write.action === 'delete') {
await dbTxn.deleteRecord(write.uri)
}
}),
)
}
export type PreparedCreate = {
action: 'create'
uri: AtUri
cid: CID
op: RecordCreateOp
}
export type PreparedUpdate = {
action: 'update'
uri: AtUri
cid: CID
op: RecordUpdateOp
}
export type PreparedDelete = {
action: 'delete'
uri: AtUri
op: DeleteOp
}
export type PreparedWrites = (
| PreparedCreate
| PreparedUpdate
| PreparedDelete
)[]
export const prepareCreate = async (
did: string,
write: RecordCreateOp,
): Promise<PreparedCreate> => {
const record = {
...write.value,
$type: write.collection,
}
return {
action: 'create',
uri: AtUri.make(did, write.collection, write.rkey),
cid: await cidForData(record),
op: {
...write,
value: record,
},
}
}
export const prepareCreates = async (
did: string,
writes: RecordCreateOp[],
): Promise<PreparedCreate[]> => {
return Promise.all(writes.map((write) => prepareCreate(did, write)))
}
export const prepareUpdate = async (
did: string,
write: RecordUpdateOp,
): Promise<PreparedUpdate> => {
const record = {
...write.value,
$type: write.collection,
}
return {
action: 'update',
uri: AtUri.make(did, write.collection, write.rkey),
cid: await cidForData(record),
op: {
...write,
value: record,
},
}
}
export const prepareDelete = (did: string, write: DeleteOp): PreparedDelete => {
return {
action: 'delete',
uri: AtUri.make(did, write.collection, write.rkey),
op: write,
}
}
export const prepareWrites = async (
did: string,
writes: RecordWriteOp | RecordWriteOp[],
): Promise<PreparedWrites> => {
const writesArr = Array.isArray(writes) ? writes : [writes]
return Promise.all(
writesArr.map((write) => {
if (write.action === 'create') {
return prepareCreate(did, write)
} else if (write.action === 'delete') {
return prepareDelete(did, write)
} else if (write.action === 'update') {
return prepareUpdate(did, write)
} else {
throw new Error(`Action not supported: ${write}`)
}
}),
)
}

@ -1,13 +1,22 @@
import { AddressInfo } from 'net'
import http from 'http'
import os from 'os'
import path from 'path'
import * as crypto from '@atproto/crypto'
import * as plc from '@atproto/plc'
import { AtUri } from '@atproto/uri'
import { CID } from 'multiformats/cid'
import * as uint8arrays from 'uint8arrays'
import server, { ServerConfig, Database, App } from '../src/index'
import server, {
ServerConfig,
Database,
App,
MemoryBlobStore,
} from '../src/index'
import * as GetAuthorFeed from '../src/lexicon/types/app/bsky/feed/getAuthorFeed'
import * as GetTimeline from '../src/lexicon/types/app/bsky/feed/getTimeline'
import DiskBlobStore from '../src/storage/disk-blobstore'
import { randomStr } from '@atproto/crypto'
const ADMIN_PASSWORD = 'admin-pass'
@ -18,6 +27,7 @@ export type TestServerInfo = {
serverKey: string
app: App
db: Database
blobstore: DiskBlobStore | MemoryBlobStore
close: CloseFn
}
@ -43,6 +53,8 @@ export const runTestServer = async (
'https://pds.public.url',
)
const blobstoreLoc = path.join(os.tmpdir(), randomStr(5, 'base32'))
const config = new ServerConfig({
debugMode: true,
version: '0.0.0',
@ -58,7 +70,12 @@ export const runTestServer = async (
appUrlPasswordReset: 'app://forgot-password',
emailNoReplyAddress: 'noreply@blueskyweb.xyz',
publicUrl: 'https://pds.public.url',
imgUriSalt: '9dd04221f5755bce5f55f47464c27e1e',
imgUriKey:
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8',
dbPostgresUrl: process.env.DB_POSTGRES_URL,
blobstoreLocation: `${blobstoreLoc}/blobs`,
blobstoreTmp: `${blobstoreLoc}/tmp`,
...params,
})
@ -72,7 +89,15 @@ export const runTestServer = async (
await db.migrateToLatestOrThrow()
const { app, listener } = server(db, keypair, config)
const blobstore =
config.blobstoreLocation !== undefined
? await DiskBlobStore.create(
config.blobstoreLocation,
config.blobstoreTmp,
)
: new MemoryBlobStore()
const { app, listener } = server(db, blobstore, keypair, config)
const pdsPort = (listener.address() as AddressInfo).port
return {
@ -81,6 +106,7 @@ export const runTestServer = async (
serverKey: keypair.did(),
app,
db,
blobstore,
close: async () => {
await Promise.all([
db.close(),

@ -0,0 +1,156 @@
import fs from 'fs/promises'
import AtpApi, { ServiceClient as AtpServiceClient } from '@atproto/api'
import { CloseFn, runTestServer } from './_util'
import { CID } from 'multiformats/cid'
import { Database, ServerConfig } from '../src'
import DiskBlobStore from '../src/storage/disk-blobstore'
import * as uint8arrays from 'uint8arrays'
import * as image from '../src/image'
import axios from 'axios'
const alice = {
email: 'alice@test.com',
handle: 'alice.test',
did: '',
password: 'alice-pass',
}
const bob = {
email: 'bob@test.com',
handle: 'bob.test',
did: '',
password: 'bob-pass',
}
describe('file uploads', () => {
let client: AtpServiceClient
let aliceClient: AtpServiceClient
let blobstore: DiskBlobStore
let db: Database
let cfg: ServerConfig
let serverUrl: string
let close: CloseFn
beforeAll(async () => {
const server = await runTestServer({
dbPostgresSchema: 'file-uploads',
})
blobstore = server.blobstore as DiskBlobStore
db = server.db
close = server.close
client = AtpApi.service(server.url)
aliceClient = AtpApi.service(server.url)
cfg = server.cfg
serverUrl = server.url
})
afterAll(async () => {
await close()
})
it('registers users', async () => {
const res = await client.com.atproto.account.create({
email: alice.email,
handle: alice.handle,
password: alice.password,
})
aliceClient.setHeader('authorization', `Bearer ${res.data.accessJwt}`)
alice.did = res.data.did
const res2 = await client.com.atproto.account.create({
email: bob.email,
handle: bob.handle,
password: bob.password,
})
bob.did = res2.data.did
})
let smallCid: CID
let smallFile: Uint8Array
it('uploads files', async () => {
smallFile = await fs.readFile('tests/image/fixtures/key-portrait-small.jpg')
const res = await aliceClient.com.atproto.data.uploadFile(smallFile, {
encoding: 'image/jpeg',
} as any)
smallCid = CID.parse(res.data.cid)
const found = await db.db
.selectFrom('blob')
.selectAll()
.where('cid', '=', smallCid.toString())
.executeTakeFirst()
expect(found?.mimeType).toBe('image/jpeg')
expect(found?.size).toBe(smallFile.length)
expect(found?.tempKey).toBeDefined()
expect(found?.width).toBe(87)
expect(found?.height).toBe(150)
expect(await blobstore.hasTemp(found?.tempKey as string)).toBeTruthy()
})
it('can reference the file', async () => {
await aliceClient.app.bsky.actor.updateProfile({
displayName: 'Alice',
avatar: { cid: smallCid.toString(), mimeType: 'image/jpeg' },
})
})
it('after being referenced, the file is moved to permanent storage', async () => {
const found = await db.db
.selectFrom('blob')
.selectAll()
.where('cid', '=', smallCid.toString())
.executeTakeFirst()
expect(found?.tempKey).toBeNull()
expect(await blobstore.hasStored(smallCid)).toBeTruthy()
const storedBytes = await blobstore.getBytes(smallCid)
expect(uint8arrays.equals(smallFile, storedBytes)).toBeTruthy()
})
it('serves the referenced blob', async () => {
const profile = await aliceClient.app.bsky.actor.getProfile({
actor: 'alice.test',
})
const avatar = profile.data.avatar as string
expect(typeof avatar).toBe('string')
const url = avatar.replace(cfg.publicUrl, serverUrl)
const res = await axios.get(url, { responseType: 'stream' })
expect(res.headers['content-type']).toBe('image/jpeg')
const info = await image.getInfo(res.data)
expect(info).toEqual(
expect.objectContaining({
height: 250,
width: 250,
}),
)
})
let largeCid: CID
let largeFile: Uint8Array
it('does not allow referencing a file that is outside blob constraints', async () => {
largeFile = await fs.readFile('tests/image/fixtures/key-portrait-large.jpg')
const res = await aliceClient.com.atproto.data.uploadFile(largeFile, {
encoding: 'image/jpeg',
} as any)
largeCid = CID.parse(res.data.cid)
const profilePromise = aliceClient.app.bsky.actor.updateProfile({
avatar: { cid: largeCid.toString(), mimeType: 'image/jpeg' },
})
await expect(profilePromise).rejects.toThrow()
})
it('does not make a blob permanent if referencing failed', async () => {
const found = await db.db
.selectFrom('blob')
.selectAll()
.where('cid', '=', largeCid.toString())
.executeTakeFirst()
expect(found?.tempKey).toBeDefined()
expect(await blobstore.hasTemp(found?.tempKey as string)).toBeTruthy()
expect(await blobstore.hasStored(largeCid)).toBeFalsy()
})
})

@ -1,24 +1,35 @@
import * as http from 'http'
import os from 'os'
import path from 'path'
import fs from 'fs'
import { AddressInfo } from 'net'
import * as uint8arrays from 'uint8arrays'
import axios, { AxiosInstance } from 'axios'
import { getInfo } from '../../src/image/sharp'
import {
BlobDiskCache,
BlobDiskStorage,
ImageProcessingServer,
} from '../../src/image/server'
import { BlobDiskCache, ImageProcessingServer } from '../../src/image/server'
import { DiskBlobStore } from '../../src'
import { cidForData } from '@atproto/common'
import { CID } from 'multiformats/cid'
describe('image processing server', () => {
let server: ImageProcessingServer
let httpServer: http.Server
let client: AxiosInstance
beforeAll(() => {
const b64Bytes = (b64: string) => uint8arrays.fromString(b64, 'base64')
const salt = b64Bytes('ndBCIfV1W85fVfR0ZMJ+Hg==')
const key = b64Bytes('8j7NFCg1Al9Cw9ss8l3YE5VsF4OSdgJWIR+dMV+KtNg=')
const storage = new BlobDiskStorage(`${__dirname}/fixtures`)
let fileCid: CID
beforeAll(async () => {
const salt = '9dd04221f5755bce5f55f47464c27e1e'
const key =
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8'
const storage = await DiskBlobStore.create(
path.join(os.tmpdir(), 'img-processing-tests'),
)
// this CID isn't accurate for the data, but it works for the sake of the test
fileCid = await cidForData('key-landscape-small')
await storage.putPermanent(
fileCid,
fs.createReadStream('tests/image/fixtures/key-landscape-small.jpg'),
)
const cache = new BlobDiskCache()
server = new ImageProcessingServer(salt, key, storage, cache)
httpServer = server.app.listen()
@ -37,7 +48,7 @@ describe('image processing server', () => {
it('processes image from storage.', async () => {
const res = await client.get(
server.uriBuilder.getSignedPath({
fileId: 'key-landscape-small.jpg',
cid: fileCid,
format: 'jpeg',
fit: 'cover',
width: 500,
@ -66,7 +77,7 @@ describe('image processing server', () => {
it('caches results.', async () => {
const path = server.uriBuilder.getSignedPath({
fileId: 'key-landscape-small.jpg',
cid: fileCid,
format: 'jpeg',
width: 25, // Special number for this test
height: 25,
@ -83,7 +94,7 @@ describe('image processing server', () => {
it('errors on bad signature.', async () => {
const path = server.uriBuilder.getSignedPath({
fileId: 'key-landscape-small.jpg',
cid: fileCid,
format: 'jpeg',
fit: 'cover',
width: 500,
@ -91,17 +102,18 @@ describe('image processing server', () => {
min: true,
})
expect(path).toEqual(
'/anzVzkZ7zMwD0Hrz5pwa5imHXis1ayKbWwgBgKvgjkM/rs:fill:500:500:1:0/plain/key-landscape-small.jpg@jpeg',
`/G37yf764s6331dxOWiaOYEiLdg8OJxeE-RNxPDKB9Ck/rs:fill:500:500:1:0/plain/${fileCid.toString()}@jpeg`,
)
const res = await client.get(path.replace('/a', '/bad_'), {})
const res = await client.get(path.replace('/G', '/bad_'), {})
expect(res.status).toEqual(400)
expect(res.data).toEqual({ message: 'Invalid path: bad signature' })
})
it('errors on missing file.', async () => {
const missingCid = await cidForData('missing-file')
const res = await client.get(
server.uriBuilder.getSignedPath({
fileId: 'missing-file.jpg',
cid: missingCid,
format: 'jpeg',
fit: 'cover',
width: 500,

@ -1,29 +1,33 @@
import * as uint8arrays from 'uint8arrays'
import { cidForData } from '@atproto/common'
import { CID } from 'multiformats/cid'
import { ImageUriBuilder, BadPathError } from '../../src/image/uri'
describe('image uri builder', () => {
let uriBuilder: ImageUriBuilder
let cid: CID
beforeAll(() => {
const b64Bytes = (b64: string) => uint8arrays.fromString(b64, 'base64')
const salt = b64Bytes('ndBCIfV1W85fVfR0ZMJ+Hg==')
const key = b64Bytes('8j7NFCg1Al9Cw9ss8l3YE5VsF4OSdgJWIR+dMV+KtNg=')
uriBuilder = new ImageUriBuilder(salt, key)
beforeAll(async () => {
const endpoint = 'https://example.com'
const salt = '9dd04221f5755bce5f55f47464c27e1e'
const key =
'f23ecd142835025f42c3db2cf25dd813956c178392760256211f9d315f8ab4d8'
uriBuilder = new ImageUriBuilder(endpoint, salt, key)
cid = await cidForData('test cid')
})
it('signs and verifies uri options.', () => {
const path = uriBuilder.getSignedPath({
fileId: 'dd180f3',
cid,
format: 'png',
height: 200,
width: 300,
})
expect(path).toEqual(
'/BtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/dd180f3@png',
`/8Lpp5Y4ZQFkwTxDDgc1hz8haG6-lUBHsGsyNYoDEaXc/rs:fill:300:200:0:0/plain/${cid.toString()}@png`,
)
expect(uriBuilder.getVerifiedOptions(path)).toEqual({
signature: 'BtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g',
fileId: 'dd180f3',
signature: '8Lpp5Y4ZQFkwTxDDgc1hz8haG6-lUBHsGsyNYoDEaXc',
cid,
format: 'png',
fit: 'cover',
height: 200,
@ -38,41 +42,41 @@ describe('image uri builder', () => {
tryGetVerifiedOptions(
// Confirm this is a good signed uri
'/BtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/dd180f3@png',
`/BtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/${cid.toString()}@png`,
)
expect(
tryGetVerifiedOptions(
// Tamper with signature
'/DtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/dd180f3@png',
`/DtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/${cid.toString()}@png`,
),
).toThrow(new BadPathError('Invalid path: bad signature'))
expect(
tryGetVerifiedOptions(
// Tamper with params
'/DtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/dd180f3@jpeg',
`/DtHM_4IOak5MOc2gOPDxbfS4_HG6VPcry2OAV03L29g/rs:fill:300:200:0:0/plain/${cid.toString()}@jpeg`,
),
).toThrow(new BadPathError('Invalid path: bad signature'))
expect(
tryGetVerifiedOptions(
// Missing signature
'/rs:fill:300:200:0:0/plain/dd180f3@jpeg',
`/rs:fill:300:200:0:0/plain/${cid.toString()}@jpeg`,
),
).toThrow(new BadPathError('Invalid path: missing signature'))
})
it('supports basic options.', () => {
const path = ImageUriBuilder.getPath({
fileId: 'dd180f3',
cid,
format: 'png',
height: 200,
width: 300,
})
expect(path).toEqual('/rs:fill:300:200:0:0/plain/dd180f3@png')
expect(path).toEqual(`/rs:fill:300:200:0:0/plain/${cid.toString()}@png`)
expect(ImageUriBuilder.getOptions(path)).toEqual({
fileId: 'dd180f3',
cid,
format: 'png',
fit: 'cover',
height: 200,
@ -83,15 +87,15 @@ describe('image uri builder', () => {
it('supports fit option.', () => {
const path = ImageUriBuilder.getPath({
fileId: 'dd180f3',
cid,
format: 'png',
fit: 'inside',
height: 200,
width: 300,
})
expect(path).toEqual('/rs:fit:300:200:0:0/plain/dd180f3@png')
expect(path).toEqual(`/rs:fit:300:200:0:0/plain/${cid.toString()}@png`)
expect(ImageUriBuilder.getOptions(path)).toEqual({
fileId: 'dd180f3',
cid,
format: 'png',
fit: 'inside',
height: 200,
@ -102,15 +106,15 @@ describe('image uri builder', () => {
it('supports min=true option.', () => {
const path = ImageUriBuilder.getPath({
fileId: 'dd180f3',
cid,
format: 'png',
height: 200,
width: 300,
min: true,
})
expect(path).toEqual('/rs:fill:300:200:1:0/plain/dd180f3@png')
expect(path).toEqual(`/rs:fill:300:200:1:0/plain/${cid.toString()}@png`)
expect(ImageUriBuilder.getOptions(path)).toEqual({
fileId: 'dd180f3',
cid,
format: 'png',
fit: 'cover',
height: 200,
@ -121,15 +125,17 @@ describe('image uri builder', () => {
it('supports min={height,width} option.', () => {
const path = ImageUriBuilder.getPath({
fileId: 'dd180f3',
cid,
format: 'jpeg',
height: 200,
width: 300,
min: { height: 50, width: 100 },
})
expect(path).toEqual('/rs:fill:300:200:0:0/mw:100/mh:50/plain/dd180f3@jpeg')
expect(path).toEqual(
`/rs:fill:300:200:0:0/mw:100/mh:50/plain/${cid.toString()}@jpeg`,
)
expect(ImageUriBuilder.getOptions(path)).toEqual({
fileId: 'dd180f3',
cid,
format: 'jpeg',
fit: 'cover',
height: 200,
@ -138,69 +144,57 @@ describe('image uri builder', () => {
})
})
it('supports encoded fileId.', () => {
const path = ImageUriBuilder.getPath({
fileId: 'has space',
format: 'jpeg',
height: 200,
width: 300,
})
expect(path).toEqual('/rs:fill:300:200:0:0/plain/has%20space@jpeg')
expect(ImageUriBuilder.getOptions(path)).toEqual({
fileId: 'has space',
format: 'jpeg',
fit: 'cover',
height: 200,
width: 300,
min: false,
})
})
it('errors on bad fileId/format part.', () => {
expect(tryGetOptions('/rs:fill:300:200:1:0/plain/dd180f3@mp4')).toThrow(
new BadPathError('Invalid path: bad fileId/format part'),
)
expect(tryGetOptions('/rs:fill:300:200:1:0/plain/@jpg')).toThrow(
new BadPathError('Invalid path: bad fileId/format part'),
)
expect(tryGetOptions('/rs:fill:300:200:1:0/plain/dd180f3@')).toThrow(
new BadPathError('Invalid path: bad fileId/format part'),
)
expect(tryGetOptions('/rs:fill:300:200:1:0/plain/dd180f3@')).toThrow(
new BadPathError('Invalid path: bad fileId/format part'),
)
expect(tryGetOptions('/rs:fill:300:200:1:0/plain/dd180f3@x@jpeg')).toThrow(
new BadPathError('Invalid path: bad fileId/format part'),
it('errors on bad cid/format part.', () => {
expect(
tryGetOptions(`/rs:fill:300:200:1:0/plain/${cid.toString()}@mp4`),
).toThrow(new BadPathError('Invalid path: bad cid/format part'))
expect(tryGetOptions(`/rs:fill:300:200:1:0/plain/@jpg`)).toThrow(
new BadPathError('Invalid path: bad cid/format part'),
)
expect(
tryGetOptions(`/rs:fill:300:200:1:0/plain/${cid.toString()}@`),
).toThrow(new BadPathError('Invalid path: bad cid/format part'))
expect(
tryGetOptions(`/rs:fill:300:200:1:0/plain/${cid.toString()}@`),
).toThrow(new BadPathError('Invalid path: bad cid/format part'))
expect(
tryGetOptions(`/rs:fill:300:200:1:0/plain/${cid.toString()}@x@jpeg`),
).toThrow(new BadPathError('Invalid path: bad cid/format part'))
})
it('errors on mismatching min settings.', () => {
expect(
tryGetOptions('/rs:fill:300:200:1:0/mw:100/mh:50/plain/dd180f3@jpeg'),
tryGetOptions(
`/rs:fill:300:200:1:0/mw:100/mh:50/plain/${cid.toString()}@jpeg`,
),
).toThrow(new BadPathError('Invalid path: bad min width/height param'))
expect(
tryGetOptions('/rs:fill:300:200:0:0/mw:100/plain/dd180f3@jpeg'),
tryGetOptions(`/rs:fill:300:200:0:0/mw:100/plain/${cid.toString()}@jpeg`),
).toThrow(new BadPathError('Invalid path: bad min width/height param'))
})
it('errors on bad fit setting.', () => {
expect(tryGetOptions('/rs:blah:300:200:1:0/plain/dd180f3@jpeg')).toThrow(
new BadPathError('Invalid path: bad resize fit param'),
)
expect(
tryGetOptions(`/rs:blah:300:200:1:0/plain/${cid.toString()}@jpeg`),
).toThrow(new BadPathError('Invalid path: bad resize fit param'))
})
it('errors on bad dimension settings.', () => {
expect(tryGetOptions('/rs:fill:30x:200:1:0/plain/dd180f3@jpeg')).toThrow(
new BadPathError('Invalid path: bad resize height/width param'),
)
expect(tryGetOptions('/rs:fill:300:20x:1:0/plain/dd180f3@jpeg')).toThrow(
new BadPathError('Invalid path: bad resize height/width param'),
)
expect(
tryGetOptions('/rs:fill:300:200:1:0/mw:10x/mh:50/plain/dd180f3@jpeg'),
tryGetOptions(`/rs:fill:30x:200:1:0/plain/${cid.toString()}@jpeg`),
).toThrow(new BadPathError('Invalid path: bad resize height/width param'))
expect(
tryGetOptions(`/rs:fill:300:20x:1:0/plain/${cid.toString()}@jpeg`),
).toThrow(new BadPathError('Invalid path: bad resize height/width param'))
expect(
tryGetOptions(
`/rs:fill:300:200:1:0/mw:10x/mh:50/plain/${cid.toString()}@jpeg`,
),
).toThrow(new BadPathError('Invalid path: bad min width/height param'))
expect(
tryGetOptions('/rs:fill:300:200:1:0/mw:100/mh:5x/plain/dd180f3@jpeg'),
tryGetOptions(
`/rs:fill:300:200:1:0/mw:100/mh:5x/plain/${cid.toString()}@jpeg`,
),
).toThrow(new BadPathError('Invalid path: bad min width/height param'))
})

@ -96,6 +96,26 @@ Object {
}
`;
exports[`pds profile views handles avatars 1`] = `
Object {
"avatar": "https://pds.public.url/image/w4t9ji0l53vlWd7Res2ZSFAL6QS2iaI9RGCt6Gs5reE/rs:fill:250:250:1:0/plain/bafkreiaivizp4xldojmmpuzmiu75cmea7nq56dnntnuhzhsjcb63aou5ei@jpeg",
"creator": "user(0)",
"declaration": Object {
"actorType": "app.bsky.system.actorUser",
"cid": "cids(0)",
},
"description": "blah blah",
"did": "user(0)",
"displayName": "ali",
"followersCount": 2,
"followsCount": 3,
"handle": "alice.test",
"membersCount": 0,
"myState": Object {},
"postsCount": 4,
}
`;
exports[`pds profile views handles partial updates 1`] = `
Object {
"creator": "user(0)",

@ -1,3 +1,4 @@
import fs from 'fs/promises'
import AtpApi, { ServiceClient as AtpServiceClient } from '@atproto/api'
import { runTestServer, forSnapshot, CloseFn } from '../_util'
import { SeedClient } from '../seeds/client'
@ -98,6 +99,26 @@ describe('pds profile views', () => {
expect(forSnapshot(aliceForAlice.data)).toMatchSnapshot()
})
it('handles avatars', async () => {
const img = await fs.readFile('tests/image/fixtures/key-portrait-small.jpg')
const res = await client.com.atproto.data.uploadFile(img, {
headers: sc.getHeaders(alice),
encoding: 'image/jpeg',
} as any)
await client.app.bsky.actor.updateProfile(
{ avatar: { cid: res.data.cid, mimeType: 'image/jpeg' } },
{ headers: sc.getHeaders(alice), encoding: 'application/json' },
)
const aliceForAlice = await client.app.bsky.actor.getProfile(
{ actor: alice },
{ headers: sc.getHeaders(alice) },
)
expect(forSnapshot(aliceForAlice.data)).toMatchSnapshot()
})
it('creates new profile', async () => {
await client.app.bsky.actor.updateProfile(
{ displayName: 'danny boy' },

@ -7,7 +7,6 @@ const cid = z
message: 'Not a CID',
})
.transform((obj: unknown) => mf.CID.asCID(obj) as mf.CID)
export type CID = z.infer<typeof cid>
const documentData = z.object({
did: z.string(),

@ -1,6 +1,7 @@
export * from './blockstore'
export * from './repo'
export * from './mst'
export * from './storage'
export * from './types'
export * from './verify'
export * from './util'

@ -0,0 +1 @@
export * from './types'

@ -0,0 +1,12 @@
import stream from 'stream'
import { CID } from 'multiformats/cid'
export interface BlobStore {
putTemp(bytes: Uint8Array | stream.Readable): Promise<string>
makePermanent(key: string, cid: CID): Promise<void>
putPermanent(cid: CID, bytes: Uint8Array | stream.Readable): Promise<void>
getBytes(cid: CID): Promise<Uint8Array>
getStream(cid: CID): Promise<stream.Readable>
}
export class BlobNotFoundError extends Error {}

@ -10,6 +10,7 @@ import {
InvalidRequestError,
InternalServerError,
} from './types'
import { cloneStream } from '@atproto/common'
export function decodeQueryParams(
def: LexXrpcProcedure | LexXrpcQuery,
@ -82,7 +83,7 @@ export function validateInput(
return undefined
}
// input schema
// if input schema, validate
if (def.input?.schema) {
try {
lexicons.assertValidXrpcInput(nsid, req.body)
@ -91,9 +92,18 @@ export function validateInput(
}
}
// if middleware already got the body, we pass that along as input
// otherwise, we pipe it into a readable stream
let body
if (req.complete) {
body = req.body
} else {
body = cloneStream(req)
}
return {
encoding: inputEncoding,
body: req.body,
body,
}
}
@ -155,11 +165,20 @@ function isValidEncoding(possibleStr: string, value: string) {
const possible = possibleStr.split(',').map((v) => v.trim())
const normalized = normalizeMime(value)
if (!normalized) return false
if (possible.includes('*/*')) return true
return possible.includes(normalized)
}
function hasBody(req: express.Request) {
export function hasBody(req: express.Request) {
const contentLength = req.headers['content-length']
const transferEncoding = req.headers['transfer-encoding']
return (contentLength && parseInt(contentLength, 10) > 0) || transferEncoding
}
export function processBodyAsBytes(req: express.Request): Promise<Uint8Array> {
return new Promise((resolve) => {
const chunks: Buffer[] = []
req.on('data', (chunk) => chunks.push(chunk))
req.on('end', () => resolve(new Uint8Array(Buffer.concat(chunks))))
})
}

@ -28,7 +28,7 @@
{ "path": "./packages/pds/tsconfig.build.json" },
{ "path": "./packages/api/tsconfig.build.json" },
{ "path": "./packages/auth/tsconfig.build.json" },
{ "path": "./packages/aws-kms/tsconfig.build.json" },
{ "path": "./packages/aws/tsconfig.build.json" },
{ "path": "./packages/common/tsconfig.build.json" },
{ "path": "./packages/crypto/tsconfig.build.json" },
{ "path": "./packages/dev-env" },

875
yarn.lock

File diff suppressed because it is too large Load Diff