Skip to content

feat(notifications): NotificationListener for in-app notifications and email dispatch (M4-BE-009)#139

Merged
akinwalexander merged 2 commits into
devfrom
feat/M4-BE-009-notification-listener
May 30, 2026
Merged

feat(notifications): NotificationListener for in-app notifications and email dispatch (M4-BE-009)#139
akinwalexander merged 2 commits into
devfrom
feat/M4-BE-009-notification-listener

Conversation

@ibraheembello
Copy link
Copy Markdown
Contributor

@ibraheembello ibraheembello commented May 30, 2026

Pull Request

Description

Implements M4-BE-009 - NotificationListener: system-generated in-app notifications and email dispatch.

A new NotificationListener subscribes to APP_EVENTS and reacts by creating in-app notifications (via NotificationsService.createNotification()) and queueing emails, each gated independently on the user's notification_preferences:

  • stage.completed -> stage_completed in-app + stage-completed email
  • stage.unlocked -> stage_unlocked in-app + stage-unlocked email
  • funnel.generated -> funnel_regenerated in-app (system event, ungated) + funnel-ready email
  • 50% funnel-progress milestone on task.completed, idempotent per funnel via an EXISTS check
  • Weekly digest: a recurring Bull job (0 9 * * MON, jobId weekly-digest-recurring) registered on module init, with a per-user digest processor on the email queue

All handlers are fire-and-forget: any failure is logged and never affects the HTTP response of the service that emitted the event. Email dispatch resolves the recipient from the trusted event userId (never from an external caller).

Notification titles and bodies match the Figma copy exactly.

Four new email types (funnel-ready, stage-unlocked, stage-completed, weekly-digest) with their Handlebars templates were added.

Related Issue

M4-BE-009

Type of Change

  • feat: New feature

How Has This Been Tested?

  • Unit tests
  • Manual tests

Verified end to end against live Postgres + Redis by driving the real HTTP endpoints (complete tasks -> complete stage) and reading GET /api/notifications:

Behaviour Result Acceptance
Task + stage completion feed shows funnel_progress, stage_completed, stage_unlocked with exact Figma copy AC-01, AC-02, AC-04, AC-08
Two task completions (50% then 100%) a single funnel_progress row EC-01 idempotency
Single stage-complete transaction both stage_completed and stage_unlocked created EC-02
Email gating only stage-unlocked email queued; stage-completed suppressed (pref default false) AC-06
No preferences row defaults auto-created and applied EC-04
Handler failures during run none AC-07
Boot Weekly digest recurring job registered AC-09

funnel_regenerated (AC-03) and in-app suppression when inapp_* is false (AC-05) are covered by the unit suite.

Test Evidence

image image image image image
  1. GET /api/notifications feed (3 system notifications)
  2. Email gating log (stage-unlocked queued, no stage-completed)
  3. Boot log: weekly digest recurring job registered
  4. Unit tests passing (notification.listener, weekly-digest.processor, template.service)

Screenshots to be attached.

Checklist

  • My code follows the project's coding style
  • I have commented my code, particularly in hard-to-understand areas
  • My changes generate no new warnings
  • I have added tests that prove my feature works
  • New and existing unit tests pass locally with my changes
  • I have included a screenshot showing all tests passing

Additional Notes

  • No dependency or lockfile changes.
  • No schema change: the notifications and notification_preferences tables already exist; this PR only reads/writes them.
  • src/modules/users/users.module.ts gains a one-line forwardRef(() => NotificationsModule) to resolve the Users <-> Notifications module cycle (the listener needs UserModelAction to resolve a recipient email from the event userId).

Summary by CodeRabbit

  • New Features

    • Four new email notifications: funnel ready, stage unlocked, stage completed, and weekly digest with personalized content.
    • Automated weekly digest scheduling and processing to email opted-in users with task progress and active stage info.
    • Notification listeners create in-app notifications and conditionally send emails based on user preferences and milestone thresholds.
  • Tests

    • Added unit tests covering templates, listener behavior, and weekly-digest processing (including error handling and gating).

Review Change Stack

…spatch

Subscribe to APP_EVENTS and react with preference-gated in-app
notifications and email jobs:

- stage.completed, stage.unlocked and funnel.generated create in-app
  notifications and queue the matching email, each gated on
  notification_preferences (system funnel.generated in-app is ungated)
- 50% funnel progress milestone, idempotent per funnel via an EXISTS check
- weekly digest recurring job (Mon 09:00) registered on module init, with a
  per-user digest processor on the email queue
- new email types funnel-ready, stage-unlocked, stage-completed, weekly-digest
  plus their Handlebars templates
- every handler is fire-and-forget: failures are logged and never affect the
  request that emitted the event
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 30, 2026

📝 Walkthrough

Walkthrough

Adds weekly-digest job type and payloads, new EmailService methods and Handlebars templates, task-progress aggregation queries, notification preference and idempotency checks, a NotificationListener for events, a recurring WeeklyDigest Bull processor with batching and error isolation, module wiring (OnModuleInit/forwardRef), and comprehensive tests.

Changes

Weekly Digest Email and Notification System

Layer / File(s) Summary
Email job type definitions
src/common/constants/queue.constants.ts, src/email/interfaces/email-job.interface.ts
EmailType extended with funnel-ready, stage-unlocked, stage-completed, weekly-digest; new payload interfaces and discriminated EmailJob variants added; JOBS.WEEKLY_DIGEST constant registered.
Email service methods and template system
src/email/email.service.ts, src/email/template.service.ts, src/email/templates/*, src/email/tests/template.service.spec.ts
Four new EmailService dispatch methods (sendFunnelReady, sendStageUnlocked, sendStageCompleted, sendWeeklyDigest); TemplateService registers subjects/templates for new types; four Handlebars templates added and template render tests extended.
Task progress and notification queries
src/modules/funnels/actions/stage-task.action.ts, src/modules/notifications/actions/notification-preference.action.ts, src/modules/notifications/actions/notification.action.ts
Introduces FunnelTaskProgress; StageTaskModelAction adds getFunnelTaskProgress() and getUserTaskProgress() using TypeORM aggregates; NotificationPreferenceModelAction.findWeeklyDigestRecipients() and NotificationModelAction.existsForFunnelType() added.
Event listener with notification dispatch
src/modules/notifications/listeners/notification.listener.ts, src/modules/notifications/tests/notification.listener.spec.ts
NotificationListener handlers for stage-completed/unlocked, funnel-generated, and task-completed create in-app notifications, conditionally send emails per user preferences, track 50% funnel-progress milestone with idempotency, and swallow handler errors. Tests cover gating, payloads, milestone logic, and error logging.
Weekly digest processor and scheduler
src/modules/notifications/processors/weekly-digest.processor.ts, src/modules/notifications/tests/weekly-digest.processor.spec.ts, src/modules/notifications/notifications.module.ts
WeeklyDigestProcessor processes recurring JOBS.WEEKLY_DIGEST, pages opted-in recipients, sends per-user digests with bounded concurrency, skips users without email, and logs progress; NotificationsModule implements OnModuleInit to register an idempotent weekly cron job and register providers.
Module wiring / circular dependency
src/modules/users/users.module.ts, src/modules/notifications/notifications.module.ts
UsersModule imports NotificationsModule via forwardRef(); NotificationsModule imports Email/Funnels/Users modules, registers providers, injects email queue, and schedules recurring job in onModuleInit.

Sequence Diagram(s)

sequenceDiagram
  participant AppEvent as App Event
  participant NotificationListener as Notification Listener
  participant NotificationAction as Notification Action
  participant StageTask as StageTaskModelAction
  participant EmailService as Email Service
  participant BullQueue as Bull Queue
  AppEvent->>NotificationListener: event (stage/funnel/task)
  NotificationListener->>NotificationAction: fetch prefs & createNotification()
  NotificationListener->>StageTask: getFunnelTaskProgress()/getUserTaskProgress()
  alt prefs.email_* true
    NotificationListener->>EmailService: send...()
    EmailService->>BullQueue: dispatch job
  end
Loading
sequenceDiagram
  participant BullScheduler as Bull Scheduler
  participant WeeklyProcessor as Weekly Digest Processor
  participant PreferenceAction as Preference Action
  participant TaskProgress as StageTaskModelAction
  participant EmailService as Email Service
  BullScheduler->>WeeklyProcessor: handleWeeklyDigest(job)
  WeeklyProcessor->>PreferenceAction: findWeeklyDigestRecipients()
  loop For each opted-in recipient
    WeeklyProcessor->>TaskProgress: getUserTaskProgress(userId)
    TaskProgress->>WeeklyProcessor: task counts
    WeeklyProcessor->>EmailService: sendWeeklyDigest(email, counts)
    EmailService->>BullScheduler: enqueued
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • JohnUghiovhe
  • sage-ali
  • elijaharhinful
  • Nuel-09
  • akinwalexander
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: adding a NotificationListener for managing in-app notifications and email dispatch based on app events, with specific reference to the feature ticket M4-BE-009.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/M4-BE-009-notification-listener

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/email/interfaces/email-job.interface.ts`:
- Around line 57-64: EmailPayload currently omits
ContactAdminNotificationPayload from its union; update the EmailPayload type to
include ContactAdminNotificationPayload (e.g., add "|
ContactAdminNotificationPayload" to the union) and ensure
ContactAdminNotificationPayload is imported/declared where EmailPayload is
defined so the union is complete and consistent with EmailJob.

In `@src/modules/notifications/actions/notification.action.ts`:
- Around line 42-48: The existsForFunnelType pre-check in notification.action.ts
is race-prone; enforce idempotency at write-time instead of relying on
getExists()—add a DB-level unique constraint covering (user_id, type, funnelId)
(store funnelId as a separate column or create an expression/index on metadata
->> 'funnelId') and change the create flow to use a conflict-safe insert/upsert
(e.g., INSERT ... ON CONFLICT DO NOTHING or the ORM's upsert/returning
mechanism) when creating notifications; keep existsForFunnelType (the
createQueryBuilder call) only as an optional fast-path and ensure callers handle
the insert as authoritative for uniqueness.

In `@src/modules/notifications/processors/weekly-digest.processor.ts`:
- Around line 23-59: handleWeeklyDigest currently runs the full digest fan-out
inline on the shared EMAIL queue (bound to QUEUES.EMAIL), causing long-running
monopolization due to sequential awaits of findWeeklyDigestRecipients,
getUserTaskProgress and sendWeeklyDigest; refactor so handleWeeklyDigest only
discovers recipients via preferenceAction.findWeeklyDigestRecipients and
enqueues a lightweight per-user job (e.g., JOBS.WEEKLY_DIGEST_USER or similar)
for each pref instead of calling taskAction.getUserTaskProgress/sendWeeklyDigest
inline, or move the entire processor to a dedicated queue/worker; ensure any new
per-user processor consumes the per-user job and calls getUserTaskProgress and
emailService.sendWeeklyDigest, and verify EMAIL queue concurrency if you keep
this on the shared queue.
- Around line 37-46: The weekly digest currently passes activeStageName: null to
emailService.sendWeeklyDigest, causing templates that render the active stage to
show nothing; update the call in weekly-digest.processor.ts to compute and pass
the user's current active stage name (e.g., derive from user.active_stage,
user.currentStageName, or resolve via the funnel/stage service used elsewhere)
and supply that string instead of null so templates receive a meaningful value
via the activeStageName parameter.

In `@src/modules/notifications/tests/notification.listener.spec.ts`:
- Around line 180-188: The test currently sets loggerSpy via
jest.spyOn(Logger.prototype, 'error') and restores it after assertions, but
restoration can be skipped if an assertion throws; update the test for the
onFunnelGenerated case to ensure loggerSpy.mockRestore() always runs by moving
the restore into a finally block: create loggerSpy before calling
listener.onFunnelGenerated, run the await expect(...) and subsequent expects
inside try, and call loggerSpy.mockRestore() inside finally so the spy is always
restored even on failure (references: Logger.prototype.error,
listener.onFunnelGenerated, loggerSpy.mockRestore).
- Around line 141-147: The test currently toggles a single preference
(prefs.inapp_stage_unlocked) and exercises two handlers
(listener.onStageCompleted and listener.onStageUnlocked), which can hide wiring
bugs; split into two tests such that each test sets the specific in-app
preference for that event to false (e.g., prefs.inapp_stage_completed = false
for the onStageCompleted test and prefs.inapp_stage_unlocked = false for the
onStageUnlocked test), call only the corresponding handler
(listener.onStageCompleted(...) or listener.onStageUnlocked(...)) and assert
notificationsService.createNotification was not called for that single event.

In `@src/modules/notifications/tests/weekly-digest.processor.spec.ts`:
- Around line 60-80: The test currently mocks Logger.prototype.error with
loggerSpy inside the "does not let one user's failure stop the rest of the
batch" test but restores it only at the end of the test, which can leak if an
assertion fails; update the test to guarantee cleanup by moving the mock restore
into a shared afterEach hook (restore Logger.prototype.error there) or wrap the
test body in a try/finally and call loggerSpy.mockRestore() in finally; ensure
you still reference Logger.prototype.error and the existing loggerSpy variable
and keep the rest of the assertions involving processor.handleWeeklyDigest and
emailService.sendWeeklyDigest unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1fbb5e58-ee75-4dbd-a3ee-3d5b94a5024b

📥 Commits

Reviewing files that changed from the base of the PR and between a25782b and 9e79d95.

📒 Files selected for processing (18)
  • src/common/constants/queue.constants.ts
  • src/email/email.service.ts
  • src/email/interfaces/email-job.interface.ts
  • src/email/template.service.ts
  • src/email/templates/funnel-ready.hbs
  • src/email/templates/stage-completed.hbs
  • src/email/templates/stage-unlocked.hbs
  • src/email/templates/weekly-digest.hbs
  • src/email/tests/template.service.spec.ts
  • src/modules/funnels/actions/stage-task.action.ts
  • src/modules/notifications/actions/notification-preference.action.ts
  • src/modules/notifications/actions/notification.action.ts
  • src/modules/notifications/listeners/notification.listener.ts
  • src/modules/notifications/notifications.module.ts
  • src/modules/notifications/processors/weekly-digest.processor.ts
  • src/modules/notifications/tests/notification.listener.spec.ts
  • src/modules/notifications/tests/weekly-digest.processor.spec.ts
  • src/modules/users/users.module.ts

Comment thread src/email/interfaces/email-job.interface.ts
Comment thread src/modules/notifications/actions/notification.action.ts
Comment thread src/modules/notifications/processors/weekly-digest.processor.ts
Comment thread src/modules/notifications/processors/weekly-digest.processor.ts Outdated
Comment thread src/modules/notifications/tests/notification.listener.spec.ts Outdated
Comment thread src/modules/notifications/tests/notification.listener.spec.ts
Comment thread src/modules/notifications/tests/weekly-digest.processor.spec.ts
Nuel-09
Nuel-09 previously approved these changes May 30, 2026
Comment thread src/modules/notifications/listeners/notification.listener.ts
akinwalexander
akinwalexander previously approved these changes May 30, 2026
Comment thread src/modules/notifications/listeners/notification.listener.ts
Comment thread src/modules/notifications/listeners/notification.listener.ts Outdated
Comment thread src/modules/notifications/processors/weekly-digest.processor.ts
- gate the 50% funnel_progress milestone on inapp_task_completed; it was
  never consulted, leaving that preference column with no effect
- document that inapp_stage_unlocked is the shared "stage events" in-app
  toggle for both stage.completed and stage.unlocked (the preferences table
  has no inapp_stage_completed column, per FR-2)
- use generic "Your funnel is ready" copy for funnel.generated, which fires
  for first-time generation as well as regeneration
- paginate weekly digest recipients and dispatch each page with bounded
  concurrency (p-limit) instead of one unbounded query and a sequential loop
- add a test for the inapp_task_completed gate and clarify the shared-key test
@ibraheembello ibraheembello dismissed stale reviews from akinwalexander and Nuel-09 via 8366a86 May 30, 2026 10:40
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/modules/notifications/listeners/notification.listener.ts (1)

125-140: 🧹 Nitpick | 🔵 Trivial

Idempotency via existsForFunnelTypecreateNotification is not atomic.

If two task.completed events for the same funnel are handled concurrently, both can pass the existsForFunnelType check before either inserts, producing duplicate funnel_progress notifications. The check-then-insert window is a TOCTOU race. For tasks completed in rapid succession this is plausible.

Since a hard guarantee needs a DB-level constraint, consider a partial unique index on (user_id, funnel_id) for type = 'funnel_progress' (or an ON CONFLICT DO NOTHING insert) so duplicates are rejected even under concurrency; the EXISTS check then remains a cheap fast-path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/modules/notifications/listeners/notification.listener.ts` around lines
125 - 140, The current exists-then-insert flow around
notificationAction.existsForFunnelType and
notificationsService.createNotification (for FUNNEL_PROGRESS_TYPE) is vulnerable
to a TOCTOU race under concurrency; make the uniqueness enforced at the DB layer
and make the application insert tolerant of conflicts: add a DB-level uniqueness
constraint (e.g., partial unique index on (user_id, funnel_id) where type =
'funnel_progress') OR change the implementation of
notificationsService.createNotification to perform an upsert/INSERT ... ON
CONFLICT DO NOTHING (so concurrent inserts are deduplicated), while keeping
notificationAction.existsForFunnelType as a cheap fast-path check before
attempting the insert.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@src/modules/notifications/listeners/notification.listener.ts`:
- Around line 125-140: The current exists-then-insert flow around
notificationAction.existsForFunnelType and
notificationsService.createNotification (for FUNNEL_PROGRESS_TYPE) is vulnerable
to a TOCTOU race under concurrency; make the uniqueness enforced at the DB layer
and make the application insert tolerant of conflicts: add a DB-level uniqueness
constraint (e.g., partial unique index on (user_id, funnel_id) where type =
'funnel_progress') OR change the implementation of
notificationsService.createNotification to perform an upsert/INSERT ... ON
CONFLICT DO NOTHING (so concurrent inserts are deduplicated), while keeping
notificationAction.existsForFunnelType as a cheap fast-path check before
attempting the insert.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: ce21fd33-de14-4914-9b51-5de851df3962

📥 Commits

Reviewing files that changed from the base of the PR and between 9e79d95 and 8366a86.

📒 Files selected for processing (4)
  • src/modules/notifications/actions/notification-preference.action.ts
  • src/modules/notifications/listeners/notification.listener.ts
  • src/modules/notifications/processors/weekly-digest.processor.ts
  • src/modules/notifications/tests/notification.listener.spec.ts

@ibraheembello
Copy link
Copy Markdown
Contributor Author

Thanks for the review @sage-ali. All four points are addressed in 8366a86:

  • inapp_task_completed now gates the 50% milestone (was previously ignored), with a suppression test added.
  • funnel.generated copy switched to the generic funnel_ready / "Your funnel is ready" so it reads correctly for first-time generation as well as regeneration; AC-03 test updated.
  • Documented that inapp_stage_unlocked is the shared "stage events" toggle for both stage.completed and stage.unlocked (the M4-BE-004 schema has no inapp_stage_completed column); AC-05 test clarified. Renaming the column to inapp_stage_events would touch the M4-BE-004 schema/DTO/migration, so I left it as a possible follow-up.
  • Weekly digest no longer loads all recipients at once: findWeeklyDigestRecipients is paginated and each page is dispatched with p-limit (concurrency 10) via Promise.allSettled, with per-user failures caught and logged.

Build, lint and the listener/processor suites are green, and the app boots cleanly with the digest job still registered. One heads-up for the frontend: the in-app notification type changed from funnel_regenerated to funnel_ready.

@akinwalexander akinwalexander merged commit 915c8ed into dev May 30, 2026
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants