f689bd51a2
* refactor(crypto): remove circular dependency * refactor(crypto): expose compress/decompress as part of the DidKeyPlugin interface * fix(crypto): remove import from private file * refactor: isolate tsconfig * fix: remove unused bench file * chore(repo): remove unused deps * fix(ozone): properly list dependencies * fix(services): do lint js files * fix(services/pds): remove unused deps * chore(pds): remove bench * chore(dev-env): remove unused deps * chore(api): remove bench * remove unused babel.config.js files * fix: remove .ts extension from import * fix(pds): remove imports of src files * fix(tsconfig): properly list all projects * fix(dev-env): remove imports of src files * fix(bsky): remove direct import to crypto src * fix(api): remove imports to api internals * chore(build): prevent bundling of built output * chore(dev): add "dev" script to build in watch mode * chore(deps): move ts-node dependency where it is actually used * fix(deps): add dev-env as project dependency * fix(xrpc-server): properly type kexicon * fix(bsky): improve typings * fix(pds): fully type formatRecordEmbedInternal return value * fix(repo): remove imports from @ipld/car/api * feat(dev-env): re-export BskyIngester * fix: properly lint & type jest config & test files * fix(ci): test after build * fix(types): use NodeJS.Timeout instead of NodeJS.Timer * fix(bsky): make types exportable * fix(ozone): make types exportable * fix(xrpc-server): make types exportable * fix(xprc-server): make code compliant with "node" types * fix(xrpc-server): avoid accessing properties of unknown * chore(deps): update @types/node * feat(tsconfig): narrow down available types depending on the package's target environment * fix(pds): remove unused prop * fix(bsync): Database's migrator not always initialized * fix(dev-env): remove unreachable code * fix(xrpc-server): remove unused import * fix(xrpc-server): mark header property as abstract * fix(pds): initialize LeakyTxPlugin's txOver property * fix(bsky): 
initialize LeakyTxPlugin's txOver property * fix(bsky): remove unused migrator from DatabaseCoordinator * fix(bsky): Properly initialize LabelService's cache property * fix(ozone): Database's migrator not initialized * fix(ozone): initialize LeakyTxPlugin's txOver property * fix(crypto): ignore unused variable error * feat(tsconfig): use stricter rules * feat(tsconfig): enable useDefineForClassFields * feat(xrpc-server): add support for brotli incoming payload * fix(xrpc-server): properly parse & process content-encoding * fix(common:stream): always call cb in _transform * tidy/fix tests and service entrypoints * Revert "fix(xrpc-server): properly parse & process content-encoding" This reverts commit 2b1c66e153820d3e128fc839fcc1834d52a66686. * Revert "feat(xrpc-server): add support for brotli incoming payload" This reverts commit e710c21e6118214ddf215b0515e68cb87299a952. * remove special node env for tests (defaults to jest val of "test") * kill mute sync handler on disconnect * work around connect-es bug w/ request aborts * style(crypto): rename imports from uint8arrays * fix update package-lock * fix lint * force hbs files to be bundled as cjs * fix: use concurrently instead of npm-run-all npm-run-all seems not to be maintained anymore. Additionally, concurrently better forwards signals to child processes. * remove concurrently alltogether * ignore sqlite files in services/pds * fix verify * fix verify * tidy, fix verify * fix blob diversion test * build rework changeset --------- Co-authored-by: Devin Ivy <devinivy@gmail.com>
117 lines
3.2 KiB
JavaScript
117 lines
3.2 KiB
JavaScript
/* eslint-env node */
|
|
|
|
'use strict'
|
|
|
|
const dd = require('dd-trace')
|
|
|
|
// Initialise the Datadog tracer and configure the plugins this service uses.
// - http2: trace outgoing client calls only (these are the calls into the
//   dataplane); server-side http2 tracing is turned off.
// - express: on every request, replace the span's resource name for /xrpc/
//   routes (see maintainXrpcResource).
dd.tracer
  .init()
  .use('http2', {
    client: true, // calls into dataplane
    server: false,
  })
  .use('express', {
    hooks: {
      // maintainXrpcResource is declared further down; it is only dereferenced
      // when a request arrives, by which time the module has fully loaded.
      request: (span, req) => {
        maintainXrpcResource(span, req)
      },
    },
  })
|
|
|
|
// Modify the tracer in order to track calls to the dataplane as a service of
// their own, with proper resource names.
const DATAPLANE_PREFIX = '/bsky.Service/'
const origStartSpan = dd.tracer._tracer.startSpan

/**
 * Wrapper around the tracer's `startSpan`.
 *
 * For `http.request` spans created by the `http2` component whose `http.url`
 * path starts with DATAPLANE_PREFIX, the span tags are rewritten so that the
 * service name is `dataplane-bsky` and the resource name is the part of the
 * path following the prefix. Every other span is passed through untouched.
 *
 * @param {string} name - span operation name
 * @param {object} [options] - span options; `options.tags` is mutated in place
 * @returns whatever the original `startSpan` returns
 */
dd.tracer._tracer.startSpan = function (name, options) {
  const httpUrl = options?.tags?.['http.url']
  if (
    name === 'http.request' &&
    options?.tags?.component === 'http2' &&
    httpUrl
  ) {
    let pathname
    try {
      pathname = new URL(httpUrl).pathname
    } catch {
      // `new URL` throws on input it cannot parse. Tracing must never break the
      // request being traced, so such spans are simply left unmodified.
    }
    if (pathname?.startsWith(DATAPLANE_PREFIX)) {
      options.tags['service.name'] = 'dataplane-bsky'
      options.tags['resource.name'] = pathname.slice(DATAPLANE_PREFIX.length)
    }
  }
  // Regular function (not arrow) so `this` stays bound to the tracer instance.
  return origStartSpan.call(this, name, options)
}
|
|
|
|
// Tracer code above must come before anything else
|
|
const path = require('node:path')
|
|
const assert = require('node:assert')
|
|
const cluster = require('cluster')
|
|
const { Secp256k1Keypair } = require('@atproto/crypto')
|
|
const { ServerConfig, BskyAppView } = require('@atproto/bsky')
|
|
|
|
/**
 * Boot a single appview instance.
 *
 * Reads the server config and the service signing key from the environment,
 * creates and starts the BskyAppView, then registers handlers that destroy it
 * on `SIGTERM` or on the cluster `disconnect` event.
 *
 * @returns {Promise<void>} resolves once the server has started
 * @throws {AssertionError} if BSKY_SERVICE_SIGNING_KEY is not set
 */
const main = async () => {
  const env = getEnv()
  const config = ServerConfig.readEnv()
  assert(env.serviceSigningKey, 'must set BSKY_SERVICE_SIGNING_KEY')
  const signingKey = await Secp256k1Keypair.import(env.serviceSigningKey)
  const bsky = BskyAppView.create({ config, signingKey })
  await bsky.start()
  // Graceful shutdown (see also https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/)
  // Both events below may fire for the same process (and SIGTERM may be sent
  // more than once), so make sure the server is only torn down a single time.
  let shuttingDown = false
  const shutdown = async () => {
    if (shuttingDown) return
    shuttingDown = true
    await bsky.destroy()
  }
  process.on('SIGTERM', shutdown)
  process.on('disconnect', shutdown) // when clustering
}
|
|
|
|
/**
 * Collect the environment variables this entrypoint reads directly.
 *
 * @returns {{ serviceSigningKey: (string|undefined) }} the value of
 *   BSKY_SERVICE_SIGNING_KEY, with an unset or empty variable reported as
 *   `undefined`
 */
const getEnv = () => ({
  // `||` (not `??`) on purpose: an empty string counts as "not set".
  serviceSigningKey: process.env.BSKY_SERVICE_SIGNING_KEY || undefined,
})
|
|
|
|
/**
 * Parse a base-10 integer from a string, tolerating absent or invalid input.
 *
 * @param {string|undefined} str - raw value, e.g. from an environment variable
 * @returns {number|undefined} the parsed integer, or `undefined` when `str` is
 *   empty/missing or does not start with a number
 */
const maybeParseInt = (str) => {
  if (!str) return undefined
  const int = Number.parseInt(str, 10)
  return Number.isNaN(int) ? undefined : int
}
|
|
|
|
/**
 * Express request hook for the tracer: for requests under `/xrpc/`, show the
 * actual xrpc method as the span's resource rather than the route pattern.
 *
 * The resource name is "<HTTP method> <baseUrl + path>" with no trailing
 * slash; empty parts are omitted.
 *
 * @param {object|null|undefined} span - span to tag; nothing happens if falsy
 * @param {object} req - express request (`originalUrl`, `method`, `baseUrl`
 *   and `path` are read)
 * @returns {void}
 */
const maintainXrpcResource = (span, req) => {
  if (!span || !req.originalUrl?.startsWith('/xrpc/')) return
  // Joining with a final '/' normalises the path so that it always ends in
  // exactly one slash, which is then cut off: ensures no trailing slash.
  const route = path.posix
    .join(req.baseUrl || '', req.path || '', '/')
    .slice(0, -1)
  span.setTag('resource.name', [req.method, route].filter(Boolean).join(' '))
}
|
|
|
|
// Entrypoint. When CLUSTER_WORKER_COUNT parses to a positive (truthy) integer
// the service runs in cluster mode: the primary process forks that many
// workers and each worker runs main(). Otherwise main() runs directly in this
// process.
const workerCount = maybeParseInt(process.env.CLUSTER_WORKER_COUNT)

if (!workerCount) {
  main() // non-clustering
} else if (!cluster.isPrimary) {
  console.log(`worker ${process.pid} is running`)
  main()
} else {
  console.log(`primary ${process.pid} is running`)
  const workers = new Set()
  for (let i = 0; i < workerCount; ++i) {
    workers.add(cluster.fork())
  }
  // Set once SIGTERM has been received, so that workers exiting as part of
  // the shutdown are not replaced.
  let teardown = false
  cluster.on('exit', (worker) => {
    workers.delete(worker)
    if (!teardown) {
      workers.add(cluster.fork()) // restart on crash
    }
  })
  process.on('SIGTERM', () => {
    teardown = true
    console.log('disconnecting workers')
    // Each worker reacts to the resulting 'disconnect' event in main().
    workers.forEach((w) => w.disconnect())
  })
}
|