Commit d93ed19

Benchmark spill machinery (#791)
1 parent b4e5ef9 commit d93ed19

2 files changed: 292 additions & 0 deletions

timely/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -35,3 +35,7 @@ timely_logging = { path = "../logging", version = "0.29" }
 timely_communication = { path = "../communication", version = "0.29", default-features = false }
 timely_container = { path = "../container", version = "0.29" }
 smallvec = { version = "1.15.1", features = ["serde", "const_generics"] }
+
+[dev-dependencies]
+tempfile = "3"
+serde_bytes = "0.11"

timely/examples/spill_compare.rs

Lines changed: 288 additions & 0 deletions
@@ -0,0 +1,288 @@
//! Pushes a configurable volume (default 50 GB) of `Vec<u8>` chunks through
//! a timely `Exchange`, optionally with the spilling `MergeQueue` policy
//! installed. Compare RSS and wall-clock time:
//!
//! # spill enabled (file-backed): bounded RSS, disk traffic to --spill-dir
//! cargo run --release --example spill_compare -- --with-spill --workers 2
//!
//! # spill disabled: the OS pages out or the process OOMs once the queue exceeds RAM
//! cargo run --release --example spill_compare -- --workers 2
//!
//! Spill engages on the zero-copy `MergeQueue` path, so this example always
//! uses `CommunicationConfig::ProcessBinary(workers)`. Each worker injects
//! `--total-gb / workers` of data and routes every record to peer
//! `(index + 1) % workers`, forcing all bytes across a MergeQueue.
//!
//! Options:
//!   --total-gb N         Total cluster-wide data (default: 50)
//!   --chunk-kb N         Size of each Vec<u8> sent (default: 256)
//!   --workers N          Number of worker threads (default: 2)
//!   --threshold-mb N     Spill threshold (default: 256)
//!   --head-reserve-mb N  Head reserve / prefetch budget (default: 64)
//!   --spill-dir PATH     Directory for tempfiles (default: std::env::temp_dir())
//!   --with-spill         Install the file-backed spill policy
//!   --rss-every-secs N   RSS sampling cadence (default: 2)

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use timely::CommunicationConfig;
use timely::WorkerConfig;
use timely::communication::initialize::Hooks;
use timely::dataflow::InputHandle;
use timely::dataflow::operators::Input;
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;

fn main() {
    let args: Vec<String> = std::env::args().collect();

    let total_gb: usize = parse_arg(&args, "--total-gb", 50);
    let chunk_kb: usize = parse_arg(&args, "--chunk-kb", 256);
    let workers: usize = parse_arg(&args, "--workers", 2);
    let threshold_mb: usize = parse_arg(&args, "--threshold-mb", 256);
    let head_reserve_mb: usize = parse_arg(&args, "--head-reserve-mb", 64);
    let rss_every_secs: u64 = parse_arg(&args, "--rss-every-secs", 2) as u64;
    let with_spill = args.iter().any(|a| a == "--with-spill");
    let spill_dir: std::path::PathBuf = args.iter()
        .position(|a| a == "--spill-dir")
        .and_then(|i| args.get(i + 1))
        .map(std::path::PathBuf::from)
        .unwrap_or_else(std::env::temp_dir);

    let chunk_bytes = chunk_kb << 10;
    let total_bytes = total_gb << 30;
    let total_chunks = total_bytes / chunk_bytes;
    let chunks_per_worker = total_chunks / workers;

    println!("spill_compare configuration:");
    println!("  workers:    {}", workers);
    println!("  total:      {} GB ({} chunks of {} KB)", total_gb, total_chunks, chunk_kb);
    println!("  per worker: {} chunks ({} GB)", chunks_per_worker, (chunks_per_worker * chunk_bytes) >> 30);
    println!("  with_spill: {}", with_spill);
    if with_spill {
        println!("  threshold:    {} MB", threshold_mb);
        println!("  head_reserve: {} MB", head_reserve_mb);
        println!("  spill_dir:    {}", spill_dir.display());
    }
    println!();

    // Build hooks. If --with-spill, install a file-backed policy factory.
    let mut hooks = Hooks::default();
    if with_spill {
        let threshold_bytes = threshold_mb << 20;
        let head_reserve = head_reserve_mb << 20;
        let dir = spill_dir.clone();
        hooks.spill = Some(Arc::new(move || {
            use timely::communication::allocator::zero_copy::spill::{
                SpillPolicy, Threshold, PrefetchPolicy,
            };
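            // Per the flag descriptions above: the sender-side `Threshold`
            // policy spills queued bytes past `threshold_bytes` while keeping
            // `head_reserve_bytes` resident, and the receiver-side
            // `PrefetchPolicy` reads ahead up to the same budget.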
            let strategy = Box::new(file_spill::FileSpillStrategy::new(dir.clone()));
            let mut tp = Threshold::new(strategy);
            tp.threshold_bytes = threshold_bytes;
            tp.head_reserve_bytes = head_reserve;
            let writer: Box<dyn SpillPolicy> = Box::new(tp);
            let reader: Box<dyn SpillPolicy> = Box::new(PrefetchPolicy::new(head_reserve));
            (writer, reader)
        }));
    }
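
    // `ProcessBinary` keeps every worker in this process but moves data over
    // the zero-copy `MergeQueue` path, the only path where the spill hooks
    // engage (see the module docs).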
    let comm = CommunicationConfig::ProcessBinary(workers);
    let (builders, others) = comm.try_build_with(hooks).expect("failed to build allocators");

    // RSS sampler thread, stopped when the workers finish.
    let stop = Arc::new(AtomicBool::new(false));
    let stop_clone = stop.clone();
    let start = Instant::now();
    let sampler = std::thread::spawn(move || {
        print_rss(start, "start");
        while !stop_clone.load(Ordering::Relaxed) {
            std::thread::sleep(Duration::from_secs(rss_every_secs));
            print_rss(start, "running");
        }
        print_rss(start, "done");
    });

    let guards = timely::execute::execute_from(builders, others, WorkerConfig::default(), move |worker| {
        let index = worker.index();
        let peers = worker.peers();
        let target = ((index + 1) % peers) as u64;

        let mut input = InputHandle::<u64, timely::container::CapacityContainerBuilder<Vec<serde_bytes::ByteBuf>>>::new();

        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                .sink(Exchange::new(move |_v: &serde_bytes::ByteBuf| target), "Sink", {
                    let mut received_bytes: usize = 0;
                    let mut received_chunks: usize = 0;
                    let mut last_print = Instant::now();
                    move |(input, _frontier)| {
                        input.for_each(|_cap, data| {
                            for v in data.drain(..) {
                                received_bytes += v.len();
                                received_chunks += 1;
                            }
                            if last_print.elapsed() >= Duration::from_secs(5) {
                                println!("worker {}: received {} chunks ({} MB)",
                                    index, received_chunks, received_bytes >> 20);
                                last_print = Instant::now();
                            }
                        });
                    }
                });
        });

        // Production: each worker pushes its share into the input handle in
        // one shot, with no `step` calls. Then closes the input and lets
        // the framework drain.
        let prod_start = Instant::now();
        // A xorshift64 step keeps the bytes incompressible so macOS's
        // compressed memory can't squash duplicate pages.
        let mut rng_state: u64 = 0x9E37_79B9_7F4A_7C15
            ^ ((index as u64).wrapping_mul(0xBF58_476D_1CE4_E5B9));
        for _ in 0..chunks_per_worker {
            let mut buf = vec![0u8; chunk_bytes];
            let words = chunk_bytes / 8;
            let (prefix, body, _suffix) = unsafe { buf.align_to_mut::<u64>() };
            debug_assert!(prefix.is_empty());
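            // `align_to_mut` only reinterprets the aligned middle of the
            // buffer; the debug_assert records the expectation that heap
            // allocations of this size start 8-byte aligned in practice.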
            for w in body.iter_mut().take(words) {
                rng_state ^= rng_state << 13;
                rng_state ^= rng_state >> 7;
                rng_state ^= rng_state << 17;
                *w = rng_state;
            }
            input.send(serde_bytes::ByteBuf::from(buf));
        }
        input.close();
        println!("worker {}: production {:.2?}", index, prod_start.elapsed());

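        // `step_or_park(None)` parks the thread when idle and returns false
        // once no dataflows remain, i.e. once all exchanged bytes are consumed.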
        let drain_start = Instant::now();
        while worker.step_or_park(None) { }
        println!("worker {}: drain {:.2?}", index, drain_start.elapsed());

        index
    }).expect("execute_from failed");

    for r in guards.join() { let _ = r; }
    stop.store(true, Ordering::Relaxed);
    sampler.join().ok();

    let elapsed = start.elapsed();
    println!();
    println!("elapsed: {:.2?}", elapsed);
    println!("OK");
}
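/// Returns the numeric value following `flag` in `args`, or `default` when
/// the flag is absent or unparsable.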
fn parse_arg(args: &[String], flag: &str, default: usize) -> usize {
    args.iter()
        .position(|a| a == flag)
        .and_then(|i| args.get(i + 1))
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}
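/// Resident set size via `ps -o rss=`, which reports kilobytes on both Linux
/// and macOS; returns None when `ps` is unavailable or unparsable.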
fn get_rss_kb() -> Option<u64> {
    let pid = std::process::id();
    let output = std::process::Command::new("ps")
        .args(["-o", "rss=", "-p", &pid.to_string()])
        .output()
        .ok()?;
    String::from_utf8_lossy(&output.stdout).trim().parse().ok()
}

fn print_rss(start: Instant, label: &str) {
    match get_rss_kb() {
        Some(kb) => println!("[t={:>6.1}s RSS {:>8} KB / {:>6} MB] {}",
            start.elapsed().as_secs_f64(), kb, kb / 1024, label),
        None => println!("[t={:>6.1}s RSS unavailable] {}", start.elapsed().as_secs_f64(), label),
    }
}

/// File-backed BytesSpill implementation; mirrors `communication/examples/spill_stress.rs`.
mod file_spill {
    use std::fs::File;
    use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use timely::bytes::arc::{Bytes, BytesMut};
    use timely::communication::allocator::zero_copy::spill::{BytesSpill, BytesFetch};

    pub struct FileSpillStrategy { dir: PathBuf }

    impl FileSpillStrategy {
        pub fn new(dir: PathBuf) -> Self { FileSpillStrategy { dir } }
    }

    impl BytesSpill for FileSpillStrategy {
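        // Writes every queued chunk to one anonymous tempfile and replaces
        // the chunks with one fetch handle apiece. On any I/O error the
        // chunks are left in memory, degrading to the no-spill behavior.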
        fn spill(&mut self, chunks: &mut Vec<Bytes>, handles: &mut Vec<Box<dyn BytesFetch>>) {
            if chunks.is_empty() { return; }
            let raw = match tempfile::tempfile_in(&self.dir) {
                Ok(f) => f,
                Err(e) => { eprintln!("file spill failed: {}", e); return; }
            };
            let mut writer = BufWriter::with_capacity(4 << 20, raw);
            let mut lens = Vec::with_capacity(chunks.len());
            for chunk in chunks.iter() {
                if let Err(e) = writer.write_all(&chunk[..]) {
                    eprintln!("file spill write failed: {}", e);
                    return;
                }
                lens.push(chunk.len());
            }
            let file = match writer.into_inner() {
                Ok(f) => f,
                Err(e) => { eprintln!("file spill flush failed: {}", e); return; }
            };
            chunks.clear();
            let state = Arc::new(Mutex::new(FileState::OnDisk { file, lens: lens.clone() }));
            handles.extend((0..lens.len()).map(|i| Box::new(ChunkHandle {
                state: Arc::clone(&state), index: i,
            }) as Box<dyn BytesFetch>));
        }
    }
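    // State shared by all handles from one spill file: handles start
    // `OnDisk`; the first fetch slurps the whole file into `Slurped`; and
    // `Placeholder` exists only while that transition is in flight.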
    enum FileState {
        OnDisk { file: File, lens: Vec<usize> },
        Slurped { chunks: Vec<Bytes> },
        Placeholder,
    }

    struct ChunkHandle { state: Arc<Mutex<FileState>>, index: usize }

    impl BytesFetch for ChunkHandle {
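        // The first fetcher to arrive reads the entire file back; later
        // fetchers just clone their chunk out of `Slurped`. `Err(self)`
        // returns the handle to the caller so a failed fetch can be retried.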
        fn fetch(self: Box<Self>) -> Result<Vec<Bytes>, Box<dyn BytesFetch>> {
            let mut state = self.state.lock().expect("spill state poisoned");
            if matches!(*state, FileState::OnDisk { .. }) {
                let (mut file, lens) = match std::mem::replace(&mut *state, FileState::Placeholder) {
                    FileState::OnDisk { file, lens } => (file, lens),
                    _ => unreachable!(),
                };
                if let Err(e) = file.seek(SeekFrom::Start(0)) {
                    eprintln!("spill fetch seek failed: {}", e);
                    *state = FileState::OnDisk { file, lens };
                    drop(state);
                    return Err(self);
                }
                let mut chunks = Vec::with_capacity(lens.len());
                for &len in &lens {
                    let mut data = vec![0u8; len];
                    if let Err(e) = file.read_exact(&mut data) {
                        eprintln!("spill fetch read failed: {}", e);
                        *state = FileState::OnDisk { file, lens };
                        drop(state);
                        return Err(self);
                    }
                    chunks.push(BytesMut::from(data).freeze());
                }
                *state = FileState::Slurped { chunks };
            }
            let result = match &*state {
                FileState::Slurped { chunks } => Ok(vec![chunks[self.index].clone()]),
                _ => unreachable!(),
            };
            drop(state);
            result
        }
    }
}
