Skip to content

Commit 85a7602

Browse files
Make perf knobs configurable via env vars
- ZS_PARALLEL_DOWNLOADS (default 16, was hardcoded 8)
- ZS_EXTRACT_THREADS (default num_cpus*2, was num_cpus)
- ZS_CHUNK_MB (default 16MB, was hardcoded 8MB)
- Parallelize shared cache checks via rayon

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 115ec5b commit 85a7602

2 files changed

Lines changed: 52 additions & 10 deletions

File tree

crates/zs-fast-wheel/src/main.rs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ fn num_cpus() -> usize {
146146
.unwrap_or(4)
147147
}
148148

149+
/// Read a `usize` tuning knob from the environment.
///
/// Returns `default` when the variable is unset or does not parse as a
/// `usize` — knobs are best-effort, so bad values never abort startup.
fn env_usize(name: &str, default: usize) -> usize {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
156+
157+
/// Parallel downloads (default 16). Set ZS_PARALLEL_DOWNLOADS to tune.
158+
fn configured_parallel_downloads() -> usize {
159+
env_usize("ZS_PARALLEL_DOWNLOADS", 16)
160+
}
161+
162+
/// Extraction threads (default num_cpus * 2). Set ZS_EXTRACT_THREADS to tune.
163+
fn configured_extract_threads() -> usize {
164+
env_usize("ZS_EXTRACT_THREADS", num_cpus() * 2)
165+
}
166+
149167
/// Compute the environment key (must match Python's _env_key).
150168
fn env_key(requirements: &[String]) -> String {
151169
let mut sorted = requirements.to_vec();
@@ -645,12 +663,23 @@ async fn main() -> Result<()> {
645663
};
646664

647665
if !plan.daemon_wheels.is_empty() {
648-
// Check shared cache — restore cached wheels via hardlinks, only download uncached
666+
// Check shared cache in parallel — restore cached wheels via hardlinks
667+
let sp_for_cache = site_packages.clone();
668+
let wheels_for_cache = plan.daemon_wheels.clone();
669+
let cache_results: Vec<bool> = tokio::task::spawn_blocking(move || {
670+
use rayon::prelude::*;
671+
wheels_for_cache
672+
.par_iter()
673+
.map(|spec| restore_from_shared_cache(spec, &sp_for_cache))
674+
.collect()
675+
})
676+
.await?;
677+
649678
let mut uncached_wheels = Vec::new();
650679
let mut cached_count = 0u32;
651680

652-
for spec in &plan.daemon_wheels {
653-
if restore_from_shared_cache(spec, &site_packages) {
681+
for (spec, was_cached) in plan.daemon_wheels.iter().zip(cache_results.iter()) {
682+
if *was_cached {
654683
cached_count += 1;
655684
if verbose {
656685
eprintln!(" {} (shared cache hit)", spec.distribution);
@@ -676,10 +705,15 @@ async fn main() -> Result<()> {
676705
}
677706

678707
if !uncached_wheels.is_empty() {
708+
let pd = configured_parallel_downloads();
709+
let et = configured_extract_threads();
710+
if verbose {
711+
eprintln!("Config: parallel_downloads={pd}, extract_threads={et}");
712+
}
679713
let config = DaemonConfig {
680714
site_packages: site_packages.clone(),
681-
parallel_downloads: 8,
682-
extract_threads: num_cpus(),
715+
parallel_downloads: pd,
716+
extract_threads: et,
683717
};
684718

685719
let wheels_to_cache: Vec<WheelSpec> = uncached_wheels.clone();

crates/zs-fast-wheel/src/streaming.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,21 @@ const CD_SIGNATURE: u32 = 0x02014b50;
3636
/// Large file threshold — entries above this get their own Range request
3737
const LARGE_THRESHOLD: u64 = 1024 * 1024; // 1MB
3838

39-
/// Target chunk size for batching small files into single Range requests
40-
const CHUNK_TARGET_BYTES: u64 = 8 * 1024 * 1024; // 8MB chunks
39+
/// Target chunk size for batching small files into single Range requests.
/// Override with the ZS_CHUNK_MB env var (default 16MB).
///
/// The env var is read and parsed once, then cached: this function is
/// called per-entry inside the chunking loop of `build_chunks`, and an
/// env lookup (plus parse) per entry is avoidable overhead. A value of 0
/// is rejected and falls back to the default — a zero-byte target would
/// defeat batching entirely by forcing one Range request per file.
fn chunk_target_bytes() -> u64 {
    static CACHED: std::sync::OnceLock<u64> = std::sync::OnceLock::new();
    *CACHED.get_or_init(|| {
        let mb = std::env::var("ZS_CHUNK_MB")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .filter(|&mb| mb > 0)
            .unwrap_or(16);
        mb * 1024 * 1024
    })
}
4149

4250
/// Stream-extract a wheel from a URL with overlapped download+decompress:
4351
/// 1. Range request the central directory (small, fast)
4452
/// 2. Parse entries to get file offsets and sizes
45-
/// 3. Group small files into contiguous chunks (~8MB each)
53+
/// 3. Group small files into contiguous chunks (ZS_CHUNK_MB, default 16MB)
4654
/// 4. Large files get individual Range requests
4755
/// 5. Download chunks in parallel, extract files from each chunk as it arrives
4856
pub async fn stream_extract_wheel(
@@ -151,7 +159,7 @@ pub async fn stream_extract_wheel(
151159

152160
/// Group entries into chunks for efficient Range requests.
153161
/// Large files (>1MB) get their own chunk. Small files are batched
154-
/// into contiguous byte ranges up to CHUNK_TARGET_BYTES.
162+
/// into contiguous byte ranges up to chunk_target_bytes().
155163
fn build_chunks(entries: &[ZipEntry], total_size: u64) -> Vec<Chunk> {
156164
// Sort by file offset (entries are usually already sorted, but be safe)
157165
let mut sorted: Vec<&ZipEntry> = entries.iter().collect();
@@ -195,7 +203,7 @@ fn build_chunks(entries: &[ZipEntry], total_size: u64) -> Vec<Chunk> {
195203
current_end = entry_end;
196204
current_entries.push(entry.clone());
197205
} else if entry_start <= current_end + 4096
198-
&& (entry_end - current_start) < CHUNK_TARGET_BYTES
206+
&& (entry_end - current_start) < chunk_target_bytes()
199207
{
200208
// Contiguous and within chunk size — add to batch
201209
current_end = current_end.max(entry_end);

0 commit comments

Comments (0)