Skip to content

Commit 84493c7

Browse files
committed
fix: comments
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent b409a2c commit 84493c7

6 files changed

Lines changed: 62 additions & 144 deletions

File tree

services/apps/cron_service/src/jobs/incomingWebhooksCheck.job.ts

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import CronTime from 'cron-time-generator'
33
import { IS_PROD_ENV } from '@crowd/common'
44
import { IntegrationStreamWorkerEmitter } from '@crowd/common_services'
55
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
6-
import { Logger } from '@crowd/logging'
7-
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
6+
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
87
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
98
import { WebhookState } from '@crowd/types'
109

@@ -24,7 +23,7 @@ const job: IJobDefinition = {
2423
const admin = kafkaClient.admin()
2524
await admin.connect()
2625

27-
const counts = await getMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)
26+
const counts = await getKafkaMessageCounts(ctx.log, admin, TOPIC, GROUP_ID)
2827

2928
if (counts.unconsumed >= MAX_UNCONSUMED) {
3029
ctx.log.info(
@@ -37,7 +36,7 @@ const job: IJobDefinition = {
3736

3837
const count = (
3938
await dbConnection.one(
40-
`select count(*) as count from "incomingWebhooks" where state = $(state) and "createdAt" < now() - interval '1 day'`,
39+
`select count(*)::int as count from "incomingWebhooks" where state = $(state) and "createdAt" < now() - interval '1 day'`,
4140
{ state: WebhookState.PENDING },
4241
)
4342
).count
@@ -85,35 +84,4 @@ const job: IJobDefinition = {
8584
},
8685
}
8786

88-
async function getMessageCounts(
89-
log: Logger,
90-
admin: KafkaAdmin,
91-
topic: string,
92-
groupId: string,
93-
): Promise<{ total: number; consumed: number; unconsumed: number }> {
94-
try {
95-
const topicOffsets = await admin.fetchTopicOffsets(topic)
96-
const offsetsResponse = await admin.fetchOffsets({ groupId, topics: [topic] })
97-
const offsets = offsetsResponse[0].partitions
98-
99-
let totalMessages = 0
100-
let consumedMessages = 0
101-
let totalLeft = 0
102-
103-
for (const offset of offsets) {
104-
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
105-
if (topicOffset) {
106-
totalMessages += Number(topicOffset.offset)
107-
consumedMessages += Number(offset.offset)
108-
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
109-
}
110-
}
111-
112-
return { total: totalMessages, consumed: consumedMessages, unconsumed: totalLeft }
113-
} catch (err) {
114-
log.error(err, 'Failed to get message count!')
115-
throw err
116-
}
117-
}
118-
11987
export default job

services/apps/cron_service/src/jobs/integrationResultsCheck.job.ts

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import CronTime from 'cron-time-generator'
33
import { IS_PROD_ENV, generateUUIDv1, partition } from '@crowd/common'
44
import { DataSinkWorkerEmitter } from '@crowd/common_services'
55
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
6-
import { Logger } from '@crowd/logging'
7-
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
6+
import { QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
87
import { KafkaQueueService } from '@crowd/queue/src/vendors/kafka/client'
98
import { DataSinkWorkerQueueMessageType, IntegrationResultState } from '@crowd/types'
109

@@ -23,7 +22,7 @@ const job: IJobDefinition = {
2322
const admin = kafkaClient.admin()
2423
await admin.connect()
2524

26-
const counts = await getMessageCounts(ctx.log, admin, topic, groupId)
25+
const counts = await getKafkaMessageCounts(ctx.log, admin, topic, groupId)
2726

2827
// if we have less than 50k messages in the queue we can trigger 50k oldest results (we process between 100k and 300k results per hour on average)
2928
if (counts.unconsumed < 50000) {
@@ -80,50 +79,4 @@ const job: IJobDefinition = {
8079
},
8180
}
8281

83-
async function getMessageCounts(
84-
log: Logger,
85-
admin: KafkaAdmin,
86-
topic: string,
87-
groupId: string,
88-
): Promise<{
89-
total: number
90-
consumed: number
91-
unconsumed: number
92-
}> {
93-
try {
94-
const topicOffsets = await admin.fetchTopicOffsets(topic)
95-
const offsetsResponse = await admin.fetchOffsets({
96-
groupId: groupId,
97-
topics: [topic],
98-
})
99-
100-
const offsets = offsetsResponse[0].partitions
101-
102-
let totalMessages = 0
103-
let consumedMessages = 0
104-
let totalLeft = 0
105-
106-
for (const offset of offsets) {
107-
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
108-
if (topicOffset) {
109-
// Total messages is the latest offset
110-
totalMessages += Number(topicOffset.offset)
111-
// Consumed messages is the consumer group's offset
112-
consumedMessages += Number(offset.offset)
113-
// Unconsumed is the difference
114-
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
115-
}
116-
}
117-
118-
return {
119-
total: totalMessages,
120-
consumed: consumedMessages,
121-
unconsumed: totalLeft,
122-
}
123-
} catch (err) {
124-
log.error(err, 'Failed to get message count!')
125-
throw err
126-
}
127-
}
128-
12982
export default job

services/apps/cron_service/src/jobs/queueMonitoring.job.ts

Lines changed: 2 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import CronTime from 'cron-time-generator'
22

33
import { IS_PROD_ENV, distinct, timeout } from '@crowd/common'
44
import { Logger } from '@crowd/logging'
5-
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient } from '@crowd/queue'
5+
import { KafkaAdmin, QUEUE_CONFIG, getKafkaClient, getKafkaMessageCounts } from '@crowd/queue'
66
import { SlackChannel, SlackPersona, sendSlackNotificationAsync } from '@crowd/slack'
77
import telemetry from '@crowd/telemetry'
88

@@ -41,7 +41,7 @@ const job: IJobDefinition = {
4141
}
4242

4343
for (const group of groups) {
44-
const counts = await getMessageCounts(ctx.log, admin, topic, group)
44+
const counts = await getKafkaMessageCounts(ctx.log, admin, topic, group)
4545
ctx.log.info(
4646
`Topic ${topic} group ${group} has ${counts.total} total messages, ${counts.consumed} consumed, ${counts.unconsumed} unconsumed!`,
4747
)
@@ -181,61 +181,6 @@ async function isConsumerListeningToTopic(
181181
}
182182
}
183183

184-
async function getMessageCounts(
185-
log: Logger,
186-
admin: KafkaAdmin,
187-
topic: string,
188-
groupId: string,
189-
): Promise<{
190-
total: number
191-
consumed: number
192-
unconsumed: number
193-
}> {
194-
try {
195-
const topicOffsets = await admin.fetchTopicOffsets(topic)
196-
const offsetsResponse = await admin.fetchOffsets({
197-
groupId: groupId,
198-
topics: [topic],
199-
})
200-
201-
const offsets = offsetsResponse[0].partitions
202-
203-
let totalMessages = 0
204-
let consumedMessages = 0
205-
let totalLeft = 0
206-
207-
for (const offset of offsets) {
208-
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
209-
if (topicOffset) {
210-
// Total messages is the latest offset
211-
totalMessages += Number(topicOffset.offset)
212-
213-
// Handle -1 offsets (no committed offset)
214-
if (offset.offset === '-1') {
215-
// No committed offset means no messages consumed from this partition
216-
consumedMessages += 0
217-
// Unconsumed is the total messages in the partition
218-
totalLeft += Number(topicOffset.offset) - Number(topicOffset.low)
219-
} else {
220-
// Consumed messages is the consumer group's offset
221-
consumedMessages += Number(offset.offset)
222-
// Unconsumed is the difference
223-
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
224-
}
225-
}
226-
}
227-
228-
return {
229-
total: totalMessages,
230-
consumed: consumedMessages,
231-
unconsumed: totalLeft,
232-
}
233-
} catch (err) {
234-
log.error(err, 'Failed to get message count!')
235-
throw err
236-
}
237-
}
238-
239184
async function getTopicMessageCount(
240185
log: Logger,
241186
admin: KafkaAdmin,

services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/requestedForErasureMemberIdentities.repo.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ export default class RequestedForErasureMemberIdentitiesRepository extends Repos
9090
} else {
9191
// The SQL query above already filters non-EMAIL identities by platform, so the
9292
// in-memory filter must also include platform. Without it, two identities sharing
93-
// (type, value) but on different platforms both match, causing singleOrDefault to
94-
// throw "Array contains more than one matching element!" — a deterministic crash
95-
// that will never self-heal since the same data triggers the same failure every time.
93+
// (type, value) but on different platforms would both match. The previous
94+
// singleOrDefault call threw "Array contains more than one matching element!" in that
95+
// case — a deterministic crash that never self-heals.
9696
const row =
9797
data.find(
9898
(r) =>

services/libs/queue/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
export * from './types'
22
export * from './queue'
33
export * from './vendors/kafka/config'
4+
export * from './vendors/kafka/utils'
45
export * from './prioritization'
56
export * from './factory'
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { Admin } from 'kafkajs'
2+
3+
import { Logger } from '@crowd/logging'
4+
5+
/**
6+
* Returns the total, consumed, and unconsumed message counts for a Kafka topic/consumer-group pair.
7+
*
8+
* Handles the `-1` offset edge case: when a consumer group has never committed an offset,
9+
* Kafka returns `-1`. In that case all messages from the low watermark onward are unconsumed.
10+
*/
11+
export async function getKafkaMessageCounts(
12+
log: Logger,
13+
admin: Admin,
14+
topic: string,
15+
groupId: string,
16+
): Promise<{ total: number; consumed: number; unconsumed: number }> {
17+
try {
18+
const topicOffsets = await admin.fetchTopicOffsets(topic)
19+
const offsetsResponse = await admin.fetchOffsets({
20+
groupId,
21+
topics: [topic],
22+
})
23+
24+
const offsets = offsetsResponse[0].partitions
25+
26+
let totalMessages = 0
27+
let consumedMessages = 0
28+
let totalLeft = 0
29+
30+
for (const offset of offsets) {
31+
const topicOffset = topicOffsets.find((p) => p.partition === offset.partition)
32+
if (topicOffset) {
33+
totalMessages += Number(topicOffset.offset)
34+
35+
if (offset.offset === '-1') {
36+
// No committed offset yet — treat all messages from the low watermark as unconsumed.
37+
consumedMessages += 0
38+
totalLeft += Number(topicOffset.offset) - Number(topicOffset.low)
39+
} else {
40+
consumedMessages += Number(offset.offset)
41+
totalLeft += Number(topicOffset.offset) - Number(offset.offset)
42+
}
43+
}
44+
}
45+
46+
return { total: totalMessages, consumed: consumedMessages, unconsumed: totalLeft }
47+
} catch (err) {
48+
log.error(err, 'Failed to get Kafka message counts!')
49+
throw err
50+
}
51+
}

0 commit comments

Comments
 (0)