Wrap sync semantics ()

* deprecate blobs & tooBig

* add sync event, deprecate handle & tombstone

* fix up tests

* small tidy

* add test for sync account on account activation

* use new sync event in another place

* remove deprecated events from lexicons

* formatting

* pr cleanup

* changeset
This commit is contained in:
Daniel Holmgren 2025-02-28 17:19:18 -06:00 committed by GitHub
parent be800369c5
commit 38320191e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 177 additions and 801 deletions

@ -0,0 +1,6 @@
---
"@atproto/api": patch
"@atproto/pds": patch
---
Wrap sync v1.1 semantics. Add #sync event to subscribeRepos and deprecate #handle and #tombstone events

@ -17,16 +17,7 @@
"message": {
"schema": {
"type": "union",
"refs": [
"#commit",
"#sync",
"#identity",
"#account",
"#handle",
"#migrate",
"#tombstone",
"#info"
]
"refs": ["#commit", "#sync", "#identity", "#account", "#info"]
}
},
"errors": [
@ -186,39 +177,6 @@
}
}
},
"handle": {
"type": "object",
"description": "DEPRECATED -- Use #identity event instead",
"required": ["seq", "did", "handle", "time"],
"properties": {
"seq": { "type": "integer" },
"did": { "type": "string", "format": "did" },
"handle": { "type": "string", "format": "handle" },
"time": { "type": "string", "format": "datetime" }
}
},
"migrate": {
"type": "object",
"description": "DEPRECATED -- Use #account event instead",
"required": ["seq", "did", "migrateTo", "time"],
"nullable": ["migrateTo"],
"properties": {
"seq": { "type": "integer" },
"did": { "type": "string", "format": "did" },
"migrateTo": { "type": "string" },
"time": { "type": "string", "format": "datetime" }
}
},
"tombstone": {
"type": "object",
"description": "DEPRECATED -- Use #account event instead",
"required": ["seq", "did", "time"],
"properties": {
"seq": { "type": "integer" },
"did": { "type": "string", "format": "did" },
"time": { "type": "string", "format": "datetime" }
}
},
"info": {
"type": "object",
"required": ["name"],

@ -3991,9 +3991,6 @@ export const schemaDict = {
'lex:com.atproto.sync.subscribeRepos#sync',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#account',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
'lex:com.atproto.sync.subscribeRepos#info',
],
},
@ -4197,68 +4194,6 @@ export const schemaDict = {
},
},
},
handle: {
type: 'object',
description: 'DEPRECATED -- Use #identity event instead',
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
handle: {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
type: 'object',
required: ['name'],

@ -122,62 +122,6 @@ export function validateAccount<V>(v: V) {
return validate<Account & V>(v, id, hashAccount)
}
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
$type?: 'com.atproto.sync.subscribeRepos#handle'
seq: number
did: string
handle: string
time: string
}
const hashHandle = 'handle'
export function isHandle<V>(v: V) {
return is$typed(v, id, hashHandle)
}
export function validateHandle<V>(v: V) {
return validate<Handle & V>(v, id, hashHandle)
}
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
$type?: 'com.atproto.sync.subscribeRepos#migrate'
seq: number
did: string
migrateTo: string | null
time: string
}
const hashMigrate = 'migrate'
export function isMigrate<V>(v: V) {
return is$typed(v, id, hashMigrate)
}
export function validateMigrate<V>(v: V) {
return validate<Migrate & V>(v, id, hashMigrate)
}
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
$type?: 'com.atproto.sync.subscribeRepos#tombstone'
seq: number
did: string
time: string
}
const hashTombstone = 'tombstone'
export function isTombstone<V>(v: V) {
return is$typed(v, id, hashTombstone)
}
export function validateTombstone<V>(v: V) {
return validate<Tombstone & V>(v, id, hashTombstone)
}
export interface Info {
$type?: 'com.atproto.sync.subscribeRepos#info'
name: 'OutdatedCursor' | (string & {})

@ -2035,16 +2035,6 @@ export const schemaDict = {
cursor: {
type: 'string',
},
rkeyStart: {
type: 'string',
description:
'DEPRECATED: The lowest sort-ordered rkey to start from (exclusive)',
},
rkeyEnd: {
type: 'string',
description:
'DEPRECATED: The highest sort-ordered rkey to stop at (exclusive)',
},
reverse: {
type: 'boolean',
description: 'Flag to reverse the order of the returned records.',
@ -3585,12 +3575,6 @@ export const schemaDict = {
description: 'Record Key',
format: 'record-key',
},
commit: {
type: 'string',
format: 'cid',
description:
'DEPRECATED: referenced a repo commit by CID, and retrieved record as of that commit',
},
},
},
output: {
@ -4007,9 +3991,6 @@ export const schemaDict = {
'lex:com.atproto.sync.subscribeRepos#sync',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#account',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
'lex:com.atproto.sync.subscribeRepos#info',
],
},
@ -4213,68 +4194,6 @@ export const schemaDict = {
},
},
},
handle: {
type: 'object',
description: 'DEPRECATED -- Use #identity event instead',
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
handle: {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
type: 'object',
required: ['name'],

@ -20,10 +20,6 @@ export interface QueryParams {
/** The number of records to return. */
limit: number
cursor?: string
/** DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) */
rkeyStart?: string
/** DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) */
rkeyEnd?: string
/** Flag to reverse the order of the returned records. */
reverse?: boolean
}

@ -19,8 +19,6 @@ export interface QueryParams {
collection: string
/** Record Key */
rkey: string
/** DEPRECATED: referenced a repo commit by CID, and retrieved record as of that commit */
commit?: string
}
export type InputSchema = undefined

@ -22,9 +22,6 @@ export type OutputSchema =
| $Typed<Sync>
| $Typed<Identity>
| $Typed<Account>
| $Typed<Handle>
| $Typed<Migrate>
| $Typed<Tombstone>
| $Typed<Info>
| { $type: string }
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
@ -150,62 +147,6 @@ export function validateAccount<V>(v: V) {
return validate<Account & V>(v, id, hashAccount)
}
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
$type?: 'com.atproto.sync.subscribeRepos#handle'
seq: number
did: string
handle: string
time: string
}
const hashHandle = 'handle'
export function isHandle<V>(v: V) {
return is$typed(v, id, hashHandle)
}
export function validateHandle<V>(v: V) {
return validate<Handle & V>(v, id, hashHandle)
}
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
$type?: 'com.atproto.sync.subscribeRepos#migrate'
seq: number
did: string
migrateTo: string | null
time: string
}
const hashMigrate = 'migrate'
export function isMigrate<V>(v: V) {
return is$typed(v, id, hashMigrate)
}
export function validateMigrate<V>(v: V) {
return validate<Migrate & V>(v, id, hashMigrate)
}
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
$type?: 'com.atproto.sync.subscribeRepos#tombstone'
seq: number
did: string
time: string
}
const hashTombstone = 'tombstone'
export function isTombstone<V>(v: V) {
return is$typed(v, id, hashTombstone)
}
export function validateTombstone<V>(v: V) {
return validate<Tombstone & V>(v, id, hashTombstone)
}
export interface Info {
$type?: 'com.atproto.sync.subscribeRepos#info'
name: 'OutdatedCursor' | (string & {})

@ -3991,9 +3991,6 @@ export const schemaDict = {
'lex:com.atproto.sync.subscribeRepos#sync',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#account',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
'lex:com.atproto.sync.subscribeRepos#info',
],
},
@ -4197,68 +4194,6 @@ export const schemaDict = {
},
},
},
handle: {
type: 'object',
description: 'DEPRECATED -- Use #identity event instead',
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
handle: {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
type: 'object',
required: ['name'],

@ -22,9 +22,6 @@ export type OutputSchema =
| $Typed<Sync>
| $Typed<Identity>
| $Typed<Account>
| $Typed<Handle>
| $Typed<Migrate>
| $Typed<Tombstone>
| $Typed<Info>
| { $type: string }
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
@ -150,62 +147,6 @@ export function validateAccount<V>(v: V) {
return validate<Account & V>(v, id, hashAccount)
}
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
$type?: 'com.atproto.sync.subscribeRepos#handle'
seq: number
did: string
handle: string
time: string
}
const hashHandle = 'handle'
export function isHandle<V>(v: V) {
return is$typed(v, id, hashHandle)
}
export function validateHandle<V>(v: V) {
return validate<Handle & V>(v, id, hashHandle)
}
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
$type?: 'com.atproto.sync.subscribeRepos#migrate'
seq: number
did: string
migrateTo: string | null
time: string
}
const hashMigrate = 'migrate'
export function isMigrate<V>(v: V) {
return is$typed(v, id, hashMigrate)
}
export function validateMigrate<V>(v: V) {
return validate<Migrate & V>(v, id, hashMigrate)
}
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
$type?: 'com.atproto.sync.subscribeRepos#tombstone'
seq: number
did: string
time: string
}
const hashTombstone = 'tombstone'
export function isTombstone<V>(v: V) {
return is$typed(v, id, hashTombstone)
}
export function validateTombstone<V>(v: V) {
return validate<Tombstone & V>(v, id, hashTombstone)
}
export interface Info {
$type?: 'com.atproto.sync.subscribeRepos#info'
name: 'OutdatedCursor' | (string & {})

@ -1,4 +1,5 @@
import { BlobStore } from '@atproto/repo'
import { SyncEvtData } from '../../repo'
import { BlobReader } from '../blob/reader'
import { ActorDb } from '../db'
import { RecordReader } from '../record/reader'
@ -17,4 +18,14 @@ export class RepoReader {
this.record = new RecordReader(db)
this.storage = new SqlRepoReader(db)
}
async getSyncEventData(): Promise<SyncEvtData> {
const root = await this.storage.getRootDetailed()
const { blocks } = await this.storage.getBlocks([root.cid])
return {
cid: root.cid,
rev: root.rev,
blocks,
}
}
}

@ -18,7 +18,6 @@ import { ActorDb } from '../db'
import { RecordTransactor } from '../record/transactor'
import { RepoReader } from './reader'
import { SqlRepoTransactor } from './sql-repo-transactor'
import { blobCidsFromWrites, commitOpsFromCreates } from './util'
export class RepoTransactor extends RepoReader {
blob: BlobTransactor
@ -61,10 +60,14 @@ export class RepoTransactor extends RepoReader {
this.indexWrites(writes, commit.rev),
this.blob.processWriteBlobs(commit.rev, writes),
])
const ops = writes.map((w) => ({
action: 'create' as const,
path: formatDataKey(w.uri.collection, w.uri.rkey),
cid: w.cid,
}))
return {
...commit,
ops: commitOpsFromCreates(writes),
blobs: blobCidsFromWrites(writes),
ops,
prevData: null,
}
}
@ -74,7 +77,16 @@ export class RepoTransactor extends RepoReader {
swapCommitCid?: CID,
): Promise<CommitDataWithOps> {
this.db.assertTransaction()
if (writes.length > 200) {
throw new InvalidRequestError('Too many writes. Max: 200')
}
const commit = await this.formatCommit(writes, swapCommitCid)
// Do not allow commits > 2MB
if (commit.relevantBlocks.byteSize > 2000000) {
throw new InvalidRequestError('Too many writes. Max event size: 2MB')
}
await Promise.all([
// persist the commit to repo storage
this.storage.applyCommit(commit),
@ -166,7 +178,6 @@ export class RepoTransactor extends RepoReader {
return {
...commit,
ops: commitOps,
blobs: blobCidsFromWrites(writes),
prevData,
}
}

@ -1,22 +0,0 @@
import { CidSet, formatDataKey } from '@atproto/repo'
import { CommitOp, PreparedCreate, PreparedWrite } from '../../repo'
export const blobCidsFromWrites = (writes: PreparedWrite[]): CidSet => {
const blobCids = new CidSet()
for (const w of writes) {
if (w.action === 'create' || w.action === 'update') {
for (const blob of w.blobs) {
blobCids.add(blob.cid)
}
}
}
return blobCids
}
export const commitOpsFromCreates = (writes: PreparedCreate[]): CommitOp[] => {
return writes.map((w) => ({
action: 'create' as const,
path: formatDataKey(w.uri.collection, w.uri.rkey),
cid: w.cid,
}))
}

@ -9,12 +9,11 @@ export default function (server: Server, ctx: AppContext) {
const { did } = input.body
await ctx.actorStore.destroy(did)
await ctx.accountManager.deleteAccount(did)
const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did)
const accountSeq = await ctx.sequencer.sequenceAccountEvt(
did,
AccountStatus.Deleted,
)
await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq])
await ctx.sequencer.deleteAllForUser(did, [accountSeq])
},
})
}

@ -44,7 +44,6 @@ export default function (server: Server, ctx: AppContext) {
}
try {
await ctx.sequencer.sequenceHandleUpdate(did, handle)
await ctx.sequencer.sequenceIdentityEvt(did, handle)
} catch (err) {
httpLogger.error(

@ -79,7 +79,6 @@ export default function (server: Server, ctx: AppContext) {
}
try {
await ctx.sequencer.sequenceHandleUpdate(requester, handle)
await ctx.sequencer.sequenceIdentityEvt(requester, handle)
} catch (err) {
httpLogger.error(

@ -1,4 +1,3 @@
import { CidSet } from '@atproto/repo'
import { INVALID_HANDLE } from '@atproto/syntax'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { AppContext } from '../../../../context'
@ -31,22 +30,9 @@ export default function (server: Server, ctx: AppContext) {
await ctx.accountManager.activateAccount(requester)
const commitData = await ctx.actorStore.read(requester, async (store) => {
const root = await store.repo.storage.getRootDetailed()
const blocks = await store.repo.storage.getBlocks([root.cid])
return {
cid: root.cid,
rev: root.rev,
since: null,
prev: null,
newBlocks: blocks.blocks,
relevantBlocks: blocks.blocks,
removedCids: new CidSet(),
ops: [],
blobs: new CidSet(),
prevData: null,
}
})
const syncData = await ctx.actorStore.read(requester, (store) =>
store.repo.getSyncEventData(),
)
// @NOTE: we're over-emitting for now for backwards compatibility, can reduce this in the future
const status = await ctx.accountManager.getAccountStatus(requester)
@ -55,7 +41,7 @@ export default function (server: Server, ctx: AppContext) {
requester,
account.handle ?? INVALID_HANDLE,
)
await ctx.sequencer.sequenceCommit(requester, commitData)
await ctx.sequencer.sequenceSyncEvt(requester, syncData)
},
})
}

@ -48,8 +48,7 @@ export default function (server: Server, ctx: AppContext) {
did,
AccountStatus.Deleted,
)
const tombstoneSeq = await ctx.sequencer.sequenceTombstone(did)
await ctx.sequencer.deleteAllForUser(did, [accountSeq, tombstoneSeq])
await ctx.sequencer.deleteAllForUser(did, [accountSeq])
},
})
}

@ -45,9 +45,9 @@ export default function (server: Server, ctx: AppContext) {
time: evt.time,
...evt.evt,
}
} else if (evt.type === 'handle') {
} else if (evt.type === 'sync') {
yield {
$type: '#handle',
$type: '#sync',
seq: evt.seq,
time: evt.time,
...evt.evt,
@ -66,13 +66,6 @@ export default function (server: Server, ctx: AppContext) {
time: evt.time,
...evt.evt,
}
} else if (evt.type === 'tombstone') {
yield {
$type: '#tombstone',
seq: evt.seq,
time: evt.time,
...evt.evt,
}
}
}
})

@ -3991,9 +3991,6 @@ export const schemaDict = {
'lex:com.atproto.sync.subscribeRepos#sync',
'lex:com.atproto.sync.subscribeRepos#identity',
'lex:com.atproto.sync.subscribeRepos#account',
'lex:com.atproto.sync.subscribeRepos#handle',
'lex:com.atproto.sync.subscribeRepos#migrate',
'lex:com.atproto.sync.subscribeRepos#tombstone',
'lex:com.atproto.sync.subscribeRepos#info',
],
},
@ -4197,68 +4194,6 @@ export const schemaDict = {
},
},
},
handle: {
type: 'object',
description: 'DEPRECATED -- Use #identity event instead',
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
handle: {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
description: 'DEPRECATED -- Use #account event instead',
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
},
did: {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
type: 'object',
required: ['name'],

@ -22,9 +22,6 @@ export type OutputSchema =
| $Typed<Sync>
| $Typed<Identity>
| $Typed<Account>
| $Typed<Handle>
| $Typed<Migrate>
| $Typed<Tombstone>
| $Typed<Info>
| { $type: string }
export type HandlerError = ErrorFrame<'FutureCursor' | 'ConsumerTooSlow'>
@ -150,62 +147,6 @@ export function validateAccount<V>(v: V) {
return validate<Account & V>(v, id, hashAccount)
}
/** DEPRECATED -- Use #identity event instead */
export interface Handle {
$type?: 'com.atproto.sync.subscribeRepos#handle'
seq: number
did: string
handle: string
time: string
}
const hashHandle = 'handle'
export function isHandle<V>(v: V) {
return is$typed(v, id, hashHandle)
}
export function validateHandle<V>(v: V) {
return validate<Handle & V>(v, id, hashHandle)
}
/** DEPRECATED -- Use #account event instead */
export interface Migrate {
$type?: 'com.atproto.sync.subscribeRepos#migrate'
seq: number
did: string
migrateTo: string | null
time: string
}
const hashMigrate = 'migrate'
export function isMigrate<V>(v: V) {
return is$typed(v, id, hashMigrate)
}
export function validateMigrate<V>(v: V) {
return validate<Migrate & V>(v, id, hashMigrate)
}
/** DEPRECATED -- Use #account event instead */
export interface Tombstone {
$type?: 'com.atproto.sync.subscribeRepos#tombstone'
seq: number
did: string
time: string
}
const hashTombstone = 'tombstone'
export function isTombstone<V>(v: V) {
return is$typed(v, id, hashTombstone)
}
export function validateTombstone<V>(v: V) {
return validate<Tombstone & V>(v, id, hashTombstone)
}
export interface Info {
$type?: 'com.atproto.sync.subscribeRepos#info'
name: 'OutdatedCursor' | (string & {})

@ -1,6 +1,6 @@
import { CID } from 'multiformats/cid'
import { RepoRecord } from '@atproto/lexicon'
import { CidSet, CommitData, WriteOpAction } from '@atproto/repo'
import { BlockMap, CommitData, WriteOpAction } from '@atproto/repo'
import { AtUri } from '@atproto/syntax'
export type ValidationStatus = 'valid' | 'unknown' | undefined
@ -51,12 +51,17 @@ export type CommitOp = {
export type CommitDataWithOps = CommitData & {
ops: CommitOp[]
blobs: CidSet
prevData: CID | null
}
export type PreparedWrite = PreparedCreate | PreparedUpdate | PreparedDelete
export type SyncEvtData = {
cid: CID
rev: string
blocks: BlockMap
}
export class InvalidRecordError extends Error {}
export class BadCommitSwapError extends Error {

@ -77,7 +77,10 @@ export const rebuildRepo = async (ctx: AppContext, args: string[]) => {
}
})
await ctx.accountManager.updateRepoRoot(did, commit.cid, rev)
await ctx.sequencer.sequenceCommit(did, commit)
const syncData = await ctx.actorStore.read(did, (store) =>
store.repo.getSyncEventData(),
)
await ctx.sequencer.sequenceSyncEvt(did, syncData)
}
const promptContinue = async (): Promise<boolean> => {

@ -1,13 +1,6 @@
import { Generated, GeneratedAlways, Insertable, Selectable } from 'kysely'
export type RepoSeqEventType =
| 'append'
| 'rebase'
| 'handle'
| 'migrate'
| 'identity'
| 'account'
| 'tombstone'
export type RepoSeqEventType = 'append' | 'sync' | 'identity' | 'account'
export interface RepoSeq {
seq: GeneratedAlways<number>

@ -2,7 +2,7 @@ import { z } from 'zod'
import { cborEncode, noUndefinedVals, schema } from '@atproto/common'
import { BlockMap, blocksToCarFile } from '@atproto/repo'
import { AccountStatus } from '../account-manager'
import { CommitDataWithOps } from '../repo'
import { CommitDataWithOps, SyncEvtData } from '../repo'
import { RepoSeqInsert } from './db'
export const formatSeqCommit = async (
@ -13,41 +13,18 @@ export const formatSeqCommit = async (
blocksToSend.addMap(commitData.newBlocks)
blocksToSend.addMap(commitData.relevantBlocks)
let evt: CommitEvt
// If event is too big (max 200 ops or 1MB of data)
if (commitData.ops.length > 200 || blocksToSend.byteSize > 1000000) {
const justRoot = new BlockMap()
const rootBlock = blocksToSend.get(commitData.cid)
if (rootBlock) {
justRoot.set(commitData.cid, rootBlock)
}
evt = {
rebase: false,
tooBig: true,
repo: did,
commit: commitData.cid,
rev: commitData.rev,
since: commitData.since,
blocks: await blocksToCarFile(commitData.cid, justRoot),
ops: [],
blobs: [],
prevData: commitData.prevData ?? undefined,
}
} else {
evt = {
rebase: false,
tooBig: false,
repo: did,
commit: commitData.cid,
rev: commitData.rev,
since: commitData.since,
blocks: await blocksToCarFile(commitData.cid, blocksToSend),
ops: commitData.ops,
blobs: commitData.blobs.toList(),
prevData: commitData.prevData ?? undefined,
}
const evt = {
repo: did,
commit: commitData.cid,
rev: commitData.rev,
since: commitData.since,
blocks: await blocksToCarFile(commitData.cid, blocksToSend),
ops: commitData.ops,
prevData: commitData.prevData ?? undefined,
// deprecated (but still required) fields
rebase: false,
tooBig: false,
blobs: [],
}
return {
@ -58,17 +35,19 @@ export const formatSeqCommit = async (
}
}
export const formatSeqHandleUpdate = async (
export const formatSeqSyncEvt = async (
did: string,
handle: string,
data: SyncEvtData,
): Promise<RepoSeqInsert> => {
const evt: HandleEvt = {
const blocks = await blocksToCarFile(data.cid, data.blocks)
const evt: SyncEvt = {
did,
handle,
rev: data.rev,
blocks,
}
return {
did,
eventType: 'handle',
eventType: 'sync',
event: cborEncode(evt),
sequencedAt: new Date().toISOString(),
}
@ -112,20 +91,6 @@ export const formatSeqAccountEvt = async (
}
}
export const formatSeqTombstone = async (
did: string,
): Promise<RepoSeqInsert> => {
const evt: TombstoneEvt = {
did,
}
return {
did,
eventType: 'tombstone',
event: cborEncode(evt),
sequencedAt: new Date().toISOString(),
}
}
export const commitEvtOp = z.object({
action: z.union([
z.literal('create'),
@ -152,11 +117,12 @@ export const commitEvt = z.object({
})
export type CommitEvt = z.infer<typeof commitEvt>
export const handleEvt = z.object({
export const syncEvt = z.object({
did: z.string(),
handle: z.string(),
blocks: schema.bytes,
rev: z.string(),
})
export type HandleEvt = z.infer<typeof handleEvt>
export type SyncEvt = z.infer<typeof syncEvt>
export const identityEvt = z.object({
did: z.string(),
@ -178,22 +144,17 @@ export const accountEvt = z.object({
})
export type AccountEvt = z.infer<typeof accountEvt>
export const tombstoneEvt = z.object({
did: z.string(),
})
export type TombstoneEvt = z.infer<typeof tombstoneEvt>
type TypedCommitEvt = {
type: 'commit'
seq: number
time: string
evt: CommitEvt
}
type TypedHandleEvt = {
type: 'handle'
type TypedSyncEvt = {
type: 'sync'
seq: number
time: string
evt: HandleEvt
evt: SyncEvt
}
type TypedIdentityEvt = {
type: 'identity'
@ -207,15 +168,8 @@ type TypedAccountEvt = {
time: string
evt: AccountEvt
}
type TypedTombstoneEvt = {
type: 'tombstone'
seq: number
time: string
evt: TombstoneEvt
}
export type SeqEvt =
| TypedCommitEvt
| TypedHandleEvt
| TypedSyncEvt
| TypedIdentityEvt
| TypedAccountEvt
| TypedTombstoneEvt

@ -4,7 +4,7 @@ import { SECOND, cborDecode, wait } from '@atproto/common'
import { AccountStatus } from '../account-manager/helpers/account'
import { Crawlers } from '../crawlers'
import { seqLogger as log } from '../logger'
import { CommitDataWithOps } from '../repo'
import { CommitDataWithOps, SyncEvtData } from '../repo'
import {
RepoSeqEntry,
RepoSeqInsert,
@ -15,15 +15,13 @@ import {
import {
AccountEvt,
CommitEvt,
HandleEvt,
IdentityEvt,
SeqEvt,
TombstoneEvt,
SyncEvt,
formatSeqAccountEvt,
formatSeqCommit,
formatSeqHandleUpdate,
formatSeqIdentityEvt,
formatSeqTombstone,
formatSeqSyncEvt,
} from './events'
export * from './events'
@ -135,19 +133,19 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
continue
}
const evt = cborDecode(row.event)
if (row.eventType === 'append' || row.eventType === 'rebase') {
if (row.eventType === 'append') {
seqEvts.push({
type: 'commit',
seq: row.seq,
time: row.sequencedAt,
evt: evt as CommitEvt,
})
} else if (row.eventType === 'handle') {
} else if (row.eventType === 'sync') {
seqEvts.push({
type: 'handle',
type: 'sync',
seq: row.seq,
time: row.sequencedAt,
evt: evt as HandleEvt,
evt: evt as SyncEvt,
})
} else if (row.eventType === 'identity') {
seqEvts.push({
@ -163,13 +161,6 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
time: row.sequencedAt,
evt: evt as AccountEvt,
})
} else if (row.eventType === 'tombstone') {
seqEvts.push({
type: 'tombstone',
seq: row.seq,
time: row.sequencedAt,
evt: evt as TombstoneEvt,
})
}
}
@ -222,8 +213,8 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
return await this.sequenceEvt(evt)
}
async sequenceHandleUpdate(did: string, handle: string): Promise<number> {
const evt = await formatSeqHandleUpdate(did, handle)
async sequenceSyncEvt(did: string, data: SyncEvtData) {
const evt = await formatSeqSyncEvt(did, data)
return await this.sequenceEvt(evt)
}
@ -240,11 +231,6 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) {
return await this.sequenceEvt(evt)
}
async sequenceTombstone(did: string): Promise<number> {
const evt = await formatSeqTombstone(did)
return await this.sequenceEvt(evt)
}
async deleteAllForUser(did: string, excludingSeqs: number[] = []) {
await this.db.executeWithRetry(
this.db.db

@ -151,14 +151,12 @@ describe('account deletion', () => {
expect(
updatedDbContents.repoSeqs
.filter((row) => row.did === carol.did)
.every(
(row) => row.eventType === 'tombstone' || row.eventType === 'account',
),
.every((row) => row.eventType === 'account'),
).toBe(true)
// check we do have a tombstone for this did
// check we do have a account (deletion) event for this did
expect(
updatedDbContents.repoSeqs.filter(
(row) => row.did === carol.did && row.eventType === 'tombstone',
(row) => row.did === carol.did && row.eventType === 'account',
).length,
).toEqual(1)
expect(updatedDbContents.appPasswords).toEqual(

@ -7,9 +7,8 @@ import {
import { randomStr } from '@atproto/crypto'
import { SeedClient, TestNetworkNoAppView } from '@atproto/dev-env'
import { readCarWithRoot } from '@atproto/repo'
import { repoPrepare, sequencer } from '../../pds'
import { ids } from '../src/lexicon/lexicons'
import { SeqEvt, Sequencer, formatSeqCommit } from '../src/sequencer'
import { sequencer } from '../../pds'
import { SeqEvt, Sequencer, formatSeqSyncEvt } from '../src/sequencer'
import { Outbox } from '../src/sequencer/outbox'
import userSeed from './seeds/users'
@ -220,35 +219,27 @@ describe('sequencer', () => {
lastSeen = results[0].at(-1)?.seq ?? lastSeen
})
it('root block must be returned in tooBig seq commit', async () => {
// Create good records to exceed the event limit (the current limit is 200 events)
// it creates events completely locally, so it doesn't need to be in the network
const eventsToCreate = 250
const createPostRecord = () =>
repoPrepare.prepareCreate({
did: sc.dids.alice,
collection: ids.AppBskyFeedPost,
record: { text: 'valid', createdAt: new Date().toISOString() },
})
const writesPromises = Array.from(
{ length: eventsToCreate },
createPostRecord,
)
const writes = await Promise.all(writesPromises)
// just format commit without processing writes
const writeCommit = await network.pds.ctx.actorStore.transact(
it('root block must be returned in sync event', async () => {
const syncData = await network.pds.ctx.actorStore.read(
sc.dids.alice,
(store) => store.repo.formatCommit(writes),
async (store) => {
const root = await store.repo.storage.getRootDetailed()
const { blocks } = await store.repo.storage.getBlocks([root.cid])
return {
cid: root.cid,
rev: root.rev,
blocks,
}
},
)
const repoSeqInsert = await formatSeqCommit(sc.dids.alice, writeCommit)
const evt = cborDecode<sequencer.CommitEvt>(repoSeqInsert.event)
expect(evt.tooBig).toBe(true)
const dbEvt = await formatSeqSyncEvt(sc.dids.alice, syncData)
const evt = cborDecode<sequencer.SyncEvt>(dbEvt.event)
expect(evt.did).toBe(sc.dids.alice)
const car = await readCarWithRoot(evt.blocks)
expect(car.root.toString()).toBe(writeCommit.cid.toString())
expect(car.root.toString()).toBe(syncData.cid.toString())
// in the case of tooBig, the blocks must contain the root block only
expect(car.blocks.size).toBe(1)
expect(car.blocks.has(syncData.cid)).toBeTruthy()
})
})

@ -18,9 +18,8 @@ import { AccountStatus } from '../../src/account-manager'
import {
Account as AccountEvt,
Commit as CommitEvt,
Handle as HandleEvt,
Identity as IdentityEvt,
Tombstone as TombstoneEvt,
Sync as SyncEvt,
} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos'
import basicSeed from '../seeds/basic'
@ -74,10 +73,12 @@ describe('repo subscribe repos', () => {
if (
(frame.header.t === '#commit' &&
(frame.body as CommitEvt).repo === userDid) ||
(frame.header.t === '#handle' &&
(frame.body as HandleEvt).did === userDid) ||
(frame.header.t === '#tombstone' &&
(frame.body as TombstoneEvt).did === userDid)
(frame.header.t === '#sync' &&
(frame.body as SyncEvt).did === userDid) ||
(frame.header.t === '#identity' &&
(frame.body as IdentityEvt).did === userDid) ||
(frame.header.t === '#account' &&
(frame.body as AccountEvt).did === userDid)
) {
types.push(frame.body)
}
@ -96,6 +97,10 @@ describe('repo subscribe repos', () => {
return evts
}
const getSyncEvts = (frames: Frame[]): SyncEvt[] => {
return getEventType(frames, '#sync')
}
const getAccountEvts = (frames: Frame[]): AccountEvt[] => {
return getEventType(frames, '#account')
}
@ -104,14 +109,6 @@ describe('repo subscribe repos', () => {
return getEventType(frames, '#identity')
}
const getHandleEvts = (frames: Frame[]): HandleEvt[] => {
return getEventType(frames, '#handle')
}
const getTombstoneEvts = (frames: Frame[]): TombstoneEvt[] => {
return getEventType(frames, '#tombstone')
}
const getCommitEvents = (frames: Frame[]): CommitEvt[] => {
return getEventType(frames, '#commit')
}
@ -127,13 +124,6 @@ describe('repo subscribe repos', () => {
expect(evt.handle).toEqual(handle)
}
const verifyHandleEvent = (evt: HandleEvt, did: string, handle: string) => {
expect(typeof evt.seq).toBe('number')
expect(evt.did).toBe(did)
expect(evt.handle).toBe(handle)
expect(typeof evt.time).toBe('string')
}
const verifyAccountEvent = (
evt: AccountEvt,
did: string,
@ -147,10 +137,20 @@ describe('repo subscribe repos', () => {
expect(evt.status).toBe(status)
}
const verifyTombstoneEvent = (evt: unknown, did: string) => {
expect(evt?.['did']).toBe(did)
expect(typeof evt?.['time']).toBe('string')
expect(typeof evt?.['seq']).toBe('number')
const verifySyncEvent = async (
evt: SyncEvt,
did: string,
commit: CID,
rev: string,
) => {
expect(typeof evt.seq).toBe('number')
expect(evt.did).toBe(did)
expect(typeof evt.time).toBe('string')
expect(evt.rev).toBe(rev)
const car = await repo.readCarWithRoot(evt.blocks)
expect(car.root.equals(commit)).toBe(true)
expect(car.blocks.size).toBe(1)
expect(car.blocks.has(car.root)).toBe(true)
}
const verifyCommitEvents = async (frames: Frame[]) => {
@ -329,7 +329,7 @@ describe('repo subscribe repos', () => {
}
})
it('syncs handle changes', async () => {
it('syncs handle changes (identity evts)', async () => {
await sc.updateHandle(alice, 'alice2.test')
await sc.updateHandle(bob, 'bob2.test')
await sc.updateHandle(bob, 'bob2.test') // idempotent update re-sends
@ -344,20 +344,14 @@ describe('repo subscribe repos', () => {
await verifyCommitEvents(evts)
const handleEvts = getHandleEvts(evts.slice(-6))
expect(handleEvts.length).toBe(3)
verifyHandleEvent(handleEvts[0], alice, 'alice2.test')
verifyHandleEvent(handleEvts[1], bob, 'bob2.test')
verifyHandleEvent(handleEvts[2], bob, 'bob2.test')
const identityEvts = getIdentityEvts(evts.slice(-6))
const identityEvts = getIdentityEvts(evts.slice(-3))
expect(identityEvts.length).toBe(3)
verifyIdentityEvent(identityEvts[0], alice, 'alice2.test')
verifyIdentityEvent(identityEvts[1], bob, 'bob2.test')
verifyIdentityEvent(identityEvts[2], bob, 'bob2.test')
})
it('resends handle events on idempotent updates', async () => {
it('resends identity events on idempotent updates', async () => {
const update = sc.updateHandle(bob, 'bob2.test')
const ws = new WebSocket(
@ -368,8 +362,8 @@ describe('repo subscribe repos', () => {
const evts = await readTillCaughtUp(gen, update)
ws.terminate()
const handleEvts = getHandleEvts(evts.slice(-2))
verifyHandleEvent(handleEvts[0], bob, 'bob2.test')
const identityEvts = getIdentityEvts(evts.slice(-1))
verifyIdentityEvent(identityEvts[0], bob, 'bob2.test')
})
it('syncs account events', async () => {
@ -487,7 +481,35 @@ describe('repo subscribe repos', () => {
verifyAccountEvent(accountEvts[3], alice, true)
})
it('syncs tombstones', async () => {
it('emits sync event on account activation', async () => {
await agent.api.com.atproto.server.deactivateAccount(
{},
{
encoding: 'application/json',
headers: sc.getHeaders(alice),
},
)
await agent.api.com.atproto.server.activateAccount(undefined, {
headers: sc.getHeaders(alice),
})
const ws = new WebSocket(
`ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`,
)
const gen = byFrame(ws)
const evts = await readTillCaughtUp(gen)
ws.terminate()
const syncEvts = getSyncEvts(evts.slice(-1))
expect(syncEvts.length).toBe(1)
const root = await ctx.actorStore.read(alice, (store) =>
store.repo.storage.getRootDetailed(),
)
await verifySyncEvent(syncEvts[0], alice, root.cid, root.rev)
})
it('syncs account deletions (account evt)', async () => {
const baddie1 = (
await sc.createAccount('baddie1.test', {
email: 'baddie1@test.com',
@ -529,12 +551,7 @@ describe('repo subscribe repos', () => {
const evts = await readTillCaughtUp(gen)
ws.terminate()
const tombstoneEvts = getTombstoneEvts(evts.slice(-4))
expect(tombstoneEvts.length).toBe(2)
verifyTombstoneEvent(tombstoneEvts[0], baddie1)
verifyTombstoneEvent(tombstoneEvts[1], baddie2)
const accountEvts = getAccountEvts(evts.slice(-4))
const accountEvts = getAccountEvts(evts.slice(-2))
expect(accountEvts.length).toBe(2)
verifyAccountEvent(accountEvts[0], baddie1, false, AccountStatus.Deleted)
verifyAccountEvent(accountEvts[1], baddie2, false, AccountStatus.Deleted)
@ -571,7 +588,12 @@ describe('repo subscribe repos', () => {
const didEvts = getAllEvents(baddie3, evts)
expect(didEvts.length).toBe(1)
verifyTombstoneEvent(didEvts[0], baddie3)
verifyAccountEvent(
didEvts[0] as AccountEvt,
baddie3,
false,
AccountStatus.Deleted,
)
})
it('sends info frame on out of date cursor', async () => {