// bskybackup/index.js
// 2025-05-21 13:14:11 -07:00
// 187 lines, 6.0 KiB, JavaScript

import sqlite3 from 'sqlite3';
import {open} from 'sqlite';
import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity";
import * as fs from "fs";
try {fs.mkdirSync("blobs")} catch(e) {}
try {fs.mkdirSync("repos")} catch(e) {}
var didres = new DidResolver({});
var getPds = async did => (await didres.resolve(did)).service.find(s => s.id == "#atproto_pds")?.serviceEndpoint;
var db = await open({
filename: "repos.sqlite",
driver: sqlite3.Database
});
db.getDatabaseInstance().serialize(); // :/
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);
var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);
async function fetch(url, options) {
console.debug("get", url);
var res = await global.fetch(url, options);
if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText} ${await res.text()}`);
return res;
}
async function getRepo(pds, did) {
var res = await fetch(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`);
var ab = await res.arrayBuffer();
var u = new Uint8Array(ab);
fs.writeFileSync(`repos/${did.replaceAll(':','-')}`, u);
return u;
}
// Diffs a fetched repo CAR snapshot against the local `records` table:
// inserts rows for new records, clears the deleted flag on records that
// re-appeared, and (only when the snapshot iterated cleanly) soft-deletes
// rows no longer present. Returns the Set of other DIDs referenced by
// string values inside the records (did:plc / did:web), minus `did` itself.
async function processRepo(did, repo) {
console.log("process", did);
// Per-collection counters, for logging only: `now` = records seen in this
// snapshot, `new` = records not previously in the database.
var recordCounts = {};
// Related DIDs harvested from record string values.
var rdids = new Set();
var rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
console.debug("got rows");
// Index previously-stored rows by "collection~rkey~cid" for O(1) matching
// against snapshot entries; `still_exists` is filled in while iterating.
var existing = {};
for (let {collection, rkey, cid, deleted} of rows) {
existing[`${collection}~${rkey}~${cid}`] = {
collection, rkey, cid,
deleted: deleted || false,
still_exists: null
};
}
console.debug("begin transaction");
// One transaction for the whole diff keeps it atomic and fast.
await db.exec(`BEGIN TRANSACTION`);
try {
for (let entry of iterateAtpRepo(repo)) {
recordCounts[entry.collection] ||= {now: 0, new: 0};
recordCounts[entry.collection].now++;
try {
// Recursively scan every string in the decoded record for DID
// references; CID link wrappers are opaque and skipped.
(function recurseObject(obj) {
for (let key in obj) {
if (!obj.hasOwnProperty(key)) continue;
let val = obj[key];
if (typeof val == "object" && !(val instanceof CidLinkWrapper)) recurseObject(val);
else if (typeof val == "string") {
let didplcs = val.match(/did:plc:[a-z2-7]{24}/g);
if (didplcs) didplcs.forEach(did => rdids.add(did));
let didwebs = val.match(/did:web:[a-zA-Z0-9-.]+/g);
if (didwebs) didwebs.forEach(did => rdids.add(did));
}
}
})(entry.record);
let key = `${entry.collection}~${entry.rkey}~${entry.cid.$link}`;
if (key in existing) {
// Already stored. If it was flagged deleted it has come back;
// otherwise just note that it is still present in this snapshot.
if (existing[key].deleted) {
console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
await db.run(`UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
Date.now(), did, entry.collection, entry.rkey, entry.cid.$link
]);
} else {
existing[key].still_exists = true;
}
continue;
}
// Brand-new (did, collection, rkey, cid) tuple: store the record bytes
// as provided by the CAR iterator, not-deleted, with insertion time.
recordCounts[entry.collection].new++;
await insert_statement.run([did, entry.collection, entry.rkey, entry.cid.$link, entry.bytes, false, Date.now(), null]);
} catch (error) {
// Per-record failure: log and keep processing the rest of the repo.
console.error("record error", error.stack)
}
}
} catch (error) {
// The CAR iterator itself failed: the snapshot is incomplete, so unseen
// rows must NOT be treated as deleted below.
console.error("iterate error", error.stack);
var badrepo = true;
}
console.log(did, recordCounts);
// Soft-delete rows that were live before but absent from this snapshot —
// only when iteration completed (otherwise "absent" is unreliable).
if (!badrepo) for (let key in existing) {
let val = existing[key];
if (!val.still_exists && !val.deleted) {
console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
await db.run(`UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
Date.now(), did, val.collection, val.rkey, val.cid
]);
}
}
console.debug("commit");
// Note: commits even when iteration failed part-way, keeping whatever
// inserts/undeletes succeeded (best-effort backup).
await db.exec(`COMMIT`);
console.log(did, `${rdids.size} related dids`);
// A repo naturally references its own DID; drop it from the crawl set.
rdids.delete(did);
return rdids;
}
async function saveBlob(pds, did, cid) {
var fp = `blobs/${cid}`;
if (fs.existsSync(fp)) return;
var res = await fetch(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
var ab = await res.arrayBuffer();
fs.writeFileSync(fp, new Uint8Array(ab));
}
async function saveBlobs(pds, did) {
console.log("save blobs", did);
var allCids = [];
var _cursor;
do {
try {
let {cids, cursor} = await fetch(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${_cursor?`&cursor=${_cursor}`:''}`).then(res => res.json());
_cursor = cursor;
allCids.push(...cids);
} catch (error) {
console.error(error);
}
} while (_cursor);
console.log(`${allCids.length} blobs`);
for (var cid of allCids) await saveBlob(pds, did, cid).catch(console.error);
}
var backedUpDids = [];
async function backup(did) {
console.log("backup", did);
var pds = await getPds(did);
if (!pds) {
console.log(`no pds for ${did}`);
return;
}
var repo = await getRepo(pds, did).catch(console.error);
if (repo) var rdids = await processRepo(did, repo);
await saveBlobs(pds, did);
return rdids;
}
async function backupRecursive(dids, depth = 1) {
console.log(`backup ${dids.size} dids depth ${depth}`);
if (!depth) {
for (let did of dids) await backup(did).catch(console.error);
return;
}
var allrdids = new Set();
var index = -1;
for (let did of dids) {
index++;
if (backedUpDids.includes(did)) continue;
backedUpDids.push(did);
console.log(`${index} / ${dids.size}`);
try {
let rdids = await backup(did);
rdids?.forEach(rdid => allrdids.add(rdid));
} catch (error) {
console.error(error);
}
}
await backupRecursive(allrdids, depth - 1);
}
var didsToBackup = new Set(process.argv.filter(arg => arg.startsWith("did:")));
var depth;
for (let arg of process.argv) {
let match = arg.match(/^--depth=(\d)+$/i);
if (match) {
depth = Number(match[1]);
}
}
await backupRecursive(didsToBackup, depth);