atproto/packages/bsky/tests/etcd.test.ts
devin ivy 48b0a6f7b9
Appview: etcd-backed dataplane host list (#3586)
* appview: setup etcd-based dataplane host list

* appview: remove old file

* appview: tidy etcd host list functionality and add tests

* appview: add config and lifecycle for etcd-based dataplane host list

* tidy

* tidy

* appview: tidy config and dataplane client types

* build
2025-02-26 11:20:07 -05:00

302 lines
10 KiB
TypeScript

import EventEmitter from 'node:events'
import { Etcd3, IKeyValue } from 'etcd3'
import { EtcdHostList } from '../src'
import { EtcdMap } from '../src/etcd'
describe('etcd', () => {
describe('EtcdMap', () => {
it('initializes values based on current keys', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
etcd.watcher.set('service/b', { value: '2' })
etcd.watcher.set('service/c', { value: '3' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
expect(map.get('service/a')).toBe('1')
expect(map.get('service/b')).toBe('2')
expect(map.get('service/c')).toBe('3')
expect([...map.values()]).toEqual(['1', '2', '3'])
})
it('maintains key updates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
etcd.watcher.set('service/b', { value: '2' })
etcd.watcher.set('service/c', { value: '3' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
etcd.watcher.set('service/b', { value: '4' })
expect(map.get('service/a')).toBe('1')
expect(map.get('service/b')).toBe('4')
expect(map.get('service/c')).toBe('3')
expect([...map.values()]).toEqual(['1', '4', '3'])
})
it('maintains key creates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
etcd.watcher.set('service/b', { value: '2' })
expect(map.get('service/a')).toBe('1')
expect(map.get('service/b')).toBe('2')
expect([...map.values()]).toEqual(['1', '2'])
})
it('maintains key deletions', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
etcd.watcher.set('service/b', { value: '2' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
etcd.watcher.del('service/b')
expect(map.get('service/a')).toBe('1')
expect(map.get('service/b')).toBe(null)
expect([...map.values()]).toEqual(['1'])
})
it('notifies of updates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
etcd.watcher.set('service/b', { value: '2' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
const states: string[][] = [[...map.values()]]
map.onUpdate((update) => {
states.push([...update.values()])
})
etcd.watcher.set('service/c', { value: '3' })
etcd.watcher.del('service/b')
etcd.watcher.set('service/a', { value: '4' })
expect(states).toEqual([
['1', '2'],
['1', '2', '3'],
['1', '3'],
['4', '3'],
])
})
it('ignores out-of-order updates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: '1' })
const map = new EtcdMap(etcd as unknown as Etcd3)
await map.connect()
const states: string[][] = [[...map.values()]]
map.onUpdate((update) => {
states.push([...update.values()])
})
etcd.watcher.set('service/a', { value: '2' })
etcd.watcher.set('service/a', { value: '3', overrideRev: 1 }) // old rev
etcd.watcher.set('service/a', { value: '4' })
expect(map.get('service/a')).toBe('4')
expect(states).toEqual([['1'], ['2'], ['4']]) // never witnessed 3
})
})
describe('EtcdHostList', () => {
it('initializes values based on current keys', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
expect([...hostList.get()]).toEqual([
'http://192.168.1.1',
'http://192.168.1.2',
'http://192.168.1.3',
])
})
it('maintains key updates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
etcd.watcher.set('service/b', { value: 'http://192.168.1.4' })
expect([...hostList.get()]).toEqual([
'http://192.168.1.1',
'http://192.168.1.4',
'http://192.168.1.3',
])
})
it('maintains key creates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
expect([...hostList.get()]).toEqual([
'http://192.168.1.1',
'http://192.168.1.2',
])
})
it('maintains key deletions', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
etcd.watcher.del('service/b')
expect([...hostList.get()]).toEqual(['http://192.168.1.1'])
})
it('notifies of updates', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
const states: string[][] = [[...hostList.get()]]
hostList.onUpdate((updated) => {
expect([...updated]).toEqual([...hostList.get()])
states.push([...updated])
})
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
etcd.watcher.del('service/b')
etcd.watcher.set('service/a', { value: 'http://192.168.1.4' })
expect(states).toEqual([
['http://192.168.1.1', 'http://192.168.1.2'],
['http://192.168.1.1', 'http://192.168.1.2', 'http://192.168.1.3'],
['http://192.168.1.1', 'http://192.168.1.3'],
['http://192.168.1.4', 'http://192.168.1.3'],
])
})
it('ignores bad host values', async () => {
const etcd = new MockEtcd()
etcd.watcher.set('service/a', { value: 'not-a-host' })
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
await hostList.connect()
expect([...hostList.get()]).toEqual(['http://192.168.1.2'])
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
etcd.watcher.set('service/c', { value: 'not-a-host' })
expect([...hostList.get()]).toEqual([
'http://192.168.1.1',
'http://192.168.1.2',
])
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
expect([...hostList.get()]).toEqual([
'http://192.168.1.1',
'http://192.168.1.2',
'http://192.168.1.3',
])
})
it('falls back to static host list when uninitialized or no keys available', async () => {
const etcd = new MockEtcd()
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '', [
'http://10.0.0.1',
'http://10.0.0.2',
])
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
expect([...hostList.get()]).toEqual([
'http://10.0.0.1',
'http://10.0.0.2',
])
await hostList.connect()
const states: string[][] = [[...hostList.get()]]
hostList.onUpdate((updated) => {
states.push([...updated])
})
etcd.watcher.del('service/a')
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
expect(states).toEqual([
['http://192.168.1.1'],
['http://10.0.0.1', 'http://10.0.0.2'],
['http://192.168.1.2'],
])
})
})
})
class MockEtcd {
public watcher = new MockWatcher()
watch() {
const watcher = this.watcher
return {
prefix() {
return {
watcher() {
return watcher
},
}
},
}
}
getAll() {
const watcher = this.watcher
return {
prefix() {
return {
async exec(): Promise<{ kvs: IKeyValue[] }> {
return { kvs: watcher.getAll() }
},
}
},
}
}
}
class MockWatcher extends EventEmitter {
rev = 1
kvs: IKeyValue[] = []
constructor() {
super()
process.nextTick(() => this.emit('connected', {}))
}
get(key: string): IKeyValue | null {
const found = this.kvs.find((kv) => kv.key.toString() === key)
return found ?? null
}
getAll(): IKeyValue[] {
return [...this.kvs]
}
set(
key: string,
{ value, overrideRev }: { value: string; overrideRev?: number },
) {
const found = this.kvs.find((kv) => kv.key.toString() === key)
const rev = overrideRev ?? ++this.rev
if (found) {
found.value = Buffer.from(value)
found.mod_revision = rev.toString()
found.version = (parseInt(found.version, 10) + 1).toString()
this.emit('put', found)
} else {
const created = {
key: Buffer.from(key),
value: Buffer.from(value),
create_revision: rev.toString(),
mod_revision: rev.toString(),
version: '1',
lease: '0',
}
this.kvs.push(created)
this.emit('put', created)
}
}
del(key: string) {
const foundIdx = this.kvs.findIndex((kv) => kv.key.toString() === key)
if (foundIdx === -1) return
const [deleted] = this.kvs.splice(foundIdx, 1)
const rev = ++this.rev
deleted.value = Buffer.from('')
deleted.mod_revision = rev.toString()
deleted.create_revision = '0'
deleted.version = '0'
this.emit('delete', deleted)
}
on(evt: 'connected', listener: (res: unknown) => void): any
on(evt: 'put', listener: (kv: IKeyValue) => void): any
on(evt: 'delete', listener: (kv: IKeyValue) => void): any
on(evt: string, listener: (...args: any[]) => void) {
super.on(evt, listener)
}
}