Proxy timeline skeleton construction ()

proxy timeline skeleton construction to appview
This commit is contained in:
Daniel Holmgren 2023-07-03 11:28:39 -05:00 committed by GitHub
parent 7637fdbf10
commit e7a0d27f1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1773 additions and 27 deletions
lexicons/app/bsky/unspecced
packages
api/src/client
bsky/src
dev-env/src
pds

@ -0,0 +1,34 @@
{
"lexicon": 1,
"id": "app.bsky.unspecced.getTimelineSkeleton",
"defs": {
"main": {
"type": "query",
"description": "A skeleton of a timeline - UNSPECCED & WILL GO AWAY SOON",
"parameters": {
"type": "params",
"properties": {
"limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 50},
"cursor": {"type": "string"}
}
},
"output": {
"encoding": "application/json",
"schema": {
"type": "object",
"required": ["feed"],
"properties": {
"cursor": {"type": "string"},
"feed": {
"type": "array",
"items": {"type": "ref", "ref": "app.bsky.feed.defs#skeletonFeedPost"}
}
}
}
},
"errors": [
{"name": "UnknownFeed"}
]
}
}
}

@ -123,6 +123,7 @@ import * as AppBskyNotificationUpdateSeen from './types/app/bsky/notification/up
import * as AppBskyRichtextFacet from './types/app/bsky/richtext/facet'
import * as AppBskyUnspeccedGetPopular from './types/app/bsky/unspecced/getPopular'
import * as AppBskyUnspeccedGetPopularFeedGenerators from './types/app/bsky/unspecced/getPopularFeedGenerators'
import * as AppBskyUnspeccedGetTimelineSkeleton from './types/app/bsky/unspecced/getTimelineSkeleton'
export * as ComAtprotoAdminDefs from './types/com/atproto/admin/defs'
export * as ComAtprotoAdminDisableAccountInvites from './types/com/atproto/admin/disableAccountInvites'
@ -240,6 +241,7 @@ export * as AppBskyNotificationUpdateSeen from './types/app/bsky/notification/up
export * as AppBskyRichtextFacet from './types/app/bsky/richtext/facet'
export * as AppBskyUnspeccedGetPopular from './types/app/bsky/unspecced/getPopular'
export * as AppBskyUnspeccedGetPopularFeedGenerators from './types/app/bsky/unspecced/getPopularFeedGenerators'
export * as AppBskyUnspeccedGetTimelineSkeleton from './types/app/bsky/unspecced/getTimelineSkeleton'
export const COM_ATPROTO_ADMIN = {
DefsTakedown: 'com.atproto.admin.defs#takedown',
@ -2047,4 +2049,15 @@ export class UnspeccedNS {
throw AppBskyUnspeccedGetPopularFeedGenerators.toKnownErr(e)
})
}
getTimelineSkeleton(
params?: AppBskyUnspeccedGetTimelineSkeleton.QueryParams,
opts?: AppBskyUnspeccedGetTimelineSkeleton.CallOptions,
): Promise<AppBskyUnspeccedGetTimelineSkeleton.Response> {
return this._service.xrpc
.call('app.bsky.unspecced.getTimelineSkeleton', params, undefined, opts)
.catch((e) => {
throw AppBskyUnspeccedGetTimelineSkeleton.toKnownErr(e)
})
}
}

@ -6304,6 +6304,54 @@ export const schemaDict = {
},
},
},
AppBskyUnspeccedGetTimelineSkeleton: {
lexicon: 1,
id: 'app.bsky.unspecced.getTimelineSkeleton',
defs: {
main: {
type: 'query',
description: 'A skeleton of a timeline',
parameters: {
type: 'params',
properties: {
limit: {
type: 'integer',
minimum: 1,
maximum: 100,
default: 50,
},
cursor: {
type: 'string',
},
},
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['feed'],
properties: {
cursor: {
type: 'string',
},
feed: {
type: 'array',
items: {
type: 'ref',
ref: 'lex:app.bsky.feed.defs#skeletonFeedPost',
},
},
},
},
},
errors: [
{
name: 'UnknownFeed',
},
],
},
},
},
}
export const schemas: LexiconDoc[] = Object.values(schemaDict) as LexiconDoc[]
export const lexicons: Lexicons = new Lexicons(schemas)
@ -6432,4 +6480,5 @@ export const ids = {
AppBskyUnspeccedGetPopular: 'app.bsky.unspecced.getPopular',
AppBskyUnspeccedGetPopularFeedGenerators:
'app.bsky.unspecced.getPopularFeedGenerators',
AppBskyUnspeccedGetTimelineSkeleton: 'app.bsky.unspecced.getTimelineSkeleton',
}

@ -0,0 +1,45 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
import { ValidationResult, BlobRef } from '@atproto/lexicon'
import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'
import { CID } from 'multiformats/cid'
import * as AppBskyFeedDefs from '../feed/defs'
export interface QueryParams {
limit?: number
cursor?: string
}
export type InputSchema = undefined
export interface OutputSchema {
cursor?: string
feed: AppBskyFeedDefs.SkeletonFeedPost[]
[k: string]: unknown
}
export interface CallOptions {
headers?: Headers
}
export interface Response {
success: boolean
headers: Headers
data: OutputSchema
}
export class UnknownFeedError extends XRPCError {
constructor(src: XRPCError) {
super(src.status, src.error, src.message)
}
}
export function toKnownErr(e: any) {
if (e instanceof XRPCError) {
if (e.error === 'UnknownFeed') return new UnknownFeedError(e)
}
return e
}

@ -1,6 +1,6 @@
import { Server } from '../../../lexicon'
import AppContext from '../../../context'
import { countAll } from '../../../db/util'
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { countAll } from '../../../../db/util'
// THIS IS A TEMPORARY UNSPECCED ROUTE
export default function (server: Server, ctx: AppContext) {

@ -0,0 +1,80 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import { FeedKeyset, getFeedDateThreshold } from '../util/feed'
import { paginate } from '../../../../db/pagination'
// THIS IS A TEMPORARY UNSPECCED ROUTE
export default function (server: Server, ctx: AppContext) {
server.app.bsky.unspecced.getTimelineSkeleton({
auth: ctx.authVerifier,
handler: async ({ auth, params }) => {
const { limit, cursor } = params
const viewer = auth.credentials.did
const db = ctx.db.db
const { ref } = db.dynamic
const feedService = ctx.services.feed(ctx.db)
const graphService = ctx.services.graph(ctx.db)
const followingIdsSubquery = db
.selectFrom('follow')
.select('follow.subjectDid')
.where('follow.creator', '=', viewer)
const keyset = new FeedKeyset(
ref('feed_item.sortAt'),
ref('feed_item.cid'),
)
const sortFrom = keyset.unpack(cursor)?.primary
let feedItemsQb = feedService
.selectFeedItemQb()
.where((qb) =>
qb
.where('originatorDid', '=', viewer)
.orWhere('originatorDid', 'in', followingIdsSubquery),
)
.where((qb) =>
// Hide posts and reposts of or by muted actors
graphService.whereNotMuted(qb, viewer, [
ref('post.creator'),
ref('originatorDid'),
]),
)
.whereNotExists(
graphService.blockQb(viewer, [
ref('post.creator'),
ref('originatorDid'),
]),
)
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom))
feedItemsQb = paginate(feedItemsQb, {
limit,
cursor,
keyset,
tryIndex: true,
})
const feedItems = await feedItemsQb.execute()
const feed = feedItems.map((item) => ({
post: item.postUri,
reason:
item.uri === item.postUri
? undefined
: {
$type: 'app.bsky.feed.defs#skeletonReasonRepost',
repost: item.uri,
},
}))
return {
encoding: 'application/json',
body: {
cursor: keyset.packFromResult(feedItems),
feed,
},
}
},
})
}

@ -30,7 +30,8 @@ import getSuggestions from './app/bsky/actor/getSuggestions'
import getUnreadCount from './app/bsky/notification/getUnreadCount'
import listNotifications from './app/bsky/notification/listNotifications'
import updateSeen from './app/bsky/notification/updateSeen'
import unspecced from './app/bsky/unspecced'
import getPopularFeedGenerators from './app/bsky/unspecced/getPopularFeedGenerators'
import getTimelineSkeleton from './app/bsky/unspecced/getTimelineSkeleton'
import createReport from './com/atproto/moderation/createReport'
import resolveModerationReports from './com/atproto/admin/resolveModerationReports'
import reverseModerationAction from './com/atproto/admin/reverseModerationAction'
@ -81,7 +82,8 @@ export default function (server: Server, ctx: AppContext) {
getUnreadCount(server, ctx)
listNotifications(server, ctx)
updateSeen(server, ctx)
unspecced(server, ctx)
getPopularFeedGenerators(server, ctx)
getTimelineSkeleton(server, ctx)
// com.atproto
createReport(server, ctx)
resolveModerationReports(server, ctx)

@ -103,6 +103,7 @@ import * as AppBskyNotificationListNotifications from './types/app/bsky/notifica
import * as AppBskyNotificationUpdateSeen from './types/app/bsky/notification/updateSeen'
import * as AppBskyUnspeccedGetPopular from './types/app/bsky/unspecced/getPopular'
import * as AppBskyUnspeccedGetPopularFeedGenerators from './types/app/bsky/unspecced/getPopularFeedGenerators'
import * as AppBskyUnspeccedGetTimelineSkeleton from './types/app/bsky/unspecced/getTimelineSkeleton'
export const COM_ATPROTO_ADMIN = {
DefsTakedown: 'com.atproto.admin.defs#takedown',
@ -1048,6 +1049,16 @@ export class UnspeccedNS {
const nsid = 'app.bsky.unspecced.getPopularFeedGenerators' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
getTimelineSkeleton<AV extends AuthVerifier>(
cfg: ConfigOf<
AV,
AppBskyUnspeccedGetTimelineSkeleton.Handler<ExtractAuth<AV>>
>,
) {
const nsid = 'app.bsky.unspecced.getTimelineSkeleton' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
}
type ConfigOf<Auth, Handler> =

@ -6304,6 +6304,54 @@ export const schemaDict = {
},
},
},
AppBskyUnspeccedGetTimelineSkeleton: {
lexicon: 1,
id: 'app.bsky.unspecced.getTimelineSkeleton',
defs: {
main: {
type: 'query',
description: 'A skeleton of a timeline',
parameters: {
type: 'params',
properties: {
limit: {
type: 'integer',
minimum: 1,
maximum: 100,
default: 50,
},
cursor: {
type: 'string',
},
},
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['feed'],
properties: {
cursor: {
type: 'string',
},
feed: {
type: 'array',
items: {
type: 'ref',
ref: 'lex:app.bsky.feed.defs#skeletonFeedPost',
},
},
},
},
},
errors: [
{
name: 'UnknownFeed',
},
],
},
},
},
}
export const schemas: LexiconDoc[] = Object.values(schemaDict) as LexiconDoc[]
export const lexicons: Lexicons = new Lexicons(schemas)
@ -6432,4 +6480,5 @@ export const ids = {
AppBskyUnspeccedGetPopular: 'app.bsky.unspecced.getPopular',
AppBskyUnspeccedGetPopularFeedGenerators:
'app.bsky.unspecced.getPopularFeedGenerators',
AppBskyUnspeccedGetTimelineSkeleton: 'app.bsky.unspecced.getTimelineSkeleton',
}

@ -0,0 +1,46 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import { ValidationResult, BlobRef } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { CID } from 'multiformats/cid'
import { HandlerAuth } from '@atproto/xrpc-server'
import * as AppBskyFeedDefs from '../feed/defs'
export interface QueryParams {
limit: number
cursor?: string
}
export type InputSchema = undefined
export interface OutputSchema {
cursor?: string
feed: AppBskyFeedDefs.SkeletonFeedPost[]
[k: string]: unknown
}
export type HandlerInput = undefined
export interface HandlerSuccess {
encoding: 'application/json'
body: OutputSchema
headers?: { [key: string]: string }
}
export interface HandlerError {
status: number
message?: string
error?: 'UnknownFeed'
}
export type HandlerOutput = HandlerError | HandlerSuccess
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput

@ -77,7 +77,7 @@ export class TestPds {
: pds.Database.memory()
await db.migrateToLatestOrThrow()
if (config.bskyAppViewEndpoint) {
if (config.bskyAppViewEndpoint && !cfg.enableAppView) {
// Disable communication to app view within pds
MessageDispatcher.prototype.send = async () => {}
}

@ -9,6 +9,7 @@ export type PlcConfig = {
export type PdsConfig = Partial<pds.ServerConfig> & {
plcUrl: string
migration?: string
enableAppView?: boolean
}
export type BskyConfig = Partial<bsky.ServerConfig> & {
@ -22,7 +23,7 @@ export type BskyConfig = Partial<bsky.ServerConfig> & {
export type TestServerParams = {
dbPostgresUrl: string
dbPostgresSchema: string
pds: Partial<pds.ServerConfig>
pds: Partial<PdsConfig>
plc: Partial<pds.ServerConfig>
bsky: Partial<BskyConfig>
}

@ -139,9 +139,17 @@ async function skeletonFromFeedGen(
throw err
}
return filterMutesAndBlocks(ctx, skeleton, params.limit, requester)
}
export async function filterMutesAndBlocks(
ctx: AppContext,
skeleton: SkeletonOutput,
limit: number,
requester: string,
) {
const { feed: skeletonFeed, ...rest } = skeleton
// Hydrate feed skeleton
const { ref } = ctx.db.db.dynamic
const feedService = ctx.services.appView.feed(ctx.db)
const graphService = ctx.services.appView.graph(ctx.db)
@ -168,7 +176,7 @@ async function skeletonFromFeedGen(
.execute()
: []
const orderedItems = getOrderedFeedItems(skeletonFeed, feedItems, params)
const orderedItems = getOrderedFeedItems(skeletonFeed, feedItems, limit)
return {
...rest,
feedItems: orderedItems,
@ -185,15 +193,15 @@ function getSkeleFeedItemUri(item: SkeletonFeedPost) {
function getOrderedFeedItems(
skeletonItems: SkeletonFeedPost[],
feedItems: FeedRow[],
params: GetFeedParams,
limit: number,
) {
const SKIP = []
const feedItemsByUri = feedItems.reduce((acc, item) => {
return Object.assign(acc, { [item.uri]: item })
}, {} as Record<string, FeedRow>)
// enforce limit param in the case that the feedgen does not
if (skeletonItems.length > params.limit) {
skeletonItems = skeletonItems.slice(0, params.limit)
if (skeletonItems.length > limit) {
skeletonItems = skeletonItems.slice(0, limit)
}
return skeletonItems.flatMap((item) => {
const uri = getSkeleFeedItemUri(item)

@ -4,31 +4,45 @@ import { FeedAlgorithm, FeedKeyset, getFeedDateThreshold } from '../util/feed'
import { paginate } from '../../../../../db/pagination'
import AppContext from '../../../../../context'
import { FeedRow } from '../../../../services/feed'
import { filterMutesAndBlocks } from './getFeed'
export default function (server: Server, ctx: AppContext) {
server.app.bsky.feed.getTimeline({
auth: ctx.accessVerifier,
handler: async ({ req, params, auth }) => {
const requester = auth.credentials.did
if (ctx.canProxy(req)) {
const res = await ctx.appviewAgent.api.app.bsky.feed.getTimeline(
params,
await ctx.serviceAuthHeaders(requester),
)
return {
encoding: 'application/json',
body: res.data,
}
}
const { algorithm, limit, cursor } = params
const db = ctx.db.db
const { ref } = db.dynamic
if (algorithm && algorithm !== FeedAlgorithm.ReverseChronological) {
throw new InvalidRequestError(`Unsupported algorithm: ${algorithm}`)
}
if (ctx.canProxy(req)) {
const res =
await ctx.appviewAgent.api.app.bsky.unspecced.getTimelineSkeleton(
params,
await ctx.serviceAuthHeaders(requester),
)
const filtered = await filterMutesAndBlocks(
ctx,
res.data,
limit,
requester,
)
const hydrated = await ctx.services.appView
.feed(ctx.db)
.hydrateFeed(filtered.feedItems, requester)
return {
encoding: 'application/json',
body: {
cursor: filtered.cursor,
feed: hydrated,
},
}
}
const db = ctx.db.db
const { ref } = db.dynamic
const accountService = ctx.services.account(ctx.db)
const feedService = ctx.services.appView.feed(ctx.db)
const graphService = ctx.services.appView.graph(ctx.db)

@ -103,6 +103,7 @@ import * as AppBskyNotificationListNotifications from './types/app/bsky/notifica
import * as AppBskyNotificationUpdateSeen from './types/app/bsky/notification/updateSeen'
import * as AppBskyUnspeccedGetPopular from './types/app/bsky/unspecced/getPopular'
import * as AppBskyUnspeccedGetPopularFeedGenerators from './types/app/bsky/unspecced/getPopularFeedGenerators'
import * as AppBskyUnspeccedGetTimelineSkeleton from './types/app/bsky/unspecced/getTimelineSkeleton'
export const COM_ATPROTO_ADMIN = {
DefsTakedown: 'com.atproto.admin.defs#takedown',
@ -1048,6 +1049,16 @@ export class UnspeccedNS {
const nsid = 'app.bsky.unspecced.getPopularFeedGenerators' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
getTimelineSkeleton<AV extends AuthVerifier>(
cfg: ConfigOf<
AV,
AppBskyUnspeccedGetTimelineSkeleton.Handler<ExtractAuth<AV>>
>,
) {
const nsid = 'app.bsky.unspecced.getTimelineSkeleton' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}
}
type ConfigOf<Auth, Handler> =

@ -6304,6 +6304,54 @@ export const schemaDict = {
},
},
},
AppBskyUnspeccedGetTimelineSkeleton: {
lexicon: 1,
id: 'app.bsky.unspecced.getTimelineSkeleton',
defs: {
main: {
type: 'query',
description: 'A skeleton of a timeline',
parameters: {
type: 'params',
properties: {
limit: {
type: 'integer',
minimum: 1,
maximum: 100,
default: 50,
},
cursor: {
type: 'string',
},
},
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['feed'],
properties: {
cursor: {
type: 'string',
},
feed: {
type: 'array',
items: {
type: 'ref',
ref: 'lex:app.bsky.feed.defs#skeletonFeedPost',
},
},
},
},
},
errors: [
{
name: 'UnknownFeed',
},
],
},
},
},
}
export const schemas: LexiconDoc[] = Object.values(schemaDict) as LexiconDoc[]
export const lexicons: Lexicons = new Lexicons(schemas)
@ -6432,4 +6480,5 @@ export const ids = {
AppBskyUnspeccedGetPopular: 'app.bsky.unspecced.getPopular',
AppBskyUnspeccedGetPopularFeedGenerators:
'app.bsky.unspecced.getPopularFeedGenerators',
AppBskyUnspeccedGetTimelineSkeleton: 'app.bsky.unspecced.getTimelineSkeleton',
}

@ -0,0 +1,46 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import { ValidationResult, BlobRef } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { CID } from 'multiformats/cid'
import { HandlerAuth } from '@atproto/xrpc-server'
import * as AppBskyFeedDefs from '../feed/defs'
export interface QueryParams {
limit: number
cursor?: string
}
export type InputSchema = undefined
export interface OutputSchema {
cursor?: string
feed: AppBskyFeedDefs.SkeletonFeedPost[]
[k: string]: unknown
}
export type HandlerInput = undefined
export interface HandlerSuccess {
encoding: 'application/json'
body: OutputSchema
headers?: { [key: string]: string }
}
export interface HandlerError {
status: number
message?: string
error?: 'UnknownFeed'
}
export type HandlerOutput = HandlerError | HandlerSuccess
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput

File diff suppressed because it is too large Load Diff

@ -0,0 +1,59 @@
import AtpAgent from '@atproto/api'
import { TestNetwork } from '@atproto/dev-env'
import { SeedClient } from '../seeds/client'
import basicSeed from '../seeds/basic'
import { forSnapshot } from '../_util'
describe('proxies timeline skeleton', () => {
let network: TestNetwork
let agent: AtpAgent
let sc: SeedClient
let alice: string
beforeAll(async () => {
network = await TestNetwork.create({
dbPostgresSchema: 'proxy_timeline_skeleton',
pds: {
enableAppView: true,
},
})
agent = network.pds.getClient()
sc = new SeedClient(agent)
await basicSeed(sc)
await network.processAll()
alice = sc.dids.alice
})
afterAll(async () => {
await network.close()
})
it('timeline skeleton construction', async () => {
const res = await agent.api.app.bsky.feed.getTimeline(
{},
{
headers: { ...sc.getHeaders(alice), 'x-appview-proxy': 'true' },
},
)
expect(forSnapshot(res.data)).toMatchSnapshot()
const pt1 = await agent.api.app.bsky.feed.getTimeline(
{
limit: 2,
},
{
headers: { ...sc.getHeaders(alice), 'x-appview-proxy': 'true' },
},
)
const pt2 = await agent.api.app.bsky.feed.getTimeline(
{
cursor: pt1.data.cursor,
},
{
headers: { ...sc.getHeaders(alice), 'x-appview-proxy': 'true' },
},
)
expect([...pt1.data.feed, ...pt2.data.feed]).toEqual(res.data.feed)
})
})

@ -257,13 +257,15 @@ describe('proxies view requests', () => {
expect(forSnapshot(res.data)).toMatchSnapshot()
})
it('feed.getTimeline', async () => {
// @TODO re-enable when proxying is a full-proxy
it.skip('feed.getTimeline', async () => {
const res = await agent.api.app.bsky.feed.getTimeline(
{},
{
headers: { ...sc.getHeaders(alice), 'x-appview-proxy': 'true' },
},
)
expect(forSnapshot(res.data)).toMatchSnapshot()
const pt1 = await agent.api.app.bsky.feed.getTimeline(
{