76c91f8325
* priority notif settings in bsync * lint * priority notifications lexicon update * codegen * putNotificationPreferences -> putPreferences * bsync: reorg around notif "priority", fix build, add validation & tests * bsync: notif channel fix, tests fix * bsky: update protos for priority notifs * api prerelease * add priority notif to actor state table * dataplane impl * appview: wire-up notif priority params * appview: notif priority tests * dataplane impl * fix up tests * tidy * add changeset --------- Co-authored-by: Samuel Newman <10959775+mozzius@users.noreply.github.com> Co-authored-by: Devin Ivy <devinivy@gmail.com>
353 lines
11 KiB
TypeScript
353 lines
11 KiB
TypeScript
import { wait } from '@atproto/common'
|
|
import getPort from 'get-port'
|
|
import { Code, ConnectError } from '@connectrpc/connect'
|
|
import {
|
|
BsyncClient,
|
|
BsyncService,
|
|
Database,
|
|
authWithApiKey,
|
|
createClient,
|
|
envToCfg,
|
|
} from '../src'
|
|
import { MuteOperation, MuteOperation_Type } from '../src/proto/bsync_pb'
|
|
|
|
describe('mutes', () => {
|
|
let bsync: BsyncService
|
|
let client: BsyncClient
|
|
|
|
beforeAll(async () => {
|
|
bsync = await BsyncService.create(
|
|
envToCfg({
|
|
port: await getPort(),
|
|
dbUrl: process.env.DB_POSTGRES_URL,
|
|
dbSchema: 'bsync_mutes',
|
|
apiKeys: ['key-1'],
|
|
longPollTimeoutMs: 500,
|
|
}),
|
|
)
|
|
await bsync.ctx.db.migrateToLatestOrThrow()
|
|
await bsync.start()
|
|
client = createClient({
|
|
httpVersion: '1.1',
|
|
baseUrl: `http://localhost:${bsync.ctx.cfg.service.port}`,
|
|
interceptors: [authWithApiKey('key-1')],
|
|
})
|
|
})
|
|
|
|
afterAll(async () => {
|
|
await bsync.destroy()
|
|
})
|
|
|
|
beforeEach(async () => {
|
|
await clearMutes(bsync.ctx.db)
|
|
})
|
|
|
|
describe('addMuteOperation', () => {
|
|
it('adds mute operations to add mutes.', async () => {
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:c',
|
|
})
|
|
// dupe has no effect
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:b',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:c',
|
|
subject: 'at://did:example:d/app.bsky.graph.list/rkey1',
|
|
})
|
|
expect(await dumpMuteState(bsync.ctx.db)).toEqual({
|
|
'did:example:a': ['did:example:b', 'did:example:c'],
|
|
'did:example:b': ['did:example:c'],
|
|
'did:example:c': ['at://did:example:d/app.bsky.graph.list/rkey1'],
|
|
})
|
|
})
|
|
|
|
it('adds mute operations to remove mutes.', async () => {
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:b',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.REMOVE,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:c',
|
|
})
|
|
// removes nothing
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.REMOVE,
|
|
actorDid: 'did:example:b',
|
|
subject: 'did:example:d',
|
|
})
|
|
expect(await dumpMuteState(bsync.ctx.db)).toEqual({
|
|
'did:example:a': ['did:example:b'],
|
|
'did:example:b': ['did:example:c'],
|
|
})
|
|
})
|
|
|
|
it('adds mute operations to clear mutes.', async () => {
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:b',
|
|
subject: 'did:example:c',
|
|
})
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.CLEAR,
|
|
actorDid: 'did:example:a',
|
|
})
|
|
expect(await dumpMuteState(bsync.ctx.db)).toEqual({
|
|
'did:example:b': ['did:example:c'],
|
|
})
|
|
})
|
|
|
|
it('fails on bad inputs', async () => {
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'invalid',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError(
|
|
'subject must be a did or aturi on add or remove op',
|
|
Code.InvalidArgument,
|
|
),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError(
|
|
'subject must be a did or aturi on add or remove op',
|
|
Code.InvalidArgument,
|
|
),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'at://did:example:b/bad.collection/rkey1',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError(
|
|
'subject must be a did or aturi on add or remove op',
|
|
Code.InvalidArgument,
|
|
),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'invalid',
|
|
subject: 'did:example:b',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError('actor_did must be a valid did', Code.InvalidArgument),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.REMOVE,
|
|
actorDid: 'did:example:a',
|
|
subject: 'invalid',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError(
|
|
'subject must be a did or aturi on add or remove op',
|
|
Code.InvalidArgument,
|
|
),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.CLEAR,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError(
|
|
'subject must not be set on a clear op',
|
|
Code.InvalidArgument,
|
|
),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: MuteOperation_Type.CLEAR,
|
|
actorDid: 'invalid',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError('actor_did must be a valid did', Code.InvalidArgument),
|
|
)
|
|
await expect(
|
|
client.addMuteOperation({
|
|
type: 100 as any,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
}),
|
|
).rejects.toEqual(
|
|
new ConnectError('bad mute operation type', Code.InvalidArgument),
|
|
)
|
|
})
|
|
|
|
it('requires auth', async () => {
|
|
// unauthed
|
|
const unauthedClient = createClient({
|
|
httpVersion: '1.1',
|
|
baseUrl: `http://localhost:${bsync.ctx.cfg.service.port}`,
|
|
})
|
|
const tryAddMuteOperation1 = unauthedClient.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
await expect(tryAddMuteOperation1).rejects.toEqual(
|
|
new ConnectError('missing auth', Code.Unauthenticated),
|
|
)
|
|
// bad auth
|
|
const badauthedClient = createClient({
|
|
httpVersion: '1.1',
|
|
baseUrl: `http://localhost:${bsync.ctx.cfg.service.port}`,
|
|
interceptors: [authWithApiKey('key-bad')],
|
|
})
|
|
const tryAddMuteOperation2 = badauthedClient.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
await expect(tryAddMuteOperation2).rejects.toEqual(
|
|
new ConnectError('invalid api key', Code.Unauthenticated),
|
|
)
|
|
})
|
|
})
|
|
|
|
describe('scanMuteOperations', () => {
|
|
it('requires auth', async () => {
|
|
// unauthed
|
|
const unauthedClient = createClient({
|
|
httpVersion: '1.1',
|
|
baseUrl: `http://localhost:${bsync.ctx.cfg.service.port}`,
|
|
})
|
|
const tryScanMuteOperations1 = unauthedClient.scanMuteOperations({})
|
|
await expect(tryScanMuteOperations1).rejects.toEqual(
|
|
new ConnectError('missing auth', Code.Unauthenticated),
|
|
)
|
|
// bad auth
|
|
const badauthedClient = createClient({
|
|
httpVersion: '1.1',
|
|
baseUrl: `http://localhost:${bsync.ctx.cfg.service.port}`,
|
|
interceptors: [authWithApiKey('key-bad')],
|
|
})
|
|
const tryScanMuteOperations2 = badauthedClient.scanMuteOperations({})
|
|
await expect(tryScanMuteOperations2).rejects.toEqual(
|
|
new ConnectError('invalid api key', Code.Unauthenticated),
|
|
)
|
|
})
|
|
|
|
it('pages over created mute ops.', async () => {
|
|
// add 100 mute ops
|
|
for (let i = 0; i < 10; ++i) {
|
|
for (let j = 0; j < 8; ++j) {
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: `did:example:${i}`,
|
|
subject: `did:example:${j}`,
|
|
})
|
|
}
|
|
for (let j = 0; j < 2; ++j) {
|
|
await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: `did:example:${i}`,
|
|
subject: `at://did:example:0/app.bsky.graph.list/rkey${j}`,
|
|
})
|
|
}
|
|
}
|
|
|
|
let cursor: string | undefined
|
|
const operations: MuteOperation[] = []
|
|
do {
|
|
const res = await client.scanMuteOperations({
|
|
cursor,
|
|
limit: 30,
|
|
})
|
|
operations.push(...res.operations)
|
|
cursor = res.operations.length ? res.cursor : undefined
|
|
} while (cursor)
|
|
|
|
expect(operations.length).toEqual(100)
|
|
const operationIds = operations.map((op) => parseInt(op.id, 10))
|
|
const ascending = (a: number, b: number) => a - b
|
|
expect(operationIds).toEqual([...operationIds].sort(ascending))
|
|
})
|
|
|
|
it('supports long-poll, finding an operation.', async () => {
|
|
const scanPromise = client.scanMuteOperations({})
|
|
await wait(100) // would be complete by now if it wasn't long-polling for an item
|
|
const { operation } = await client.addMuteOperation({
|
|
type: MuteOperation_Type.ADD,
|
|
actorDid: 'did:example:a',
|
|
subject: 'did:example:b',
|
|
})
|
|
const res = await scanPromise
|
|
expect(res.operations.length).toEqual(1)
|
|
expect(res.operations[0]).toEqual(operation)
|
|
expect(res.cursor).toEqual(operation?.id)
|
|
})
|
|
|
|
it('supports long-poll, not finding an operation.', async () => {
|
|
const res = await client.scanMuteOperations({})
|
|
expect(res.cursor).toEqual('')
|
|
expect(res.operations).toEqual([])
|
|
})
|
|
})
|
|
})
|
|
|
|
const dumpMuteState = async (db: Database) => {
|
|
const items = await db.db.selectFrom('mute_item').selectAll().execute()
|
|
const result: Record<string, string[]> = {}
|
|
items.forEach((item) => {
|
|
result[item.actorDid] ??= []
|
|
result[item.actorDid].push(item.subject)
|
|
})
|
|
Object.values(result).forEach((subjects) => subjects.sort())
|
|
return result
|
|
}
|
|
|
|
const clearMutes = async (db: Database) => {
|
|
await db.db.deleteFrom('mute_item').execute()
|
|
await db.db.deleteFrom('mute_op').execute()
|
|
}
|