ad1c475999
Download from all PDSes at once. I don't know how to do concurrent SQLite transactions.
205 lines
6.7 KiB
JavaScript
205 lines
6.7 KiB
JavaScript
import sqlite3 from 'sqlite3';
|
|
import {open} from 'sqlite';
|
|
import {iterateAtpRepo} from '@atcute/car';
|
|
import {CidLinkWrapper} from "@atcute/cid";
|
|
import {DidResolver} from "@atproto/identity";
|
|
import got from "got";
|
|
import {writeFile} from "fs/promises";
|
|
import {existsSync, mkdirSync} from 'fs';
|
|
|
|
// Output directories for downloaded blobs and repo CAR files. `recursive: true` makes this a
// no-op when a directory already exists, while genuine failures (e.g. missing permissions)
// surface immediately instead of being swallowed and reappearing later as one failed write
// per downloaded file.
mkdirSync("blobs", {recursive: true});
mkdirSync("repos", {recursive: true});
|
|
|
|
// Shared resolver used by getPds to fetch each account's DID document.
const didres = new DidResolver({});
|
|
/**
 * Resolves a DID and returns the service endpoint of its atproto PDS.
 *
 * @param {string} did - DID to resolve (did:plc or did:web).
 * @returns {Promise<string | null | undefined>} the PDS endpoint URL; `null` when resolution
 *   threw (the error is logged); `undefined` when resolution produced no document or the
 *   document lists no `#atproto_pds` service.
 */
async function getPds(did) {
	let doc;
	try {
		doc = await didres.resolve(did);
	} catch (error) {
		console.error("resolve", did, error.message);
		return null;
	}
	// A DID document may write the service id relative ("#atproto_pds") or fully qualified
	// ("<did>#atproto_pds"); accept both so valid documents are not reported as PDS-less.
	const fullId = `${did}#atproto_pds`;
	return doc?.service?.find(s => s.id === "#atproto_pds" || s.id === fullId)?.serviceEndpoint;
}
|
|
|
|
|
|
/**
 * HTTP client used for every XRPC call. It logs each request and its status and latency, and
 * turns HTTP error responses into errors whose message carries the status code and body.
 */
const w = got.extend({
	hooks: {
		beforeRequest: [({url, method}) => console.log(`${method} ${url}`)],
		afterResponse: [response => {
			console.log(response.statusCode, (response.timings.end - response.timings.start) + "ms");
			return response;
		}],
		beforeError: [error => {
			// Callers only print error.message, so give them a concise summary, but keep the
			// original got error (stack, code, request details) reachable through `cause`.
			if (error.response) {
				return new Error(`HTTP ${error.response.statusCode} ${error.response.body}`, {cause: error});
			}
			return error;
		}],
	},
});
|
|
|
|
|
|
// Single shared connection to the local SQLite store of every backed-up record.
const db = await open({
	driver: sqlite3.Database,
	filename: "repos.sqlite",
});

// Switch node-sqlite3 into serialized mode so queued statements execute one after another.
// Several repos are backed up concurrently, but they all share this one connection.
db.getDatabaseInstance().serialize();

// One row per (did, collection, rkey, cid) ever seen. `deleted` flags rows that were no
// longer present the last time the repo was processed.
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);

// Prepared once, then reused for every newly seen record.
const insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);
|
|
|
|
/**
 * Downloads an account's full repository (CAR file) from its PDS and keeps a copy under repos/.
 *
 * @param {string} pds - PDS endpoint URL (no trailing slash).
 * @param {string} did - Account whose repo is fetched.
 * @returns {Promise<Buffer>} the raw CAR bytes, as returned by got's `.buffer()`.
 */
async function getRepo(pds, did) {
	// Encode the DID: did:web identifiers may contain "%" (e.g. "%3A" for a port), which the
	// server would otherwise percent-decode into a different DID.
	const repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${encodeURIComponent(did)}`).buffer();
	// ':' is swapped for '-' in the file name (presumably for file-system portability).
	await writeFile(`repos/${did.replaceAll(':', '-')}`, repo);
	return repo;
}
|
|
|
|
// Patterns for DIDs mentioned anywhere in a record's string values. String.prototype.match
// resets lastIndex for /g patterns, so sharing these module-level instances is safe.
const DID_PLC_PATTERN = /did:plc:[a-z2-7]{24}/g;
const DID_WEB_PATTERN = /did:web:[a-zA-Z0-9-.]+/g;

/**
 * Recursively collects every did:plc / did:web string found inside a decoded record.
 * CID links (CidLinkWrapper) are not descended into.
 *
 * @param {*} obj - Decoded record, or a value nested inside one.
 * @param {Set<string>} out - Receives every DID found.
 */
function collectDids(obj, out) {
	for (const key in obj) {
		// Object.hasOwn also works on prototype-less objects, where obj.hasOwnProperty would throw.
		if (!Object.hasOwn(obj, key)) continue;
		const val = obj[key];
		if (typeof val == "object" && !(val instanceof CidLinkWrapper)) {
			collectDids(val, out);
		} else if (typeof val == "string") {
			for (const found of val.match(DID_PLC_PATTERN) ?? []) out.add(found);
			for (const found of val.match(DID_WEB_PATTERN) ?? []) out.add(found);
		}
	}
}

// processRepo calls overlap when several PDSes are backed up at once, but they share a single
// SQLite connection, and SQLite cannot open a second BEGIN on a connection that already has one.
// Each call is chained onto this promise, so each repo's transaction finishes before the next begins.
let processRepoQueue = Promise.resolve();

/**
 * Syncs one downloaded repo into the `records` table and returns the DIDs its records mention.
 *
 * - Records whose (collection, rkey, cid) is not yet stored are inserted.
 * - Stored records flagged deleted that reappear are un-deleted.
 * - Stored records missing from a fully readable repo are flagged deleted.
 *
 * All writes for one repo happen in a single transaction, and calls run one at a time.
 *
 * @param {string} did - Account the repo belongs to.
 * @param {Buffer|Uint8Array} repo - CAR bytes, as fetched by getRepo.
 * @returns {Promise<Set<string>>} DIDs referenced by the repo's records, excluding `did` itself.
 */
async function processRepo(did, repo) {
	const result = processRepoQueue.then(() => processRepoNow(did, repo));
	// A failed repo must not stall the queue for the calls waiting behind it.
	processRepoQueue = result.catch(() => {});
	return result;
}

// Does the actual work of processRepo. It must only run while holding processRepoQueue.
async function processRepoNow(did, repo) {
	console.log("process", did);
	const recordCounts = {};
	const rdids = new Set();

	const rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
	console.debug("got rows");
	// Everything already stored for this DID, keyed by collection~rkey~cid.
	const existing = new Map();
	for (const {collection, rkey, cid, deleted} of rows) {
		existing.set(`${collection}~${rkey}~${cid}`, {
			collection, rkey, cid,
			deleted: deleted || false,
			still_exists: null,
		});
	}

	// One transaction per repo: much faster than autocommitting every statement, and safe
	// because the queue guarantees no other transaction is open on the connection.
	await db.exec(`BEGIN TRANSACTION`);
	try {
		let badrepo = false;
		try {
			for (const entry of iterateAtpRepo(repo)) {
				recordCounts[entry.collection] ||= {now: 0, new: 0};
				recordCounts[entry.collection].now++;
				try {
					collectDids(entry.record, rdids);
					const key = `${entry.collection}~${entry.rkey}~${entry.cid.$link}`;
					const known = existing.get(key);
					if (known) {
						if (known.deleted) {
							console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
							await db.run(`UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
								Date.now(), did, entry.collection, entry.rkey, entry.cid.$link
							]);
						} else {
							known.still_exists = true;
						}
						continue;
					}
					recordCounts[entry.collection].new++;
					await insert_statement.run([did, entry.collection, entry.rkey, entry.cid.$link, entry.bytes, false, Date.now(), null]);
				} catch (error) {
					console.error("record error", error.stack);
				}
			}
		} catch (error) {
			// The CAR could not be read completely, so a record's absence proves nothing:
			// skip the "gone" pass below.
			console.error("iterate error", error.stack);
			badrepo = true;
		}

		console.log(did, recordCounts);

		if (!badrepo) {
			for (const val of existing.values()) {
				if (val.still_exists || val.deleted) continue;
				console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
				await db.run(`UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
					Date.now(), did, val.collection, val.rkey, val.cid
				]);
			}
		}

		await db.exec(`COMMIT`);
	} catch (error) {
		// Leave the connection without an open transaction so the next repo can BEGIN.
		await db.exec(`ROLLBACK`).catch(rollbackError => console.error("rollback error", rollbackError.message));
		throw error;
	}

	console.log(did, `${rdids.size} related dids`);
	rdids.delete(did);
	return rdids;
}
|
|
|
|
/**
 * Downloads one blob into blobs/<cid>, unless a file for that CID already exists.
 *
 * @param {string} pds - PDS endpoint hosting the blob.
 * @param {string} did - Account that owns the blob.
 * @param {string} cid - Blob CID; also used as the file name.
 * @throws {Error} if `cid` is not a plain alphanumeric CID string, or if the download/write fails.
 */
async function saveBlob(pds, did, cid) {
	// CIDs come from the remote PDS (listBlobs) and become file names, so reject anything
	// that could escape blobs/ (e.g. "../repos/x"). CID string encodings are alphanumeric.
	if (!/^[a-zA-Z0-9]+$/.test(cid)) throw new Error(`invalid blob cid: ${cid}`);
	const fp = `blobs/${cid}`;
	if (existsSync(fp)) return;
	const query = `did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`;
	const blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?${query}`).buffer();
	await writeFile(fp, blob);
}
|
|
|
|
/**
 * Lists every blob an account has on its PDS, following listBlobs pagination, and downloads
 * each one with saveBlob. Failures for individual blobs are logged and skipped.
 *
 * @param {string} pds - PDS endpoint URL.
 * @param {string} did - Account whose blobs are saved.
 */
async function saveBlobs(pds, did) {
	console.log("save blobs", did);
	const allCids = [];
	let cursor;
	do {
		const query = `did=${encodeURIComponent(did)}&limit=1000${cursor ? `&cursor=${encodeURIComponent(cursor)}` : ''}`;
		const page = await w.get(`${pds}/xrpc/com.atproto.sync.listBlobs?${query}`).json();
		allCids.push(...page.cids);
		// Stop on an empty page or a cursor that did not advance, so a misbehaving PDS
		// cannot keep this loop requesting the same page forever.
		if (page.cids.length === 0 || page.cursor === cursor) break;
		cursor = page.cursor;
	} while (cursor);
	console.log(`${allCids.length} blobs`);
	for (const cid of allCids) {
		await saveBlob(pds, did, cid).catch(error => console.error(error.message));
	}
}
|
|
|
|
// DIDs already handed to backup during this run, so each DID is processed at most once.
const backedUpDids = new Set();

/**
 * Backs up one account: downloads its repo, syncs the repo's records into the database,
 * and downloads its blobs.
 *
 * @param {string} did - Account to back up.
 * @param {string} [pds] - PDS endpoint, if the caller already resolved it; otherwise resolved here.
 * @returns {Promise<Set<string> | undefined>} DIDs referenced by the account's records
 *   (excluding itself), or undefined when no PDS was found or the repo step failed.
 */
async function backup(did, pds) {
	console.log("backup", did);
	const endpoint = pds ?? await getPds(did);
	if (!endpoint) {
		console.log(`no pds for ${did}`);
		return;
	}
	const repo = await getRepo(endpoint, did).catch(error => console.error(error.message));
	const rdids = repo ? await processRepo(did, repo).catch(error => console.error(error.message)) : undefined;
	await saveBlobs(endpoint, did).catch(error => console.error(error.message));
	return rdids;
}

/**
 * Backs up a set of accounts, then (while depth > 0) the accounts their records reference.
 * Different PDSes are processed concurrently; accounts on the same PDS run one at a time.
 *
 * @param {Iterable<string>} dids - Accounts to back up at this level.
 * @param {number} [depth=1] - How many further levels of referenced DIDs to follow.
 */
async function backupRecursive(dids, depth = 1) {
	// Skip DIDs handled earlier in this run before spending time resolving them.
	const pending = [...dids].filter(did => !backedUpDids.has(did));
	console.log(`backup ${pending.length} dids depth ${depth}`);
	if (pending.length === 0) return;

	// Group by PDS, resolving each DID once; the endpoint is passed on to backup().
	const pds2dids = new Map();
	for (const did of pending) {
		backedUpDids.add(did);
		const pds = await getPds(did);
		if (!pds) {
			console.log(`no pds for ${did}`);
			continue;
		}
		if (!pds2dids.has(pds)) pds2dids.set(pds, []);
		pds2dids.get(pds).push(did);
	}
	console.log(`backing up from ${pds2dids.size} PDSes at once`);

	const allrdids = new Set();
	await Promise.all([...pds2dids].map(async ([pds, pdsDids]) => {
		for (const [index, did] of pdsDids.entries()) {
			console.log(`[${pds}] ${index} / ${pdsDids.length}`);
			try {
				const rdids = await backup(did, pds);
				rdids?.forEach(rdid => allrdids.add(rdid));
			} catch (error) {
				console.error(error.stack);
			}
		}
	}));

	// `> 0` rather than truthiness, so a negative depth cannot recurse without end.
	if (depth > 0) await backupRecursive(allrdids, depth - 1);
}
|
|
|
|
|
|
/**
 * Parses command-line arguments.
 * - Every argument starting with "did:" is a DID to back up.
 * - `--depth=N` (case-insensitive; the last one wins) sets how many extra levels of
 *   referenced DIDs to follow.
 *
 * @param {string[]} argv - Raw argument list, e.g. process.argv.
 * @returns {{dids: Set<string>, depth: number | undefined}}
 */
function parseArgs(argv) {
	const dids = new Set(argv.filter(arg => arg.startsWith("did:")));
	let depth;
	for (const arg of argv) {
		// (\d+) rather than (\d)+ so multi-digit depths such as --depth=12 keep every digit.
		const match = arg.match(/^--depth=(\d+)$/i);
		if (match) depth = Number(match[1]);
	}
	return {dids, depth};
}

const {dids: didsToBackup, depth} = parseArgs(process.argv);

await backupRecursive(didsToBackup, depth);
|