9 Commits

Author SHA1 Message Date
lamp ad1c475999 concurrent backup
download from all pdses at once. I don't know how to do concurrent sqlite transactions
2025-11-30 21:54:46 -08:00
lamp 53fda270e7 use got
why is undici extremely slow and failing constantly (UND_ERR_SOCKET)?
2025-11-03 23:04:17 -08:00
lamp 0f83539d66 encode cursor 2025-11-03 19:48:40 -08:00
lamp d0548a7215 handle iterateAtpRepo error 2025-05-21 13:14:11 -07:00
lamp 60df9c5d0e INDEX 2025-05-15 14:12:30 -07:00
lamp 245e2bfe22 optimize sqlite insert 2025-05-12 02:48:16 -07:00
lamp 5b060c6a23 add updatedAt 2025-05-12 01:55:54 -07:00
lamp 4f877526ca don't recurse CidLinkWrapper 2025-05-12 01:51:08 -07:00
lamp 9f3b14f5ef use sqlite, preserve deleted records, support did:web 2025-05-12 01:39:24 -07:00
8 changed files with 3452 additions and 126 deletions
+2
View File
@@ -1,3 +1,5 @@
node_modules node_modules
blobs blobs
repos repos
*.sqlite
*.sqlite-journal
+22
View File
@@ -0,0 +1,22 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}\\index.js",
"args": ["did:plc:u4gngygg2w5egsigxu5g7byu"],
"outFiles": [
"${workspaceFolder}/**/*.js"
]
}
]
}
-6
View File
@@ -1,6 +0,0 @@
{
"deno.enable": true,
"deno.enablePaths": [
"*.deno.js"
]
}
-113
View File
@@ -1,113 +0,0 @@
// Make sure the output directories exist before anything is downloaded.
// An already-existing directory is the normal case on re-runs and is ignored;
// any other failure (e.g. permissions) is reported instead of being silently
// discarded, while still letting the script continue as before.
for (const dir of ["blobs", "repos"]) {
  try {
    Deno.mkdirSync(dir);
  } catch (error) {
    if (!(error instanceof Deno.errors.AlreadyExists)) console.error(error);
  }
}
/**
 * Fetch a URL and return the response, rejecting on any non-OK status.
 *
 * @param {string} url - Address to request.
 * @param {object} [options] - Passed through to fetch unchanged.
 * @returns {Promise<Response>} The successful response.
 * @throws {Error} "HTTP <status> <statusText> <body>" when the response is not ok.
 */
async function get(url, options) {
  console.debug("get", url);
  const response = await fetch(url, options);
  if (response.ok) return response;
  const body = await response.text();
  throw new Error(`HTTP ${response.status} ${response.statusText} ${body}`);
}
/**
 * Look up the PDS endpoint of a did:plc identity via plc.directory.
 *
 * @param {string} did - The DID to resolve.
 * @returns {Promise<string>} serviceEndpoint of the "#atproto_pds" service entry.
 * @throws {TypeError} When the DID document has no "#atproto_pds" service.
 */
async function getPds(did) {
  const response = await get(`https://plc.directory/${did}`);
  const doc = await response.json();
  const {serviceEndpoint} = doc.service.find(service => service.id == "#atproto_pds");
  return serviceEndpoint;
}
/**
 * Download a repo export from its PDS and store it under repos/.
 *
 * The file name is the DID with every ':' replaced by '-'.
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID whose repo is fetched.
 * @returns {Promise<ArrayBuffer>} The downloaded repo bytes.
 */
async function getRepo(pds, did) {
  const outPath = `repos/${did.replaceAll(':','-')}`;
  const response = await get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`);
  const bytes = await response.arrayBuffer();
  Deno.writeFileSync(outPath, new Uint8Array(bytes));
  return bytes;
}
/**
 * Report whether something exists at the given path.
 *
 * @param {string} path - Path to check.
 * @returns {boolean} false when lstat reports NotFound, true when lstat succeeds.
 * @throws Any lstat error other than Deno.errors.NotFound is rethrown.
 */
function exists(path) {
  try {
    Deno.lstatSync(path);
  } catch (error) {
    if (!(error instanceof Deno.errors.NotFound)) throw error;
    return false;
  }
  return true;
}
/**
 * Download one blob into blobs/<cid> unless that file already exists.
 *
 * The whole body is buffered before writing. An earlier approach streamed the
 * response straight into a newly created file, but an interrupted download
 * would then leave a corrupt file behind.
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID that owns the blob.
 * @param {string} cid - CID of the blob; also used as the file name.
 * @returns {Promise<void>}
 */
async function getBlob(pds, did, cid) {
  const target = `blobs/${cid}`;
  if (!exists(target)) {
    const response = await get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
    const bytes = await response.arrayBuffer();
    Deno.writeFileSync(target, new Uint8Array(bytes));
  }
}
/**
 * List every blob CID of a repo (paginated, 1000 per page) and download each
 * blob that is not already on disk.
 *
 * Listing is best-effort: if a page request fails, the error is logged and
 * pagination stops, and the CIDs collected so far are still downloaded.
 * (Previously a failing page with a cursor already set was retried forever,
 * because the cursor never advanced.)
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID whose blobs are fetched.
 * @returns {Promise<void>}
 */
async function getBlobs(pds, did) {
  const allCids = [];
  let nextCursor;
  do {
    try {
      // The cursor is opaque server data, so encode it before putting it in the query string.
      const cursorParam = nextCursor ? `&cursor=${encodeURIComponent(nextCursor)}` : '';
      const response = await get(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${cursorParam}`);
      const {cids, cursor} = await response.json();
      nextCursor = cursor;
      allCids.push(...cids);
    } catch (error) {
      console.error(error);
      break; // the cursor cannot advance after a failure; stop instead of looping forever
    }
  } while (nextCursor);
  console.log(`${allCids.length} blobs`);
  // Individual blob failures are logged and do not stop the remaining downloads.
  for (const cid of allCids) await getBlob(pds, did, cid).catch(console.error);
}
// DIDs that backup() has already been asked to handle during this run.
const backedUpDids = [];

/**
 * Back up one DID (repo and blobs) at most once per run.
 *
 * @param {string} did - DID to back up.
 * @returns {Promise<ArrayBuffer|undefined>} The repo bytes, or undefined when
 *   the DID was already handled or the repo download failed (failure is logged).
 */
async function backup(did) {
  const alreadyHandled = backedUpDids.includes(did);
  if (alreadyHandled) return;
  console.log("backup", did);
  backedUpDids.push(did);
  const pds = await getPds(did);
  // A failed repo download is logged; the blobs are still attempted.
  const repo = await getRepo(pds, did).catch(console.error);
  await getBlobs(pds, did);
  return repo;
}
// Decoder used to read raw repo bytes as text so DIDs can be found with a regex.
var ascii = new TextDecoder("ascii");

/**
 * Back up the given DIDs and, while depth is non-zero, every did:plc found
 * in their repos, one level per recursion step.
 *
 * @param {Iterable<string>} dids - DIDs to back up: an array on the initial
 *   call, a Set on recursive calls.
 * @param {number} [depth=1] - Levels of related DIDs to follow; 0 backs up
 *   only the given DIDs.
 * @returns {Promise<void>}
 */
async function backupRecursive(dids, depth = 1) {
  // Recursive calls pass a Set, which has .size rather than .length.
  const count = dids.length ?? dids.size;
  console.log(`backup ${count} dids depth ${depth}`);
  if (!depth) {
    for (const did of dids) await backup(did).catch(console.error);
    return;
  }
  const allrdids = new Set();
  for (const did of dids) {
    try {
      const repoText = ascii.decode(await backup(did));
      const rdids = new Set(repoText.match(/did:plc:[a-z2-7]{24}/g));
      rdids.delete(did); // a repo references its own DID; that is not a related DID
      console.log(`${rdids.size} related didplcs`);
      rdids.forEach(rdid => allrdids.add(rdid));
    } catch (error) {
      console.error(error);
    }
  }
  await backupRecursive(allrdids, depth - 1);
}
// Every command-line argument that looks like a did:plc identifier is backed up.
const didsToBackup = Deno.args.filter(arg => arg.startsWith("did:plc:"));

// Optional --depth=N. Left undefined when absent so backupRecursive applies its
// own default. The capture group wraps the whole digit run: with `(\d)+` only
// the last digit was captured, so --depth=12 was read as 2.
let depth;
for (const arg of Deno.args) {
  const match = arg.match(/^--depth=(\d+)$/i);
  if (match) {
    depth = Number(match[1]);
  }
}

await backupRecursive(didsToBackup, depth);
+204
View File
@@ -0,0 +1,204 @@
import sqlite3 from 'sqlite3';
import {open} from 'sqlite';
import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity";
import got from "got";
import {writeFile} from "fs/promises";
import {existsSync, mkdirSync} from 'fs';
// Make sure the output directories exist before anything is downloaded.
// EEXIST is the normal case on re-runs and is ignored; any other failure
// (e.g. permissions) is reported instead of being silently discarded, while
// still letting the script continue as before.
for (const dir of ["blobs", "repos"]) {
  try {
    mkdirSync(dir);
  } catch (error) {
    if (error.code !== "EEXIST") console.error(`mkdir ${dir}`, error.message);
  }
}
// Shared DID resolver; getPds calls its resolve() to obtain the DID document for a DID.
var didres = new DidResolver({});
/**
 * Resolve a DID to the endpoint of its "#atproto_pds" service.
 *
 * @param {string} did - DID to resolve.
 * @returns {Promise<string|null|undefined>} The service endpoint; null when
 *   resolution throws (the error message is logged); undefined when the
 *   document is missing or has no "#atproto_pds" service.
 */
async function getPds(did) {
  let doc;
  try {
    doc = await didres.resolve(did);
  } catch (error) {
    console.error("resolve", did, error.message);
    return null;
  }
  const pdsService = doc?.service?.find(service => service.id == "#atproto_pds");
  return pdsService?.serviceEndpoint;
}
// Shared HTTP client. Every request line and every response status/duration is
// logged, and an error that carries a response is replaced by a compact
// "HTTP <status> <body>" error.
const w = got.extend({
  hooks: {
    beforeRequest: [
      options => console.log(`${options.method} ${options.url}`),
    ],
    afterResponse: [
      response => {
        const {start, end} = response.timings;
        console.log(response.statusCode, (end - start) + "ms");
        return response;
      },
    ],
    beforeError: [
      error => error.response
        ? new Error(`HTTP ${error.response.statusCode} ${error.response.body}`)
        : error,
    ],
  },
});
// Open (or create) the SQLite database file that stores every record seen in any backed-up repo.
var db = await open({
filename: "repos.sqlite",
driver: sqlite3.Database
});
// NOTE(review): switches the underlying sqlite3 connection to serialized mode, presumably so
// statements issued by the concurrent per-PDS workers run one at a time — confirm against the
// node-sqlite3 documentation.
db.getDatabaseInstance().serialize(); // :/
// One row per (did, collection, rkey, cid) version of a record. `record` receives the entry's raw
// bytes, `deleted` is set when a later run no longer finds that version in the repo, and
// insertedAt/updatedAt receive Date.now() values.
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
// Index covering the columns used by processRepo's per-DID SELECT and its UPDATE ... WHERE clauses.
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);
// Prepared once and reused for every inserted record; the 8 placeholders follow the table's column order.
var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);
/**
 * Download a repo export from its PDS and store it under repos/.
 *
 * The file name is the DID with every ':' replaced by '-'.
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID whose repo is fetched.
 * @returns {Promise<Buffer>} The downloaded repo bytes.
 */
async function getRepo(pds, did) {
  const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
  const repoBytes = await w.get(url).buffer();
  const outPath = `repos/${did.replaceAll(':','-')}`;
  await writeFile(outPath, repoBytes);
  return repoBytes;
}
/**
 * Walk a decoded record and add every did:plc / did:web identifier found in
 * its string values to `found`.
 *
 * @param {*} obj - Record, nested object or array to scan; anything else is ignored.
 * @param {Set<string>} found - Collector that receives the matched DIDs.
 */
function collectReferencedDids(obj, found) {
  if (obj === null || typeof obj != "object") return;
  for (const key in obj) {
    // Record contents are untrusted and may carry their own "hasOwnProperty"
    // field, so the check must not be called through the object itself.
    if (!Object.hasOwn(obj, key)) continue;
    const val = obj[key];
    if (typeof val == "string") {
      for (const match of val.match(/did:plc:[a-z2-7]{24}/g) ?? []) found.add(match);
      for (const match of val.match(/did:web:[a-zA-Z0-9-.]+/g) ?? []) found.add(match);
    } else if (typeof val == "object" && !(val instanceof CidLinkWrapper)) {
      // CID link wrappers are not descended into.
      collectReferencedDids(val, found);
    }
  }
}

/**
 * Sync one downloaded repo into the `records` table and collect the DIDs it references.
 *
 * - A record version (collection, rkey, cid) not yet in the table is inserted.
 * - A version previously marked deleted that is present again is un-deleted.
 * - A version in the table that is no longer in the repo is marked deleted,
 *   unless iterating the repo failed, in which case nothing is marked.
 *
 * Writes are not wrapped in BEGIN/COMMIT: backupRecursive processes several
 * repos concurrently on this one connection, so a transaction opened here
 * would interleave with other repos' statements.
 *
 * @param {string} did - DID that owns the repo.
 * @param {Uint8Array} repo - Repo bytes handed to iterateAtpRepo.
 * @returns {Promise<Set<string>>} DIDs referenced by the records, excluding `did` itself.
 */
async function processRepo(did, repo) {
  console.log("process", did);
  const recordCounts = {};
  const rdids = new Set();
  const rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
  console.debug("got rows");

  // Index the stored versions by "collection~rkey~cid" for O(1) lookup below.
  const existing = {};
  for (const {collection, rkey, cid, deleted} of rows) {
    existing[`${collection}~${rkey}~${cid}`] = {
      collection, rkey, cid,
      deleted: deleted || false,
      still_exists: null
    };
  }

  let badrepo = false;
  try {
    for (const entry of iterateAtpRepo(repo)) {
      recordCounts[entry.collection] ||= {now: 0, new: 0};
      recordCounts[entry.collection].now++;
      try {
        collectReferencedDids(entry.record, rdids);
        const key = `${entry.collection}~${entry.rkey}~${entry.cid.$link}`;
        if (key in existing) {
          if (existing[key].deleted) {
            console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
            await db.run(`UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
              Date.now(), did, entry.collection, entry.rkey, entry.cid.$link
            ]);
          } else {
            existing[key].still_exists = true;
          }
          continue;
        }
        recordCounts[entry.collection].new++;
        await insert_statement.run([did, entry.collection, entry.rkey, entry.cid.$link, entry.bytes, false, Date.now(), null]);
      } catch (error) {
        // One bad record must not stop the rest of the repo.
        console.error("record error", error.stack);
      }
    }
  } catch (error) {
    console.error("iterate error", error.stack);
    // The repo was only partially read, so absence of a record proves nothing.
    badrepo = true;
  }
  console.log(did, recordCounts);

  if (!badrepo) {
    for (const val of Object.values(existing)) {
      if (!val.still_exists && !val.deleted) {
        console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
        await db.run(`UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
          Date.now(), did, val.collection, val.rkey, val.cid
        ]);
      }
    }
  }

  // Drop the repo's own DID before reporting so the count matches what is returned.
  rdids.delete(did);
  console.log(did, `${rdids.size} related dids`);
  return rdids;
}
/**
 * Download one blob into blobs/<cid> unless that file already exists.
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID that owns the blob.
 * @param {string} cid - CID of the blob; also used as the file name.
 * @returns {Promise<void>}
 */
async function saveBlob(pds, did, cid) {
  const target = `blobs/${cid}`;
  if (!existsSync(target)) {
    const url = `${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`;
    const data = await w.get(url).buffer();
    await writeFile(target, data);
  }
}
/**
 * List every blob CID of a repo (paginated, 1000 per page) and save each one.
 *
 * A failure while listing rejects the returned promise; a failure on an
 * individual blob is logged and the remaining blobs are still attempted.
 *
 * @param {string} pds - Base URL of the PDS.
 * @param {string} did - DID whose blobs are saved.
 * @returns {Promise<void>}
 */
async function saveBlobs(pds, did) {
  console.log("save blobs", did);
  const allCids = [];
  let pageCursor;
  for (;;) {
    const cursorParam = pageCursor ? `&cursor=${encodeURIComponent(pageCursor)}` : '';
    const page = await w.get(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${cursorParam}`).json();
    pageCursor = page.cursor;
    allCids.push(...page.cids);
    if (!pageCursor) break; // no cursor means this was the last page
  }
  console.log(`${allCids.length} blobs`);
  for (const cid of allCids) {
    await saveBlob(pds, did, cid).catch(error => console.error(error.message));
  }
}
// DIDs already attempted during this run. A Set keeps the membership check
// O(1); deeper levels can reach a very large number of DIDs.
const backedUpDids = new Set();

/**
 * Back up one DID: download its repo, sync it into the database and save its blobs.
 *
 * Each step is best-effort; a failure is logged and the next step still runs.
 *
 * @param {string} did - DID to back up.
 * @param {string} [pds] - Already-resolved PDS endpoint. When omitted, the DID
 *   is resolved here.
 * @returns {Promise<Set<string>|undefined>} DIDs referenced by the repo, or
 *   undefined when there is no PDS or the repo could not be fetched/processed.
 */
async function backup(did, pds) {
  console.log("backup", did);
  const endpoint = pds ?? await getPds(did);
  if (!endpoint) {
    console.log(`no pds for ${did}`);
    return;
  }
  const repo = await getRepo(endpoint, did).catch(error => console.error(error.message));
  let rdids;
  if (repo) rdids = await processRepo(did, repo).catch(error => console.error(error.message));
  await saveBlobs(endpoint, did).catch(error => console.error(error.message));
  return rdids;
}

/**
 * Back up the given DIDs and, while depth is non-zero, every DID their repos
 * reference, one level per recursion step.
 *
 * DIDs are grouped by PDS. Groups run concurrently, while the DIDs inside one
 * group are processed one after another.
 *
 * @param {Set<string>} dids - DIDs to back up at this level.
 * @param {number} [depth=1] - Levels of related DIDs to follow; 0 backs up
 *   only the given DIDs.
 * @returns {Promise<void>}
 */
async function backupRecursive(dids, depth = 1) {
  console.log(`backup ${dids.size} dids depth ${depth}`);

  // Resolve each not-yet-seen DID exactly once and group it by its PDS.
  const pds2dids = new Map();
  for (const did of dids) {
    if (backedUpDids.has(did)) continue;
    backedUpDids.add(did);
    const pds = await getPds(did);
    if (!pds) {
      console.log(`no pds for ${did}`);
      continue;
    }
    if (!pds2dids.has(pds)) pds2dids.set(pds, []);
    pds2dids.get(pds).push(did);
  }
  console.log(`backing up from ${pds2dids.size} PDSes at once`);

  const allrdids = new Set();
  await Promise.all([...pds2dids].map(async ([pds, group]) => {
    for (const [index, did] of group.entries()) {
      console.log(`[${pds}] ${index} / ${group.length}`);
      try {
        // Pass the endpoint along so backup() does not resolve the DID again.
        const rdids = await backup(did, pds);
        rdids?.forEach(rdid => allrdids.add(rdid));
      } catch (error) {
        console.error(error.stack);
      }
    }
  }));

  if (depth) await backupRecursive(allrdids, depth - 1);
}
// Every command-line argument that looks like a DID (any method) is backed up.
const didsToBackup = new Set(process.argv.filter(arg => arg.startsWith("did:")));

// Optional --depth=N. Left undefined when absent so backupRecursive applies its
// own default. The capture group wraps the whole digit run: with `(\d)+` only
// the last digit was captured, so --depth=12 was read as 2.
let depth;
for (const arg of process.argv) {
  const match = arg.match(/^--depth=(\d+)$/i);
  if (match) {
    depth = Number(match[1]);
  }
}

await backupRecursive(didsToBackup, depth);
+3203
View File
File diff suppressed because it is too large Load Diff
+10
View File
@@ -0,0 +1,10 @@
{
"dependencies": {
"@atcute/car": "^3.0.4",
"@atproto/identity": "^0.4.8",
"got": "^14.6.2",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7"
},
"type": "module"
}
+10 -6
View File
@@ -2,12 +2,10 @@
A script that downloads a Bluesky profile (repo and blobs) as well as all profiles referenced by (i.e. followed, liked, mentioned, replied to, blocked, listed, etc) that profile. A script that downloads a Bluesky profile (repo and blobs) as well as all profiles referenced by (i.e. followed, liked, mentioned, replied to, blocked, listed, etc) that profile.
Only supports did:plc for now.
To backup one or more profiles and all related profiles: To backup one or more profiles and all related profiles:
```sh ```sh
$ deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu # (you can put more than one did:plc separated by spaces) $ node . did:plc:u4gngygg2w5egsigxu5g7byu # (you can put more than one did separated by spaces)
``` ```
This will preserve all content they interacted with, such as posts they liked, replied to, quoted or reposted. This will preserve all content they interacted with, such as posts they liked, replied to, quoted or reposted.
@@ -15,12 +13,18 @@ This will preserve all content they interacted with, such as posts they liked, r
To backup a profile, all related profiles AND all profiles related to related profiles: To backup a profile, all related profiles AND all profiles related to related profiles:
```sh ```sh
$ deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu --depth=2 $ node . did:plc:u4gngygg2w5egsigxu5g7byu --depth=2
``` ```
This will additionally save all profiles related to the profiles related to them, so you will have a copy of everything their friends liked and reposted as well. This may be a LOT of data! This will additionally save all profiles related to the profiles related to them, so you will have a copy of everything their friends liked and reposted as well. This may be a LOT of data!
Running the script again will refresh the repositories and download new blobs. Existing blobs are not re-downloaded. Run the script regularly to keep the backups up to date. To back up ONLY a single profile (or a list of profiles):
```sh
$ node . did:plc:u4gngygg2w5egsigxu5g7byu --depth=0
```
Run the script again to refresh the repos and download new blobs. New repo records are added to the database and deleted records are preserved but marked as deleted. Edited records are added as new entries and the old versions are marked as deleted.
## Example SystemD ## Example SystemD
@@ -35,7 +39,7 @@ After=network-online.target
[Service] [Service]
WorkingDirectory=/zpool1/FileStorage/srv/bskybackup/ WorkingDirectory=/zpool1/FileStorage/srv/bskybackup/
User=bskybackup User=bskybackup
ExecStart=deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu ExecStart=node . did:plc:u4gngygg2w5egsigxu5g7byu
Type=exec Type=exec
``` ```