SQL Blockstore ()

* setting up sql-blockstore

* drop table migration

* unblock sqlite tx

* correct binary types

* lol woops

* block encoding utilities

* no longer store raw record in record table

* fix dev-env

* pr feedback

* tx check
This commit is contained in:
Daniel Holmgren 2022-10-27 13:26:17 -05:00 committed by GitHub
parent 21ff052da9
commit 9879f673ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 307 additions and 178 deletions

@ -0,0 +1,35 @@
import { CID } from 'multiformats/cid'
import * as Block from 'multiformats/block'
import { sha256 as blockHasher } from 'multiformats/hashes/sha2'
import * as blockCodec from '@ipld/dag-cbor'
export const valueToIpldBlock = async (
data: unknown,
): Promise<Block.Block<unknown>> => {
return Block.encode({
value: data,
codec: blockCodec,
hasher: blockHasher,
})
}
export const cidForData = async (data: unknown): Promise<CID> => {
const block = await valueToIpldBlock(data)
return block.cid
}
export const valueToIpldBytes = (value: unknown): Uint8Array => {
return blockCodec.encode(value)
}
export const ipldBytesToValue = (bytes: Uint8Array) => {
return blockCodec.decode(bytes)
}
export const ipldBytesToRecord = (bytes: Uint8Array): object => {
const val = ipldBytesToValue(bytes)
if (typeof val !== 'object' || val === null) {
throw new Error(`Expected object, got: ${val}`)
}
return val
}

@ -1,13 +0,0 @@
import { CID } from 'multiformats/cid'
import * as Block from 'multiformats/block'
import { sha256 as blockHasher } from 'multiformats/hashes/sha2'
import * as blockCodec from '@ipld/dag-cbor'
export const cidForData = async (data: unknown): Promise<CID> => {
const block = await Block.encode({
value: data,
codec: blockCodec,
hasher: blockHasher,
})
return block.cid
}

@ -3,6 +3,6 @@ export * as util from './util'
export * from './util'
export * from './tid'
export * from './cid'
export * from './blocks'
export * from './logger'
export * from './types'

@ -58,11 +58,10 @@ export class DevEnvServer {
const db = await PDSDatabase.memory()
await db.migrateToLatestOrThrow()
const serverBlockstore = new MemoryBlockstore()
const keypair = await crypto.EcdsaKeypair.create()
this.inst = await onServerReady(
PDSServer(serverBlockstore, db, keypair, {
PDSServer(db, keypair, {
debugMode: true,
scheme: 'http',
hostname: 'localhost',

@ -55,7 +55,7 @@ export default function (server: Server) {
let postsAndRepostsQb = db.db
.selectFrom(postsQb.union(repostsQb).as('posts_and_reposts'))
.innerJoin('app_bsky_post as post', 'post.uri', 'postUri')
.innerJoin('record', 'record.uri', 'postUri')
.innerJoin('ipld_block', 'ipld_block.cid', 'post.cid')
.innerJoin('user as author', 'author.did', 'post.creator')
.leftJoin(
'app_bsky_profile as author_profile',
@ -73,8 +73,8 @@ export default function (server: Server) {
'postUri',
'postCid',
'cursor',
'record.raw as recordRaw',
'record.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
'ipld_block.indexedAt as indexedAt',
'author.did as authorDid',
'author.username as authorName',
'author_profile.displayName as authorDisplayName',

@ -71,7 +71,7 @@ export default function (server: Server) {
let postsAndRepostsQb = db.db
.selectFrom(postsQb.union(repostsQb).as('posts_and_reposts'))
.innerJoin('app_bsky_post as post', 'post.uri', 'postUri')
.innerJoin('record', 'record.uri', 'postUri')
.innerJoin('ipld_block', 'ipld_block.cid', 'post.cid')
.innerJoin('user as author', 'author.did', 'post.creator')
.leftJoin(
'app_bsky_profile as author_profile',
@ -89,8 +89,8 @@ export default function (server: Server) {
'postUri',
'postCid',
'cursor',
'record.raw as recordRaw',
'record.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
'ipld_block.indexedAt as indexedAt',
'author.did as authorDid',
'author.username as authorName',
'author_profile.displayName as authorDisplayName',

@ -13,7 +13,6 @@ export default function (server: Server) {
let builder = db.db
.selectFrom('app_bsky_like as like')
.where('like.subject', '=', uri)
.innerJoin('record', 'like.uri', 'record.uri')
.innerJoin('user', 'like.creator', 'user.did')
.leftJoin('app_bsky_profile as profile', 'profile.creator', 'user.did')
.select([
@ -21,7 +20,7 @@ export default function (server: Server) {
'user.username as name',
'profile.displayName as displayName',
'like.createdAt as createdAt',
'record.indexedAt as indexedAt',
'like.indexedAt as indexedAt',
])
if (cid) {

@ -1,5 +1,6 @@
import { Server } from '../../../lexicon'
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import * as common from '@atproto/common'
import { Server } from '../../../lexicon'
import * as GetNotifications from '../../../lexicon/types/app/bsky/getNotifications'
import * as locals from '../../../locals'
import { paginate } from '../../../db/util'
@ -19,7 +20,7 @@ export default function (server: Server) {
let notifBuilder = db.db
.selectFrom('user_notification as notif')
.where('notif.userDid', '=', requester)
.innerJoin('record', 'record.uri', 'notif.recordUri')
.innerJoin('ipld_block', 'ipld_block.cid', 'notif.recordCid')
.innerJoin('user as author', 'author.did', 'notif.author')
.leftJoin(
'app_bsky_profile as author_profile',
@ -35,8 +36,8 @@ export default function (server: Server) {
'notif.reason as reason',
'notif.reasonSubject as reasonSubject',
'notif.indexedAt as createdAt',
'record.raw as record',
'record.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
'ipld_block.indexedAt as indexedAt',
'notif.recordUri as uri',
])
@ -69,7 +70,7 @@ export default function (server: Server) {
},
reason: notif.reason,
reasonSubject: notif.reasonSubject || undefined,
record: JSON.parse(notif.record),
record: common.ipldBytesToRecord(notif.recordBytes),
isRead: notif.createdAt <= user.lastSeenNotifs,
indexedAt: notif.indexedAt,
}))

@ -1,5 +1,6 @@
import { Kysely } from 'kysely'
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import * as common from '@atproto/common'
import { Server } from '../../../lexicon'
import * as GetPostThread from '../../../lexicon/types/app/bsky/getPostThread'
import * as locals from '../../../locals'
@ -72,7 +73,7 @@ const postInfoBuilder = (db: Kysely<DatabaseSchema>, requester: string) => {
const { ref } = db.dynamic
return db
.selectFrom('app_bsky_post as post')
.innerJoin('record', 'record.uri', 'post.uri')
.innerJoin('ipld_block', 'ipld_block.cid', 'post.cid')
.innerJoin('user as author', 'author.did', 'post.creator')
.leftJoin(
'app_bsky_profile as author_profile',
@ -86,8 +87,8 @@ const postInfoBuilder = (db: Kysely<DatabaseSchema>, requester: string) => {
'author.did as authorDid',
'author.username as authorName',
'author_profile.displayName as authorDisplayName',
'record.raw as rawRecord',
'record.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
'ipld_block.indexedAt as indexedAt',
db
.selectFrom('app_bsky_like')
.select(countAll.as('count'))
@ -132,7 +133,7 @@ const rowToPost = (
name: row.authorName,
displayName: row.authorDisplayName || undefined,
},
record: JSON.parse(row.rawRecord),
record: common.ipldBytesToRecord(row.recordBytes),
parent: parent ? { ...parent } : undefined,
replyCount: row.replyCount || 0,
likeCount: row.likeCount || 0,

@ -13,7 +13,6 @@ export default function (server: Server) {
let builder = db.db
.selectFrom('app_bsky_repost as repost')
.where('repost.subject', '=', uri)
.innerJoin('record', 'repost.uri', 'record.uri')
.innerJoin('user', 'repost.creator', 'user.did')
.leftJoin('app_bsky_profile as profile', 'profile.creator', 'user.did')
.select([
@ -21,7 +20,7 @@ export default function (server: Server) {
'user.username as name',
'profile.displayName as displayName',
'repost.createdAt as createdAt',
'record.indexedAt as indexedAt',
'repost.indexedAt as indexedAt',
])
if (cid) {

@ -19,19 +19,18 @@ export default function (server: Server) {
let followersReq = db.db
.selectFrom('app_bsky_follow as follow')
.where('follow.subject', '=', subject.did)
.innerJoin('record', 'record.uri', 'follow.uri')
.innerJoin('user as creator', 'creator.did', 'record.did')
.innerJoin('user as creator', 'creator.did', 'follow.creator')
.leftJoin(
'app_bsky_profile as profile',
'profile.creator',
'record.did',
'follow.creator',
)
.select([
'creator.did as did',
'creator.username as name',
'profile.displayName as displayName',
'follow.createdAt as createdAt',
'record.indexedAt as indexedAt',
'follow.indexedAt as indexedAt',
])
followersReq = paginate(followersReq, {

@ -19,7 +19,6 @@ export default function (server: Server) {
let followsReq = db.db
.selectFrom('app_bsky_follow as follow')
.where('follow.creator', '=', creator.did)
.innerJoin('record', 'record.uri', 'follow.uri')
.innerJoin('user as subject', 'subject.did', 'follow.subject')
.leftJoin(
'app_bsky_profile as profile',
@ -31,7 +30,7 @@ export default function (server: Server) {
'subject.username as name',
'profile.displayName as displayName',
'follow.createdAt as createdAt',
'record.indexedAt as indexedAt',
'follow.indexedAt as indexedAt',
])
followsReq = paginate(followsReq, {

@ -4,6 +4,7 @@ import * as locals from '../../../locals'
import * as schema from '../../../lexicon/schemas'
import { AtUri } from '@atproto/uri'
import { RepoStructure } from '@atproto/repo'
import SqlBlockstore from '../../../sql-blockstore'
import { CID } from 'multiformats/cid'
import * as Profile from '../../../lexicon/types/app/bsky/profile'
@ -11,7 +12,7 @@ const profileNsid = schema.ids.AppBskyProfile
export default function (server: Server) {
server.app.bsky.updateProfile(async (_params, input, req, res) => {
const { auth, db, blockstore, logger } = locals.get(res)
const { auth, db, logger } = locals.get(res)
const requester = auth.getUserDid(req)
if (!requester) {
@ -21,13 +22,15 @@ export default function (server: Server) {
const uri = new AtUri(`${requester}/${profileNsid}/self`)
const { profileCid, updated } = await db.transaction(
async (txnDb): Promise<{ profileCid: CID; updated: Profile.Record }> => {
const currRoot = await txnDb.getRepoRoot(requester, true)
async (dbTxn): Promise<{ profileCid: CID; updated: Profile.Record }> => {
const currRoot = await dbTxn.getRepoRoot(requester, true)
if (!currRoot) {
throw new InvalidRequestError(
`${requester} is not a registered repo on this server`,
)
}
const now = new Date().toISOString()
const blockstore = new SqlBlockstore(dbTxn, requester, now)
const repo = await RepoStructure.load(blockstore, currRoot)
const current = await repo.getRecord(profileNsid, 'self')
if (!db.records.profile.matchesSchema(current)) {
@ -47,7 +50,7 @@ export default function (server: Server) {
)
}
const currBadges = await txnDb.db
const currBadges = await dbTxn.db
.selectFrom('app_bsky_profile_badge')
.selectAll()
.where('profileUri', '=', uri.toString())
@ -72,31 +75,27 @@ export default function (server: Server) {
const profileCid = await repo.blockstore.put(updated)
// Update profile record
await txnDb.db
await dbTxn.db
.updateTable('record')
.set({
raw: JSON.stringify(updated),
cid: profileCid.toString(),
indexedAt: new Date().toISOString(),
})
.set({ cid: profileCid.toString() })
.where('uri', '=', uri.toString())
.execute()
// Update profile app index
await txnDb.db
await dbTxn.db
.updateTable('app_bsky_profile')
.set({
cid: profileCid.toString(),
displayName: updated.displayName,
description: updated.description,
indexedAt: new Date().toISOString(),
indexedAt: now,
})
.where('uri', '=', uri.toString())
.execute()
// Remove old badges
if (toDelete.length > 0) {
await txnDb.db
await dbTxn.db
.deleteFrom('app_bsky_profile_badge')
.where('profileUri', '=', uri.toString())
.where('badgeUri', 'in', toDelete)
@ -105,7 +104,7 @@ export default function (server: Server) {
// Add new badges
if (toAdd.length > 0) {
await txnDb.db
await dbTxn.db
.insertInto('app_bsky_profile_badge')
.values(toAdd)
.execute()
@ -119,7 +118,7 @@ export default function (server: Server) {
cid: profileCid,
})
.createCommit(authStore, async (prev, curr) => {
const success = await txnDb.updateRepoRoot(requester, curr, prev)
const success = await dbTxn.updateRepoRoot(requester, curr, prev)
if (!success) {
logger.error({ did: requester, curr, prev }, 'repo update failed')
throw new Error('Could not update repo root')

@ -1,3 +1,4 @@
import * as common from '@atproto/common'
import * as GetAuthorFeed from '../../../../lexicon/types/app/bsky/getAuthorFeed'
import * as GetHomeFeed from '../../../../lexicon/types/app/bsky/getHomeFeed'
@ -19,7 +20,7 @@ export const rowToFeedItem = (row: FeedRow): FeedItem => ({
displayName: row.originatorDisplayName ?? undefined,
}
: undefined,
record: JSON.parse(row.recordRaw),
record: common.ipldBytesToRecord(row.recordBytes),
replyCount: row.replyCount,
repostCount: row.repostCount,
likeCount: row.likeCount,
@ -42,7 +43,7 @@ type FeedRow = {
postUri: string
postCid: string
cursor: string
recordRaw: string
recordBytes: Uint8Array
indexedAt: string
authorDid: string
authorName: string

@ -8,6 +8,7 @@ import { Server } from '../../../lexicon'
import * as locals from '../../../locals'
import { countAll } from '../../../db/util'
import { UserAlreadyExistsError } from '../../../db'
import SqlBlockstore from '../../../sql-blockstore'
export default function (server: Server) {
server.com.atproto.getAccountsConfig((_params, _input, _req, res) => {
@ -34,7 +35,7 @@ export default function (server: Server) {
server.com.atproto.createAccount(async (_params, input, _req, res) => {
const { email, username, password, inviteCode, recoveryKey } = input.body
const { db, blockstore, auth, config, keypair, logger } = locals.get(res)
const { db, auth, config, keypair, logger } = locals.get(res)
// In order to perform the significant db updates ahead of
// registering the did, we will use a temp invalid did. Once everything
@ -157,6 +158,7 @@ export default function (server: Server) {
// Setup repo root
const authStore = locals.getAuthstore(res, did)
const blockstore = new SqlBlockstore(dbTxn, did, now)
const repo = await Repo.create(blockstore, did, authStore)
await dbTxn.db

@ -6,6 +6,7 @@ import * as locals from '../../../locals'
import * as schemas from '../../../lexicon/schemas'
import { TID } from '@atproto/common'
import { CidWriteOp, RepoStructure } from '@atproto/repo'
import SqlBlockstore from '../../../sql-blockstore'
export default function (server: Server) {
server.com.atproto.repoDescribe(async (params, _in, _req, res) => {
@ -95,7 +96,7 @@ export default function (server: Server) {
server.com.atproto.repoBatchWrite(async (params, input, req, res) => {
const { did, validate } = params
const { auth, db, blockstore, logger } = locals.get(res)
const { auth, db, logger } = locals.get(res)
if (!auth.verifyUser(req, did)) {
throw new AuthRequiredError()
}
@ -124,13 +125,15 @@ export default function (server: Server) {
`${did} is not a registered repo on this server`,
)
}
const now = new Date().toISOString()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const cidWriteOps: CidWriteOp[] = await Promise.all(
tx.writes.map(async (write) => {
if (write.action === 'create') {
const cid = await blockstore.put(write.value)
const rkey = write.rkey || TID.nextStr()
const uri = new AtUri(`${did}/${write.collection}/${rkey}`)
await dbTxn.indexRecord(uri, cid, write.value)
await dbTxn.indexRecord(uri, cid, write.value, now)
return {
action: 'create',
collection: write.collection,
@ -153,7 +156,7 @@ export default function (server: Server) {
await repo
.stageUpdate(cidWriteOps)
.createCommit(authStore, async (prev, curr) => {
const success = await db.updateRepoRoot(did, curr, prev)
const success = await db.updateRepoRoot(did, curr, prev, now)
if (!success) {
logger.error({ did, curr, prev }, 'repo update failed')
throw new Error('Could not update repo root')
@ -170,7 +173,7 @@ export default function (server: Server) {
server.com.atproto.repoCreateRecord(async (params, input, req, res) => {
const { did, collection, validate } = params
const { auth, db, blockstore, logger } = locals.get(res)
const { auth, db, logger } = locals.get(res)
if (!auth.verifyUser(req, did)) {
throw new AuthRequiredError()
}
@ -194,18 +197,20 @@ export default function (server: Server) {
rkey = TID.nextStr()
}
const cid = await blockstore.put(input.body)
const uri = new AtUri(`${did}/${collection}/${rkey}`)
await db.transaction(async (txn) => {
const currRoot = await txn.getRepoRoot(did, true)
const { cid } = await db.transaction(async (dbTxn) => {
const currRoot = await dbTxn.getRepoRoot(did, true)
if (!currRoot) {
throw new InvalidRequestError(
`${did} is not a registered repo on this server`,
)
}
const now = new Date().toISOString()
const blockstore = new SqlBlockstore(dbTxn, did, now)
const cid = await blockstore.put(input.body)
try {
await txn.indexRecord(uri, cid, input.body)
await dbTxn.indexRecord(uri, cid, input.body, now)
} catch (err) {
logger.warn(
{ uri: uri.toString(), err, validate },
@ -225,13 +230,14 @@ export default function (server: Server) {
cid,
})
.createCommit(authStore, async (prev, curr) => {
const success = await txn.updateRepoRoot(did, curr, prev)
const success = await dbTxn.updateRepoRoot(did, curr, prev, now)
if (!success) {
logger.error({ did, curr, prev }, 'repo update failed')
throw new Error('Could not update repo root')
}
return null
})
return { cid }
})
return {
@ -246,22 +252,24 @@ export default function (server: Server) {
server.com.atproto.repoDeleteRecord(async (params, _input, req, res) => {
const { did, collection, rkey } = params
const { auth, db, blockstore, logger } = locals.get(res)
const { auth, db, logger } = locals.get(res)
if (!auth.verifyUser(req, did)) {
throw new AuthRequiredError()
}
const authStore = locals.getAuthstore(res, did)
const uri = new AtUri(`${did}/${collection}/${rkey}`)
await db.transaction(async (txn) => {
const currRoot = await txn.getRepoRoot(did, true)
await db.transaction(async (dbTxn) => {
const currRoot = await dbTxn.getRepoRoot(did, true)
if (!currRoot) {
throw new InvalidRequestError(
`${did} is not a registered repo on this server`,
)
}
await txn.deleteRecord(uri)
const now = new Date().toISOString()
await dbTxn.deleteRecord(uri)
const blockstore = new SqlBlockstore(dbTxn, did, now)
const repo = await RepoStructure.load(blockstore, currRoot)
await repo
.stageUpdate({
@ -270,7 +278,7 @@ export default function (server: Server) {
rkey,
})
.createCommit(authStore, async (prev, curr) => {
const success = await txn.updateRepoRoot(did, curr, prev)
const success = await dbTxn.updateRepoRoot(did, curr, prev, now)
if (!success) {
logger.error({ did, curr, prev }, 'repo update failed')
throw new Error('Could not update repo root')

@ -2,6 +2,8 @@ import { Server } from '../../../lexicon'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { def as common } from '@atproto/common'
import * as locals from '../../../locals'
import { RepoStructure } from '@atproto/repo'
import SqlBlockstore from '../../../sql-blockstore'
export default function (server: Server) {
server.com.atproto.syncGetRoot(async (params, _in, _req, res) => {
@ -19,11 +21,14 @@ export default function (server: Server) {
server.com.atproto.syncGetRepo(async (params, _in, _req, res) => {
const { did, from = null } = params
const fromCid = from ? common.strToCid.parse(from) : null
const repo = await locals.loadRepo(res, did)
if (repo === null) {
const { db } = locals.get(res)
const repoRoot = await db.getRepoRoot(did)
if (repoRoot === null) {
throw new InvalidRequestError(`Could not find repo for DID: ${did}`)
}
const blockstore = new SqlBlockstore(db, did)
const repo = await RepoStructure.load(blockstore, repoRoot)
const fromCid = from ? common.strToCid.parse(from) : null
const diff = await repo.getDiffCar(fromCid)
return {
encoding: 'application/cbor',

@ -1,9 +1,4 @@
import dotenv from 'dotenv'
import {
IpldStore,
MemoryBlockstore,
PersistentBlockstore,
} from '@atproto/repo'
import * as crypto from '@atproto/crypto'
import Database from './db'
import server from './index'
@ -17,18 +12,11 @@ const run = async () => {
dotenv.config()
}
let blockstore: IpldStore
let db: Database
const keypair = await crypto.EcdsaKeypair.create()
const cfg = ServerConfig.readEnv({ recoveryKey: keypair.did() })
if (cfg.blockstoreLocation) {
blockstore = new PersistentBlockstore(cfg.blockstoreLocation)
} else {
blockstore = new MemoryBlockstore()
}
if (cfg.dbPostgresUrl) {
db = Database.postgres({
url: cfg.dbPostgresUrl,
@ -42,7 +30,7 @@ const run = async () => {
await db.migrateToLatestOrThrow()
const { listener } = server(blockstore, db, keypair, cfg)
const { listener } = server(db, keypair, cfg)
listener.on('listening', () => {
console.log(`🌞 ATP Data server is running at ${cfg.origin}`)
})

@ -1,6 +1,8 @@
import * as user from './tables/user'
import * as repoRoot from './tables/repo-root'
import * as record from './tables/record'
import * as ipldBlock from './tables/ipld-block'
import * as ipldBlockCreator from './tables/ipld-block-creator'
import * as invite from './tables/invite'
import * as notification from './tables/user-notification'
import * as post from './records/post'
@ -15,6 +17,8 @@ import * as badgeOffer from './records/badgeOffer'
export type DatabaseSchema = user.PartialDB &
repoRoot.PartialDB &
record.PartialDB &
ipldBlock.PartialDB &
ipldBlockCreator.PartialDB &
invite.PartialDB &
notification.PartialDB &
post.PartialDB &

@ -22,6 +22,7 @@ import badgeOfferPlugin, { AppBskyBadgeOffer } from './records/badgeOffer'
import profilePlugin, { AppBskyProfile } from './records/profile'
import notificationPlugin from './tables/user-notification'
import { AtUri } from '@atproto/uri'
import * as common from '@atproto/common'
import { CID } from 'multiformats/cid'
import { dbLogger as log } from '../logger'
import { DatabaseSchema } from './database-schema'
@ -157,16 +158,22 @@ export class Database {
return found ? CID.parse(found.root) : null
}
async updateRepoRoot(did: string, root: CID, prev?: CID): Promise<boolean> {
async updateRepoRoot(
did: string,
root: CID,
prev: CID,
timestamp?: string,
): Promise<boolean> {
log.debug({ did, root: root.toString() }, 'updating repo root')
let builder = this.db
const res = await this.db
.updateTable('repo_root')
.set({ root: root.toString() })
.set({
root: root.toString(),
indexedAt: timestamp || new Date().toISOString(),
})
.where('did', '=', did)
if (prev) {
builder = builder.where('root', '=', prev.toString())
}
const res = await builder.executeTakeFirst()
.where('root', '=', prev.toString())
.executeTakeFirst()
if (res.numUpdatedRows > 0) {
log.info({ did, root: root.toString() }, 'updated repo root')
return true
@ -296,7 +303,7 @@ export class Database {
return table.validateSchema(obj).valid
}
async indexRecord(uri: AtUri, cid: CID, obj: unknown) {
async indexRecord(uri: AtUri, cid: CID, obj: unknown, timestamp?: string) {
this.assertTransaction()
log.debug({ uri }, 'indexing record')
const record = {
@ -305,9 +312,6 @@ export class Database {
did: uri.host,
collection: uri.collection,
rkey: uri.rkey,
raw: JSON.stringify(obj),
indexedAt: new Date().toISOString(),
receivedAt: new Date().toISOString(),
}
if (!record.did.startsWith('did:')) {
throw new Error('Expected indexed URI to contain DID')
@ -318,7 +322,7 @@ export class Database {
}
await this.db.insertInto('record').values(record).execute()
const table = this.findTableForCollection(uri.collection)
await table.insert(uri, cid, obj)
await table.insert(uri, cid, obj, timestamp)
const notifs = table.notifsForRecord(uri, cid, obj)
await this.notifications.process(notifs)
log.info({ uri }, 'indexed record')
@ -361,24 +365,25 @@ export class Database {
): Promise<{ uri: string; cid: string; value: object }[]> {
let builder = this.db
.selectFrom('record')
.selectAll()
.where('did', '=', did)
.where('collection', '=', collection)
.orderBy('rkey', reverse ? 'asc' : 'desc')
.innerJoin('ipld_block', 'ipld_block.cid', 'record.cid')
.where('record.did', '=', did)
.where('record.collection', '=', collection)
.orderBy('record.rkey', reverse ? 'asc' : 'desc')
.limit(limit)
.selectAll()
if (before !== undefined) {
builder = builder.where('rkey', '<', before)
builder = builder.where('record.rkey', '<', before)
}
if (after !== undefined) {
builder = builder.where('rkey', '>', after)
builder = builder.where('record.rkey', '>', after)
}
const res = await builder.execute()
return res.map((row) => {
return {
uri: row.uri,
cid: row.cid,
value: JSON.parse(row.raw),
value: common.ipldBytesToRecord(row.content),
}
})
}
@ -389,17 +394,18 @@ export class Database {
): Promise<{ uri: string; cid: string; value: object } | null> {
let builder = this.db
.selectFrom('record')
.innerJoin('ipld_block', 'ipld_block.cid', 'record.cid')
.selectAll()
.where('uri', '=', uri.toString())
.where('record.uri', '=', uri.toString())
if (cid) {
builder = builder.where('cid', '=', cid)
builder = builder.where('record.cid', '=', cid)
}
const record = await builder.executeTakeFirst()
if (!record) return null
return {
uri: record.uri,
cid: record.cid,
value: JSON.parse(record.raw),
value: common.ipldBytesToRecord(record.content),
}
}

@ -4,6 +4,8 @@ import { Dialect } from '..'
const userTable = 'user'
const repoRootTable = 'repo_root'
const recordTable = 'record'
const ipldBlockTable = 'ipld_block'
const ipldBlockCreatorTable = 'ipld_block_creator'
const inviteTable = 'invite_code'
const inviteUseTable = 'invite_code_use'
const notificationTable = 'user_notification'
@ -32,6 +34,10 @@ export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
if (!err?.detail?.includes?.('(pg_trgm) already exists')) throw err
}
}
// Postgres uses the type `bytea` for variable length bytes
const binaryDatatype = dialect === 'sqlite' ? 'blob' : sql`bytea`
// Users
await db.schema
.createTable(userTable)
@ -77,10 +83,22 @@ export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('collection', 'varchar', (col) => col.notNull())
.addColumn('rkey', 'varchar', (col) => col.notNull())
.addColumn('raw', 'text', (col) => col.notNull())
.addColumn('receivedAt', 'varchar', (col) => col.notNull())
.execute()
// Ipld Blocks
await db.schema
.createTable(ipldBlockTable)
.addColumn('cid', 'varchar', (col) => col.primaryKey())
.addColumn('size', 'integer', (col) => col.notNull())
.addColumn('content', binaryDatatype, (col) => col.notNull())
.addColumn('indexedAt', 'varchar', (col) => col.notNull())
.execute()
// Ipld Block Creators
await db.schema
.createTable(ipldBlockCreatorTable)
.addColumn('cid', 'varchar', (col) => col.notNull())
.addColumn('did', 'varchar', (col) => col.notNull())
.addPrimaryKeyConstraint(`${ipldBlockCreatorTable}_pkey`, ['cid', 'did'])
.execute()
// Invites
await db.schema
.createTable(inviteTable)
@ -241,6 +259,8 @@ export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable(notificationTable).execute()
await db.schema.dropTable(inviteUseTable).execute()
await db.schema.dropTable(inviteTable).execute()
await db.schema.dropTable(ipldBlockCreatorTable).execute()
await db.schema.dropTable(ipldBlockTable).execute()
await db.schema.dropTable(recordTable).execute()
await db.schema.dropTable(repoRootTable).execute()
await db.schema.dropTable(userTable).execute()

@ -49,7 +49,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -60,7 +65,7 @@ const insertFn =
assertionType: obj.assertion.type,
assertionTag: (obj.assertion as Badge.TagAssertion).tag || null,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
}
await db.insertInto('app_bsky_badge').values(val).execute()
}

@ -55,7 +55,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -70,7 +75,7 @@ const insertFn =
offerUri: obj.offer.uri,
offerCid: obj.offer.cid,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
})
.execute()
}

@ -51,7 +51,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -65,7 +70,7 @@ const insertFn =
badgeUri: obj.badge.uri,
badgeCid: obj.badge.cid,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
})
.execute()
}

@ -44,7 +44,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -54,7 +59,7 @@ const insertFn =
creator: uri.host,
subject: obj.subject,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
}
await db.insertInto('app_bsky_follow').values(val).execute()
}

@ -49,7 +49,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -62,7 +67,7 @@ const insertFn =
subject: obj.subject.uri,
subjectCid: obj.subject.cid,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
})
.execute()
}

@ -88,7 +88,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -109,7 +114,7 @@ const insertFn =
replyRootCid: obj.reply?.root?.cid || null,
replyParent: obj.reply?.parent?.uri || null,
replyParentCid: obj.reply?.parent?.cid || null,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
}
const promises = [db.insertInto('app_bsky_post').values(post).execute()]
if (entities.length > 0) {

@ -67,7 +67,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -83,7 +88,7 @@ const insertFn =
creator: uri.host,
displayName: obj.displayName,
description: obj.description,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
}
const promises = [
db.insertInto('app_bsky_profile').values(profile).execute(),

@ -49,7 +49,12 @@ const getFn =
const insertFn =
(db: Kysely<PartialDB>) =>
async (uri: AtUri, cid: CID, obj: unknown): Promise<void> => {
async (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
): Promise<void> => {
if (!matchesSchema(obj)) {
throw new Error(`Record does not match schema: ${type}`)
}
@ -62,7 +67,7 @@ const insertFn =
subject: obj.subject.uri,
subjectCid: obj.subject.cid,
createdAt: obj.createdAt,
indexedAt: new Date().toISOString(),
indexedAt: timestamp || new Date().toISOString(),
})
.execute()
}

@ -0,0 +1,8 @@
export interface IpldBlockCreator {
cid: string
did: string
}
export const tableName = 'ipld_block_creator'
export type PartialDB = { [tableName]: IpldBlockCreator }

@ -0,0 +1,10 @@
export interface IpldBlock {
cid: string
size: number
content: Uint8Array
indexedAt: string
}
export const tableName = 'ipld_block'
export type PartialDB = { [tableName]: IpldBlock }

@ -4,9 +4,6 @@ export interface Record {
did: string
collection: string
rkey: string
raw: string
receivedAt: string
indexedAt: string
}
export const tableName = 'record'

@ -9,7 +9,12 @@ export type DbRecordPlugin<T, S> = {
matchesSchema: (obj: unknown) => obj is T
translateDbObj: (dbObj: S) => T
get: (uri: AtUri) => Promise<T | null>
insert: (uri: AtUri, cid: CID, obj: unknown) => Promise<void>
insert: (
uri: AtUri,
cid: CID,
obj: unknown,
timestamp?: string,
) => Promise<void>
delete: (uri: AtUri) => Promise<void>
notifsForRecord: (uri: AtUri, cid: CID, obj: unknown) => Notification[]
}

@ -8,7 +8,6 @@ import express from 'express'
import cors from 'cors'
import * as auth from '@atproto/auth'
import API from './api'
import { IpldStore } from '@atproto/repo'
import Database from './db'
import ServerAuth from './auth'
import * as error from './error'
@ -26,7 +25,6 @@ export { Database } from './db'
export type App = express.Application
const runServer = (
blockstore: IpldStore,
db: Database,
keypair: auth.DidableKey,
cfg: ServerConfigValues,
@ -52,7 +50,6 @@ const runServer = (
const locals: Locals = {
logger: httpLogger,
blockstore,
db,
keypair,
auth,

@ -1,6 +1,5 @@
import { Response } from 'express'
import pino from 'pino'
import { IpldStore, RepoStructure } from '@atproto/repo'
import { AuthStore, DidableKey } from '@atproto/auth'
import * as plc from '@atproto/plc'
import { Database } from './db'
@ -11,7 +10,6 @@ import { App } from '.'
export type Locals = {
logger: pino.Logger
blockstore: IpldStore
db: Database
keypair: DidableKey
auth: ServerAuth
@ -29,14 +27,6 @@ export const logger = (res: HasLocals): pino.Logger => {
return logger as pino.Logger
}
export const blockstore = (res: HasLocals): IpldStore => {
const blockstore = res.locals.blockstore
if (!blockstore) {
throw new Error('No Blockstore object attached to server')
}
return blockstore as IpldStore
}
export const db = (res: HasLocals): Database => {
const db = res.locals.db
if (!db) {
@ -80,7 +70,6 @@ export const auth = (res: HasLocals): ServerAuth => {
export const getLocals = (res: HasLocals): Locals => {
return {
logger: logger(res),
blockstore: blockstore(res),
db: db(res),
keypair: keypair(res),
auth: auth(res),
@ -99,15 +88,3 @@ export const getAuthstore = (res: Response, did: string): AuthStore => {
const { auth, keypair } = get(res)
return auth.verifier.loadAuthStore(keypair, [], did)
}
export const loadRepo = async (
res: Response,
did: string,
): Promise<RepoStructure | null> => {
const { db, blockstore } = getLocals(res)
const currRoot = await db.getRepoRoot(did)
if (!currRoot) {
return null
}
return RepoStructure.load(blockstore, currRoot)
}

@ -0,0 +1,57 @@
import { IpldStore } from '@atproto/repo'
import { CID } from 'multiformats/cid'
import Database from './db'
export class SqlBlockstore extends IpldStore {
constructor(
public db: Database,
public did: string,
public timestamp?: string,
) {
super()
}
async has(cid: CID): Promise<boolean> {
const found = await this.db.db
.selectFrom('ipld_block')
.where('cid', '=', cid.toString())
.select('cid')
.executeTakeFirst()
return !!found
}
async getBytes(cid: CID): Promise<Uint8Array> {
const found = await this.db.db
.selectFrom('ipld_block')
.where('cid', '=', cid.toString())
.select('content')
.executeTakeFirst()
if (!found) throw new Error(`Not found: ${cid.toString()}`)
return found.content
}
async putBytes(cid: CID, bytes: Uint8Array): Promise<void> {
this.db.assertTransaction()
const insertBlock = this.db.db
.insertInto('ipld_block')
.values({
cid: cid.toString(),
size: bytes.length,
content: bytes,
indexedAt: this.timestamp || new Date().toISOString(),
})
.onConflict((oc) => oc.doNothing())
.execute()
const insertBlockOwner = this.db.db
.insertInto('ipld_block_creator')
.values({ cid: cid.toString(), did: this.did })
.execute()
await Promise.all([insertBlock, insertBlockOwner])
}
destroy(): Promise<void> {
throw new Error('Destruction of SQL blockstore not allowed at runtime')
}
}
export default SqlBlockstore

@ -1,4 +1,3 @@
import { MemoryBlockstore } from '@atproto/repo'
import * as crypto from '@atproto/crypto'
import * as plc from '@atproto/plc'
import { AtUri } from '@atproto/uri'
@ -60,9 +59,8 @@ export const runTestServer = async (
: Database.memory()
await db.migrateToLatestOrThrow()
const serverBlockstore = new MemoryBlockstore()
const { app, listener } = server(serverBlockstore, db, keypair, config)
const { app, listener } = server(db, keypair, config)
return {
url: `http://localhost:${pdsPort}`,

@ -1,10 +1,8 @@
import * as Block from 'multiformats/block'
import { CID } from 'multiformats/cid'
import { sha256 as blockHasher } from 'multiformats/hashes/sha2'
import * as blockCodec from '@ipld/dag-cbor'
import { BlockWriter } from '@ipld/car/writer'
import { check, util } from '@atproto/common'
import * as common from '@atproto/common'
import { check, util, valueToIpldBlock } from '@atproto/common'
import { BlockReader } from '@ipld/car/api'
import CidSet from '../cid-set'
import { CarReader } from '@ipld/car/reader'
@ -16,11 +14,7 @@ export abstract class IpldStore {
abstract destroy(): Promise<void>
async put(value: unknown): Promise<CID> {
const block = await Block.encode({
value,
codec: blockCodec,
hasher: blockHasher,
})
const block = await valueToIpldBlock(value)
await this.putBytes(block.cid, block.bytes)
return block.cid
}
@ -38,13 +32,7 @@ export abstract class IpldStore {
async getUnchecked(cid: CID): Promise<unknown> {
const bytes = await this.getBytes(cid)
const block = await Block.create({
bytes,
cid,
codec: blockCodec,
hasher: blockHasher,
})
return block.value
return common.ipldBytesToValue(bytes)
}
async isMissing(cid: CID): Promise<boolean> {