import sqlite3 from 'sqlite3';
import {open} from 'sqlite';
import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity";
import * as fs from "fs";

// Local storage for downloaded blobs and raw repo CAR files.
try { fs.mkdirSync("blobs"); } catch (e) {}
try { fs.mkdirSync("repos"); } catch (e) {}

var didres = new DidResolver({});
// Resolve a DID document and pull out its PDS endpoint, if it has one.
var getPds = async did =>
  (await didres.resolve(did))?.service?.find(s => s.id == "#atproto_pds")?.serviceEndpoint;

var db = await open({ filename: "repos.sqlite", driver: sqlite3.Database });
db.getDatabaseInstance().serialize(); // :/ make node-sqlite3 run queued statements serially
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);
var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);

// Thin wrapper around the global fetch that logs requests and throws on non-2xx responses.
async function fetch(url, options) {
  console.debug("get", url);
  var res = await global.fetch(url, options);
  if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText} ${await res.text()}`);
  return res;
}

// Download a full repo as a CAR file from the PDS and keep a copy on disk.
async function getRepo(pds, did) {
  var res = await fetch(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`);
  var u = new Uint8Array(await res.arrayBuffer());
  fs.writeFileSync(`repos/${did.replaceAll(':', '-')}`, u);
  return u;
}

// Walk every record in a repo CAR, insert new records into sqlite, mark records that
// have disappeared as deleted, and collect any other DIDs referenced by the records.
async function processRepo(did, repo) {
  console.log("process", did);
  var recordCounts = {};
  var rdids = new Set();

  // Index the records we already have for this DID so we can diff against the new CAR.
  var rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
  console.debug("got rows");
  var existing = {};
  for (let {collection, rkey, cid, deleted} of rows) {
    existing[`${collection}~${rkey}~${cid}`] = {collection, rkey, cid, deleted: deleted || false, still_exists: null};
  }

  console.debug("begin transaction");
  await db.exec(`BEGIN TRANSACTION`);
  var badrepo = false;
  try {
    for (let entry of iterateAtpRepo(repo)) {
      recordCounts[entry.collection] ||= {now: 0, new: 0};
      recordCounts[entry.collection].now++;
      try {
        // Scan every string in the record for did:plc / did:web references.
        (function recurseObject(obj) {
          for (let key in obj) {
            if (!obj.hasOwnProperty(key)) continue;
            let val = obj[key];
            if (typeof val == "object" && !(val instanceof CidLinkWrapper)) recurseObject(val);
            else if (typeof val == "string") {
              let didplcs = val.match(/did:plc:[a-z2-7]{24}/g);
              if (didplcs) didplcs.forEach(did => rdids.add(did));
              let didwebs = val.match(/did:web:[a-zA-Z0-9-.]+/g);
              if (didwebs) didwebs.forEach(did => rdids.add(did));
            }
          }
        })(entry.record);

        let key = `${entry.collection}~${entry.rkey}~${entry.cid.$link}`;
        if (key in existing) {
          if (existing[key].deleted) {
            // A record previously marked deleted has reappeared.
            console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
            await db.run(
              `UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`,
              [Date.now(), did, entry.collection, entry.rkey, entry.cid.$link]
            );
          } else {
            existing[key].still_exists = true;
          }
          continue;
        }

        recordCounts[entry.collection].new++;
        await insert_statement.run([did, entry.collection, entry.rkey, entry.cid.$link, entry.bytes, false, Date.now(), null]);
      } catch (error) { console.error("record error", error.stack); }
    }
  } catch (error) {
    console.error("iterate error", error.stack);
    badrepo = true;
  }
  console.log(did, recordCounts);

  // Anything we had before that no longer appears in the CAR gets flagged as deleted,
  // unless the CAR itself failed to iterate.
  if (!badrepo) for (let key in existing) {
    let val = existing[key];
    if (!val.still_exists && !val.deleted) {
      console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
      await db.run(
        `UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`,
        [Date.now(), did, val.collection, val.rkey, val.cid]
      );
    }
  }

  console.debug("commit");
  await db.exec(`COMMIT`);
  console.log(did, `${rdids.size} related dids`);
  rdids.delete(did);
  return rdids;
}

// Download a single blob unless we already have it.
async function saveBlob(pds, did, cid) {
  var fp = `blobs/${cid}`;
  if (fs.existsSync(fp)) return;
  var res = await fetch(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
  fs.writeFileSync(fp, new Uint8Array(await res.arrayBuffer()));
}

// List every blob CID for a DID (paginated), then download them one by one.
async function saveBlobs(pds, did) {
  console.log("save blobs", did);
  var allCids = [];
  var _cursor;
  do {
    try {
      let {cids, cursor} = await fetch(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${_cursor ? `&cursor=${_cursor}` : ''}`).then(res => res.json());
      _cursor = cursor;
      allCids.push(...cids);
    } catch (error) { console.error(error); }
  } while (_cursor);
  console.log(`${allCids.length} blobs`);
  for (var cid of allCids) await saveBlob(pds, did, cid).catch(console.error);
}

var backedUpDids = [];

// Back up one DID: resolve its PDS, fetch and process its repo, then fetch its blobs.
// Returns the set of other DIDs referenced by its records (if the repo was processed).
async function backup(did) {
  console.log("backup", did);
  var pds = await getPds(did);
  if (!pds) { console.log(`no pds for ${did}`); return; }
  var repo = await getRepo(pds, did).catch(console.error);
  var rdids;
  if (repo) rdids = await processRepo(did, repo);
  await saveBlobs(pds, did);
  return rdids;
}

// Back up a set of DIDs, then recurse into the DIDs their records reference,
// decrementing depth until it reaches zero.
async function backupRecursive(dids, depth = 1) {
  console.log(`backup ${dids.size} dids depth ${depth}`);
  if (!depth) {
    for (let did of dids) await backup(did).catch(console.error);
    return;
  }
  var allrdids = new Set();
  var index = -1;
  for (let did of dids) {
    index++;
    if (backedUpDids.includes(did)) continue;
    backedUpDids.push(did);
    console.log(`${index} / ${dids.size}`);
    try {
      let rdids = await backup(did);
      rdids?.forEach(rdid => allrdids.add(rdid));
    } catch (error) { console.error(error); }
  }
  await backupRecursive(allrdids, depth - 1);
}

// CLI: any argument starting with "did:" is a repo to back up; --depth=N controls
// how many levels of referenced DIDs to follow.
var didsToBackup = new Set(process.argv.filter(arg => arg.startsWith("did:")));
var depth;
for (let arg of process.argv) {
  let match = arg.match(/^--depth=(\d+)$/i);
  if (match) depth = Number(match[1]);
}
await backupRecursive(didsToBackup, depth);
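// Example invocation, based on the argument parsing above (the script filename and the
// DID are placeholders, not from the original):
//
//   node backup.mjs did:plc:aaaaaaaaaaaaaaaaaaaaaaaa --depth=2
//
// With --depth=2 this backs up the given repo, then the repos its records reference,
// then the repos those reference, before stopping.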