Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions .github/workflows/test-module-loader.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
name: Module Loader Tests

on:
push:
branches: [develop]
pull_request:
branches: [develop]
workflow_dispatch: {}

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
test:
name: Module loader integration tests
runs-on: ubuntu-latest

env:
PGHOST: localhost
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: password
PGDATABASE: postgres

services:
pg_db:
image: constructiveio/postgres-plus:18
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres"
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- uses: actions/checkout@v5

- uses: pnpm/action-setup@v6

- uses: actions/setup-node@v5
with:
node-version: '22'
cache: 'pnpm'

- name: Install pgpm CLI
run: npm install -g pgpm@4.7.4

- name: Bootstrap pgpm admin users
run: pgpm admin-users bootstrap --yes

- name: Generate function packages
run: node --experimental-strip-types scripts/generate.ts

- name: Install dependencies
run: pnpm install --frozen-lockfile

- name: Build module-loader
run: pnpm --filter @constructive-io/module-loader build

- name: Run module-loader tests
run: pnpm --filter @constructive-io/module-loader test
3 changes: 2 additions & 1 deletion extensions/@pgpm/metaschema-modules/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
EXTENSION = metaschema-modules
DATA = sql/metaschema-modules--0.26.5.sql
DATA = sql/metaschema-modules--0.28.0.sql

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
-- Deploy schemas/metaschema_modules_public/tables/graph_execution_module/table to pg

-- requires: schemas/metaschema_modules_public/schema
-- requires: schemas/metaschema_modules_public/tables/graph_module/table

BEGIN;

CREATE TABLE metaschema_modules_public.graph_execution_module (
id uuid PRIMARY KEY DEFAULT uuidv7(),
database_id uuid NOT NULL,

-- Schema references (if uuid_nil, resolved from schema name or default)
schema_id uuid NOT NULL DEFAULT uuid_nil(),
private_schema_id uuid NOT NULL DEFAULT uuid_nil(),

-- Optional schema name overrides (used when schema IDs are not provided)
public_schema_name text,
private_schema_name text,

-- Reference to the graph module this execution module operates against.
-- The execution module resolves definition tables (graphs, merkle store)
-- from the linked graph_module at provision time.
graph_module_id uuid NOT NULL,

-- Scope: determines the security level for this module instance.
-- Can differ from graph_module scope (e.g., platform definitions + entity executions).
scope text NOT NULL DEFAULT 'app',

-- Table name prefix. Auto-derived from scope by the trigger when empty.
prefix text NOT NULL DEFAULT '',

-- Generated table IDs (populated by BEFORE INSERT trigger)
-- Execution state tables (partitioned by time)
executions_table_id uuid NOT NULL DEFAULT uuid_nil(),
outputs_table_id uuid NOT NULL DEFAULT uuid_nil(),
node_states_table_id uuid NOT NULL DEFAULT uuid_nil(),


-- Configurable table names (bare names without scope prefix).
-- The trigger prepends the scope prefix automatically.
executions_table_name text NOT NULL DEFAULT 'function_graph_executions',
outputs_table_name text NOT NULL DEFAULT 'function_graph_execution_outputs',
node_states_table_name text NOT NULL DEFAULT 'function_graph_execution_node_states',

-- API routing (get-or-create: if set, schema is added to this API; if NULL, no API is added)
api_name text,
private_api_name text,

-- Entity table for RLS and billing attribution.
-- When set, executions are scoped to the entity (org, app) for billing/metering.
entity_table_id uuid NULL,

-- Configurable security policies (NULL = use defaults based on scope).
policies jsonb NULL,

-- Per-table provisions overrides from blueprint config.
-- Keys are table keys (executions, outputs, node_states).
provisions jsonb NULL,

-- Default permissions: permission names auto-granted to new members.
default_permissions text[] DEFAULT NULL,

-- Timestamps
created_at timestamptz NOT NULL DEFAULT now(),

-- Constraints
CONSTRAINT graph_execution_module_db_fkey FOREIGN KEY (database_id) REFERENCES metaschema_public.database (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_schema_fkey FOREIGN KEY (schema_id) REFERENCES metaschema_public.schema (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_private_schema_fkey FOREIGN KEY (private_schema_id) REFERENCES metaschema_public.schema (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_graph_module_fkey FOREIGN KEY (graph_module_id) REFERENCES metaschema_modules_public.graph_module (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_executions_table_fkey FOREIGN KEY (executions_table_id) REFERENCES metaschema_public.table (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_outputs_table_fkey FOREIGN KEY (outputs_table_id) REFERENCES metaschema_public.table (id) ON DELETE CASCADE,
CONSTRAINT graph_execution_module_node_states_table_fkey FOREIGN KEY (node_states_table_id) REFERENCES metaschema_public.table (id) ON DELETE CASCADE,

CONSTRAINT graph_execution_module_entity_table_fkey FOREIGN KEY (entity_table_id) REFERENCES metaschema_public.table (id) ON DELETE CASCADE
);

CREATE INDEX graph_execution_module_database_id_idx ON metaschema_modules_public.graph_execution_module ( database_id );

-- One execution module per (database, scope, prefix).
CREATE UNIQUE INDEX graph_execution_module_unique_scope ON metaschema_modules_public.graph_execution_module ( database_id, scope, prefix );

COMMIT;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# metaschema-modules extension
comment = 'metaschema-modules extension'
default_version = '0.26.5'
default_version = '0.28.0'
module_pathname = '$libdir/metaschema-modules'
requires = 'plpgsql,uuid-ossp,metaschema-schema,services,pgpm-verify'
requires = 'metaschema-schema,pgpm-database-jobs,pgpm-inflection,pgpm-jwt-claims,pgpm-types,pgpm-verify,plpgsql,services,uuid-ossp'
relocatable = false
superuser = false
19 changes: 10 additions & 9 deletions extensions/@pgpm/metaschema-modules/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@
"test": "jest",
"test:watch": "jest --watch"
},
"dependencies": {
"@pgpm/metaschema-schema": "0.28.0",
"@pgpm/verify": "0.28.0"
},
"devDependencies": {
"pgpm": "^4.23.2"
},
"repository": {
"type": "git",
"url": "https://github.com/constructive-io/pgpm-modules"
Expand All @@ -35,5 +28,13 @@
"bugs": {
"url": "https://github.com/constructive-io/pgpm-modules/issues"
},
"gitHead": "d2ab7ca810ded086eb742eb8f0ca362b6212b97e"
}
"gitHead": "d2ab7ca810ded086eb742eb8f0ca362b6212b97e",
"dependencies": {
"@pgpm/metaschema-schema": "0.28.0",
"@pgpm/services": "0.28.0",
"@pgpm/verify": "0.28.0"
},
"devDependencies": {
"pgpm": "^4.23.2"
}
}
1 change: 1 addition & 0 deletions extensions/@pgpm/metaschema-modules/pgpm.plan
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ schemas/metaschema_modules_public/tables/user_credentials_module/table [schemas/
schemas/metaschema_modules_public/tables/user_settings_module/table [schemas/metaschema_modules_public/schema] 2026-05-28T00:00:00Z devin <devin@cognition.ai> # add user_settings_module for extensible per-user preferences (1:1 with users)

schemas/metaschema_modules_public/tables/i18n_module/table [schemas/metaschema_modules_public/schema] 2026-05-28T00:00:00Z devin <devin@cognition.ai> # add i18n_module config table for internationalization settings
schemas/metaschema_modules_public/tables/graph_execution_module/table [schemas/metaschema_modules_public/schema schemas/metaschema_modules_public/tables/graph_module/table] 2026-06-12T00:00:00Z devin <devin@cognition.ai> # add graph_execution_module config table for partitioned execution state + merkle tree time-travel debugging
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Revert schemas/metaschema_modules_public/tables/graph_execution_module/table from pg

BEGIN;

DROP TABLE metaschema_modules_public.graph_execution_module;

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Verify schemas/metaschema_modules_public/tables/graph_execution_module/table on pg

BEGIN;

SELECT verify_table ('metaschema_modules_public.graph_execution_module');

ROLLBACK;
16 changes: 4 additions & 12 deletions job/compute-service/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* 4. A Scheduler for cron-like scheduled jobs
*/

import ComputeWorker, { ComputeModuleLoader } from '@constructive-io/compute-worker';
import ComputeWorker, { ModuleLoader } from '@constructive-io/compute-worker';
import poolManager from '@constructive-io/job-pg';
import Scheduler from '@constructive-io/job-scheduler';
import {
Expand Down Expand Up @@ -399,17 +399,9 @@ export const waitForComputePrereqs = async (): Promise<void> => {
database: cfg.database,
max: 1,
});
const loader = new ComputeModuleLoader(pool, 0);
const config = await loader.load(databaseId);

if (config.functionModule) {
const { publicSchema, definitionsTable } = config.functionModule;
await client.query(`SELECT count(*) FROM "${publicSchema}"."${definitionsTable}" LIMIT 1`);
} else {
// Metaschema not populated — check the compute schema directly
log.info('function_module not in metaschema, checking constructive_compute_public directly');
await client.query('SELECT count(*) FROM constructive_compute_public.platform_function_definitions LIMIT 1');
}
const loader = new ModuleLoader({ pool, ttlMs: 0 });
const fnConfig = await loader.function.loadDefault(databaseId);
await client.query(`SELECT count(*) FROM "${fnConfig.publicSchema}"."${fnConfig.definitionsTable}" LIMIT 1`);

log.info('compute prereqs satisfied (jobs table + compute module present)');
} catch (error) {
Expand Down
58 changes: 52 additions & 6 deletions job/compute-worker/src/billing.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
/**
* BillingTracker — re-exports BillingLoader from @constructive-io/module-loader
* under the legacy name `BillingTracker`.
* BillingTracker — quota checks and usage recording via billing_module.
*
* Resolves billing config dynamically via ModuleLoader. Gracefully no-ops
* when billing is not provisioned (standalone dev mode).
*/

import { BillingLoader } from '@constructive-io/module-loader';
import type { BillingModuleConfig } from '@constructive-io/module-loader';
import { AmbiguousScopeError, ModuleLoader, ModuleNotProvisionedError } from '@constructive-io/module-loader';
import type { Pool } from 'pg';

export class BillingTracker extends BillingLoader {
constructor(pool: Pool, databaseId: string, cacheTtlMs?: number) {
super(pool, databaseId, cacheTtlMs);
export class BillingTracker {
private loader: ModuleLoader;
private pool: Pool;
private databaseId: string;

constructor(pool: Pool, databaseId: string) {
this.pool = pool;
this.databaseId = databaseId;
this.loader = new ModuleLoader({ pool });
}

async load(databaseId?: string): Promise<BillingModuleConfig | null> {
try {
return await this.loader.billing.load(databaseId ?? this.databaseId, null);
} catch (err) {
if (err instanceof ModuleNotProvisionedError) return null;
if (err instanceof AmbiguousScopeError) {
return await this.loader.billing.loadDefault(databaseId ?? this.databaseId);
}
return null;
}
}

async checkQuota(entityId: string, meterSlug: string, amount = 1, databaseId?: string): Promise<boolean> {
const config = await this.load(databaseId);
if (!config) return true;
try {
const sql = `SELECT "${config.privateSchema}"."check_billing_quota"($1, $2::uuid, $3) AS allowed`;
const { rows } = await this.pool.query(sql, [meterSlug, entityId, amount]);
return rows[0]?.allowed !== false;
} catch {
return true;
}
}

async recordUsage(entityId: string, meterSlug: string, amount: number, metadata: Record<string, unknown>, databaseId?: string): Promise<void> {
const config = await this.load(databaseId);
if (!config) return;
try {
const sql = `SELECT "${config.privateSchema}"."${config.recordUsageFunction}"($1, $2::uuid, $3, $4::jsonb)`;
await this.pool.query(sql, [meterSlug, entityId, amount, JSON.stringify(metadata)]);
} catch { /* non-fatal */ }
}

invalidate(databaseId?: string): void {
this.loader.billing.invalidate(databaseId);
}
}
23 changes: 21 additions & 2 deletions job/compute-worker/src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
/**
* Re-export TtlCache from @constructive-io/module-loader.
* Simple TTL cache for compute-worker internal use.
*/
export { TtlCache } from '@constructive-io/module-loader';
interface CacheEntry<T> { value: T; expiresAt: number; }

export class TtlCache<T> {
private store = new Map<string, CacheEntry<T>>();
constructor(private ttlMs: number) {}

get(key: string): T | undefined {
const e = this.store.get(key);
if (!e) return undefined;
if (Date.now() > e.expiresAt) { this.store.delete(key); return undefined; }
return e.value;
}

set(key: string, value: T): void {
this.store.set(key, { value, expiresAt: Date.now() + this.ttlMs });
}

delete(key: string): void { this.store.delete(key); }
clear(): void { this.store.clear(); }
}
Loading
Loading