feat(notifications): NotificationListener for in-app notifications and email dispatch (M4-BE-009)#139
Conversation
…spatch Subscribe to APP_EVENTS and react with preference-gated in-app notifications and email jobs: - stage.completed, stage.unlocked and funnel.generated create in-app notifications and queue the matching email, each gated on notification_preferences (system funnel.generated in-app is ungated) - 50% funnel progress milestone, idempotent per funnel via an EXISTS check - weekly digest recurring job (Mon 09:00) registered on module init, with a per-user digest processor on the email queue - new email types funnel-ready, stage-unlocked, stage-completed, weekly-digest plus their Handlebars templates - every handler is fire-and-forget: failures are logged and never affect the request that emitted the event
📝 WalkthroughWalkthroughAdds weekly-digest job type and payloads, new EmailService methods and Handlebars templates, task-progress aggregation queries, notification preference and idempotency checks, a NotificationListener for events, a recurring WeeklyDigest Bull processor with batching and error isolation, module wiring (OnModuleInit/forwardRef), and comprehensive tests. ChangesWeekly Digest Email and Notification System
Sequence Diagram(s)sequenceDiagram
participant AppEvent as App Event
participant NotificationListener as Notification Listener
participant NotificationAction as Notification Action
participant StageTask as StageTaskModelAction
participant EmailService as Email Service
participant BullQueue as Bull Queue
AppEvent->>NotificationListener: event (stage/funnel/task)
NotificationListener->>NotificationAction: fetch prefs & createNotification()
NotificationListener->>StageTask: getFunnelTaskProgress()/getUserTaskProgress()
alt prefs.email_* true
NotificationListener->>EmailService: send...()
EmailService->>BullQueue: dispatch job
end
sequenceDiagram
participant BullScheduler as Bull Scheduler
participant WeeklyProcessor as Weekly Digest Processor
participant PreferenceAction as Preference Action
participant TaskProgress as StageTaskModelAction
participant EmailService as Email Service
BullScheduler->>WeeklyProcessor: handleWeeklyDigest(job)
WeeklyProcessor->>PreferenceAction: findWeeklyDigestRecipients()
loop For each opted-in recipient
WeeklyProcessor->>TaskProgress: getUserTaskProgress(userId)
TaskProgress->>WeeklyProcessor: task counts
WeeklyProcessor->>EmailService: sendWeeklyDigest(email, counts)
EmailService->>BullScheduler: enqueued
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/email/interfaces/email-job.interface.ts`:
- Around line 57-64: EmailPayload currently omits
ContactAdminNotificationPayload from its union; update the EmailPayload type to
include ContactAdminNotificationPayload (e.g., add "|
ContactAdminNotificationPayload" to the union) and ensure
ContactAdminNotificationPayload is imported/declared where EmailPayload is
defined so the union is complete and consistent with EmailJob.
In `@src/modules/notifications/actions/notification.action.ts`:
- Around line 42-48: The existsForFunnelType pre-check in notification.action.ts
is race-prone; enforce idempotency at write-time instead of relying on
getExists()—add a DB-level unique constraint covering (user_id, type, funnelId)
(store funnelId as a separate column or create an expression/index on metadata
->> 'funnelId') and change the create flow to use a conflict-safe insert/upsert
(e.g., INSERT ... ON CONFLICT DO NOTHING or the ORM's upsert/returning
mechanism) when creating notifications; keep existsForFunnelType (the
createQueryBuilder call) only as an optional fast-path and ensure callers handle
the insert as authoritative for uniqueness.
In `@src/modules/notifications/processors/weekly-digest.processor.ts`:
- Around line 23-59: handleWeeklyDigest currently runs the full digest fan-out
inline on the shared EMAIL queue (bound to QUEUES.EMAIL), causing long-running
monopolization due to sequential awaits of findWeeklyDigestRecipients,
getUserTaskProgress and sendWeeklyDigest; refactor so handleWeeklyDigest only
discovers recipients via preferenceAction.findWeeklyDigestRecipients and
enqueues a lightweight per-user job (e.g., JOBS.WEEKLY_DIGEST_USER or similar)
for each pref instead of calling taskAction.getUserTaskProgress/sendWeeklyDigest
inline, or move the entire processor to a dedicated queue/worker; ensure any new
per-user processor consumes the per-user job and calls getUserTaskProgress and
emailService.sendWeeklyDigest, and verify EMAIL queue concurrency if you keep
this on the shared queue.
- Around line 37-46: The weekly digest currently passes activeStageName: null to
emailService.sendWeeklyDigest, causing templates that render the active stage to
show nothing; update the call in weekly-digest.processor.ts to compute and pass
the user's current active stage name (e.g., derive from user.active_stage,
user.currentStageName, or resolve via the funnel/stage service used elsewhere)
and supply that string instead of null so templates receive a meaningful value
via the activeStageName parameter.
In `@src/modules/notifications/tests/notification.listener.spec.ts`:
- Around line 180-188: The test currently sets loggerSpy via
jest.spyOn(Logger.prototype, 'error') and restores it after assertions, but
restoration can be skipped if an assertion throws; update the test for the
onFunnelGenerated case to ensure loggerSpy.mockRestore() always runs by moving
the restore into a finally block: create loggerSpy before calling
listener.onFunnelGenerated, run the await expect(...) and subsequent expects
inside try, and call loggerSpy.mockRestore() inside finally so the spy is always
restored even on failure (references: Logger.prototype.error,
listener.onFunnelGenerated, loggerSpy.mockRestore).
- Around line 141-147: The test currently toggles a single preference
(prefs.inapp_stage_unlocked) and exercises two handlers
(listener.onStageCompleted and listener.onStageUnlocked), which can hide wiring
bugs; split into two tests such that each test sets the specific in-app
preference for that event to false (e.g., prefs.inapp_stage_completed = false
for the onStageCompleted test and prefs.inapp_stage_unlocked = false for the
onStageUnlocked test), call only the corresponding handler
(listener.onStageCompleted(...) or listener.onStageUnlocked(...)) and assert
notificationsService.createNotification was not called for that single event.
In `@src/modules/notifications/tests/weekly-digest.processor.spec.ts`:
- Around line 60-80: The test currently mocks Logger.prototype.error with
loggerSpy inside the "does not let one user's failure stop the rest of the
batch" test but restores it only at the end of the test, which can leak if an
assertion fails; update the test to guarantee cleanup by moving the mock restore
into a shared afterEach hook (restore Logger.prototype.error there) or wrap the
test body in a try/finally and call loggerSpy.mockRestore() in finally; ensure
you still reference Logger.prototype.error and the existing loggerSpy variable
and keep the rest of the assertions involving processor.handleWeeklyDigest and
emailService.sendWeeklyDigest unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1fbb5e58-ee75-4dbd-a3ee-3d5b94a5024b
📒 Files selected for processing (18)
src/common/constants/queue.constants.tssrc/email/email.service.tssrc/email/interfaces/email-job.interface.tssrc/email/template.service.tssrc/email/templates/funnel-ready.hbssrc/email/templates/stage-completed.hbssrc/email/templates/stage-unlocked.hbssrc/email/templates/weekly-digest.hbssrc/email/tests/template.service.spec.tssrc/modules/funnels/actions/stage-task.action.tssrc/modules/notifications/actions/notification-preference.action.tssrc/modules/notifications/actions/notification.action.tssrc/modules/notifications/listeners/notification.listener.tssrc/modules/notifications/notifications.module.tssrc/modules/notifications/processors/weekly-digest.processor.tssrc/modules/notifications/tests/notification.listener.spec.tssrc/modules/notifications/tests/weekly-digest.processor.spec.tssrc/modules/users/users.module.ts
- gate the 50% funnel_progress milestone on inapp_task_completed; it was never consulted, leaving that preference column with no effect - document that inapp_stage_unlocked is the shared "stage events" in-app toggle for both stage.completed and stage.unlocked (the preferences table has no inapp_stage_completed column, per FR-2) - use generic "Your funnel is ready" copy for funnel.generated, which fires for first-time generation as well as regeneration - paginate weekly digest recipients and dispatch each page with bounded concurrency (p-limit) instead of one unbounded query and a sequential loop - add a test for the inapp_task_completed gate and clarify the shared-key test
8366a86
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/modules/notifications/listeners/notification.listener.ts (1)
125-140: 🧹 Nitpick | 🔵 TrivialIdempotency via
existsForFunnelType→createNotificationis not atomic.If two
task.completedevents for the same funnel are handled concurrently, both can pass theexistsForFunnelTypecheck before either inserts, producing duplicatefunnel_progressnotifications. The check-then-insert window is a TOCTOU race. For tasks completed in rapid succession this is plausible.Since a hard guarantee needs a DB-level constraint, consider a partial unique index on
(user_id, funnel_id)fortype = 'funnel_progress'(or anON CONFLICT DO NOTHINGinsert) so duplicates are rejected even under concurrency; the EXISTS check then remains a cheap fast-path.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/modules/notifications/listeners/notification.listener.ts` around lines 125 - 140, The current exists-then-insert flow around notificationAction.existsForFunnelType and notificationsService.createNotification (for FUNNEL_PROGRESS_TYPE) is vulnerable to a TOCTOU race under concurrency; make the uniqueness enforced at the DB layer and make the application insert tolerant of conflicts: add a DB-level uniqueness constraint (e.g., partial unique index on (user_id, funnel_id) where type = 'funnel_progress') OR change the implementation of notificationsService.createNotification to perform an upsert/INSERT ... ON CONFLICT DO NOTHING (so concurrent inserts are deduplicated), while keeping notificationAction.existsForFunnelType as a cheap fast-path check before attempting the insert.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@src/modules/notifications/listeners/notification.listener.ts`:
- Around line 125-140: The current exists-then-insert flow around
notificationAction.existsForFunnelType and
notificationsService.createNotification (for FUNNEL_PROGRESS_TYPE) is vulnerable
to a TOCTOU race under concurrency; make the uniqueness enforced at the DB layer
and make the application insert tolerant of conflicts: add a DB-level uniqueness
constraint (e.g., partial unique index on (user_id, funnel_id) where type =
'funnel_progress') OR change the implementation of
notificationsService.createNotification to perform an upsert/INSERT ... ON
CONFLICT DO NOTHING (so concurrent inserts are deduplicated), while keeping
notificationAction.existsForFunnelType as a cheap fast-path check before
attempting the insert.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ce21fd33-de14-4914-9b51-5de851df3962
📒 Files selected for processing (4)
src/modules/notifications/actions/notification-preference.action.tssrc/modules/notifications/listeners/notification.listener.tssrc/modules/notifications/processors/weekly-digest.processor.tssrc/modules/notifications/tests/notification.listener.spec.ts
|
Thanks for the review @sage-ali. All four points are addressed in 8366a86:
Build, lint and the listener/processor suites are green, and the app boots cleanly with the digest job still registered. One heads-up for the frontend: the in-app notification type changed from funnel_regenerated to funnel_ready. |
Pull Request
Description
Implements M4-BE-009 - NotificationListener: system-generated in-app notifications and email dispatch.
A new
NotificationListenersubscribes toAPP_EVENTSand reacts by creating in-app notifications (viaNotificationsService.createNotification()) and queueing emails, each gated independently on the user'snotification_preferences:stage.completed->stage_completedin-app +stage-completedemailstage.unlocked->stage_unlockedin-app +stage-unlockedemailfunnel.generated->funnel_regeneratedin-app (system event, ungated) +funnel-readyemailtask.completed, idempotent per funnel via an EXISTS check0 9 * * MON, jobIdweekly-digest-recurring) registered on module init, with a per-user digest processor on the email queueAll handlers are fire-and-forget: any failure is logged and never affects the HTTP response of the service that emitted the event. Email dispatch resolves the recipient from the trusted event
userId(never from an external caller).Notification titles and bodies match the Figma copy exactly.
Four new email types (
funnel-ready,stage-unlocked,stage-completed,weekly-digest) with their Handlebars templates were added.Related Issue
M4-BE-009
Type of Change
How Has This Been Tested?
Verified end to end against live Postgres + Redis by driving the real HTTP endpoints (complete tasks -> complete stage) and reading
GET /api/notifications:funnel_progress,stage_completed,stage_unlockedwith exact Figma copyfunnel_progressrowstage_completedandstage_unlockedcreatedstage-unlockedemail queued;stage-completedsuppressed (pref default false)Weekly digest recurring job registeredfunnel_regenerated(AC-03) and in-app suppression wheninapp_*is false (AC-05) are covered by the unit suite.Test Evidence
GET /api/notificationsfeed (3 system notifications)stage-unlockedqueued, nostage-completed)notification.listener,weekly-digest.processor,template.service)Screenshots to be attached.
Checklist
Additional Notes
notificationsandnotification_preferencestables already exist; this PR only reads/writes them.src/modules/users/users.module.tsgains a one-lineforwardRef(() => NotificationsModule)to resolve the Users <-> Notifications module cycle (the listener needsUserModelActionto resolve a recipient email from the eventuserId).Summary by CodeRabbit
New Features
Tests