diff --git a/src/main/rust/src/implementations/multi_threaded_smol.rs b/src/main/rust/src/implementations/multi_threaded_smol.rs index 6d0b4b9..4f0f3bd 100644 --- a/src/main/rust/src/implementations/multi_threaded_smol.rs +++ b/src/main/rust/src/implementations/multi_threaded_smol.rs @@ -1,134 +1,129 @@ -use std::collections::HashMap; use smol::fs::File; use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom}; +use std::collections::HashMap; use crate::models::station_measurements::StationMeasurements; use crate::utils::parse; -use std::sync::mpsc; +use crate::utils::parse::hashstr; +use easy_parallel::Parallel; use std::thread; use std::time::Instant; -use crate::utils::parse::hashstr; const DEFAULT_HASHMAP_LENGTH: usize = 10000; pub fn run() { const FILE_PATH: &str = "../../../measurements.txt"; let now = Instant::now(); - thread::scope(|s| { - let mut stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let (tx, rx) = mpsc::channel(); - let cores = thread::available_parallelism().unwrap().into(); - let bounds = smol::block_on(async { - let mut file = File::open(FILE_PATH) - .await - .expect("File measurements.txt not found"); + let mut stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let cores = thread::available_parallelism().unwrap().into(); + let bounds = smol::block_on(async { + let mut file = File::open(FILE_PATH) + .await + .expect("File measurements.txt not found"); + let mut reader = BufReader::new(&mut file); + let file_length = reader.seek(SeekFrom::End(0)).await.unwrap(); + let chunk_length = file_length as usize / cores; + let mut bounds = Vec::with_capacity(cores + 1); + bounds.push(0); + for i in 1..cores { let mut reader = BufReader::new(&mut file); - let file_length = reader.seek(SeekFrom::End(0)).await.unwrap(); - let chunk_length = file_length as usize / cores; - let mut bounds = Vec::with_capacity(cores + 1); - bounds.push(0); - for i in 1..cores { - let mut reader = BufReader::new(&mut file); - let mut byte_start = chunk_length * i; - reader - .seek(SeekFrom::Start(byte_start as u64)) - .await - .expect("could not seek"); - let mut line = Vec::with_capacity(108); - let line_len = reader - .read_until(b'\n', &mut line) - .await - .expect("could not read bytes"); - byte_start += line_len; - bounds.push(byte_start as u64); - } - bounds.push(file_length); - bounds - }); - for i in 0..cores { - let tx = tx.clone(); + let mut byte_start = chunk_length * i; + reader + .seek(SeekFrom::Start(byte_start as u64)) + .await + .expect("could not seek"); + let mut line = Vec::with_capacity(108); + let line_len = reader + .read_until(b'\n', &mut line) + .await + .expect("could not read bytes"); + byte_start += line_len; + bounds.push(byte_start as u64); + } + bounds.push(file_length); + bounds + }); + let t_stations_vec = Parallel::new() + .each(0..cores, |i| { let mut currposition = *bounds.get(i).unwrap(); let end = *bounds.get(i + 1).unwrap(); - s.spawn(move || { - smol::block_on(async { - let mut file = File::open(FILE_PATH) + smol::block_on(async { + let mut file = File::open(FILE_PATH) + .await + .expect("File measurements.txt not found"); + let mut reader = BufReader::new(&mut file); + reader.seek(SeekFrom::Start(currposition)).await.unwrap(); + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let mut line = Vec::with_capacity(108); + loop { + let line_len = reader + .read_until(b'\n', &mut line) .await - .expect("File measurements.txt not found"); - let mut reader = BufReader::new(&mut file); - reader.seek(SeekFrom::Start(currposition)).await.unwrap(); - let mut t_stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let mut line = Vec::with_capacity(108); - loop { - let line_len = reader - .read_until(b'\n', &mut line) - .await - .expect("could not read bytes"); - if line_len == 0 { - break; - } - let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); - let hash = hashstr(station); - let station = unsafe { std::str::from_utf8_unchecked(station) }; - let temp = parse::temp(temp.split_last().unwrap().1); - let measurements_option = t_stations.get_mut(&hash); - if let Some((_, measurements)) = measurements_option { - measurements.update(temp); - } else { - let measurements = StationMeasurements { - min: temp, - max: temp, - count: 1, - sum: temp, - }; - t_stations.insert(hash, (station.to_string(), measurements)); - } - currposition += line_len as u64; - if currposition >= end { - break; - } - line.clear(); + .expect("could not read bytes"); + if line_len == 0 { + break; } - let _ = tx.send(t_stations); - }) - }); - } - drop(tx); - while let Ok(t_stations) = rx.recv() { - for (hash, (station, measurements)) in t_stations.iter() { - let joined_measurements_options = stations.get_mut(hash); - if let Some((_, joined_measurements)) = joined_measurements_options { - joined_measurements.merge(measurements); - } else { - stations.insert(*hash, (station.to_owned(), *measurements)); + let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); + let station = unsafe { std::str::from_utf8_unchecked(station) }; + let temp = parse::temp(temp.split_last().unwrap().1); + let measurements_option = t_stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { + measurements.update(temp); + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(hash, (station.to_string(), measurements)); + } + currposition += line_len as u64; + if currposition >= end { + break; + } + line.clear(); } + t_stations + }) + }) + .run(); + for t_stations in t_stations_vec { + for (hash, (station, measurements)) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(hash); + if let Some((_, joined_measurements)) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(*hash, (station.to_owned(), *measurements)); } } - let mut stations: Vec = stations - .iter() - .map(|(_, (station, measurements))| { - let measurements = measurements.to_string(); - #[cfg(feature = "json")] - { - format!("{{\"{station}\":\"{measurements}\"}}") - } - #[cfg(not(feature = "json"))] - { - format!("{station}={measurements}") - } - }) - .collect(); - stations.sort(); - let stations = stations.join(","); - #[cfg(feature = "json")] - { - println!("\n\n[{stations}]"); - } - #[cfg(not(feature = "json"))] - { - println!("\n\n{{{stations}}}"); - } - println!("\n\nTime={} ms", now.elapsed().as_millis()); - }); + } + let mut stations: Vec = stations + .iter() + .map(|(_, (station, measurements))| { + let measurements = measurements.to_string(); + #[cfg(feature = "json")] + { + format!("{{\"{station}\":\"{measurements}\"}}") + } + #[cfg(not(feature = "json"))] + { + format!("{station}={measurements}") + } + }) + .collect(); + stations.sort(); + let stations = stations.join(","); + #[cfg(feature = "json")] + { + println!("\n\n[{stations}]"); + } + #[cfg(not(feature = "json"))] + { + println!("\n\n{{{stations}}}"); + } + println!("\n\nTime={} ms", now.elapsed().as_millis()); }