refactored repo
This commit is contained in:
parent
bb7963293b
commit
08f12f48c6
packages
@ -3,8 +3,8 @@ import { DidableKey } from './ucans'
|
||||
|
||||
import { adxSemantics, parseAdxResource } from './semantics'
|
||||
import { MONTH_IN_SEC, YEAR_IN_SEC } from './consts'
|
||||
import { Signer } from './types'
|
||||
import { writeCap } from './capabilities'
|
||||
import { CapWithProof, Signer } from './types'
|
||||
import { vaguerCap, writeCap } from './capabilities'
|
||||
|
||||
export class AuthStore implements Signer {
|
||||
protected keypair: DidableKey
|
||||
@ -58,7 +58,7 @@ export class AuthStore implements Signer {
|
||||
return keypair.sign(data)
|
||||
}
|
||||
|
||||
async findUcan(cap: ucan.Capability): Promise<ucan.Ucan | null> {
|
||||
async findProof(cap: ucan.Capability): Promise<ucan.DelegationChain | null> {
|
||||
const ucanStore = await this.getUcanStore()
|
||||
// we only handle adx caps right now
|
||||
const resource = parseAdxResource(cap.with)
|
||||
@ -67,7 +67,13 @@ export class AuthStore implements Signer {
|
||||
ucanStore.findWithCapability(await this.did(), cap, resource.did),
|
||||
)
|
||||
if (!res) return null
|
||||
return res.ucan
|
||||
return res
|
||||
}
|
||||
|
||||
async findUcan(cap: ucan.Capability): Promise<ucan.Ucan | null> {
|
||||
const chain = await this.findProof(cap)
|
||||
if (chain === null) return null
|
||||
return chain.ucan
|
||||
}
|
||||
|
||||
async hasUcan(cap: ucan.Capability): Promise<boolean> {
|
||||
@ -91,6 +97,51 @@ export class AuthStore implements Signer {
|
||||
.build()
|
||||
}
|
||||
|
||||
async createUcanForCaps(
|
||||
audience: string,
|
||||
caps: ucan.Capability[],
|
||||
lifetime = MONTH_IN_SEC,
|
||||
): Promise<ucan.Ucan> {
|
||||
const proofs: CapWithProof[] = []
|
||||
const encodedTokens = new Set()
|
||||
for (const cap of caps) {
|
||||
const proof = await this.vaguestProofForCap(cap)
|
||||
if (proof === null) {
|
||||
throw new Error(`Could not find a ucan for capability: ${cap.with}`)
|
||||
}
|
||||
// avoid duplicate proofs
|
||||
const token = ucan.encode(proof.prf.ucan)
|
||||
if (!encodedTokens.has(token)) {
|
||||
encodedTokens.add(token)
|
||||
proofs.push(proof)
|
||||
}
|
||||
}
|
||||
|
||||
const keypair = await this.getKeypair()
|
||||
|
||||
const builder = ucan
|
||||
.createBuilder()
|
||||
.issuedBy(keypair)
|
||||
.toAudience(audience)
|
||||
.withLifetimeInSeconds(lifetime)
|
||||
|
||||
for (const prf of proofs) {
|
||||
builder.delegateCapability(prf.cap, prf.prf, adxSemantics)
|
||||
}
|
||||
|
||||
return builder.build()
|
||||
}
|
||||
|
||||
async vaguestProofForCap(cap: ucan.Capability): Promise<CapWithProof | null> {
|
||||
const prf = await this.findProof(cap)
|
||||
if (prf === null) return null
|
||||
const vauger = vaguerCap(cap)
|
||||
if (vauger === null) return { cap, prf }
|
||||
const vaugerPrf = await this.vaguestProofForCap(vauger)
|
||||
if (vaugerPrf === null) return { cap, prf }
|
||||
return vaugerPrf
|
||||
}
|
||||
|
||||
async createAwakeProof(
|
||||
audience: string,
|
||||
cap: ucan.Capability,
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { adxCapability } from './semantics'
|
||||
import { adxCapability, parseAdxResource } from './semantics'
|
||||
import * as ucan from './ucans/index'
|
||||
|
||||
export const writeCap = (
|
||||
@ -26,3 +26,13 @@ export const maintenanceCap = (did: string): ucan.Capability => {
|
||||
const resource = `${did}|*`
|
||||
return adxCapability(resource, 'MAINTENANCE')
|
||||
}
|
||||
|
||||
export const vaguerCap = (cap: ucan.Capability): ucan.Capability | null => {
|
||||
const rsc = parseAdxResource(cap.with)
|
||||
if (rsc === null) return null
|
||||
// can't go vaguer than every collection
|
||||
if (rsc.collection === '*') return null
|
||||
if (rsc.schema === '*') return writeCap(rsc.did)
|
||||
if (rsc.record === '*') return writeCap(rsc.did, rsc.collection)
|
||||
return writeCap(rsc.did, rsc.collection, rsc.schema)
|
||||
}
|
||||
|
@ -1,3 +1,10 @@
|
||||
import * as ucan from './ucans'
|
||||
|
||||
export interface Signer {
|
||||
sign: (data: Uint8Array) => Promise<Uint8Array>
|
||||
}
|
||||
|
||||
export type CapWithProof = {
|
||||
cap: ucan.Capability
|
||||
prf: ucan.DelegationChain
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
import * as auth from '@adxp/auth'
|
||||
import { CID } from 'multiformats'
|
||||
|
||||
export class DataDiff {
|
||||
@ -56,11 +57,25 @@ export class DataDiff {
|
||||
deleteList(): DataDelete[] {
|
||||
return Object.values(this.deletes)
|
||||
}
|
||||
}
|
||||
|
||||
export type DataDelete = {
|
||||
key: string
|
||||
cid: CID
|
||||
updatedKeys(): string[] {
|
||||
const keys = [
|
||||
...Object.keys(this.adds),
|
||||
...Object.keys(this.updates),
|
||||
...Object.keys(this.deletes),
|
||||
]
|
||||
return [...new Set(keys)]
|
||||
}
|
||||
|
||||
neededCapabilities(rootDid: string): auth.ucans.Capability[] {
|
||||
return this.updatedKeys().map((key) => {
|
||||
const split = key.split('/')
|
||||
const tid = split[1]
|
||||
if (tid === undefined) throw new Error(`Invalid record id: ${key}`)
|
||||
const collection = split.slice(0, -1).join('/')
|
||||
return auth.writeCap(rootDid, collection, tid)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export type DataAdd = {
|
||||
@ -73,3 +88,8 @@ export type DataUpdate = {
|
||||
prev: CID
|
||||
cid: CID
|
||||
}
|
||||
|
||||
export type DataDelete = {
|
||||
key: string
|
||||
cid: CID
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import z from 'zod'
|
||||
import { schema } from '../../common/types'
|
||||
import { DataDiff } from './diff'
|
||||
import { DataStore } from '../types'
|
||||
import { BlockWriter } from '@ipld/car/api'
|
||||
|
||||
/**
|
||||
* This is an implementation of a Merkle Search Tree (MST)
|
||||
@ -673,6 +674,17 @@ export class MST implements DataStore {
|
||||
return leaves.length
|
||||
}
|
||||
|
||||
async writeToCarStream(car: BlockWriter): Promise<void> {
|
||||
for await (const entry of this.walk()) {
|
||||
if (entry.isTree()) {
|
||||
const pointer = await entry.getPointer()
|
||||
await this.blockstore.addToCar(car, pointer)
|
||||
} else {
|
||||
await this.blockstore.addToCar(car, entry.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Matching Leaf interface
|
||||
// -------------------
|
||||
|
||||
|
@ -2,28 +2,18 @@ import { CID } from 'multiformats/cid'
|
||||
import { CarReader, CarWriter } from '@ipld/car'
|
||||
import { BlockWriter } from '@ipld/car/lib/writer-browser'
|
||||
|
||||
import {
|
||||
RepoRoot,
|
||||
CarStreamable,
|
||||
IdMapping,
|
||||
Commit,
|
||||
schema,
|
||||
UpdateData,
|
||||
BatchWrite,
|
||||
DataStore,
|
||||
} from './types'
|
||||
import { RepoRoot, Commit, schema, BatchWrite, DataStore } from './types'
|
||||
import { DID } from '../common/types'
|
||||
import * as check from '../common/check'
|
||||
import IpldStore, { AllowedIpldVal } from '../blockstore/ipld-store'
|
||||
import { streamToArray } from '../common/util'
|
||||
import CidSet from './cid-set'
|
||||
import * as auth from '@adxp/auth'
|
||||
import * as service from '../network/service'
|
||||
import { AuthStore } from '@adxp/auth'
|
||||
import { MST } from './mst'
|
||||
import Collection from './collection'
|
||||
|
||||
export class Repo implements CarStreamable {
|
||||
export class Repo {
|
||||
blockstore: IpldStore
|
||||
data: DataStore
|
||||
cid: CID
|
||||
@ -132,7 +122,7 @@ export class Repo implements CarStreamable {
|
||||
}
|
||||
const currentCommit = this.cid
|
||||
const updatedData = await mutation(this.data)
|
||||
const tokenCid = await this.ucanForOperation({} as any) // @TODO get an actual token
|
||||
const tokenCid = await this.ucanForOperation(updatedData)
|
||||
const dataCid = await updatedData.save()
|
||||
const root: RepoRoot = {
|
||||
did: this.did,
|
||||
@ -194,6 +184,13 @@ export class Repo implements CarStreamable {
|
||||
return this.blockstore.get(commit.root, schema.repoRoot)
|
||||
}
|
||||
|
||||
async loadRoot(newRoot: CID): Promise<void> {
|
||||
const commit = await this.blockstore.get(newRoot, schema.commit)
|
||||
const root = await this.blockstore.get(commit.root, schema.repoRoot)
|
||||
this.data = await MST.fromCid(this.blockstore, root.data)
|
||||
this.cid = newRoot
|
||||
}
|
||||
|
||||
// IPLD STORE PASS THROUGHS
|
||||
// -----------
|
||||
|
||||
@ -208,23 +205,18 @@ export class Repo implements CarStreamable {
|
||||
// UCAN AUTH
|
||||
// -----------
|
||||
|
||||
async ucanForOperation(update: UpdateData): Promise<CID> {
|
||||
async ucanForOperation(newData: DataStore): Promise<CID> {
|
||||
if (!this.authStore) {
|
||||
throw new Error('No keypair provided. Repo is read-only.')
|
||||
}
|
||||
const neededCap = auth.writeCap(
|
||||
const diff = await this.data.diff(newData)
|
||||
const neededCaps = diff.neededCapabilities(this.did)
|
||||
const ucanForOp = await this.authStore.createUcanForCaps(
|
||||
this.did,
|
||||
update.namespace,
|
||||
update.collection,
|
||||
update.tid?.toString(),
|
||||
neededCaps,
|
||||
30,
|
||||
)
|
||||
const foundUcan = await this.authStore.findUcan(neededCap)
|
||||
if (foundUcan === null) {
|
||||
throw new Error(
|
||||
`Could not find a valid ucan for operation: ${neededCap.can.toString()}`,
|
||||
)
|
||||
}
|
||||
return this.blockstore.put(auth.encodeUcan(foundUcan))
|
||||
return this.blockstore.put(auth.encodeUcan(ucanForOp))
|
||||
}
|
||||
|
||||
async maintenanceToken(forDid: string): Promise<auth.Ucan> {
|
||||
@ -260,140 +252,72 @@ export class Repo implements CarStreamable {
|
||||
|
||||
// loads car files, verifies structure, signature & auth on each commit
|
||||
// emits semantic updates to the structure starting from oldest first
|
||||
async loadAndVerifyDiff(
|
||||
buf: Uint8Array,
|
||||
emit?: (evt: delta.Event) => Promise<void>,
|
||||
): Promise<void> {
|
||||
async loadAndVerifyDiff(buf: Uint8Array): Promise<void> {
|
||||
const root = await this.loadCar(buf)
|
||||
await this.verifySetOfUpdates(this.cid, root, emit)
|
||||
await this.verifySetOfUpdates(this.cid, root)
|
||||
await this.loadRoot(root)
|
||||
}
|
||||
|
||||
async verifySetOfUpdates(
|
||||
from: CID | null,
|
||||
to: CID,
|
||||
emit?: (evt: delta.Event) => Promise<void>,
|
||||
oldCommit: CID | null,
|
||||
recentCommit: CID,
|
||||
): Promise<void> {
|
||||
if (to.equals(from)) return
|
||||
const toRepo = await Repo.load(this.blockstore, to)
|
||||
if (recentCommit.equals(oldCommit)) return
|
||||
const toRepo = await Repo.load(this.blockstore, recentCommit)
|
||||
const root = await toRepo.getRoot()
|
||||
|
||||
// if the root does not have a predecessor (the genesis commit)
|
||||
// & we still have not found the commit we're searching for, then bail
|
||||
if (!root.prev) {
|
||||
if (from === null) {
|
||||
if (oldCommit === null) {
|
||||
return
|
||||
} else {
|
||||
throw new Error('Could not find start repo root')
|
||||
throw new Error('Could not find shared history')
|
||||
}
|
||||
}
|
||||
await this.verifySetOfUpdates(from, root.prev, emit)
|
||||
const prevRepo = await Repo.load(this.blockstore, root.prev)
|
||||
const updates = await toRepo.verifyUpdate(prevRepo)
|
||||
|
||||
// verify sig & auth
|
||||
const prevRepo = await Repo.load(this.blockstore, root.prev)
|
||||
|
||||
// verify auth token covers all necessary writes
|
||||
const encodedToken = await this.blockstore.get(
|
||||
root.auth_token,
|
||||
schema.string,
|
||||
)
|
||||
const token = await auth.validateUcan(encodedToken)
|
||||
const diff = await prevRepo.data.diff(this.data)
|
||||
const neededCaps = diff.neededCapabilities(this.did)
|
||||
for (const cap of neededCaps) {
|
||||
await auth.verifyAdxUcan(token, this.did, cap)
|
||||
}
|
||||
|
||||
// verify signature matches repo root + auth token
|
||||
const commit = await toRepo.getCommit()
|
||||
const validSig = await auth.verifySignature(
|
||||
token.payload.aud,
|
||||
token.payload.iss,
|
||||
commit.root.bytes,
|
||||
commit.sig,
|
||||
)
|
||||
if (!validSig) {
|
||||
throw new Error(`Invalid signature on commit: ${toRepo.cid.toString()}`)
|
||||
}
|
||||
for (const update of updates) {
|
||||
const neededCap = delta.capabilityForEvent(root.did, update)
|
||||
try {
|
||||
await auth.verifyAdxUcan(token, token.payload.aud, neededCap)
|
||||
} catch (err) {
|
||||
console.log('TOKEN: ', token)
|
||||
console.log('NEEDED CAP: ', neededCap)
|
||||
console.log('ATT: ', token.payload.att)
|
||||
console.log('CAN: ', token.payload.att[0].can)
|
||||
throw err
|
||||
}
|
||||
if (emit) {
|
||||
await emit(update)
|
||||
}
|
||||
}
|
||||
|
||||
// check next commits
|
||||
await this.verifySetOfUpdates(oldCommit, root.prev)
|
||||
}
|
||||
|
||||
async verifyUpdate(prev: Repo): Promise<delta.Event[]> {
|
||||
const root = await this.getRoot()
|
||||
if (!root.prev) {
|
||||
throw new Error('No previous version found at root')
|
||||
} else if (!root.prev.equals(prev.cid)) {
|
||||
throw new Error('Previous version root CID does not match')
|
||||
}
|
||||
if (root.did !== prev.did) {
|
||||
throw new Error('Changes in DID are not allowed at this point')
|
||||
}
|
||||
const newCids = new CidSet(root.new_cids)
|
||||
let events: delta.Event[] = []
|
||||
const mapDiff = delta.idMapDiff(
|
||||
prev.namespaceCids,
|
||||
this.namespaceCids,
|
||||
newCids,
|
||||
)
|
||||
// namespace deletes: we can emit as events
|
||||
for (const del of mapDiff.deletes) {
|
||||
events.push(delta.deletedNamespace(del.key))
|
||||
}
|
||||
// namespace adds: we walk to ensure we have all content & then emit all posts & interactions
|
||||
for (const add of mapDiff.adds) {
|
||||
const namespace = await Namespace.load(this.blockstore, add.cid)
|
||||
const missing = await namespace.missingCids()
|
||||
if (missing.size() > 0) {
|
||||
throw new Error(
|
||||
`Missing cids for namespace ${add.key}: ${missing.toList()}`,
|
||||
)
|
||||
}
|
||||
const [newPosts, newInters] = await Promise.all([
|
||||
namespace.posts.getAllEntries(),
|
||||
namespace.interactions.getAllEntries(),
|
||||
])
|
||||
for (const { cid, tid } of newPosts) {
|
||||
events.push(delta.addedObject(add.key, 'posts', tid, cid))
|
||||
}
|
||||
for (const { cid, tid } of newInters) {
|
||||
events.push(delta.addedObject(add.key, 'interactions', tid, cid))
|
||||
}
|
||||
}
|
||||
// namespace updates: we dive deeper to figure out the differences
|
||||
for (const update of mapDiff.updates) {
|
||||
const [old, curr] = await Promise.all([
|
||||
Namespace.load(this.blockstore, update.old),
|
||||
Namespace.load(this.blockstore, update.cid),
|
||||
])
|
||||
const updates = await curr.verifyUpdate(old, newCids, update.key)
|
||||
events = events.concat(updates)
|
||||
}
|
||||
// relationship updates: we dive deeper to figure out the difference
|
||||
if (this.relationships.cid !== prev.relationships.cid) {
|
||||
const updates = await this.relationships.verifyUpdate(
|
||||
prev.relationships,
|
||||
newCids,
|
||||
)
|
||||
events = events.concat(updates)
|
||||
}
|
||||
return events
|
||||
}
|
||||
|
||||
async missingCids(): Promise<CidSet> {
|
||||
const missing = new CidSet()
|
||||
for (const cid of Object.values(this.namespaceCids)) {
|
||||
if (await this.blockstore.has(cid)) {
|
||||
const namespace = await Namespace.load(this.blockstore, cid)
|
||||
const namespaceMissing = await namespace.missingCids()
|
||||
missing.addSet(namespaceMissing)
|
||||
} else {
|
||||
missing.add(cid)
|
||||
}
|
||||
}
|
||||
return missing
|
||||
}
|
||||
// async missingCids(): Promise<CidSet> {
|
||||
// const missing = new CidSet()
|
||||
// for (const cid of Object.values(this.namespaceCids)) {
|
||||
// if (await this.blockstore.has(cid)) {
|
||||
// const namespace = await Namespace.load(this.blockstore, cid)
|
||||
// const namespaceMissing = await namespace.missingCids()
|
||||
// missing.addSet(namespaceMissing)
|
||||
// } else {
|
||||
// missing.add(cid)
|
||||
// }
|
||||
// }
|
||||
// return missing
|
||||
// }
|
||||
|
||||
// CAR FILES
|
||||
// -----------
|
||||
@ -418,7 +342,7 @@ export class Repo implements CarStreamable {
|
||||
|
||||
async getCarNoHistory(): Promise<Uint8Array> {
|
||||
return this.openCar((car: BlockWriter) => {
|
||||
return this.writeToCarStream(car)
|
||||
return this.writeCheckoutToCarStream(car)
|
||||
})
|
||||
}
|
||||
|
||||
@ -441,17 +365,13 @@ export class Repo implements CarStreamable {
|
||||
return streamToArray(out)
|
||||
}
|
||||
|
||||
async writeToCarStream(car: BlockWriter): Promise<void> {
|
||||
async writeCheckoutToCarStream(car: BlockWriter): Promise<void> {
|
||||
await this.blockstore.addToCar(car, this.cid)
|
||||
const commit = await this.blockstore.get(this.cid, schema.commit)
|
||||
await this.blockstore.addToCar(car, commit.root)
|
||||
await this.relationships.writeToCarStream(car)
|
||||
await Promise.all(
|
||||
Object.values(this.namespaceCids).map(async (cid) => {
|
||||
const namespace = await Namespace.load(this.blockstore, cid)
|
||||
await namespace.writeToCarStream(car)
|
||||
}),
|
||||
)
|
||||
const root = await this.blockstore.get(commit.root, schema.repoRoot)
|
||||
await this.blockstore.addToCar(car, root.auth_token)
|
||||
await this.data.writeToCarStream(car)
|
||||
}
|
||||
|
||||
async writeCommitsToCarStream(
|
||||
@ -459,23 +379,23 @@ export class Repo implements CarStreamable {
|
||||
newestCommit: CID,
|
||||
oldestCommit: CID | null,
|
||||
): Promise<void> {
|
||||
if (oldestCommit && oldestCommit.equals(newestCommit)) return
|
||||
const commit = await this.blockstore.get(newestCommit, schema.commit)
|
||||
const { new_cids, prev } = await this.blockstore.get(
|
||||
commit.root,
|
||||
schema.repoRoot,
|
||||
)
|
||||
await this.blockstore.addToCar(car, newestCommit)
|
||||
await this.blockstore.addToCar(car, commit.root)
|
||||
|
||||
await Promise.all(new_cids.map((cid) => this.blockstore.addToCar(car, cid)))
|
||||
if (!prev) {
|
||||
if (oldestCommit === null) {
|
||||
return
|
||||
}
|
||||
throw new Error(`Could not find commit in repo history: $${oldestCommit}`)
|
||||
}
|
||||
await this.writeCommitsToCarStream(car, prev, oldestCommit)
|
||||
// @TODO write this
|
||||
// if (newestCommit.equals(oldestCommit)) return
|
||||
// const commit = await this.blockstore.get(newestCommit, schema.commit)
|
||||
// const { new_cids, prev } = await this.blockstore.get(
|
||||
// commit.root,
|
||||
// schema.repoRoot,
|
||||
// )
|
||||
// await this.blockstore.addToCar(car, newestCommit)
|
||||
// await this.blockstore.addToCar(car, commit.root)
|
||||
// await Promise.all(new_cids.map((cid) => this.blockstore.addToCar(car, cid)))
|
||||
// if (!prev) {
|
||||
// if (oldestCommit === null) {
|
||||
// return
|
||||
// }
|
||||
// throw new Error(`Could not find commit in repo history: $${oldestCommit}`)
|
||||
// }
|
||||
// await this.writeCommitsToCarStream(car, prev, oldestCommit)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,4 +106,5 @@ export interface DataStore {
|
||||
listWithPrefix(from: string, count?: number): Promise<DataValue[]>
|
||||
diff(other: DataStore): Promise<DataDiff>
|
||||
save(): Promise<CID>
|
||||
writeToCarStream(car: BlockWriter): Promise<void>
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
import * as auth from '@adxp/auth'
|
||||
|
||||
import Repo from '../src/repo/index'
|
||||
import { Repo } from '../src/repo'
|
||||
import IpldStore from '../src/blockstore/ipld-store'
|
||||
|
||||
import * as util from './_util'
|
||||
|
Loading…
x
Reference in New Issue
Block a user