Cleanup write prep ()

* clean up record validation

* simplify & reorg write prepares
This commit is contained in:
Daniel Holmgren 2022-12-15 12:03:12 -06:00 committed by GitHub
parent 53d01564ed
commit b52f53e9fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 181 additions and 182 deletions
packages/pds/src

@ -5,7 +5,6 @@ import * as crypto from '@atproto/crypto'
import * as handleLib from '@atproto/handle'
import * as locals from '../../../../locals'
import * as lex from '../../../../lexicon/lexicons'
import { TID } from '@atproto/common'
import * as repo from '../../../../repo'
import ServerAuth from '../../../../auth'
import { UserAlreadyExistsError } from '../../../../services/actor'
@ -99,18 +98,16 @@ export default function (server: Server) {
const userAuth = locals.getAuthstore(res, requester)
const sceneAuth = locals.getAuthstore(res, did)
const sceneWrites = await repo.prepareCreates(did, [
{
action: 'create',
const sceneWrites = await Promise.all([
repo.prepareCreate({
did,
collection: lex.ids.AppBskySystemDeclaration,
rkey: 'self',
value: declaration,
},
{
action: 'create',
record: declaration,
}),
repo.prepareCreate({
did,
collection: lex.ids.AppBskyGraphAssertion,
rkey: TID.nextStr(),
value: {
record: {
assertion: APP_BSKY_GRAPH.AssertCreator,
subject: {
did: requester,
@ -118,12 +115,11 @@ export default function (server: Server) {
},
createdAt: now,
},
},
{
action: 'create',
}),
repo.prepareCreate({
did,
collection: lex.ids.AppBskyGraphAssertion,
rkey: TID.nextStr(),
value: {
record: {
assertion: APP_BSKY_GRAPH.AssertMember,
subject: {
did: requester,
@ -131,16 +127,15 @@ export default function (server: Server) {
},
createdAt: now,
},
},
}),
])
const [sceneDeclaration, creatorAssert, memberAssert] = sceneWrites
const userWrites = await repo.prepareCreates(requester, [
{
action: 'create',
const userWrites = await Promise.all([
repo.prepareCreate({
did: requester,
collection: lex.ids.AppBskyGraphConfirmation,
rkey: TID.nextStr(),
value: {
record: {
originator: {
did: requester,
declarationCid: sceneDeclaration.cid.toString(),
@ -151,12 +146,11 @@ export default function (server: Server) {
},
createdAt: now,
},
},
{
action: 'create',
}),
repo.prepareCreate({
did: requester,
collection: lex.ids.AppBskyGraphConfirmation,
rkey: TID.nextStr(),
value: {
record: {
originator: {
did: requester,
declarationCid: sceneDeclaration.cid.toString(),
@ -167,7 +161,7 @@ export default function (server: Server) {
},
createdAt: now,
},
},
}),
])
await Promise.all([

@ -68,17 +68,22 @@ export default function (server: Server) {
)
}
const writes = await repo.prepareWrites(did, {
action: current ? 'update' : 'create',
collection: profileNsid,
rkey: 'self',
value: updated,
})
const write = current
? await repo.prepareUpdate({
did,
collection: profileNsid,
rkey: 'self',
record: updated,
})
: await repo.prepareCreate({
did,
collection: profileNsid,
record: updated,
})
const commit = await repoTxn.writeToRepo(did, authStore, writes, now)
await repoTxn.blobs.processWriteBlobs(did, commit, writes)
const commit = await repoTxn.writeToRepo(did, authStore, [write], now)
await repoTxn.blobs.processWriteBlobs(did, commit, [write])
const write = writes[0]
let profileCid: CID
if (write.action === 'update') {
profileCid = write.cid
@ -106,9 +111,9 @@ export default function (server: Server) {
profileCid = write.cid
await recordTxn.indexRecord(uri, profileCid, updated, now)
} else {
// should never hit this
const exhaustiveCheck: never = write
throw new Error(
`Unsupported action on update profile: ${write.action}`,
`Unsupported action on update profile: ${exhaustiveCheck}`,
)
}

@ -3,8 +3,6 @@ import * as lexicons from '../../../../lexicon/lexicons'
import { Server } from '../../../../lexicon'
import * as locals from '../../../../locals'
import * as repo from '../../../../repo'
import { TID } from '@atproto/common'
import { DeleteOp } from '@atproto/repo'
import ServerAuth from '../../../../auth'
export default function (server: Server) {
@ -40,25 +38,24 @@ export default function (server: Server) {
return existingVotes[0].uri
}
const writes = await repo.prepareWrites(requester, [
...existingVotes.map((vote): DeleteOp => {
const writes: repo.PreparedWrite[] = await Promise.all(
existingVotes.map((vote) => {
const uri = new AtUri(vote.uri)
return {
action: 'delete',
return repo.prepareDelete({
did: requester,
collection: uri.collection,
rkey: uri.rkey,
}
})
}),
])
)
let create: repo.PreparedCreate | undefined
if (direction !== 'none') {
create = await repo.prepareCreate(requester, {
action: 'create',
create = await repo.prepareCreate({
did: requester,
collection: lexicons.ids.AppBskyFeedVote,
rkey: TID.nextStr(),
value: {
record: {
direction,
subject,
createdAt: now,

@ -147,11 +147,10 @@ export default function (server: Server) {
.execute()
}
const write = await repo.prepareCreate(did, {
action: 'create',
const write = await repo.prepareCreate({
did,
collection: lex.ids.AppBskySystemDeclaration,
rkey: 'self',
value: declaration,
record: declaration,
})
// Setup repo root

@ -2,15 +2,13 @@ import { Server } from '../../../lexicon'
import { InvalidRequestError, AuthRequiredError } from '@atproto/xrpc-server'
import { AtUri } from '@atproto/uri'
import * as didResolver from '@atproto/did-resolver'
import { DeleteOp, RecordCreateOp } from '@atproto/repo'
import * as locals from '../../../locals'
import { TID } from '@atproto/common'
import * as repo from '../../../repo'
import ServerAuth from '../../../auth'
import {
InvalidRecordError,
PreparedCreate,
PreparedWrites,
PreparedWrite,
} from '../../../repo'
export default function (server: Server) {
@ -116,6 +114,11 @@ export default function (server: Server) {
if (!authorized) {
throw new AuthRequiredError()
}
if (validate === false) {
throw new InvalidRequestError(
'Unvalidated writes are not yet supported.',
)
}
const authStore = locals.getAuthstore(res, did)
const hasUpdate = tx.writes.some((write) => write.action === 'update')
@ -123,25 +126,30 @@ export default function (server: Server) {
throw new InvalidRequestError(`Updates are not yet supported.`)
}
let writes: PreparedWrites
let writes: PreparedWrite[]
try {
writes = await repo.prepareWrites(
did,
writes = await Promise.all(
tx.writes.map((write) => {
if (write.action === 'create') {
return {
...write,
rkey: write.rkey || TID.nextStr(),
} as RecordCreateOp
return repo.prepareCreate({
did,
collection: write.collection,
record: write.value,
rkey: write.rkey,
validate,
})
} else if (write.action === 'delete') {
return write as DeleteOp
return repo.prepareDelete({
did,
collection: write.collection,
rkey: write.rkey,
})
} else {
throw new InvalidRequestError(
`Action not supported: ${write.action}`,
)
}
}),
validate,
)
} catch (err) {
if (err instanceof InvalidRecordError) {
@ -173,6 +181,11 @@ export default function (server: Server) {
throw new AuthRequiredError()
}
const authStore = locals.getAuthstore(res, did)
if (validate === false) {
throw new InvalidRequestError(
'Unvalidated writes are not yet supported.',
)
}
// determine key type. if undefined, repo assigns a TID
const rkey = repo.determineRkey(collection)
@ -180,16 +193,13 @@ export default function (server: Server) {
const now = new Date().toISOString()
let write: PreparedCreate
try {
write = await repo.prepareCreate(
write = await repo.prepareCreate({
did,
{
action: 'create',
collection,
rkey,
value: record,
},
collection,
record,
rkey,
validate,
)
})
} catch (err) {
if (err instanceof InvalidRecordError) {
throw new InvalidRequestError(err.message)
@ -229,15 +239,10 @@ export default function (server: Server) {
const authStore = locals.getAuthstore(res, did)
const now = new Date().toISOString()
const write = await repo.prepareWrites(did, {
action: 'delete',
collection,
rkey,
})
const write = await repo.prepareDelete({ did, collection, rkey })
await db.transaction(async (dbTxn) => {
const repoTxn = services.repo(dbTxn)
await repoTxn.processWrites(did, authStore, write, now)
await services.repo(dbTxn).processWrites(did, authStore, [write], now)
})
},
})

@ -1,4 +1,3 @@
import { TID } from '@atproto/common'
import { AuthStore } from '@atproto/auth'
import { BlobStore } from '@atproto/repo'
import Database from '../../db'
@ -48,11 +47,10 @@ export default class extends Consumer<SceneVotesOnPostTableUpdates> {
// this is a "threshold vote" that makes the post trend
const sceneAuth = this.getAuthStore(scene.did)
const writes = await repo.prepareWrites(scene.did, {
action: 'create',
const write = await repo.prepareCreate({
did: scene.did,
collection: lexicons.ids.AppBskyFeedTrend,
rkey: TID.nextStr(),
value: {
record: {
subject: {
uri: scene.subject,
cid: scene.subjectCid,
@ -70,8 +68,8 @@ export default class extends Consumer<SceneVotesOnPostTableUpdates> {
const repoTxn = new RepoService(db, this.messageQueue, this.blobstore)
await Promise.all([
repoTxn.writeToRepo(scene.did, sceneAuth, writes, now),
repoTxn.indexWrites(writes, now),
repoTxn.writeToRepo(scene.did, sceneAuth, [write], now),
repoTxn.indexWrites([write], now),
setTrendPosted,
])
}),

@ -1,50 +1,48 @@
import { CID } from 'multiformats/cid'
import {
DeleteOp,
RecordCreateOp,
RecordUpdateOp,
RecordWriteOp,
} from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import { cidForData, TID } from '@atproto/common'
import {
PreparedCreate,
PreparedUpdate,
PreparedDelete,
PreparedWrites,
BlobRef,
ImageConstraint,
InvalidRecordError,
PreparedWrite,
} from './types'
import * as lex from '../lexicon/lexicons'
import { LexiconDefNotFoundError } from '@atproto/lexicon'
import {
DeleteOp,
RecordCreateOp,
RecordUpdateOp,
RecordWriteOp,
} from '@atproto/repo'
// @TODO do this dynamically off of schemas
export const blobsForWrite = (
write: RecordCreateOp | RecordUpdateOp,
): BlobRef[] => {
if (write.collection === lex.ids.AppBskyActorProfile) {
export const blobsForWrite = (record: any): BlobRef[] => {
if (record.$type === lex.ids.AppBskyActorProfile) {
const doc = lex.schemaDict.AppBskyActorProfile
const refs: BlobRef[] = []
if (write.value.avatar) {
if (record.avatar) {
refs.push({
cid: CID.parse(write.value.avatar.cid),
mimeType: write.value.avatar.mimeType,
cid: CID.parse(record.avatar.cid),
mimeType: record.avatar.mimeType,
constraints: doc.defs.main.record.properties.avatar as ImageConstraint,
})
}
if (write.value.banner) {
if (record.banner) {
refs.push({
cid: CID.parse(write.value.banner.cid),
mimeType: write.value.banner.mimeType,
cid: CID.parse(record.banner.cid),
mimeType: record.banner.mimeType,
constraints: doc.defs.main.record.properties.banner as ImageConstraint,
})
}
return refs
} else if (write.collection === lex.ids.AppBskyFeedPost) {
} else if (record.$type === lex.ids.AppBskyFeedPost) {
const refs: BlobRef[] = []
const embed = write.value?.embed
const embed = record?.embed
if (embed?.$type === 'app.bsky.embed.images') {
const doc = lex.schemaDict.AppBskyEmbedImages
for (let i = 0; i < embed.images?.length || 0; i++) {
@ -56,7 +54,7 @@ export const blobsForWrite = (
})
}
} else if (
write.value?.embed?.$type === 'app.bsky.embed.external' &&
record?.embed?.$type === 'app.bsky.embed.external' &&
embed.external.thumb?.cid
) {
const doc = lex.schemaDict.AppBskyEmbedExternal
@ -119,82 +117,90 @@ export const determineRkey = (collection: string): string => {
}
}
export const prepareCreate = async (
did: string,
write: RecordCreateOp,
validate = true,
): Promise<PreparedCreate> => {
const record = setCollectionName(write.collection, write.value, validate)
export const prepareCreate = async (opts: {
did: string
collection: string
record: Record<string, unknown>
rkey?: string
validate?: boolean
}): Promise<PreparedCreate> => {
const { did, collection, validate = true } = opts
const record = setCollectionName(collection, opts.record, validate)
if (validate) {
assertValidRecord(record)
}
const op = {
...write,
value: record,
}
const rkey = opts.rkey || determineRkey(collection)
return {
action: 'create',
uri: AtUri.make(did, write.collection, write.rkey),
uri: AtUri.make(did, collection, rkey),
cid: await cidForData(record),
op,
blobs: blobsForWrite(op),
record,
blobs: blobsForWrite(record),
}
}
export const prepareCreates = async (
did: string,
writes: RecordCreateOp[],
validate = true,
): Promise<PreparedCreate[]> => {
return Promise.all(writes.map((write) => prepareCreate(did, write, validate)))
}
export const prepareUpdate = async (
did: string,
write: RecordUpdateOp,
validate = true,
): Promise<PreparedUpdate> => {
const record = setCollectionName(write.collection, write.value, validate)
export const prepareUpdate = async (opts: {
did: string
collection: string
rkey: string
record: Record<string, unknown>
validate?: boolean
}): Promise<PreparedUpdate> => {
const { did, collection, rkey, validate = true } = opts
const record = setCollectionName(collection, opts.record, validate)
if (validate) {
assertValidRecord(record)
}
return {
action: 'update',
uri: AtUri.make(did, write.collection, write.rkey),
uri: AtUri.make(did, collection, rkey),
cid: await cidForData(record),
op: {
...write,
value: record,
},
blobs: blobsForWrite(write),
record,
blobs: blobsForWrite(record),
}
}
export const prepareDelete = (did: string, write: DeleteOp): PreparedDelete => {
export const prepareDelete = (opts: {
did: string
collection: string
rkey: string
}): PreparedDelete => {
const { did, collection, rkey } = opts
return {
action: 'delete',
uri: AtUri.make(did, write.collection, write.rkey),
op: write,
uri: AtUri.make(did, collection, rkey),
}
}
export const prepareWrites = async (
did: string,
writes: RecordWriteOp | RecordWriteOp[],
validate = true,
): Promise<PreparedWrites> => {
const writesArr = Array.isArray(writes) ? writes : [writes]
return Promise.all(
writesArr.map((write) => {
if (write.action === 'create') {
return prepareCreate(did, write, validate)
} else if (write.action === 'delete') {
return prepareDelete(did, write)
} else if (write.action === 'update') {
return prepareUpdate(did, write, validate)
} else {
throw new Error(`Action not supported: ${write}`)
}
}),
)
export const createWriteToOp = (write: PreparedCreate): RecordCreateOp => ({
action: 'create',
collection: write.uri.collection,
rkey: write.uri.rkey,
value: write.record,
})
export const updateWriteToOp = (write: PreparedUpdate): RecordUpdateOp => ({
action: 'update',
collection: write.uri.collection,
rkey: write.uri.rkey,
value: write.record,
})
export const deleteWriteToOp = (write: PreparedDelete): DeleteOp => ({
action: 'delete',
collection: write.uri.collection,
rkey: write.uri.rkey,
})
export const writeToOp = (write: PreparedWrite): RecordWriteOp => {
switch (write.action) {
case 'create':
return createWriteToOp(write)
case 'update':
return updateWriteToOp(write)
case 'delete':
return deleteWriteToOp(write)
default:
throw new Error(`Unrecognized action: ${write}`)
}
}

@ -1,5 +1,4 @@
import { CID } from 'multiformats/cid'
import { DeleteOp, RecordCreateOp, RecordUpdateOp } from '@atproto/repo'
import { AtUri } from '@atproto/uri'
export type ImageConstraint = {
@ -30,7 +29,7 @@ export type PreparedCreate = {
action: 'create'
uri: AtUri
cid: CID
op: RecordCreateOp
record: Record<string, unknown>
blobs: BlobRef[]
}
@ -38,20 +37,15 @@ export type PreparedUpdate = {
action: 'update'
uri: AtUri
cid: CID
op: RecordUpdateOp
record: Record<string, unknown>
blobs: BlobRef[]
}
export type PreparedDelete = {
action: 'delete'
uri: AtUri
op: DeleteOp
}
export type PreparedWrites = (
| PreparedCreate
| PreparedUpdate
| PreparedDelete
)[]
export type PreparedWrite = PreparedCreate | PreparedUpdate | PreparedDelete
export class InvalidRecordError extends Error {}

@ -7,7 +7,7 @@ import { AtUri } from '@atproto/uri'
import { sha256Stream } from '@atproto/crypto'
import { cloneStream, sha256RawToCid, streamSize } from '@atproto/common'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { BlobRef, PreparedWrites } from '../../repo/types'
import { BlobRef, PreparedWrite } from '../../repo/types'
import Database from '../../db'
import { Blob as BlobTable } from '../../db/tables/blob'
import * as img from '../../image'
@ -50,7 +50,7 @@ export class RepoBlobs {
return cid
}
async processWriteBlobs(did: string, commit: CID, writes: PreparedWrites) {
async processWriteBlobs(did: string, commit: CID, writes: PreparedWrite[]) {
const blobPromises: Promise<void>[] = []
for (const write of writes) {
if (write.action === 'create' || write.action === 'update') {

@ -6,9 +6,10 @@ import Database from '../../db'
import { dbLogger as log } from '../../logger'
import { MessageQueue } from '../../event-stream/types'
import SqlBlockstore from '../../sql-blockstore'
import { PreparedCreate, PreparedWrites } from '../../repo/types'
import { PreparedCreate, PreparedWrite } from '../../repo/types'
import { RecordService } from '../record'
import { RepoBlobs } from './blobs'
import { createWriteToOp, writeToOp } from '../../repo'
export class RepoService {
blobs: RepoBlobs
@ -93,7 +94,7 @@ export class RepoService {
) {
this.db.assertTransaction()
const blockstore = new SqlBlockstore(this.db, did, now)
const writeOps = writes.map((write) => write.op)
const writeOps = writes.map(createWriteToOp)
const repo = await Repo.create(blockstore, did, authStore, writeOps)
await this.db.db
.insertInto('repo_root')
@ -108,7 +109,7 @@ export class RepoService {
async processWrites(
did: string,
authStore: auth.AuthStore,
writes: PreparedWrites,
writes: PreparedWrite[],
now: string,
) {
// make structural write to repo & send to indexing
@ -124,7 +125,7 @@ export class RepoService {
async writeToRepo(
did: string,
authStore: auth.AuthStore,
writes: PreparedWrites,
writes: PreparedWrite[],
now: string,
): Promise<CID> {
this.db.assertTransaction()
@ -135,7 +136,7 @@ export class RepoService {
`${did} is not a registered repo on this server`,
)
}
const writeOps = writes.map((write) => write.op)
const writeOps = writes.map(writeToOp)
const repo = await Repo.load(blockstore, currRoot)
const updated = await repo
.stageUpdate(writeOps)
@ -149,13 +150,13 @@ export class RepoService {
return updated.cid
}
async indexWrites(writes: PreparedWrites, now: string) {
async indexWrites(writes: PreparedWrite[], now: string) {
this.db.assertTransaction()
const recordTxn = new RecordService(this.db, this.messageQueue)
await Promise.all(
writes.map(async (write) => {
if (write.action === 'create') {
await recordTxn.indexRecord(write.uri, write.cid, write.op.value, now)
await recordTxn.indexRecord(write.uri, write.cid, write.record, now)
} else if (write.action === 'delete') {
await recordTxn.deleteRecord(write.uri)
}