Skip to content

Commit bdf1893

Browse files
authored
feat: add request-local rate limiter (#670)
* feat: add request-local rate limiter * chore: update `global.d.ts` * stamp: apply suggestion 2 * stamp: abort check first * stamp: 🤨
1 parent 859cf21 commit bdf1893

24 files changed

Lines changed: 1017 additions & 51 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/src/flags.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,15 @@ fn get_start_command() -> Command {
232232
))
233233
.value_parser(value_parser!(u64)),
234234
)
235+
.arg(
236+
arg!(--"rate-limit-table-cleanup-interval" <SECONDS>)
237+
.help(concat!(
238+
"Interval in seconds between sweeps of the outbound rate-limit ",
239+
"table to remove expired entries (default: 60)"
240+
))
241+
.default_value("60")
242+
.value_parser(value_parser!(u64)),
243+
)
235244
.arg(
236245
arg!(--"inspect"[HOST_AND_PORT])
237246
.help("Activate inspector on host:port")

cli/src/main.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,13 @@ fn main() -> Result<ExitCode, anyhow::Error> {
229229
.get_one::<u64>("request-buffer-size")
230230
.copied()
231231
.unwrap();
232-
232+
let rate_limit_cleanup_interval_sec = sub_matches
233+
.get_one::<u64>("rate-limit-table-cleanup-interval")
234+
.copied()
235+
.unwrap_or(60);
236+
if rate_limit_cleanup_interval_sec == 0 {
237+
bail!("--rate-limit-table-cleanup-interval must be >= 1 second");
238+
}
233239
let flags = ServerFlags {
234240
otel: if !enable_otel.is_empty() {
235241
if enable_otel.len() > 1 {
@@ -264,6 +270,8 @@ fn main() -> Result<ExitCode, anyhow::Error> {
264270
beforeunload_wall_clock_pct: maybe_beforeunload_wall_clock_pct,
265271
beforeunload_cpu_pct: maybe_beforeunload_cpu_pct,
266272
beforeunload_memory_pct: maybe_beforeunload_memory_pct,
273+
274+
rate_limit_cleanup_interval_sec,
267275
};
268276

269277
let mut builder = Builder::new(addr, &main_service_path);

crates/base/src/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,8 @@ pub struct ServerFlags {
332332
pub beforeunload_wall_clock_pct: Option<u8>,
333333
pub beforeunload_cpu_pct: Option<u8>,
334334
pub beforeunload_memory_pct: Option<u8>,
335+
336+
pub rate_limit_cleanup_interval_sec: u64,
335337
}
336338

337339
#[derive(Debug)]

crates/base/src/worker/pool.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use either::Either::Left;
1818
use enum_as_inner::EnumAsInner;
1919
use ext_event_worker::events::WorkerEventWithMetadata;
2020
use ext_runtime::SharedMetricSource;
21+
use ext_runtime::SharedRateLimitTable;
22+
use ext_runtime::TraceRateLimiterConfig;
2123
use ext_workers::context::CreateUserWorkerResult;
2224
use ext_workers::context::SendRequestResult;
2325
use ext_workers::context::Timing;
@@ -235,6 +237,7 @@ pub struct WorkerPool {
235237
pub flags: Arc<ServerFlags>,
236238
pub policy: WorkerPoolPolicy,
237239
pub metric_src: SharedMetricSource,
240+
pub shared_rate_limit_table: SharedRateLimitTable,
238241
pub user_workers: HashMap<Uuid, UserWorkerProfile>,
239242
pub active_workers: HashMap<String, ActiveWorkerRegistry>,
240243
pub worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
@@ -253,11 +256,19 @@ impl WorkerPool {
253256
worker_event_sender: Option<UnboundedSender<WorkerEventWithMetadata>>,
254257
worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
255258
inspector: Option<Inspector>,
259+
cancel: CancellationToken,
256260
) -> Self {
261+
let shared_rate_limit_table = SharedRateLimitTable::default();
262+
shared_rate_limit_table.spawn_cleanup_task(
263+
Duration::from_secs(flags.rate_limit_cleanup_interval_sec),
264+
cancel,
265+
);
266+
257267
Self {
258268
flags,
259269
policy,
260270
metric_src,
271+
shared_rate_limit_table,
261272
worker_event_sender,
262273
user_workers: HashMap::new(),
263274
active_workers: HashMap::new(),
@@ -384,6 +395,7 @@ impl WorkerPool {
384395
let worker_pool_msgs_tx = self.worker_pool_msgs_tx.clone();
385396
let events_msg_tx = self.worker_event_sender.clone();
386397
let supervisor_policy = self.policy.supervisor_policy;
398+
let shared_rate_limit_table = self.shared_rate_limit_table.clone();
387399

388400
drop(tokio::spawn(async move {
389401
let (permit, tx) = match wait_fence_fut.await {
@@ -462,6 +474,17 @@ impl WorkerPool {
462474
user_worker_rt_opts.events_msg_tx = events_msg_tx;
463475
user_worker_rt_opts.cancel = Some(cancel.clone());
464476

477+
if let ext_runtime::RateLimiterOpts::Rules { rules, global_key } =
478+
std::mem::take(&mut user_worker_rt_opts.rate_limiter)
479+
{
480+
user_worker_rt_opts.rate_limiter =
481+
ext_runtime::RateLimiterOpts::Configured(TraceRateLimiterConfig {
482+
table: shared_rate_limit_table,
483+
rules,
484+
global_key: Some(global_key),
485+
});
486+
}
487+
465488
worker_options.timing = Some(Timing {
466489
early_drop_rx,
467490
status: status.clone(),
@@ -792,13 +815,15 @@ pub async fn create_user_worker_pool(
792815
async move {
793816
let token = termination_token.as_ref();
794817
let mut termination_requested = false;
818+
let cleanup_cancel = token.map(|t| t.inbound.clone()).unwrap_or_default();
795819
let mut worker_pool = WorkerPool::new(
796820
flags,
797821
policy,
798822
metric_src_inner,
799823
worker_event_sender,
800824
user_worker_msgs_tx_clone,
801825
inspector,
826+
cleanup_cancel,
802827
);
803828

804829
// Note: Keep this loop non-blocking. Spawn a task to run blocking calls.

crates/base/src/worker/worker_inner.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use ext_event_worker::events::UncaughtExceptionEvent;
1212
use ext_event_worker::events::WorkerEventWithMetadata;
1313
use ext_event_worker::events::WorkerEvents;
1414
use ext_runtime::MetricSource;
15+
use ext_runtime::RateLimiterOpts;
1516
use ext_runtime::RuntimeMetricSource;
17+
use ext_runtime::TraceRateLimiter;
1618
use ext_runtime::WorkerMetricSource;
1719
use ext_workers::context::UserWorkerMsgs;
1820
use ext_workers::context::WorkerContextInitOpts;
@@ -270,6 +272,20 @@ impl Worker {
270272
state_mut.put(metric_src.clone());
271273
MetricSource::Runtime(metric_src)
272274
} else {
275+
if let Some(opts) = new_runtime.conf.as_user_worker().cloned() {
276+
if let RateLimiterOpts::Configured(config) = opts.rate_limiter {
277+
match TraceRateLimiter::new(config) {
278+
Ok(limiter) => {
279+
let state = new_runtime.js_runtime.op_state();
280+
let mut state_mut = state.borrow_mut();
281+
state_mut.put(limiter);
282+
}
283+
Err(err) => {
284+
error!("failed to compile rate limit rules: {err}");
285+
}
286+
}
287+
}
288+
}
273289
MetricSource::Worker(metric_src)
274290
}
275291
};
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Worker A: forwards to worker B. Supports two outbound HTTP modes selected
2+
// via the x-http-mode header: "fetch" (default) or "node".
3+
import * as http from "node:http";
4+
5+
function requestViaNode(
6+
url: string,
7+
headers: Record<string, string>,
8+
): Promise<{ status: number; body: string }> {
9+
return new Promise((resolve, reject) => {
10+
const parsed = new URL(url);
11+
const req = http.request(
12+
{
13+
hostname: parsed.hostname,
14+
port: parsed.port,
15+
path: parsed.pathname,
16+
method: "GET",
17+
headers,
18+
},
19+
(res) => {
20+
let body = "";
21+
res.on("data", (chunk) => {
22+
body += chunk;
23+
});
24+
res.on("end", () => resolve({ status: res.statusCode ?? 500, body }));
25+
},
26+
);
27+
req.on("error", reject);
28+
req.end();
29+
});
30+
}
31+
32+
Deno.serve(async (req: Request) => {
33+
if (!req.headers.has("traceparent")) {
34+
return new Response(
35+
JSON.stringify({ msg: "missing traceparent header" }),
36+
{ status: 400, headers: { "Content-Type": "application/json" } },
37+
);
38+
}
39+
40+
const serverUrl = req.headers.get("x-test-server-url");
41+
if (!serverUrl) {
42+
return new Response(
43+
JSON.stringify({ msg: "missing x-test-server-url header" }),
44+
{ status: 400, headers: { "Content-Type": "application/json" } },
45+
);
46+
}
47+
48+
const mode = req.headers.get("x-http-mode") ?? "fetch";
49+
const forwardHeaders: Record<string, string> = {
50+
"x-test-server-url": serverUrl,
51+
"x-http-mode": mode,
52+
};
53+
54+
try {
55+
let status: number;
56+
let body: string;
57+
58+
if (mode === "node") {
59+
({ status, body } = await requestViaNode(
60+
`${serverUrl}/rate-limit-b`,
61+
forwardHeaders,
62+
));
63+
} else {
64+
const resp = await fetch(`${serverUrl}/rate-limit-b`, {
65+
headers: forwardHeaders,
66+
});
67+
status = resp.status;
68+
body = await resp.text();
69+
}
70+
71+
return new Response(body, {
72+
status,
73+
headers: { "Content-Type": "application/json" },
74+
});
75+
} catch (e) {
76+
return new Response(
77+
JSON.stringify({ msg: e.toString() }),
78+
{ status: 500, headers: { "Content-Type": "application/json" } },
79+
);
80+
}
81+
});
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Worker B: forwards back to worker A. Supports two outbound HTTP modes
2+
// selected via the x-http-mode header: "fetch" (default) or "node".
3+
import * as http from "node:http";
4+
5+
function requestViaNode(
6+
url: string,
7+
headers: Record<string, string>,
8+
): Promise<{ status: number; body: string }> {
9+
return new Promise((resolve, reject) => {
10+
const parsed = new URL(url);
11+
const req = http.request(
12+
{
13+
hostname: parsed.hostname,
14+
port: parsed.port,
15+
path: parsed.pathname,
16+
method: "GET",
17+
headers,
18+
},
19+
(res) => {
20+
let body = "";
21+
res.on("data", (chunk) => {
22+
body += chunk;
23+
});
24+
res.on("end", () => resolve({ status: res.statusCode ?? 500, body }));
25+
},
26+
);
27+
req.on("error", reject);
28+
req.end();
29+
});
30+
}
31+
32+
Deno.serve(async (req: Request) => {
33+
if (!req.headers.has("traceparent")) {
34+
return new Response(
35+
JSON.stringify({ msg: "missing traceparent header" }),
36+
{ status: 400, headers: { "Content-Type": "application/json" } },
37+
);
38+
}
39+
40+
const serverUrl = req.headers.get("x-test-server-url");
41+
if (!serverUrl) {
42+
return new Response(
43+
JSON.stringify({ msg: "missing x-test-server-url header" }),
44+
{ status: 400, headers: { "Content-Type": "application/json" } },
45+
);
46+
}
47+
48+
const mode = req.headers.get("x-http-mode") ?? "fetch";
49+
const forwardHeaders: Record<string, string> = {
50+
"x-test-server-url": serverUrl,
51+
"x-http-mode": mode,
52+
};
53+
54+
try {
55+
let status: number;
56+
let body: string;
57+
58+
if (mode === "node") {
59+
({ status, body } = await requestViaNode(
60+
`${serverUrl}/rate-limit-a`,
61+
forwardHeaders,
62+
));
63+
} else {
64+
const resp = await fetch(`${serverUrl}/rate-limit-a`, {
65+
headers: forwardHeaders,
66+
});
67+
status = resp.status;
68+
body = await resp.text();
69+
}
70+
71+
return new Response(body, {
72+
status,
73+
headers: { "Content-Type": "application/json" },
74+
});
75+
} catch (e) {
76+
return new Response(
77+
JSON.stringify({ msg: e.toString() }),
78+
{ status: 500, headers: { "Content-Type": "application/json" } },
79+
);
80+
}
81+
});
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Worker that echoes back the trace ID from the AsyncVariable context.
2+
// Exposed via globalThis.getRequestTraceId when exposeRequestTraceId context
3+
// flag is set. Used to verify AsyncVariable isolation across concurrent
4+
// requests.
5+
Deno.serve(async (_req: Request) => {
6+
// Small delay so concurrent requests actually overlap inside the event loop.
7+
await new Promise((resolve) => setTimeout(resolve, 30));
8+
9+
const traceId = (globalThis as any).getRequestTraceId?.() ?? null;
10+
11+
return new Response(
12+
JSON.stringify({ traceId }),
13+
{ status: 200, headers: { "Content-Type": "application/json" } },
14+
);
15+
});

0 commit comments

Comments
 (0)