diff --git a/db/migrations/0003_opml_imports_mvp_foundation.sql b/db/migrations/0003_opml_imports_mvp_foundation.sql
new file mode 100644
index 0000000..43b2458
--- /dev/null
+++ b/db/migrations/0003_opml_imports_mvp_foundation.sql
@@ -0,0 +1,66 @@
+ALTER TABLE feeds -- Adds a nullable URL-hash column to feeds; this migration contains no backfill, so existing rows keep NULL.
+ADD COLUMN IF NOT EXISTS normalized_url_hash TEXT;
+
+CREATE UNIQUE INDEX IF NOT EXISTS idx_feeds_normalized_url_hash_unique -- Partial unique index: uniqueness is enforced only for rows that have a hash.
+ON feeds (normalized_url_hash)
+WHERE normalized_url_hash IS NOT NULL;
+
+CREATE TABLE IF NOT EXISTS opml_imports ( -- One row per OPML import: lifecycle status, file metadata and per-outcome item counters.
+  id BIGSERIAL PRIMARY KEY,
+  status TEXT NOT NULL CHECK (status IN ('uploaded', 'parsing', 'preview_ready', 'importing', 'completed', 'failed_validation', 'failed')), -- closed set of lifecycle states
+  file_name TEXT NOT NULL,
+  file_size_bytes BIGINT NOT NULL CHECK (file_size_bytes >= 0),
+  source_checksum TEXT, -- nullable
+  error_message TEXT, -- nullable
+  total_items INT NOT NULL DEFAULT 0 CHECK (total_items >= 0), -- all *_items counters start at 0 and may not go negative
+  valid_items INT NOT NULL DEFAULT 0 CHECK (valid_items >= 0),
+  duplicate_items INT NOT NULL DEFAULT 0 CHECK (duplicate_items >= 0),
+  existing_items INT NOT NULL DEFAULT 0 CHECK (existing_items >= 0),
+  invalid_items INT NOT NULL DEFAULT 0 CHECK (invalid_items >= 0),
+  imported_items INT NOT NULL DEFAULT 0 CHECK (imported_items >= 0),
+  uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  confirmed_at TIMESTAMPTZ, -- nullable
+  completed_at TIMESTAMPTZ, -- nullable
+  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() -- NOTE(review): no trigger is defined here, so this is presumably maintained by the application - confirm
+);
+
+CREATE INDEX IF NOT EXISTS idx_opml_imports_status_created_at -- Composite index: status, then newest created_at first.
+ON opml_imports (status, created_at DESC);
+
+CREATE TABLE IF NOT EXISTS opml_import_items ( -- Items belonging to one import; rows are deleted together with the parent import.
+  id BIGSERIAL PRIMARY KEY,
+  import_id BIGINT NOT NULL REFERENCES opml_imports(id) ON DELETE CASCADE,
+  title TEXT,
+  outline_path TEXT,
+  source_xml_url TEXT,
+  normalized_url TEXT,
+  normalized_url_hash TEXT,
+  feed_id INT REFERENCES feeds(id) ON DELETE SET NULL, -- optional link to a feeds row; set to NULL if that feed is deleted
+  item_status TEXT NOT NULL CHECK (item_status IN ('new', 'existing', 'duplicate', 'invalid', 'imported', 'failed')), -- closed set of per-item states
+  validation_error TEXT,
+
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+  CONSTRAINT opml_import_items_normalized_url_required
+    CHECK (item_status = 'invalid' OR (normalized_url IS NOT NULL AND normalized_url_hash IS NOT NULL)) -- every item that is not 'invalid' must carry both the normalized URL and its hash
+);
+
+CREATE INDEX IF NOT EXISTS idx_opml_import_items_import_id_status -- Lookup of items by import and status.
+ON opml_import_items (import_id, item_status);
+
+CREATE INDEX IF NOT EXISTS idx_opml_import_items_hash -- Lookup of items by hash across all imports.
+ON opml_import_items (normalized_url_hash);
+
+CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import -- At most one row per (import, hash) among rows that have a hash.
+ON opml_import_items (import_id, normalized_url_hash)
+WHERE normalized_url_hash IS NOT NULL;
+
+-- Manual rollback guide (framework currently applies only "up" migrations):
+-- DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import;
+-- DROP INDEX IF EXISTS idx_opml_import_items_hash;
+-- DROP INDEX IF EXISTS idx_opml_import_items_import_id_status;
+-- DROP TABLE IF EXISTS opml_import_items;
+-- DROP INDEX IF EXISTS idx_opml_imports_status_created_at;
+-- DROP TABLE IF EXISTS opml_imports;
+-- DROP INDEX IF EXISTS idx_feeds_normalized_url_hash_unique;
+-- ALTER TABLE feeds DROP COLUMN IF EXISTS normalized_url_hash;
diff --git a/db/migrations/0004_agent_interface_filter_indexing.sql b/db/migrations/0004_agent_interface_filter_indexing.sql
new file mode 100644
index 0000000..79791b7
--- /dev/null
+++ b/db/migrations/0004_agent_interface_filter_indexing.sql
@@ -0,0 +1,23 @@
+ALTER TABLE entries -- Adds the search column; existing rows receive '' from the default and are backfilled by the UPDATE that follows.
+ADD COLUMN IF NOT EXISTS normalized_search_document TEXT NOT NULL DEFAULT '';
+
+UPDATE entries -- Backfill: "title content" with the listed accented letters folded to ASCII, then lowercased.
+SET normalized_search_document = LOWER(
+  TRANSLATE(
+    COALESCE(title, '') || ' ' || COALESCE(content, ''), -- NULL title/content are treated as empty strings
+    'ÁÀÂÄÃÅáàâäãåÉÈÊËéèêëÍÌÎÏíìîïÓÒÔÖÕóòôöõÚÙÛÜúùûüÑñÇç', -- characters to replace (50)
+    'AAAAAAaaaaaaEEEEeeeeIIIIiiiiOOOOOoooooUUUUuuuuNnCc' -- positional replacements (50)
+  )
+)
+WHERE normalized_search_document = ''; -- only rows still at the default; the backfilled value always contains at least the separator space, so a re-run skips them
+
+CREATE INDEX IF NOT EXISTS idx_entries_normalized_search_document_tsv -- GIN expression index over to_tsvector('simple', ...) of the normalized text.
+ON entries USING GIN (to_tsvector('simple', normalized_search_document));
+
+CREATE INDEX IF
NOT EXISTS idx_entries_feed_published_id
+ON entries (feed_id, published_at DESC, id DESC); -- composite index; presumably backs per-feed newest-first listing with id as tie-breaker - confirm against the queries
+
+-- Manual rollback guide (framework currently applies only "up" migrations):
+-- DROP INDEX IF EXISTS idx_entries_feed_published_id;
+-- DROP INDEX IF EXISTS idx_entries_normalized_search_document_tsv;
+-- ALTER TABLE entries DROP COLUMN IF EXISTS normalized_search_document;
diff --git a/db/migrations/0005_opml_duplicate_index_compatibility.sql b/db/migrations/0005_opml_duplicate_index_compatibility.sql
new file mode 100644
index 0000000..7df6f53
--- /dev/null
+++ b/db/migrations/0005_opml_duplicate_index_compatibility.sql
@@ -0,0 +1,11 @@
+DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import; -- Replaces the per-import dedupe index with a narrower predicate (same name, same columns).
+
+CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import
+ON opml_import_items (import_id, normalized_url_hash)
+WHERE normalized_url_hash IS NOT NULL AND item_status <> 'duplicate'; -- rows marked 'duplicate' are exempt, so several of them may share a hash within one import; NOTE(review): the rollback's stricter index cannot be created once such rows exist
+
+-- Manual rollback guide (framework currently applies only "up" migrations):
+-- DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import;
+-- CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import
+-- ON opml_import_items (import_id, normalized_url_hash)
+-- WHERE normalized_url_hash IS NOT NULL;
diff --git a/src/app.module.ts b/src/app.module.ts
index 083c7bb..2c81449 100644
--- a/src/app.module.ts
+++ b/src/app.module.ts
@@ -8,6 +8,7 @@ import { FeedsModule } from './modules/feeds/feeds.module';
 import { IngestionModule } from './modules/ingestion/ingestion.module';
 import { NotificationsModule } from './modules/notifications/notifications.module';
 import { ObservabilityModule } from './modules/observability/observability.module';
+import { OpmlImportsModule } from './modules/opml-imports/opml-imports.module';
 import { RulesModule } from './modules/rules/rules.module';
 import { configuration } from './shared/config/configuration';
 import { AppConfigModule } from './shared/config/app-config.module';
@@ -33,6 +34,7 @@ import { QueueModule } from
'./infrastructure/queue/queue.module';
     NotificationsModule,
     AlertsModule,
     ObservabilityModule,
+    OpmlImportsModule,
     IngestionModule,
   ],
   providers: [
diff --git a/src/infrastructure/queue/opml-apply-import.queue.ts b/src/infrastructure/queue/opml-apply-import.queue.ts
new file mode 100644
index 0000000..eea5d22
--- /dev/null
+++ b/src/infrastructure/queue/opml-apply-import.queue.ts
@@ -0,0 +1,56 @@
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
+import { Job, Queue, Worker } from 'bullmq';
+
+import { AppConfigService } from '../../shared/config/app-config.service';
+
+import { buildQueueJobId } from './job-id';
+import {
+  OPML_APPLY_IMPORT_QUEUE_NAME,
+  OpmlApplyImportJobData,
+  OpmlApplyImportQueuePort,
+} from './queue.constants';
+
+@Injectable()
+export class OpmlApplyImportQueue implements OpmlApplyImportQueuePort, OnApplicationShutdown { // BullMQ producer and worker factory for the OPML apply-import queue.
+  private readonly queue: Queue;
+
+  private get connection() { // builds a new options object on every access (used by the queue and by each worker)
+    return { url: this.configService.redisUrl };
+  }
+
+  constructor(@Inject(AppConfigService) private readonly configService: AppConfigService) {
+    this.queue = new Queue(OPML_APPLY_IMPORT_QUEUE_NAME, {
+      connection: this.connection,
+      defaultJobOptions: {
+        attempts: 4, // total attempts per job
+        backoff: {
+          type: 'exponential',
+          delay: 2000, // base backoff delay in milliseconds
+        },
+        removeOnComplete: 100, // retention cap for finished jobs
+        removeOnFail: 100, // retention cap for failed jobs
+      },
+    });
+  }
+
+  async enqueue(job: OpmlApplyImportJobData): Promise<void> { // Adds one apply job for the given import; resolves once the job has been handed to BullMQ.
+    await this.queue.add(OPML_APPLY_IMPORT_QUEUE_NAME, job, {
+      jobId: buildQueueJobId('opml-apply', job.importId), // id derived from the import id; NOTE(review): BullMQ does not add a job whose id already exists in the queue - confirm re-enqueue after a retained failure behaves as intended
+    });
+  }
+
+  createWorker(processor: (job: Job<OpmlApplyImportJobData>) => Promise<void>): Worker { // Creates a new worker bound to this queue; the caller owns it and must close it.
+    return new Worker(
+      OPML_APPLY_IMPORT_QUEUE_NAME,
+      async (job) => processor(job),
+      {
+        connection: this.connection,
+        concurrency: Math.max(1, Math.min(this.configService.workerConcurrency, 3)), // configured concurrency clamped to the range 1..3
+      },
+    );
+  }
+
+  async onApplicationShutdown(): Promise<void> { // Closes only the producer queue; workers are not tracked here.
+    await this.queue.close();
+  }
+}
diff --git a/src/infrastructure/queue/opml-parse-preview.queue.ts b/src/infrastructure/queue/opml-parse-preview.queue.ts
new file mode 100644
index 0000000..186982f
--- /dev/null
+++ b/src/infrastructure/queue/opml-parse-preview.queue.ts
@@ -0,0 +1,56 @@
+import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
+import { Job, Queue, Worker } from 'bullmq';
+
+import { AppConfigService } from '../../shared/config/app-config.service';
+
+import { buildQueueJobId } from './job-id';
+import {
+  OPML_PARSE_PREVIEW_QUEUE_NAME,
+  OpmlParsePreviewJobData,
+  OpmlParsePreviewQueuePort,
+} from './queue.constants';
+
+@Injectable()
+export class OpmlParsePreviewQueue implements OpmlParsePreviewQueuePort, OnApplicationShutdown { // BullMQ producer and worker factory for the OPML parse-preview queue.
+  private readonly queue: Queue;
+
+  private get connection() { // builds a new options object on every access (used by the queue and by each worker)
+    return { url: this.configService.redisUrl };
+  }
+
+  constructor(@Inject(AppConfigService) private readonly configService: AppConfigService) {
+    this.queue = new Queue(OPML_PARSE_PREVIEW_QUEUE_NAME, {
+      connection: this.connection,
+      defaultJobOptions: {
+        attempts: 3, // total attempts per job
+        backoff: {
+          type: 'exponential',
+          delay: 1000, // base backoff delay in milliseconds
+        },
+        removeOnComplete: 100, // retention cap for finished jobs
+        removeOnFail: 100, // retention cap for failed jobs
+      },
+    });
+  }
+
+  async enqueue(job: OpmlParsePreviewJobData): Promise<void> { // Adds one parse job for the given import; resolves once the job has been handed to BullMQ.
+    await this.queue.add(OPML_PARSE_PREVIEW_QUEUE_NAME, job, {
+      jobId: buildQueueJobId('opml-parse', job.importId), // id derived from the import id; NOTE(review): BullMQ does not add a job whose id already exists in the queue - confirm that is the intended dedupe
+    });
+  }
+
+  createWorker(processor: (job: Job<OpmlParsePreviewJobData>) => Promise<void>): Worker { // Creates a new worker bound to this queue; the caller owns it and must close it.
+    return new Worker(
+      OPML_PARSE_PREVIEW_QUEUE_NAME,
+      async (job) => processor(job),
+      {
+        connection: this.connection,
+        concurrency: Math.max(1, Math.min(this.configService.workerConcurrency, 3)), // configured concurrency clamped to the range 1..3
+      },
+    );
+  }
+
+  async onApplicationShutdown(): Promise<void> { // Closes only the producer queue; workers are not tracked here.
+    await this.queue.close();
+  }
+}
diff --git a/src/infrastructure/queue/queue.constants.ts b/src/infrastructure/queue/queue.constants.ts
index 742de5d..9268d97 100644
--- a/src/infrastructure/queue/queue.constants.ts
+++ b/src/infrastructure/queue/queue.constants.ts
@@ -1,9 +1,13 @@
 export const REDIS_CONNECTION = Symbol('REDIS_CONNECTION');
 export const FETCH_FEED_QUEUE_TOKEN = Symbol('FETCH_FEED_QUEUE_TOKEN');
export const ALERT_DELIVERY_QUEUE_TOKEN = Symbol('ALERT_DELIVERY_QUEUE_TOKEN');
+export const OPML_PARSE_PREVIEW_QUEUE_TOKEN = Symbol('OPML_PARSE_PREVIEW_QUEUE_TOKEN'); // DI token for the parse-preview queue port
+export const OPML_APPLY_IMPORT_QUEUE_TOKEN = Symbol('OPML_APPLY_IMPORT_QUEUE_TOKEN'); // DI token for the apply-import queue port
 
 export const FETCH_FEED_QUEUE_NAME = 'fetch-feed';
 export const ALERT_DELIVERY_QUEUE_NAME = 'alert-delivery';
+export const OPML_PARSE_PREVIEW_QUEUE_NAME = 'opml-parse-preview';
+export const OPML_APPLY_IMPORT_QUEUE_NAME = 'opml-apply-import';
 
 export interface FetchFeedJobData {
   feedId: number;
@@ -24,3 +28,21 @@ export interface AlertDeliveryJobData {
 export interface AlertDeliveryQueuePort {
   enqueue(job: AlertDeliveryJobData): Promise;
 }
+
+export interface OpmlParsePreviewJobData { // payload of a parse-preview job
+  importId: number;
+  opmlXml: string; // the OPML document text travels inside the job payload
+}
+
+export interface OpmlParsePreviewQueuePort { // producer-side port; resolves when the job has been accepted
+  enqueue(job: OpmlParsePreviewJobData): Promise<void>;
+}
+
+export interface OpmlApplyImportJobData { // payload of an apply-import job
+  importId: number;
+  requestedAt: string; // timestamp string supplied by the caller
+}
+
+export interface OpmlApplyImportQueuePort { // producer-side port; resolves when the job has been accepted
+  enqueue(job: OpmlApplyImportJobData): Promise<void>;
+}
diff --git a/src/infrastructure/queue/queue.module.ts b/src/infrastructure/queue/queue.module.ts
index 6354791..48d05b1 100644
--- a/src/infrastructure/queue/queue.module.ts
+++ b/src/infrastructure/queue/queue.module.ts
@@ -5,9 +5,17 @@
 import { AppConfigModule } from '../../shared/config/app-config.module';
 import { AppConfigService } from '../../shared/config/app-config.service';
 
 import { AlertDeliveryQueue } from './alert-delivery.queue';
+import { OpmlApplyImportQueue } from './opml-apply-import.queue';
+import { OpmlParsePreviewQueue } from './opml-parse-preview.queue';
 import { FetchFeedQueue } from './fetch-feed.queue';
-import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION } from './queue.constants';
+import {
+  ALERT_DELIVERY_QUEUE_TOKEN,
+  FETCH_FEED_QUEUE_TOKEN,
+  OPML_APPLY_IMPORT_QUEUE_TOKEN,
+  OPML_PARSE_PREVIEW_QUEUE_TOKEN,
+  REDIS_CONNECTION,
+} from
'./queue.constants';
 
 @Global()
 @Module({
@@ -20,6 +28,8 @@ import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION }
     },
     AlertDeliveryQueue,
     FetchFeedQueue,
+    OpmlParsePreviewQueue, // concrete queue classes are registered as providers...
+    OpmlApplyImportQueue,
     {
       provide: ALERT_DELIVERY_QUEUE_TOKEN,
       useExisting: AlertDeliveryQueue,
@@ -28,7 +38,25 @@ import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION }
       provide: FETCH_FEED_QUEUE_TOKEN,
       useExisting: FetchFeedQueue,
     },
+    {
+      provide: OPML_PARSE_PREVIEW_QUEUE_TOKEN, // ...and each port token is an alias for the matching class provider
+      useExisting: OpmlParsePreviewQueue,
+    },
+    {
+      provide: OPML_APPLY_IMPORT_QUEUE_TOKEN,
+      useExisting: OpmlApplyImportQueue,
+    },
+  ],
+  exports: [ // same exports as before, one per line, plus the two OPML queues and their tokens
+    REDIS_CONNECTION,
+    AlertDeliveryQueue,
+    FetchFeedQueue,
+    OpmlParsePreviewQueue,
+    OpmlApplyImportQueue,
+    ALERT_DELIVERY_QUEUE_TOKEN,
+    FETCH_FEED_QUEUE_TOKEN,
+    OPML_PARSE_PREVIEW_QUEUE_TOKEN,
+    OPML_APPLY_IMPORT_QUEUE_TOKEN,
   ],
-  exports: [REDIS_CONNECTION, AlertDeliveryQueue, FetchFeedQueue, ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN],
 })
 export class QueueModule {}
diff --git a/src/modules/entries/entries.repository.ts b/src/modules/entries/entries.repository.ts
index df609f1..d5fa856 100644
--- a/src/modules/entries/entries.repository.ts
+++ b/src/modules/entries/entries.repository.ts
@@ -28,6 +28,22 @@ interface EntryRow {
   fetched_at: Date;
 }
 
+interface EntryFilterRow { // column shape (snake_case) selected by the filter-search query
+  id: string;
+  feed_id: number;
+  title: string | null;
+  content: string | null;
+  published_at: Date | null;
+}
+
+export interface EntryFilterCandidate { // camelCase shape handed to callers of listForFilterSearch
+  id: string;
+  feedId: number;
+  title: string | null;
+  content: string | null;
+  publishedAt: string | null; // ISO-8601 string, or null when the entry has no publish date
+}
+
 function mapEntry(row: EntryRow): Entry {
   return {
     id: row.id,
@@ -63,12 +79,21 @@ export class EntriesRepository {
     for (const entry of entries) {
       const result = await executor.query(
         `
-        INSERT INTO entries (feed_id, title, link, guid, content, content_hash, published_at)
-        VALUES ($1, $2, $3, $4, $5, $6, $7)
+        INSERT INTO entries (feed_id, title, link, guid,
content, content_hash, published_at, normalized_search_document)
+        VALUES ($1, $2, $3, $4, $5, $6, $7, LOWER(TRANSLATE($8, 'ÁÀÂÄÃÅáàâäãåÉÈÊËéèêëÍÌÎÏíìîïÓÒÔÖÕóòôöõÚÙÛÜúùûüÑñÇç', 'AAAAAAaaaaaaEEEEeeeeIIIIiiiiOOOOOoooooUUUUuuuuNnCc')))
         ON CONFLICT DO NOTHING
         RETURNING id, feed_id, title, link, guid, content, content_hash, published_at, fetched_at
         `,
-        [feedId, entry.title, entry.link, entry.guid, entry.content, entry.contentHash, entry.publishedAt],
+        [
+          feedId,
+          entry.title,
+          entry.link,
+          entry.guid,
+          entry.content,
+          entry.contentHash,
+          entry.publishedAt,
+          `${entry.title ?? ''} ${entry.content ?? ''}`, // $8: raw "title content"; the SQL folds accents and lowercases it with the same map as the migration-0004 backfill, so old and new rows are normalized identically
+        ],
       );
 
       if (result.rows[0]) {
@@ -108,4 +133,25 @@ export class EntriesRepository {
       total: Number(totalResult.rows[0]?.count ?? '0'),
     };
   }
+
+  async listForFilterSearch(limit: number): Promise<EntryFilterCandidate[]> { // Returns the newest entries (publish date desc, undated last, id as tie-breaker) as filter candidates.
+    const cappedLimit = Math.max(1, Math.min(limit, 5000)); // clamp the requested size to 1..5000 rows
+    const result = await this.databaseService.query( // NOTE(review): rows are presumably EntryFilterRow-shaped - confirm the query typing
+      `
+      SELECT id, feed_id, title, content, published_at
+      FROM entries
+      ORDER BY published_at DESC NULLS LAST, id DESC
+      LIMIT $1
+      `,
+      [cappedLimit],
+    );
+
+    return result.rows.map((row) => ({ // snake_case columns -> camelCase candidate
+      id: row.id,
+      feedId: row.feed_id,
+      title: row.title,
+      content: row.content,
+      publishedAt: row.published_at?.toISOString() ??
null,
+    }));
+  }
 }
diff --git a/src/modules/feeds/feeds.module.ts b/src/modules/feeds/feeds.module.ts
index d9140e8..87ee1f5 100644
--- a/src/modules/feeds/feeds.module.ts
+++ b/src/modules/feeds/feeds.module.ts
@@ -12,6 +12,6 @@ import { FeedsController } from './http/feeds.controller';
 @Module({
   controllers: [FeedsController],
   providers: [FeedsRepository, RegisterFeedUseCase, ListFeedsUseCase, GetFeedUseCase, UpdateFeedUseCase, DisableFeedUseCase, CheckFeedNowUseCase],
-  exports: [FeedsRepository],
+  exports: [FeedsRepository, RegisterFeedUseCase, ListFeedsUseCase], // two use cases are now exported so other modules can inject them
 })
 export class FeedsModule {}
diff --git a/src/modules/feeds/feeds.repository.ts b/src/modules/feeds/feeds.repository.ts
index cf80b78..42d5078 100644
--- a/src/modules/feeds/feeds.repository.ts
+++ b/src/modules/feeds/feeds.repository.ts
@@ -1,6 +1,7 @@
 import { BadRequestException, ConflictException, Injectable } from '@nestjs/common';
 
 import { DatabaseService } from '../../infrastructure/persistence/database.service';
+import { buildNormalizedFeedUrlHash, normalizeFeedUrl } from '../opml-imports/domain/url-normalizer'; // NOTE(review): feeds now depends on a helper that lives in the opml-imports module - confirm this direction of dependency is acceptable
 
 import { Feed } from './domain/feed.entity';
 
@@ -47,14 +48,16 @@ export class FeedsRepository {
   constructor(private readonly databaseService: DatabaseService) {}
 
   async create(input: { url: string; pollIntervalSeconds: number; status: 'active' | 'paused' | 'error' }): Promise {
+    const normalizedUrlHash = this.buildNormalizedUrlHashOrNull(input.url); // computed before the insert; null when the URL cannot be normalized
+
     try {
       const result = await this.databaseService.query(
         `
-        INSERT INTO feeds (url, poll_interval_seconds, status, next_check_at)
-        VALUES ($1, $2, $3, NOW())
+        INSERT INTO feeds (url, normalized_url_hash, poll_interval_seconds, status, next_check_at)
+        VALUES ($1, $2, $3, $4, NOW())
         RETURNING *
         `,
-        [input.url, input.pollIntervalSeconds, input.status],
+        [input.url, normalizedUrlHash, input.pollIntervalSeconds, input.status], // order matches $1..$4 above
       );
 
       return mapFeed(result.rows[0]);
@@ -67,6 +70,14 @@ export class FeedsRepository {
     }
   }
 
+  private
buildNormalizedUrlHashOrNull(url: string): string | null { // Returns the hash of the normalized URL, or null if normalizing or hashing throws.
+    try {
+      return buildNormalizedFeedUrlHash(normalizeFeedUrl(url));
+    } catch { // deliberate best-effort: any error means the feed is stored without a hash
+      return null;
+    }
+  }
+
   async list(filters: {
     status?: string;
     query?: string;
diff --git a/src/modules/ingestion/ingestion.module.ts b/src/modules/ingestion/ingestion.module.ts
index 8174c31..83dc198 100644
--- a/src/modules/ingestion/ingestion.module.ts
+++ b/src/modules/ingestion/ingestion.module.ts
@@ -7,6 +7,7 @@ import { AlertsModule } from '../alerts/alerts.module';
 import { EntriesModule } from '../entries/entries.module';
 import { FeedsModule } from '../feeds/feeds.module';
 import { ObservabilityModule } from '../observability/observability.module';
+import { OpmlImportsModule } from '../opml-imports/opml-imports.module';
 import { RulesModule } from '../rules/rules.module';
 
 import { ProcessFeedJobUseCase } from './application/process-feed-job.use-case';
@@ -20,7 +21,7 @@ import { WorkerRunner } from './worker.runner';
 import { MetricsService } from '../observability/metrics.service';
 
 @Module({
-  imports: [AppConfigModule, FeedsModule, EntriesModule, RulesModule, AlertsModule, ObservabilityModule],
+  imports: [AppConfigModule, FeedsModule, EntriesModule, RulesModule, AlertsModule, ObservabilityModule, OpmlImportsModule], // OpmlImportsModule is imported so WorkerRunner can inject the OPML job use cases
   providers: [
     ScheduleDueFeedsUseCase,
     ProcessFeedJobUseCase,
diff --git a/src/modules/ingestion/worker.runner.ts b/src/modules/ingestion/worker.runner.ts
index 529eb5d..b96b00f 100644
--- a/src/modules/ingestion/worker.runner.ts
+++ b/src/modules/ingestion/worker.runner.ts
@@ -3,9 +3,13 @@ import { Job, Worker } from 'bullmq';
 
 import { AlertDeliveryQueue } from '../../infrastructure/queue/alert-delivery.queue';
 import { FetchFeedQueue } from '../../infrastructure/queue/fetch-feed.queue';
-import { AlertDeliveryJobData, FetchFeedJobData } from '../../infrastructure/queue/queue.constants';
+import { OpmlApplyImportQueue } from '../../infrastructure/queue/opml-apply-import.queue';
+import {
OpmlParsePreviewQueue } from '../../infrastructure/queue/opml-parse-preview.queue';
+import { AlertDeliveryJobData, FetchFeedJobData, OpmlApplyImportJobData, OpmlParsePreviewJobData } from '../../infrastructure/queue/queue.constants';
 
 import { ProcessAlertDeliveryUseCase } from '../alerts/application/process-alert-delivery.use-case';
+import { ProcessOpmlApplyJobUseCase } from '../opml-imports/application/process-opml-apply-job.use-case';
+import { ProcessOpmlParseJobUseCase } from '../opml-imports/application/process-opml-parse-job.use-case';
 
 import { ProcessFeedJobUseCase } from './application/process-feed-job.use-case';
 
@@ -14,12 +18,18 @@ export class WorkerRunner implements OnApplicationShutdown {
   private readonly logger = new Logger(WorkerRunner.name);
   private feedWorker: Worker | null = null;
   private alertDeliveryWorker: Worker | null = null;
+  private opmlParseWorker: Worker | null = null; // null until start() creates it, and again after shutdown
+  private opmlApplyWorker: Worker | null = null; // null until start() creates it, and again after shutdown
 
   constructor(
     private readonly fetchFeedQueue: FetchFeedQueue,
     private readonly alertDeliveryQueue: AlertDeliveryQueue,
+    private readonly opmlParsePreviewQueue: OpmlParsePreviewQueue,
+    private readonly opmlApplyImportQueue: OpmlApplyImportQueue,
     private readonly processFeedJobUseCase: ProcessFeedJobUseCase,
     private readonly processAlertDeliveryUseCase: ProcessAlertDeliveryUseCase,
+    private readonly processOpmlParseJobUseCase: ProcessOpmlParseJobUseCase,
+    private readonly processOpmlApplyJobUseCase: ProcessOpmlApplyJobUseCase,
   ) {}
 
   async start(): Promise {
@@ -51,6 +61,30 @@ export class WorkerRunner implements OnApplicationShutdown {
     this.alertDeliveryWorker.on('error', (error) => {
       this.logger.error(`Alert delivery worker error: ${error.message}`);
     });
+
+    this.opmlParseWorker = this.opmlParsePreviewQueue.createWorker(async (job) => { // each parse job is delegated to the parse use case with the job payload
+      await this.processOpmlParseJobUseCase.execute(job.data);
+    });
+
+    this.opmlParseWorker.on('failed', (job, error) => { // log-only handler; job may be undefined, hence the optional chaining
+      this.logger.error(`OPML parse job ${job?.data.importId ??
'unknown'} failed: ${error.message}`);
+    });
+
+    this.opmlParseWorker.on('error', (error) => { // worker-level errors are logged only
+      this.logger.error(`OPML parse worker error: ${error.message}`);
+    });
+
+    this.opmlApplyWorker = this.opmlApplyImportQueue.createWorker(async (job) => { // each apply job is delegated to the apply use case with the job payload
+      await this.processOpmlApplyJobUseCase.execute(job.data);
+    });
+
+    this.opmlApplyWorker.on('failed', (job, error) => { // log-only handler; job may be undefined, hence the optional chaining
+      this.logger.error(`OPML apply job ${job?.data.importId ?? 'unknown'} failed: ${error.message}`);
+    });
+
+    this.opmlApplyWorker.on('error', (error) => { // worker-level errors are logged only
+      this.logger.error(`OPML apply worker error: ${error.message}`);
+    });
   }
 
   async onApplicationShutdown(): Promise {
@@ -63,5 +97,15 @@ export class WorkerRunner implements OnApplicationShutdown {
       await this.alertDeliveryWorker.close();
       this.alertDeliveryWorker = null;
     }
+
+    if (this.opmlParseWorker) { // workers are closed one after another; a rejected close() skips the ones that follow
+      await this.opmlParseWorker.close();
+      this.opmlParseWorker = null;
+    }
+
+    if (this.opmlApplyWorker) {
+      await this.opmlApplyWorker.close();
+      this.opmlApplyWorker = null;
+    }
   }
 }
diff --git a/src/modules/opml-imports/application/confirm-opml-import.use-case.ts b/src/modules/opml-imports/application/confirm-opml-import.use-case.ts
new file mode 100644
index 0000000..777448c
--- /dev/null
+++ b/src/modules/opml-imports/application/confirm-opml-import.use-case.ts
@@ -0,0 +1,40 @@
+import { ConflictException, Inject, Injectable } from '@nestjs/common';
+
+import {
+  OPML_APPLY_IMPORT_QUEUE_TOKEN,
+  OpmlApplyImportQueuePort,
+} from '../../../infrastructure/queue/queue.constants';
+
+import { OpmlImportsRepository } from '../opml-imports.repository';
+
+@Injectable()
+export class ConfirmOpmlImportUseCase { // Confirms a previewed import and queues the apply job.
+  constructor(
+    private readonly opmlImportsRepository: OpmlImportsRepository,
+    @Inject(OPML_APPLY_IMPORT_QUEUE_TOKEN) private readonly opmlApplyImportQueue: OpmlApplyImportQueuePort,
+  ) {}
+
+  async execute(importId: number): Promise<{ id: string; status: 'queued' | 'already_confirmed' }> { // Throws ConflictException unless the import is preview_ready, importing or completed.
+    const current = await
this.opmlImportsRepository.getImportOrThrow(importId);
+
+    if (current.status === 'importing' || current.status === 'completed') { // repeated confirms are answered without queueing again
+      return { id: current.id, status: 'already_confirmed' };
+    }
+
+    if (current.status !== 'preview_ready') { // every other state is rejected
+      throw new ConflictException('opml_import_not_ready_for_confirm');
+    }
+
+    const updated = await this.opmlImportsRepository.markImportStatus(importId, { // NOTE(review): the status check above and this update are separate calls - confirm markImportStatus guards against two concurrent confirms
+      status: 'importing',
+      confirmed: true,
+    });
+
+    await this.opmlApplyImportQueue.enqueue({ // NOTE(review): the status is already 'importing' here; if enqueue rejects, later confirms return 'already_confirmed' and nothing is queued
+      importId,
+      requestedAt: new Date().toISOString(),
+    });
+
+    return { id: updated.id, status: 'queued' };
+  }
+}
diff --git a/src/modules/opml-imports/application/create-opml-import.use-case.ts b/src/modules/opml-imports/application/create-opml-import.use-case.ts
new file mode 100644
index 0000000..f066d1f
--- /dev/null
+++ b/src/modules/opml-imports/application/create-opml-import.use-case.ts
@@ -0,0 +1,67 @@
+import { createHash } from 'node:crypto';
+
+import { BadRequestException, Inject, Injectable } from '@nestjs/common';
+
+import {
+  OPML_PARSE_PREVIEW_QUEUE_TOKEN,
+  OpmlParsePreviewQueuePort,
+} from '../../../infrastructure/queue/queue.constants';
+import { AppConfigService } from '../../../shared/config/app-config.service';
+
+import { OpmlImportsRepository } from '../opml-imports.repository';
+
+@Injectable()
+export class CreateOpmlImportUseCase { // Validates an uploaded file, records the import and queues its parse job.
+  private static readonly ALLOWED_MIME_TYPES = new Set([ // compared against the lowercased upload MIME type
+    'text/xml',
+    'application/xml',
+    'text/x-opml',
+    'application/octet-stream',
+  ]);
+
+  constructor(
+    private readonly opmlImportsRepository: OpmlImportsRepository,
+    @Inject(OPML_PARSE_PREVIEW_QUEUE_TOKEN) private readonly opmlParsePreviewQueue: OpmlParsePreviewQueuePort,
+    @Inject(AppConfigService) private readonly appConfigService: AppConfigService,
+  ) {}
+
+  async execute(input: { fileName: string; mimeType: string; content: Buffer }): Promise<{ id: string; status: string; parseQueued: boolean }> { // Throws BadRequestException when validateUpload rejects the file.
+    this.validateUpload(input);
+
+    const sourceChecksum =
createHash('sha256').update(input.content).digest('hex'); // hex SHA-256 of the raw uploaded bytes
+    const created = await this.opmlImportsRepository.createImport({
+      fileName: input.fileName,
+      fileSizeBytes: input.content.length, // Buffer length, i.e. size in bytes
+      sourceChecksum,
+    });
+
+    await this.opmlParsePreviewQueue.enqueue({ // NOTE(review): the import row already exists here; if enqueue rejects, the row is left behind with no parse job
+      importId: Number(created.id),
+      opmlXml: input.content.toString('utf8'), // always decoded as UTF-8; NOTE(review): an encoding declared inside the XML is not consulted here
+    });
+
+    return {
+      id: created.id,
+      status: created.status,
+      parseQueued: true,
+    };
+  }
+
+  private validateUpload(input: { fileName: string; mimeType: string; content: Buffer }): void { // Throws BadRequestException for empty, oversized or wrongly typed uploads.
+    if (!input.content || input.content.length === 0) {
+      throw new BadRequestException('opml_file_required');
+    }
+
+    if (input.content.length > this.appConfigService.opmlUploadMaxBytes) { // size limit comes from configuration
+      throw new BadRequestException('opml_file_too_large');
+    }
+
+    const lowerFileName = input.fileName.toLowerCase();
+    const extensionLooksValid = lowerFileName.endsWith('.opml') || lowerFileName.endsWith('.xml');
+    const mimeLooksValid = CreateOpmlImportUseCase.ALLOWED_MIME_TYPES.has(input.mimeType.toLowerCase());
+
+    if (!extensionLooksValid && !mimeLooksValid) { // rejected only when BOTH checks fail; a matching extension or an allowed MIME type alone is enough
+      throw new BadRequestException('opml_file_invalid_type');
+    }
+  }
+}
diff --git a/src/modules/opml-imports/application/get-opml-import-status.use-case.ts b/src/modules/opml-imports/application/get-opml-import-status.use-case.ts
new file mode 100644
index 0000000..55596a7
--- /dev/null
+++ b/src/modules/opml-imports/application/get-opml-import-status.use-case.ts
@@ -0,0 +1,39 @@
+import { Injectable } from '@nestjs/common';
+
+import { OpmlImportsRepository } from '../opml-imports.repository';
+
+@Injectable()
+export class GetOpmlImportStatusUseCase { // Returns the import summary plus a coarse progress figure and the failed-item count.
+  constructor(private readonly opmlImportsRepository: OpmlImportsRepository) {}
+
+  async execute(importId: number) {
+    const summary = await this.opmlImportsRepository.getImportOrThrow(importId);
+    const grouped = await this.opmlImportsRepository.countItemsByStatus(importId);
+
+    return {
+      ...summary,
+      progressPercent:
this.calculateProgress(summary.status),
+      failedItems: grouped.failed ?? 0, // 0 when the grouped counts have no 'failed' entry
+    };
+  }
+
+  private calculateProgress(status: string): number { // Maps a lifecycle status to a fixed percentage; it is not derived from item counts.
+    switch (status) {
+      case 'uploaded':
+        return 10;
+      case 'parsing':
+        return 30;
+      case 'preview_ready':
+        return 60;
+      case 'importing':
+        return 80;
+      case 'completed':
+        return 100;
+      case 'failed_validation':
+      case 'failed':
+        return 100; // failed imports also report 100
+      default:
+        return 0; // unrecognized status
+    }
+  }
+}
diff --git a/src/modules/opml-imports/application/get-opml-preview.use-case.ts b/src/modules/opml-imports/application/get-opml-preview.use-case.ts
new file mode 100644
index 0000000..24f51f0
--- /dev/null
+++ b/src/modules/opml-imports/application/get-opml-preview.use-case.ts
@@ -0,0 +1,19 @@
+import { Injectable } from '@nestjs/common';
+
+import { OpmlImportsRepository } from '../opml-imports.repository';
+
+@Injectable()
+export class GetOpmlPreviewUseCase { // Returns the import summary together with one page of its preview items.
+  constructor(private readonly opmlImportsRepository: OpmlImportsRepository) {}
+
+  async execute(input: { importId: number; page: number; pageSize: number }) {
+    const summary = await this.opmlImportsRepository.getImportOrThrow(input.importId); // fetched before the items are listed
+    const preview = await this.opmlImportsRepository.listPreviewItems(input.importId, input.page, input.pageSize); // page and pageSize are passed through unchanged
+
+    return {
+      import: summary,
+      items: preview.items,
+      total: preview.total,
+    };
+  }
+}
diff --git a/src/modules/opml-imports/application/opml-import-observability.service.ts b/src/modules/opml-imports/application/opml-import-observability.service.ts
new file mode 100644
index 0000000..820ed0a
--- /dev/null
+++ b/src/modules/opml-imports/application/opml-import-observability.service.ts
@@ -0,0 +1,58 @@
+import { Injectable } from '@nestjs/common';
+import { Counter, Histogram } from 'prom-client';
+
+import { SHARED_METRICS_REGISTRY } from '../../observability/metrics-registry';
+
+type OpmlJobStage = 'parse' | 'apply';
+type OpmlJobStatus = 'success' | 'error';
+
+@Injectable()
+export class OpmlImportObservabilityService { // Records duration and error metrics for OPML parse/apply jobs.
+  private
readonly jobDurationMs = this.getOrCreateDurationHistogram();
+  private readonly jobErrorsTotal = this.getOrCreateErrorCounter();
+
+  startJobTimer(stage: OpmlJobStage): (status: OpmlJobStatus, errorCode?: string) => number { // Starts timing now and returns a stop function that records the metrics and returns the elapsed milliseconds.
+    const startedAt = process.hrtime.bigint();
+
+    return (status: OpmlJobStatus, errorCode?: string) => {
+      const endedAt = process.hrtime.bigint();
+      const durationMs = Number(endedAt - startedAt) / 1_000_000; // nanoseconds -> milliseconds
+
+      this.jobDurationMs.labels(stage, status).observe(durationMs);
+      if (status === 'error') {
+        this.jobErrorsTotal.labels(stage, errorCode ?? 'unknown').inc(); // a missing error code is counted under 'unknown'
+      }
+
+      return durationMs;
+    };
+  }
+
+  private getOrCreateDurationHistogram(): Histogram<'stage' | 'status'> { // Reuses the histogram if the shared registry already holds one under this name.
+    const existing = SHARED_METRICS_REGISTRY.getSingleMetric('rss_opml_job_duration_ms');
+    if (existing) {
+      return existing as Histogram<'stage' | 'status'>; // unchecked cast: assumes the registered metric is this histogram
+    }
+
+    return new Histogram({
+      name: 'rss_opml_job_duration_ms',
+      help: 'OPML parse/apply job duration in milliseconds',
+      labelNames: ['stage', 'status'],
+      buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000], // bucket upper bounds in ms; the largest explicit bound is 5000
+      registers: [SHARED_METRICS_REGISTRY],
+    });
+  }
+
+  private getOrCreateErrorCounter(): Counter<'stage' | 'error_code'> { // Reuses the counter if the shared registry already holds one under this name.
+    const existing = SHARED_METRICS_REGISTRY.getSingleMetric('rss_opml_job_errors_total');
+    if (existing) {
+      return existing as Counter<'stage' | 'error_code'>; // unchecked cast: assumes the registered metric is this counter
+    }
+
+    return new Counter({
+      name: 'rss_opml_job_errors_total',
+      help: 'OPML parse/apply job errors by stage and error code',
+      labelNames: ['stage', 'error_code'],
+      registers: [SHARED_METRICS_REGISTRY],
+    });
+  }
+}
diff --git a/src/modules/opml-imports/application/process-opml-apply-job.use-case.ts b/src/modules/opml-imports/application/process-opml-apply-job.use-case.ts
new file mode 100644
index 0000000..f1156e3
--- /dev/null
+++ b/src/modules/opml-imports/application/process-opml-apply-job.use-case.ts
@@ -0,0 +1,167 @@
+import { Inject, Injectable, Logger } from '@nestjs/common';
+
+import { DatabaseService }
from '../../../infrastructure/persistence/database.service';
+import { AppConfigService } from '../../../shared/config/app-config.service';
+import { FETCH_FEED_QUEUE_TOKEN, FetchFeedQueuePort, OpmlApplyImportJobData } from '../../../infrastructure/queue/queue.constants';
+
+import { assertValidOpmlImportStatusTransition } from '../domain/opml-import-status';
+import { buildNormalizedFeedUrlHash } from '../domain/url-normalizer';
+import { normalizeFeedUrl } from '../domain/url-normalizer';
+import { OpmlImportObservabilityService } from './opml-import-observability.service';
+import { OpmlImportsRepository } from '../opml-imports.repository';
+
+/** Columns read back from `feeds` by the upsert and by the follow-up lookup. */
+interface FeedUpsertRow {
+  id: number;
+  url: string;
+}
+
+/** Savepoint that isolates one item's statements inside the apply transaction. */
+const ITEM_SAVEPOINT = 'opml_apply_item';
+
+/**
+ * Applies a confirmed OPML import: turns every `new` item into a feed row,
+ * records the per-item outcome, finalises the import counters and, once the
+ * transaction is committed, enqueues a first fetch for each created feed.
+ */
+@Injectable()
+export class ProcessOpmlApplyJobUseCase {
+  private readonly logger = new Logger(ProcessOpmlApplyJobUseCase.name);
+
+  constructor(
+    private readonly databaseService: DatabaseService,
+    private readonly opmlImportsRepository: OpmlImportsRepository,
+    @Inject(FETCH_FEED_QUEUE_TOKEN) private readonly fetchFeedQueue: FetchFeedQueuePort,
+    @Inject(AppConfigService) private readonly appConfigService: AppConfigService,
+    private readonly observabilityService: OpmlImportObservabilityService,
+  ) {}
+
+  /**
+   * Runs the apply stage for `job.importId`.
+   *
+   * Returns immediately when the import is already `completed`. Failures of the
+   * transaction are not rethrown: the import is marked `failed` and the error
+   * is logged. Throws only what `getImportOrThrow`, the status-transition
+   * assertion, acquiring a connection or the final `markImportStatus` throw.
+   */
+  async execute(job: OpmlApplyImportJobData): Promise<void> {
+    const current = await this.opmlImportsRepository.getImportOrThrow(job.importId);
+    if (current.status === 'completed') {
+      return;
+    }
+
+    if (current.status !== 'importing') {
+      assertValidOpmlImportStatusTransition(current.status, 'importing');
+      await this.opmlImportsRepository.markImportStatus(job.importId, { status: 'importing', confirmed: true });
+    }
+
+    const stopTimer = this.observabilityService.startJobTimer('apply');
+    const createdFeedIds: number[] = [];
+
+    const client = await this.databaseService.getPool().connect();
+    try {
+      await client.query('BEGIN');
+
+      const candidates = await this.opmlImportsRepository.listNewCandidateItems(job.importId, client);
+      let importedItems = 0;
+      let failedItems = 0;
+
+      for (const item of candidates) {
+        const itemId = Number(item.id);
+
+        if (!item.normalizedUrl) {
+          await this.opmlImportsRepository.markItemFailed(itemId, 'missing_normalized_url', client);
+          failedItems += 1;
+          continue;
+        }
+
+        // A failed statement aborts the surrounding PostgreSQL transaction, so
+        // each item runs under a savepoint; only that item's work is undone and
+        // the failure can still be recorded on the same connection.
+        let imported = false;
+        let createdFeedId: number | null = null;
+        await client.query(`SAVEPOINT ${ITEM_SAVEPOINT}`);
+        try {
+          const upsert = await this.upsertFeed(item.normalizedUrl, client);
+          if (upsert.collision) {
+            await this.opmlImportsRepository.markItemFailed(itemId, 'normalized_hash_collision_detected', client);
+          } else {
+            await this.opmlImportsRepository.markItemImported(itemId, upsert.feedId, client);
+          }
+          await client.query(`RELEASE SAVEPOINT ${ITEM_SAVEPOINT}`);
+
+          // Outcome is recorded only after the savepoint is released, so the
+          // in-memory counters never get ahead of the database.
+          imported = !upsert.collision;
+          createdFeedId = upsert.created ? upsert.feedId : null;
+        } catch (error) {
+          await client.query(`ROLLBACK TO SAVEPOINT ${ITEM_SAVEPOINT}`);
+          await this.opmlImportsRepository.markItemFailed(
+            itemId,
+            error instanceof Error ? error.message : 'opml_apply_item_failed',
+            client,
+          );
+        }
+
+        if (imported) {
+          importedItems += 1;
+          if (createdFeedId !== null) {
+            createdFeedIds.push(createdFeedId);
+          }
+        } else {
+          failedItems += 1;
+        }
+      }
+
+      const grouped = await this.opmlImportsRepository.countItemsByStatus(job.importId, client);
+      const finalStatus = failedItems > 0 ? 'failed' : 'completed';
+      const errorMessage = failedItems > 0 ? `partial_import_failure:${failedItems}` : null;
+
+      await this.opmlImportsRepository.markImportStatus(
+        job.importId,
+        {
+          status: finalStatus,
+          errorMessage,
+          completed: true,
+          counters: {
+            importedItems,
+            invalidItems: grouped.invalid ?? 0,
+            duplicateItems: grouped.duplicate ?? 0,
+            existingItems: grouped.existing ?? 0,
+            validItems: (grouped.new ?? 0) + (grouped.existing ?? 0) + (grouped.duplicate ?? 0) + (grouped.imported ?? 0),
+            totalItems: Object.values(grouped).reduce((sum, value) => sum + value, 0),
+          },
+        },
+        client,
+      );
+
+      await client.query('COMMIT');
+      stopTimer('success');
+    } catch (error) {
+      // A failing ROLLBACK must not prevent the import from being marked failed.
+      try {
+        await client.query('ROLLBACK');
+      } catch (rollbackError) {
+        const rollbackMessage = rollbackError instanceof Error ? rollbackError.message : 'opml_apply_rollback_failed';
+        this.logger.error(`OPML apply rollback failed for import ${job.importId}: ${rollbackMessage}`);
+      }
+
+      const message = error instanceof Error ? error.message : 'opml_apply_failed';
+      this.logger.error(`OPML apply failed for import ${job.importId}: ${message}`);
+      stopTimer('error', 'apply_failed');
+      await this.opmlImportsRepository.markImportStatus(job.importId, {
+        status: 'failed',
+        errorMessage: message,
+      });
+      return;
+    } finally {
+      client.release();
+    }
+
+    // Runs strictly after COMMIT and outside the transaction's error handling:
+    // a queue failure must not relabel an import that is already committed.
+    await this.enqueueInitialFetches(job.importId, createdFeedIds);
+  }
+
+  /**
+   * Enqueues one fetch job per newly created feed. Enqueue errors are logged
+   * and skipped (best effort); the feed rows already carry a `next_check_at`.
+   * NOTE(review): presumably a scheduler picks those rows up later — confirm.
+   */
+  private async enqueueInitialFetches(importId: number, feedIds: number[]): Promise<void> {
+    for (const feedId of feedIds) {
+      try {
+        await this.fetchFeedQueue.enqueue({
+          feedId,
+          queuedAt: new Date().toISOString(),
+          attempt: 0,
+        });
+      } catch (error) {
+        const message = error instanceof Error ? error.message : 'opml_fetch_enqueue_failed';
+        this.logger.warn(`OPML apply could not enqueue fetch for feed ${feedId} (import ${importId}): ${message}`);
+      }
+    }
+  }
+
+  /**
+   * Inserts a feed for `normalizedUrl`, or resolves the row that already owns
+   * the same normalized-URL hash.
+   *
+   * @returns `created` when a row was inserted; `collision` when the existing
+   *          row's URL normalizes to something other than `normalizedUrl`.
+   * @throws Error('opml_feed_lookup_after_conflict_failed') when the insert was
+   *         skipped by the conflict clause but no row with that hash is found.
+   */
+  // NOTE(review): executor type restored as Pick<DatabaseService, 'query'>; the generic was lost in this dump — confirm.
+  private async upsertFeed(normalizedUrl: string, executor: Pick<DatabaseService, 'query'>): Promise<{ feedId: number; created: boolean; collision: boolean }> {
+    const normalizedHash = buildNormalizedFeedUrlHash(normalizedUrl);
+
+    // Random initial delay in [0, max) seconds; the max is floored at 1.
+    const jitterSeconds = Math.floor(Math.random() * Math.max(1, this.appConfigService.opmlInitialJitterMaxSeconds));
+    const insertResult = await executor.query<FeedUpsertRow>(
+      `
+      INSERT INTO feeds (url, normalized_url_hash, poll_interval_seconds, status, next_check_at)
+      VALUES ($1, $2, 1800, 'active', NOW() + ($3::text || ' seconds')::interval)
+      ON CONFLICT (normalized_url_hash) DO NOTHING
+      RETURNING id, url
+      `,
+      [normalizedUrl, normalizedHash, jitterSeconds],
+    );
+
+    if (insertResult.rows[0]) {
+      return { feedId: insertResult.rows[0].id, created: true, collision: false };
+    }
+
+    const existingResult = await executor.query<FeedUpsertRow>(
+      `
+      SELECT id, url
+      FROM feeds
+      WHERE normalized_url_hash = $1
+      LIMIT 1
+      `,
+      [normalizedHash],
+    );
+
+    const existing = existingResult.rows[0];
+    if (!existing) {
+      throw new Error('opml_feed_lookup_after_conflict_failed');
+    }
+
+    // Same hash but a different normalized URL means two URLs share one hash.
+    if (normalizeFeedUrl(existing.url) !== normalizedUrl) {
+      return { feedId: existing.id, created: false, collision: true };
+    }
+
+    return { feedId: existing.id, created: false, collision: false };
+  }
+}
diff --git a/src/modules/opml-imports/application/process-opml-parse-job.use-case.ts b/src/modules/opml-imports/application/process-opml-parse-job.use-case.ts
new file mode 100644
index 0000000..952e18b
--- /dev/null
+++ b/src/modules/opml-imports/application/process-opml-parse-job.use-case.ts
@@ -0,0 +1,197 @@
+import { Injectable, Logger } from '@nestjs/common';
+
+import { DatabaseService } from 
'../../../infrastructure/persistence/database.service'; +import { OpmlParsePreviewJobData } from '../../../infrastructure/queue/queue.constants'; +import { AppConfigService } from '../../../shared/config/app-config.service'; + +import { assertValidOpmlImportStatusTransition } from '../domain/opml-import-status'; +import { extractOpmlItems } from '../domain/opml-parser'; +import { buildNormalizedFeedUrlHash, normalizeFeedUrl } from '../domain/url-normalizer'; +import { OpmlImportObservabilityService } from './opml-import-observability.service'; +import { OpmlImportItemInput, OpmlImportsRepository } from '../opml-imports.repository'; + +interface ExistingFeedRow { + id: number; + url: string; + normalized_url_hash: string; +} + +@Injectable() +export class ProcessOpmlParseJobUseCase { + private readonly logger = new Logger(ProcessOpmlParseJobUseCase.name); + private static readonly MAX_OUTLINE_TAGS = 20_000; + + constructor( + private readonly databaseService: DatabaseService, + private readonly opmlImportsRepository: OpmlImportsRepository, + private readonly appConfigService: AppConfigService, + private readonly observabilityService: OpmlImportObservabilityService, + ) {} + + async execute(job: OpmlParsePreviewJobData): Promise { + const current = await this.opmlImportsRepository.getImportOrThrow(job.importId); + + if (current.status === 'preview_ready' || current.status === 'completed' || current.status === 'importing') { + return; + } + + if (current.status !== 'uploaded' && current.status !== 'parsing') { + this.logger.warn(`Skipping parse for import ${job.importId} in status ${current.status}`); + return; + } + + assertValidOpmlImportStatusTransition(current.status, 'parsing'); + const stopTimer = this.observabilityService.startJobTimer('parse'); + + const client = await this.databaseService.getPool().connect(); + try { + await client.query('BEGIN'); + + await this.opmlImportsRepository.markImportStatus(job.importId, { status: 'parsing', errorMessage: null }, 
client); + + const parsedItems = extractOpmlItems(job.opmlXml, { + maxBytes: this.appConfigService.opmlUploadMaxBytes, + maxOutlineTags: ProcessOpmlParseJobUseCase.MAX_OUTLINE_TAGS, + }); + const seenHashes = new Set(); + const draftItems = parsedItems.map((item) => { + const local = this.classifyLocalItem(item); + if (local.itemStatus !== 'new' || !local.normalizedUrlHash) { + return local; + } + + if (seenHashes.has(local.normalizedUrlHash)) { + return { + ...local, + itemStatus: 'duplicate' as const, + validationError: 'duplicate_within_opml', + }; + } + + seenHashes.add(local.normalizedUrlHash); + return local; + }); + const normalizedCandidates = draftItems.filter( + (item): item is OpmlImportItemInput & { normalizedUrlHash: string; normalizedUrl: string } => Boolean(item.normalizedUrlHash && item.normalizedUrl), + ); + const existingByHash = await this.findExistingFeedsByHash( + normalizedCandidates.map((item) => item.normalizedUrlHash), + normalizedCandidates.map((item) => item.normalizedUrl), + client, + ); + + const items = draftItems.map((item) => { + if (item.itemStatus !== 'new' || !item.normalizedUrlHash || !item.normalizedUrl) { + return item; + } + + const existing = existingByHash.get(item.normalizedUrlHash); + if (!existing) { + return item; + } + + if (normalizeFeedUrl(existing.url) !== item.normalizedUrl) { + return { + ...item, + itemStatus: 'invalid' as const, + validationError: 'normalized_hash_collision_detected', + normalizedUrl: null, + normalizedUrlHash: null, + }; + } + + return { + ...item, + itemStatus: 'existing' as const, + }; + }); + + const counters = { + totalItems: items.length, + validItems: items.filter((item) => item.itemStatus !== 'invalid').length, + duplicateItems: items.filter((item) => item.itemStatus === 'duplicate').length, + existingItems: items.filter((item) => item.itemStatus === 'existing').length, + invalidItems: items.filter((item) => item.itemStatus === 'invalid').length, + }; + + await 
this.opmlImportsRepository.replaceImportItems(job.importId, items, client); + await this.opmlImportsRepository.markImportStatus( + job.importId, + { + status: 'preview_ready', + errorMessage: null, + counters, + }, + client, + ); + + await client.query('COMMIT'); + stopTimer('success'); + } catch (error) { + await client.query('ROLLBACK'); + const message = error instanceof Error ? error.message : 'unknown_opml_parse_failure'; + + await this.opmlImportsRepository.markImportStatus(job.importId, { + status: 'failed_validation', + errorMessage: message, + }); + this.logger.error(`OPML parse failed for import ${job.importId}: ${message}`); + stopTimer('error', 'parse_failed'); + } finally { + client.release(); + } + } + + private classifyLocalItem(item: { title: string | null; outlinePath: string | null; sourceXmlUrl: string }): OpmlImportItemInput { + try { + const normalizedUrl = normalizeFeedUrl(item.sourceXmlUrl); + const normalizedUrlHash = buildNormalizedFeedUrlHash(normalizedUrl); + + return { + title: item.title, + outlinePath: item.outlinePath, + sourceXmlUrl: item.sourceXmlUrl, + normalizedUrl, + normalizedUrlHash, + itemStatus: 'new', + validationError: null, + }; + } catch (error) { + return { + title: item.title, + outlinePath: item.outlinePath, + sourceXmlUrl: item.sourceXmlUrl, + normalizedUrl: null, + normalizedUrlHash: null, + itemStatus: 'invalid', + validationError: error instanceof Error ? 
error.message : 'feed_url_invalid', + }; + } + } + + private async findExistingFeedsByHash(hashes: string[], normalizedUrls: string[], executor: Pick): Promise> { + const uniqueHashes = [...new Set(hashes)]; + const uniqueUrls = [...new Set(normalizedUrls)]; + + if (!uniqueHashes.length && !uniqueUrls.length) { + return new Map(); + } + + const result = await executor.query( + ` + SELECT id, url, normalized_url_hash + FROM feeds + WHERE normalized_url_hash = ANY($1::text[]) + OR url = ANY($2::text[]) + `, + [uniqueHashes, uniqueUrls], + ); + + const map = new Map(); + for (const row of result.rows) { + map.set(row.normalized_url_hash, row); + } + + return map; + } +} diff --git a/src/modules/opml-imports/domain/opml-import-status.ts b/src/modules/opml-imports/domain/opml-import-status.ts new file mode 100644 index 0000000..8f7b7fc --- /dev/null +++ b/src/modules/opml-imports/domain/opml-import-status.ts @@ -0,0 +1,39 @@ +export const OPML_IMPORT_STATUSES = [ + 'uploaded', + 'parsing', + 'preview_ready', + 'importing', + 'completed', + 'failed_validation', + 'failed', +] as const; + +export type OpmlImportStatus = (typeof OPML_IMPORT_STATUSES)[number]; + +const VALID_OPML_IMPORT_STATUS_TRANSITIONS: Record = { + uploaded: ['parsing', 'failed_validation'], + parsing: ['preview_ready', 'failed_validation'], + preview_ready: ['importing', 'failed'], + importing: ['completed', 'failed'], + completed: [], + failed_validation: [], + failed: [], +}; + +export function isOpmlImportStatus(value: string): value is OpmlImportStatus { + return (OPML_IMPORT_STATUSES as readonly string[]).includes(value); +} + +export function canTransitionOpmlImportStatus(from: OpmlImportStatus, to: OpmlImportStatus): boolean { + if (from === to) { + return true; + } + + return VALID_OPML_IMPORT_STATUS_TRANSITIONS[from].includes(to); +} + +export function assertValidOpmlImportStatusTransition(from: OpmlImportStatus, to: OpmlImportStatus): void { + if (!canTransitionOpmlImportStatus(from, to)) { + 
throw new Error(`invalid_opml_import_status_transition:${from}->${to}`); + } +} diff --git a/src/modules/opml-imports/domain/opml-parser.ts b/src/modules/opml-imports/domain/opml-parser.ts new file mode 100644 index 0000000..7683727 --- /dev/null +++ b/src/modules/opml-imports/domain/opml-parser.ts @@ -0,0 +1,110 @@ +export interface ParsedOpmlItem { + title: string | null; + outlinePath: string | null; + sourceXmlUrl: string; +} + +export interface ExtractOpmlItemsOptions { + maxBytes?: number; + maxOutlineTags?: number; +} + +const DEFAULT_MAX_OPML_BYTES = 2 * 1024 * 1024; +const DEFAULT_MAX_OUTLINE_TAGS = 20_000; + +function decodeXmlEntities(input: string): string { + return input + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/&/g, '&'); +} + +function parseAttributes(tag: string): Record { + const attributes: Record = {}; + const raw = tag.replace(/^<\/?\s*outline\b/i, '').replace(/\/?\s*>$/, ''); + const attrRegex = /([a-zA-Z_:][-a-zA-Z0-9_:.]*)\s*=\s*("([^"]*)"|'([^']*)')/g; + + let match: RegExpExecArray | null = attrRegex.exec(raw); + while (match) { + const key = match[1]; + const value = match[3] ?? match[4] ?? ''; + attributes[key] = decodeXmlEntities(value.trim()); + match = attrRegex.exec(raw); + } + + return attributes; +} + +/** + * Extracts OPML outline items in a streaming-friendly way without executing any external entities. + */ +export function extractOpmlItems(opmlXml: string, options: ExtractOpmlItemsOptions = {}): ParsedOpmlItem[] { + const maxBytes = options.maxBytes ?? DEFAULT_MAX_OPML_BYTES; + const maxOutlineTags = options.maxOutlineTags ?? 
DEFAULT_MAX_OUTLINE_TAGS; + + if (Buffer.byteLength(opmlXml, 'utf8') > maxBytes) { + throw new Error('opml_too_large'); + } + + const xml = opmlXml.trim(); + if (!xml || !xml.toLowerCase().includes(']*>/gi); + if (!tags || tags.length === 0) { + return []; + } + + if (tags.length > maxOutlineTags) { + throw new Error('opml_outline_limit_exceeded'); + } + + const pathStack: string[] = []; + const parsed: ParsedOpmlItem[] = []; + + for (const tag of tags) { + const trimmedTag = tag.trim(); + const isClosing = /^<\/\s*outline/i.test(trimmedTag); + const isSelfClosing = /\/>$/.test(trimmedTag); + + if (isClosing) { + if (!pathStack.length) { + throw new Error('opml_malformed_outline'); + } + + pathStack.pop(); + continue; + } + + const attributes = parseAttributes(trimmedTag); + const title = (attributes.title ?? attributes.text ?? '').trim() || null; + const xmlUrl = (attributes.xmlUrl ?? attributes.xmlurl ?? '').trim(); + + if (xmlUrl) { + const outlinePath = [...pathStack, ...(title ? [title] : [])].join(' / ').trim(); + parsed.push({ + title, + outlinePath: outlinePath || null, + sourceXmlUrl: xmlUrl, + }); + continue; + } + + if (!isSelfClosing && title) { + pathStack.push(title); + } + } + + if (pathStack.length > 0) { + throw new Error('opml_malformed_outline'); + } + + return parsed; +} diff --git a/src/modules/opml-imports/domain/url-normalizer.ts b/src/modules/opml-imports/domain/url-normalizer.ts new file mode 100644 index 0000000..3af9f9b --- /dev/null +++ b/src/modules/opml-imports/domain/url-normalizer.ts @@ -0,0 +1,38 @@ +import { createHash } from 'node:crypto'; + +export function normalizeFeedUrl(input: string): string { + const trimmed = input.trim(); + + if (!trimmed) { + throw new Error('feed_url_empty'); + } + + let parsed: URL; + try { + parsed = new URL(trimmed); + } catch { + throw new Error('feed_url_invalid'); + } + + const protocol = parsed.protocol.toLowerCase(); + if (protocol !== 'http:' && protocol !== 'https:') { + throw new 
Error('feed_url_protocol_not_supported'); + } + + parsed.protocol = protocol; + parsed.hostname = parsed.hostname.toLowerCase(); + + if ((protocol === 'http:' && parsed.port === '80') || (protocol === 'https:' && parsed.port === '443')) { + parsed.port = ''; + } + + const normalizedPath = parsed.pathname.replace(/\/{2,}/g, '/'); + parsed.pathname = normalizedPath === '/' ? '/' : normalizedPath.replace(/\/+$/g, ''); + parsed.hash = ''; + + return parsed.toString(); +} + +export function buildNormalizedFeedUrlHash(normalizedUrl: string): string { + return createHash('sha256').update(normalizedUrl).digest('hex'); +} diff --git a/src/modules/opml-imports/dto/opml-preview.query.ts b/src/modules/opml-imports/dto/opml-preview.query.ts new file mode 100644 index 0000000..2aa52f6 --- /dev/null +++ b/src/modules/opml-imports/dto/opml-preview.query.ts @@ -0,0 +1,18 @@ +import { Transform } from 'class-transformer'; +import { IsInt, Max, Min } from 'class-validator'; +import { ApiPropertyOptional } from '@nestjs/swagger'; + +export class OpmlPreviewQueryDto { + @ApiPropertyOptional({ type: Number, minimum: 1, default: 1 }) + @Transform(({ value }) => Number(value ?? 1)) + @IsInt() + @Min(1) + page = 1; + + @ApiPropertyOptional({ type: Number, minimum: 1, maximum: 200, default: 50 }) + @Transform(({ value }) => Number(value ?? 
50)) + @IsInt() + @Min(1) + @Max(200) + page_size = 50; +} diff --git a/src/modules/opml-imports/http/opml-imports.controller.ts b/src/modules/opml-imports/http/opml-imports.controller.ts new file mode 100644 index 0000000..e5bca82 --- /dev/null +++ b/src/modules/opml-imports/http/opml-imports.controller.ts @@ -0,0 +1,89 @@ +import { + BadRequestException, + Controller, + Get, + HttpCode, + Param, + ParseIntPipe, + Post, + Query, + Req, + UploadedFile, + UseInterceptors, +} from '@nestjs/common'; +import { FileInterceptor } from '@nestjs/platform-express'; +import { ApiConsumes, ApiOperation, ApiParam, ApiTags } from '@nestjs/swagger'; +import { Request } from 'express'; + +import { paginatedResponse, successResponse } from '../../../shared/http/response'; +import { ApiStandardErrorResponses } from '../../../shared/http/swagger'; + +import { ConfirmOpmlImportUseCase } from '../application/confirm-opml-import.use-case'; +import { CreateOpmlImportUseCase } from '../application/create-opml-import.use-case'; +import { GetOpmlImportStatusUseCase } from '../application/get-opml-import-status.use-case'; +import { GetOpmlPreviewUseCase } from '../application/get-opml-preview.use-case'; +import { OpmlPreviewQueryDto } from '../dto/opml-preview.query'; + +@ApiTags('OPML Imports') +@Controller('api/v1/opml/imports') +export class OpmlImportsController { + constructor( + private readonly createOpmlImportUseCase: CreateOpmlImportUseCase, + private readonly getOpmlPreviewUseCase: GetOpmlPreviewUseCase, + private readonly confirmOpmlImportUseCase: ConfirmOpmlImportUseCase, + private readonly getOpmlImportStatusUseCase: GetOpmlImportStatusUseCase, + ) {} + + @Post() + @ApiOperation({ summary: 'Upload OPML file and create asynchronous parse-preview job.' 
}) + @ApiConsumes('multipart/form-data') + @ApiStandardErrorResponses() + @UseInterceptors(FileInterceptor('file')) + async upload(@Req() request: Request, @UploadedFile() file?: { originalname: string; mimetype: string; buffer: Buffer }) { + if (!file) { + throw new BadRequestException('opml_file_required'); + } + + const created = await this.createOpmlImportUseCase.execute({ + fileName: file.originalname, + mimeType: file.mimetype, + content: file.buffer, + }); + + return successResponse(request, created); + } + + @Get(':id/preview') + @ApiOperation({ summary: 'Get OPML preview with counters and paginated items.' }) + @ApiParam({ name: 'id', type: Number, example: 1 }) + @ApiStandardErrorResponses() + async preview(@Req() request: Request, @Param('id', ParseIntPipe) id: number, @Query() query: OpmlPreviewQueryDto) { + const result = await this.getOpmlPreviewUseCase.execute({ + importId: id, + page: query.page, + pageSize: query.page_size, + }); + + return { + ...paginatedResponse(request, result.items, query.page, query.page_size, result.total), + summary: result.import, + }; + } + + @Post(':id/confirm') + @HttpCode(202) + @ApiOperation({ summary: 'Confirm OPML import (idempotent) and enqueue apply job.' }) + @ApiParam({ name: 'id', type: Number, example: 1 }) + @ApiStandardErrorResponses() + async confirm(@Req() request: Request, @Param('id', ParseIntPipe) id: number) { + return successResponse(request, await this.confirmOpmlImportUseCase.execute(id)); + } + + @Get(':id/status') + @ApiOperation({ summary: 'Get import status and progress with partial-failure visibility.' 
}) + @ApiParam({ name: 'id', type: Number, example: 1 }) + @ApiStandardErrorResponses() + async status(@Req() request: Request, @Param('id', ParseIntPipe) id: number) { + return successResponse(request, await this.getOpmlImportStatusUseCase.execute(id)); + } +} diff --git a/src/modules/opml-imports/opml-imports.module.ts b/src/modules/opml-imports/opml-imports.module.ts new file mode 100644 index 0000000..11d5592 --- /dev/null +++ b/src/modules/opml-imports/opml-imports.module.ts @@ -0,0 +1,27 @@ +import { Module } from '@nestjs/common'; + +import { OpmlImportsController } from './http/opml-imports.controller'; +import { ConfirmOpmlImportUseCase } from './application/confirm-opml-import.use-case'; +import { CreateOpmlImportUseCase } from './application/create-opml-import.use-case'; +import { GetOpmlImportStatusUseCase } from './application/get-opml-import-status.use-case'; +import { GetOpmlPreviewUseCase } from './application/get-opml-preview.use-case'; +import { OpmlImportObservabilityService } from './application/opml-import-observability.service'; +import { ProcessOpmlApplyJobUseCase } from './application/process-opml-apply-job.use-case'; +import { ProcessOpmlParseJobUseCase } from './application/process-opml-parse-job.use-case'; +import { OpmlImportsRepository } from './opml-imports.repository'; + +@Module({ + controllers: [OpmlImportsController], + providers: [ + OpmlImportsRepository, + CreateOpmlImportUseCase, + GetOpmlPreviewUseCase, + ConfirmOpmlImportUseCase, + GetOpmlImportStatusUseCase, + OpmlImportObservabilityService, + ProcessOpmlParseJobUseCase, + ProcessOpmlApplyJobUseCase, + ], + exports: [OpmlImportsRepository, ProcessOpmlParseJobUseCase, ProcessOpmlApplyJobUseCase], +}) +export class OpmlImportsModule {} diff --git a/src/modules/opml-imports/opml-imports.repository.ts b/src/modules/opml-imports/opml-imports.repository.ts new file mode 100644 index 0000000..0acab05 --- /dev/null +++ b/src/modules/opml-imports/opml-imports.repository.ts @@ -0,0 
+1,289 @@
+import { Injectable, NotFoundException } from '@nestjs/common';
+
+import { DatabaseService } from '../../infrastructure/persistence/database.service';
+
+import { OpmlImportStatus } from './domain/opml-import-status';
+
+// NOTE(review): generic restored as Pick<DatabaseService, 'query'> (lost in this dump) — confirm.
+type QueryExecutor = Pick<DatabaseService, 'query'>;
+
+/** One parsed OPML outline as written to `opml_import_items`. */
+export interface OpmlImportItemInput {
+  title: string | null;
+  outlinePath: string | null;
+  sourceXmlUrl: string;
+  normalizedUrl: string | null;
+  normalizedUrlHash: string | null;
+  itemStatus: 'new' | 'existing' | 'duplicate' | 'invalid' | 'imported' | 'failed';
+  validationError: string | null;
+}
+
+/** Camel-cased view of an `opml_imports` row with timestamps as ISO strings. */
+export interface OpmlImportSummary {
+  id: string;
+  status: OpmlImportStatus;
+  fileName: string;
+  fileSizeBytes: number;
+  sourceChecksum: string | null;
+  errorMessage: string | null;
+  totalItems: number;
+  validItems: number;
+  duplicateItems: number;
+  existingItems: number;
+  invalidItems: number;
+  importedItems: number;
+  uploadedAt: string;
+  confirmedAt: string | null;
+  completedAt: string | null;
+  createdAt: string;
+  updatedAt: string;
+}
+
+/** Camel-cased view of an `opml_import_items` row. */
+export interface OpmlImportPreviewItem {
+  id: string;
+  title: string | null;
+  outlinePath: string | null;
+  sourceXmlUrl: string | null;
+  normalizedUrl: string | null;
+  itemStatus: 'new' | 'existing' | 'duplicate' | 'invalid' | 'imported' | 'failed';
+  validationError: string | null;
+  feedId: number | null;
+}
+
+/** Raw `opml_imports` row; `id` and `file_size_bytes` arrive as strings. */
+interface OpmlImportRow {
+  id: string;
+  status: OpmlImportStatus;
+  file_name: string;
+  file_size_bytes: string;
+  source_checksum: string | null;
+  error_message: string | null;
+  total_items: number;
+  valid_items: number;
+  duplicate_items: number;
+  existing_items: number;
+  invalid_items: number;
+  imported_items: number;
+  uploaded_at: Date;
+  confirmed_at: Date | null;
+  completed_at: Date | null;
+  created_at: Date;
+  updated_at: Date;
+}
+
+/** Raw `opml_import_items` row as selected by the item queries below. */
+interface OpmlImportItemRow {
+  id: string;
+  title: string | null;
+  outline_path: string | null;
+  source_xml_url: string | null;
+  normalized_url: string | null;
+  item_status: 'new' | 'existing' | 'duplicate' | 'invalid' | 'imported' | 'failed';
+  validation_error: string | null;
+  feed_id: number | null;
+}
+
+/** Maps a raw import row to its summary (numeric size, ISO timestamps). */
+function mapImport(row: OpmlImportRow): OpmlImportSummary {
+  return {
+    id: row.id,
+    status: row.status,
+    fileName: row.file_name,
+    fileSizeBytes: Number(row.file_size_bytes),
+    sourceChecksum: row.source_checksum,
+    errorMessage: row.error_message,
+    totalItems: row.total_items,
+    validItems: row.valid_items,
+    duplicateItems: row.duplicate_items,
+    existingItems: row.existing_items,
+    invalidItems: row.invalid_items,
+    importedItems: row.imported_items,
+    uploadedAt: row.uploaded_at.toISOString(),
+    confirmedAt: row.confirmed_at?.toISOString() ?? null,
+    completedAt: row.completed_at?.toISOString() ?? null,
+    createdAt: row.created_at.toISOString(),
+    updatedAt: row.updated_at.toISOString(),
+  };
+}
+
+/** Maps a raw item row to its preview representation. */
+function mapPreviewItem(row: OpmlImportItemRow): OpmlImportPreviewItem {
+  return {
+    id: row.id,
+    title: row.title,
+    outlinePath: row.outline_path,
+    sourceXmlUrl: row.source_xml_url,
+    normalizedUrl: row.normalized_url,
+    itemStatus: row.item_status,
+    validationError: row.validation_error,
+    feedId: row.feed_id,
+  };
+}
+
+/**
+ * Persistence for OPML imports and their items. Methods that accept an
+ * `executor` run on it (so they can join a caller's transaction) and fall back
+ * to the shared `DatabaseService` otherwise.
+ */
+@Injectable()
+export class OpmlImportsRepository {
+  constructor(private readonly databaseService: DatabaseService) {}
+
+  /** Inserts a new import in status `uploaded` and returns it. */
+  async createImport(input: { fileName: string; fileSizeBytes: number; sourceChecksum: string }): Promise<OpmlImportSummary> {
+    const result = await this.databaseService.query<OpmlImportRow>(
+      `
+      INSERT INTO opml_imports (status, file_name, file_size_bytes, source_checksum)
+      VALUES ('uploaded', $1, $2, $3)
+      RETURNING *
+      `,
+      [input.fileName, input.fileSizeBytes, input.sourceChecksum],
+    );
+
+    return mapImport(result.rows[0]);
+  }
+
+  /** Returns the import with the given id, or `null` when there is none. */
+  async findImportById(importId: number, executor: QueryExecutor = this.databaseService): Promise<OpmlImportSummary | null> {
+    const result = await executor.query<OpmlImportRow>('SELECT * FROM opml_imports WHERE id = $1', [importId]);
+    return result.rows[0] ? mapImport(result.rows[0]) : null;
+  }
+
+  /**
+   * Like `findImportById`, but a missing import is an error.
+   * @throws NotFoundException('opml_import_not_found')
+   */
+  async getImportOrThrow(importId: number, executor: QueryExecutor = this.databaseService): Promise<OpmlImportSummary> {
+    const found = await this.findImportById(importId, executor);
+    if (!found) {
+      throw new NotFoundException('opml_import_not_found');
+    }
+
+    return found;
+  }
+
+  /**
+   * Sets the import's status and, optionally, its error message, counters and
+   * confirmation/completion timestamps.
+   *
+   * - `errorMessage`: omit to keep the stored message; pass a string to replace
+   *   it; pass `null` to clear it.
+   * - `counters`: only the provided counters are overwritten.
+   * - `confirmed`: stamps `confirmed_at` once (an existing value is kept).
+   * - `completed`: stamps `completed_at` with the current time.
+   *
+   * @throws NotFoundException('opml_import_not_found') when no row matches.
+   */
+  async markImportStatus(
+    importId: number,
+    input: {
+      status: OpmlImportStatus;
+      errorMessage?: string | null;
+      confirmed?: boolean;
+      completed?: boolean;
+      counters?: Partial<Pick<OpmlImportSummary, 'totalItems' | 'validItems' | 'duplicateItems' | 'existingItems' | 'invalidItems' | 'importedItems'>>;
+    },
+    executor: QueryExecutor = this.databaseService,
+  ): Promise<OpmlImportSummary> {
+    // COALESCE cannot tell "not provided" from "explicitly null", so an explicit
+    // flag ($12) decides whether error_message is overwritten (possibly with NULL).
+    const errorMessageProvided = input.errorMessage !== undefined;
+
+    const result = await executor.query<OpmlImportRow>(
+      `
+      UPDATE opml_imports
+      SET status = $2,
+          error_message = CASE WHEN $12::boolean THEN $3::text ELSE error_message END,
+          total_items = COALESCE($4, total_items),
+          valid_items = COALESCE($5, valid_items),
+          duplicate_items = COALESCE($6, duplicate_items),
+          existing_items = COALESCE($7, existing_items),
+          invalid_items = COALESCE($8, invalid_items),
+          imported_items = COALESCE($9, imported_items),
+          confirmed_at = CASE WHEN $10::boolean THEN COALESCE(confirmed_at, NOW()) ELSE confirmed_at END,
+          completed_at = CASE WHEN $11::boolean THEN NOW() ELSE completed_at END,
+          updated_at = NOW()
+      WHERE id = $1
+      RETURNING *
+      `,
+      [
+        importId,
+        input.status,
+        input.errorMessage ?? null,
+        input.counters?.totalItems,
+        input.counters?.validItems,
+        input.counters?.duplicateItems,
+        input.counters?.existingItems,
+        input.counters?.invalidItems,
+        input.counters?.importedItems,
+        input.confirmed ?? false,
+        input.completed ?? false,
+        errorMessageProvided,
+      ],
+    );
+
+    if (!result.rows[0]) {
+      throw new NotFoundException('opml_import_not_found');
+    }
+
+    return mapImport(result.rows[0]);
+  }
+
+  /** Deletes all items of the import, then inserts `items` one by one in order. */
+  async replaceImportItems(importId: number, items: OpmlImportItemInput[], executor: QueryExecutor = this.databaseService): Promise<void> {
+    await executor.query('DELETE FROM opml_import_items WHERE import_id = $1', [importId]);
+
+    for (const item of items) {
+      await executor.query(
+        `
+        INSERT INTO opml_import_items (import_id, title, outline_path, source_xml_url, normalized_url, normalized_url_hash, item_status, validation_error)
+        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+        `,
+        [importId, item.title, item.outlinePath, item.sourceXmlUrl, item.normalizedUrl, item.normalizedUrlHash, item.itemStatus, item.validationError],
+      );
+    }
+  }
+
+  /** Returns one page of items (ordered by id) plus the import's total item count. */
+  async listPreviewItems(importId: number, page: number, pageSize: number): Promise<{ items: OpmlImportPreviewItem[]; total: number }> {
+    const offset = (page - 1) * pageSize;
+    const [itemsResult, totalResult] = await Promise.all([
+      this.databaseService.query<OpmlImportItemRow>(
+        `
+        SELECT id, title, outline_path, source_xml_url, normalized_url, item_status, validation_error, feed_id
+        FROM opml_import_items
+        WHERE import_id = $1
+        ORDER BY id ASC
+        LIMIT $2 OFFSET $3
+        `,
+        [importId, pageSize, offset],
+      ),
+      this.databaseService.query<{ count: string }>('SELECT COUNT(*)::text AS count FROM opml_import_items WHERE import_id = $1', [importId]),
+    ]);
+
+    return {
+      items: itemsResult.rows.map(mapPreviewItem),
+      total: Number(totalResult.rows[0]?.count ?? '0'),
+    };
+  }
+
+  /** Returns the import's items still in status `new`, ordered by id. */
+  async listNewCandidateItems(importId: number, executor: QueryExecutor = this.databaseService): Promise<OpmlImportPreviewItem[]> {
+    const result = await executor.query<OpmlImportItemRow>(
+      `
+      SELECT id, title, outline_path, source_xml_url, normalized_url, item_status, validation_error, feed_id
+      FROM opml_import_items
+      WHERE import_id = $1 AND item_status = 'new'
+      ORDER BY id ASC
+      `,
+      [importId],
+    );
+
+    return result.rows.map(mapPreviewItem);
+  }
+
+  /** Marks the item `imported` and links it to `feedId`. */
+  async markItemImported(itemId: number, feedId: number, executor: QueryExecutor = this.databaseService): Promise<void> {
+    await executor.query(
+      `
+      UPDATE opml_import_items
+      SET item_status = 'imported',
+          feed_id = $2,
+          updated_at = NOW()
+      WHERE id = $1
+      `,
+      [itemId, feedId],
+    );
+  }
+
+  /** Marks the item `failed` and stores `validationError` as the reason. */
+  async markItemFailed(itemId: number, validationError: string, executor: QueryExecutor = this.databaseService): Promise<void> {
+    await executor.query(
+      `
+      UPDATE opml_import_items
+      SET item_status = 'failed',
+          validation_error = $2,
+          updated_at = NOW()
+      WHERE id = $1
+      `,
+      [itemId, validationError],
+    );
+  }
+
+  /** Returns item counts keyed by status; statuses with no items are absent. */
+  async countItemsByStatus(importId: number, executor: QueryExecutor = this.databaseService): Promise<Record<string, number>> {
+    const result = await executor.query<{ item_status: string; total: string }>(
+      `
+      SELECT item_status, COUNT(*)::text AS total
+      FROM opml_import_items
+      WHERE import_id = $1
+      GROUP BY item_status
+      `,
+      [importId],
+    );
+
+    return result.rows.reduce<Record<string, number>>((acc, row) => {
+      acc[row.item_status] = Number(row.total);
+      return acc;
+    }, {});
+  }
+}
diff --git a/src/modules/rules/rules.repository.ts b/src/modules/rules/rules.repository.ts
index e868eb2..56c8737 100644
--- a/src/modules/rules/rules.repository.ts
+++ b/src/modules/rules/rules.repository.ts
@@ -79,6 +79,31 @@ export class RulesRepository {
     return result.rows.map(mapRule);
   }
 
+  async findByName(name: string): Promise {
+    const result = await this.databaseService.query('SELECT * FROM rules WHERE name = $1 LIMIT 1', [name]);
+    return result.rows[0] ? mapRule(result.rows[0]) : null;
+  }
+
+  async upsertByName(input: { name: string; includeKeywords: string[]; excludeKeywords: string[]; isActive: boolean }): Promise {
+    const existing = await this.findByName(input.name);
+    if (!existing) {
+      return this.create(input);
+    }
+
+    const updated = await this.update({
+      id: existing.id,
+      includeKeywords: input.includeKeywords,
+      excludeKeywords: input.excludeKeywords,
+      isActive: input.isActive,
+    });
+
+    if (!updated) {
+      throw new Error('rule_upsert_failed');
+    }
+
+    return updated;
+  }
+
   async findById(id: number): Promise {
     const result = await this.databaseService.query('SELECT * FROM rules WHERE id = $1', [id]);
     return result.rows[0] ? mapRule(result.rows[0]) : null;
diff --git a/src/shared/config/app-config.service.ts b/src/shared/config/app-config.service.ts
index ea181cb..1c4f580 100644
--- a/src/shared/config/app-config.service.ts
+++ b/src/shared/config/app-config.service.ts
@@ -66,4 +66,36 @@ export class AppConfigService {
   get workerMetricsPort(): number {
     return this.configService.get('workerMetricsPort', { infer: true });
   }
+
+  get opmlUploadMaxBytes(): number {
+    return this.configService.get('opmlUploadMaxBytes', { infer: true });
+  }
+
+  get opmlInitialJitterMaxSeconds(): number {
+    return this.configService.get('opmlInitialJitterMaxSeconds', { infer: true });
+  }
+
+  get agentInterfaceV1(): boolean {
+    return this.configService.get('agentInterfaceV1', { infer: true });
+  }
+
+  get tuiEnabled(): boolean {
+    return this.configService.get('tuiEnabled', { infer: true });
+  }
+
+  get enableAuth(): boolean {
+    return this.configService.get('enableAuth', { infer: true });
+  }
+
+  get authProvider(): string {
+    return this.configService.get('authProvider', { infer: true });
+  }
+
+  get clerkSecretKey(): string | undefined {
+    return this.configService.get('clerkSecretKey', { infer: true });
+  }
+
+  get clerkApiUrl(): string {
+    return this.configService.get('clerkApiUrl', { infer: true });
+  }
 }
diff --git 
a/src/shared/config/configuration.ts b/src/shared/config/configuration.ts index b058043..4dc2ab3 100644 --- a/src/shared/config/configuration.ts +++ b/src/shared/config/configuration.ts @@ -16,6 +16,14 @@ export interface AppConfiguration { rateLimitMaxBackoffMs: number; rateLimitBaseBackoffMs: number; workerMetricsPort: number; + opmlUploadMaxBytes: number; + opmlInitialJitterMaxSeconds: number; + agentInterfaceV1: boolean; + tuiEnabled: boolean; + enableAuth: boolean; + authProvider: string; + clerkSecretKey?: string; + clerkApiUrl: string; } export const configuration = (env: Env): AppConfiguration => ({ @@ -34,4 +42,12 @@ export const configuration = (env: Env): AppConfiguration => ({ rateLimitMaxBackoffMs: env.RATE_LIMIT_MAX_BACKOFF_MS, rateLimitBaseBackoffMs: env.RATE_LIMIT_BASE_BACKOFF_MS, workerMetricsPort: env.WORKER_METRICS_PORT, + opmlUploadMaxBytes: env.OPML_UPLOAD_MAX_BYTES, + opmlInitialJitterMaxSeconds: env.OPML_INITIAL_JITTER_MAX_SECONDS, + agentInterfaceV1: env.AGENT_INTERFACE_V1, + tuiEnabled: env.TUI_ENABLED, + enableAuth: env.ENABLE_AUTH, + authProvider: env.AUTH_PROVIDER, + clerkSecretKey: env.CLERK_SECRET_KEY, + clerkApiUrl: env.CLERK_API_URL, }); diff --git a/src/shared/config/env.schema.ts b/src/shared/config/env.schema.ts index 27ff8ff..1dbb789 100644 --- a/src/shared/config/env.schema.ts +++ b/src/shared/config/env.schema.ts @@ -1,5 +1,8 @@ import { z } from 'zod'; +const TRUE_VALUES = new Set(['1', 'true', 'yes', 'on']); +const FALSE_VALUES = new Set(['0', 'false', 'no', 'off']); + const optionalUrl = z.preprocess( (value) => { if (typeof value === 'string' && value.trim() === '') { @@ -11,6 +14,36 @@ const optionalUrl = z.preprocess( z.url().optional(), ); +const optionalString = z.preprocess( + (value) => { + if (typeof value === 'string' && value.trim() === '') { + return undefined; + } + + return value; + }, + z.string().trim().min(1).optional(), +); + +const featureFlag = z.preprocess((value) => { + if (typeof value === 'boolean') { 
+ return value; + } + + if (typeof value === 'string') { + const normalized = value.trim().toLowerCase(); + if (TRUE_VALUES.has(normalized)) { + return true; + } + + if (FALSE_VALUES.has(normalized)) { + return false; + } + } + + return value; +}, z.boolean()); + export const envSchema = z.object({ NODE_ENV: z.enum(['development', 'test', 'production']).default('development'), PORT: z.coerce.number().int().positive().default(3000), @@ -31,6 +64,22 @@ export const envSchema = z.object({ RATE_LIMIT_BASE_BACKOFF_MS: z.coerce.number().int().positive().default(1000), /** Port on which the worker exposes its /metrics endpoint (aggregated by the API). Default: 3001 */ WORKER_METRICS_PORT: z.coerce.number().int().positive().default(3001), + /** Maximum OPML upload size in bytes. Default: 2097152 (2 MiB). */ + OPML_UPLOAD_MAX_BYTES: z.coerce.number().int().positive().default(2 * 1024 * 1024), + /** Max jitter (seconds) for initial feed scheduling after OPML import. Default: 120 */ + OPML_INITIAL_JITTER_MAX_SECONDS: z.coerce.number().int().nonnegative().default(120), + /** Rollout guard for CLI/TUI agent interface v1 adapters. */ + AGENT_INTERFACE_V1: featureFlag.default(true), + /** Rollout guard for interactive TUI adapter surface. */ + TUI_ENABLED: featureFlag.default(true), + /** Enables API key authentication for HTTP API endpoints. */ + ENABLE_AUTH: featureFlag.default(false), + /** Auth provider selector for API authentication. */ + AUTH_PROVIDER: z.string().trim().min(1).default('clerk_api_key'), + /** Secret key used to call Clerk API key verification endpoint. */ + CLERK_SECRET_KEY: optionalString, + /** Base URL for Clerk API calls. 
*/ + CLERK_API_URL: z.url().default('https://api.clerk.com'), }); export type Env = z.infer; diff --git a/test/opml-api.phase3.integration-spec.ts b/test/opml-api.phase3.integration-spec.ts new file mode 100644 index 0000000..28f5195 --- /dev/null +++ b/test/opml-api.phase3.integration-spec.ts @@ -0,0 +1,350 @@ +process.env.NODE_ENV = 'test'; +process.env.PORT = '3002'; +process.env.DATABASE_URL = 'postgres://postgres:postgres@localhost:5432/rss_monitor_test'; +process.env.REDIS_URL = 'redis://localhost:6379'; +process.env.LOG_LEVEL = 'error'; +process.env.OPML_UPLOAD_MAX_BYTES = '64'; + +import { INestApplication } from '@nestjs/common'; +import { Test } from '@nestjs/testing'; +import { newDb } from 'pg-mem'; +import request from 'supertest'; + +import { AppModule } from '../src/app.module'; +import { DATABASE_POOL } from '../src/infrastructure/persistence/database.constants'; +import { AlertDeliveryQueue } from '../src/infrastructure/queue/alert-delivery.queue'; +import { FetchFeedQueue } from '../src/infrastructure/queue/fetch-feed.queue'; +import { OpmlApplyImportQueue } from '../src/infrastructure/queue/opml-apply-import.queue'; +import { OpmlParsePreviewQueue } from '../src/infrastructure/queue/opml-parse-preview.queue'; +import { + ALERT_DELIVERY_QUEUE_TOKEN, + AlertDeliveryJobData, + FETCH_FEED_QUEUE_TOKEN, + FetchFeedJobData, + OPML_APPLY_IMPORT_QUEUE_TOKEN, + OPML_PARSE_PREVIEW_QUEUE_TOKEN, + OpmlApplyImportJobData, + OpmlParsePreviewJobData, + REDIS_CONNECTION, +} from '../src/infrastructure/queue/queue.constants'; +import { configureApiApplication } from '../src/main/create-api-app'; +import { OpmlImportsRepository } from '../src/modules/opml-imports/opml-imports.repository'; + +class FakeRedis { + async ping(): Promise { + return 'PONG'; + } + + async quit(): Promise { + return undefined; + } +} + +class FakeFetchQueue { + readonly jobs: FetchFeedJobData[] = []; + + async enqueue(job: FetchFeedJobData): Promise { + this.jobs.push(job); + } + + 
createWorker() { + throw new Error('not_used_in_phase3_api_tests'); + } +} + +class FakeAlertDeliveryQueue { + readonly jobs: AlertDeliveryJobData[] = []; + + async enqueue(job: AlertDeliveryJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_phase3_api_tests'); + } +} + +class FakeOpmlParseQueue { + readonly jobs: OpmlParsePreviewJobData[] = []; + + async enqueue(job: OpmlParsePreviewJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_phase3_api_tests'); + } +} + +class FakeOpmlApplyQueue { + readonly jobs: OpmlApplyImportJobData[] = []; + + async enqueue(job: OpmlApplyImportJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_phase3_api_tests'); + } +} + +async function bootstrapSchema(pool: { query: (sql: string) => Promise }): Promise { + const schema = [ + `CREATE TABLE feeds ( + id SERIAL PRIMARY KEY, + url TEXT NOT NULL UNIQUE, + normalized_url_hash TEXT + )`, + `CREATE TABLE opml_imports ( + id BIGSERIAL PRIMARY KEY, + status TEXT NOT NULL CHECK (status IN ('uploaded', 'parsing', 'preview_ready', 'importing', 'completed', 'failed_validation', 'failed')), + file_name TEXT NOT NULL, + file_size_bytes BIGINT NOT NULL CHECK (file_size_bytes >= 0), + source_checksum TEXT, + error_message TEXT, + total_items INT NOT NULL DEFAULT 0 CHECK (total_items >= 0), + valid_items INT NOT NULL DEFAULT 0 CHECK (valid_items >= 0), + duplicate_items INT NOT NULL DEFAULT 0 CHECK (duplicate_items >= 0), + existing_items INT NOT NULL DEFAULT 0 CHECK (existing_items >= 0), + invalid_items INT NOT NULL DEFAULT 0 CHECK (invalid_items >= 0), + imported_items INT NOT NULL DEFAULT 0 CHECK (imported_items >= 0), + uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + confirmed_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + `CREATE TABLE 
opml_import_items ( + id BIGSERIAL PRIMARY KEY, + import_id BIGINT NOT NULL REFERENCES opml_imports(id) ON DELETE CASCADE, + title TEXT, + outline_path TEXT, + source_xml_url TEXT, + normalized_url TEXT, + normalized_url_hash TEXT, + feed_id INT REFERENCES feeds(id) ON DELETE SET NULL, + item_status TEXT NOT NULL CHECK (item_status IN ('new', 'existing', 'duplicate', 'invalid', 'imported', 'failed')), + validation_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT opml_import_items_normalized_url_required + CHECK (item_status = 'invalid' OR (normalized_url IS NOT NULL AND normalized_url_hash IS NOT NULL)) + )`, + `CREATE UNIQUE INDEX idx_opml_import_items_dedupe_per_import + ON opml_import_items (import_id, normalized_url_hash) + WHERE normalized_url_hash IS NOT NULL`, + ]; + + for (const statement of schema) { + await pool.query(statement); + } +} + +describe('OPML API (fase 3 backend, sin workers)', () => { + let app: INestApplication; + let opmlImportsRepository: OpmlImportsRepository; + let fakeParseQueue: FakeOpmlParseQueue; + let fakeApplyQueue: FakeOpmlApplyQueue; + + beforeAll(async () => { + const db = newDb({ autoCreateForeignKeyIndices: true }); + const adapter = db.adapters.createPg(); + const pool = new adapter.Pool(); + await bootstrapSchema(pool); + + const fakeFetchQueue = new FakeFetchQueue(); + const fakeAlertQueue = new FakeAlertDeliveryQueue(); + fakeParseQueue = new FakeOpmlParseQueue(); + fakeApplyQueue = new FakeOpmlApplyQueue(); + + const moduleRef = await Test.createTestingModule({ + imports: [AppModule], + }) + .overrideProvider(DATABASE_POOL) + .useValue(pool) + .overrideProvider(REDIS_CONNECTION) + .useValue(new FakeRedis()) + .overrideProvider(FETCH_FEED_QUEUE_TOKEN) + .useValue(fakeFetchQueue) + .overrideProvider(FetchFeedQueue) + .useValue(fakeFetchQueue) + .overrideProvider(ALERT_DELIVERY_QUEUE_TOKEN) + .useValue(fakeAlertQueue) + 
.overrideProvider(AlertDeliveryQueue) + .useValue(fakeAlertQueue) + .overrideProvider(OPML_PARSE_PREVIEW_QUEUE_TOKEN) + .useValue(fakeParseQueue) + .overrideProvider(OpmlParsePreviewQueue) + .useValue(fakeParseQueue) + .overrideProvider(OPML_APPLY_IMPORT_QUEUE_TOKEN) + .useValue(fakeApplyQueue) + .overrideProvider(OpmlApplyImportQueue) + .useValue(fakeApplyQueue) + .compile(); + + app = moduleRef.createNestApplication(); + configureApiApplication(app); + await app.init(); + + opmlImportsRepository = moduleRef.get(OpmlImportsRepository); + }); + + afterAll(async () => { + if (app) { + await app.close(); + } + }); + + beforeEach(() => { + fakeParseQueue.jobs.length = 0; + fakeApplyQueue.jobs.length = 0; + }); + + it('sube OPML válido y crea import con parse job encolado (stub)', async () => { + const uploadResponse = await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from('', 'utf8'), { + filename: 'feeds.opml', + contentType: 'text/x-opml', + }) + .expect(201); + + expect(uploadResponse.body.data.status).toBe('uploaded'); + expect(uploadResponse.body.data.parseQueued).toBe(true); + expect(fakeParseQueue.jobs).toHaveLength(1); + }); + + it('rechaza upload inválido por tipo y por tamaño', async () => { + await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from('', 'utf8'), { + filename: 'feeds.txt', + contentType: 'text/plain', + }) + .expect(400); + + await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from('x'.repeat(80), 'utf8'), { + filename: 'feeds.opml', + contentType: 'text/x-opml', + }) + .expect(400); + }); + + it('expone preview paginado por import_id con conteos', async () => { + const uploadResponse = await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from('', 'utf8'), { + filename: 'preview.opml', + contentType: 'text/x-opml', + }) + .expect(201); + + const importId = 
Number(uploadResponse.body.data.id); + + await opmlImportsRepository.replaceImportItems(importId, [ + { + title: 'Feed A', + outlinePath: 'Folder / Feed A', + sourceXmlUrl: 'https://example.com/a.xml', + normalizedUrl: 'https://example.com/a.xml', + normalizedUrlHash: 'hash-a', + itemStatus: 'new', + validationError: null, + }, + { + title: 'Feed B', + outlinePath: 'Folder / Feed B', + sourceXmlUrl: 'https://example.com/b.xml', + normalizedUrl: 'https://example.com/b.xml', + normalizedUrlHash: 'hash-b', + itemStatus: 'existing', + validationError: null, + }, + { + title: 'Invalid feed', + outlinePath: 'Folder / Invalid', + sourceXmlUrl: 'ftp://example.com/nope.xml', + normalizedUrl: null, + normalizedUrlHash: null, + itemStatus: 'invalid', + validationError: 'feed_url_invalid_scheme', + }, + ]); + + await opmlImportsRepository.markImportStatus(importId, { + status: 'preview_ready', + counters: { + totalItems: 3, + validItems: 2, + existingItems: 1, + invalidItems: 1, + }, + }); + + const previewResponse = await request(app.getHttpServer()) + .get(`/api/v1/opml/imports/${importId}/preview?page=1&page_size=2`) + .expect(200); + + expect(previewResponse.body.summary.status).toBe('preview_ready'); + expect(previewResponse.body.summary.totalItems).toBe(3); + expect(previewResponse.body.summary.invalidItems).toBe(1); + expect(previewResponse.body.meta.total).toBe(3); + expect(previewResponse.body.data).toHaveLength(2); + }); + + it('confirma import de forma idempotente y reporta estado/progreso', async () => { + const uploadResponse = await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from('', 'utf8'), { + filename: 'confirm.opml', + contentType: 'text/x-opml', + }) + .expect(201); + + const importId = Number(uploadResponse.body.data.id); + + await opmlImportsRepository.markImportStatus(importId, { + status: 'preview_ready', + counters: { + totalItems: 2, + validItems: 2, + }, + }); + + await 
opmlImportsRepository.replaceImportItems(importId, [ + { + title: 'Imported item', + outlinePath: null, + sourceXmlUrl: 'https://example.com/imported.xml', + normalizedUrl: 'https://example.com/imported.xml', + normalizedUrlHash: 'hash-imported', + itemStatus: 'imported', + validationError: null, + }, + { + title: 'Failed item', + outlinePath: null, + sourceXmlUrl: 'https://example.com/failed.xml', + normalizedUrl: 'https://example.com/failed.xml', + normalizedUrlHash: 'hash-failed', + itemStatus: 'failed', + validationError: 'worker_not_running_phase3_stub', + }, + ]); + + const firstConfirm = await request(app.getHttpServer()).post(`/api/v1/opml/imports/${importId}/confirm`).expect(202); + expect(firstConfirm.body.data.status).toBe('queued'); + expect(fakeApplyQueue.jobs).toHaveLength(1); + + const secondConfirm = await request(app.getHttpServer()).post(`/api/v1/opml/imports/${importId}/confirm`).expect(202); + expect(secondConfirm.body.data.status).toBe('already_confirmed'); + expect(fakeApplyQueue.jobs).toHaveLength(1); + + const statusResponse = await request(app.getHttpServer()).get(`/api/v1/opml/imports/${importId}/status`).expect(200); + expect(statusResponse.body.data.status).toBe('importing'); + expect(statusResponse.body.data.progressPercent).toBe(80); + expect(statusResponse.body.data.failedItems).toBe(1); + }); +}); diff --git a/test/opml-import-status.spec.ts b/test/opml-import-status.spec.ts new file mode 100644 index 0000000..de88b25 --- /dev/null +++ b/test/opml-import-status.spec.ts @@ -0,0 +1,33 @@ +import { + assertValidOpmlImportStatusTransition, + canTransitionOpmlImportStatus, + isOpmlImportStatus, +} from '../src/modules/opml-imports/domain/opml-import-status'; + +describe('opml import status transitions', () => { + it('accepts known statuses', () => { + expect(isOpmlImportStatus('uploaded')).toBe(true); + expect(isOpmlImportStatus('preview_ready')).toBe(true); + expect(isOpmlImportStatus('unknown')).toBe(false); + }); + + it('allows valid 
forward transitions', () => { + expect(canTransitionOpmlImportStatus('uploaded', 'parsing')).toBe(true); + expect(canTransitionOpmlImportStatus('parsing', 'preview_ready')).toBe(true); + expect(canTransitionOpmlImportStatus('preview_ready', 'importing')).toBe(true); + expect(canTransitionOpmlImportStatus('importing', 'completed')).toBe(true); + }); + + it('allows terminal states to stay unchanged for idempotent updates', () => { + expect(canTransitionOpmlImportStatus('completed', 'completed')).toBe(true); + expect(canTransitionOpmlImportStatus('failed', 'failed')).toBe(true); + }); + + it('rejects invalid transitions', () => { + expect(canTransitionOpmlImportStatus('uploaded', 'completed')).toBe(false); + expect(canTransitionOpmlImportStatus('failed_validation', 'parsing')).toBe(false); + expect(() => assertValidOpmlImportStatusTransition('preview_ready', 'completed')).toThrow( + 'invalid_opml_import_status_transition:preview_ready->completed', + ); + }); +}); diff --git a/test/opml-observability.spec.ts b/test/opml-observability.spec.ts new file mode 100644 index 0000000..9cfca7b --- /dev/null +++ b/test/opml-observability.spec.ts @@ -0,0 +1,35 @@ +import { OpmlImportObservabilityService } from '../src/modules/opml-imports/application/opml-import-observability.service'; + +describe('OPML observability metrics', () => { + it('registra métricas de duración para parse/apply', async () => { + const observability = new OpmlImportObservabilityService(); + + const stopParse = observability.startJobTimer('parse'); + const stopApply = observability.startJobTimer('apply'); + + stopParse('success'); + stopApply('success'); + + const histogram = (observability as any).jobDurationMs; + const histogramSnapshot = await histogram.get(); + expect(histogramSnapshot.values).toEqual( + expect.arrayContaining([ + expect.objectContaining({ labels: { stage: 'parse', status: 'success' } }), + expect.objectContaining({ labels: { stage: 'apply', status: 'success' } }), + ]), + ); + }); + + 
it('registra contador de errores OPML por etapa', async () => { + const observability = new OpmlImportObservabilityService(); + const stopTimer = observability.startJobTimer('parse'); + + stopTimer('error', 'parse_failed'); + + const counter = (observability as any).jobErrorsTotal; + const counterSnapshot = await counter.get(); + expect(counterSnapshot.values).toEqual( + expect.arrayContaining([expect.objectContaining({ labels: { stage: 'parse', error_code: 'parse_failed' } })]), + ); + }); +}); diff --git a/test/opml-parser.spec.ts b/test/opml-parser.spec.ts new file mode 100644 index 0000000..ce8c1d7 --- /dev/null +++ b/test/opml-parser.spec.ts @@ -0,0 +1,44 @@ +import { extractOpmlItems } from '../src/modules/opml-imports/domain/opml-parser'; + +describe('extractOpmlItems', () => { + it('extracts xmlUrl entries and keeps folder path context', () => { + const xml = ` + + + + + + + + `; + + const parsed = extractOpmlItems(xml); + expect(parsed).toHaveLength(2); + expect(parsed[0]).toEqual({ + title: 'AI Feed', + outlinePath: 'Tech / AI Feed', + sourceXmlUrl: 'https://example.com/ai.xml', + }); + expect(parsed[1].outlinePath).toBe('News'); + }); + + it('fails for malformed outline nesting', () => { + const malformed = ``; + expect(() => extractOpmlItems(malformed)).toThrow('opml_malformed_outline'); + }); + + it('rejects XML with doctype/entity declarations (XXE hardening)', () => { + const xmlWithDoctype = ` + + ]> + `; + + expect(() => extractOpmlItems(xmlWithDoctype)).toThrow('opml_doctype_not_allowed'); + }); + + it('enforces configurable outline tag limits', () => { + const xml = ``; + expect(() => extractOpmlItems(xml, { maxOutlineTags: 1 })).toThrow('opml_outline_limit_exceeded'); + }); +}); diff --git a/test/opml-workers.phase4.integration-spec.ts b/test/opml-workers.phase4.integration-spec.ts new file mode 100644 index 0000000..8d2d309 --- /dev/null +++ b/test/opml-workers.phase4.integration-spec.ts @@ -0,0 +1,315 @@ +process.env.NODE_ENV = 'test'; 
+process.env.PORT = '3002'; +process.env.DATABASE_URL = 'postgres://postgres:postgres@localhost:5432/rss_monitor_test'; +process.env.REDIS_URL = 'redis://localhost:6379'; +process.env.LOG_LEVEL = 'error'; +process.env.OPML_UPLOAD_MAX_BYTES = '2097152'; +process.env.OPML_INITIAL_JITTER_MAX_SECONDS = '1'; + +import { INestApplication } from '@nestjs/common'; +import { Test } from '@nestjs/testing'; +import { newDb } from 'pg-mem'; + +import { AppModule } from '../src/app.module'; +import { DATABASE_POOL } from '../src/infrastructure/persistence/database.constants'; +import { AlertDeliveryQueue } from '../src/infrastructure/queue/alert-delivery.queue'; +import { FetchFeedQueue } from '../src/infrastructure/queue/fetch-feed.queue'; +import { OpmlApplyImportQueue } from '../src/infrastructure/queue/opml-apply-import.queue'; +import { OpmlParsePreviewQueue } from '../src/infrastructure/queue/opml-parse-preview.queue'; +import { + ALERT_DELIVERY_QUEUE_TOKEN, + AlertDeliveryJobData, + FETCH_FEED_QUEUE_TOKEN, + FetchFeedJobData, + OPML_APPLY_IMPORT_QUEUE_TOKEN, + OPML_PARSE_PREVIEW_QUEUE_TOKEN, + OpmlApplyImportJobData, + OpmlParsePreviewJobData, + REDIS_CONNECTION, +} from '../src/infrastructure/queue/queue.constants'; +import { ConfirmOpmlImportUseCase } from '../src/modules/opml-imports/application/confirm-opml-import.use-case'; +import { CreateOpmlImportUseCase } from '../src/modules/opml-imports/application/create-opml-import.use-case'; +import { ProcessOpmlApplyJobUseCase } from '../src/modules/opml-imports/application/process-opml-apply-job.use-case'; +import { ProcessOpmlParseJobUseCase } from '../src/modules/opml-imports/application/process-opml-parse-job.use-case'; +import { buildNormalizedFeedUrlHash, normalizeFeedUrl } from '../src/modules/opml-imports/domain/url-normalizer'; +import { OpmlImportsRepository } from '../src/modules/opml-imports/opml-imports.repository'; + +class FakeRedis { + async ping(): Promise { + return 'PONG'; + } + + async quit(): Promise { 
+ return undefined; + } +} + +class FakeFetchQueue { + readonly jobs: FetchFeedJobData[] = []; + + async enqueue(job: FetchFeedJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_worker_phase4_integration'); + } +} + +class FakeAlertDeliveryQueue { + readonly jobs: AlertDeliveryJobData[] = []; + + async enqueue(job: AlertDeliveryJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_worker_phase4_integration'); + } +} + +class FakeOpmlParseQueue { + readonly jobs: OpmlParsePreviewJobData[] = []; + + async enqueue(job: OpmlParsePreviewJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_worker_phase4_integration'); + } +} + +class FakeOpmlApplyQueue { + readonly jobs: OpmlApplyImportJobData[] = []; + + async enqueue(job: OpmlApplyImportJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('not_used_in_worker_phase4_integration'); + } +} + +async function bootstrapSchema(pool: { query: (sql: string, params?: unknown[]) => Promise }): Promise { + const schema = [ + `CREATE TABLE feeds ( + id SERIAL PRIMARY KEY, + url TEXT NOT NULL UNIQUE, + status TEXT NOT NULL DEFAULT 'active', + next_check_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + poll_interval_seconds INT NOT NULL DEFAULT 1800, + normalized_url_hash TEXT + )`, + `CREATE UNIQUE INDEX idx_feeds_normalized_url_hash_unique + ON feeds (normalized_url_hash) + WHERE normalized_url_hash IS NOT NULL`, + `CREATE TABLE opml_imports ( + id BIGSERIAL PRIMARY KEY, + status TEXT NOT NULL CHECK (status IN ('uploaded', 'parsing', 'preview_ready', 'importing', 'completed', 'failed_validation', 'failed')), + file_name TEXT NOT NULL, + file_size_bytes BIGINT NOT NULL CHECK (file_size_bytes >= 0), + source_checksum TEXT, + error_message TEXT, + total_items INT NOT NULL DEFAULT 0 CHECK (total_items >= 0), + valid_items INT NOT NULL DEFAULT 0 CHECK (valid_items >= 0), + 
duplicate_items INT NOT NULL DEFAULT 0 CHECK (duplicate_items >= 0), + existing_items INT NOT NULL DEFAULT 0 CHECK (existing_items >= 0), + invalid_items INT NOT NULL DEFAULT 0 CHECK (invalid_items >= 0), + imported_items INT NOT NULL DEFAULT 0 CHECK (imported_items >= 0), + uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + confirmed_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + `CREATE TABLE opml_import_items ( + id BIGSERIAL PRIMARY KEY, + import_id BIGINT NOT NULL REFERENCES opml_imports(id) ON DELETE CASCADE, + title TEXT, + outline_path TEXT, + source_xml_url TEXT, + normalized_url TEXT, + normalized_url_hash TEXT, + feed_id INT REFERENCES feeds(id) ON DELETE SET NULL, + item_status TEXT NOT NULL CHECK (item_status IN ('new', 'existing', 'duplicate', 'invalid', 'imported', 'failed')), + validation_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + `CREATE UNIQUE INDEX idx_opml_import_items_dedupe_per_import + ON opml_import_items (import_id, normalized_url_hash) + WHERE normalized_url_hash IS NOT NULL AND item_status <> 'duplicate'`, + ]; + + for (const statement of schema) { + await pool.query(statement); + } +} + +describe('OPML workers fase 4 (parse/apply reales)', () => { + let app: INestApplication; + let fakeParseQueue: FakeOpmlParseQueue; + let fakeApplyQueue: FakeOpmlApplyQueue; + let fakeFetchQueue: FakeFetchQueue; + + let createUseCase: CreateOpmlImportUseCase; + let confirmUseCase: ConfirmOpmlImportUseCase; + let processParseUseCase: ProcessOpmlParseJobUseCase; + let processApplyUseCase: ProcessOpmlApplyJobUseCase; + let opmlImportsRepository: OpmlImportsRepository; + let dbPool: { query: (sql: string, params?: unknown[]) => Promise<{ rows: unknown[] }> }; + + beforeAll(async () => { + const db = newDb({ autoCreateForeignKeyIndices: true }); + const adapter = db.adapters.createPg(); 
+ const pool = new adapter.Pool(); + await bootstrapSchema(pool); + dbPool = pool; + + fakeFetchQueue = new FakeFetchQueue(); + const fakeAlertQueue = new FakeAlertDeliveryQueue(); + fakeParseQueue = new FakeOpmlParseQueue(); + fakeApplyQueue = new FakeOpmlApplyQueue(); + + const moduleRef = await Test.createTestingModule({ + imports: [AppModule], + }) + .overrideProvider(DATABASE_POOL) + .useValue(pool) + .overrideProvider(REDIS_CONNECTION) + .useValue(new FakeRedis()) + .overrideProvider(FETCH_FEED_QUEUE_TOKEN) + .useValue(fakeFetchQueue) + .overrideProvider(FetchFeedQueue) + .useValue(fakeFetchQueue) + .overrideProvider(ALERT_DELIVERY_QUEUE_TOKEN) + .useValue(fakeAlertQueue) + .overrideProvider(AlertDeliveryQueue) + .useValue(fakeAlertQueue) + .overrideProvider(OPML_PARSE_PREVIEW_QUEUE_TOKEN) + .useValue(fakeParseQueue) + .overrideProvider(OpmlParsePreviewQueue) + .useValue(fakeParseQueue) + .overrideProvider(OPML_APPLY_IMPORT_QUEUE_TOKEN) + .useValue(fakeApplyQueue) + .overrideProvider(OpmlApplyImportQueue) + .useValue(fakeApplyQueue) + .compile(); + + app = moduleRef.createNestApplication(); + await app.init(); + + createUseCase = moduleRef.get(CreateOpmlImportUseCase); + confirmUseCase = moduleRef.get(ConfirmOpmlImportUseCase); + processParseUseCase = moduleRef.get(ProcessOpmlParseJobUseCase); + processApplyUseCase = moduleRef.get(ProcessOpmlApplyJobUseCase); + opmlImportsRepository = moduleRef.get(OpmlImportsRepository); + }); + + afterAll(async () => { + if (app) { + await app.close(); + } + }); + + beforeEach(async () => { + fakeParseQueue.jobs.length = 0; + fakeApplyQueue.jobs.length = 0; + fakeFetchQueue.jobs.length = 0; + await dbPool.query('DELETE FROM opml_import_items'); + await dbPool.query('DELETE FROM opml_imports'); + await dbPool.query('DELETE FROM feeds'); + }); + + it('procesa flujo completo async: uploaded -> parsing -> preview_ready -> importing -> completed', async () => { + await dbPool.query('INSERT INTO feeds(url, normalized_url_hash) 
VALUES ($1, $2)', [ + 'https://existing.example.com/', + buildNormalizedFeedUrlHash(normalizeFeedUrl('https://existing.example.com')), + ]); + + const opml = ` + + + + + + + + + + + `; + + const created = await createUseCase.execute({ + fileName: 'fase4.opml', + mimeType: 'text/x-opml', + content: Buffer.from(opml, 'utf8'), + }); + + expect(created.status).toBe('uploaded'); + expect(fakeParseQueue.jobs).toHaveLength(1); + + const importId = Number(created.id); + await processParseUseCase.execute(fakeParseQueue.jobs[0]); + + const afterParse = await opmlImportsRepository.getImportOrThrow(importId); + expect(afterParse.status).toBe('preview_ready'); + expect(afterParse.totalItems).toBe(5); + expect(afterParse.invalidItems).toBe(1); + expect(afterParse.duplicateItems).toBe(2); + expect(afterParse.validItems).toBeGreaterThanOrEqual(2); + + const confirmed = await confirmUseCase.execute(importId); + expect(confirmed.status).toBe('queued'); + expect(fakeApplyQueue.jobs).toHaveLength(1); + + await processApplyUseCase.execute(fakeApplyQueue.jobs[0]); + + const afterApply = await opmlImportsRepository.getImportOrThrow(importId); + expect(afterApply.status).toBe('completed'); + expect(afterApply.importedItems).toBeGreaterThanOrEqual(1); + + const grouped = await opmlImportsRepository.countItemsByStatus(importId); + expect(grouped.imported ?? 0).toBeGreaterThanOrEqual(1); + expect(grouped.duplicate).toBe(2); + expect(grouped.invalid).toBe(1); + expect(grouped.failed ?? 
0).toBe(0); + expect(fakeFetchQueue.jobs.length).toBeGreaterThanOrEqual(1); + }); + + it('marca failed con fallo parcial y mantiene conteos consistentes', async () => { + const candidateUrl = normalizeFeedUrl('https://candidate.example.com/rss'); + + const opml = ``; + + const created = await createUseCase.execute({ + fileName: 'collision.opml', + mimeType: 'text/x-opml', + content: Buffer.from(opml, 'utf8'), + }); + + const importId = Number(created.id); + await processParseUseCase.execute(fakeParseQueue.jobs[0]); + + await dbPool.query( + `UPDATE opml_import_items + SET normalized_url = NULL, normalized_url_hash = NULL + WHERE id = (SELECT id FROM opml_import_items WHERE import_id = $1 AND item_status = 'new' ORDER BY id ASC LIMIT 1)`, + [importId], + ); + + await confirmUseCase.execute(importId); + await processApplyUseCase.execute(fakeApplyQueue.jobs[0]); + + const afterApply = await opmlImportsRepository.getImportOrThrow(importId); + expect(afterApply.status).toBe('failed'); + expect(afterApply.errorMessage).toContain('partial_import_failure:1'); + + const grouped = await opmlImportsRepository.countItemsByStatus(importId); + expect(grouped.failed).toBe(1); + expect(afterApply.importedItems).toBe(0); + expect(fakeFetchQueue.jobs).toHaveLength(0); + }); +}); diff --git a/test/process-opml-parse-job.use-case.spec.ts b/test/process-opml-parse-job.use-case.spec.ts new file mode 100644 index 0000000..4f36658 --- /dev/null +++ b/test/process-opml-parse-job.use-case.spec.ts @@ -0,0 +1,75 @@ +import { ProcessOpmlParseJobUseCase } from '../src/modules/opml-imports/application/process-opml-parse-job.use-case'; +import { buildNormalizedFeedUrlHash, normalizeFeedUrl } from '../src/modules/opml-imports/domain/url-normalizer'; +import { OpmlImportItemInput } from '../src/modules/opml-imports/opml-imports.repository'; + +describe('ProcessOpmlParseJobUseCase', () => { + it('clasifica entradas ya registradas como existing en preview', async () => { + const existingUrl = 
normalizeFeedUrl('https://existing.example.com/feed.xml'); + const existingHash = buildNormalizedFeedUrlHash(existingUrl); + + const replaceImportItems = jest.fn, [number, OpmlImportItemInput[], unknown]>().mockResolvedValue(); + const markImportStatus = jest.fn().mockImplementation(async (_importId: number, input: { status: string }) => ({ + id: '1', + status: input.status, + })); + + const opmlImportsRepository = { + getImportOrThrow: jest.fn().mockResolvedValue({ id: '1', status: 'uploaded' }), + markImportStatus, + replaceImportItems, + }; + + const client = { + query: jest.fn(async (sql: string) => { + if (sql.includes('FROM feeds')) { + return { + rows: [ + { + id: 12, + url: existingUrl, + normalized_url_hash: existingHash, + }, + ], + }; + } + + return { rows: [] }; + }), + release: jest.fn(), + }; + + const databaseService = { + getPool: () => ({ connect: async () => client }), + }; + + const appConfigService = { + opmlUploadMaxBytes: 1024 * 1024, + }; + + const observabilityService = { + startJobTimer: jest.fn().mockReturnValue(jest.fn()), + }; + + const useCase = new ProcessOpmlParseJobUseCase( + databaseService as never, + opmlImportsRepository as never, + appConfigService as never, + observabilityService as never, + ); + + await useCase.execute({ + importId: 1, + opmlXml: ``, + }); + + expect(replaceImportItems).toHaveBeenCalledTimes(1); + const [, parsedItems] = replaceImportItems.mock.calls[0]; + expect(parsedItems).toHaveLength(1); + expect(parsedItems[0].itemStatus).toBe('existing'); + expect(parsedItems[0].validationError).toBeNull(); + + const finalStatusCall = markImportStatus.mock.calls.find(([, input]) => input.status === 'preview_ready'); + expect(finalStatusCall).toBeDefined(); + expect(finalStatusCall?.[1]?.counters?.existingItems).toBe(1); + }); +}); diff --git a/test/url-normalizer.spec.ts b/test/url-normalizer.spec.ts new file mode 100644 index 0000000..234e289 --- /dev/null +++ b/test/url-normalizer.spec.ts @@ -0,0 +1,44 @@ +import { 
buildNormalizedFeedUrlHash, normalizeFeedUrl } from '../src/modules/opml-imports/domain/url-normalizer'; + +describe('normalizeFeedUrl', () => { + it('normalizes host/protocol casing and removes default port', () => { + const a = normalizeFeedUrl('HTTP://Example.COM:80/rss.xml'); + const b = normalizeFeedUrl('http://example.com/rss.xml'); + + expect(a).toBe('http://example.com/rss.xml'); + expect(a).toBe(b); + }); + + it('normalizes repeated slashes and trailing slash', () => { + const a = normalizeFeedUrl('https://example.com//feeds///'); + const b = normalizeFeedUrl('https://example.com/feeds'); + + expect(a).toBe('https://example.com/feeds'); + expect(a).toBe(b); + }); + + it('drops hash fragments and keeps query string as-is', () => { + const normalized = normalizeFeedUrl('https://example.com/rss.xml?lang=es#section-1'); + expect(normalized).toBe('https://example.com/rss.xml?lang=es'); + }); + + it('rejects non-http(s) URLs', () => { + expect(() => normalizeFeedUrl('ftp://example.com/feed.xml')).toThrow('feed_url_protocol_not_supported'); + }); +}); + +describe('buildNormalizedFeedUrlHash', () => { + it('returns stable hash for equivalent urls', () => { + const a = normalizeFeedUrl('https://EXAMPLE.com:443/path/'); + const b = normalizeFeedUrl('https://example.com/path'); + + expect(buildNormalizedFeedUrlHash(a)).toBe(buildNormalizedFeedUrlHash(b)); + }); + + it('returns different hash for different canonical urls', () => { + const a = normalizeFeedUrl('https://example.com/feed.xml?lang=es'); + const b = normalizeFeedUrl('https://example.com/feed.xml?lang=en'); + + expect(buildNormalizedFeedUrlHash(a)).not.toBe(buildNormalizedFeedUrlHash(b)); + }); +}); diff --git a/test/vertical-slice.integration-spec.ts b/test/vertical-slice.integration-spec.ts index e4ee9d8..219c97b 100644 --- a/test/vertical-slice.integration-spec.ts +++ b/test/vertical-slice.integration-spec.ts @@ -19,16 +19,24 @@ import { AppModule } from '../src/app.module'; import { DATABASE_POOL } from 
'../src/infrastructure/persistence/database.constants'; import { AlertDeliveryQueue } from '../src/infrastructure/queue/alert-delivery.queue'; import { FetchFeedQueue } from '../src/infrastructure/queue/fetch-feed.queue'; +import { OpmlApplyImportQueue } from '../src/infrastructure/queue/opml-apply-import.queue'; +import { OpmlParsePreviewQueue } from '../src/infrastructure/queue/opml-parse-preview.queue'; import { ALERT_DELIVERY_QUEUE_TOKEN, AlertDeliveryJobData, FETCH_FEED_QUEUE_TOKEN, FetchFeedJobData, + OPML_APPLY_IMPORT_QUEUE_TOKEN, + OPML_PARSE_PREVIEW_QUEUE_TOKEN, + OpmlApplyImportJobData, + OpmlParsePreviewJobData, REDIS_CONNECTION, } from '../src/infrastructure/queue/queue.constants'; import { ProcessAlertDeliveryUseCase } from '../src/modules/alerts/application/process-alert-delivery.use-case'; import { ProcessFeedJobUseCase } from '../src/modules/ingestion/application/process-feed-job.use-case'; import { ScheduleDueFeedsUseCase } from '../src/modules/ingestion/application/schedule-due-feeds.use-case'; +import { ProcessOpmlApplyJobUseCase } from '../src/modules/opml-imports/application/process-opml-apply-job.use-case'; +import { ProcessOpmlParseJobUseCase } from '../src/modules/opml-imports/application/process-opml-parse-job.use-case'; import { FEED_FETCHER, FeedFetchResult } from '../src/modules/ingestion/domain/feed-fetcher.port'; import { ALERT_NOTIFIER, AlertNotificationPayload, AlertNotifierPort } from '../src/modules/notifications/domain/alert-notifier.port'; import { configureApiApplication } from '../src/main/create-api-app'; @@ -57,6 +65,30 @@ class FakeAlertDeliveryQueue { } } +class FakeOpmlParseQueue { + readonly jobs: OpmlParsePreviewJobData[] = []; + + async enqueue(job: OpmlParsePreviewJobData): Promise { + this.jobs.push(job); + } + + createWorker() { + throw new Error('Not used in integration test'); + } +} + +class FakeOpmlApplyQueue { + readonly jobs: OpmlApplyImportJobData[] = []; + + async enqueue(job: OpmlApplyImportJobData): Promise 
{ + this.jobs.push(job); + } + + createWorker() { + throw new Error('Not used in integration test'); + } +} + class FakeRedis { async ping(): Promise { return 'PONG'; @@ -115,6 +147,7 @@ async function bootstrapTestSchema(pool: { query: (sql: string) => Promise Promise Promise Promise= 0), + source_checksum TEXT, + error_message TEXT, + total_items INT NOT NULL DEFAULT 0 CHECK (total_items >= 0), + valid_items INT NOT NULL DEFAULT 0 CHECK (valid_items >= 0), + duplicate_items INT NOT NULL DEFAULT 0 CHECK (duplicate_items >= 0), + existing_items INT NOT NULL DEFAULT 0 CHECK (existing_items >= 0), + invalid_items INT NOT NULL DEFAULT 0 CHECK (invalid_items >= 0), + imported_items INT NOT NULL DEFAULT 0 CHECK (imported_items >= 0), + uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + confirmed_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, + `CREATE TABLE opml_import_items ( + id BIGSERIAL PRIMARY KEY, + import_id BIGINT NOT NULL REFERENCES opml_imports(id) ON DELETE CASCADE, + title TEXT, + outline_path TEXT, + source_xml_url TEXT, + normalized_url TEXT, + normalized_url_hash TEXT, + feed_id INT REFERENCES feeds(id) ON DELETE SET NULL, + item_status TEXT NOT NULL CHECK (item_status IN ('new', 'existing', 'duplicate', 'invalid', 'imported', 'failed')), + validation_error TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT opml_import_items_normalized_url_required + CHECK (item_status = 'invalid' OR (normalized_url IS NOT NULL AND normalized_url_hash IS NOT NULL)) + )`, + `CREATE UNIQUE INDEX idx_opml_import_items_dedupe_per_import + ON opml_import_items (import_id, normalized_url_hash) + WHERE normalized_url_hash IS NOT NULL AND item_status <> 'duplicate'`, ]; for (const statement of schema) { @@ -186,8 +259,12 @@ describe('vertical slice integration', () => { let scheduleDueFeedsUseCase: 
ScheduleDueFeedsUseCase; let processFeedJobUseCase: ProcessFeedJobUseCase; let processAlertDeliveryUseCase: ProcessAlertDeliveryUseCase; + let processOpmlParseJobUseCase: ProcessOpmlParseJobUseCase; + let processOpmlApplyJobUseCase: ProcessOpmlApplyJobUseCase; let fakeQueue: FakeQueue; let fakeAlertDeliveryQueue: FakeAlertDeliveryQueue; + let fakeOpmlParseQueue: FakeOpmlParseQueue; + let fakeOpmlApplyQueue: FakeOpmlApplyQueue; let fakeAlertNotifier: FakeAlertNotifier; beforeAll(async () => { @@ -198,6 +275,8 @@ describe('vertical slice integration', () => { fakeQueue = new FakeQueue(); fakeAlertDeliveryQueue = new FakeAlertDeliveryQueue(); + fakeOpmlParseQueue = new FakeOpmlParseQueue(); + fakeOpmlApplyQueue = new FakeOpmlApplyQueue(); fakeAlertNotifier = new FakeAlertNotifier(); const moduleRef = await Test.createTestingModule({ @@ -215,6 +294,14 @@ describe('vertical slice integration', () => { .useValue(fakeAlertDeliveryQueue) .overrideProvider(AlertDeliveryQueue) .useValue(fakeAlertDeliveryQueue) + .overrideProvider(OPML_PARSE_PREVIEW_QUEUE_TOKEN) + .useValue(fakeOpmlParseQueue) + .overrideProvider(OpmlParsePreviewQueue) + .useValue(fakeOpmlParseQueue) + .overrideProvider(OPML_APPLY_IMPORT_QUEUE_TOKEN) + .useValue(fakeOpmlApplyQueue) + .overrideProvider(OpmlApplyImportQueue) + .useValue(fakeOpmlApplyQueue) .overrideProvider(FEED_FETCHER) .useValue(new FakeFeedFetcher()) .overrideProvider(ALERT_NOTIFIER) @@ -228,6 +315,8 @@ describe('vertical slice integration', () => { scheduleDueFeedsUseCase = moduleRef.get(ScheduleDueFeedsUseCase); processFeedJobUseCase = moduleRef.get(ProcessFeedJobUseCase); processAlertDeliveryUseCase = moduleRef.get(ProcessAlertDeliveryUseCase); + processOpmlParseJobUseCase = moduleRef.get(ProcessOpmlParseJobUseCase); + processOpmlApplyJobUseCase = moduleRef.get(ProcessOpmlApplyJobUseCase); }); afterAll(async () => { @@ -403,6 +492,53 @@ describe('vertical slice integration', () => { }); }); + it('runs OPML happy path upload -> preview -> 
confirm -> status', async () => { + const opml = ` + + + + + + + + + `; + + const uploadResponse = await request(app.getHttpServer()) + .post('/api/v1/opml/imports') + .attach('file', Buffer.from(opml, 'utf8'), { + filename: 'feeds.opml', + contentType: 'text/x-opml', + }) + .expect(201); + + const importId = Number(uploadResponse.body.data.id); + expect(importId).toBeGreaterThan(0); + expect(fakeOpmlParseQueue.jobs).toHaveLength(1); + + await processOpmlParseJobUseCase.execute(fakeOpmlParseQueue.jobs[0]); + + const previewResponse = await request(app.getHttpServer()).get(`/api/v1/opml/imports/${importId}/preview`).expect(200); + expect(previewResponse.body.summary.status).toBe('preview_ready'); + expect(previewResponse.body.summary.totalItems).toBe(3); + expect(previewResponse.body.summary.duplicateItems).toBe(1); + expect(previewResponse.body.summary.invalidItems).toBe(1); + + const confirmResponse = await request(app.getHttpServer()).post(`/api/v1/opml/imports/${importId}/confirm`).expect(202); + expect(confirmResponse.body.data.status).toBe('queued'); + expect(fakeOpmlApplyQueue.jobs).toHaveLength(1); + + await processOpmlApplyJobUseCase.execute(fakeOpmlApplyQueue.jobs[0]); + + const statusResponse = await request(app.getHttpServer()).get(`/api/v1/opml/imports/${importId}/status`).expect(200); + expect(statusResponse.body.data.status).toBe('completed'); + expect(statusResponse.body.data.importedItems).toBe(1); + expect(statusResponse.body.data.progressPercent).toBe(100); + + const secondConfirm = await request(app.getHttpServer()).post(`/api/v1/opml/imports/${importId}/confirm`).expect(202); + expect(secondConfirm.body.data.status).toBe('already_confirmed'); + }); + it('exposes Swagger UI and OpenAPI JSON for the running API surface', async () => { const docsResponse = await request(app.getHttpServer()).get('/docs').expect(200); expect(docsResponse.text).toContain('swagger-ui');