Compare commits
1 Commits
sqlite
...
concurrent
| Author | SHA1 | Date | |
|---|---|---|---|
| ad1c475999 |
@@ -4,13 +4,24 @@ import {iterateAtpRepo} from '@atcute/car';
|
|||||||
import {CidLinkWrapper} from "@atcute/cid";
|
import {CidLinkWrapper} from "@atcute/cid";
|
||||||
import {DidResolver} from "@atproto/identity";
|
import {DidResolver} from "@atproto/identity";
|
||||||
import got from "got";
|
import got from "got";
|
||||||
import * as fs from "fs";
|
import {writeFile} from "fs/promises";
|
||||||
|
import {existsSync, mkdirSync} from 'fs';
|
||||||
|
|
||||||
try {fs.mkdirSync("blobs")} catch(e) {}
|
try {mkdirSync("blobs")} catch(e) {}
|
||||||
try {fs.mkdirSync("repos")} catch(e) {}
|
try {mkdirSync("repos")} catch(e) {}
|
||||||
|
|
||||||
var didres = new DidResolver({});
|
var didres = new DidResolver({});
|
||||||
var getPds = async did => (await didres.resolve(did)).service.find(s => s.id == "#atproto_pds")?.serviceEndpoint;
|
async function getPds(did) {
|
||||||
|
try {
|
||||||
|
var doc = await didres.resolve(did);
|
||||||
|
} catch(error) {
|
||||||
|
console.error("resolve", did, error.message);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return doc?.service?.find(s => s.id == "#atproto_pds")?.serviceEndpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
var w = got.extend({
|
var w = got.extend({
|
||||||
hooks: {
|
hooks: {
|
||||||
beforeRequest: [({url, method}) => console.log(`${method} ${url}`)],
|
beforeRequest: [({url, method}) => console.log(`${method} ${url}`)],
|
||||||
@@ -37,7 +48,7 @@ var insert_statement = await db.prepare(`INSERT INTO records VALUES (?,?,?,?,?,?
|
|||||||
|
|
||||||
async function getRepo(pds, did) {
|
async function getRepo(pds, did) {
|
||||||
var repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`).buffer();
|
var repo = await w.get(`${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`).buffer();
|
||||||
fs.writeFileSync(`repos/${did.replaceAll(':','-')}`, repo);
|
await writeFile(`repos/${did.replaceAll(':','-')}`, repo);
|
||||||
return repo;
|
return repo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,8 +66,8 @@ async function processRepo(did, repo) {
|
|||||||
still_exists: null
|
still_exists: null
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
console.debug("begin transaction");
|
//console.debug("begin transaction");
|
||||||
await db.exec(`BEGIN TRANSACTION`);
|
//await db.exec(`BEGIN TRANSACTION`);
|
||||||
try {
|
try {
|
||||||
for (let entry of iterateAtpRepo(repo)) {
|
for (let entry of iterateAtpRepo(repo)) {
|
||||||
recordCounts[entry.collection] ||= {now: 0, new: 0};
|
recordCounts[entry.collection] ||= {now: 0, new: 0};
|
||||||
@@ -107,8 +118,8 @@ async function processRepo(did, repo) {
|
|||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
console.debug("commit");
|
//console.debug("commit");
|
||||||
await db.exec(`COMMIT`);
|
//await db.exec(`COMMIT`);
|
||||||
console.log(did, `${rdids.size} related dids`);
|
console.log(did, `${rdids.size} related dids`);
|
||||||
rdids.delete(did);
|
rdids.delete(did);
|
||||||
return rdids;
|
return rdids;
|
||||||
@@ -116,9 +127,9 @@ async function processRepo(did, repo) {
|
|||||||
|
|
||||||
async function saveBlob(pds, did, cid) {
|
async function saveBlob(pds, did, cid) {
|
||||||
var fp = `blobs/${cid}`;
|
var fp = `blobs/${cid}`;
|
||||||
if (fs.existsSync(fp)) return;
|
if (existsSync(fp)) return;
|
||||||
var blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`).buffer();
|
var blob = await w.get(`${pds}/xrpc/com.atproto.sync.getBlob?did=${did}&cid=${cid}`).buffer();
|
||||||
fs.writeFileSync(fp, blob);
|
await writeFile(fp, blob);
|
||||||
}
|
}
|
||||||
|
|
||||||
async function saveBlobs(pds, did) {
|
async function saveBlobs(pds, did) {
|
||||||
@@ -144,7 +155,7 @@ async function backup(did) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
var repo = await getRepo(pds, did).catch(error => console.error(error.message));
|
var repo = await getRepo(pds, did).catch(error => console.error(error.message));
|
||||||
if (repo) var rdids = await processRepo(did, repo);
|
if (repo) var rdids = await processRepo(did, repo).catch(error => console.error(error.message));
|
||||||
await saveBlobs(pds, did).catch(error => console.error(error.message));
|
await saveBlobs(pds, did).catch(error => console.error(error.message));
|
||||||
return rdids;
|
return rdids;
|
||||||
}
|
}
|
||||||
@@ -152,25 +163,32 @@ async function backup(did) {
|
|||||||
|
|
||||||
async function backupRecursive(dids, depth = 1) {
|
async function backupRecursive(dids, depth = 1) {
|
||||||
console.log(`backup ${dids.size} dids depth ${depth}`);
|
console.log(`backup ${dids.size} dids depth ${depth}`);
|
||||||
if (!depth) {
|
|
||||||
for (let did of dids) await backup(did).catch(error => console.error(error.stack));
|
var pds2dids = {};
|
||||||
return;
|
|
||||||
}
|
|
||||||
var allrdids = new Set();
|
|
||||||
var index = -1;
|
|
||||||
for (let did of dids) {
|
for (let did of dids) {
|
||||||
index++;
|
let pds = await getPds(did);
|
||||||
if (backedUpDids.includes(did)) continue;
|
(pds2dids[pds] ||= new Set()).add(did);
|
||||||
backedUpDids.push(did);
|
|
||||||
console.log(`${index} / ${dids.size}`);
|
|
||||||
try {
|
|
||||||
let rdids = await backup(did);
|
|
||||||
rdids?.forEach(rdid => allrdids.add(rdid));
|
|
||||||
} catch (error) {
|
|
||||||
console.error(error.stack);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
await backupRecursive(allrdids, depth - 1);
|
console.log(`backing up from ${Object.keys(pds2dids).length} PDSes at once`);
|
||||||
|
|
||||||
|
var allrdids = new Set();
|
||||||
|
await Promise.all(Object.entries(pds2dids).map(async ([pds, dids]) => {
|
||||||
|
var index = -1;
|
||||||
|
for (let did of dids) {
|
||||||
|
index++;
|
||||||
|
if (backedUpDids.includes(did)) continue;
|
||||||
|
backedUpDids.push(did);
|
||||||
|
console.log(`[${pds}] ${index} / ${dids.size}`);
|
||||||
|
try {
|
||||||
|
let rdids = await backup(did);
|
||||||
|
rdids?.forEach(rdid => allrdids.add(rdid));
|
||||||
|
} catch (error) {
|
||||||
|
console.error(error.stack);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
if (depth) await backupRecursive(allrdids, depth - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user