1 Commits

Author SHA1 Message Date
lamp ad1c475999 concurrent backup
Download from all PDSes at once. I don't know how to do concurrent SQLite transactions.
2025-11-30 21:54:46 -08:00
+35 -17
View File
@@ -4,13 +4,24 @@ import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid"; import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity"; import {DidResolver} from "@atproto/identity";
import got from "got"; import got from "got";
import * as fs from "fs"; import {writeFile} from "fs/promises";
import {existsSync, mkdirSync} from 'fs';
try {fs.mkdirSync("blobs")} catch(e) {} try {mkdirSync("blobs")} catch(e) {}
try {fs.mkdirSync("repos")} catch(e) {} try {mkdirSync("repos")} catch(e) {}
var didres = new DidResolver({}); var didres = new DidResolver({});
var getPds = async did => (await didres.resolve(did)).service.find(s => s.id == "#atproto_pds")?.serviceEndpoint; async function getPds(did) {
try {
var doc = await didres.resolve(did);
} catch(error) {
console.error("resolve", did, error.message);
return null;
}
return doc?.service?.find(s => s.id == "#atproto_pds")?.serviceEndpoint;
}
var w = got.extend({ var w = got.extend({
hooks: { hooks: {
beforeRequest: [({url, method}) => console.log(`${method} ${url}`)], beforeRequest: [({url, method}) => console.log(`${method} ${url}`)],
@@ -37,7 +48,7 @@ var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?
async function getRepo(pds, did) { async function getRepo(pds, did) {
var repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`).buffer(); var repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`).buffer();
fs.writeFileSync(`repos/${did.replaceAll(':','-')}`, repo); await writeFile(`repos/${did.replaceAll(':','-')}`, repo);
return repo; return repo;
} }
@@ -55,8 +66,8 @@ async function processRepo(did, repo) {
still_exists: null still_exists: null
}; };
} }
console.debug("begin transaction"); //console.debug("begin transaction");
await db.exec(`BEGIN TRANSACTION`); //await db.exec(`BEGIN TRANSACTION`);
try { try {
for (let entry of iterateAtpRepo(repo)) { for (let entry of iterateAtpRepo(repo)) {
recordCounts[entry.collection] ||= {now: 0, new: 0}; recordCounts[entry.collection] ||= {now: 0, new: 0};
@@ -107,8 +118,8 @@ async function processRepo(did, repo) {
]); ]);
} }
} }
console.debug("commit"); //console.debug("commit");
await db.exec(`COMMIT`); //await db.exec(`COMMIT`);
console.log(did, `${rdids.size} related dids`); console.log(did, `${rdids.size} related dids`);
rdids.delete(did); rdids.delete(did);
return rdids; return rdids;
@@ -116,9 +127,9 @@ async function processRepo(did, repo) {
async function saveBlob(pds, did, cid) { async function saveBlob(pds, did, cid) {
var fp = `blobs/${cid}`; var fp = `blobs/${cid}`;
if (fs.existsSync(fp)) return; if (existsSync(fp)) return;
var blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`).buffer(); var blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`).buffer();
fs.writeFileSync(fp, blob); await writeFile(fp, blob);
} }
async function saveBlobs(pds, did) { async function saveBlobs(pds, did) {
@@ -144,7 +155,7 @@ async function backup(did) {
return; return;
} }
var repo = await getRepo(pds, did).catch(error => console.error(error.message)); var repo = await getRepo(pds, did).catch(error => console.error(error.message));
if (repo) var rdids = await processRepo(did, repo); if (repo) var rdids = await processRepo(did, repo).catch(error => console.error(error.message));
await saveBlobs(pds, did).catch(error => console.error(error.message)); await saveBlobs(pds, did).catch(error => console.error(error.message));
return rdids; return rdids;
} }
@@ -152,17 +163,22 @@ async function backup(did) {
async function backupRecursive(dids, depth = 1) { async function backupRecursive(dids, depth = 1) {
console.log(`backup ${dids.size} dids depth ${depth}`); console.log(`backup ${dids.size} dids depth ${depth}`);
if (!depth) {
for (let did of dids) await backup(did).catch(error => console.error(error.stack)); var pds2dids = {};
return; for (let did of dids) {
let pds = await getPds(did);
(pds2dids[pds] ||= new Set()).add(did);
} }
console.log(`backing up from ${Object.keys(pds2dids).length} PDSes at once`);
var allrdids = new Set(); var allrdids = new Set();
await Promise.all(Object.entries(pds2dids).map(async ([pds, dids]) => {
var index = -1; var index = -1;
for (let did of dids) { for (let did of dids) {
index++; index++;
if (backedUpDids.includes(did)) continue; if (backedUpDids.includes(did)) continue;
backedUpDids.push(did); backedUpDids.push(did);
console.log(`${index} / ${dids.size}`); console.log(`[${pds}] ${index} / ${dids.size}`);
try { try {
let rdids = await backup(did); let rdids = await backup(did);
rdids?.forEach(rdid => allrdids.add(rdid)); rdids?.forEach(rdid => allrdids.add(rdid));
@@ -170,7 +186,9 @@ async function backupRecursive(dids, depth = 1) {
console.error(error.stack); console.error(error.stack);
} }
} }
await backupRecursive(allrdids, depth - 1); }));
if (depth) await backupRecursive(allrdids, depth - 1);
} }