Skip to content

Commit 1589183

Browse files
committed
fix: monitoring for results, incoming webhooks handling improved, fixed a few bugs in dsw
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent 20f8fe4 commit 1589183

9 files changed

Lines changed: 323 additions & 34 deletions

File tree

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import CronTime from 'cron-time-generator'
2+
3+
import { IS_PROD_ENV } from '@crowd/common'
4+
import { IntegrationStreamWorkerEmitter } from '@crowd/common_services'
5+
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
6+
import { Logger } from '@crowd/logging'
7+
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
8+
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
9+
import { WebhookState } from '@crowd/types'
10+
11+
import { IJobDefinition } from '../types'
12+
13+
// Kafka topic and consumer group of the integration stream worker (high-priority, production).
const TOPIC = 'integration-stream-worker-high-production'
const GROUP_ID = 'integration-stream-worker-high-production'
// If the queue already holds at least this many unconsumed messages the job skips
// re-triggering, to avoid piling more work onto an already backlogged worker.
const MAX_UNCONSUMED = 50000
16+
17+
const job: IJobDefinition = {
18+
name: 'incoming-webhooks-check',
19+
cronTime: CronTime.everyDay(),
20+
timeout: 30 * 60, // 30 minutes
21+
enabled: async () => IS_PROD_ENV,
22+
process: async (ctx) => {
23+
const kafkaClient = getKafkaClient(QUEUE_CONFIG())
24+
const admin = kafkaClient.admin()
25+
await admin.connect()
26+
27+
const counts = await getMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)
28+
29+
if (counts.unconsumed >= MAX_UNCONSUMED) {
30+
ctx.log.info(
31+
`Integration stream worker queue has ${counts.unconsumed} unconsumed messages, skipping!`,
32+
)
33+
return
34+
}
35+
36+
const dbConnection = await getDbConnection(WRITE_DB_CONFIG())
37+
38+
const count = (
39+
await dbConnection.one(
40+
`select count(*) as count from "incomingWebhooks" where state = $(state) and "createdAt" < now() - interval '1 day'`,
41+
{ state: WebhookState.PENDING },
42+
)
43+
).count
44+
45+
if (count <= counts.unconsumed) {
46+
ctx.log.info(`All ${count} stuck pending webhooks are already in the queue, skipping!`)
47+
return
48+
}
49+
50+
const webhooks = await dbConnection.any<{ id: string; platform: string }>(
51+
`
52+
select iw.id, i.platform
53+
from "incomingWebhooks" iw
54+
join integrations i on iw."integrationId" = i.id
55+
where iw.state = $(state)
56+
and iw."createdAt" < now() - interval '1 day'
57+
order by iw."createdAt" asc
58+
limit 10000
59+
`,
60+
{ state: WebhookState.PENDING },
61+
)
62+
63+
if (webhooks.length === 0) {
64+
ctx.log.info('No stuck pending webhooks found!')
65+
return
66+
}
67+
68+
ctx.log.info(`Found ${webhooks.length} stuck pending webhooks, re-triggering!`)
69+
70+
const queueService = new KafkaQueueService(kafkaClient, ctx.log)
71+
const emitter = new IntegrationStreamWorkerEmitter(queueService, ctx.log)
72+
await emitter.init()
73+
74+
let triggered = 0
75+
for (const webhook of webhooks) {
76+
await emitter.triggerWebhookProcessing(webhook.platform, webhook.id)
77+
triggered++
78+
79+
if (triggered % 100 === 0) {
80+
ctx.log.info(`Re-triggered ${triggered} webhooks!`)
81+
}
82+
}
83+
84+
ctx.log.info(`Re-triggered ${triggered} stuck pending webhooks in total!`)
85+
},
86+
}
87+
88+
async function getMessageCounts(
89+
log: Logger,
90+
admin: KafkaAdmin,
91+
topic: string,
92+
groupId: string,
93+
): Promise<{ total: number; consumed: number; unconsumed: number }> {
94+
try {
95+
const topicOffsets = await admin.fetchTopicOffsets(topic)
96+
const offsetsResponse = await admin.fetchOffsets({ groupId, topics: [topic] })
97+
const offsets = offsetsResponse[0].partitions
98+
99+
let totalMessages = 0
100+
let consumedMessages = 0
101+
let totalLeft = 0
102+
103+
for (const offset of offsets) {
104+
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
105+
if (topicOffset) {
106+
totalMessages += Number(topicOffset.offset)
107+
consumedMessages += Number(offset.offset)
108+
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
109+
}
110+
}
111+
112+
return { total: totalMessages, consumed: consumedMessages, unconsumed: totalLeft }
113+
} catch (err) {
114+
log.error(err, 'Failed to get message count!')
115+
throw err
116+
}
117+
}
118+
119+
// NOTE(review): default export — presumably discovered by the cron service's job loader; confirm.
export default job

services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import CronTime from 'cron-time-generator'
22

3-
import { generateUUIDv1, partition } from '@crowd/common'
3+
import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common'
44
import { DataSinkWorkerEmitter } from '@crowd/common_services'
55
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
66
import { Logger } from '@crowd/logging'
@@ -14,6 +14,7 @@ const job: IJobDefinition = {
1414
name: 'integration-results-check',
1515
cronTime: CronTime.every(10).minutes(),
1616
timeout: 30 * 60, // 30 minutes
17+
enabled: async () => IS_PROD_ENV,
1718
process: async (ctx) => {
1819
const topic = 'data-sink-worker-normal-production'
1920
const groupId = 'data-sink-worker-normal-production'
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import CronTime from 'cron-time-generator'
2+
3+
import { IS_DEV_ENV } from '@crowd/common'
4+
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
5+
import {
6+
SlackChannel,
7+
SlackMessageSection,
8+
SlackPersona,
9+
sendSlackNotificationAsync,
10+
} from '@crowd/slack'
11+
import { IntegrationResultState } from '@crowd/types'
12+
13+
import { IJobDefinition } from '../types'
14+
15+
// One row of the per-state aggregation over integration.results
// (see the GROUP BY state query in the job below).
interface IResultStateCount {
  state: string
  count: number
}
19+
20+
// One aggregated group of failed results sharing the same error fingerprint
// (error->>'errorMessage' + 'location' + 'message'), as produced by the
// error-breakdown query in the job below.
interface IErrorGroup {
  errorMessage: string
  location: string
  message: string
  // Number of results in this group.
  count: number
  avgRetries: number
  maxRetries: number
  // min(createdAt) / max(updatedAt) across the group's results.
  oldest: Date
  newest: Date
  // Comma-separated distinct platforms; null when no integrations row matched the left join.
  platforms: string | null
}
31+
32+
const job: IJobDefinition = {
33+
name: 'integration-results-reporting',
34+
cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.everyDayAt(8, 30),
35+
timeout: 10 * 60, // 10 minutes
36+
process: async (ctx) => {
37+
ctx.log.info('Running integration-results-reporting job...')
38+
39+
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
40+
41+
// Count results per state
42+
const stateCounts = await dbConnection.any<IResultStateCount>(
43+
`SELECT state, count(*)::int AS count FROM integration.results GROUP BY state ORDER BY count DESC`,
44+
)
45+
46+
const countByState: Record<string, number> = {}
47+
for (const row of stateCounts) {
48+
countByState[row.state] = row.count
49+
}
50+
51+
const pending = countByState[IntegrationResultState.PENDING] ?? 0
52+
const processing = countByState[IntegrationResultState.PROCESSING] ?? 0
53+
const processed = countByState[IntegrationResultState.PROCESSED] ?? 0
54+
const delayed = countByState[IntegrationResultState.DELAYED] ?? 0
55+
const errorCount = countByState[IntegrationResultState.ERROR] ?? 0
56+
const total = pending + processing + processed + delayed + errorCount
57+
58+
// How many delayed results are overdue (i.e. should already be processed)
59+
const overdueDelayed = (
60+
await dbConnection.one<{ count: number }>(
61+
`SELECT count(*)::int AS count FROM integration.results WHERE state = 'delayed' AND "delayedUntil" < now()`,
62+
)
63+
).count
64+
65+
// Break down errors by errorMessage + location, enriched with platform info
66+
const errorGroups = await dbConnection.any<IErrorGroup>(
67+
`
68+
SELECT
69+
COALESCE(r.error->>'errorMessage', '[no errorMessage]') AS "errorMessage",
70+
COALESCE(r.error->>'location', '[no location]') AS location,
71+
COALESCE(r.error->>'message', '[no message]') AS message,
72+
count(*)::int AS count,
73+
round(avg(r.retries), 1)::float AS "avgRetries",
74+
max(r.retries)::int AS "maxRetries",
75+
min(r."createdAt") AS oldest,
76+
max(r."updatedAt") AS newest,
77+
string_agg(DISTINCT i.platform, ', ' ORDER BY i.platform) AS platforms
78+
FROM integration.results r
79+
LEFT JOIN integrations i ON i.id = r."integrationId"
80+
WHERE r.state = 'error'
81+
GROUP BY
82+
r.error->>'errorMessage',
83+
r.error->>'location',
84+
r.error->>'message'
85+
ORDER BY count DESC
86+
LIMIT 20
87+
`,
88+
)
89+
90+
const sections: SlackMessageSection[] = []
91+
92+
sections.push({
93+
title: 'Integration Results Summary',
94+
text: [
95+
`*Total:* ${total.toLocaleString()}`,
96+
'',
97+
`⏳ Pending: *${pending.toLocaleString()}*`,
98+
`⚙️ Processing: *${processing.toLocaleString()}*`,
99+
`✅ Processed: *${processed.toLocaleString()}*`,
100+
`🕐 Delayed: *${delayed.toLocaleString()}*${overdueDelayed > 0 ? ` (${overdueDelayed.toLocaleString()} overdue)` : ''}`,
101+
`❌ Error: *${errorCount.toLocaleString()}*`,
102+
].join('\n'),
103+
})
104+
105+
if (errorCount > 0 && errorGroups.length > 0) {
106+
const lines: string[] = [
107+
`Top ${errorGroups.length} error group${errorGroups.length !== 1 ? 's' : ''} out of *${errorCount.toLocaleString()}* total errors:`,
108+
'',
109+
]
110+
111+
for (const group of errorGroups) {
112+
const oldestHoursAgo = Math.round(
113+
(Date.now() - new Date(group.oldest).getTime()) / 3_600_000,
114+
)
115+
const newestHoursAgo = Math.round(
116+
(Date.now() - new Date(group.newest).getTime()) / 3_600_000,
117+
)
118+
const ageLabel =
119+
oldestHoursAgo === newestHoursAgo
120+
? formatHoursAgo(oldestHoursAgo)
121+
: `${formatHoursAgo(newestHoursAgo)}${formatHoursAgo(oldestHoursAgo)}`
122+
123+
lines.push(
124+
`• *${group.count}x* \`${group.errorMessage}\``,
125+
` _Location:_ \`${group.location}\` | _retries avg/max:_ ${group.avgRetries}/${group.maxRetries}${group.platforms ? ` | _platforms:_ \`${group.platforms}\`` : ''}`,
126+
` _Age:_ ${ageLabel}`,
127+
` _Detail:_ ${group.message}`,
128+
'',
129+
)
130+
}
131+
132+
sections.push({
133+
title: `Error Breakdown (top ${errorGroups.length})`,
134+
text: lines.join('\n'),
135+
})
136+
}
137+
138+
const persona = errorCount > 0 ? SlackPersona.WARNING_PROPAGATOR : SlackPersona.INFO_NOTIFIER
139+
140+
await sendSlackNotificationAsync(
141+
SlackChannel.CDP_INTEGRATIONS_ALERTS,
142+
persona,
143+
'Integration Results Daily Report',
144+
sections,
145+
)
146+
147+
ctx.log.info(
148+
`Integration results report sent: pending=${pending}, delayed=${delayed} (${overdueDelayed} overdue), errors=${errorCount}`,
149+
)
150+
},
151+
}
152+
153+
function formatHoursAgo(hours: number): string {
154+
if (hours < 1) return 'just now'
155+
if (hours < 24) return `${hours}h ago`
156+
return `${Math.round(hours / 24)}d ago`
157+
}
158+
159+
// NOTE(review): default export — presumably discovered by the cron service's job loader; confirm.
export default job

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,13 +266,25 @@ export default class ActivityService extends LoggerBase {
266266
const results = new Map<string, { success: boolean; err?: Error }>()
267267

268268
for (const { resultId, activity, platform } of data) {
269+
// Guard against results whose data.data is missing or not an activity object.
270+
// Without the !activity check, accessing activity.username throws a TypeError that
271+
// propagates out of prepareMemberData and crashes the entire batch, marking all other
272+
// results in the batch with the same error even though they are valid.
273+
if (!activity) {
274+
this.log.error({ platform }, 'Activity data is missing.')
275+
results.set(resultId, {
276+
success: false,
277+
err: new UnrepeatableError('Activity data is missing.'),
278+
})
279+
continue
280+
}
281+
269282
if (!activity.username && !activity.member) {
270283
this.log.error({ platform, activity }, 'Activity does not have a username or member.')
271284
results.set(resultId, {
272285
success: false,
273286
err: new UnrepeatableError('Activity does not have a username or member.'),
274287
})
275-
276288
continue
277289
}
278290

services/apps/data_sink_worker/src/service/dataSink.service.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,14 @@ export default class DataSinkService extends LoggerBase {
9797
}`
9898
}
9999

100-
if (errorData.errorMessage.includes('uix_memberIdentities_platform_value_type_verified')) {
100+
// Identity conflict errors get a long random delay to let concurrent upserts settle.
101+
// Previously this called delayResult() unconditionally, bypassing maxStreamRetries and
102+
// allowing retries to grow without bound. Now we respect the retry limit so the row
103+
// eventually reaches ERROR state instead of cycling forever.
104+
if (
105+
errorData.errorMessage.includes('uix_memberIdentities_platform_value_type_verified') &&
106+
resultInfo.retries + 1 <= WORKER_SETTINGS().maxStreamRetries
107+
) {
101108
const delaySeconds = Math.floor(Math.random() * (120 - 10 + 1) + 10) * 60
102109
const until = addSeconds(new Date(), delaySeconds)
103110
this.log.warn(

services/apps/integration_stream_worker/src/service/integrationStreamService.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ export default class IntegrationStreamService extends LoggerBase {
374374
await integrationService.processWebhookStream(context)
375375
this.log.debug('Finished processing webhook stream!')
376376
await this.repo.deleteStream(streamId)
377-
await this.webhookRepo.markWebhookProcessed(webhookId)
377+
await this.webhookRepo.deleteWebhook(webhookId)
378378
return true
379379
} catch (err) {
380380
this.log.error(err, 'Error while processing webhook stream!')

services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/dataSink.repo.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,19 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
6262
`
6363
select r.id
6464
from integration.results r
65-
where (r.state = $(pendingState)
66-
or (r.state = $(delayedState) and r."delayedUntil" < now())
67-
or (r.state = $(errorState) and r.retries <= 5))
65+
where (r.state = $(pendingState)
66+
or (r.state = $(delayedState) and r."delayedUntil" < now()))
6867
${lastId !== undefined ? 'and r.id > $(lastId)' : ''}
6968
order by r.id
7069
limit ${limit};
7170
`,
71+
// ERROR rows are intentionally excluded: ERROR is a terminal state — a row reaches it
72+
// only after exhausting all retries or hitting an unrepeatable error. Re-queuing it
73+
// without resetting retries means it will immediately error again on the next attempt.
74+
// To retry a specific error row, reset it explicitly first (state = PENDING, retries = 0).
7275
{
7376
pendingState: IntegrationResultState.PENDING,
7477
delayedState: IntegrationResultState.DELAYED,
75-
errorState: IntegrationResultState.ERROR,
7678
lastId,
7779
},
7880
)
@@ -90,15 +92,14 @@ export default class DataSinkRepository extends RepositoryBase<DataSinkRepositor
9092
`
9193
select r.id
9294
from integration.results r
93-
where r.state = $(pendingState)
95+
where r.state = $(pendingState)
9496
or (r.state = $(delayedState) and r."delayedUntil" < now())
95-
or (r.state = $(errorState) and r.retries <= 5)
9697
limit ${limit};
9798
`,
99+
// ERROR rows are intentionally excluded — see getOldResultsToProcessForTenant for details.
98100
{
99101
pendingState: IntegrationResultState.PENDING,
100102
delayedState: IntegrationResultState.DELAYED,
101-
errorState: IntegrationResultState.ERROR,
102103
},
103104
)
104105

0 commit comments

Comments
 (0)