feat(concurrency): bullmq based concurrency control system#3605
feat(concurrency): bullmq based concurrency control system#3605icecrasher321 wants to merge 20 commits intostagingfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. |
PR SummaryHigh Risk Overview Updates workflow execution, scheduled execution, and webhook trigger APIs to enqueue through Adds billing-driven workspace concurrency limits (plan defaults + enterprise metadata override) with caching, extends enterprise billing metadata parsing with zod (including concurrency limit), and documents/markets the new concurrency limits in docs and pricing UI. Replaces the async-jobs Redis backend with a BullMQ backend and adjusts inline-execution semantics so only the database fallback executes inline. Written by Cursor Bugbot for commit 53733e4. This will update automatically on new commits. Configure here. |
Greptile SummaryThis PR introduces a comprehensive BullMQ-based queuing and concurrency control system for workflow, webhook, and schedule executions. It replaces the previous Redis/database job queue backends with BullMQ queues backed by Redis, adds a per-workspace fairness dispatcher (with a Lua-script-based atomic claim mechanism), a lease-based concurrency limit (keyed to billing plan), an in-process admission gate for external API requests, and a standalone worker process to consume jobs. It also adds a buffered SSE stream for non-manual executions that now run asynchronously through the dispatch queue. The change is substantial (~5900 lines added) and represents a foundational infrastructure improvement for reliability and rate-limiting protection. Key changes and issues found:
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client
participant RouteHandler as API Route Handler
participant AdmissionGate as Admission Gate (in-process)
participant Dispatcher as Workspace Dispatcher
participant RedisStore as Redis Dispatch Store
participant BullMQ as BullMQ Queue (Redis)
participant Worker as BullMQ Worker Process
participant DispatchWorker as Dispatch Worker (worker.ts)
Client->>RouteHandler: POST /api/workflows/[id]/execute
RouteHandler->>AdmissionGate: tryAdmit()
alt At capacity (inflight >= MAX_INFLIGHT)
AdmissionGate-->>RouteHandler: null
RouteHandler-->>Client: 429 Too Many Requests
else Admitted
AdmissionGate-->>RouteHandler: ticket
RouteHandler->>Dispatcher: enqueueWorkspaceDispatch(input)
Dispatcher->>RedisStore: enqueueWorkspaceDispatchJob()
RedisStore-->>Dispatcher: jobRecord (status=waiting)
Dispatcher->>Dispatcher: runDispatcherLoop() [void]
Dispatcher->>RedisStore: popNextWorkspaceId()
Dispatcher->>RedisStore: claimWorkspaceJob() [Lua script]
RedisStore-->>Dispatcher: {type: admitted, record, leaseId}
Dispatcher->>BullMQ: queue.add(jobName, payload, {jobId})
Dispatcher->>RedisStore: markDispatchJobAdmitted()
Dispatcher-->>RouteHandler: dispatchJobId
RouteHandler->>RouteHandler: waitForDispatchJob(id, timeout) [polls 250ms]
Worker->>BullMQ: picks up job
Worker->>DispatchWorker: runDispatchedJob(metadata, fn)
DispatchWorker->>RedisStore: markDispatchJobRunning()
DispatchWorker->>DispatchWorker: executeQueuedWorkflowJob() / executeWorkflowJob()
DispatchWorker->>RedisStore: markDispatchJobCompleted(output)
DispatchWorker->>RedisStore: releaseWorkspaceLease()
DispatchWorker->>Dispatcher: wakeWorkspaceDispatcher()
RedisStore-->>RouteHandler: record (status=completed) [via poll]
RouteHandler->>AdmissionGate: ticket.release()
RouteHandler-->>Client: 200 JSON result
end
Last reviewed commit: be83c97 |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
|
bugbot run |
Resolved conflict in document-processor.ts by keeping resolveParserExtension helper with txt fallback for parseBase64Content (matching staging behavior) and strict mode for parseHttpFile. Made mimeType optional in both resolveParserExtension and parseHttpFile. Made-with: Cursor
|
bugbot run |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Summary
BullMQ based concurrency control system for executions currently running in line [manual execs excluded]. Can tune limits based on resources.
Overall admin gates to prevent rate limiting services based crashes.
Type of Change
Testing
Tested manually under different configurations, added extensive test suite
Checklist