Daniel Holmgren b15dec2f4f
Atproto sync package (#2752)
* first pass/port

* reworking

* authenticated commit parsing

* authenticate identity evts

* some testing

* tidy & add firehose to queue

* error handling

* fix test

* refactor sync queue + some tests

* fix race in sync queue

* rm firehose from syncqueue

* add tests for queue utils

* README

* lint readme

* filter before parsing

* pr feedback

* small fix

* changesets

* fix type

* Rework dataplane subscription (#2766)

* working sync package into appview subscription

* add restart method to subscription for tests

* fix another test

* tidy subscription utils/files

* remove dupe property

* tidy after merge

* fix start cursor on subscription

* tweak process full subscription logic

* fixes
2024-09-04 20:18:16 -05:00

250 lines
6.8 KiB
TypeScript

import fs from 'fs'
import { CID } from 'multiformats'
import { TID, dataToCborBlock } from '@atproto/common'
import * as crypto from '@atproto/crypto'
import { Repo } from '../src/repo'
import { RepoStorage } from '../src/storage'
import { MST } from '../src/mst'
import {
BlockMap,
CollectionContents,
RecordWriteOp,
RepoContents,
RecordPath,
WriteOpAction,
Commit,
DataDiff,
CommitData,
} from '../src'
import { Keypair, randomBytes } from '@atproto/crypto'
type IdMapping = Record<string, CID>
export const randomCid = async (storage?: RepoStorage): Promise<CID> => {
const block = await dataToCborBlock({ test: randomStr(50) })
if (storage) {
// @ts-expect-error FIXME remove this comment (and fix the TS error)
await storage.putBlock(block.cid, block.bytes)
}
return block.cid
}
export const generateBulkDataKeys = async (
count: number,
blockstore?: RepoStorage,
): Promise<IdMapping> => {
const obj: IdMapping = {}
for (let i = 0; i < count; i++) {
const key = `com.example.record/${TID.nextStr()}`
obj[key] = await randomCid(blockstore)
}
return obj
}
export const keysFromMapping = (mapping: IdMapping): TID[] => {
return Object.keys(mapping).map((id) => TID.fromStr(id))
}
export const keysFromMappings = (mappings: IdMapping[]): TID[] => {
return mappings.map(keysFromMapping).flat()
}
export const randomStr = (len: number): string => {
let result = ''
const CHARS = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
for (let i = 0; i < len; i++) {
result += CHARS.charAt(Math.floor(Math.random() * CHARS.length))
}
return result
}
export const shuffle = <T>(arr: T[]): T[] => {
const toShuffle = [...arr]
const shuffled: T[] = []
while (toShuffle.length > 0) {
const index = Math.floor(Math.random() * toShuffle.length)
shuffled.push(toShuffle[index])
toShuffle.splice(index, 1)
}
return shuffled
}
export const generateObject = (): Record<string, string> => {
return {
name: randomStr(100),
}
}
// Mass repo mutations & checking
// -------------------------------
export const testCollections = ['com.example.posts', 'com.example.likes']
export const fillRepo = async (
repo: Repo,
keypair: crypto.Keypair,
itemsPerCollection: number,
): Promise<{ repo: Repo; data: RepoContents }> => {
const repoData: RepoContents = {}
const writes: RecordWriteOp[] = []
for (const collName of testCollections) {
const collData: CollectionContents = {}
for (let i = 0; i < itemsPerCollection; i++) {
const object = generateObject()
const rkey = TID.nextStr()
collData[rkey] = object
writes.push({
action: WriteOpAction.Create,
collection: collName,
rkey,
record: object,
})
}
repoData[collName] = collData
}
const updated = await repo.applyWrites(writes, keypair)
return {
repo: updated,
data: repoData,
}
}
export const formatEdit = async (
repo: Repo,
prevData: RepoContents,
keypair: crypto.Keypair,
params: {
adds?: number
updates?: number
deletes?: number
},
): Promise<{ commit: CommitData; data: RepoContents }> => {
const { adds = 0, updates = 0, deletes = 0 } = params
const repoData: RepoContents = {}
const writes: RecordWriteOp[] = []
for (const collName of testCollections) {
const collData = { ...(prevData[collName] ?? {}) }
const shuffled = shuffle(Object.entries(collData))
for (let i = 0; i < adds; i++) {
const object = generateObject()
const rkey = TID.nextStr()
collData[rkey] = object
writes.push({
action: WriteOpAction.Create,
collection: collName,
rkey,
record: object,
})
}
const toUpdate = shuffled.slice(0, updates)
for (let i = 0; i < toUpdate.length; i++) {
const object = generateObject()
const rkey = toUpdate[i][0]
collData[rkey] = object
writes.push({
action: WriteOpAction.Update,
collection: collName,
rkey,
record: object,
})
}
const toDelete = shuffled.slice(updates, deletes)
for (let i = 0; i < toDelete.length; i++) {
const rkey = toDelete[i][0]
delete collData[rkey]
writes.push({
action: WriteOpAction.Delete,
collection: collName,
rkey,
})
}
repoData[collName] = collData
}
const commit = await repo.formatCommit(writes, keypair)
return {
commit,
data: repoData,
}
}
export const pathsForOps = (ops: RecordWriteOp[]): RecordPath[] =>
ops.map((op) => ({ collection: op.collection, rkey: op.rkey }))
export const saveMst = async (storage: RepoStorage, mst: MST): Promise<CID> => {
const diff = await mst.getUnstoredBlocks()
// @ts-expect-error FIXME remove this comment (and fix the TS error)
await storage.putMany(diff.blocks)
return diff.root
}
// Creating repo
// -------------------
export const addBadCommit = async (
repo: Repo,
keypair: Keypair,
): Promise<Repo> => {
const obj = generateObject()
const newBlocks = new BlockMap()
const cid = await newBlocks.add(obj)
const updatedData = await repo.data.add(`com.example.test/${TID.next()}`, cid)
const dataCid = await updatedData.getPointer()
const diff = await DataDiff.of(updatedData, repo.data)
newBlocks.addMap(diff.newMstBlocks)
// we generate a bad sig by signing some other data
const rev = TID.nextStr(repo.commit.rev)
const commit: Commit = {
...repo.commit,
rev,
data: dataCid,
sig: await keypair.sign(randomBytes(256)),
}
const commitCid = await newBlocks.add(commit)
// @ts-expect-error FIXME remove this comment (and fix the TS error)
await repo.storage.applyCommit({
cid: commitCid,
rev,
prev: repo.cid,
newBlocks,
removedCids: diff.removedCids,
})
return await Repo.load(repo.storage, commitCid)
}
// Logging
// ----------------
export const writeMstLog = async (filename: string, tree: MST) => {
let log = ''
for await (const entry of tree.walk()) {
if (entry.isLeaf()) continue
const layer = await entry.getLayer()
log += `Layer ${layer}: ${entry.pointer}\n`
log += '--------------\n'
const entries = await entry.getEntries()
for (const e of entries) {
if (e.isLeaf()) {
log += `Key: ${e.key} (${e.value})\n`
} else {
log += `Subtree: ${e.pointer}\n`
}
}
log += '\n\n'
}
fs.writeFileSync(filename, log)
}
export const saveMstEntries = (filename: string, entries: [string, CID][]) => {
const writable = entries.map(([key, val]) => [key, val.toString()])
fs.writeFileSync(filename, JSON.stringify(writable))
}
export const loadMstEntries = (filename: string): [string, CID][] => {
const contents = fs.readFileSync(filename)
const parsed = JSON.parse(contents.toString())
return parsed.map(([key, value]) => [key, CID.parse(value)])
}