A Cloudflare Worker that powers background processing for Moretransfer: ZIP bundling (Cloudflare Containers), Cloudflare Stream ingest, scheduled notifications, and transfer cleanup crons. Jobs are processed asynchronously via Cloudflare Queues and Durable Objects.
- ZIP (containers): Resumable ZIP64 STORE archives built in a Go container, coordinated by
JobManagerDOwith SQLite checkpointing and R2 multipart uploads - Global concurrency control:
ZipSemaphoreDOlimits how many container ZIP jobs run at once - Cloudflare Stream ingest: Copies video from R2 presigned URLs into Cloudflare Stream for preview playback
- Notification workflow: Cron-driven schedule β dedicated notification queue β batched delivery via the Web API
- Transfer maintenance crons: Expired-transfer cleanup and abandoned-upload cleanup
- HMAC-authenticated HTTP producers: Signed POST endpoints for the Next.js API
βββββββββββββββββββ POST /compress-files ββββββββββββββββββββ
β Next.js API β βββββββββββββββββββββββββββββΊ β Worker (fetch) β
β β POST /stream-ingest β β
βββββββββββββββββββ ββββββββββ¬ββββββββββ
β
ββββββββββββββββββββββββββββββββββββββββΌβββββββββββββββββββββββββββ
β β β
βΌ βΌ βΌ
QUEUE_WORKER_MAIN QUEUE_NOTIFICATIONS Cron triggers
(zip_v2_tick, stream_ingest) (notification_process) (cleanup + schedule)
β β
βββββββββββββ΄ββββββββββββ β
βΌ βΌ βΌ
JobManagerDO StreamIngestor notification-processor
ZipSemaphoreDO (Cloudflare Stream) β Web API /notifications/process
ZipContainerDO
(Go ZIP container)
β
βΌ
R2 (source + output)
| Component | Role |
|---|---|
| HTTP producer | Accepts signed POSTs to /compress-files and /stream-ingest |
Main queue (QUEUE_WORKER_MAIN) |
ZIP tick triggers and Stream ingest jobs |
Notification queue (QUEUE_NOTIFICATIONS) |
Batched notification delivery jobs |
| JobManagerDO | ZIP lifecycle: checkpointing, retries, multipart finalize, cleanup TTL |
| ZipSemaphoreDO | Global cap on concurrent container ZIP work |
| ZipContainerDO | Cloudflare Container (Go) that streams ZIP64 STORE output to R2 |
| Cron handler | Expired transfers, abandoned uploads, notification scheduling |
- Cloudflare account with Workers, R2, Queues, Durable Objects, and Containers enabled
- R2 bucket(s) for source and output objects
- Wrangler CLI (
npm installincludes it) - For Stream ingest:
CLOUDFLARE_ACCOUNT_IDandCLOUDFLARE_STREAM_API_TOKENsecrets
Set in wrangler.toml per environment (dev / prod) or via the Cloudflare dashboard.
Core
| Variable | Description |
|---|---|
SOURCE_BUCKET / OUTPUT_BUCKET |
R2 bindings |
ZIP_OUTPUT_PREFIX |
Prefix for generated ZIP paths (e.g. bundle) |
ZIP_OUTPUT_FILE_NAME |
Base name for output object (e.g. moretransfer_bundle β .zip) |
MAX_FILES |
Max files per ZIP manifest (default in code: 500) |
MAX_ZIP_BYTES |
Max total uncompressed bytes (default: 64 GiB) |
BASE_RETRY_DELAY_SECONDS |
Queue retry backoff base |
SECRET_KEY |
HMAC secret shared with the Web API |
WEB_API_BASE_URL |
Next.js API base URL |
ZIP (containers)
| Variable | Description |
|---|---|
ZIP_GLOBAL_CONCURRENCY |
Max concurrent container ZIP jobs (dev: 1, prod: 3) |
ZIP_MANIFEST_PREFIX |
R2 prefix for job manifests (default: manifests) |
ZIP_PART_SIZE_BYTES |
Multipart part size (default: 128 MiB) |
ZIP_MAX_PARTS_PER_TICK |
Parts uploaded per container chunk (default: 8) |
BUNDLE_CLEANUP_TTL_DAYS |
Days before terminal job state is purged from JobManagerDO |
Cloudflare Stream
| Variable | Description |
|---|---|
CLOUDFLARE_ACCOUNT_ID |
Cloudflare account (secret in prod) |
CLOUDFLARE_STREAM_API_TOKEN |
Stream API token (secret in prod) |
STREAM_REQUIRE_SIGNED_URLS |
Require signed playback URLs (set false to allow public preview during ingest) |
- R2 buckets: Source and output bindings
- Queues:
- Main worker queue + DLQ (
QUEUE_WORKER_MAIN) - Notification queue + DLQ (
QUEUE_NOTIFICATIONS)
- Main worker queue + DLQ (
- Durable Objects:
JobManagerDO,ZipSemaphoreDO,ZipContainerDO - Container:
ZipContainerDOimage fromcontainer/Dockerfile
| Schedule | Handler |
|---|---|
0 */3 * * * |
Cleanup expired transfers |
0 1,7,13,19 * * * |
Cleanup abandoned uploads (staggered from expired cleanup) |
*/15 * * * * |
Notification schedule β enqueue notification_process jobs |
git clone <repository-url>
cd moretransfer-worker
npm installConfigure wrangler.toml, create queues, and set secrets:
npm run deploy:dev # or deploy:prod
wrangler secret put CLOUDFLARE_STREAM_API_TOKEN --env prod
wrangler secret put SECRET_KEY --env prodPOST to /compress-files with a JSON body matching ZipJob:
{
"transferId": "uuid",
"objectPrefix": "path/to/files/",
"zipOutputKey": "optional/custom/key.zip",
"includeEmpty": true,
"createdBy": "api",
"files": [{ "key": "path/obj1", "relativePath": "folder/a.mxf" }]
}Parameters
transferId(required): Transfer id for status callbacksobjectPrefix(required): R2 prefix for the transferzipOutputKey(optional): Custom output key in the output bucketincludeEmpty(optional): Include zero-byte files (default:true)createdBy(optional): Audit field; stored in ZIP object metadatafiles(optional): R2 keys mapped torelativePathinside the ZIP
ZIP flow
- Worker writes a manifest to R2 (
manifests/<jobId>.json) JobManagerDOstarts the job (jobId=transferId)- A
zip_v2_tickmessage is sent to the main queue - Each tick acquires the global semaphore, runs a container chunk (
/runChunk), uploads multipart parts, and advances the checkpoint - When all files are written, the DO finalizes multipart upload, verifies output, updates transfer status to
ready, and schedules state cleanup
Output objects include customMetadata.zipVersion = zip64-store-container-v1.
POST to /stream-ingest:
{
"transferId": "uuid",
"fileId": "uuid",
"r2PresignedGetUrl": "https://...",
"transferUserId": "optional-user-id",
"transferExpiresAt": "2026-12-31T00:00:00.000Z",
"meta": {
"transferId": "uuid",
"fileId": "uuid",
"filename": "video.mp4",
"mimeType": "video/mp4"
}
}The worker copies the R2 object into Cloudflare Stream via the Stream copy API, with optional signed URLs, allowed origins, creator metadata, and scheduled deletion aligned to transfer expiry.
- Schedule (cron every 15 minutes): Worker calls
POST /api/external/notifications/scheduleon the Web API - Enqueue: Web API returns
jobIds; worker batches them ontoQUEUE_NOTIFICATIONSasnotification_processmessages (with acorrelationId) - Process (notification queue consumer): Messages grouped by
correlationIdare sent toPOST /api/external/notifications/process - Ack/retry: Per-job results (
sent,skipped,failed,retry) drive message ack or retry
npm run dev
# or without HMAC for local testing:
npm run dev:no-authnpm run dev:cron
npm run cron:cleanup
npm run cron:abandoned-uploads
npm run cron:notificationsnpm run typecheckUpload test objects to R2, start the worker, then:
./scripts/test-enqueue.shmoretransfer-worker/
βββ container/
β βββ Dockerfile # ZipContainerDO image
β βββ main.go # ZIP64 STORE chunk writer (Go)
βββ src/
β βββ index.ts # fetch / queue / scheduled handlers
β βββ lib/
β β βββ types/types.ts
β βββ modules/
β βββ job-manager-do.ts # ZIP coordinator
β βββ zip-container.ts # Container DO + R2 outbound proxy
β βββ semaphore-do.ts # Global ZIP concurrency
β βββ zip-processor.ts
β βββ stream-ingestor.ts # Cloudflare Stream copy
β βββ notification-processor.ts
β βββ cron.ts
β βββ job-manifest.ts
β βββ web-api-service.ts
βββ scripts/
βββ package.json
βββ wrangler.toml
βββ README.md
- @cloudflare/containers: Cloudflare Containers SDK for
ZipContainerDO - @cloudflare/workers-types: TypeScript types for Workers
- wrangler: Cloudflare Workers CLI
- STORE only β no deflate, encryption, or symlinks in the ZIP writer
- MAX_FILES / MAX_ZIP_BYTES enforced at manifest creation
- Large jobs depend on R2 throughput, container availability, and Worker CPU limits (
cpu_msinwrangler.toml) - Stream ingest requires Cloudflare Stream API credentials and respects Stream minimum retention (30 days) for scheduled deletion
After generating a ZIP, download it and verify:
unzip -t output.zip
ditto -x -k output.zip test-output
7zz t output.zipSuggested cases: single file under 4 GiB; single file over 4 GiB; many small files; deep paths / Unicode names; empty files when includeEmpty is true.
- Main queue: Infrastructure failures on ZIP ticks retry with delay; business backoff is handled inside
JobManagerDOvianextActionAtMsand alarms - Notification queue: 5xx / parse errors retry the batch; 4xx acks; per-job
retrystatus retries individual messages - Stream ingest: Exponential backoff on failure
- Dead letter queues: Permanently failed messages (see
wrangler.toml) - ZIP cleanup: Terminal jobs purge DO state after
BUNDLE_CLEANUP_TTL_DAYS; failed jobs best-effort abort multipart uploads
ISC