// Back up atproto repositories for the DIDs given on the command line:
//   - every record is stored (append-only, with soft deletes) in repos.sqlite,
//   - the raw repo CAR is written to ./repos/<did with ':' replaced by '-'>,
//   - every blob is written to ./blobs/<cid>.
// DIDs mentioned inside backed-up records are collected and, depending on
// --depth, backed up in further rounds.
//
// Usage: node <script> did:plc:... [did:web:...] [--depth=N]
//   --depth=N  number of extra rounds of referenced DIDs to follow (default 1).

import sqlite3 from 'sqlite3';
import {open} from 'sqlite';
import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity";
import got from "got";
import {writeFile} from "fs/promises";
import {existsSync, mkdirSync} from 'fs';

// Output directories. `recursive: true` makes this a no-op when they already
// exist, while still surfacing real failures (e.g. permissions) up front.
mkdirSync("blobs", {recursive: true});
mkdirSync("repos", {recursive: true});

const didres = new DidResolver({});

/**
 * Resolve a DID document and return its atproto PDS endpoint.
 * @param {string} did
 * @returns {Promise<string|null|undefined>} the endpoint, `null` if resolution
 *   threw (the error is logged), or `undefined` if the document has no PDS service.
 */
async function getPds(did) {
  let doc;
  try {
    doc = await didres.resolve(did);
  } catch (error) {
    console.error("resolve", did, error.message);
    return null;
  }
  // The service id may be relative ("#atproto_pds") or fully qualified ("<did>#atproto_pds").
  return doc?.service
    ?.find(s => s.id === "#atproto_pds" || s.id === `${did}#atproto_pds`)
    ?.serviceEndpoint;
}

// HTTP client that logs each request, its status and duration, and turns HTTP
// errors into Errors whose message includes the response body.
const w = got.extend({
  hooks: {
    beforeRequest: [({url, method}) => console.log(`${method} ${url}`)],
    afterResponse: [response => {
      console.log(response.statusCode, (response.timings.end - response.timings.start) + "ms");
      return response;
    }],
    beforeError: [error => {
      if (error.response) return new Error(`HTTP ${error.response.statusCode} ${error.response.body}`);
      else return error;
    }]
  }
});

const db = await open({filename: "repos.sqlite", driver: sqlite3.Database});
// Put the underlying node-sqlite3 connection in serialized mode so queued
// statements execute one after another.
db.getDatabaseInstance().serialize();
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);
const insertStatement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);

/**
 * Download a repo CAR from `pds`, save a copy under repos/, and return its bytes.
 * @param {string} pds - PDS base URL.
 * @param {string} did
 * @returns {Promise<Buffer>}
 */
async function getRepo(pds, did) {
  const query = new URLSearchParams({did});
  const repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?${query}`).buffer();
  await writeFile(`repos/${did.replaceAll(':','-')}`, repo);
  return repo;
}

// String-match patterns for DIDs referenced inside records. They are /g so that
// String#match returns every occurrence; match() resets lastIndex itself, so
// sharing them across calls is safe.
const DID_PLC_RE = /did:plc:[a-z2-7]{24}/g;
const DID_WEB_RE = /did:web:[a-zA-Z0-9-.]+/g;

/**
 * Recursively add every did:plc / did:web substring found in `value` to `out`.
 * CID link objects are not descended into.
 * @param {unknown} value
 * @param {Set<string>} out
 */
function collectDids(value, out) {
  if (typeof value === "string") {
    for (const found of value.match(DID_PLC_RE) ?? []) out.add(found);
    for (const found of value.match(DID_WEB_RE) ?? []) out.add(found);
    return;
  }
  if (typeof value !== "object" || value === null || value instanceof CidLinkWrapper) return;
  for (const child of Object.values(value)) collectDids(child, out);
}

/**
 * Sync the records of one repo into the database.
 * New (collection, rkey, cid) versions are inserted. Stored versions that are
 * flagged deleted but present again are un-deleted. Stored versions missing
 * from the repo are flagged deleted, but only if the whole CAR iterated cleanly.
 * @param {string} did - the repo's DID.
 * @param {Uint8Array} repo - CAR bytes.
 * @returns {Promise<Set<string>>} DIDs referenced by the records, excluding `did` itself.
 */
async function processRepo(did, repo) {
  console.log("process", did);
  const recordCounts = {};
  const rdids = new Set();

  const rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
  console.debug("got rows");
  // Stored record versions, keyed by collection~rkey~cid.
  const existing = new Map();
  for (const {collection, rkey, cid, deleted} of rows) {
    existing.set(`${collection}~${rkey}~${cid}`, {collection, rkey, cid, deleted: Boolean(deleted), still_exists: null});
  }

  // TODO: these writes are not wrapped in a transaction (an earlier attempt was disabled).
  let badRepo = false;
  try {
    for (const entry of iterateAtpRepo(repo)) {
      const counts = (recordCounts[entry.collection] ||= {now: 0, new: 0});
      counts.now++;
      try {
        collectDids(entry.record, rdids);
        const cid = entry.cid.$link;
        const known = existing.get(`${entry.collection}~${entry.rkey}~${cid}`);
        if (known) {
          known.still_exists = true;
          if (known.deleted) {
            console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
            await db.run(`UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
              Date.now(), did, entry.collection, entry.rkey, cid
            ]);
          }
          continue;
        }
        counts.new++;
        await insertStatement.run([did, entry.collection, entry.rkey, cid, entry.bytes, false, Date.now(), null]);
      } catch (error) {
        console.error("record error", error.stack);
      }
    }
  } catch (error) {
    console.error("iterate error", error.stack);
    badRepo = true;
  }
  console.log(did, recordCounts);

  // If the CAR could not be read completely, "absent" does not mean "gone".
  if (!badRepo) {
    for (const val of existing.values()) {
      if (val.still_exists || val.deleted) continue;
      console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
      await db.run(`UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
        Date.now(), did, val.collection, val.rkey, val.cid
      ]);
    }
  }

  rdids.delete(did);
  console.log(did, `${rdids.size} related dids`);
  return rdids;
}

// A CID is a plain multibase string. Anything else (e.g. "../x") must never be
// used as a file name, because the CID list comes from a remote server.
const CID_RE = /^[a-zA-Z0-9]+$/;

/**
 * Download one blob into blobs/<cid>, skipping it if already saved.
 * @throws {Error} if `cid` is not a plain alphanumeric string, or on HTTP/write failure.
 */
async function saveBlob(pds, did, cid) {
  if (typeof cid !== "string" || !CID_RE.test(cid)) throw new Error(`invalid blob cid: ${cid}`);
  const fp = `blobs/${cid}`;
  if (existsSync(fp)) return;
  const query = new URLSearchParams({did, cid});
  const blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?${query}`).buffer();
  await writeFile(fp, blob);
}

/**
 * List every blob of `did` on `pds` (paginated, 1000 per page) and save each one.
 * Per-blob failures are logged and skipped.
 */
async function saveBlobs(pds, did) {
  console.log("save blobs", did);
  const allCids = [];
  let cursor;
  do {
    const query = new URLSearchParams({did, limit: "1000"});
    if (cursor) query.set("cursor", cursor);
    const page = await w.get(`${pds}/xrpc/com.atproto.sync.listBlobs?${query}`).json();
    const cids = page.cids ?? [];
    allCids.push(...cids);
    // Stop on an empty page or a cursor that does not advance, to avoid looping forever.
    if (cids.length === 0 || page.cursor === cursor) break;
    cursor = page.cursor;
  } while (cursor);
  console.log(`${allCids.length} blobs`);
  for (const cid of allCids) {
    await saveBlob(pds, did, cid).catch(error => console.error(error.message));
  }
}

// DIDs already attempted in this run, shared by all recursion levels.
const backedUpDids = new Set();

/**
 * Back up one DID: repo CAR, records, and blobs.
 * @param {string} did
 * @param {string|null|undefined} [pds] - already-resolved PDS endpoint; resolved here if nullish.
 * @returns {Promise<Set<string>|undefined>} related DIDs found in the records, if the repo was processed.
 */
async function backup(did, pds) {
  console.log("backup", did);
  const endpoint = pds ?? await getPds(did);
  if (!endpoint) {
    console.log(`no pds for ${did}`);
    return;
  }
  const repo = await getRepo(endpoint, did).catch(error => console.error(error.message));
  let rdids;
  if (repo) rdids = await processRepo(did, repo).catch(error => console.error(error.message));
  await saveBlobs(endpoint, did).catch(error => console.error(error.message));
  return rdids;
}

/**
 * Back up `dids`, then recurse `depth` more times into the DIDs they reference.
 * DIDs on the same PDS are processed one at a time. Different PDSes run in parallel.
 * @param {Set<string>} dids
 * @param {number} [depth=1] - remaining rounds of related-DID expansion.
 */
async function backupRecursive(dids, depth = 1) {
  console.log(`backup ${dids.size} dids depth ${depth}`);
  const pds2dids = new Map();
  for (const did of dids) {
    if (backedUpDids.has(did)) continue; // no need to resolve DIDs we won't back up
    const pds = await getPds(did);
    if (!pds2dids.has(pds)) pds2dids.set(pds, new Set());
    pds2dids.get(pds).add(did);
  }
  console.log(`backing up from ${pds2dids.size} PDSes at once`);

  const allrdids = new Set();
  await Promise.all([...pds2dids].map(async ([pds, group]) => {
    for (const [index, did] of [...group].entries()) {
      if (backedUpDids.has(did)) continue;
      backedUpDids.add(did);
      console.log(`[${pds}] ${index} / ${group.size}`);
      try {
        const rdids = await backup(did, pds);
        rdids?.forEach(rdid => allrdids.add(rdid));
      } catch (error) {
        console.error(error.stack);
      }
    }
  }));

  if (depth && allrdids.size) await backupRecursive(allrdids, depth - 1);
}

const didsToBackup = new Set(process.argv.filter(arg => arg.startsWith("did:")));
let depth;
for (const arg of process.argv) {
  // Capture the whole number; the previous `(\d)+` kept only its last digit.
  const match = arg.match(/^--depth=(\d+)$/i);
  if (match) depth = Number(match[1]);
}
await backupRecursive(didsToBackup, depth);
await insertStatement.finalize();
await db.close();