diff --git a/migrations/20260411_210100_add_dashboard_live_notify_triggers.js b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js
new file mode 100644
index 00000000..6cc63c3c
--- /dev/null
+++ b/migrations/20260411_210100_add_dashboard_live_notify_triggers.js
@@ -0,0 +1,44 @@
+exports.up = async function (knex) {
+  await knex.schema.raw(`
+    CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$
+    BEGIN
+      PERFORM pg_notify('dashboard_events_changed', TG_OP);
+      RETURN NULL;
+    END;
+    $$ LANGUAGE plpgsql;
+  `)
+
+  await knex.schema.raw(`
+    DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;
+
+    CREATE TRIGGER dashboard_events_changed_trigger
+    AFTER INSERT OR UPDATE OR DELETE ON events
+    FOR EACH STATEMENT
+    EXECUTE FUNCTION notify_dashboard_events_changed();
+  `)
+
+  await knex.schema.raw(`
+    CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$
+    BEGIN
+      PERFORM pg_notify('dashboard_users_changed', TG_OP);
+      RETURN NULL;
+    END;
+    $$ LANGUAGE plpgsql;
+  `)
+
+  await knex.schema.raw(`
+    DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;
+
+    CREATE TRIGGER dashboard_users_changed_trigger
+    AFTER INSERT OR UPDATE OR DELETE ON users
+    FOR EACH STATEMENT
+    EXECUTE FUNCTION notify_dashboard_users_changed();
+  `)
+}
+
+exports.down = async function (knex) {
+  await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;')
+  await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;')
+  await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();')
+  await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();')
+}
diff --git a/migrations/20260412_020000_add_dashboard_kpi_indexes.js b/migrations/20260412_020000_add_dashboard_kpi_indexes.js
new file mode 100644
index 00000000..63af1af2
--- /dev/null
+++ b/migrations/20260412_020000_add_dashboard_kpi_indexes.js
@@ -0,0 +1,40 @@
+/**
+ * Migration: add dashboard KPI query indexes
+ *
+ * Without these the incremental collector degrades to sequential scans:
+ * - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every
+ *   incremental delta query and the bootstrap cursor select.
+ * - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query.
+ * - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the
+ *   user delta / cursor-select queries.
+ *
+ * All three are created CONCURRENTLY so they don't lock the table on a live relay.
+ * knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and
+ * set `config.transaction` to false (DDL inside a transaction would negate CONCURRENTLY).
+ */
+
+exports.up = async (knex) => {
+  await knex.raw(`
+    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor
+      ON events (first_seen, id);
+  `)
+
+  await knex.raw(`
+    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey
+      ON events (event_pubkey);
+  `)
+
+  await knex.raw(`
+    CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor
+      ON users (updated_at, pubkey);
+  `)
+}
+
+exports.down = async (knex) => {
+  await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;')
+  await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;')
+  await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;')
+}
+
+// Required so knex doesn't wrap the CONCURRENTLY statements in a transaction.
+exports.config = { transaction: false }
diff --git a/package.json b/package.json
index 9d9f902f..e78a3b12 100644
--- a/package.json
+++ b/package.json
@@ -23,10 +23,12 @@
   "main": "src/index.ts",
   "scripts": {
     "dev": "node -r ts-node/register src/index.ts",
+    "dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts",
     "clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}",
     "build": "tsc --project tsconfig.build.json",
     "prestart": "npm run build",
     "start": "cd dist && node src/index.js",
+    "start:dashboard": "node dist/src/dashboard-service/index.js",
     "build:check": "npm run build -- --noEmit",
     "lint": "eslint --ext .ts ./src ./test",
     "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test",
@@ -37,6 +39,7 @@
     "db:seed": "knex seed:run",
     "pretest:unit": "mkdir -p .test-reports/unit",
     "test:unit": "mocha 'test/**/*.spec.ts'",
+    "test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'",
     "test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*",
     "cover:unit": "nyc --report-dir .coverage/unit npm run test:unit",
     "docker:build": "docker build -t nostream .",
diff --git a/src/dashboard-service/api/dashboard-router.ts b/src/dashboard-service/api/dashboard-router.ts
new file mode 100644
index 00000000..dc977c52
--- /dev/null
+++ b/src/dashboard-service/api/dashboard-router.ts
@@ -0,0 +1,12 @@
+import { Router } from 'express'
+
+import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler'
+import { SnapshotService } from '../services/snapshot-service'
+
+export const createDashboardRouter = (snapshotService: SnapshotService): Router => {
+  const router = Router()
+
+  router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService))
+
+  return router
+}
diff --git a/src/dashboard-service/app.ts b/src/dashboard-service/app.ts
new file mode 100644
index 00000000..e4970567
--- /dev/null
+++ 
b/src/dashboard-service/app.ts
@@ -0,0 +1,165 @@
+import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
+import { IKPICollector, SnapshotService } from './services/snapshot-service'
+import { createDashboardRouter } from './api/dashboard-router'
+import { createLogger } from '../factories/logger-factory'
+import { DashboardServiceConfig } from './config'
+import { DashboardWebSocketHub } from './ws/dashboard-ws-hub'
+import express from 'express'
+import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler'
+import http from 'http'
+import { IncrementalKPICollectorService } from './services/incremental-kpi-collector-service'
+import { KPICollectorService } from './services/kpi-collector-service'
+import { PollingScheduler } from './polling/polling-scheduler'
+import { StatefulIncrementalKPICollectorService } from './services/stateful-incremental-service'
+import { WebSocketServer } from 'ws'
+const debug = createLogger('dashboard-service:app')
+
+export interface DashboardService {
+  readonly config: DashboardServiceConfig
+  readonly snapshotService: SnapshotService
+  readonly pollingScheduler: PollingScheduler
+  start(): Promise<void>
+  stop(): Promise<void>
+  getHttpPort(): number
+}
+
+export const createDashboardService = (config: DashboardServiceConfig): DashboardService => {
+  console.info(
+    'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s, collectorMode=%s)',
+    config.host,
+    config.port,
+    config.wsPath,
+    config.pollIntervalMs,
+    config.useDummyData,
+    config.collectorMode,
+  )
+
+  // Dummy mode serves an empty-but-valid snapshot (for UI development);
+  // otherwise pick the collector implementation from config.collectorMode.
+  const collector: IKPICollector = config.useDummyData
+    ? {
+        collectMetrics: async () => ({
+          eventsByKind: [],
+          admittedUsers: 0,
+          satsPaid: 0,
+          topTalkers: { allTime: [], recent: [] },
+        }),
+      }
+    : (() => {
+        if (config.collectorMode === 'stateful-incremental') {
+          return new StatefulIncrementalKPICollectorService(getMasterDbClient())
+        }
+
+        const dbClient = getReadReplicaDbClient()
+        return config.collectorMode === 'incremental'
+          ? new IncrementalKPICollectorService(dbClient)
+          : new KPICollectorService(dbClient)
+      })()
+
+  const snapshotService = new SnapshotService(collector)
+
+  const app = express()
+    .disable('x-powered-by')
+    .get('/healthz', getHealthRequestHandler)
+    .use('/api/v1/kpis', createDashboardRouter(snapshotService))
+
+  const webServer = http.createServer(app)
+  const webSocketServer = new WebSocketServer({
+    server: webServer,
+    path: config.wsPath,
+  })
+
+  const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot())
+
+  const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => {
+    const { snapshot, changed } = await snapshotService.refresh()
+
+    if (!changed) {
+      debug('poll tick detected no KPI changes')
+      return
+    }
+
+    debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status)
+    webSocketHub.broadcastTick(snapshot.sequence)
+    webSocketHub.broadcastSnapshot(snapshot)
+  })
+
+  const start = async () => {
+    if (webServer.listening) {
+      debug('start requested but service is already listening')
+      return
+    }
+
+    console.info('dashboard-service: starting http and websocket servers')
+
+    await new Promise<void>((resolve, reject) => {
+      webServer.listen(config.port, config.host, () => {
+        const address = webServer.address()
+        debug('listening on %o', address)
+        console.info('dashboard-service: listening on %o', address)
+        resolve()
+      })
+      webServer.once('error', (error) => {
+        console.error('dashboard-service: failed to start server', error)
+        reject(error)
+      })
+    })
+
+    try {
+      const initialSnapshotRefresh = await snapshotService.refresh()
+      if (initialSnapshotRefresh.changed) {
+        debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status)
+      }
+    } catch (error) {
+      console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error)
+    }
+
+    pollingScheduler.start()
+    console.info('dashboard-service: polling scheduler started')
+  }
+
+  const stop = async () => {
+    console.info('dashboard-service: stopping service')
+    pollingScheduler.stop()
+
+    if (collector?.close) {
+      try {
+        await collector.close()
+      } catch (error) {
+        console.error('dashboard-service: failed to close collector resources', error)
+      }
+    }
+
+    webSocketHub.close()
+    await new Promise<void>((resolve, reject) => {
+      if (!webServer.listening) {
+        debug('stop requested while server was already stopped')
+        resolve()
+        return
+      }
+
+      webServer.close((error) => {
+        if (error) {
+          console.error('dashboard-service: failed to stop cleanly', error)
+          reject(error)
+          return
+        }
+
+        console.info('dashboard-service: http server closed')
+        resolve()
+      })
+    })
+  }
+
+  const getHttpPort = (): number => {
+    const address = webServer.address()
+    return typeof address === 'object' && address !== null ? address.port : config.port
+  }
+
+  return {
+    config,
+    snapshotService,
+    pollingScheduler,
+    start,
+    stop,
+    getHttpPort,
+  }
+}
diff --git a/src/dashboard-service/config.ts b/src/dashboard-service/config.ts
new file mode 100644
index 00000000..21872984
--- /dev/null
+++ b/src/dashboard-service/config.ts
@@ -0,0 +1,58 @@
+export interface DashboardServiceConfig {
+  host: string
+  port: number
+  wsPath: string
+  pollIntervalMs: number
+  useDummyData: boolean
+  collectorMode: DashboardCollectorMode
+}
+
+export type DashboardCollectorMode = 'full' | 'incremental' | 'stateful-incremental'
+
+const parseBoolean = (value: string | undefined, fallback = false): boolean => {
+  if (typeof value === 'undefined') {
+    return fallback
+  }
+
+  return value === '1' || value.toLowerCase() === 'true'
+}
+
+const parseInteger = (value: string | undefined, fallback: number): number => {
+  if (typeof value === 'undefined' || value === '') {
+    return fallback
+  }
+
+  const parsed = Number(value)
+  if (!Number.isInteger(parsed) || parsed < 0) {
+    return fallback
+  }
+
+  return parsed
+}
+
+const parseCollectorMode = (
+  value: string | undefined,
+  fallback: DashboardCollectorMode = 'full',
+): DashboardCollectorMode => {
+  if (typeof value === 'undefined') {
+    return fallback
+  }
+
+  const normalized = value.toLowerCase()
+  if (normalized === 'full' || normalized === 'incremental' || normalized === 'stateful-incremental') {
+    return normalized
+  }
+
+  return fallback
+}
+
+export const getDashboardServiceConfig = (): DashboardServiceConfig => {
+  return {
+    host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1',
+    port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011),
+    wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream',
+    pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000),
+    useDummyData: parseBoolean(process.env.DASHBOARD_USE_DUMMY_DATA, false),
+    collectorMode: parseCollectorMode(process.env.DASHBOARD_COLLECTOR_MODE, 'full'),
+  }
+}
diff --git a/src/dashboard-service/controllers/get-health-controller.ts b/src/dashboard-service/controllers/get-health-controller.ts
new file mode 100644
index 00000000..0c7037a3
--- /dev/null
+++ b/src/dashboard-service/controllers/get-health-controller.ts
@@ -0,0 +1,12 @@
+import { Request, Response } from 'express'
+
+import { IController } from '../../@types/controllers'
+
+export class GetHealthController implements IController {
+  public async handleRequest(_request: Request, response: Response): Promise<void> {
+    response
+      .status(200)
+      .setHeader('content-type', 'application/json; charset=utf-8')
+      .send({ status: 'ok' })
+  }
+}
\ No newline at end of file
diff --git a/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts
new file mode 100644
index 00000000..741d9902
--- /dev/null
+++ b/src/dashboard-service/controllers/get-kpi-snapshot-controller.ts
@@ -0,0 +1,20 @@
+import { Request, Response } from 'express'
+
+import { DashboardSnapshotResponse } from '../types'
+import { IController } from '../../@types/controllers'
+import { SnapshotService } from '../services/snapshot-service'
+
+export class GetKPISnapshotController implements IController {
+  public constructor(private readonly snapshotService: SnapshotService) { }
+
+  public async handleRequest(_request: Request, response: Response): Promise<void> {
+    const payload: DashboardSnapshotResponse = {
+      data: this.snapshotService.getSnapshot(),
+    }
+
+    response
+      .status(200)
+      .setHeader('content-type', 'application/json; charset=utf-8')
+      .send(payload)
+  }
+}
\ No newline at end of file
diff --git 
a/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts
new file mode 100644
index 00000000..e8f2cea4
--- /dev/null
+++ b/src/dashboard-service/handlers/request-handlers/get-health-request-handler.ts
@@ -0,0 +1,5 @@
+import { withController } from '../../../handlers/request-handlers/with-controller-request-handler'
+
+import { GetHealthController } from '../../controllers/get-health-controller'
+
+export const getHealthRequestHandler = withController(() => new GetHealthController())
\ No newline at end of file
diff --git a/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts
new file mode 100644
index 00000000..507e181b
--- /dev/null
+++ b/src/dashboard-service/handlers/request-handlers/get-kpi-snapshot-request-handler.ts
@@ -0,0 +1,8 @@
+import { withController } from '../../../handlers/request-handlers/with-controller-request-handler'
+
+import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller'
+import { SnapshotService } from '../../services/snapshot-service'
+
+export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => {
+  return withController(() => new GetKPISnapshotController(snapshotService))
+}
\ No newline at end of file
diff --git a/src/dashboard-service/index.ts b/src/dashboard-service/index.ts
new file mode 100644
index 00000000..4f64161c
--- /dev/null
+++ b/src/dashboard-service/index.ts
@@ -0,0 +1,42 @@
+import { createLogger } from '../factories/logger-factory'
+
+import { createDashboardService } from './app'
+import { getDashboardServiceConfig } from './config'
+
+const debug = createLogger('dashboard-service:index')
+
+const run = async () => {
+  const config = getDashboardServiceConfig()
+  console.info('dashboard-service: bootstrapping with config %o', config)
+  const service = createDashboardService(config)
+
+  const shutdown = async () => {
+    console.info('dashboard-service: received shutdown signal')
+    debug('received shutdown signal')
+    await service.stop()
+    process.exit(0)
+  }
+
+  process
+    .on('SIGINT', shutdown)
+    .on('SIGTERM', shutdown)
+
+  process.on('uncaughtException', (error) => {
+    console.error('dashboard-service: uncaught exception', error)
+  })
+
+  process.on('unhandledRejection', (error) => {
+    console.error('dashboard-service: unhandled rejection', error)
+  })
+
+  await service.start()
+}
+
+if (require.main === module) {
+  run().catch((error) => {
+    console.error('dashboard-service: unable to start', error)
+    process.exit(1)
+  })
+}
+
+export { run }
diff --git a/src/dashboard-service/polling/polling-scheduler.ts b/src/dashboard-service/polling/polling-scheduler.ts
new file mode 100644
index 00000000..58663adb
--- /dev/null
+++ b/src/dashboard-service/polling/polling-scheduler.ts
@@ -0,0 +1,69 @@
+import { createLogger } from '../../factories/logger-factory'
+
+type Tick = () => Promise<void> | void
+
+const debug = createLogger('dashboard-service:polling')
+
+/**
+ * Runs a tick callback on a fixed cadence, but — unlike setInterval — never
+ * overlaps: the next tick is only scheduled *after* the current one resolves
+ * or rejects. This prevents DB query storms when a poll takes longer than the
+ * configured interval.
+ */
+export class PollingScheduler {
+  private timer: NodeJS.Timeout | undefined
+  private running = false
+
+  public constructor(
+    private readonly intervalMs: number,
+    private readonly tick: Tick,
+  ) { }
+
+  public start(): void {
+    if (this.running) {
+      return
+    }
+
+    this.running = true
+    debug('starting scheduler with interval %d ms', this.intervalMs)
+    this.scheduleNext()
+  }
+
+  public stop(): void {
+    if (!this.running) {
+      return
+    }
+
+    debug('stopping scheduler')
+    this.running = false
+
+    if (this.timer) {
+      clearTimeout(this.timer)
+      this.timer = undefined
+    }
+  }
+
+  public isRunning(): boolean {
+    return this.running
+  }
+
+  private scheduleNext(): void {
+    if (!this.running) {
+      return
+    }
+
+    this.timer = setTimeout(() => {
+      this.timer = undefined
+
+      Promise.resolve(this.tick())
+        .catch((error) => {
+          console.error('dashboard-service: polling tick failed', error)
+        })
+        .finally(() => {
+          // Schedule the next tick only after the current one completes,
+          // regardless of success or failure.
+          this.scheduleNext()
+        })
+    }, this.intervalMs)
+  }
+}
diff --git a/src/dashboard-service/services/incremental-kpi-collector-service.ts b/src/dashboard-service/services/incremental-kpi-collector-service.ts
new file mode 100644
index 00000000..e26ce2b8
--- /dev/null
+++ b/src/dashboard-service/services/incremental-kpi-collector-service.ts
@@ -0,0 +1,586 @@
+import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types'
+import { createLogger } from '../../factories/logger-factory'
+import { DatabaseClient } from '../../@types/base'
+
+const debug = createLogger('dashboard-service:incremental-kpi-collector')
+
+const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735]
+const MINUTES_PER_DAY = 24 * 60
+const SATS_SCALE_FACTOR = 1000
+
+
+class MinHeap {
+  private readonly data: TopTalker[] = []
+
+  public constructor(private readonly maxSize: number) {}
+
+  public push(item: TopTalker): void {
+    if (this.data.length < this.maxSize) {
+      this.data.push(item)
+      this.bubbleUp(this.data.length - 1)
+    } else if (this.data.length > 0 && item.count > this.data[0].count) {
+      this.data[0] = item
+      this.sinkDown(0)
+    }
+  }
+
+  /** Returns the heap contents sorted descending by count.
*/
+  public toSortedDescArray(): TopTalker[] {
+    return [...this.data].sort((a, b) => b.count - a.count)
+  }
+
+  public get size(): number {
+    return this.data.length
+  }
+
+  private bubbleUp(idx: number): void {
+    while (idx > 0) {
+      const parent = (idx - 1) >> 1
+      if (this.data[parent].count <= this.data[idx].count) {
+        break
+      }
+      [this.data[parent], this.data[idx]] = [this.data[idx], this.data[parent]]
+      idx = parent
+    }
+  }
+
+  private sinkDown(idx: number): void {
+    const n = this.data.length
+    // eslint-disable-next-line no-constant-condition
+    while (true) {
+      let smallest = idx
+      const left = 2 * idx + 1
+      const right = 2 * idx + 2
+
+      if (left < n && this.data[left].count < this.data[smallest].count) {
+        smallest = left
+      }
+      if (right < n && this.data[right].count < this.data[smallest].count) {
+        smallest = right
+      }
+      if (smallest === idx) {
+        break
+      }
+      [this.data[smallest], this.data[idx]] = [this.data[idx], this.data[smallest]]
+      idx = smallest
+    }
+  }
+}
+
+
+interface ICombinedEventRow {
+  agg_type: 'kind' | 'talker' | 'bucket'
+  event_kind: number | null
+  pubkey: string | null
+  bucket_epoch: string | number | null
+  count: string | number
+}
+
+interface IEventCursorRow {
+  first_seen: string
+  id: string
+}
+
+interface IUserSnapshotRow {
+  pubkey: string | Buffer
+  is_admitted: boolean | number | string
+  balance: string | number | null
+  updated_at_epoch: string | number
+}
+
+interface IUserCursorRow {
+  updated_at_epoch: string | number
+  pubkey: string | Buffer
+}
+
+
+const toNumber = (value: unknown): number => {
+  if (typeof value === 'number') {
+    return value
+  }
+  if (typeof value === 'string' && value !== '') {
+    return Number(value)
+  }
+  return 0
+}
+
+const toBoolean = (value: unknown): boolean => {
+  if (typeof value === 'boolean') {
+    return value
+  }
+  if (typeof value === 'number') {
+    return value === 1
+  }
+  if (typeof value === 'string') {
+    return value === '1' || value.toLowerCase() === 'true' || value.toLowerCase() === 't'
+  }
+  return false
+}
+
+const normalizeText = (value: unknown): string => {
+  if (Buffer.isBuffer(value)) {
+    return value.toString('hex')
+  }
+  return String(value)
+}
+
+
+interface IEventCursor {
+  firstSeen: string
+  id: string
+}
+
+interface IUserCursor {
+  updatedAtEpoch: number
+  pubkey: string
+}
+
+interface IUserState {
+  isAdmitted: boolean
+  balanceMillisats: number
+}
+
+export class IncrementalKPICollectorService {
+  private readonly trackedKindsSet: Set<number>
+
+  // All-time talker counts stored in a Map; the MinHeap is rebuilt on each
+  // collectMetrics() call from this map — keeping O(N) space in the map while
+  // heap work is O(N log K) per cycle instead of O(N log N).
+  private readonly allTimeTalkerCounts = new Map<string, number>()
+
+  private readonly eventsByKindCounts = new Map<string, number>()
+
+  // Recent bucket data: minuteEpoch → pubkey → count (pruned each cycle)
+  private readonly recentBucketTalkerCounts = new Map<number, Map<string, number>>()
+
+  private readonly userStates = new Map<string, IUserState>()
+
+  private admittedUsers = 0
+
+  private eventCursor: IEventCursor = {
+    firstSeen: '1970-01-01 00:00:00.000000',
+    id: '00000000-0000-0000-0000-000000000000',
+  }
+
+  private initialized = false
+
+  private satsPaidMillisats = 0
+
+  private userCursor: IUserCursor = {
+    updatedAtEpoch: 0,
+    pubkey: '',
+  }
+
+  public constructor(
+    private readonly dbClient: DatabaseClient,
+    private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS,
+    private readonly topTalkersLimit = 10,
+    private readonly recentDays = 3,
+  ) {
+    this.trackedKindsSet = new Set(trackedKinds)
+  }
+
+  public async collectMetrics(): Promise<DashboardMetrics> {
+    if (!this.initialized) {
+      await this.bootstrapState()
+      this.initialized = true
+    } else {
+      await Promise.all([
+        this.applyEventDeltas(),
+        this.applyUserDeltas(),
+      ])
+    }
+
+    this.pruneRecentBuckets()
+
+    return {
+      eventsByKind: this.buildEventsByKindMetrics(),
+      admittedUsers: this.admittedUsers,
+      satsPaid: this.satsPaidMillisats / SATS_SCALE_FACTOR,
+      topTalkers: {
+        allTime: this.getTopKFromMap(this.allTimeTalkerCounts),
+        recent: this.getRecentTopTalkers(),
+      },
+    }
+  }
+
+  /**
+   * Fetches all new events since the last cursor in a single MATERIALIZED CTE
+   * query, then fans the rows out into three accumulators:
+   *   • kind counts
+   *   • all-time talker counts
+   *   • per-minute bucket talker counts (for the recent window)
+   *
+   * Using MATERIALIZED forces a single table scan; without it PG 15 may inline
+   * the CTE and re-scan for each UNION ALL branch.
+   */
+  private async applyEventDeltas(): Promise<void> {
+    const sinceMinuteEpoch = this.getWindowStartMinute()
+
+    // Combined query: one MATERIALIZED CTE, three aggregation branches.
+    const combinedSql = `
+      WITH new_events AS MATERIALIZED (
+        SELECT event_kind, event_pubkey, first_seen
+        FROM events
+        WHERE (first_seen, id) > (?, ?)
+      )
+      SELECT 'kind' AS agg_type,
+             event_kind,
+             NULL::text AS pubkey,
+             NULL::bigint AS bucket_epoch,
+             COUNT(*)::bigint AS count
+      FROM new_events
+      GROUP BY event_kind
+
+      UNION ALL
+
+      SELECT 'talker',
+             NULL,
+             encode(event_pubkey, 'hex'),
+             NULL,
+             COUNT(*)::bigint
+      FROM new_events
+      GROUP BY event_pubkey
+
+      UNION ALL
+
+      SELECT 'bucket',
+             NULL,
+             encode(event_pubkey, 'hex'),
+             extract(epoch FROM date_trunc('minute', first_seen))::bigint,
+             COUNT(*)::bigint
+      FROM new_events
+      WHERE first_seen >= to_timestamp(?)
+      GROUP BY event_pubkey, date_trunc('minute', first_seen);
+    `
+
+    // Cursor update: a separate lightweight query on the indexed (first_seen, id) pair.
+    const cursorSql = `
+      SELECT to_char(first_seen, 'YYYY-MM-DD HH24:MI:SS.US') AS first_seen, id
+      FROM events
+      WHERE (first_seen, id) > (?, ?)
+      ORDER BY first_seen DESC, id DESC
+      LIMIT 1;
+    `
+
+    const [combinedRows, cursorRows] = await Promise.all([
+      this.queryRows<ICombinedEventRow>(combinedSql, [
+        this.eventCursor.firstSeen,
+        this.eventCursor.id,
+        sinceMinuteEpoch * 60,
+      ]),
+      this.queryRows<IEventCursorRow>(cursorSql, [
+        this.eventCursor.firstSeen,
+        this.eventCursor.id,
+      ]),
+    ])
+
+    for (const row of combinedRows) {
+      const count = toNumber(row.count)
+
+      if (row.agg_type === 'kind') {
+        const kind = this.trackedKindsSet.has(toNumber(row.event_kind))
+          ? String(row.event_kind)
+          : 'other'
+        this.eventsByKindCounts.set(kind, (this.eventsByKindCounts.get(kind) ?? 0) + count)
+      } else if (row.agg_type === 'talker') {
+        const pubkey = row.pubkey ?? ''
+        this.allTimeTalkerCounts.set(pubkey, (this.allTimeTalkerCounts.get(pubkey) ?? 0) + count)
+      } else if (row.agg_type === 'bucket') {
+        const minuteEpoch = toNumber(row.bucket_epoch)
+        const pubkey = row.pubkey ?? ''
+        const bucket = this.recentBucketTalkerCounts.get(minuteEpoch) ?? new Map<string, number>()
+        bucket.set(pubkey, (bucket.get(pubkey) ?? 0) + count)
+        this.recentBucketTalkerCounts.set(minuteEpoch, bucket)
+      }
+    }
+
+    if (cursorRows.length > 0) {
+      this.eventCursor = {
+        firstSeen: cursorRows[0].first_seen,
+        id: cursorRows[0].id,
+      }
+    }
+  }
+
+  private async applyUserDeltas(): Promise<void> {
+    const [changedUsers, latestUserCursor] = await Promise.all([
+      this.queryRows<IUserSnapshotRow>(
+        `
+          SELECT
+            encode(pubkey, 'hex') AS pubkey,
+            is_admitted,
+            balance,
+            extract(epoch FROM updated_at)::bigint AS updated_at_epoch
+          FROM users
+          WHERE (extract(epoch FROM updated_at)::bigint, encode(pubkey, 'hex')) > (?, ?)
+          ORDER BY updated_at ASC, pubkey ASC;
+        `,
+        [this.userCursor.updatedAtEpoch, this.userCursor.pubkey],
+      ),
+      this.queryRows<IUserCursorRow>(
+        `
+          SELECT
+            extract(epoch FROM updated_at)::bigint AS updated_at_epoch,
+            encode(pubkey, 'hex') AS pubkey
+          FROM users
+          WHERE (extract(epoch FROM updated_at)::bigint, encode(pubkey, 'hex')) > (?, ?)
+          ORDER BY updated_at DESC, pubkey DESC
+          LIMIT 1;
+        `,
+        [this.userCursor.updatedAtEpoch, this.userCursor.pubkey],
+      ),
+    ])
+
+    for (const row of changedUsers) {
+      const pubkey = normalizeText(row.pubkey)
+      const nextState: IUserState = {
+        isAdmitted: toBoolean(row.is_admitted),
+        balanceMillisats: toNumber(row.balance),
+      }
+
+      const previousState = this.userStates.get(pubkey)
+      if (previousState?.isAdmitted) {
+        this.admittedUsers -= 1
+        this.satsPaidMillisats -= previousState.balanceMillisats
+      }
+
+      if (nextState.isAdmitted) {
+        this.admittedUsers += 1
+        this.satsPaidMillisats += nextState.balanceMillisats
+      }
+
+      this.userStates.set(pubkey, nextState)
+    }
+
+    if (latestUserCursor.length > 0) {
+      this.userCursor = {
+        updatedAtEpoch: toNumber(latestUserCursor[0].updated_at_epoch),
+        pubkey: normalizeText(latestUserCursor[0].pubkey),
+      }
+    }
+  }
+
+  /**
+   * Runs only on the first collect cycle. Uses the same MATERIALIZED CTE
+   * strategy as applyEventDeltas but without the cursor predicate (reads the
+   * entire events table on first run).
+   */
+  private async bootstrapState(): Promise<void> {
+    debug('bootstrapping incremental KPI collector state')
+
+    this.resetState()
+
+    const sinceMinuteEpoch = this.getWindowStartMinute()
+
+    const bootstrapEventsSql = `
+      WITH all_events AS MATERIALIZED (
+        SELECT event_kind, event_pubkey, first_seen
+        FROM events
+      )
+      SELECT 'kind' AS agg_type,
+             event_kind,
+             NULL::text AS pubkey,
+             NULL::bigint AS bucket_epoch,
+             COUNT(*)::bigint AS count
+      FROM all_events
+      GROUP BY event_kind
+
+      UNION ALL
+
+      SELECT 'talker',
+             NULL,
+             encode(event_pubkey, 'hex'),
+             NULL,
+             COUNT(*)::bigint
+      FROM all_events
+      GROUP BY event_pubkey
+
+      UNION ALL
+
+      SELECT 'bucket',
+             NULL,
+             encode(event_pubkey, 'hex'),
+             extract(epoch FROM date_trunc('minute', first_seen))::bigint,
+             COUNT(*)::bigint
+      FROM all_events
+      WHERE first_seen >= to_timestamp(?)
+      GROUP BY event_pubkey, date_trunc('minute', first_seen);
+    `
+
+    const [combinedRows, eventCursorRows, userRows, userCursorRows] = await Promise.all([
+      this.queryRows<ICombinedEventRow>(bootstrapEventsSql, [sinceMinuteEpoch * 60]),
+      this.queryRows<IEventCursorRow>(
+        `
+          SELECT to_char(first_seen, 'YYYY-MM-DD HH24:MI:SS.US') AS first_seen, id
+          FROM events
+          ORDER BY first_seen DESC, id DESC
+          LIMIT 1;
+        `,
+        [],
+      ),
+      // Only load admitted users at bootstrap — avoids unbounded memory growth
+      // from loading every user row into the in-memory map.
+      this.queryRows<IUserSnapshotRow>(
+        `
+          SELECT
+            encode(pubkey, 'hex') AS pubkey,
+            is_admitted,
+            balance,
+            extract(epoch FROM updated_at)::bigint AS updated_at_epoch
+          FROM users
+          WHERE is_admitted = true;
+        `,
+        [],
+      ),
+      this.queryRows<IUserCursorRow>(
+        `
+          SELECT
+            extract(epoch FROM updated_at)::bigint AS updated_at_epoch,
+            encode(pubkey, 'hex') AS pubkey
+          FROM users
+          ORDER BY updated_at DESC, pubkey DESC
+          LIMIT 1;
+        `,
+        [],
+      ),
+    ])
+
+    for (const row of combinedRows) {
+      const count = toNumber(row.count)
+
+      if (row.agg_type === 'kind') {
+        const kind = this.trackedKindsSet.has(toNumber(row.event_kind))
+          ? String(row.event_kind)
+          : 'other'
+        this.eventsByKindCounts.set(kind, (this.eventsByKindCounts.get(kind) ?? 0) + count)
+      } else if (row.agg_type === 'talker') {
+        const pubkey = row.pubkey ?? ''
+        this.allTimeTalkerCounts.set(pubkey, (this.allTimeTalkerCounts.get(pubkey) ?? 0) + count)
+      } else if (row.agg_type === 'bucket') {
+        const minuteEpoch = toNumber(row.bucket_epoch)
+        const pubkey = row.pubkey ?? ''
+        const bucket = this.recentBucketTalkerCounts.get(minuteEpoch) ?? new Map<string, number>()
+        bucket.set(pubkey, (bucket.get(pubkey) ?? 0) + count)
+        this.recentBucketTalkerCounts.set(minuteEpoch, bucket)
+      }
+    }
+
+    // Bootstrap only admitted users (memory-safe for large relays).
+    for (const row of userRows) {
+      const pubkey = normalizeText(row.pubkey)
+      const userState: IUserState = {
+        isAdmitted: true, // filtered in query
+        balanceMillisats: toNumber(row.balance),
+      }
+      this.userStates.set(pubkey, userState)
+      this.admittedUsers += 1
+      this.satsPaidMillisats += userState.balanceMillisats
+    }
+
+    if (eventCursorRows.length > 0) {
+      this.eventCursor = {
+        firstSeen: eventCursorRows[0].first_seen,
+        id: eventCursorRows[0].id,
+      }
+    }
+
+    if (userCursorRows.length > 0) {
+      this.userCursor = {
+        updatedAtEpoch: toNumber(userCursorRows[0].updated_at_epoch),
+        pubkey: normalizeText(userCursorRows[0].pubkey),
+      }
+    }
+  }
+
+  private buildEventsByKindMetrics(): EventsByKindCount[] {
+    const eventsByKind = Array
+      .from(this.eventsByKindCounts.entries())
+      .filter(([kind]) => kind !== 'other')
+      .map(([kind, count]) => ({ kind, count }))
+      .sort((a, b) => b.count - a.count)
+
+    eventsByKind.push({
+      kind: 'other',
+      count: this.eventsByKindCounts.get('other') ?? 0,
+    })
+
+    return eventsByKind
+  }
+
+  /**
+   * Builds Top-K talkers using a MinHeap (O(N log K)) instead of a full sort
+   * (O(N log N)). For large relays with millions of distinct pubkeys this is a
+   * significant speedup and the heap is bounded to K entries.
+   */
+  private getTopKFromMap(counts: Map<string, number>): TopTalker[] {
+    const heap = new MinHeap(this.topTalkersLimit)
+
+    for (const [pubkey, count] of counts) {
+      heap.push({ pubkey, count })
+    }
+
+    return heap.toSortedDescArray()
+  }
+
+  private getRecentTopTalkers(): TopTalker[] {
+    // Merge all per-minute buckets into a single counts map, then run Top-K.
+    const merged = new Map<string, number>()
+
+    for (const bucketCounts of this.recentBucketTalkerCounts.values()) {
+      for (const [pubkey, count] of bucketCounts) {
+        merged.set(pubkey, (merged.get(pubkey) ?? 0) + count)
+      }
+    }
+
+    return this.getTopKFromMap(merged)
+  }
+
+  private getWindowStartMinute(): number {
+    const windowMinutes = this.recentDays * MINUTES_PER_DAY
+    const nowMinute = Math.floor(Date.now() / 60000)
+    return nowMinute - windowMinutes
+  }
+
+  private pruneRecentBuckets(): void {
+    const thresholdMinute = this.getWindowStartMinute()
+
+    for (const bucketMinute of this.recentBucketTalkerCounts.keys()) {
+      if (bucketMinute < thresholdMinute) {
+        this.recentBucketTalkerCounts.delete(bucketMinute)
+      }
+    }
+  }
+
+  private async queryRows<T>(sql: string, bindings: unknown[]): Promise<T[]> {
+    const rawResult = await this.dbClient.raw(sql, bindings)
+
+    if (Array.isArray((rawResult as any).rows)) {
+      return (rawResult as any).rows as T[]
+    }
+
+    if (Array.isArray(rawResult)) {
+      return rawResult as unknown as T[]
+    }
+
+    return []
+  }
+
+  private resetState(): void {
+    this.allTimeTalkerCounts.clear()
+    this.eventsByKindCounts.clear()
+    this.recentBucketTalkerCounts.clear()
+    this.userStates.clear()
+    this.admittedUsers = 0
+    this.satsPaidMillisats = 0
+    this.eventCursor = {
+      firstSeen: '1970-01-01 00:00:00.000000',
+      id: '00000000-0000-0000-0000-000000000000',
+    }
+    this.userCursor = {
+      updatedAtEpoch: 0,
+      pubkey: '',
+    }
+  }
+}
diff --git a/src/dashboard-service/services/kpi-collector-service.ts b/src/dashboard-service/services/kpi-collector-service.ts
new file mode 100644
index 00000000..51139733
--- /dev/null
+++ b/src/dashboard-service/services/kpi-collector-service.ts
@@ -0,0 +1,134 @@
+import { DashboardMetrics, EventsByKindCount, TopTalker } from '../types'
+import { createLogger } from '../../factories/logger-factory'
+import { DatabaseClient } from '../../@types/base'
+
+const debug = createLogger('dashboard-service:kpi-collector')
+
+const DEFAULT_TRACKED_KINDS = [7, 1, 6, 1984, 4, 3, 9735]
+
+const toNumber = (value: unknown): number => {
+  if (typeof value === 'number') {
+    return value
+  }
+
+  if (typeof value === 'string' && value !== '') {
return Number(value) + } + + return 0 +} + +export class KPICollectorService { + public constructor( + private readonly dbClient: DatabaseClient, + private readonly trackedKinds: number[] = DEFAULT_TRACKED_KINDS, + private readonly topTalkersLimit = 10, + private readonly recentDays = 3, + ) { } + + public async collectMetrics(): Promise { + debug('collecting dashboard metrics') + + const [ + eventsByKind, + admittedUsers, + satsPaid, + allTimeTopTalkers, + recentTopTalkers, + ] = await Promise.all([ + this.getEventsByKind(), + this.getAdmittedUsersCount(), + this.getSatsPaidCount(), + this.getTopTalkersAllTime(), + this.getTopTalkersRecent(), + ]) + + return { + eventsByKind, + admittedUsers, + satsPaid, + topTalkers: { + allTime: allTimeTopTalkers, + recent: recentTopTalkers, + }, + } + } + + private async getEventsByKind(): Promise { + const rows = await this.dbClient('events') + .select('event_kind') + .count('id as count') + .whereIn('event_kind', this.trackedKinds) + .groupBy('event_kind') + .orderBy('count', 'desc') as Array<{ event_kind: number, count: string }> + + const other = await this.dbClient('events') + .whereNotIn('event_kind', this.trackedKinds) + .count<{ count: string }>('id as count') + .first() + + const eventsByKind = rows.map((row) => { + return { + kind: String(row.event_kind), + count: toNumber(row.count), + } + }) + + eventsByKind.push({ + kind: 'other', + count: toNumber(other?.count), + }) + + return eventsByKind + } + + private async getAdmittedUsersCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .count<{ count: string }>('pubkey as count') + .first() + + return toNumber(result?.count) + } + + private async getSatsPaidCount(): Promise { + const result = await this.dbClient('users') + .where('is_admitted', true) + .sum<{ total: string | null }>('balance as total') + .first() + + const millisats = toNumber(result?.total) + return millisats / 1000 + } + + private async 
getTopTalkersAllTime(): Promise { + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } + + private async getTopTalkersRecent(): Promise { + const since = new Date(Date.now() - this.recentDays * 24 * 60 * 60 * 1000) + + const rows = await this.dbClient('events') + .select(this.dbClient.raw("encode(event_pubkey, 'hex') as pubkey")) + .count('id as count') + .where('first_seen', '>=', since) + .groupBy('event_pubkey') + .orderBy('count', 'desc') + .limit(this.topTalkersLimit) as unknown as Array<{ pubkey: string | Buffer, count: string | number }> + + return rows.map((row) => ({ + pubkey: String(row.pubkey), + count: toNumber(row.count), + })) + } +} diff --git a/src/dashboard-service/services/snapshot-service.ts b/src/dashboard-service/services/snapshot-service.ts new file mode 100644 index 00000000..d846269f --- /dev/null +++ b/src/dashboard-service/services/snapshot-service.ts @@ -0,0 +1,81 @@ +import { DashboardMetrics, KPISnapshot } from '../types' +import { createLogger } from '../../factories/logger-factory' + +const debug = createLogger('dashboard-service:snapshot-service') + +const defaultMetrics = (): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, +}) + +export interface ISnapshotRefreshResult { + snapshot: KPISnapshot + changed: boolean +} + +export interface IKPICollector { + collectMetrics(): Promise + close?(): Promise | void +} + +export class SnapshotService { + private metricsFingerprint = JSON.stringify(defaultMetrics()) + + private sequence = 0 + + private snapshot: KPISnapshot = { + sequence: this.sequence, + generatedAt: new 
Date(0).toISOString(), + status: 'live', + metrics: defaultMetrics(), + } + + public constructor(private readonly collector: IKPICollector) { } + + public getSnapshot(): KPISnapshot { + return this.snapshot + } + + /** + * Fetches fresh metrics from the collector and updates the snapshot if the + * metrics have changed. Throws if the collector is unavailable — callers + * are responsible for catching and deciding how to surface errors. + */ + public async refresh(): Promise { + const metrics = await this.collector.collectMetrics() + const nextFingerprint = JSON.stringify(metrics) + + if (nextFingerprint === this.metricsFingerprint && this.snapshot.status === 'live') { + debug('metrics unchanged, skipping snapshot sequence update') + return { + snapshot: this.snapshot, + changed: false, + } + } + + this.metricsFingerprint = nextFingerprint + + return this.updateSnapshot(metrics, 'live') + } + + private updateSnapshot(metrics: DashboardMetrics, status: 'live' | 'stale'): ISnapshotRefreshResult { + this.sequence += 1 + + this.snapshot = { + sequence: this.sequence, + generatedAt: new Date().toISOString(), + status, + metrics, + } + + return { + snapshot: this.snapshot, + changed: true, + } + } +} diff --git a/src/dashboard-service/services/stateful-incremental-service.ts b/src/dashboard-service/services/stateful-incremental-service.ts new file mode 100644 index 00000000..2638dace --- /dev/null +++ b/src/dashboard-service/services/stateful-incremental-service.ts @@ -0,0 +1,239 @@ +import { Client, ClientConfig } from 'pg' +import { createLogger } from '../../factories/logger-factory' +import { DashboardMetrics } from '../types' +import { DatabaseClient } from '../../@types/base' +import { IncrementalKPICollectorService } from './incremental-kpi-collector-service' + +const debug = createLogger('dashboard-service:stateful-incremental-kpi-collector') + +const DEFAULT_EVENTS_CHANNEL = 'dashboard_events_changed' +const DEFAULT_USERS_CHANNEL = 'dashboard_users_changed' + 
+const isValidChannelName = (channel: string): boolean => { + return /^[a-zA-Z_][a-zA-Z0-9_]*$/.test(channel) +} + +const getListenerConnectionConfig = (): ClientConfig => { + if (process.env.DB_URI) { + return { + connectionString: process.env.DB_URI, + } + } + + return { + host: process.env.DB_HOST, + port: Number(process.env.DB_PORT), + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + } +} + +const defaultMetrics = (): DashboardMetrics => { + return { + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + }, + } +} + +export class StatefulIncrementalKPICollectorService { + private cachedMetrics: DashboardMetrics = defaultMetrics() + + private hasCache = false + + private isDirty = true + + private isListenerReady = false + + /** Set to true permanently once close() is called — prevents reconnect loops after shutdown. */ + private isClosed = false + + private listenerClient: Client | undefined + + private reconnectTimer: ReturnType | undefined + + private readonly incrementalCollector: IncrementalKPICollectorService + + private readonly channels: string[] + + private static readonly BASE_DELAY_MS = 500 + private static readonly MAX_DELAY_MS = 30_000 + + /** Backoff state — reset to BASE_DELAY_MS on every successful connect. */ + private reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS + + public constructor( + dbClient: DatabaseClient, + trackedKinds?: number[], + topTalkersLimit?: number, + recentDays?: number, + ) { + this.incrementalCollector = new IncrementalKPICollectorService( + dbClient, + trackedKinds, + topTalkersLimit, + recentDays, + ) + + this.channels = [ + process.env.DASHBOARD_EVENTS_NOTIFY_CHANNEL ?? DEFAULT_EVENTS_CHANNEL, + process.env.DASHBOARD_USERS_NOTIFY_CHANNEL ?? DEFAULT_USERS_CHANNEL, + ] + } + + public async collectMetrics(): Promise { + // Kick off a connect attempt if the listener isn't alive yet. 
+ // We don't await here — the listener is best-effort; data comes from the + // incremental collector regardless. + if (!this.isListenerReady && !this.listenerClient) { + this.scheduleReconnect(0) + } + + if (!this.hasCache || this.isDirty) { + this.cachedMetrics = await this.incrementalCollector.collectMetrics() + this.hasCache = true + this.isDirty = false + } + + return this.cachedMetrics + } + + public async close(): Promise { + this.isClosed = true + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer) + this.reconnectTimer = undefined + } + + const client = this.listenerClient + this.listenerClient = undefined + this.isListenerReady = false + + if (!client) { + return + } + + for (const channel of this.channels) { + if (!isValidChannelName(channel)) { + continue + } + + try { + await client.query(`UNLISTEN ${channel}`) + } catch (error) { + console.error('dashboard-service: failed to unlisten channel', { + channel, + error, + }) + } + } + + client.removeAllListeners('notification') + client.removeAllListeners('error') + client.removeAllListeners('end') + + try { + await client.end() + } catch (error) { + console.error('dashboard-service: failed to close stateful incremental collector listener', error) + } + } + + + /** + * Schedule a reconnect attempt after `delayMs` milliseconds. + * Passing 0 connects immediately (used on first call and after close via `close()`). + */ + private scheduleReconnect(delayMs: number): void { + if (this.isClosed || this.reconnectTimer || this.isListenerReady) { + return + } + + debug('scheduling listener reconnect in %d ms', delayMs) + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = undefined + this.connectListener().catch((err) => { + // connectListener already logs; just ensure the loop continues. 
+ debug('connectListener threw unexpectedly: %o', err) + }) + }, delayMs) + } + + private async connectListener(): Promise { + if (this.isClosed || this.isListenerReady) { + return + } + + const client = new Client(getListenerConnectionConfig()) + + client.on('notification', (notification) => { + if (!notification.channel || !this.channels.includes(notification.channel)) { + return + } + + this.isDirty = true + debug('received postgres notification on channel=%s', notification.channel) + }) + + client.on('error', (error) => { + this.isDirty = true + this.isListenerReady = false + console.error('dashboard-service: stateful incremental collector listener error', error) + // Don't call scheduleReconnect here — 'end' will always fire after 'error' + // on a pg.Client, so we reconnect from the 'end' handler to avoid double-scheduling. + }) + + client.on('end', () => { + this.isDirty = true + this.isListenerReady = false + this.listenerClient = undefined + debug('postgres stateful incremental collector listener ended — will reconnect in %d ms', this.reconnectDelayMs) + + if (!this.isClosed) { + this.scheduleReconnect(this.reconnectDelayMs) + // Exponential backoff, capped at MAX_DELAY_MS. + this.reconnectDelayMs = Math.min( + this.reconnectDelayMs * 2, + StatefulIncrementalKPICollectorService.MAX_DELAY_MS, + ) + } + }) + + try { + await client.connect() + + for (const channel of this.channels) { + if (!isValidChannelName(channel)) { + console.error('dashboard-service: skipping invalid notify channel name', channel) + continue + } + + await client.query(`LISTEN ${channel}`) + } + + this.listenerClient = client + this.isListenerReady = true + // Reset backoff on successful connect. 
+ this.reconnectDelayMs = StatefulIncrementalKPICollectorService.BASE_DELAY_MS + debug('postgres stateful incremental collector listener initialized for channels=%o', this.channels) + } catch (error) { + this.isDirty = true + this.listenerClient = undefined + this.isListenerReady = false + console.error('dashboard-service: unable to initialize stateful incremental collector listener', error) + + try { + await client.end() + } catch (_closeError) { + // best effort — 'end' handler above will fire and schedule the next reconnect + } + } + } +} diff --git a/src/dashboard-service/types.ts b/src/dashboard-service/types.ts new file mode 100644 index 00000000..15165b30 --- /dev/null +++ b/src/dashboard-service/types.ts @@ -0,0 +1,40 @@ +export interface TopTalker { + pubkey: string + count: number +} + +export interface EventsByKindCount { + kind: string + count: number +} + +export interface DashboardMetrics { + eventsByKind: EventsByKindCount[] + admittedUsers: number + satsPaid: number + topTalkers: { + allTime: TopTalker[] + recent: TopTalker[] + } +} + +export interface KPISnapshot { + sequence: number + generatedAt: string + status: 'live' | 'stale' + metrics: DashboardMetrics +} + +export interface DashboardSnapshotResponse { + data: KPISnapshot +} + +export interface DashboardWebSocketEnvelope { + type: TType + payload: TPayload +} + +export type DashboardServerMessage = + | DashboardWebSocketEnvelope<'dashboard.connected', { at: string }> + | DashboardWebSocketEnvelope<'kpi.snapshot', KPISnapshot> + | DashboardWebSocketEnvelope<'kpi.tick', { at: string, sequence: number }> diff --git a/src/dashboard-service/ws/dashboard-ws-hub.ts b/src/dashboard-service/ws/dashboard-ws-hub.ts new file mode 100644 index 00000000..1b3b330d --- /dev/null +++ b/src/dashboard-service/ws/dashboard-ws-hub.ts @@ -0,0 +1,141 @@ +import { DashboardServerMessage, KPISnapshot } from '../types' +import { RawData, WebSocketServer } from 'ws' +import { createLogger } from 
'../../factories/logger-factory' +import WebSocket from 'ws' + +const debug = createLogger('dashboard-service:ws') + +export class DashboardWebSocketHub { + public constructor( + private readonly webSocketServer: WebSocketServer, + private readonly getSnapshot: () => KPISnapshot, + ) { + console.info('dashboard-service: websocket hub initialized') + + this.webSocketServer + .on('connection', this.onConnection.bind(this)) + .on('close', () => { + console.info('dashboard-service: websocket server closed') + }) + .on('error', (error) => { + console.error('dashboard-service: websocket server error', error) + }) + } + + public broadcastSnapshot(snapshot: KPISnapshot): void { + this.broadcast({ + type: 'kpi.snapshot', + payload: snapshot, + }) + } + + public broadcastTick(sequence: number): void { + this.broadcast({ + type: 'kpi.tick', + payload: { + at: new Date().toISOString(), + sequence, + }, + }) + } + + public close(): void { + console.info('dashboard-service: closing websocket hub') + this.webSocketServer.clients.forEach((client) => { + client.close() + }) + this.webSocketServer.removeAllListeners() + } + + private onConnection(client: WebSocket): void { + const connectedClients = this.getConnectedClientsCount() + console.info('dashboard-service: websocket client connected (clients=%d)', connectedClients) + + client + .on('close', (code, reason) => { + console.info( + 'dashboard-service: websocket client disconnected (code=%d, reason=%s, clients=%d)', + code, + reason.toString(), + this.getConnectedClientsCount(), + ) + }) + .on('error', (error) => { + console.error('dashboard-service: websocket client error', error) + }) + .on('message', (raw) => { + this.onClientMessage(raw) + }) + + this.send(client, { + type: 'dashboard.connected', + payload: { + at: new Date().toISOString(), + }, + }) + + this.send(client, { + type: 'kpi.snapshot', + payload: this.getSnapshot(), + }) + + debug('dashboard websocket bootstrap snapshot sent') + } + + private onClientMessage(raw: 
RawData): void { + try { + const rawMessage = this.toUtf8(raw) + const message = JSON.parse(rawMessage) + debug('dashboard websocket client message received: %o', message) + } catch (error) { + console.error('dashboard-service: websocket message parsing failed', error) + } + } + + private broadcast(message: DashboardServerMessage): void { + this.webSocketServer.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return + } + this.send(client, message) + }) + } + + private send(client: WebSocket, message: DashboardServerMessage): void { + if (client.readyState !== WebSocket.OPEN) { + return + } + + try { + client.send(JSON.stringify(message)) + } catch (error) { + console.error('dashboard-service: websocket send failed', error) + } + } + + private toUtf8(raw: RawData): string { + if (typeof raw === 'string') { + return raw + } + + if (Buffer.isBuffer(raw)) { + return raw.toString('utf8') + } + + if (Array.isArray(raw)) { + return raw.map((chunk) => { + if (Buffer.isBuffer(chunk)) { + return chunk.toString('utf8') + } + + return Buffer.from(chunk as ArrayBuffer).toString('utf8') + }).join('') + } + + return Buffer.from(raw).toString('utf8') + } + + private getConnectedClientsCount(): number { + return Array.from(this.webSocketServer.clients).filter((client) => client.readyState === WebSocket.OPEN).length + } +} diff --git a/test/unit/dashboard-service/app.spec.ts b/test/unit/dashboard-service/app.spec.ts new file mode 100644 index 00000000..f8e81331 --- /dev/null +++ b/test/unit/dashboard-service/app.spec.ts @@ -0,0 +1,45 @@ +import axios from 'axios' +import { createDashboardService } from '../../../src/dashboard-service/app' +import { expect } from 'chai' +import WebSocket from 'ws' + +describe('dashboard-service app', () => { + it('serves health, snapshot, and websocket endpoints', async () => { + const service = createDashboardService({ + host: '127.0.0.1', + port: 0, + wsPath: '/api/v1/kpis/stream', + pollIntervalMs: 1000, + 
useDummyData: true, + collectorMode: 'full', + }) + + await service.start() + + const port = service.getHttpPort() + + const healthResponse = await axios.get(`http://127.0.0.1:${port}/healthz`) + expect(healthResponse.status).to.equal(200) + + const snapshotResponse = await axios.get(`http://127.0.0.1:${port}/api/v1/kpis/snapshot`) + expect(snapshotResponse.status).to.equal(200) + + const snapshotJson = snapshotResponse.data as any + expect(snapshotJson.data).to.have.property('sequence') + + const ws = new WebSocket(`ws://127.0.0.1:${port}/api/v1/kpis/stream`) + + const connectedEvent = await new Promise((resolve, reject) => { + const timeout = setTimeout(() => reject(new Error('timeout waiting for ws message')), 2000) + ws.once('message', (raw) => { + clearTimeout(timeout) + resolve(JSON.parse(raw.toString())) + }) + }) + + expect(connectedEvent).to.have.property('type', 'dashboard.connected') + + ws.close() + await service.stop() + }) +}) diff --git a/test/unit/dashboard-service/polling-scheduler.spec.ts b/test/unit/dashboard-service/polling-scheduler.spec.ts new file mode 100644 index 00000000..8855fcb9 --- /dev/null +++ b/test/unit/dashboard-service/polling-scheduler.spec.ts @@ -0,0 +1,97 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { PollingScheduler } from '../../../src/dashboard-service/polling/polling-scheduler' + +describe('PollingScheduler', () => { + let clock: Sinon.SinonFakeTimers + + beforeEach(() => { + clock = Sinon.useFakeTimers() + }) + + afterEach(() => { + clock.restore() + }) + + /** + * The scheduler uses recursive setTimeout (not setInterval), so each tick + * is only enqueued after the previous one resolves. 
With instant-resolving + * stubs the sequence is: + * T=0 start() → schedules tick at T=1000 + * T=1000 tick #1 resolves → schedules tick at T=2000 + * T=2000 tick #2 resolves → schedules tick at T=3000 + * T=3000 tick #3 resolves → schedules tick at T=4000 + * tickAsync drives the microtask queue between timer firings, so all three + * ticks complete inside tickAsync(3000). + */ + it('runs tick callback on interval while running', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + scheduler.start() + await clock.tickAsync(3000) + + expect(tick.callCount).to.equal(3) + scheduler.stop() + }) + + it('stops running when stop is called', async () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(500, tick) + + scheduler.start() + await clock.tickAsync(1000) // ticks at 500ms, 1000ms → 2 calls + scheduler.stop() + await clock.tickAsync(1000) // no more ticks after stop + + expect(tick.callCount).to.equal(2) + }) + + it('does not overlap ticks when callback is slow', async () => { + // Tick takes 1500ms — longer than the 1000ms interval. + // With setInterval this would cause overlap; with recursive setTimeout it must not. 
+ let running = 0 + let maxConcurrent = 0 + + const tick = Sinon.stub().callsFake(async () => { + running++ + maxConcurrent = Math.max(maxConcurrent, running) + await clock.tickAsync(1500) + running-- + }) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + // Drive enough time for two potential overlapping cycles + await clock.tickAsync(4000) + scheduler.stop() + + expect(maxConcurrent).to.equal(1, 'ticks must not run concurrently') + }) + + it('continues scheduling after a failed tick', async () => { + const tick = Sinon.stub() + .onFirstCall().rejects(new Error('transient error')) + .resolves(undefined) + + const scheduler = new PollingScheduler(1000, tick) + scheduler.start() + await clock.tickAsync(3000) + scheduler.stop() + + // First tick rejects, but the scheduler must recover and run two more. + expect(tick.callCount).to.be.greaterThanOrEqual(2) + }) + + it('isRunning reflects scheduler state', () => { + const tick = Sinon.stub().resolves(undefined) + const scheduler = new PollingScheduler(1000, tick) + + expect(scheduler.isRunning()).to.equal(false) + scheduler.start() + expect(scheduler.isRunning()).to.equal(true) + scheduler.stop() + expect(scheduler.isRunning()).to.equal(false) + }) +}) diff --git a/test/unit/dashboard-service/snapshot-service.spec.ts b/test/unit/dashboard-service/snapshot-service.spec.ts new file mode 100644 index 00000000..a1acba8a --- /dev/null +++ b/test/unit/dashboard-service/snapshot-service.spec.ts @@ -0,0 +1,107 @@ +import chai, { expect } from 'chai' +import chaiAsPromised from 'chai-as-promised' +import Sinon from 'sinon' + +import { IKPICollector, SnapshotService } from '../../../src/dashboard-service/services/snapshot-service' +import { DashboardMetrics } from '../../../src/dashboard-service/types' + +chai.use(chaiAsPromised) + +const createMetrics = (overrides: Partial = {}): DashboardMetrics => ({ + eventsByKind: [], + admittedUsers: 0, + satsPaid: 0, + topTalkers: { + allTime: [], + recent: [], + 
}, + ...overrides, +}) + +const makeCollector = (stub: Sinon.SinonStub): IKPICollector => ({ + collectMetrics: stub, +}) + +describe('SnapshotService', () => { + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + }) + + afterEach(() => { + sandbox.restore() + }) + + it('updates snapshot when collected metrics change', async () => { + const firstMetrics = createMetrics({ admittedUsers: 1 }) + const nextMetrics = createMetrics({ admittedUsers: 2 }) + + const stub = sandbox.stub() + .onFirstCall().resolves(firstMetrics) + .onSecondCall().resolves(firstMetrics) + .onThirdCall().resolves(nextMetrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true, 'first refresh should report changed') + expect(first.snapshot.sequence).to.equal(1) + expect(first.snapshot.status).to.equal('live') + + const second = await service.refresh() + expect(second.changed).to.equal(false, 'second refresh with same metrics should not change') + expect(second.snapshot.sequence).to.equal(1, 'sequence must not advance when metrics are unchanged') + + const third = await service.refresh() + expect(third.changed).to.equal(true, 'third refresh with new metrics should report changed') + expect(third.snapshot.sequence).to.equal(2) + expect(third.snapshot.metrics.admittedUsers).to.equal(2) + }) + + it('does not advance sequence when metrics are identical across refreshes', async () => { + const metrics = createMetrics({ satsPaid: 100 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + const first = await service.refresh() + expect(first.changed).to.equal(true) + expect(first.snapshot.sequence).to.equal(1) + + const second = await service.refresh() + expect(second.changed).to.equal(false) + expect(second.snapshot.sequence).to.equal(1) + }) + + it('propagates collector errors to the caller', async () => { + const 
stub = sandbox.stub().rejects(new Error('db down')) + + const service = new SnapshotService(makeCollector(stub)) + + await expect(service.refresh()).to.be.rejectedWith('db down') + }) + + it('returns the last known snapshot via getSnapshot()', async () => { + const metrics = createMetrics({ admittedUsers: 5 }) + const stub = sandbox.stub().resolves(metrics) + + const service = new SnapshotService(makeCollector(stub)) + + await service.refresh() + + const snap = service.getSnapshot() + expect(snap.sequence).to.equal(1) + expect(snap.status).to.equal('live') + expect(snap.metrics.admittedUsers).to.equal(5) + }) + + it('sets status to live after a successful refresh', async () => { + const stub = sandbox.stub().resolves(createMetrics()) + + const service = new SnapshotService(makeCollector(stub)) + + const { snapshot } = await service.refresh() + expect(snapshot.status).to.equal('live') + }) +})