Add modEventDivertBlobs event to send blobs to abyss ()

* 🚧 Working through an nullable review state

*  Update snapshots on some tests

*  Update snapshots on some tests

*  Add test for reviewOptional status mutation

*  Add divertBlobs event to send blobs to abyss

* ♻️ Rename reviewOptional -> reviewNone

* ♻️ Rename modEventDivertBlobs -> modEventDivert

* 🐛 Rename event type checker

*  Use pds resolver to get blob straight from pds

*  Use FOR UPDATE to respect db transactions

* ♻️ Refactor to use event_pusher table instead of new table

*  Bring back missing lines in pnpm-lock

* 🔨 Rebuild?

* 🚨 Formatting

* ♻️ Refactor to divert blob sync

* 🧹 Cleanup

*  Use modClient seed client in blob-divert test

* update divert blob config to use basic admin auth

* fix

* build

---------

Co-authored-by: Devin Ivy <devinivy@gmail.com>
This commit is contained in:
Foysal Ahamed 2024-03-12 14:40:32 +00:00 committed by GitHub
parent 38656e71ff
commit 2802880a97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 686 additions and 185 deletions
.github/workflows
lexicons/com/atproto/admin
packages
api/src/client
lexicons.ts
types/com/atproto/admin
bsky/src/lexicon
lexicons.ts
types/com/atproto/admin
ozone
pds/src/lexicon
lexicons.ts
types/com/atproto/admin
pnpm-lock.yaml

@ -3,6 +3,7 @@ on:
push:
branches:
- main
- divert-blobs
env:
REGISTRY: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_REGISTRY }}
USERNAME: ${{ secrets.AWS_ECR_REGISTRY_USEAST2_PACKAGES_USERNAME }}

@ -34,7 +34,8 @@
"#modEventEscalate",
"#modEventMute",
"#modEventEmail",
"#modEventResolveAppeal"
"#modEventResolveAppeal",
"#modEventDivert"
]
},
"subject": {
@ -72,7 +73,8 @@
"#modEventEscalate",
"#modEventMute",
"#modEventEmail",
"#modEventResolveAppeal"
"#modEventResolveAppeal",
"#modEventDivert"
]
},
"subject": {
@ -625,6 +627,13 @@
}
}
},
"modEventDivert": {
"type": "object",
"description": "Divert a record's blobs to a 3rd party service for further scanning/tagging",
"properties": {
"comment": { "type": "string" }
}
},
"communicationTemplateView": {
"type": "object",
"required": [

@ -92,6 +92,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -150,6 +151,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -940,6 +942,16 @@ export const schemaDict = {
},
},
},
modEventDivert: {
type: 'object',
description:
"Divert a record's blobs to a 3rd party service for further scanning/tagging",
properties: {
comment: {
type: 'string',
},
},
},
communicationTemplateView: {
type: 'object',
required: [

@ -41,6 +41,7 @@ export interface ModEventView {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoRef
@ -79,6 +80,7 @@ export interface ModEventViewDetail {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoView
@ -749,6 +751,24 @@ export function validateModEventTag(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventTag', v)
}
/** Divert a record's blobs to a 3rd party service for further scanning/tagging */
export interface ModEventDivert {
comment?: string
[k: string]: unknown
}
export function isModEventDivert(v: unknown): v is ModEventDivert {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.admin.defs#modEventDivert'
)
}
export function validateModEventDivert(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventDivert', v)
}
export interface CommunicationTemplateView {
id: string
/** Name of the template. */

@ -92,6 +92,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -150,6 +151,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -940,6 +942,16 @@ export const schemaDict = {
},
},
},
modEventDivert: {
type: 'object',
description:
"Divert a record's blobs to a 3rd party service for further scanning/tagging",
properties: {
comment: {
type: 'string',
},
},
},
communicationTemplateView: {
type: 'object',
required: [

@ -41,6 +41,7 @@ export interface ModEventView {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoRef
@ -79,6 +80,7 @@ export interface ModEventViewDetail {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoView
@ -749,6 +751,24 @@ export function validateModEventTag(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventTag', v)
}
/** Divert a record's blobs to a 3rd party service for further scanning/tagging */
export interface ModEventDivert {
comment?: string
[k: string]: unknown
}
export function isModEventDivert(v: unknown): v is ModEventDivert {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.admin.defs#modEventDivert'
)
}
export function validateModEventDivert(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventDivert', v)
}
export interface CommunicationTemplateView {
id: string
/** Name of the template. */

@ -2,160 +2,204 @@ import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../lexicon'
import AppContext from '../../context'
import {
isModEventDivert,
isModEventEmail,
isModEventLabel,
isModEventReverseTakedown,
isModEventTakedown,
} from '../../lexicon/types/com/atproto/admin/defs'
import { HandlerInput } from '../../lexicon/types/com/atproto/admin/emitModerationEvent'
import { subjectFromInput } from '../../mod-service/subject'
import { ModerationLangService } from '../../mod-service/lang'
import { retryHttp } from '../../util'
import { ModeratorOutput, AdminTokenOutput } from '../../auth-verifier'
const handleModerationEvent = async ({
ctx,
input,
auth,
}: {
ctx: AppContext
input: HandlerInput
auth: ModeratorOutput | AdminTokenOutput
}) => {
const access = auth.credentials
const createdBy =
auth.credentials.type === 'moderator'
? auth.credentials.iss
: input.body.createdBy
const db = ctx.db
const moderationService = ctx.modService(db)
const { event } = input.body
const isTakedownEvent = isModEventTakedown(event)
const isReverseTakedownEvent = isModEventReverseTakedown(event)
const isLabelEvent = isModEventLabel(event)
const subject = subjectFromInput(
input.body.subject,
input.body.subjectBlobCids,
)
// apply access rules
// if less than moderator access then can only take ack and escalation actions
if (isTakedownEvent || isReverseTakedownEvent) {
if (!access.isModerator) {
throw new AuthRequiredError(
'Must be a full moderator to take this type of action',
)
}
// Non admins should not be able to take down feed generators
if (
!access.isAdmin &&
subject.recordPath?.includes('app.bsky.feed.generator/')
) {
throw new AuthRequiredError(
'Must be a full admin to take this type of action on feed generators',
)
}
}
// if less than moderator access then can not apply labels
if (!access.isModerator && isLabelEvent) {
throw new AuthRequiredError('Must be a full moderator to label content')
}
if (isLabelEvent) {
validateLabels([
...(event.createLabelVals ?? []),
...(event.negateLabelVals ?? []),
])
}
if (isTakedownEvent || isReverseTakedownEvent) {
const status = await moderationService.getStatus(subject)
if (status?.takendown && isTakedownEvent) {
throw new InvalidRequestError(`Subject is already taken down`)
}
if (!status?.takendown && isReverseTakedownEvent) {
throw new InvalidRequestError(`Subject is not taken down`)
}
if (status?.takendown && isReverseTakedownEvent && subject.isRecord()) {
// due to the way blob status is modeled, we should reverse takedown on all
// blobs for the record being restored, which aren't taken down on another record.
subject.blobCids = status.blobCids ?? []
}
}
if (isModEventEmail(event) && event.content) {
// sending email prior to logging the event to avoid a long transaction below
if (!subject.isRepo()) {
throw new InvalidRequestError('Email can only be sent to a repo subject')
}
const { content, subjectLine } = event
await retryHttp(() =>
ctx.modService(db).sendEmail({
subject: subjectLine,
content,
recipientDid: subject.did,
}),
)
}
if (isModEventDivert(event) && subject.isRecord()) {
if (!ctx.blobDiverter) {
throw new InvalidRequestError(
'BlobDiverter not configured for this service',
)
}
await ctx.blobDiverter.uploadBlobOnService(subject.info())
}
const moderationEvent = await db.transaction(async (dbTxn) => {
const moderationTxn = ctx.modService(dbTxn)
const result = await moderationTxn.logEvent({
event,
subject,
createdBy,
})
const moderationLangService = new ModerationLangService(moderationTxn)
await moderationLangService.tagSubjectWithLang({
subject,
createdBy: ctx.cfg.service.did,
subjectStatus: result.subjectStatus,
})
if (subject.isRepo()) {
if (isTakedownEvent) {
const isSuspend = !!result.event.durationInHours
await moderationTxn.takedownRepo(subject, result.event.id, isSuspend)
} else if (isReverseTakedownEvent) {
await moderationTxn.reverseTakedownRepo(subject)
}
}
if (subject.isRecord()) {
if (isTakedownEvent) {
await moderationTxn.takedownRecord(subject, result.event.id)
} else if (isReverseTakedownEvent) {
await moderationTxn.reverseTakedownRecord(subject)
}
}
if (isLabelEvent) {
await moderationTxn.formatAndCreateLabels(
result.event.subjectUri ?? result.event.subjectDid,
result.event.subjectCid,
{
create: result.event.createLabelVals?.length
? result.event.createLabelVals.split(' ')
: undefined,
negate: result.event.negateLabelVals?.length
? result.event.negateLabelVals.split(' ')
: undefined,
},
)
}
return result.event
})
return moderationService.views.formatEvent(moderationEvent)
}
export default function (server: Server, ctx: AppContext) {
server.com.atproto.admin.emitModerationEvent({
auth: ctx.authVerifier.modOrAdminToken,
handler: async ({ input, auth }) => {
const access = auth.credentials
const createdBy =
auth.credentials.type === 'moderator'
? auth.credentials.iss
: input.body.createdBy
const db = ctx.db
const moderationService = ctx.modService(db)
const { event } = input.body
const isTakedownEvent = isModEventTakedown(event)
const isReverseTakedownEvent = isModEventReverseTakedown(event)
const isLabelEvent = isModEventLabel(event)
const subject = subjectFromInput(
input.body.subject,
input.body.subjectBlobCids,
)
// apply access rules
// if less than moderator access then can only take ack and escalation actions
if (isTakedownEvent || isReverseTakedownEvent) {
if (!access.isModerator) {
throw new AuthRequiredError(
'Must be a full moderator to take this type of action',
)
}
// Non admins should not be able to take down feed generators
if (
!access.isAdmin &&
subject.recordPath?.includes('app.bsky.feed.generator/')
) {
throw new AuthRequiredError(
'Must be a full admin to take this type of action on feed generators',
)
}
}
// if less than moderator access then can not apply labels
if (!access.isModerator && isLabelEvent) {
throw new AuthRequiredError('Must be a full moderator to label content')
}
if (isLabelEvent) {
validateLabels([
...(event.createLabelVals ?? []),
...(event.negateLabelVals ?? []),
])
}
if (isTakedownEvent || isReverseTakedownEvent) {
const status = await moderationService.getStatus(subject)
if (status?.takendown && isTakedownEvent) {
throw new InvalidRequestError(`Subject is already taken down`)
}
if (!status?.takendown && isReverseTakedownEvent) {
throw new InvalidRequestError(`Subject is not taken down`)
}
if (status?.takendown && isReverseTakedownEvent && subject.isRecord()) {
// due to the way blob status is modeled, we should reverse takedown on all
// blobs for the record being restored, which aren't taken down on another record.
subject.blobCids = status.blobCids ?? []
}
}
if (isModEventEmail(event) && event.content) {
// sending email prior to logging the event to avoid a long transaction below
if (!subject.isRepo()) {
throw new InvalidRequestError(
'Email can only be sent to a repo subject',
)
}
const { content, subjectLine } = event
await retryHttp(() =>
ctx.modService(db).sendEmail({
subject: subjectLine,
content,
recipientDid: subject.did,
}),
)
}
const moderationEvent = await db.transaction(async (dbTxn) => {
const moderationTxn = ctx.modService(dbTxn)
const result = await moderationTxn.logEvent({
event,
subject,
createdBy,
})
const moderationLangService = new ModerationLangService(moderationTxn)
await moderationLangService.tagSubjectWithLang({
subject,
createdBy: ctx.cfg.service.did,
subjectStatus: result.subjectStatus,
})
if (subject.isRepo()) {
if (isTakedownEvent) {
const isSuspend = !!result.event.durationInHours
await moderationTxn.takedownRepo(
subject,
result.event.id,
isSuspend,
)
} else if (isReverseTakedownEvent) {
await moderationTxn.reverseTakedownRepo(subject)
}
}
if (subject.isRecord()) {
if (isTakedownEvent) {
await moderationTxn.takedownRecord(subject, result.event.id)
} else if (isReverseTakedownEvent) {
await moderationTxn.reverseTakedownRecord(subject)
}
}
if (isLabelEvent) {
await moderationTxn.formatAndCreateLabels(
result.event.subjectUri ?? result.event.subjectDid,
result.event.subjectCid,
{
create: result.event.createLabelVals?.length
? result.event.createLabelVals.split(' ')
: undefined,
negate: result.event.negateLabelVals?.length
? result.event.negateLabelVals.split(' ')
: undefined,
},
)
}
return result.event
const moderationEvent = await handleModerationEvent({
input,
auth,
ctx,
})
// On divert events, we need to automatically take down the blobs
if (isModEventDivert(input.body.event)) {
await handleModerationEvent({
auth,
ctx,
input: {
...input,
body: {
...input.body,
event: {
...input.body.event,
$type: 'com.atproto.admin.defs#modEventTakedown',
comment:
'[DIVERT_SIDE_EFFECT]: Automatically taking down after divert event',
},
},
},
})
}
return {
encoding: 'application/json',
body: moderationService.views.formatEvent(moderationEvent),
body: moderationEvent,
}
},
})

@ -7,7 +7,7 @@ type ReqCtx = {
req: express.Request
}
type AdminTokenOutput = {
export type AdminTokenOutput = {
credentials: {
type: 'admin_token'
isAdmin: true
@ -16,7 +16,7 @@ type AdminTokenOutput = {
}
}
type ModeratorOutput = {
export type ModeratorOutput = {
credentials: {
type: 'moderator'
aud: string

@ -50,6 +50,13 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
plcUrl: env.didPlcUrl,
}
const blobDivertServiceCfg =
env.blobDivertUrl && env.blobDivertAdminPassword
? {
url: env.blobDivertUrl,
adminPassword: env.blobDivertAdminPassword,
}
: null
const accessCfg: OzoneConfig['access'] = {
admins: env.adminDids,
moderators: env.moderatorDids,
@ -63,6 +70,7 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
pds: pdsCfg,
cdn: cdnCfg,
identity: identityCfg,
blobDivert: blobDivertServiceCfg,
access: accessCfg,
}
}
@ -74,6 +82,7 @@ export type OzoneConfig = {
pds: PdsConfig | null
cdn: CdnConfig
identity: IdentityConfig
blobDivert: BlobDivertConfig | null
access: AccessConfig
}
@ -85,6 +94,11 @@ export type ServiceConfig = {
devMode?: boolean
}
export type BlobDivertConfig = {
url: string
adminPassword: string
}
export type DatabaseConfig = {
postgresUrl: string
postgresSchema?: string

@ -25,6 +25,8 @@ export const readEnv = (): OzoneEnvironment => {
triageDids: envList('OZONE_TRIAGE_DIDS'),
adminPassword: envStr('OZONE_ADMIN_PASSWORD'),
signingKeyHex: envStr('OZONE_SIGNING_KEY_HEX'),
blobDivertUrl: envStr('OZONE_BLOB_DIVERT_URL'),
blobDivertAdminPassword: envStr('OZONE_BLOB_DIVERT_ADMIN_PASSWORD'),
}
}
@ -52,4 +54,6 @@ export type OzoneEnvironment = {
triageDids: string[]
adminPassword?: string
signingKeyHex?: string
blobDivertUrl?: string
blobDivertAdminPassword?: string
}

@ -14,6 +14,7 @@ import {
CommunicationTemplateService,
CommunicationTemplateServiceCreator,
} from './communication-service/template'
import { BlobDiverter } from './daemon/blob-diverter'
import { AuthVerifier } from './auth-verifier'
import { ImageInvalidator } from './image-invalidator'
import { getSigningKeyId } from './util'
@ -25,6 +26,7 @@ export type AppContextOptions = {
communicationTemplateService: CommunicationTemplateServiceCreator
appviewAgent: AtpAgent
pdsAgent: AtpAgent | undefined
blobDiverter?: BlobDiverter
signingKey: Keypair
signingKeyId: number
idResolver: IdResolver
@ -56,6 +58,10 @@ export class AppContext {
? new AtpAgent({ service: cfg.pds.url })
: undefined
const idResolver = new IdResolver({
plcUrl: cfg.identity.plcUrl,
})
const createAuthHeaders = (aud: string) =>
createServiceAuthHeaders({
iss: `${cfg.service.did}#atproto_labeler`,
@ -64,15 +70,16 @@ export class AppContext {
})
const backgroundQueue = new BackgroundQueue(db)
const blobDiverter = cfg.blobDivert
? new BlobDiverter(db, {
idResolver,
serviceConfig: cfg.blobDivert,
})
: undefined
const eventPusher = new EventPusher(db, createAuthHeaders, {
appview: cfg.appview.pushEvents ? cfg.appview : undefined,
pds: cfg.pds ?? undefined,
})
const idResolver = new IdResolver({
plcUrl: cfg.identity.plcUrl,
})
const modService = ModerationService.creator(
signingKey,
signingKeyId,
@ -111,6 +118,7 @@ export class AppContext {
backgroundQueue,
sequencer,
authVerifier,
blobDiverter,
...(overrides ?? {}),
},
secrets,
@ -137,6 +145,10 @@ export class AppContext {
return this.opts.modService
}
get blobDiverter(): BlobDiverter | undefined {
return this.opts.blobDiverter
}
get communicationTemplateService(): CommunicationTemplateServiceCreator {
return this.opts.communicationTemplateService
}

@ -0,0 +1,150 @@
import {
VerifyCidTransform,
forwardStreamErrors,
getPdsEndpoint,
} from '@atproto/common'
import { IdResolver } from '@atproto/identity'
import axios from 'axios'
import { Readable } from 'stream'
import { CID } from 'multiformats/cid'
import Database from '../db'
import { retryHttp } from '../util'
import { BlobDivertConfig } from '../config'
export class BlobDiverter {
serviceConfig: BlobDivertConfig
idResolver: IdResolver
constructor(
public db: Database,
services: {
idResolver: IdResolver
serviceConfig: BlobDivertConfig
},
) {
this.serviceConfig = services.serviceConfig
this.idResolver = services.idResolver
}
private async getBlob({
pds,
did,
cid,
}: {
pds: string
did: string
cid: string
}) {
const blobResponse = await axios.get(
`${pds}/xrpc/com.atproto.sync.getBlob`,
{
params: { did, cid },
decompress: true,
responseType: 'stream',
timeout: 5000, // 5sec of inactivity on the connection
},
)
const imageStream: Readable = blobResponse.data
const verifyCid = new VerifyCidTransform(CID.parse(cid))
forwardStreamErrors(imageStream, verifyCid)
return {
contentType:
blobResponse.headers['content-type'] || 'application/octet-stream',
imageStream: imageStream.pipe(verifyCid),
}
}
async sendImage({
url,
imageStream,
contentType,
}: {
url: string
imageStream: Readable
contentType: string
}) {
const result = await axios(url, {
method: 'POST',
data: imageStream,
headers: {
Authorization: basicAuth('admin', this.serviceConfig.adminPassword),
'Content-Type': contentType,
},
})
return result.status === 200
}
private async uploadBlob(
{
imageStream,
contentType,
}: { imageStream: Readable; contentType: string },
{
subjectDid,
subjectUri,
}: { subjectDid: string; subjectUri: string | null },
) {
const url = new URL(this.serviceConfig.url)
url.searchParams.set('did', subjectDid)
if (subjectUri) url.searchParams.set('uri', subjectUri)
const result = await this.sendImage({
url: url.toString(),
imageStream,
contentType,
})
return result
}
async uploadBlobOnService({
subjectDid,
subjectUri,
subjectBlobCids,
}: {
subjectDid: string
subjectUri: string
subjectBlobCids: string[]
}): Promise<boolean> {
const didDoc = await this.idResolver.did.resolve(subjectDid)
if (!didDoc) {
throw new Error('Error resolving DID')
}
const pds = getPdsEndpoint(didDoc)
if (!pds) {
throw new Error('Error resolving PDS')
}
// attempt to download and upload within the same retry block since the imageStream is not reusable
const uploadResult = await Promise.all(
subjectBlobCids.map((cid) =>
retryHttp(async () => {
const { imageStream, contentType } = await this.getBlob({
pds,
cid,
did: subjectDid,
})
return this.uploadBlob(
{ imageStream, contentType },
{ subjectDid, subjectUri },
)
}),
),
)
if (uploadResult.includes(false)) {
throw new Error(`Error uploading blob ${subjectUri}`)
}
return true
}
}
const basicAuth = (username: string, password: string) => {
return 'Basic ' + Buffer.from(`${username}:${password}`).toString('base64')
}

@ -1,5 +1,6 @@
import { Keypair, Secp256k1Keypair } from '@atproto/crypto'
import { createServiceAuthHeaders } from '@atproto/xrpc-server'
import { IdResolver } from '@atproto/identity'
import AtpAgent from '@atproto/api'
import { OzoneConfig, OzoneSecrets } from '../config'
import { Database } from '../db'
@ -7,7 +8,6 @@ import { EventPusher } from './event-pusher'
import { EventReverser } from './event-reverser'
import { ModerationService, ModerationServiceCreator } from '../mod-service'
import { BackgroundQueue } from '../background'
import { IdResolver } from '@atproto/identity'
import { getSigningKeyId } from '../util'
export type DaemonContextOptions = {
@ -34,6 +34,10 @@ export class DaemonContext {
const signingKey = await Secp256k1Keypair.import(secrets.signingKeyHex)
const signingKeyId = await getSigningKeyId(db, signingKey.did())
const idResolver = new IdResolver({
plcUrl: cfg.identity.plcUrl,
})
const appviewAgent = new AtpAgent({ service: cfg.appview.url })
const createAuthHeaders = (aud: string) =>
createServiceAuthHeaders({
@ -48,9 +52,6 @@ export class DaemonContext {
})
const backgroundQueue = new BackgroundQueue(db)
const idResolver = new IdResolver({
plcUrl: cfg.identity.plcUrl,
})
const modService = ModerationService.creator(
signingKey,

@ -6,6 +6,8 @@ import { RepoPushEventType } from '../db/schema/repo_push_event'
import { retryHttp } from '../util'
import { dbLogger } from '../logger'
import { InputSchema } from '../lexicon/types/com/atproto/admin/updateSubjectStatus'
import { BlobPushEvent } from '../db/schema/blob_push_event'
import { Insertable, Selectable } from 'kysely'
type EventSubject = InputSchema['subject']
@ -285,20 +287,53 @@ export class EventPusher {
subject,
evt.takedownRef,
)
await dbTxn.db
.updateTable('blob_push_event')
.set(
succeeded
? { confirmedAt: new Date() }
: {
lastAttempted: new Date(),
attempts: (evt.attempts ?? 0) + 1,
},
)
.where('subjectDid', '=', evt.subjectDid)
.where('subjectBlobCid', '=', evt.subjectBlobCid)
.where('eventType', '=', evt.eventType)
.execute()
await this.markBlobEventAttempt(dbTxn, evt, succeeded)
})
}
async markBlobEventAttempt(
dbTxn: Database,
event: Selectable<BlobPushEvent>,
succeeded: boolean,
) {
await dbTxn.db
.updateTable('blob_push_event')
.set(
succeeded
? { confirmedAt: new Date() }
: {
lastAttempted: new Date(),
attempts: (event.attempts ?? 0) + 1,
},
)
.where('subjectDid', '=', event.subjectDid)
.where('subjectBlobCid', '=', event.subjectBlobCid)
.where('eventType', '=', event.eventType)
.execute()
}
async logBlobPushEvent(
blobValues: Insertable<BlobPushEvent>[],
takedownRef?: string | null,
) {
return this.db.db
.insertInto('blob_push_event')
.values(blobValues)
.onConflict((oc) =>
oc.columns(['subjectDid', 'subjectBlobCid', 'eventType']).doUpdateSet({
takedownRef,
confirmedAt: null,
attempts: 0,
lastAttempted: null,
}),
)
.returning([
'id',
'subjectDid',
'subjectUri',
'subjectBlobCid',
'eventType',
])
.execute()
}
}

@ -3,6 +3,7 @@ import DaemonContext from './context'
import { AppContextOptions } from '../context'
export { EventPusher } from './event-pusher'
export { BlobDiverter } from './blob-diverter'
export { EventReverser } from './event-reverser'
export class OzoneDaemon {

@ -92,6 +92,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -150,6 +151,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -940,6 +942,16 @@ export const schemaDict = {
},
},
},
modEventDivert: {
type: 'object',
description:
"Divert a record's blobs to a 3rd party service for further scanning/tagging",
properties: {
comment: {
type: 'string',
},
},
},
communicationTemplateView: {
type: 'object',
required: [

@ -41,6 +41,7 @@ export interface ModEventView {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoRef
@ -79,6 +80,7 @@ export interface ModEventViewDetail {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoView
@ -749,6 +751,24 @@ export function validateModEventTag(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventTag', v)
}
/** Divert a record's blobs to a 3rd party service for further scanning/tagging */
export interface ModEventDivert {
comment?: string
[k: string]: unknown
}
export function isModEventDivert(v: unknown): v is ModEventDivert {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.admin.defs#modEventDivert'
)
}
export function validateModEventDivert(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventDivert', v)
}
export interface CommunicationTemplateView {
id: string
/** Name of the template. */

@ -566,27 +566,17 @@ export class ModerationService {
for (const cid of blobCids) {
blobValues.push({
eventType,
subjectDid: subject.did,
subjectBlobCid: cid.toString(),
takedownRef,
subjectDid: subject.did,
subjectUri: subject.uri || null,
subjectBlobCid: cid.toString(),
})
}
}
const blobEvts = await this.db.db
.insertInto('blob_push_event')
.values(blobValues)
.onConflict((oc) =>
oc
.columns(['subjectDid', 'subjectBlobCid', 'eventType'])
.doUpdateSet({
takedownRef,
confirmedAt: null,
attempts: 0,
lastAttempted: null,
}),
)
.returning(['id', 'subjectDid', 'subjectBlobCid', 'eventType'])
.execute()
const blobEvts = await this.eventPusher.logBlobPushEvent(
blobValues,
takedownRef,
)
this.db.onCommit(() => {
this.backgroundQueue.add(async () => {

@ -0,0 +1,22 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP
exports[`blob divert sends blobs to configured divert service and marks divert date 1`] = `
Object {
"createdAt": "1970-01-01T00:00:00.000Z",
"createdBy": "user(0)",
"event": Object {
"$type": "com.atproto.admin.defs#modEventDivert",
"comment": "Diverting for test",
},
"id": 1,
"subject": Object {
"$type": "com.atproto.repo.strongRef",
"cid": "cids(0)",
"uri": "record(0)",
},
"subjectBlobCids": Array [
"cids(1)",
"cids(2)",
],
}
`;

@ -0,0 +1,90 @@
import {
ModeratorClient,
SeedClient,
TestNetwork,
basicSeed,
} from '@atproto/dev-env'
import AtpAgent from '@atproto/api'
import { BlobDiverter } from '../src/daemon'
import { forSnapshot } from './_util'
describe('blob divert', () => {
let network: TestNetwork
let agent: AtpAgent
let sc: SeedClient
let modClient: ModeratorClient
beforeAll(async () => {
network = await TestNetwork.create({
dbPostgresSchema: 'ozone_blob_divert_test',
ozone: {
blobDivertUrl: `https://blob-report.com`,
blobDivertAdminPassword: 'test-auth-token',
},
})
agent = network.pds.getClient()
sc = network.getSeedClient()
modClient = network.ozone.getModClient()
await basicSeed(sc)
await network.processAll()
})
afterAll(async () => {
await network.close()
})
const mockReportServiceResponse = (result: boolean) => {
return jest
.spyOn(BlobDiverter.prototype, 'sendImage')
.mockImplementation(async () => {
return result
})
}
const getSubject = () => ({
$type: 'com.atproto.repo.strongRef',
uri: sc.posts[sc.dids.carol][0].ref.uriStr,
cid: sc.posts[sc.dids.carol][0].ref.cidStr,
})
const emitDivertEvent = async () =>
modClient.emitModerationEvent(
{
subject: getSubject(),
event: {
$type: 'com.atproto.admin.defs#modEventDivert',
comment: 'Diverting for test',
},
createdBy: sc.dids.alice,
subjectBlobCids: sc.posts[sc.dids.carol][0].images.map((img) =>
img.image.ref.toString(),
),
},
'moderator',
)
it('fails and keeps attempt count when report service fails to accept upload.', async () => {
// Simulate failure to fail upload
const reportServiceRequest = mockReportServiceResponse(false)
await expect(emitDivertEvent()).rejects.toThrow()
expect(reportServiceRequest).toHaveBeenCalled()
})
it('sends blobs to configured divert service and marks divert date', async () => {
// Simulate failure to accept upload
const reportServiceRequest = mockReportServiceResponse(true)
const divertEvent = await emitDivertEvent()
expect(reportServiceRequest).toHaveBeenCalled()
expect(forSnapshot(divertEvent)).toMatchSnapshot()
const { subjectStatuses } = await modClient.queryModerationStatuses({
subject: getSubject().uri,
})
expect(subjectStatuses[0].takendown).toBe(true)
})
})

@ -92,6 +92,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -150,6 +151,7 @@ export const schemaDict = {
'lex:com.atproto.admin.defs#modEventMute',
'lex:com.atproto.admin.defs#modEventEmail',
'lex:com.atproto.admin.defs#modEventResolveAppeal',
'lex:com.atproto.admin.defs#modEventDivert',
],
},
subject: {
@ -940,6 +942,16 @@ export const schemaDict = {
},
},
},
modEventDivert: {
type: 'object',
description:
"Divert a record's blobs to a 3rd party service for further scanning/tagging",
properties: {
comment: {
type: 'string',
},
},
},
communicationTemplateView: {
type: 'object',
required: [

@ -41,6 +41,7 @@ export interface ModEventView {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoRef
@ -79,6 +80,7 @@ export interface ModEventViewDetail {
| ModEventMute
| ModEventEmail
| ModEventResolveAppeal
| ModEventDivert
| { $type: string; [k: string]: unknown }
subject:
| RepoView
@ -749,6 +751,24 @@ export function validateModEventTag(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventTag', v)
}
/** Divert a record's blobs to a 3rd party service for further scanning/tagging */
export interface ModEventDivert {
comment?: string
[k: string]: unknown
}
export function isModEventDivert(v: unknown): v is ModEventDivert {
return (
isObj(v) &&
hasProp(v, '$type') &&
v.$type === 'com.atproto.admin.defs#modEventDivert'
)
}
export function validateModEventDivert(v: unknown): ValidationResult {
return lexicons.validate('com.atproto.admin.defs#modEventDivert', v)
}
export interface CommunicationTemplateView {
id: string
/** Name of the template. */

8
pnpm-lock.yaml generated

@ -1,9 +1,5 @@
lockfileVersion: '6.0'
settings:
autoInstallPeers: true
excludeLinksFromLockfile: false
importers:
.:
@ -12055,3 +12051,7 @@ packages:
/zod@3.21.4:
resolution: {integrity: sha512-m46AKbrzKVzOzs/DZgVnG5H55N1sv1M8qZU3A8RIKbs3mrACDNeIOeilDymVb2HdmP8uwshOCF4uJ8uM9rCqJw==}
settings:
autoInstallPeers: true
excludeLinksFromLockfile: false