/**
 * Transformer consumer: an infinite polling loop that reads pending export
 * jobs from the database and transforms them.
 *
 * Continuously polls the metadata store for pending jobs that need
 * transformation, runs the appropriate per-platform transformer on each, and
 * emits the results to the data-sink worker. When no work is found, polling
 * backs off exponentially up to a 30-minute ceiling.
 */
import { DataSinkWorkerEmitter } from '@crowd/common_services'
import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/database'
import { getServiceChildLogger } from '@crowd/logging'
import { QUEUE_CONFIG, QueueFactory } from '@crowd/queue'
import { REDIS_CONFIG, RedisCache, getRedisClient } from '@crowd/redis'
import { PlatformType } from '@crowd/types'

import { IntegrationResolver } from '../core/integrationResolver'
import { MetadataStore, SnowflakeExportJob } from '../core/metadataStore'
import { S3Service } from '../core/s3Service'
import { getDataSource, getEnabledPlatforms } from '../integrations'

const log = getServiceChildLogger('transformerConsumer')

const MAX_POLLING_INTERVAL_MS = 30 * 60 * 1000 // 30 minutes
export class TransformerConsumer {
  private running = false
  private currentPollingIntervalMs: number

  constructor(
    private readonly metadataStore: MetadataStore,
    private readonly s3Service: S3Service,
    private readonly integrationResolver: IntegrationResolver,
    private readonly emitter: DataSinkWorkerEmitter,
    private readonly pollingIntervalMs: number,
  ) {
    this.currentPollingIntervalMs = pollingIntervalMs
  }

  async start(): Promise<void> {
    this.running = true
    log.info('Transformer consumer started')

    while (this.running) {
      try {
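        // claimOldestPendingJob is assumed to mark the job as claimed in the
        // metadata store, so concurrent consumers do not pick up the same job.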
        const job = await this.metadataStore.claimOldestPendingJob()
        if (job) {
          log.info({ job }, 'Claimed job from metadata store')
          // Work was found, so reset the idle backoff to the base interval.
          this.currentPollingIntervalMs = this.pollingIntervalMs
          await this.processJob(job)
          // Yield to the event loop so GC can collect the previous batch before the next one starts.
          await new Promise<void>((resolve) => setImmediate(resolve))
          continue
        }
      } catch (err) {
        log.error({ err }, 'Error in consumer loop')
        await this.sleep(this.pollingIntervalMs)
        continue
      }
      log.info(
        { currentPollingIntervalMs: this.currentPollingIntervalMs },
        'No pending jobs, backing off',
      )
      await this.sleep(this.currentPollingIntervalMs)
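      // Exponential backoff: double the idle polling interval after each empty
      // poll, capped at MAX_POLLING_INTERVAL_MS.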
      this.currentPollingIntervalMs = Math.min(
        this.currentPollingIntervalMs * 2,
        MAX_POLLING_INTERVAL_MS,
      )
    }

    log.info('Transformer consumer stopped')
  }

  stop(): void {
    this.running = false
  }

  private async processJob(job: SnowflakeExportJob): Promise<void> {
    log.info({ jobId: job.id, platform: job.platform, s3Path: job.s3Path }, 'Processing job')

    const startTime = Date.now()

    try {
      const platform = job.platform as PlatformType
      const source = getDataSource(platform, job.sourceName)

      let transformedCount = 0
      let transformSkippedCount = 0
      let resolveSkippedCount = 0
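      // Rows are consumed through an async iterator, which suggests the parquet
      // file is streamed from S3 rather than loaded into memory whole.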
      for await (const row of this.s3Service.streamParquetRows(job.s3Path)) {
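        // The transformer returns a falsy value for rows it cannot map; a single
        // source row may fan out into multiple activity results.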
        const results = source.transformer.safeTransformRow(row)
        if (!results) {
          transformSkippedCount++
          continue
        }
        for (const result of results) {
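          // Resolve the (platform, segment) pair to a concrete integration;
          // results that do not resolve are counted and skipped.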
          const resolved = await this.integrationResolver.resolve(platform, result.segment)
          if (!resolved) {
            resolveSkippedCount++
            continue
          }

          await this.emitter.createAndProcessActivityResult(
            resolved.segmentId,
            resolved.integrationId,
            result.activity,
          )
          transformedCount++
        }
      }

      const skippedCount = transformSkippedCount + resolveSkippedCount
      const processingMetrics = {
        transformedCount,
        skippedCount,
        transformSkippedCount,
        resolveSkippedCount,
        processingDurationMs: Date.now() - startTime,
      }

      await this.metadataStore.markCompleted(job.id, processingMetrics)
      log.info({ jobId: job.id, ...processingMetrics }, 'Job completed')
    } catch (err) {
      const errorMessage = err instanceof Error ? err.message : String(err)
      log.error({ jobId: job.id, err }, 'Job failed')

      // Best-effort status update; if this also fails, log and move on so the
      // consumer loop keeps running.
      try {
        await this.metadataStore.markFailed(job.id, errorMessage, {
          processingDurationMs: Date.now() - startTime,
        })
      } catch (updateErr) {
        log.error({ jobId: job.id, updateErr }, 'Failed to mark job as failed')
      }
    }
  }

  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms))
  }
}

export async function createTransformerConsumer(): Promise<TransformerConsumer> {
  const db = await getDbConnection(WRITE_DB_CONFIG())
  const metadataStore = new MetadataStore(db)
  const s3Service = new S3Service()

  const redisClient = await getRedisClient(REDIS_CONFIG(), true)
  const cache = new RedisCache('snowflake-integration-resolver', redisClient, log)
  const resolver = new IntegrationResolver(db, cache)
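  // Preload resolver state for every enabled platform up front, presumably so
  // per-row lookups can be served from the Redis-backed cache.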
  await resolver.preloadPlatforms(getEnabledPlatforms())

  const queueClient = QueueFactory.createQueueService(QUEUE_CONFIG())
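  // The emitter publishes transformed activity results onto the queue consumed
  // by the data-sink worker.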
  const emitter = new DataSinkWorkerEmitter(queueClient, log)
  await emitter.init()

  const pollingIntervalMs = 10_000 // 10 seconds

  return new TransformerConsumer(metadataStore, s3Service, resolver, emitter, pollingIntervalMs)
}
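
// --- Usage sketch (illustrative, not from the original module) ---
// A minimal sketch of an entry point, assuming a hypothetical main.ts that
// imports this file; the signal handling shown is an assumption, not the
// service's actual shutdown wiring.
//
//   import { createTransformerConsumer } from './transformerConsumer'
//
//   async function main(): Promise<void> {
//     const consumer = await createTransformerConsumer()
//     // stop() flips the running flag; the polling loop exits after the
//     // current iteration (including any in-progress sleep).
//     process.on('SIGTERM', () => consumer.stop())
//     process.on('SIGINT', () => consumer.stop())
//     await consumer.start()
//   }
//
//   main().catch((err) => {
//     console.error('transformer consumer crashed', err)
//     process.exit(1)
//   })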