Ozone: configure to fetch but not pull from appview (#2284)
ozone: configure to fetch but not pull from appview
This commit is contained in:
parent
71f9cc9d4f
commit
f192b97fdd
packages
dev-env/src
ozone/src
@ -75,6 +75,7 @@ export class TestNetwork extends TestNetworkNoAppView {
|
||||
dbPostgresUrl,
|
||||
appviewUrl: bsky.url,
|
||||
appviewDid: bsky.ctx.cfg.serverDid,
|
||||
appviewPushEvents: true,
|
||||
pdsUrl: pds.url,
|
||||
pdsDid: pds.ctx.cfg.service.did,
|
||||
...params.ozone,
|
||||
|
@ -25,18 +25,20 @@ export const envToCfg = (env: OzoneEnvironment): OzoneConfig => {
|
||||
poolIdleTimeoutMs: env.dbPoolIdleTimeoutMs,
|
||||
}
|
||||
|
||||
assert(env.appviewUrl)
|
||||
assert(env.appviewDid)
|
||||
assert(env.appviewUrl && env.appviewDid)
|
||||
const appviewCfg: OzoneConfig['appview'] = {
|
||||
url: env.appviewUrl,
|
||||
did: env.appviewDid,
|
||||
pushEvents: !!env.appviewPushEvents,
|
||||
}
|
||||
|
||||
assert(env.pdsUrl)
|
||||
assert(env.pdsDid)
|
||||
const pdsCfg: OzoneConfig['pds'] = {
|
||||
url: env.pdsUrl,
|
||||
did: env.pdsDid,
|
||||
let pdsCfg: OzoneConfig['pds'] = null
|
||||
if (env.pdsUrl || env.pdsDid) {
|
||||
assert(env.pdsUrl && env.pdsDid)
|
||||
pdsCfg = {
|
||||
url: env.pdsUrl,
|
||||
did: env.pdsDid,
|
||||
}
|
||||
}
|
||||
|
||||
const cdnCfg: OzoneConfig['cdn'] = {
|
||||
@ -94,6 +96,7 @@ export type DatabaseConfig = {
|
||||
export type AppviewConfig = {
|
||||
url: string
|
||||
did: string
|
||||
pushEvents: boolean
|
||||
}
|
||||
|
||||
export type PdsConfig = {
|
||||
|
@ -10,6 +10,7 @@ export const readEnv = (): OzoneEnvironment => {
|
||||
serverDid: envStr('OZONE_SERVER_DID'),
|
||||
appviewUrl: envStr('OZONE_APPVIEW_URL'),
|
||||
appviewDid: envStr('OZONE_APPVIEW_DID'),
|
||||
appviewPushEvents: envBool('OZONE_APPVIEW_PUSH_EVENTS'),
|
||||
pdsUrl: envStr('OZONE_PDS_URL'),
|
||||
pdsDid: envStr('OZONE_PDS_DID'),
|
||||
dbPostgresUrl: envStr('OZONE_DB_POSTGRES_URL'),
|
||||
@ -36,6 +37,7 @@ export type OzoneEnvironment = {
|
||||
serverDid?: string
|
||||
appviewUrl?: string
|
||||
appviewDid?: string
|
||||
appviewPushEvents?: boolean
|
||||
pdsUrl?: string
|
||||
pdsDid?: string
|
||||
dbPostgresUrl?: string
|
||||
|
@ -62,7 +62,7 @@ export class AppContext {
|
||||
|
||||
const backgroundQueue = new BackgroundQueue(db)
|
||||
const eventPusher = new EventPusher(db, createAuthHeaders, {
|
||||
appview: cfg.appview,
|
||||
appview: cfg.appview.pushEvents ? cfg.appview : undefined,
|
||||
pds: cfg.pds ?? undefined,
|
||||
})
|
||||
|
||||
|
@ -41,7 +41,7 @@ export class DaemonContext {
|
||||
})
|
||||
|
||||
const eventPusher = new EventPusher(db, createAuthHeaders, {
|
||||
appview: cfg.appview,
|
||||
appview: cfg.appview.pushEvents ? cfg.appview : undefined,
|
||||
pds: cfg.pds ?? undefined,
|
||||
})
|
||||
|
||||
|
@ -1,10 +1,11 @@
|
||||
import assert from 'node:assert'
|
||||
import AtpAgent from '@atproto/api'
|
||||
import { SECOND } from '@atproto/common'
|
||||
import Database from '../db'
|
||||
import { RepoPushEventType } from '../db/schema/repo_push_event'
|
||||
import { retryHttp } from '../util'
|
||||
import { dbLogger } from '../logger'
|
||||
import { InputSchema } from '../lexicon/types/com/atproto/admin/updateSubjectStatus'
|
||||
import assert from 'assert'
|
||||
|
||||
type EventSubject = InputSchema['subject']
|
||||
|
||||
@ -74,6 +75,13 @@ export class EventPusher {
|
||||
this.poll(this.blobPollState, () => this.pushBlobEvents())
|
||||
}
|
||||
|
||||
get takedowns(): RepoPushEventType[] {
|
||||
const takedowns: RepoPushEventType[] = []
|
||||
if (this.pds) takedowns.push('pds_takedown')
|
||||
if (this.appview) takedowns.push('appview_takedown')
|
||||
return takedowns
|
||||
}
|
||||
|
||||
poll(state: PollState, fn: () => Promise<void>) {
|
||||
if (this.destroyed) return
|
||||
state.promise = fn()
|
||||
|
@ -455,7 +455,8 @@ export class ModerationService {
|
||||
const takedownRef = `BSKY-${
|
||||
isSuspend ? 'SUSPEND' : 'TAKEDOWN'
|
||||
}-${takedownId}`
|
||||
const values = TAKEDOWNS.map((eventType) => ({
|
||||
|
||||
const values = this.eventPusher.takedowns.map((eventType) => ({
|
||||
eventType,
|
||||
subjectDid: subject.did,
|
||||
takedownRef,
|
||||
@ -516,7 +517,7 @@ export class ModerationService {
|
||||
async takedownRecord(subject: RecordSubject, takedownId: number) {
|
||||
this.db.assertTransaction()
|
||||
const takedownRef = `BSKY-TAKEDOWN-${takedownId}`
|
||||
const values = TAKEDOWNS.map((eventType) => ({
|
||||
const values = this.eventPusher.takedowns.map((eventType) => ({
|
||||
eventType,
|
||||
subjectDid: subject.did,
|
||||
subjectUri: subject.uri,
|
||||
@ -555,7 +556,7 @@ export class ModerationService {
|
||||
|
||||
if (blobCids && blobCids.length > 0) {
|
||||
const blobValues: Insertable<BlobPushEvent>[] = []
|
||||
for (const eventType of TAKEDOWNS) {
|
||||
for (const eventType of this.eventPusher.takedowns) {
|
||||
for (const cid of blobCids) {
|
||||
blobValues.push({
|
||||
eventType,
|
||||
|
Loading…
x
Reference in New Issue
Block a user