File tree Expand file tree Collapse file tree
apps/cron_service/src/jobs
libs/common_services/src/services/emitters Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -99,7 +99,10 @@ const job: IJobDefinition = {
9999 const emitter = new IntegrationStreamWorkerEmitter ( queueService , ctx . log )
100100 await emitter . init ( )
101101
102- await emitter . triggerWebhookProcessingBatch ( webhooks . map ( ( w ) => w . id ) )
102+ await emitter . triggerWebhookProcessingBatch (
103+ webhooks . map ( ( w ) => w . id ) ,
104+ true ,
105+ )
103106
104107 ctx . log . info ( `Re-triggered ${ webhooks . length } stuck pending webhooks in total!` )
105108 } ,
Original file line number Diff line number Diff line change @@ -60,14 +60,23 @@ export class IntegrationStreamWorkerEmitter extends QueuePriorityService {
6060 )
6161 }
6262
63- public async triggerWebhookProcessingBatch ( webhookIds : string [ ] ) : Promise < void > {
63+ // Sends in parallel batches of 10. Uses concurrent sendMessage calls (not sendMessages)
64+ // so the priority context (onboarding flag) is respected — sendMessages() has no way to
65+ // pass a priority override and always routes to NORMAL.
66+ public async triggerWebhookProcessingBatch (
67+ webhookIds : string [ ] ,
68+ onboarding : boolean ,
69+ ) : Promise < void > {
6470 for ( const batch of partition ( webhookIds , 10 ) ) {
65- await this . sendMessages (
66- batch . map ( ( webhookId ) => ( {
67- payload : new ProcessWebhookStreamQueueMessage ( webhookId ) ,
68- groupId : generateUUIDv1 ( ) ,
69- deduplicationId : webhookId ,
70- } ) ) ,
71+ await Promise . all (
72+ batch . map ( ( webhookId ) =>
73+ this . sendMessage (
74+ generateUUIDv1 ( ) ,
75+ new ProcessWebhookStreamQueueMessage ( webhookId ) ,
76+ webhookId ,
77+ { onboarding } ,
78+ ) ,
79+ ) ,
7180 )
7281 }
7382 }
You can’t perform that action at this time.
0 commit comments