parent
5cc57d5ebc
commit
b16c41c2fa
packages/bsky/src
@ -11,12 +11,10 @@ import {
|
||||
getFeedGen,
|
||||
} from '@atproto/identity'
|
||||
import { AtpAgent, AppBskyFeedGetFeedSkeleton } from '@atproto/api'
|
||||
import { SkeletonFeedPost } from '../../../../lexicon/types/app/bsky/feed/defs'
|
||||
import { QueryParams as GetFeedParams } from '../../../../lexicon/types/app/bsky/feed/getFeed'
|
||||
import { OutputSchema as SkeletonOutput } from '../../../../lexicon/types/app/bsky/feed/getFeedSkeleton'
|
||||
import { Server } from '../../../../lexicon'
|
||||
import AppContext from '../../../../context'
|
||||
import { FeedRow } from '../../../../services/feed/types'
|
||||
import { AlgoResponse } from '../../../../feed-gen/types'
|
||||
|
||||
export default function (server: Server, ctx: AppContext) {
|
||||
@ -127,68 +125,12 @@ async function skeletonFromFeedGen(
|
||||
}
|
||||
|
||||
const { feed: skeletonFeed, ...rest } = skeleton
|
||||
const cleanedFeed = await ctx.services
|
||||
.feed(ctx.db)
|
||||
.cleanFeedSkeleton(skeletonFeed, params.limit, viewer)
|
||||
|
||||
// Hydrate feed skeleton
|
||||
const { ref } = ctx.db.db.dynamic
|
||||
const feedService = ctx.services.feed(ctx.db)
|
||||
const graphService = ctx.services.graph(ctx.db)
|
||||
const feedItemUris = skeletonFeed.map(getSkeleFeedItemUri)
|
||||
|
||||
// @TODO apply mutes and blocks
|
||||
const feedItems = feedItemUris.length
|
||||
? await feedService
|
||||
.selectFeedItemQb()
|
||||
.where('feed_item.uri', 'in', feedItemUris)
|
||||
.where((qb) =>
|
||||
// Hide posts and reposts of or by muted actors
|
||||
graphService.whereNotMuted(qb, viewer, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.whereNotExists(
|
||||
graphService.blockQb(viewer, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.execute()
|
||||
: []
|
||||
|
||||
const orderedItems = getOrderedFeedItems(skeletonFeed, feedItems, params)
|
||||
return {
|
||||
...rest,
|
||||
feedItems: orderedItems,
|
||||
feedItems: cleanedFeed,
|
||||
}
|
||||
}
|
||||
|
||||
function getSkeleFeedItemUri(item: SkeletonFeedPost) {
|
||||
if (typeof item.reason?.repost === 'string') {
|
||||
return item.reason.repost
|
||||
}
|
||||
return item.post
|
||||
}
|
||||
|
||||
function getOrderedFeedItems(
|
||||
skeletonItems: SkeletonFeedPost[],
|
||||
feedItems: FeedRow[],
|
||||
params: GetFeedParams,
|
||||
) {
|
||||
const SKIP = []
|
||||
const feedItemsByUri = feedItems.reduce((acc, item) => {
|
||||
return Object.assign(acc, { [item.uri]: item })
|
||||
}, {} as Record<string, FeedRow>)
|
||||
// enforce limit param in the case that the feedgen does not
|
||||
if (skeletonItems.length > params.limit) {
|
||||
skeletonItems = skeletonItems.slice(0, params.limit)
|
||||
}
|
||||
return skeletonItems.flatMap((item) => {
|
||||
const uri = getSkeleFeedItemUri(item)
|
||||
const feedItem = feedItemsByUri[uri]
|
||||
if (!feedItem || item.post !== feedItem.postUri) {
|
||||
// Couldn't find the record, or skeleton repost referenced the wrong post
|
||||
return SKIP
|
||||
}
|
||||
return feedItem
|
||||
})
|
||||
}
|
||||
|
@ -3,73 +3,116 @@ import { Server } from '../../../../lexicon'
|
||||
import { FeedAlgorithm, FeedKeyset, getFeedDateThreshold } from '../util/feed'
|
||||
import { paginate } from '../../../../db/pagination'
|
||||
import AppContext from '../../../../context'
|
||||
import Database from '../../../../db'
|
||||
import { SkeletonFeedPost } from '../../../../lexicon/types/app/bsky/feed/defs'
|
||||
|
||||
export default function (server: Server, ctx: AppContext) {
|
||||
server.app.bsky.feed.getTimeline({
|
||||
auth: ctx.authVerifier,
|
||||
handler: async ({ params, auth }) => {
|
||||
const { algorithm, limit, cursor } = params
|
||||
const db = ctx.db.db
|
||||
const { ref } = db.dynamic
|
||||
const viewer = auth.credentials.did
|
||||
|
||||
if (algorithm && algorithm !== FeedAlgorithm.ReverseChronological) {
|
||||
throw new InvalidRequestError(`Unsupported algorithm: ${algorithm}`)
|
||||
}
|
||||
|
||||
const skeleton = await getTimelineSkeleton(ctx.db, viewer, limit, cursor)
|
||||
|
||||
const feedService = ctx.services.feed(ctx.db)
|
||||
const graphService = ctx.services.graph(ctx.db)
|
||||
|
||||
const followingIdsSubquery = db
|
||||
.selectFrom('follow')
|
||||
.select('follow.subjectDid')
|
||||
.where('follow.creator', '=', viewer)
|
||||
|
||||
const keyset = new FeedKeyset(
|
||||
ref('feed_item.sortAt'),
|
||||
ref('feed_item.cid'),
|
||||
)
|
||||
const sortFrom = keyset.unpack(cursor)?.primary
|
||||
|
||||
let feedItemsQb = feedService
|
||||
.selectFeedItemQb()
|
||||
.where((qb) =>
|
||||
qb
|
||||
.where('originatorDid', '=', viewer)
|
||||
.orWhere('originatorDid', 'in', followingIdsSubquery),
|
||||
)
|
||||
.where((qb) =>
|
||||
// Hide posts and reposts of or by muted actors
|
||||
graphService.whereNotMuted(qb, viewer, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.whereNotExists(
|
||||
graphService.blockQb(viewer, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom))
|
||||
|
||||
feedItemsQb = paginate(feedItemsQb, {
|
||||
const feedItems = await feedService.cleanFeedSkeleton(
|
||||
skeleton.feed,
|
||||
limit,
|
||||
cursor,
|
||||
keyset,
|
||||
tryIndex: true,
|
||||
})
|
||||
|
||||
const feedItems = await feedItemsQb.execute()
|
||||
viewer,
|
||||
)
|
||||
const feed = await feedService.hydrateFeed(feedItems, viewer)
|
||||
|
||||
return {
|
||||
encoding: 'application/json',
|
||||
body: {
|
||||
feed,
|
||||
cursor: keyset.packFromResult(feedItems),
|
||||
cursor: skeleton.cursor,
|
||||
},
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
export const getTimelineSkeleton = async (
|
||||
db: Database,
|
||||
viewer: string,
|
||||
limit: number,
|
||||
cursor?: string,
|
||||
): Promise<{ feed: SkeletonFeedPost[]; cursor?: string }> => {
|
||||
const { ref } = db.db.dynamic
|
||||
|
||||
const keyset = new FeedKeyset(ref('feed_item.sortAt'), ref('feed_item.cid'))
|
||||
const sortFrom = keyset.unpack(cursor)?.primary
|
||||
|
||||
let followQb = db.db
|
||||
.selectFrom('feed_item')
|
||||
.innerJoin('follow', 'follow.subjectDid', 'feed_item.originatorDid')
|
||||
.where('follow.creator', '=', viewer)
|
||||
.innerJoin('post', 'post.uri', 'feed_item.postUri')
|
||||
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom, 1))
|
||||
.selectAll('feed_item')
|
||||
.select([
|
||||
'post.replyRoot',
|
||||
'post.replyParent',
|
||||
'post.creator as postAuthorDid',
|
||||
])
|
||||
|
||||
followQb = paginate(followQb, {
|
||||
limit,
|
||||
cursor,
|
||||
keyset,
|
||||
tryIndex: true,
|
||||
})
|
||||
|
||||
let selfQb = db.db
|
||||
.selectFrom('feed_item')
|
||||
.innerJoin('post', 'post.uri', 'feed_item.postUri')
|
||||
.where('feed_item.originatorDid', '=', viewer)
|
||||
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom, 1))
|
||||
.selectAll('feed_item')
|
||||
.select([
|
||||
'post.replyRoot',
|
||||
'post.replyParent',
|
||||
'post.creator as postAuthorDid',
|
||||
])
|
||||
|
||||
selfQb = paginate(selfQb, {
|
||||
limit: Math.min(limit, 10),
|
||||
cursor,
|
||||
keyset,
|
||||
tryIndex: true,
|
||||
})
|
||||
|
||||
const [followRes, selfRes] = await Promise.all([
|
||||
followQb.execute(),
|
||||
selfQb.execute(),
|
||||
])
|
||||
|
||||
const feedItems = [...followRes, ...selfRes]
|
||||
.sort((a, b) => {
|
||||
if (a.sortAt > b.sortAt) return -1
|
||||
if (a.sortAt < b.sortAt) return 1
|
||||
return a.cid > b.cid ? -1 : 1
|
||||
})
|
||||
.slice(0, limit)
|
||||
const feed = feedItems.map((item) => ({
|
||||
post: item.postUri,
|
||||
reason:
|
||||
item.uri === item.postUri
|
||||
? undefined
|
||||
: {
|
||||
$type: 'app.bsky.feed.defs#skeletonReasonRepost',
|
||||
repost: item.uri,
|
||||
},
|
||||
}))
|
||||
|
||||
return {
|
||||
cursor: keyset.packFromResult(feedItems),
|
||||
feed,
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
import { Server } from '../../../../lexicon'
|
||||
import AppContext from '../../../../context'
|
||||
import { FeedKeyset, getFeedDateThreshold } from '../util/feed'
|
||||
import { paginate } from '../../../../db/pagination'
|
||||
import { getTimelineSkeleton } from '../feed/getTimeline'
|
||||
|
||||
// THIS IS A TEMPORARY UNSPECCED ROUTE
|
||||
export default function (server: Server, ctx: AppContext) {
|
||||
@ -11,83 +10,10 @@ export default function (server: Server, ctx: AppContext) {
|
||||
const { limit, cursor } = params
|
||||
const viewer = auth.credentials.did
|
||||
|
||||
const db = ctx.db.db
|
||||
const { ref } = db.dynamic
|
||||
|
||||
const keyset = new FeedKeyset(
|
||||
ref('feed_item.sortAt'),
|
||||
ref('feed_item.cid'),
|
||||
)
|
||||
const sortFrom = keyset.unpack(cursor)?.primary
|
||||
|
||||
let followQb = db
|
||||
.selectFrom('feed_item')
|
||||
.innerJoin('follow', 'follow.subjectDid', 'feed_item.originatorDid')
|
||||
.where('follow.creator', '=', viewer)
|
||||
.innerJoin('post', 'post.uri', 'feed_item.postUri')
|
||||
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom, 1))
|
||||
.selectAll('feed_item')
|
||||
.select([
|
||||
'post.replyRoot',
|
||||
'post.replyParent',
|
||||
'post.creator as postAuthorDid',
|
||||
])
|
||||
|
||||
followQb = paginate(followQb, {
|
||||
limit,
|
||||
cursor,
|
||||
keyset,
|
||||
tryIndex: true,
|
||||
})
|
||||
|
||||
let selfQb = ctx.db.db
|
||||
.selectFrom('feed_item')
|
||||
.innerJoin('post', 'post.uri', 'feed_item.postUri')
|
||||
.where('feed_item.originatorDid', '=', viewer)
|
||||
.where('feed_item.sortAt', '>', getFeedDateThreshold(sortFrom, 1))
|
||||
.selectAll('feed_item')
|
||||
.select([
|
||||
'post.replyRoot',
|
||||
'post.replyParent',
|
||||
'post.creator as postAuthorDid',
|
||||
])
|
||||
|
||||
selfQb = paginate(selfQb, {
|
||||
limit: Math.min(limit, 10),
|
||||
cursor,
|
||||
keyset,
|
||||
tryIndex: true,
|
||||
})
|
||||
|
||||
const [followRes, selfRes] = await Promise.all([
|
||||
followQb.execute(),
|
||||
selfQb.execute(),
|
||||
])
|
||||
|
||||
const feedItems = [...followRes, ...selfRes]
|
||||
.sort((a, b) => {
|
||||
if (a.sortAt > b.sortAt) return -1
|
||||
if (a.sortAt < b.sortAt) return 1
|
||||
return a.cid > b.cid ? -1 : 1
|
||||
})
|
||||
.slice(0, limit)
|
||||
|
||||
const feed = feedItems.map((item) => ({
|
||||
post: item.postUri,
|
||||
reason:
|
||||
item.uri === item.postUri
|
||||
? undefined
|
||||
: {
|
||||
$type: 'app.bsky.feed.defs#skeletonReasonRepost',
|
||||
repost: item.uri,
|
||||
},
|
||||
}))
|
||||
const skeleton = await getTimelineSkeleton(ctx.db, viewer, limit, cursor)
|
||||
return {
|
||||
encoding: 'application/json',
|
||||
body: {
|
||||
cursor: keyset.packFromResult(feedItems),
|
||||
feed,
|
||||
},
|
||||
body: skeleton,
|
||||
}
|
||||
},
|
||||
})
|
||||
|
@ -23,7 +23,10 @@ import {
|
||||
isViewRecord,
|
||||
} from '../../lexicon/types/app/bsky/embed/record'
|
||||
import { isMain as isEmbedRecordWithMedia } from '../../lexicon/types/app/bsky/embed/recordWithMedia'
|
||||
import { FeedViewPost } from '../../lexicon/types/app/bsky/feed/defs'
|
||||
import {
|
||||
FeedViewPost,
|
||||
SkeletonFeedPost,
|
||||
} from '../../lexicon/types/app/bsky/feed/defs'
|
||||
import {
|
||||
ActorInfoMap,
|
||||
PostInfoMap,
|
||||
@ -330,6 +333,53 @@ export class FeedService {
|
||||
}, {} as PostViews)
|
||||
}
|
||||
|
||||
async filterAndGetFeedItems(
|
||||
uris: string[],
|
||||
requester: string,
|
||||
): Promise<Record<string, FeedRow>> {
|
||||
if (uris.length < 1) return {}
|
||||
const { ref } = this.db.db.dynamic
|
||||
const feedItems = await this.selectFeedItemQb()
|
||||
.where('feed_item.uri', 'in', uris)
|
||||
.where((qb) =>
|
||||
// Hide posts and reposts of or by muted actors
|
||||
this.services.graph.whereNotMuted(qb, requester, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.whereNotExists(
|
||||
this.services.graph.blockQb(requester, [
|
||||
ref('post.creator'),
|
||||
ref('originatorDid'),
|
||||
]),
|
||||
)
|
||||
.execute()
|
||||
return feedItems.reduce((acc, item) => {
|
||||
return Object.assign(acc, { [item.uri]: item })
|
||||
}, {} as Record<string, FeedRow>)
|
||||
}
|
||||
|
||||
// @TODO enforce limit elsewhere
|
||||
async cleanFeedSkeleton(
|
||||
skeleton: SkeletonFeedPost[],
|
||||
limit: number,
|
||||
requester: string,
|
||||
): Promise<FeedRow[]> {
|
||||
skeleton = skeleton.slice(0, limit)
|
||||
const feedItemUris = skeleton.map(getSkeleFeedItemUri)
|
||||
const feedItems = await this.filterAndGetFeedItems(feedItemUris, requester)
|
||||
|
||||
const cleaned: FeedRow[] = []
|
||||
for (const skeleItem of skeleton) {
|
||||
const feedItem = feedItems[getSkeleFeedItemUri(skeleItem)]
|
||||
if (feedItem && feedItem.postUri === skeleItem.post) {
|
||||
cleaned.push(feedItem)
|
||||
}
|
||||
}
|
||||
return cleaned
|
||||
}
|
||||
|
||||
async hydrateFeed(
|
||||
items: FeedRow[],
|
||||
viewer: string | null,
|
||||
@ -641,3 +691,9 @@ function applyEmbedBlock(
|
||||
}
|
||||
return view
|
||||
}
|
||||
|
||||
function getSkeleFeedItemUri(item: SkeletonFeedPost) {
|
||||
return typeof item.reason?.repost === 'string'
|
||||
? item.reason.repost
|
||||
: item.post
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user