* appview: setup etcd-based dataplane host list * appview: remove old file * appview: tidy etcd host list functionality and add tests * appview: add config and lifecycle for etcd-based dataplane host list * tidy * tidy * appview: tidy config and dataplane client types * build
302 lines
10 KiB
TypeScript
302 lines
10 KiB
TypeScript
import EventEmitter from 'node:events'
|
|
import { Etcd3, IKeyValue } from 'etcd3'
|
|
import { EtcdHostList } from '../src'
|
|
import { EtcdMap } from '../src/etcd'
|
|
|
|
describe('etcd', () => {
|
|
describe('EtcdMap', () => {
|
|
it('initializes values based on current keys', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
etcd.watcher.set('service/b', { value: '2' })
|
|
etcd.watcher.set('service/c', { value: '3' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
expect(map.get('service/a')).toBe('1')
|
|
expect(map.get('service/b')).toBe('2')
|
|
expect(map.get('service/c')).toBe('3')
|
|
expect([...map.values()]).toEqual(['1', '2', '3'])
|
|
})
|
|
|
|
it('maintains key updates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
etcd.watcher.set('service/b', { value: '2' })
|
|
etcd.watcher.set('service/c', { value: '3' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
etcd.watcher.set('service/b', { value: '4' })
|
|
expect(map.get('service/a')).toBe('1')
|
|
expect(map.get('service/b')).toBe('4')
|
|
expect(map.get('service/c')).toBe('3')
|
|
expect([...map.values()]).toEqual(['1', '4', '3'])
|
|
})
|
|
|
|
it('maintains key creates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
etcd.watcher.set('service/b', { value: '2' })
|
|
expect(map.get('service/a')).toBe('1')
|
|
expect(map.get('service/b')).toBe('2')
|
|
expect([...map.values()]).toEqual(['1', '2'])
|
|
})
|
|
|
|
it('maintains key deletions', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
etcd.watcher.set('service/b', { value: '2' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
etcd.watcher.del('service/b')
|
|
expect(map.get('service/a')).toBe('1')
|
|
expect(map.get('service/b')).toBe(null)
|
|
expect([...map.values()]).toEqual(['1'])
|
|
})
|
|
|
|
it('notifies of updates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
etcd.watcher.set('service/b', { value: '2' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
const states: string[][] = [[...map.values()]]
|
|
map.onUpdate((update) => {
|
|
states.push([...update.values()])
|
|
})
|
|
etcd.watcher.set('service/c', { value: '3' })
|
|
etcd.watcher.del('service/b')
|
|
etcd.watcher.set('service/a', { value: '4' })
|
|
expect(states).toEqual([
|
|
['1', '2'],
|
|
['1', '2', '3'],
|
|
['1', '3'],
|
|
['4', '3'],
|
|
])
|
|
})
|
|
|
|
it('ignores out-of-order updates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: '1' })
|
|
const map = new EtcdMap(etcd as unknown as Etcd3)
|
|
await map.connect()
|
|
const states: string[][] = [[...map.values()]]
|
|
map.onUpdate((update) => {
|
|
states.push([...update.values()])
|
|
})
|
|
etcd.watcher.set('service/a', { value: '2' })
|
|
etcd.watcher.set('service/a', { value: '3', overrideRev: 1 }) // old rev
|
|
etcd.watcher.set('service/a', { value: '4' })
|
|
expect(map.get('service/a')).toBe('4')
|
|
expect(states).toEqual([['1'], ['2'], ['4']]) // never witnessed 3
|
|
})
|
|
})
|
|
|
|
describe('EtcdHostList', () => {
|
|
it('initializes values based on current keys', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://192.168.1.1',
|
|
'http://192.168.1.2',
|
|
'http://192.168.1.3',
|
|
])
|
|
})
|
|
|
|
it('maintains key updates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.4' })
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://192.168.1.1',
|
|
'http://192.168.1.4',
|
|
'http://192.168.1.3',
|
|
])
|
|
})
|
|
|
|
it('maintains key creates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://192.168.1.1',
|
|
'http://192.168.1.2',
|
|
])
|
|
})
|
|
|
|
it('maintains key deletions', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
etcd.watcher.del('service/b')
|
|
expect([...hostList.get()]).toEqual(['http://192.168.1.1'])
|
|
})
|
|
|
|
it('notifies of updates', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
const states: string[][] = [[...hostList.get()]]
|
|
hostList.onUpdate((updated) => {
|
|
expect([...updated]).toEqual([...hostList.get()])
|
|
states.push([...updated])
|
|
})
|
|
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
|
|
etcd.watcher.del('service/b')
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.4' })
|
|
expect(states).toEqual([
|
|
['http://192.168.1.1', 'http://192.168.1.2'],
|
|
['http://192.168.1.1', 'http://192.168.1.2', 'http://192.168.1.3'],
|
|
['http://192.168.1.1', 'http://192.168.1.3'],
|
|
['http://192.168.1.4', 'http://192.168.1.3'],
|
|
])
|
|
})
|
|
|
|
it('ignores bad host values', async () => {
|
|
const etcd = new MockEtcd()
|
|
etcd.watcher.set('service/a', { value: 'not-a-host' })
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '')
|
|
await hostList.connect()
|
|
expect([...hostList.get()]).toEqual(['http://192.168.1.2'])
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
etcd.watcher.set('service/c', { value: 'not-a-host' })
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://192.168.1.1',
|
|
'http://192.168.1.2',
|
|
])
|
|
etcd.watcher.set('service/c', { value: 'http://192.168.1.3' })
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://192.168.1.1',
|
|
'http://192.168.1.2',
|
|
'http://192.168.1.3',
|
|
])
|
|
})
|
|
|
|
it('falls back to static host list when uninitialized or no keys available', async () => {
|
|
const etcd = new MockEtcd()
|
|
const hostList = new EtcdHostList(etcd as unknown as Etcd3, '', [
|
|
'http://10.0.0.1',
|
|
'http://10.0.0.2',
|
|
])
|
|
etcd.watcher.set('service/a', { value: 'http://192.168.1.1' })
|
|
expect([...hostList.get()]).toEqual([
|
|
'http://10.0.0.1',
|
|
'http://10.0.0.2',
|
|
])
|
|
await hostList.connect()
|
|
const states: string[][] = [[...hostList.get()]]
|
|
hostList.onUpdate((updated) => {
|
|
states.push([...updated])
|
|
})
|
|
etcd.watcher.del('service/a')
|
|
etcd.watcher.set('service/b', { value: 'http://192.168.1.2' })
|
|
expect(states).toEqual([
|
|
['http://192.168.1.1'],
|
|
['http://10.0.0.1', 'http://10.0.0.2'],
|
|
['http://192.168.1.2'],
|
|
])
|
|
})
|
|
})
|
|
})
|
|
|
|
class MockEtcd {
|
|
public watcher = new MockWatcher()
|
|
watch() {
|
|
const watcher = this.watcher
|
|
return {
|
|
prefix() {
|
|
return {
|
|
watcher() {
|
|
return watcher
|
|
},
|
|
}
|
|
},
|
|
}
|
|
}
|
|
getAll() {
|
|
const watcher = this.watcher
|
|
return {
|
|
prefix() {
|
|
return {
|
|
async exec(): Promise<{ kvs: IKeyValue[] }> {
|
|
return { kvs: watcher.getAll() }
|
|
},
|
|
}
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
class MockWatcher extends EventEmitter {
|
|
rev = 1
|
|
kvs: IKeyValue[] = []
|
|
constructor() {
|
|
super()
|
|
process.nextTick(() => this.emit('connected', {}))
|
|
}
|
|
get(key: string): IKeyValue | null {
|
|
const found = this.kvs.find((kv) => kv.key.toString() === key)
|
|
return found ?? null
|
|
}
|
|
getAll(): IKeyValue[] {
|
|
return [...this.kvs]
|
|
}
|
|
set(
|
|
key: string,
|
|
{ value, overrideRev }: { value: string; overrideRev?: number },
|
|
) {
|
|
const found = this.kvs.find((kv) => kv.key.toString() === key)
|
|
const rev = overrideRev ?? ++this.rev
|
|
if (found) {
|
|
found.value = Buffer.from(value)
|
|
found.mod_revision = rev.toString()
|
|
found.version = (parseInt(found.version, 10) + 1).toString()
|
|
this.emit('put', found)
|
|
} else {
|
|
const created = {
|
|
key: Buffer.from(key),
|
|
value: Buffer.from(value),
|
|
create_revision: rev.toString(),
|
|
mod_revision: rev.toString(),
|
|
version: '1',
|
|
lease: '0',
|
|
}
|
|
this.kvs.push(created)
|
|
this.emit('put', created)
|
|
}
|
|
}
|
|
del(key: string) {
|
|
const foundIdx = this.kvs.findIndex((kv) => kv.key.toString() === key)
|
|
if (foundIdx === -1) return
|
|
const [deleted] = this.kvs.splice(foundIdx, 1)
|
|
const rev = ++this.rev
|
|
deleted.value = Buffer.from('')
|
|
deleted.mod_revision = rev.toString()
|
|
deleted.create_revision = '0'
|
|
deleted.version = '0'
|
|
this.emit('delete', deleted)
|
|
}
|
|
on(evt: 'connected', listener: (res: unknown) => void): any
|
|
on(evt: 'put', listener: (kv: IKeyValue) => void): any
|
|
on(evt: 'delete', listener: (kv: IKeyValue) => void): any
|
|
on(evt: string, listener: (...args: any[]) => void) {
|
|
super.on(evt, listener)
|
|
}
|
|
}
|