Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions db/migrations/0003_opml_imports_mvp_foundation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
ALTER TABLE feeds
ADD COLUMN IF NOT EXISTS normalized_url_hash TEXT;

CREATE UNIQUE INDEX IF NOT EXISTS idx_feeds_normalized_url_hash_unique
ON feeds (normalized_url_hash)
WHERE normalized_url_hash IS NOT NULL;

CREATE TABLE IF NOT EXISTS opml_imports (
id BIGSERIAL PRIMARY KEY,
status TEXT NOT NULL CHECK (status IN ('uploaded', 'parsing', 'preview_ready', 'importing', 'completed', 'failed_validation', 'failed')),
file_name TEXT NOT NULL,
file_size_bytes BIGINT NOT NULL CHECK (file_size_bytes >= 0),
source_checksum TEXT,
error_message TEXT,
total_items INT NOT NULL DEFAULT 0 CHECK (total_items >= 0),
valid_items INT NOT NULL DEFAULT 0 CHECK (valid_items >= 0),
duplicate_items INT NOT NULL DEFAULT 0 CHECK (duplicate_items >= 0),
existing_items INT NOT NULL DEFAULT 0 CHECK (existing_items >= 0),
invalid_items INT NOT NULL DEFAULT 0 CHECK (invalid_items >= 0),
imported_items INT NOT NULL DEFAULT 0 CHECK (imported_items >= 0),
uploaded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
confirmed_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_opml_imports_status_created_at
ON opml_imports (status, created_at DESC);

CREATE TABLE IF NOT EXISTS opml_import_items (
id BIGSERIAL PRIMARY KEY,
import_id BIGINT NOT NULL REFERENCES opml_imports(id) ON DELETE CASCADE,
title TEXT,
outline_path TEXT,
source_xml_url TEXT,
normalized_url TEXT,
normalized_url_hash TEXT,
feed_id INT REFERENCES feeds(id) ON DELETE SET NULL,
item_status TEXT NOT NULL CHECK (item_status IN ('new', 'existing', 'duplicate', 'invalid', 'imported', 'failed')),
validation_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT opml_import_items_normalized_url_required
CHECK (item_status = 'invalid' OR (normalized_url IS NOT NULL AND normalized_url_hash IS NOT NULL))
);

CREATE INDEX IF NOT EXISTS idx_opml_import_items_import_id_status
ON opml_import_items (import_id, item_status);

CREATE INDEX IF NOT EXISTS idx_opml_import_items_hash
ON opml_import_items (normalized_url_hash);

CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import
ON opml_import_items (import_id, normalized_url_hash)
WHERE normalized_url_hash IS NOT NULL;

-- Manual rollback guide (framework currently applies only "up" migrations):
-- DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import;
-- DROP INDEX IF EXISTS idx_opml_import_items_hash;
-- DROP INDEX IF EXISTS idx_opml_import_items_import_id_status;
-- DROP TABLE IF EXISTS opml_import_items;
-- DROP INDEX IF EXISTS idx_opml_imports_status_created_at;
-- DROP TABLE IF EXISTS opml_imports;
-- DROP INDEX IF EXISTS idx_feeds_normalized_url_hash_unique;
-- ALTER TABLE feeds DROP COLUMN IF EXISTS normalized_url_hash;
23 changes: 23 additions & 0 deletions db/migrations/0004_agent_interface_filter_indexing.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
ALTER TABLE entries
ADD COLUMN IF NOT EXISTS normalized_search_document TEXT NOT NULL DEFAULT '';

UPDATE entries
SET normalized_search_document = LOWER(
TRANSLATE(
COALESCE(title, '') || ' ' || COALESCE(content, ''),
'ÁÀÂÄÃÅáàâäãåÉÈÊËéèêëÍÌÎÏíìîïÓÒÔÖÕóòôöõÚÙÛÜúùûüÑñÇç',
'AAAAAAaaaaaaEEEEeeeeIIIIiiiiOOOOOoooooUUUUuuuuNnCc'
)
)
WHERE normalized_search_document = '';

CREATE INDEX IF NOT EXISTS idx_entries_normalized_search_document_tsv
ON entries USING GIN (to_tsvector('simple', normalized_search_document));

CREATE INDEX IF NOT EXISTS idx_entries_feed_published_id
ON entries (feed_id, published_at DESC, id DESC);

-- Manual rollback guide (framework currently applies only "up" migrations):
-- DROP INDEX IF EXISTS idx_entries_feed_published_id;
-- DROP INDEX IF EXISTS idx_entries_normalized_search_document_tsv;
-- ALTER TABLE entries DROP COLUMN IF EXISTS normalized_search_document;
11 changes: 11 additions & 0 deletions db/migrations/0005_opml_duplicate_index_compatibility.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import;

CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import
ON opml_import_items (import_id, normalized_url_hash)
WHERE normalized_url_hash IS NOT NULL AND item_status <> 'duplicate';

-- Manual rollback guide (framework currently applies only "up" migrations):
-- DROP INDEX IF EXISTS idx_opml_import_items_dedupe_per_import;
-- CREATE UNIQUE INDEX IF NOT EXISTS idx_opml_import_items_dedupe_per_import
-- ON opml_import_items (import_id, normalized_url_hash)
-- WHERE normalized_url_hash IS NOT NULL;
2 changes: 2 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { FeedsModule } from './modules/feeds/feeds.module';
import { IngestionModule } from './modules/ingestion/ingestion.module';
import { NotificationsModule } from './modules/notifications/notifications.module';
import { ObservabilityModule } from './modules/observability/observability.module';
import { OpmlImportsModule } from './modules/opml-imports/opml-imports.module';
import { RulesModule } from './modules/rules/rules.module';
import { configuration } from './shared/config/configuration';
import { AppConfigModule } from './shared/config/app-config.module';
Expand All @@ -33,6 +34,7 @@ import { QueueModule } from './infrastructure/queue/queue.module';
NotificationsModule,
AlertsModule,
ObservabilityModule,
OpmlImportsModule,
IngestionModule,
],
providers: [
Expand Down
56 changes: 56 additions & 0 deletions src/infrastructure/queue/opml-apply-import.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Job, Queue, Worker } from 'bullmq';

import { AppConfigService } from '../../shared/config/app-config.service';

import { buildQueueJobId } from './job-id';
import {
OPML_APPLY_IMPORT_QUEUE_NAME,
OpmlApplyImportJobData,
OpmlApplyImportQueuePort,
} from './queue.constants';

@Injectable()
export class OpmlApplyImportQueue implements OpmlApplyImportQueuePort, OnApplicationShutdown {
private readonly queue: Queue<OpmlApplyImportJobData, void, string>;

private get connection() {
return { url: this.configService.redisUrl };
}

constructor(@Inject(AppConfigService) private readonly configService: AppConfigService) {
this.queue = new Queue<OpmlApplyImportJobData, void, string>(OPML_APPLY_IMPORT_QUEUE_NAME, {
connection: this.connection,
defaultJobOptions: {
attempts: 4,
backoff: {
type: 'exponential',
delay: 2000,
},
removeOnComplete: 100,
removeOnFail: 100,
},
});
}

async enqueue(job: OpmlApplyImportJobData): Promise<void> {
await this.queue.add(OPML_APPLY_IMPORT_QUEUE_NAME, job, {
jobId: buildQueueJobId('opml-apply', job.importId),
});
}

createWorker(processor: (job: Job<OpmlApplyImportJobData, void, string>) => Promise<void>): Worker<OpmlApplyImportJobData, void, string> {
return new Worker<OpmlApplyImportJobData, void, string>(
OPML_APPLY_IMPORT_QUEUE_NAME,
async (job) => processor(job),
{
connection: this.connection,
concurrency: Math.max(1, Math.min(this.configService.workerConcurrency, 3)),
},
);
}

async onApplicationShutdown(): Promise<void> {
await this.queue.close();
}
}
56 changes: 56 additions & 0 deletions src/infrastructure/queue/opml-parse-preview.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Job, Queue, Worker } from 'bullmq';

import { AppConfigService } from '../../shared/config/app-config.service';

import { buildQueueJobId } from './job-id';
import {
OPML_PARSE_PREVIEW_QUEUE_NAME,
OpmlParsePreviewJobData,
OpmlParsePreviewQueuePort,
} from './queue.constants';

@Injectable()
export class OpmlParsePreviewQueue implements OpmlParsePreviewQueuePort, OnApplicationShutdown {
private readonly queue: Queue<OpmlParsePreviewJobData, void, string>;

private get connection() {
return { url: this.configService.redisUrl };
}

constructor(@Inject(AppConfigService) private readonly configService: AppConfigService) {
this.queue = new Queue<OpmlParsePreviewJobData, void, string>(OPML_PARSE_PREVIEW_QUEUE_NAME, {
connection: this.connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: 100,
removeOnFail: 100,
},
});
}

async enqueue(job: OpmlParsePreviewJobData): Promise<void> {
await this.queue.add(OPML_PARSE_PREVIEW_QUEUE_NAME, job, {
jobId: buildQueueJobId('opml-parse', job.importId),
});
}

createWorker(processor: (job: Job<OpmlParsePreviewJobData, void, string>) => Promise<void>): Worker<OpmlParsePreviewJobData, void, string> {
return new Worker<OpmlParsePreviewJobData, void, string>(
OPML_PARSE_PREVIEW_QUEUE_NAME,
async (job) => processor(job),
{
connection: this.connection,
concurrency: Math.max(1, Math.min(this.configService.workerConcurrency, 3)),
},
);
}

async onApplicationShutdown(): Promise<void> {
await this.queue.close();
}
}
22 changes: 22 additions & 0 deletions src/infrastructure/queue/queue.constants.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
export const REDIS_CONNECTION = Symbol('REDIS_CONNECTION');
export const FETCH_FEED_QUEUE_TOKEN = Symbol('FETCH_FEED_QUEUE_TOKEN');
export const ALERT_DELIVERY_QUEUE_TOKEN = Symbol('ALERT_DELIVERY_QUEUE_TOKEN');
export const OPML_PARSE_PREVIEW_QUEUE_TOKEN = Symbol('OPML_PARSE_PREVIEW_QUEUE_TOKEN');
export const OPML_APPLY_IMPORT_QUEUE_TOKEN = Symbol('OPML_APPLY_IMPORT_QUEUE_TOKEN');

export const FETCH_FEED_QUEUE_NAME = 'fetch-feed';
export const ALERT_DELIVERY_QUEUE_NAME = 'alert-delivery';
export const OPML_PARSE_PREVIEW_QUEUE_NAME = 'opml-parse-preview';
export const OPML_APPLY_IMPORT_QUEUE_NAME = 'opml-apply-import';

export interface FetchFeedJobData {
feedId: number;
Expand All @@ -24,3 +28,21 @@ export interface AlertDeliveryJobData {
export interface AlertDeliveryQueuePort {
enqueue(job: AlertDeliveryJobData): Promise<void>;
}

export interface OpmlParsePreviewJobData {
importId: number;
opmlXml: string;
}

export interface OpmlParsePreviewQueuePort {
enqueue(job: OpmlParsePreviewJobData): Promise<void>;
}

export interface OpmlApplyImportJobData {
importId: number;
requestedAt: string;
}

export interface OpmlApplyImportQueuePort {
enqueue(job: OpmlApplyImportJobData): Promise<void>;
}
32 changes: 30 additions & 2 deletions src/infrastructure/queue/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import { AppConfigModule } from '../../shared/config/app-config.module';
import { AppConfigService } from '../../shared/config/app-config.service';

import { AlertDeliveryQueue } from './alert-delivery.queue';
import { OpmlApplyImportQueue } from './opml-apply-import.queue';
import { OpmlParsePreviewQueue } from './opml-parse-preview.queue';

import { FetchFeedQueue } from './fetch-feed.queue';
import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION } from './queue.constants';
import {
ALERT_DELIVERY_QUEUE_TOKEN,
FETCH_FEED_QUEUE_TOKEN,
OPML_APPLY_IMPORT_QUEUE_TOKEN,
OPML_PARSE_PREVIEW_QUEUE_TOKEN,
REDIS_CONNECTION,
} from './queue.constants';

@Global()
@Module({
Expand All @@ -20,6 +28,8 @@ import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION }
},
AlertDeliveryQueue,
FetchFeedQueue,
OpmlParsePreviewQueue,
OpmlApplyImportQueue,
{
provide: ALERT_DELIVERY_QUEUE_TOKEN,
useExisting: AlertDeliveryQueue,
Expand All @@ -28,7 +38,25 @@ import { ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN, REDIS_CONNECTION }
provide: FETCH_FEED_QUEUE_TOKEN,
useExisting: FetchFeedQueue,
},
{
provide: OPML_PARSE_PREVIEW_QUEUE_TOKEN,
useExisting: OpmlParsePreviewQueue,
},
{
provide: OPML_APPLY_IMPORT_QUEUE_TOKEN,
useExisting: OpmlApplyImportQueue,
},
],
exports: [
REDIS_CONNECTION,
AlertDeliveryQueue,
FetchFeedQueue,
OpmlParsePreviewQueue,
OpmlApplyImportQueue,
ALERT_DELIVERY_QUEUE_TOKEN,
FETCH_FEED_QUEUE_TOKEN,
OPML_PARSE_PREVIEW_QUEUE_TOKEN,
OPML_APPLY_IMPORT_QUEUE_TOKEN,
],
exports: [REDIS_CONNECTION, AlertDeliveryQueue, FetchFeedQueue, ALERT_DELIVERY_QUEUE_TOKEN, FETCH_FEED_QUEUE_TOKEN],
})
export class QueueModule {}
52 changes: 49 additions & 3 deletions src/modules/entries/entries.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ interface EntryRow {
fetched_at: Date;
}

interface EntryFilterRow {
id: string;
feed_id: number;
title: string | null;
content: string | null;
published_at: Date | null;
}

export interface EntryFilterCandidate {
id: string;
feedId: number;
title: string | null;
content: string | null;
publishedAt: string | null;
}

function mapEntry(row: EntryRow): Entry {
return {
id: row.id,
Expand Down Expand Up @@ -63,12 +79,21 @@ export class EntriesRepository {
for (const entry of entries) {
const result = await executor.query<EntryRow>(
`
INSERT INTO entries (feed_id, title, link, guid, content, content_hash, published_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
INSERT INTO entries (feed_id, title, link, guid, content, content_hash, published_at, normalized_search_document)
VALUES ($1, $2, $3, $4, $5, $6, $7, LOWER($8))
ON CONFLICT DO NOTHING
RETURNING id, feed_id, title, link, guid, content, content_hash, published_at, fetched_at
`,
[feedId, entry.title, entry.link, entry.guid, entry.content, entry.contentHash, entry.publishedAt],
[
feedId,
entry.title,
entry.link,
entry.guid,
entry.content,
entry.contentHash,
entry.publishedAt,
`${entry.title ?? ''} ${entry.content ?? ''}`,
],
);

if (result.rows[0]) {
Expand Down Expand Up @@ -108,4 +133,25 @@ export class EntriesRepository {
total: Number(totalResult.rows[0]?.count ?? '0'),
};
}

async listForFilterSearch(limit: number): Promise<EntryFilterCandidate[]> {
const cappedLimit = Math.max(1, Math.min(limit, 5000));
const result = await this.databaseService.query<EntryFilterRow>(
`
SELECT id, feed_id, title, content, published_at
FROM entries
ORDER BY published_at DESC NULLS LAST, id DESC
LIMIT $1
`,
[cappedLimit],
);

return result.rows.map((row) => ({
id: row.id,
feedId: row.feed_id,
title: row.title,
content: row.content,
publishedAt: row.published_at?.toISOString() ?? null,
}));
}
}
Loading
Loading