diff --git a/src/main/rust/src/bin/multi_threaded.rs b/src/main/rust/src/bin/multi_threaded.rs index d1860a4..6745ba1 100644 --- a/src/main/rust/src/bin/multi_threaded.rs +++ b/src/main/rust/src/bin/multi_threaded.rs @@ -1,10 +1,10 @@ use std::{ fs::File, io::{BufRead, BufReader}, - sync::{Mutex}, thread, }; use std::collections::HashMap; +use std::sync::mpsc; use std::time::Instant; use onebrc::format_nums; @@ -12,22 +12,17 @@ const DEFAULT_HASHMAP_LENGTH: usize = 10000; fn main() { print!("\x1b[2J"); - let stations:Mutex> = - Mutex::new(HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH)); thread::scope(|s| { - // Doing this allows us to not "move" stations into the closure - // and remove the necessity for reference counting (Arc) - // no performance improvement but less complex - let stations = &stations; + let mut stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let (tx, rx) = mpsc::channel(); let now = Instant::now(); - let cores: usize = thread::available_parallelism().unwrap().into(); - let chunk_length = 1_000_000_000 / cores; for i in 0..cores { let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); let reader = BufReader::new(file); let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); + let tx = tx.clone(); s.spawn(move || { let mut t_stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); @@ -61,18 +56,21 @@ fn main() { } }); print!("\x1b[{print_line};60HTime reading lines in thread {i}={} ms", now_read_line.elapsed().as_millis()); - for (station, measurements) in t_stations.iter() { - let mut stations_guard = stations.lock().expect("Error while locking"); - let joined_measurements_options = stations_guard.get_mut(station.as_str()); - if let Some(joined_measurements) = joined_measurements_options { - joined_measurements.merge(measurements); - } else { - stations_guard.insert(station.to_owned(), *measurements); - } - } + let _ = tx.send(t_stations); }); } - let mut stations: Vec = stations.lock().unwrap().iter().map(|(station, measurements)| { + drop(tx); + while let Ok(t_stations) = rx.recv() { + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(station.as_str()); + if let Some(joined_measurements) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(station.to_owned(), *measurements); + } + } + } + let mut stations: Vec = stations.iter().map(|(station, measurements)| { let measurements = measurements.to_string(); format!("{station}={measurements}") }).collect();