Compare commits

...

6 Commits

Author SHA1 Message Date
d0548a7215 handle iterateAtpRepo error 2025-05-21 13:14:11 -07:00
60df9c5d0e INDEX 2025-05-15 14:12:30 -07:00
245e2bfe22 optimize sqlite insert 2025-05-12 02:48:16 -07:00
5b060c6a23 add updatedAt 2025-05-12 01:55:54 -07:00
4f877526ca don't recurse CidLinkWrapper 2025-05-12 01:51:08 -07:00
9f3b14f5ef use sqlite, preserve deleted records, support did:web 2025-05-12 01:39:24 -07:00
8 changed files with 2971 additions and 126 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
node_modules
blobs
repos
*.sqlite
*.sqlite-journal

22
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,22 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}\\index.js",
"args": ["did:plc:u4gngygg2w5egsigxu5g7byu"],
"outFiles": [
"${workspaceFolder}/**/*.js"
]
}
]
}

View File

@ -1,6 +0,0 @@
{
"deno.enable": true,
"deno.enablePaths": [
"*.deno.js"
]
}

View File

@ -1,113 +0,0 @@
try { Deno.mkdirSync("blobs") } catch (error) {}
try { Deno.mkdirSync("repos") } catch (error) {}
async function get(url, options) {
console.debug("get", url);
var res = await fetch(url, options);
if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText} ${await res.text()}`);
return res;
}
async function getPds(did) {
var doc = await get(`https://plc.directory/${did}`).then(res => res.json());
var {serviceEndpoint} = doc.service.find(s => s.id == "#atproto_pds");
return serviceEndpoint;
}
async function getRepo(pds, did) {
var fp = `repos/${did.replaceAll(':','-')}`;
var res = await get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`);
var ab = await res.arrayBuffer();
Deno.writeFileSync(fp, new Uint8Array(ab));
return ab;
}
function exists(path) {
try {
Deno.lstatSync(path);
return true;
} catch (error) {
if (error instanceof Deno.errors.NotFound) return false;
throw error;
}
}
async function getBlob(pds, did, cid) {
/*try {
var file = await Deno.open(`blobs/${cid}`, {write: true, createNew: true});
} catch (error) {
if (error instanceof Deno.errors.AlreadyExists) return;
throw error;
}
var res = await get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
await res.body.pipeTo(file.writable);*/ // if download is interrupted will leave corrupt file
var fp = `blobs/${cid}`;
if (exists(fp)) return;
var res = await get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
var ab = await res.arrayBuffer();
Deno.writeFileSync(fp, new Uint8Array(ab));
}
async function getBlobs(pds, did) {
var allCids = [];
var _cursor;
do {
try {
let {cids, cursor} = await get(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${_cursor?`&cursor=${_cursor}`:''}`).then(res => res.json());
_cursor = cursor;
allCids.push(...cids);
} catch (error) {
console.error(error);
}
} while (_cursor);
console.log(`${allCids.length} blobs`);
for (var cid of allCids) await getBlob(pds, did, cid).catch(console.error);
}
var backedUpDids = [];
async function backup(did) {
if (backedUpDids.includes(did)) return;
console.log("backup", did);
backedUpDids.push(did);
var pds = await getPds(did);
var repo = await getRepo(pds, did).catch(console.error);
await getBlobs(pds, did);
return repo;
}
var ascii = new TextDecoder("ascii");
async function backupRecursive(dids, depth = 1) {
console.log(`backup ${dids.length} dids depth ${depth}`);
if (!depth) {
for (var did of dids) await backup(did).catch(console.error);
return;
}
var allrdids = new Set();
for (var did of dids) {
try {
var repo = await backup(did);
repo = ascii.decode(repo);
var rdids = new Set(repo.match(/did:plc:[a-z2-7]{24}/g));
rdids.delete(did);
console.log(`${rdids.size} related didplcs`);
rdids.forEach(rdid => allrdids.add(rdid));
} catch (error) {
console.error(error);
}
}
await backupRecursive(allrdids, depth - 1);
}
var didsToBackup = Deno.args.filter(arg => arg.startsWith("did:plc:"));
var depth;
for (let arg of Deno.args) {
let match = arg.match(/^--depth=(\d)+$/i);
if (match) {
depth = Number(match[1]);
}
}
await backupRecursive(didsToBackup, depth);

186
index.js Normal file
View File

@ -0,0 +1,186 @@
import sqlite3 from 'sqlite3';
import {open} from 'sqlite';
import {iterateAtpRepo} from '@atcute/car';
import {CidLinkWrapper} from "@atcute/cid";
import {DidResolver} from "@atproto/identity";
import * as fs from "fs";
// Ensure local storage directories exist; "already exists" errors are expected
// on re-runs and deliberately ignored.
try {fs.mkdirSync("blobs")} catch(e) {}
try {fs.mkdirSync("repos")} catch(e) {}
// Resolves DIDs to their DID documents (did:plc and did:web via @atproto/identity).
var didres = new DidResolver({});
var getPds = async did => (await didres.resolve(did)).service.find(s => s.id == "#atproto_pds")?.serviceEndpoint;
// Single sqlite database. The records table is append-mostly: old and deleted
// record versions are kept and flagged via `deleted`/`updatedAt` rather than
// removed (see processRepo).
var db = await open({
filename: "repos.sqlite",
driver: sqlite3.Database
});
db.getDatabaseInstance().serialize(); // :/ force serialized mode on the underlying sqlite3 driver
await db.exec(`CREATE TABLE IF NOT EXISTS records (did TEXT, collection TEXT, rkey TEXT, cid TEXT, record BLOB, deleted BOOLEAN, insertedAt INTEGER, updatedAt INTEGER);`);
await db.exec(`CREATE INDEX IF NOT EXISTS records_index ON records (did, collection, rkey, cid, deleted);`);
// Prepared once and reused for the hot insert path in processRepo.
var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?,?,?);`);
// Thin wrapper shadowing the built-in fetch: logs every request and converts
// non-2xx responses into thrown Errors that include status, statusText, and
// the response body text.
async function fetch(url, options) {
  console.debug("get", url);
  var res = await global.fetch(url, options);
  if (res.ok) return res;
  throw new Error(`HTTP ${res.status} ${res.statusText} ${await res.text()}`);
}
// Download the full repo CAR for `did` from its PDS, persist a copy under
// repos/ (colons in the DID replaced with dashes), and return the raw bytes.
async function getRepo(pds, did) {
  var response = await fetch(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`);
  var bytes = new Uint8Array(await response.arrayBuffer());
  fs.writeFileSync(`repos/${did.replaceAll(':','-')}`, bytes);
  return bytes;
}
// Diff the repo CAR for `did` against what is already stored in sqlite:
// insert newly-seen records, un-delete records that re-appeared, and mark
// records deleted when they vanished from the repo (unless the CAR was
// unreadable). Returns the set of OTHER DIDs (did:plc / did:web) mentioned
// anywhere in the records' string values.
async function processRepo(did, repo) {
  console.log("process", did);
  var recordCounts = {};
  var rdids = new Set();
  var rows = await db.all(`SELECT collection, rkey, cid, deleted FROM records WHERE did = ?;`, did);
  console.debug("got rows");
  // Index of stored rows keyed by collection~rkey~cid; still_exists is filled
  // in while walking the CAR below.
  var existing = {};
  for (let {collection, rkey, cid, deleted} of rows) {
    existing[`${collection}~${rkey}~${cid}`] = {
      collection, rkey, cid,
      deleted: deleted || false,
      still_exists: null
    };
  }
  console.debug("begin transaction");
  await db.exec(`BEGIN TRANSACTION`);
  try {
    try {
      for (let entry of iterateAtpRepo(repo)) {
        recordCounts[entry.collection] ||= {now: 0, new: 0};
        recordCounts[entry.collection].now++;
        try {
          // Scan every string value in the record for DID references.
          // CidLinkWrapper values are not recursed into.
          (function recurseObject(obj) {
            for (let key in obj) {
              if (!obj.hasOwnProperty(key)) continue;
              let val = obj[key];
              if (typeof val == "object" && !(val instanceof CidLinkWrapper)) recurseObject(val);
              else if (typeof val == "string") {
                let didplcs = val.match(/did:plc:[a-z2-7]{24}/g);
                if (didplcs) didplcs.forEach(d => rdids.add(d)); // `d`, not `did`: avoid shadowing the repo's own DID
                let didwebs = val.match(/did:web:[a-zA-Z0-9-.]+/g);
                if (didwebs) didwebs.forEach(d => rdids.add(d));
              }
            }
          })(entry.record);
          let key = `${entry.collection}~${entry.rkey}~${entry.cid.$link}`;
          if (key in existing) {
            if (existing[key].deleted) {
              // Previously marked deleted but present again in the repo.
              console.log(`undeleted: at://${did}/${entry.collection}/${entry.rkey}`);
              await db.run(`UPDATE records SET deleted = false, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
                Date.now(), did, entry.collection, entry.rkey, entry.cid.$link
              ]);
            } else {
              existing[key].still_exists = true;
            }
            continue;
          }
          recordCounts[entry.collection].new++;
          await insert_statement.run([did, entry.collection, entry.rkey, entry.cid.$link, entry.bytes, false, Date.now(), null]);
        } catch (error) {
          console.error("record error", error.stack)
        }
      }
    } catch (error) {
      // Corrupt/unreadable CAR: keep whatever we inserted, but skip the
      // "gone" pass below — we can't tell which records really disappeared.
      console.error("iterate error", error.stack);
      var badrepo = true; // var: hoisted to function scope, read below
    }
    console.log(did, recordCounts);
    if (!badrepo) for (let key in existing) {
      let val = existing[key];
      if (!val.still_exists && !val.deleted) {
        console.log(`gone: at://${did}/${val.collection}/${val.rkey} ${val.cid}`);
        await db.run(`UPDATE records SET deleted = true, updatedAt = ? WHERE did = ? AND collection = ? AND rkey = ? AND cid = ?;`, [
          Date.now(), did, val.collection, val.rkey, val.cid
        ]);
      }
    }
    console.debug("commit");
    await db.exec(`COMMIT`);
  } catch (error) {
    // Fix: an uncaught failure between BEGIN and COMMIT (e.g. a failing
    // UPDATE in the "gone" pass) previously left the transaction open,
    // wedging the connection. Roll back and propagate to the caller.
    await db.exec(`ROLLBACK`).catch(console.error);
    throw error;
  }
  console.log(did, `${rdids.size} related dids`);
  rdids.delete(did);
  return rdids;
}
// Download one blob to blobs/<cid>. Blobs already on disk are never
// re-fetched (blobs are content-addressed, so an existing file is final).
async function saveBlob(pds, did, cid) {
  var fp = `blobs/${cid}`;
  if (fs.existsSync(fp)) return;
  var response = await fetch(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`);
  fs.writeFileSync(fp, new Uint8Array(await response.arrayBuffer()));
}
// List every blob CID the PDS holds for `did` (paged, 1000 per request),
// then download each blob that isn't already on disk. Individual blob
// failures are logged and skipped.
async function saveBlobs(pds, did) {
  console.log("save blobs", did);
  var allCids = [];
  var _cursor;
  do {
    try {
      let {cids, cursor} = await fetch(`${pds}/xrpc/com.atproto.sync.listBlobs?did=${did}&limit=1000${_cursor?`&cursor=${_cursor}`:''}`).then(res => res.json());
      _cursor = cursor;
      allCids.push(...cids);
    } catch (error) {
      console.error(error);
      // Fix: a failed page left _cursor unchanged, so the loop retried the
      // same page forever. Give up on listing and download what we have.
      break;
    }
  } while (_cursor);
  console.log(`${allCids.length} blobs`);
  for (var cid of allCids) await saveBlob(pds, did, cid).catch(console.error);
}
var backedUpDids = [];
// Back up a single DID: locate its PDS, snapshot the repo (sqlite + repos/),
// then fetch any missing blobs. Returns the set of related DIDs found in the
// repo, or undefined when there is no PDS or the repo download failed.
async function backup(did) {
  console.log("backup", did);
  const pds = await getPds(did);
  if (!pds) {
    console.log(`no pds for ${did}`);
    return;
  }
  let rdids;
  const repo = await getRepo(pds, did).catch(console.error);
  if (repo) rdids = await processRepo(did, repo);
  await saveBlobs(pds, did);
  return rdids;
}
// Back up every DID in `dids`. While depth > 0, recurse one level deeper over
// the union of related DIDs discovered in their repos; at depth 0 the DIDs
// are backed up without collecting further references.
async function backupRecursive(dids, depth = 1) {
  console.log(`backup ${dids.size} dids depth ${depth}`);
  if (!depth) {
    // Leaf level: back up each DID, log failures, gather nothing.
    for (const did of dids) await backup(did).catch(console.error);
    return;
  }
  const related = new Set();
  let position = -1;
  for (const did of dids) {
    position += 1;
    if (backedUpDids.includes(did)) continue; // already handled this run
    backedUpDids.push(did);
    console.log(`${position} / ${dids.size}`);
    try {
      const found = await backup(did);
      if (found) for (const rdid of found) related.add(rdid);
    } catch (error) {
      console.error(error);
    }
  }
  await backupRecursive(related, depth - 1);
}
// CLI entry: positional args starting with "did:" are the profiles to back
// up; --depth=N bounds the recursion (backupRecursive defaults to 1 when no
// --depth flag is given; --depth=0 backs up only the listed DIDs).
var didsToBackup = new Set(process.argv.filter(arg => arg.startsWith("did:")));
var depth;
for (let arg of process.argv) {
  // Fix: (\d+) captures the whole number; the original (\d)+ kept only the
  // last digit, so --depth=12 parsed as depth 2.
  let match = arg.match(/^--depth=(\d+)$/i);
  if (match) {
    depth = Number(match[1]);
  }
}
await backupRecursive(didsToBackup, depth);

2741
package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

9
package.json Normal file
View File

@ -0,0 +1,9 @@
{
"dependencies": {
"@atcute/car": "^3.0.4",
"@atproto/identity": "^0.4.8",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7"
},
"type": "module"
}

View File

@ -2,12 +2,10 @@
A script that downloads a Bluesky profile (repo and blobs) as well as all profiles referenced by (i.e. followed, liked, mentioned, replied to, blocked, listed, etc) that profile.
Only supports did:plc for now.
To backup one or more profiles and all related profiles:
```sh
$ deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu # (you can put more than one did:plc separated by spaces)
$ node . did:plc:u4gngygg2w5egsigxu5g7byu # (you can put more than one did separated by spaces)
```
This will preserve all content they interacted with, such as posts they liked, replied to, quoted or reposted.
@ -15,12 +13,18 @@ This will preserve all content they interacted with, such as posts they liked, r
To backup a profile, all related profiles AND all profiles related to related profiles:
```sh
$ deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu --depth=2
$ node . did:plc:u4gngygg2w5egsigxu5g7byu --depth=2
```
This will additionally save all profiles related to the profiles related to them, so you will have a copy of everything their friends liked and reposted as well. This may be a LOT of data!
Running the script again will refresh the repositories and download new blobs. Existing blobs are not re-downloaded. Run the script regularly to keep the backups up to date.
To backup ONLY a single profile (or a list of profiles):
```sh
$ node . did:plc:u4gngygg2w5egsigxu5g7byu --depth=0
```
Run the script again to refresh the repos and download new blobs. New repo records are added to the database and deleted records are preserved but marked as deleted. Edited records are added as new entries and the old versions are marked as deleted.
## Example SystemD
@ -35,7 +39,7 @@ After=network-online.target
[Service]
WorkingDirectory=/zpool1/FileStorage/srv/bskybackup/
User=bskybackup
ExecStart=deno run --allow-all bskybackup.deno.js did:plc:u4gngygg2w5egsigxu5g7byu
ExecStart=node . did:plc:u4gngygg2w5egsigxu5g7byu
Type=exec
```