Get blobs

* refactor repo routes

* basic blob route

* getBlob route

* tidy

* move getBlob to sync

* allow mimetype on getBlob

* creator on blob table

* migration

* migration

* handle deletes & check db on getBlob

* fix content type bug

* back to octet-stream

* Update packages/pds/src/api/com/atproto/sync/getBlob.ts

Co-authored-by: devin ivy <devinivy@gmail.com>

* fix up migrations

* pr feedback

* fixing up merge & migration

* patched up migration

---------

Co-authored-by: devin ivy <devinivy@gmail.com>
This commit is contained in:
Daniel Holmgren 2023-03-13 19:11:47 -05:00 committed by GitHub
parent 5b20053e36
commit afa28c709f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 785 additions and 254 deletions

@ -0,0 +1,21 @@
{
"lexicon": 1,
"id": "com.atproto.sync.getBlob",
"defs": {
"main": {
"type": "query",
"description": "Get a blob associated with a given repo.",
"parameters": {
"type": "params",
"required": ["did", "cid"],
"properties": {
"did": {"type": "string", "description": "The DID of the repo."},
"cid": {"type": "string", "description": "The CID of the blob to fetch"}
}
},
"output": {
"encoding": "*/*"
}
}
}
}

@ -49,6 +49,7 @@ import * as ComAtprotoSessionCreate from './types/com/atproto/session/create'
import * as ComAtprotoSessionDelete from './types/com/atproto/session/delete'
import * as ComAtprotoSessionGet from './types/com/atproto/session/get'
import * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
import * as ComAtprotoSyncGetBlob from './types/com/atproto/sync/getBlob'
import * as ComAtprotoSyncGetBlocks from './types/com/atproto/sync/getBlocks'
import * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
@ -139,6 +140,7 @@ export * as ComAtprotoSessionCreate from './types/com/atproto/session/create'
export * as ComAtprotoSessionDelete from './types/com/atproto/session/delete'
export * as ComAtprotoSessionGet from './types/com/atproto/session/get'
export * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
export * as ComAtprotoSyncGetBlob from './types/com/atproto/sync/getBlob'
export * as ComAtprotoSyncGetBlocks from './types/com/atproto/sync/getBlocks'
export * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
export * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
@ -703,6 +705,17 @@ export class SyncNS {
this._service = service
}
/**
 * Calls the `com.atproto.sync.getBlob` XRPC method through this namespace's
 * service client. Any rejection is passed through
 * `ComAtprotoSyncGetBlob.toKnownErr` before being re-thrown.
 */
getBlob(
  params?: ComAtprotoSyncGetBlob.QueryParams,
  opts?: ComAtprotoSyncGetBlob.CallOptions,
): Promise<ComAtprotoSyncGetBlob.Response> {
  const pending = this._service.xrpc.call(
    'com.atproto.sync.getBlob',
    params,
    undefined,
    opts,
  )
  return pending.catch((err) => {
    throw ComAtprotoSyncGetBlob.toKnownErr(err)
  })
}
getBlocks(
params?: ComAtprotoSyncGetBlocks.QueryParams,
opts?: ComAtprotoSyncGetBlocks.CallOptions,

@ -2025,6 +2025,33 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncGetBlob: {
lexicon: 1,
id: 'com.atproto.sync.getBlob',
defs: {
main: {
type: 'query',
description: 'Get a blob associated with a given repo.',
parameters: {
type: 'params',
required: ['did', 'cid'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
cid: {
type: 'string',
description: 'The CID of the blob to fetch',
},
},
},
output: {
encoding: '*/*',
},
},
},
},
ComAtprotoSyncGetBlocks: {
lexicon: 1,
id: 'com.atproto.sync.getBlocks',
@ -4171,6 +4198,7 @@ export const ids = {
ComAtprotoSessionDelete: 'com.atproto.session.delete',
ComAtprotoSessionGet: 'com.atproto.session.get',
ComAtprotoSessionRefresh: 'com.atproto.session.refresh',
ComAtprotoSyncGetBlob: 'com.atproto.sync.getBlob',
ComAtprotoSyncGetBlocks: 'com.atproto.sync.getBlocks',
ComAtprotoSyncGetCheckout: 'com.atproto.sync.getCheckout',
ComAtprotoSyncGetCommitPath: 'com.atproto.sync.getCommitPath',

@ -0,0 +1,32 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
import { ValidationResult } from '@atproto/lexicon'
import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'
/** Query parameters accepted by `com.atproto.sync.getBlob`. */
export interface QueryParams {
/** The DID of the repo. */
did: string
/** The CID of the blob to fetch */
cid: string
}
/** This method takes no request body. */
export type InputSchema = undefined
/** Per-call options; currently only extra request headers. */
export interface CallOptions {
headers?: Headers
}
/** Shape of a resolved `getBlob` call. */
export interface Response {
success: boolean
headers: Headers
// NOTE(review): presumably the raw blob bytes — confirm against the xrpc client.
data: Uint8Array
}
/**
 * Error mapper used by the generated client wrapper for this method.
 *
 * The `XRPCError` branch is empty, so no error is remapped: whatever is
 * passed in is returned unchanged.
 */
export function toKnownErr(e: any) {
if (e instanceof XRPCError) {
}
return e
}

@ -1,6 +1,6 @@
import * as aws from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { BlobStore } from '@atproto/repo'
import { BlobStore, BlobNotFoundError } from '@atproto/repo'
import { randomStr } from '@atproto/crypto'
import { CID } from 'multiformats/cid'
import stream from 'stream'
@ -56,10 +56,16 @@ export class S3BlobStore implements BlobStore {
}
async makePermanent(key: string, cid: CID): Promise<void> {
await this.move({
from: this.getTmpPath(key),
to: this.getStoredPath(cid),
})
const alreadyHas = await this.hasStored(cid)
if (!alreadyHas) {
await this.move({
from: this.getTmpPath(key),
to: this.getStoredPath(cid),
})
} else {
// already saved, so we no-op & just delete the temp
await this.deleteKey(this.getTmpPath(key))
}
}
async putPermanent(
@ -98,7 +104,7 @@ export class S3BlobStore implements BlobStore {
if (res.Body) {
return res.Body
} else {
throw new Error(`Could not get blob: ${cid.toString()}`)
throw new BlobNotFoundError()
}
}
@ -113,9 +119,25 @@ export class S3BlobStore implements BlobStore {
}
async delete(cid: CID): Promise<void> {
await this.deleteKey(this.getStoredPath(cid))
}
/**
 * Reports whether an object exists at the stored path for `cid`.
 *
 * Issues a `headObject` request and returns `true` only when the response
 * status code is 200. Any error raised while building or sending the request
 * is treated as "not stored" and yields `false`.
 */
async hasStored(cid: CID): Promise<boolean> {
  try {
    const target = {
      Bucket: this.bucket,
      Key: this.getStoredPath(cid),
    }
    const { $metadata } = await this.client.headObject(target)
    return $metadata.httpStatusCode === 200
  } catch {
    return false
  }
}
private async deleteKey(key: string) {
await this.client.deleteObject({
Bucket: this.bucket,
Key: this.getStoredPath(cid),
Key: key,
})
}

@ -4,10 +4,11 @@ import AppContext from '../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.blob.upload({
auth: ctx.accessVerifierCheckTakedown,
handler: async ({ input }) => {
handler: async ({ auth, input }) => {
const requester = auth.credentials.did
const cid = await ctx.services
.repo(ctx.db)
.blobs.addUntetheredBlob(input.encoding, input.body)
.blobs.addUntetheredBlob(requester, input.encoding, input.body)
return {
encoding: 'application/json',

@ -1,231 +0,0 @@
import { InvalidRequestError, AuthRequiredError } from '@atproto/xrpc-server'
import { AtUri } from '@atproto/uri'
import { WriteOpAction } from '@atproto/repo'
import * as didResolver from '@atproto/did-resolver'
import * as repo from '../../../repo'
import { Server } from '../../../lexicon'
import {
InvalidRecordError,
PreparedCreate,
PreparedWrite,
} from '../../../repo'
import AppContext from '../../../context'
export default function (server: Server, ctx: AppContext) {
server.com.atproto.repo.describe(async ({ params }) => {
const { user } = params
const userObj = await ctx.services.account(ctx.db).getUser(user)
if (userObj === null) {
throw new InvalidRequestError(`Could not find user: ${user}`)
}
let didDoc
try {
didDoc = await ctx.didResolver.ensureResolveDid(userObj.did)
} catch (err) {
throw new InvalidRequestError(`Could not resolve DID: ${err}`)
}
const handle = didResolver.getHandle(didDoc)
const handleIsCorrect = handle === userObj.handle
const collections = await ctx.services
.record(ctx.db)
.listCollectionsForDid(userObj.did)
return {
encoding: 'application/json',
body: {
handle: userObj.handle,
did: userObj.did,
didDoc,
collections,
handleIsCorrect,
},
}
})
server.com.atproto.repo.listRecords(async ({ params }) => {
const { user, collection, limit, before, after, reverse } = params
const did = await ctx.services.account(ctx.db).getDidForActor(user)
if (!did) {
throw new InvalidRequestError(`Could not find user: ${user}`)
}
const records = await ctx.services
.record(ctx.db)
.listRecordsForCollection(
did,
collection,
limit || 50,
reverse || false,
before,
after,
)
const lastRecord = records.at(-1)
const lastUri = lastRecord && new AtUri(lastRecord?.uri)
return {
encoding: 'application/json',
body: {
records,
// Paginate with `before` by default, paginate with `after` when using `reverse`.
cursor: lastUri?.rkey,
},
}
})
server.com.atproto.repo.getRecord(async ({ params }) => {
const { user, collection, rkey, cid } = params
const did = await ctx.services.account(ctx.db).getDidForActor(user)
if (!did) {
throw new InvalidRequestError(`Could not find user: ${user}`)
}
const uri = new AtUri(`${did}/${collection}/${rkey}`)
const record = await ctx.services.record(ctx.db).getRecord(uri, cid || null)
if (!record) {
throw new InvalidRequestError(`Could not locate record: ${uri}`)
}
return {
encoding: 'application/json',
body: record,
}
})
server.com.atproto.repo.batchWrite({
auth: ctx.accessVerifierCheckTakedown,
handler: async ({ input, auth }) => {
const tx = input.body
const { did, validate } = tx
const requester = auth.credentials.did
if (did !== requester) {
throw new AuthRequiredError()
}
if (validate === false) {
throw new InvalidRequestError(
'Unvalidated writes are not yet supported.',
)
}
const hasUpdate = tx.writes.some(
(write) => write.action === WriteOpAction.Update,
)
if (hasUpdate) {
throw new InvalidRequestError(`Updates are not yet supported.`)
}
let writes: PreparedWrite[]
try {
writes = await Promise.all(
tx.writes.map((write) => {
if (write.action === WriteOpAction.Create) {
return repo.prepareCreate({
did,
collection: write.collection,
record: write.value,
rkey: write.rkey,
validate,
})
} else if (write.action === WriteOpAction.Delete) {
return repo.prepareDelete({
did,
collection: write.collection,
rkey: write.rkey,
})
} else {
throw new InvalidRequestError(
`Action not supported: ${write.action}`,
)
}
}),
)
} catch (err) {
if (err instanceof InvalidRecordError) {
throw new InvalidRequestError(err.message)
}
throw err
}
await ctx.db.transaction(async (dbTxn) => {
const now = new Date().toISOString()
const repoTxn = ctx.services.repo(dbTxn)
await repoTxn.processWrites(did, writes, now)
})
},
})
server.com.atproto.repo.createRecord({
auth: ctx.accessVerifierCheckTakedown,
handler: async ({ input, auth }) => {
const { did, collection, record } = input.body
const validate =
typeof input.body.validate === 'boolean' ? input.body.validate : true
const requester = auth.credentials.did
if (did !== requester) {
throw new AuthRequiredError()
}
if (validate === false) {
throw new InvalidRequestError(
'Unvalidated writes are not yet supported.',
)
}
// determine key type. if undefined, repo assigns a TID
const rkey = repo.determineRkey(collection)
const now = new Date().toISOString()
let write: PreparedCreate
try {
write = await repo.prepareCreate({
did,
collection,
record,
rkey,
validate,
})
} catch (err) {
if (err instanceof InvalidRecordError) {
throw new InvalidRequestError(err.message)
}
throw err
}
await ctx.db.transaction(async (dbTxn) => {
const repoTxn = ctx.services.repo(dbTxn)
await repoTxn.processWrites(did, [write], now)
})
return {
encoding: 'application/json',
body: { uri: write.uri.toString(), cid: write.cid.toString() },
}
},
})
server.com.atproto.repo.putRecord(async () => {
throw new InvalidRequestError(`Updates are not yet supported.`)
})
server.com.atproto.repo.deleteRecord({
auth: ctx.accessVerifierCheckTakedown,
handler: async ({ input, auth }) => {
const { did, collection, rkey } = input.body
const requester = auth.credentials.did
if (did !== requester) {
throw new AuthRequiredError()
}
const now = new Date().toISOString()
const write = await repo.prepareDelete({ did, collection, rkey })
await ctx.db.transaction(async (dbTxn) => {
await ctx.services.repo(dbTxn).processWrites(did, [write], now)
})
},
})
}

@ -0,0 +1,70 @@
import { InvalidRequestError, AuthRequiredError } from '@atproto/xrpc-server'
import * as repo from '../../../../repo'
import { Server } from '../../../../lexicon'
import { InvalidRecordError, PreparedWrite } from '../../../../repo'
import AppContext from '../../../../context'
import { WriteOpAction } from '@atproto/repo'
/**
 * Registers the `com.atproto.repo.batchWrite` handler.
 *
 * The handler rejects requests whose body `did` differs from the
 * authenticated DID, requests with `validate: false`, and any batch that
 * contains an update. The remaining creates/deletes are prepared and then
 * applied via `processWrites` inside a single database transaction.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.batchWrite({
    auth: ctx.accessVerifierCheckTakedown,
    handler: async ({ input, auth }) => {
      const { did, validate, writes: requested } = input.body

      if (auth.credentials.did !== did) {
        throw new AuthRequiredError()
      }
      if (validate === false) {
        throw new InvalidRequestError(
          'Unvalidated writes are not yet supported.',
        )
      }
      const containsUpdate = requested.some(
        ({ action }) => action === WriteOpAction.Update,
      )
      if (containsUpdate) {
        throw new InvalidRequestError(`Updates are not yet supported.`)
      }

      let writes: PreparedWrite[]
      try {
        // Build one pending preparation per requested write; unsupported
        // actions throw synchronously and are handled by the catch below.
        const pending = requested.map((write) => {
          if (write.action === WriteOpAction.Create) {
            return repo.prepareCreate({
              did,
              collection: write.collection,
              record: write.value,
              rkey: write.rkey,
              validate,
            })
          }
          if (write.action === WriteOpAction.Delete) {
            return repo.prepareDelete({
              did,
              collection: write.collection,
              rkey: write.rkey,
            })
          }
          throw new InvalidRequestError(
            `Action not supported: ${write.action}`,
          )
        })
        writes = await Promise.all(pending)
      } catch (err) {
        // Record-validation failures are the caller's fault; surface them as
        // request errors and let everything else propagate untouched.
        if (err instanceof InvalidRecordError) {
          throw new InvalidRequestError(err.message)
        }
        throw err
      }

      await ctx.db.transaction(async (dbTxn) => {
        const now = new Date().toISOString()
        await ctx.services.repo(dbTxn).processWrites(did, writes, now)
      })
    },
  })
}

@ -0,0 +1,55 @@
import { InvalidRequestError, AuthRequiredError } from '@atproto/xrpc-server'
import * as repo from '../../../../repo'
import { Server } from '../../../../lexicon'
import { InvalidRecordError, PreparedCreate } from '../../../../repo'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.createRecord` handler.
 *
 * The handler requires the body `did` to equal the authenticated DID,
 * refuses unvalidated writes, prepares a single create and applies it in a
 * database transaction, then responds with the new record's `uri` and `cid`.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.createRecord({
    auth: ctx.accessVerifierCheckTakedown,
    handler: async ({ input, auth }) => {
      const body = input.body
      const { did, collection, record } = body

      // Validation is on unless the caller sent an explicit boolean.
      let validate = true
      if (typeof body.validate === 'boolean') {
        validate = body.validate
      }

      if (auth.credentials.did !== did) {
        throw new AuthRequiredError()
      }
      if (!validate) {
        throw new InvalidRequestError(
          'Unvalidated writes are not yet supported.',
        )
      }

      // determine key type. if undefined, repo assigns a TID
      const rkey = repo.determineRkey(collection)
      const now = new Date().toISOString()

      let write: PreparedCreate
      try {
        write = await repo.prepareCreate({
          did,
          collection,
          record,
          rkey,
          validate,
        })
      } catch (err) {
        // Invalid records become request errors; anything else is re-thrown.
        if (err instanceof InvalidRecordError) {
          throw new InvalidRequestError(err.message)
        }
        throw err
      }

      await ctx.db.transaction(async (dbTxn) => {
        await ctx.services.repo(dbTxn).processWrites(did, [write], now)
      })

      const uri = write.uri.toString()
      const cid = write.cid.toString()
      return {
        encoding: 'application/json',
        body: { uri, cid },
      }
    },
  })
}

@ -0,0 +1,23 @@
import { AuthRequiredError } from '@atproto/xrpc-server'
import * as repo from '../../../../repo'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.deleteRecord` handler.
 *
 * The handler requires the body `did` to equal the authenticated DID,
 * prepares a delete for `collection`/`rkey`, and applies it via
 * `processWrites` inside a database transaction.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.deleteRecord({
    auth: ctx.accessVerifierCheckTakedown,
    handler: async ({ input, auth }) => {
      const { did, collection, rkey } = input.body
      if (auth.credentials.did !== did) {
        throw new AuthRequiredError()
      }

      // Timestamp is taken before the write is prepared.
      const now = new Date().toISOString()
      const write = await repo.prepareDelete({ did, collection, rkey })

      await ctx.db.transaction(async (dbTxn) => {
        const repoTxn = ctx.services.repo(dbTxn)
        await repoTxn.processWrites(did, [write], now)
      })
    },
  })
}

@ -0,0 +1,40 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import * as didResolver from '@atproto/did-resolver'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.describe` handler.
 *
 * The handler looks up the user, resolves their DID document, lists the
 * collections recorded for the DID, and reports whether the handle found in
 * the DID document matches the stored handle.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.describe(async ({ params }) => {
    const { user } = params

    const account = await ctx.services.account(ctx.db).getUser(user)
    if (account === null) {
      throw new InvalidRequestError(`Could not find user: ${user}`)
    }

    let didDoc
    try {
      didDoc = await ctx.didResolver.ensureResolveDid(account.did)
    } catch (err) {
      throw new InvalidRequestError(`Could not resolve DID: ${err}`)
    }

    // Compare the handle from the DID document with the stored one.
    const handleIsCorrect = didResolver.getHandle(didDoc) === account.handle

    const recordService = ctx.services.record(ctx.db)
    const collections = await recordService.listCollectionsForDid(account.did)

    return {
      encoding: 'application/json',
      body: {
        handle: account.handle,
        did: account.did,
        didDoc,
        collections,
        handleIsCorrect,
      },
    }
  })
}

@ -0,0 +1,26 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AtUri } from '@atproto/uri'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.getRecord` handler.
 *
 * The handler maps `user` to a DID, builds the record URI from
 * `did/collection/rkey`, and returns the matching record (optionally pinned
 * to `cid`). Unknown users and missing records raise `InvalidRequestError`.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.getRecord(async ({ params }) => {
    const { user, collection, rkey, cid } = params

    const accounts = ctx.services.account(ctx.db)
    const did = await accounts.getDidForActor(user)
    if (!did) {
      throw new InvalidRequestError(`Could not find user: ${user}`)
    }

    const uri = new AtUri(`${did}/${collection}/${rkey}`)
    const records = ctx.services.record(ctx.db)
    // A falsy `cid` is passed on as `null`.
    const record = await records.getRecord(uri, cid || null)
    if (!record) {
      throw new InvalidRequestError(`Could not locate record: ${uri}`)
    }

    return { encoding: 'application/json', body: record }
  })
}

@ -0,0 +1,19 @@
import AppContext from '../../../../context'
import { Server } from '../../../../lexicon'
import batchWrite from './batchWrite'
import createRecord from './createRecord'
import deleteRecord from './deleteRecord'
import describe from './describe'
import getRecord from './getRecord'
import listRecords from './listRecords'
import putRecord from './putRecord'
/**
 * Registers every `com.atproto.repo.*` route module on `server`, in the same
 * order the modules are listed here.
 */
export default function (server: Server, ctx: AppContext) {
  const registrars = [
    batchWrite,
    createRecord,
    deleteRecord,
    describe,
    getRecord,
    listRecords,
    putRecord,
  ]
  for (const register of registrars) {
    register(server, ctx)
  }
}

@ -0,0 +1,38 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AtUri } from '@atproto/uri'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.listRecords` handler.
 *
 * The handler maps `user` to a DID and lists records of `collection`, using a
 * limit of 50 and forward order when those params are falsy. The response
 * cursor is the rkey of the last returned record, or `undefined` when the
 * page is empty.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.repo.listRecords(async ({ params }) => {
    const { user, collection, limit, before, after, reverse } = params

    const did = await ctx.services.account(ctx.db).getDidForActor(user)
    if (!did) {
      throw new InvalidRequestError(`Could not find user: ${user}`)
    }

    const recordService = ctx.services.record(ctx.db)
    const records = await recordService.listRecordsForCollection(
      did,
      collection,
      limit || 50,
      reverse || false,
      before,
      after,
    )

    // Paginate with `before` by default, paginate with `after` when using `reverse`.
    const lastRecord = records.at(-1)
    const cursor = lastRecord ? new AtUri(lastRecord.uri).rkey : undefined

    return {
      encoding: 'application/json',
      body: { records, cursor },
    }
  })
}

@ -0,0 +1,9 @@
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
/**
 * Registers the `com.atproto.repo.putRecord` handler, which always rejects
 * with an `InvalidRequestError`.
 */
export default function (server: Server, _ctx: AppContext) {
  const rejectUpdate = async () => {
    throw new InvalidRequestError(`Updates are not yet supported.`)
  }
  server.com.atproto.repo.putRecord(rejectUpdate)
}

@ -0,0 +1,26 @@
import { CID } from 'multiformats/cid'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { InvalidRequestError } from '@atproto/xrpc-server'
/**
 * Registers the `com.atproto.sync.getBlob` handler.
 *
 * The handler looks for a `blob` row matching both the requested `cid` and
 * `did` (as `creator`). When one exists it streams the blob from the
 * blobstore, sets `Content-Length` from the row's `size`, and responds as
 * `application/octet-stream`. When none exists it raises
 * `InvalidRequestError`.
 */
export default function (server: Server, ctx: AppContext) {
  server.com.atproto.sync.getBlob(async ({ params, res }) => {
    const { did, cid: cidStr } = params

    const row = await ctx.db.db
      .selectFrom('blob')
      .selectAll()
      .where('cid', '=', cidStr)
      .where('creator', '=', did)
      .executeTakeFirst()
    if (!row) {
      throw new InvalidRequestError(`blob not found: ${cidStr}`)
    }

    // NOTE(review): errors thrown by `getStream` are not translated here —
    // confirm how a missing stored object surfaces to the client.
    const blobStream = await ctx.blobstore.getStream(CID.parse(cidStr))
    res.setHeader('Content-Length', row.size)

    // @TODO better codegen for */* mimetype
    return {
      encoding: 'application/octet-stream' as any,
      body: blobStream,
    }
  })
}

@ -1,5 +1,6 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import getBlob from './getBlob'
import getBlocks from './getBlocks'
import getCheckout from './getCheckout'
import getCommitPath from './getCommitPath'
@ -9,6 +10,7 @@ import getRepo from './getRepo'
import subscribe from './subscribeAllRepos'
export default function (server: Server, ctx: AppContext) {
getBlob(server, ctx)
getBlocks(server, ctx)
getCheckout(server, ctx)
getCommitPath(server, ctx)

@ -0,0 +1,96 @@
import { Kysely } from 'kysely'
/**
 * Rebuilds the `blob` table so that each row is owned by a `creator`.
 *
 * Steps, in order:
 *  1. Create `blob_new` with a `creator` column and a composite primary key
 *     on (`creator`, `cid`).
 *  2. Copy rows from `blob` joined to `repo_blob` on `cid`, taking
 *     `repo_blob.did` as the creator. Because this is an inner join, `blob`
 *     rows with no matching `repo_blob` row are not carried over. Rows that
 *     would collide on the new primary key are skipped (`doNothing`).
 *  3. Drop the old `blob` table and rename `blob_new` to `blob`.
 */
export async function up(db: Kysely<any>): Promise<void> {
// 1. New table shape: same columns as before plus `creator`.
await db.schema
.createTable('blob_new')
.addColumn('creator', 'varchar', (col) => col.notNull())
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('mimeType', 'varchar', (col) => col.notNull())
.addColumn('size', 'integer', (col) => col.notNull())
.addColumn('tempKey', 'varchar')
.addColumn('width', 'integer')
.addColumn('height', 'integer')
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint('blob_creator_pkey', ['creator', 'cid'])
.execute()
// 2. Populate from the old table; the column list below lines up
// positionally with the select list (`repo_blob.did` -> `creator`).
await db
.insertInto('blob_new')
.columns([
'creator',
'cid',
'mimeType',
'size',
'tempKey',
'width',
'height',
'createdAt',
])
.expression((exp) =>
exp
.selectFrom('blob')
.innerJoin('repo_blob', 'repo_blob.cid', 'blob.cid')
.select([
'repo_blob.did',
'blob.cid',
'blob.mimeType',
'blob.size',
'blob.tempKey',
'blob.width',
'blob.height',
'blob.createdAt',
])
// kinda silly, but we need a WHERE clause so that the ON CONFLICT parses correctly
.where('repo_blob.did', 'is not', null),
)
.onConflict((oc) => oc.doNothing())
.execute()
// 3. Swap the new table into place.
await db.schema.dropTable('blob').execute()
await db.schema.alterTable('blob_new').renameTo('blob').execute()
}
/**
 * Reverts the `blob` table to its creator-less shape, keyed by `cid` alone.
 *
 * Steps, in order:
 *  1. Create `blob_new` without `creator`, with `cid` as the primary key.
 *  2. Copy every non-creator column from the current `blob` table. Rows that
 *     share a `cid` collapse into one because conflicting inserts are skipped
 *     (`doNothing`).
 *     NOTE(review): which of the duplicate rows is kept is not determined by
 *     this code — confirm it does not matter.
 *  3. Drop the current `blob` table and rename `blob_new` to `blob`.
 */
export async function down(db: Kysely<any>): Promise<void> {
// 1. Old table shape.
await db.schema
.createTable('blob_new')
.addColumn('cid', 'varchar', (col) => col.primaryKey())
.addColumn('mimeType', 'varchar', (col) => col.notNull())
.addColumn('size', 'integer', (col) => col.notNull())
.addColumn('tempKey', 'varchar')
.addColumn('width', 'integer')
.addColumn('height', 'integer')
.addColumn('createdAt', 'varchar', (col) => col.notNull())
.execute()
// 2. Copy data back; the column list lines up positionally with the select list.
await db
.insertInto('blob_new')
.columns([
'cid',
'mimeType',
'size',
'tempKey',
'width',
'height',
'createdAt',
])
.expression((exp) =>
exp
.selectFrom('blob')
.select([
'blob.cid',
'blob.mimeType',
'blob.size',
'blob.tempKey',
'blob.width',
'blob.height',
'blob.createdAt',
])
// kinda silly, but we need a WHERE clause so that the ON CONFLICT parses correctly
.where('cid', 'is not', null),
)
.onConflict((oc) => oc.doNothing())
.execute()
// 3. Swap the rebuilt table into place.
await db.schema.dropTable('blob').execute()
await db.schema.alterTable('blob_new').renameTo('blob').execute()
}

@ -26,3 +26,4 @@ export * as _20230304T193548198Z from './20230304T193548198Z-pagination-indices'
export * as _20230308T234640077Z from './20230308T234640077Z-record-indexes'
export * as _20230309T012947663Z from './20230309T012947663Z-app-migration'
export * as _20230310T205728933Z from './20230310T205728933Z-subscription-init'
export * as _20230313T232322844Z from './20230313T232322844Z-blob-creator'

@ -1,4 +1,5 @@
export interface Blob {
creator: string
cid: string
mimeType: string
size: number

@ -42,6 +42,7 @@ import * as ComAtprotoSessionCreate from './types/com/atproto/session/create'
import * as ComAtprotoSessionDelete from './types/com/atproto/session/delete'
import * as ComAtprotoSessionGet from './types/com/atproto/session/get'
import * as ComAtprotoSessionRefresh from './types/com/atproto/session/refresh'
import * as ComAtprotoSyncGetBlob from './types/com/atproto/sync/getBlob'
import * as ComAtprotoSyncGetBlocks from './types/com/atproto/sync/getBlocks'
import * as ComAtprotoSyncGetCheckout from './types/com/atproto/sync/getCheckout'
import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommitPath'
@ -473,6 +474,13 @@ export class SyncNS {
this._server = server
}
/**
 * Passes `cfg` to `this._server.xrpc.method` under the NSID
 * `com.atproto.sync.getBlob` and returns that call's result.
 */
getBlob<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncGetBlob.Handler<ExtractAuth<AV>>>,
) {
// The trailing `@ts-ignore` on the next line applies to the `method(...)`
// call that follows it; keep the two lines adjacent.
const nsid = 'com.atproto.sync.getBlob' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
getBlocks<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncGetBlocks.Handler<ExtractAuth<AV>>>,
) {

@ -2025,6 +2025,33 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncGetBlob: {
lexicon: 1,
id: 'com.atproto.sync.getBlob',
defs: {
main: {
type: 'query',
description: 'Get a blob associated with a given repo.',
parameters: {
type: 'params',
required: ['did', 'cid'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
cid: {
type: 'string',
description: 'The CID of the blob to fetch',
},
},
},
output: {
encoding: '*/*',
},
},
},
},
ComAtprotoSyncGetBlocks: {
lexicon: 1,
id: 'com.atproto.sync.getBlocks',
@ -4171,6 +4198,7 @@ export const ids = {
ComAtprotoSessionDelete: 'com.atproto.session.delete',
ComAtprotoSessionGet: 'com.atproto.session.get',
ComAtprotoSessionRefresh: 'com.atproto.session.refresh',
ComAtprotoSyncGetBlob: 'com.atproto.sync.getBlob',
ComAtprotoSyncGetBlocks: 'com.atproto.sync.getBlocks',
ComAtprotoSyncGetCheckout: 'com.atproto.sync.getCheckout',
ComAtprotoSyncGetCommitPath: 'com.atproto.sync.getCommitPath',

@ -0,0 +1,38 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import stream from 'stream'
import { ValidationResult } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { HandlerAuth } from '@atproto/xrpc-server'
/** Query parameters accepted by `com.atproto.sync.getBlob`. */
export interface QueryParams {
/** The DID of the repo. */
did: string
/** The CID of the blob to fetch */
cid: string
}
/** This method takes no request body. */
export type InputSchema = undefined
/** Handlers receive `undefined` as `input` for this method. */
export type HandlerInput = undefined
/** Successful handler result: a body given either as bytes or as a readable stream. */
export interface HandlerSuccess {
encoding: '*/*'
body: Uint8Array | stream.Readable
}
/** Error result a handler may return instead of a body. */
export interface HandlerError {
status: number
message?: string
}
/** Everything a handler may return. */
export type HandlerOutput = HandlerError | HandlerSuccess
/**
 * Signature of a `com.atproto.sync.getBlob` handler. It receives the auth
 * result, parsed query params, and the express request/response objects, and
 * may answer synchronously or with a promise.
 */
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput

@ -16,6 +16,7 @@ export class RepoBlobs {
constructor(public db: Database, public blobstore: BlobStore) {}
async addUntetheredBlob(
creator: string,
userSuggestedMime: string,
blobStream: stream.Readable,
): Promise<CID> {
@ -32,6 +33,7 @@ export class RepoBlobs {
await this.db.db
.insertInto('blob')
.values({
creator,
cid: cid.toString(),
mimeType: sniffedMime || userSuggestedMime,
size,
@ -42,7 +44,7 @@ export class RepoBlobs {
})
.onConflict((oc) =>
oc
.column('cid')
.columns(['creator', 'cid'])
.doUpdateSet({ tempKey })
.where('blob.tempKey', 'is not', null),
)
@ -58,7 +60,7 @@ export class RepoBlobs {
write.action === WriteOpAction.Update
) {
for (const blob of write.blobs) {
blobPromises.push(this.verifyBlobAndMakePermanent(blob))
blobPromises.push(this.verifyBlobAndMakePermanent(did, blob))
blobPromises.push(this.associateBlob(blob, write.uri, commit, did))
}
}
@ -66,11 +68,15 @@ export class RepoBlobs {
await Promise.all(blobPromises)
}
async verifyBlobAndMakePermanent(blob: BlobRef): Promise<void> {
async verifyBlobAndMakePermanent(
creator: string,
blob: BlobRef,
): Promise<void> {
const { ref } = this.db.db.dynamic
const found = await this.db.db
.selectFrom('blob')
.selectAll()
.where('creator', '=', creator)
.where('cid', '=', blob.cid.toString())
.whereNotExists(
// Check if blob has been taken down
@ -118,11 +124,14 @@ export class RepoBlobs {
async deleteForUser(did: string): Promise<void> {
this.db.assertTransaction()
const deleted = await this.db.db
.deleteFrom('repo_blob')
.where('did', '=', did)
.returningAll()
.execute()
const [deleted] = await Promise.all([
this.db.db
.deleteFrom('blob')
.where('creator', '=', did)
.returningAll()
.execute(),
this.db.db.deleteFrom('repo_blob').where('did', '=', did).execute(),
])
const deletedCids = deleted.map((d) => d.cid)
let duplicateCids: string[] = []
if (deletedCids.length > 0) {
@ -135,7 +144,6 @@ export class RepoBlobs {
}
const toDelete = deletedCids.filter((cid) => !duplicateCids.includes(cid))
if (toDelete.length > 0) {
await this.db.db.deleteFrom('blob').where('cid', 'in', toDelete).execute()
await Promise.all(
toDelete.map((cid) => this.blobstore.delete(CID.parse(cid))),
)

@ -208,7 +208,7 @@ describe('account deletion', () => {
initialDbContents.repoBlobs.filter((row) => row.did !== carol.did),
)
expect(updatedDbContents.blobs).toEqual(
initialDbContents.blobs.filter((row) => row.cid !== second.toString()),
initialDbContents.blobs.filter((row) => row.creator !== carol.did),
)
})

@ -137,6 +137,16 @@ describe('file uploads', () => {
expect(uint8arrays.equals(smallFile, storedBytes)).toBeTruthy()
})
it('can fetch the file after being referenced', async () => {
const fetchedFile = await aliceAgent.api.com.atproto.sync.getBlob({
did: alice.did,
cid: smallCid.toString(),
})
expect(
uint8arrays.equals(smallFile, new Uint8Array(fetchedFile.data)),
).toBeTruthy()
})
it('serves the referenced blob', async () => {
const profile = await aliceAgent.api.app.bsky.actor.getProfile({
actor: 'alice.test',

@ -0,0 +1,141 @@
import { Database } from '../../src'
import { randomStr } from '@atproto/crypto'
import { cidForCbor, TID } from '@atproto/common'
import { Kysely } from 'kysely'
import { AtUri } from '@atproto/uri'
describe('blob creator migration', () => {
let db: Database
let rawDb: Kysely<any>
// Open a Postgres database (dedicated schema) when DB_POSTGRES_URL is set,
// otherwise an in-memory one, and migrate to the migration just before the
// one under test.
beforeAll(async () => {
  const pgUrl = process.env.DB_POSTGRES_URL
  db = pgUrl
    ? Database.postgres({ url: pgUrl, schema: 'migration_blob_creator' })
    : Database.memory()
  await db.migrateToOrThrow('_20230310T205728933Z')
  rawDb = db.db
})
// Close the database once the suite is done.
afterAll(async () => {
  await db.close()
})
// DIDs used as blob owners in the fixtures below.
const dids = ['did:example:alice', 'did:example:bob', 'did:example:carol']
// Produces a CID string derived from a small random CBOR object.
const getCidStr = async () =>
  (await cidForCbor({ test: randomStr(20, 'base32') })).toString()
// Builds a `repo_blob` row linking `cid` to `did`, with a new record URI and
// commit CID and no takedown.
const repoBlob = async (did: string, cid: string) => {
  const recordUri = AtUri.make(
    did,
    'com.atproto.collection',
    TID.nextStr(),
  ).toString()
  const commit = await getCidStr()
  return { cid, recordUri, commit, did, takedownId: null }
}
let blobsSnap
let repoBlobsSnap
// Seeds the pre-migration schema with 1000 blobs and their `repo_blob`
// references, then snapshots both tables for the later assertions.
// (Test title fixed: it previously read "creates a some blobs".)
it('creates some blobs', async () => {
  const blobs: any[] = []
  const repoBlobs: any[] = []
  for (let i = 0; i < 1000; i++) {
    const cid = await getCidStr()
    blobs.push({
      cid,
      mimeType: 'image/jpeg',
      size: Math.floor(Math.random() * 1000000),
      tempKey: null,
      width: Math.floor(Math.random() * 1000),
      height: Math.floor(Math.random() * 1000),
      createdAt: new Date().toISOString(),
    })
    // Even indices reference the blob from dids[0], odd ones from dids[1].
    repoBlobs.push(await repoBlob(dids[i % 2], cid))
    // Every fifth blob is also referenced by dids[2], so some cids have two
    // referencing DIDs.
    if (i % 5 === 0) {
      repoBlobs.push(await repoBlob(dids[2], cid))
    }
  }
  await rawDb.insertInto('blob').values(blobs).execute()
  await rawDb.insertInto('repo_blob').values(repoBlobs).execute()
  // Snapshots are ordered so they can be compared with toEqual later.
  blobsSnap = await rawDb
    .selectFrom('blob')
    .selectAll()
    .orderBy('cid')
    .execute()
  repoBlobsSnap = await rawDb
    .selectFrom('repo_blob')
    .selectAll()
    .orderBy('cid')
    .orderBy('did')
    .execute()
})
// Applies the blob-creator migration and expects it to report no error.
it('migrates up', async () => {
  const { error } = await db.migrator.migrateTo('_20230313T232322844Z')
  expect(error).toBeUndefined()
})
// After migrating up there must be one `blob` row per `repo_blob` row, each
// matching its pre-migration blob (ignoring `creator`) and owned by a DID
// that referenced that cid before the migration.
it('correctly migrated data', async () => {
  const migratedBlobs = await rawDb
    .selectFrom('blob')
    .selectAll()
    .orderBy('cid')
    .orderBy('creator')
    .execute()
  const currentRepoBlobs = await rawDb
    .selectFrom('repo_blob')
    .selectAll()
    .orderBy('cid')
    .orderBy('did')
    .execute()

  expect(migratedBlobs.length).toBe(currentRepoBlobs.length)
  expect(currentRepoBlobs.length).toBe(repoBlobsSnap.length)

  for (const { creator, ...rest } of migratedBlobs) {
    // Non-creator columns are unchanged from the snapshot.
    const snapped = blobsSnap.find((b) => b.cid === rest.cid)
    expect(snapped).toEqual(rest)
    // The creator corresponds to an original repo_blob reference.
    const found = repoBlobsSnap.find(
      (b) => b.cid === rest.cid && b.did === creator,
    )
    expect(found).toBeDefined()
  }
})
// Rolling back must restore both tables to exactly their seeded contents.
it('migrates down', async () => {
  const { error } = await db.migrator.migrateTo('_20230310T205728933Z')
  expect(error).toBeUndefined()

  const blobsAfter = await rawDb
    .selectFrom('blob')
    .selectAll()
    .orderBy('cid')
    .execute()
  const repoBlobsAfter = await rawDb
    .selectFrom('repo_blob')
    .selectAll()
    .orderBy('cid')
    .orderBy('did')
    .execute()

  expect(blobsAfter).toEqual(blobsSnap)
  expect(repoBlobsAfter).toEqual(repoBlobsSnap)
})
})

@ -27,7 +27,7 @@ describe('repo sync data migration', () => {
})
it('migrates down to pt2', async () => {
await db.migrateToOrThrow('_20230127T224743452Z')
await db.migrateToOrThrow('_20230201T200606704Z')
})
const getSnapshot = async () => {

@ -568,8 +568,8 @@ describe('moderation', () => {
it('only allows blob to have one current action.', async () => {
const img = sc.posts[sc.dids.carol][0].images[0]
const postA = await sc.post(sc.dids.alice, 'image A', undefined, [img])
const postB = await sc.post(sc.dids.alice, 'image B', undefined, [img])
const postA = await sc.post(sc.dids.carol, 'image A', undefined, [img])
const postB = await sc.post(sc.dids.carol, 'image B', undefined, [img])
const { data: acknowledge } =
await agent.api.com.atproto.admin.takeModerationAction(
{

@ -35,6 +35,7 @@ import {
validateOutput,
} from './util'
import log from './logger'
import { forwardStreamErrors } from '@atproto/common'
export function createServer(lexicons?: unknown[], options?: Options) {
return new Server(lexicons, options)
@ -222,9 +223,14 @@ export class Server {
output?.encoding === 'json'
) {
res.status(200).json(output.body)
} else if (output) {
} else if (output?.body instanceof Readable) {
res.header('Content-Type', output.encoding)
res.status(200)
forwardStreamErrors(output.body, res)
output.body.pipe(res)
} else if (output) {
res
.header('Content-Type', output.encoding)
.status(200)
.send(
output.body instanceof Uint8Array