diff --git a/src/main/rust/src/bin/multi_threaded.rs b/src/main/rust/src/bin/multi_threaded.rs index b5be52c..d1860a4 100644 --- a/src/main/rust/src/bin/multi_threaded.rs +++ b/src/main/rust/src/bin/multi_threaded.rs @@ -1,76 +1,84 @@ use std::{ fs::File, io::{BufRead, BufReader}, - sync::{Arc, Mutex}, + sync::{Mutex}, thread, }; use std::collections::HashMap; use std::time::Instant; +use onebrc::format_nums; const DEFAULT_HASHMAP_LENGTH: usize = 10000; fn main() { - let now = Instant::now(); - let stations: Arc>> = - Arc::new(Mutex::new(HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH))); + print!("\x1b[2J"); + let stations:Mutex> = + Mutex::new(HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH)); + thread::scope(|s| { + // Doing this allows us to not "move" stations into the closure + // and remove the necessity for reference counting (Arc) + // no performance improvement but less complex + let stations = &stations; + let now = Instant::now(); - let cores: usize = thread::available_parallelism().unwrap().into(); + let cores: usize = thread::available_parallelism().unwrap().into(); - let chunk_length = 1_000_000_000 / cores; - let mut handles = vec![]; - for i in 0..cores { - let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); - let reader = BufReader::new(file); - let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); - let stations_clone = stations.clone(); - let handle = thread::spawn(move || { - let mut t_stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let chunk_length = 1_000_000_000 / cores; + for i in 0..cores { + let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); + let reader = BufReader::new(file); + let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); + s.spawn(move || { + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let now_read_line = Instant::now(); - println!("Start reading lines in thread {i}"); - line_chunk.for_each(|line| { - let line = line.expect("could not read line"); - let (station, temp) = line.split_once(';').expect("Error while splitting"); - let temp = onebrc::parse_temp(temp.as_bytes()); - let measurements_option = t_stations.get_mut(station); - if let Some(measurements) = measurements_option { - measurements.update(temp); - } else { - let measurements = onebrc::StationMeasurements { - min: temp, - max: temp, - count: 1, - sum: temp, - }; - t_stations.insert(station.to_owned(), measurements); + let now_read_line = Instant::now(); + let print_line = i + 1; + let mut line_num = 0; + line_chunk.for_each(|line| { + if line_num == 0 { + print!("\x1b[{print_line};30HStart read line {}ms", now_read_line.elapsed().as_millis()); + } + if line_num % 10000 == 0 { + let formatted = format_nums(line_num); + print!("\x1b[{print_line};0HThread #{i:0>2}: {formatted}"); + } + line_num += 1; + let line = line.expect("could not read line"); + let (station, temp) = line.split_once(';').expect("Error while splitting"); + let temp = onebrc::parse_temp(temp.as_bytes()); + let measurements_option = t_stations.get_mut(station); + if let Some(measurements) = measurements_option { + measurements.update(temp); + } else { + let measurements = onebrc::StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(station.to_owned(), measurements); + } + }); + print!("\x1b[{print_line};60HTime reading lines in thread {i}={} ms", now_read_line.elapsed().as_millis()); + for (station, measurements) in t_stations.iter() { + let mut stations_guard = stations.lock().expect("Error while locking"); + let joined_measurements_options = stations_guard.get_mut(station.as_str()); + if let Some(joined_measurements) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations_guard.insert(station.to_owned(), *measurements); + } } }); - println!("Time reading lines in thread {i}={} μs", now_read_line.elapsed().as_micros()); - let now_insert_line = Instant::now(); - println!("Start inserting lines in thread {i}"); - for (station, measurements) in t_stations.iter() { - let mut stations_guard = stations_clone.lock().expect("Error while locking"); - let joined_measurements_options = stations_guard.get_mut(station.as_str()); - if let Some(joined_measurements) = joined_measurements_options { - joined_measurements.merge(measurements); - } else { - stations_guard.insert(station.to_owned(), *measurements); - } - } - println!("Time inserting lines in thread {i}={} μs", now_insert_line.elapsed().as_micros()); - }); - handles.push(handle); - } - for handle in handles { - handle.join().unwrap(); - } - let mut stations: Vec = stations.lock().unwrap().iter().map(|(&ref station, &ref measurements)| { - let measurements = measurements.to_string(); - format!("{station}={measurements}") - }).collect(); - stations.sort(); - let stations = stations.join(","); - println!("{{{stations}}}"); - println!("Time={} μs", now.elapsed().as_micros()); + } + let mut stations: Vec = stations.lock().unwrap().iter().map(|(station, measurements)| { + let measurements = measurements.to_string(); + format!("{station}={measurements}") + }).collect(); + stations.sort(); + let _stations = stations.join(","); + // println!("{{{stations}}}"); + println!("\n\nTime={} ms", now.elapsed().as_millis()); + }); } diff --git a/src/main/rust/src/bin/single_thread.rs b/src/main/rust/src/bin/single_thread.rs index fe4c371..6037fec 100644 --- a/src/main/rust/src/bin/single_thread.rs +++ b/src/main/rust/src/bin/single_thread.rs @@ -32,7 +32,7 @@ fn main() { stations.insert(station.to_owned(), measurements); } } - let mut stations: Vec = stations.iter().map(|(&ref station, &ref measurements)| { + let mut stations: Vec = stations.iter().map(|(station, measurements)| { let measurements = measurements.to_string(); format!("{station}={measurements}") }).collect();