Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions migrations/20260411_210100_add_dashboard_live_notify_triggers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
exports.up = async function (knex) {
await knex.schema.raw(`
CREATE OR REPLACE FUNCTION notify_dashboard_events_changed() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('dashboard_events_changed', TG_OP);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;

CREATE TRIGGER dashboard_events_changed_trigger
AFTER INSERT OR UPDATE OR DELETE ON events
FOR EACH STATEMENT
EXECUTE FUNCTION notify_dashboard_events_changed();
`)

await knex.schema.raw(`
CREATE OR REPLACE FUNCTION notify_dashboard_users_changed() RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('dashboard_users_changed', TG_OP);
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`)

await knex.schema.raw(`
DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;

CREATE TRIGGER dashboard_users_changed_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH STATEMENT
EXECUTE FUNCTION notify_dashboard_users_changed();
`)
}

exports.down = async function (knex) {
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_events_changed_trigger ON events;')
await knex.schema.raw('DROP TRIGGER IF EXISTS dashboard_users_changed_trigger ON users;')
await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_events_changed();')
await knex.schema.raw('DROP FUNCTION IF EXISTS notify_dashboard_users_changed();')
}
40 changes: 40 additions & 0 deletions migrations/20260412_020000_add_dashboard_kpi_indexes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Migration: add dashboard KPI query indexes
*
* Without these the incremental collector degrades to sequential scans:
* - idx_events_cursor → covers the (first_seen, id) cursor predicate used in every
* incremental delta query and the bootstrap cursor select.
* - idx_events_pubkey → covers the GROUP BY event_pubkey in the all-time talker query.
* - idx_users_cursor → covers the (updated_at, pubkey) cursor predicate used in the
* user delta / cursor-select queries.
*
* All three are created CONCURRENTLY so they don't lock the table on a live relay.
* knex does not support CREATE INDEX CONCURRENTLY natively, so we use raw SQL and
* set `disableTransactions` to true (DDL inside a transaction would negate CONCURRENTLY).
*/

exports.up = async (knex) => {
await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_cursor
ON events (first_seen, id);
`)

await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_pubkey
ON events (event_pubkey);
`)

await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_cursor
ON users (updated_at, pubkey);
`)
}

exports.down = async (knex) => {
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_cursor;')
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_events_pubkey;')
await knex.raw('DROP INDEX CONCURRENTLY IF EXISTS idx_users_cursor;')
}

// Required so knex doesn't wrap the CONCURRENTLY statements in a transaction.
exports.config = { transaction: false }
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
"main": "src/index.ts",
"scripts": {
"dev": "node -r ts-node/register src/index.ts",
"dev:dashboard": "node -r ts-node/register src/dashboard-service/index.ts",
"clean": "rimraf ./{dist,.nyc_output,.test-reports,.coverage}",
"build": "tsc --project tsconfig.build.json",
"prestart": "npm run build",
"start": "cd dist && node src/index.js",
"start:dashboard": "node dist/src/dashboard-service/index.js",
"build:check": "npm run build -- --noEmit",
"lint": "eslint --ext .ts ./src ./test",
"lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test",
Expand All @@ -37,6 +39,7 @@
"db:seed": "knex seed:run",
"pretest:unit": "mkdir -p .test-reports/unit",
"test:unit": "mocha 'test/**/*.spec.ts'",
"test:unit:dashboard": "mocha 'test/unit/dashboard-service/**/*.spec.ts'",
"test:unit:watch": "npm run test:unit -- --min --watch --watch-files src/**/*,test/**/*",
"cover:unit": "nyc --report-dir .coverage/unit npm run test:unit",
"docker:build": "docker build -t nostream .",
Expand Down
12 changes: 12 additions & 0 deletions src/dashboard-service/api/dashboard-router.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Router } from 'express'

import { createGetKPISnapshotRequestHandler } from '../handlers/request-handlers/get-kpi-snapshot-request-handler'
import { SnapshotService } from '../services/snapshot-service'

export const createDashboardRouter = (snapshotService: SnapshotService): Router => {
const router = Router()

router.get('/snapshot', createGetKPISnapshotRequestHandler(snapshotService))

return router
}
165 changes: 165 additions & 0 deletions src/dashboard-service/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
import { IKPICollector, SnapshotService } from './services/snapshot-service'
import { createDashboardRouter } from './api/dashboard-router'
import { createLogger } from '../factories/logger-factory'
import { DashboardServiceConfig } from './config'
import { DashboardWebSocketHub } from './ws/dashboard-ws-hub'
import express from 'express'
import { getHealthRequestHandler } from './handlers/request-handlers/get-health-request-handler'
import http from 'http'
import { IncrementalKPICollectorService } from './services/incremental-kpi-collector-service'
import { KPICollectorService } from './services/kpi-collector-service'
import { PollingScheduler } from './polling/polling-scheduler'
import { StatefulIncrementalKPICollectorService } from './services/stateful-incremental-service'
import { WebSocketServer } from 'ws'
const debug = createLogger('dashboard-service:app')

export interface DashboardService {
readonly config: DashboardServiceConfig
readonly snapshotService: SnapshotService
readonly pollingScheduler: PollingScheduler
start(): Promise<void>
stop(): Promise<void>
getHttpPort(): number
}

export const createDashboardService = (config: DashboardServiceConfig): DashboardService => {
console.info(
'dashboard-service: creating service (host=%s, port=%d, wsPath=%s, pollIntervalMs=%d, useDummyData=%s, collectorMode=%s)',
config.host,
config.port,
config.wsPath,
config.pollIntervalMs,
config.useDummyData,
config.collectorMode,
)

const collector: IKPICollector = config.useDummyData
? {
collectMetrics: async () => ({
eventsByKind: [],
admittedUsers: 0,
satsPaid: 0,
topTalkers: { allTime: [], recent: [] },
}),
}
: (() => {
if (config.collectorMode === 'stateful-incremental') {
return new StatefulIncrementalKPICollectorService(getMasterDbClient())
}

const dbClient = getReadReplicaDbClient()
return config.collectorMode === 'incremental'
? new IncrementalKPICollectorService(dbClient)
: new KPICollectorService(dbClient)
})()

const snapshotService = new SnapshotService(collector)

const app = express()
.disable('x-powered-by')
.get('/healthz', getHealthRequestHandler)
.use('/api/v1/kpis', createDashboardRouter(snapshotService))

const webServer = http.createServer(app)
const webSocketServer = new WebSocketServer({
server: webServer,
path: config.wsPath,
})

const webSocketHub = new DashboardWebSocketHub(webSocketServer, () => snapshotService.getSnapshot())

const pollingScheduler = new PollingScheduler(config.pollIntervalMs, async () => {
const { snapshot, changed } = await snapshotService.refresh()

if (!changed) {
debug('poll tick detected no KPI changes')
return
}

debug('poll tick produced snapshot sequence=%d status=%s', snapshot.sequence, snapshot.status)
webSocketHub.broadcastTick(snapshot.sequence)
webSocketHub.broadcastSnapshot(snapshot)
})

const start = async () => {
if (webServer.listening) {
debug('start requested but service is already listening')
return
}

console.info('dashboard-service: starting http and websocket servers')

await new Promise<void>((resolve, reject) => {
webServer.listen(config.port, config.host, () => {
const address = webServer.address()
debug('listening on %o', address)
console.info('dashboard-service: listening on %o', address)
resolve()
})
webServer.once('error', (error) => {
console.error('dashboard-service: failed to start server', error)
reject(error)
})
})

try {
const initialSnapshotRefresh = await snapshotService.refresh()
if (initialSnapshotRefresh.changed) {
debug('initial snapshot prepared with sequence=%d status=%s', initialSnapshotRefresh.snapshot.sequence, initialSnapshotRefresh.snapshot.status)
}
} catch (error) {
console.error('dashboard-service: initial snapshot refresh failed (will retry on next poll)', error)
}

pollingScheduler.start()
console.info('dashboard-service: polling scheduler started')
}

const stop = async () => {
console.info('dashboard-service: stopping service')
pollingScheduler.stop()

if (collector?.close) {
try {
await collector.close()
} catch (error) {
console.error('dashboard-service: failed to close collector resources', error)
}
}

webSocketHub.close()
await new Promise<void>((resolve, reject) => {
if (!webServer.listening) {
debug('stop requested while server was already stopped')
resolve()
return
}

webServer.close((error) => {
if (error) {
console.error('dashboard-service: failed to stop cleanly', error)
reject(error)
return
}

console.info('dashboard-service: http server closed')
resolve()
})
})
}

const getHttpPort = (): number => {
const address = webServer.address()
return typeof address === 'object' && address !== null ? address.port : config.port
}

return {
config,
snapshotService,
pollingScheduler,
start,
stop,
getHttpPort,
}
}
58 changes: 58 additions & 0 deletions src/dashboard-service/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
export interface DashboardServiceConfig {
host: string
port: number
wsPath: string
pollIntervalMs: number
useDummyData: boolean
collectorMode: DashboardCollectorMode
}

export type DashboardCollectorMode = 'full' | 'incremental' | 'stateful-incremental'

const parseBoolean = (value: string | undefined, fallback = false): boolean => {
if (typeof value === 'undefined') {
return fallback
}

return value === '1' || value.toLowerCase() === 'true'
}

const parseInteger = (value: string | undefined, fallback: number): number => {
if (typeof value === 'undefined' || value === '') {
return fallback
}

const parsed = Number(value)
if (!Number.isInteger(parsed) || parsed < 0) {
return fallback
}

return parsed
}

const parseCollectorMode = (
value: string | undefined,
fallback: DashboardCollectorMode = 'full',
): DashboardCollectorMode => {
if (typeof value === 'undefined') {
return fallback
}

const normalized = value.toLowerCase()
if (normalized === 'full' || normalized === 'incremental' || normalized === 'stateful-incremental') {
return normalized
}

return fallback
}

export const getDashboardServiceConfig = (): DashboardServiceConfig => {
return {
host: process.env.DASHBOARD_SERVICE_HOST ?? '127.0.0.1',
port: parseInteger(process.env.DASHBOARD_SERVICE_PORT, 8011),
wsPath: process.env.DASHBOARD_WS_PATH ?? '/api/v1/kpis/stream',
pollIntervalMs: parseInteger(process.env.DASHBOARD_POLL_INTERVAL_MS, 5000),
useDummyData: parseBoolean(process.env.DASHBOARD_USE_DUMMY_DATA, false),
collectorMode: parseCollectorMode(process.env.DASHBOARD_COLLECTOR_MODE, 'full'),
}
}
12 changes: 12 additions & 0 deletions src/dashboard-service/controllers/get-health-controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Request, Response } from 'express'

import { IController } from '../../@types/controllers'

export class GetHealthController implements IController {
public async handleRequest(_request: Request, response: Response): Promise<void> {
response
.status(200)
.setHeader('content-type', 'application/json; charset=utf-8')
.send({ status: 'ok' })
}
}
20 changes: 20 additions & 0 deletions src/dashboard-service/controllers/get-kpi-snapshot-controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Request, Response } from 'express'

import { DashboardSnapshotResponse } from '../types'
import { IController } from '../../@types/controllers'
import { SnapshotService } from '../services/snapshot-service'

export class GetKPISnapshotController implements IController {
public constructor(private readonly snapshotService: SnapshotService) { }

public async handleRequest(_request: Request, response: Response): Promise<void> {
const payload: DashboardSnapshotResponse = {
data: this.snapshotService.getSnapshot(),
}

response
.status(200)
.setHeader('content-type', 'application/json; charset=utf-8')
.send(payload)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { withController } from '../../../handlers/request-handlers/with-controller-request-handler'

import { GetHealthController } from '../../controllers/get-health-controller'

export const getHealthRequestHandler = withController(() => new GetHealthController())
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { withController } from '../../../handlers/request-handlers/with-controller-request-handler'

import { GetKPISnapshotController } from '../../controllers/get-kpi-snapshot-controller'
import { SnapshotService } from '../../services/snapshot-service'

export const createGetKPISnapshotRequestHandler = (snapshotService: SnapshotService) => {
return withController(() => new GetKPISnapshotController(snapshotService))
}
Loading