diff --git a/src/main/rust/src/bin/multi_threaded_smol.rs b/src/main/rust/src/bin/multi_threaded_smol.rs
new file mode 100644
index 0000000..97064ab
--- /dev/null
+++ b/src/main/rust/src/bin/multi_threaded_smol.rs
@@ -0,0 +1,5 @@
+use onebrc::implementations::multi_threaded_smol::run;
+
+fn main() {
+    run();
+}
diff --git a/src/main/rust/src/implementations.rs b/src/main/rust/src/implementations.rs
index 1ee7f34..986d35d 100644
--- a/src/main/rust/src/implementations.rs
+++ b/src/main/rust/src/implementations.rs
@@ -1,6 +1,7 @@
 pub mod flare_flo;
 pub mod libraries;
 pub mod multi_threaded;
+pub mod multi_threaded_smol;
 pub mod multi_threaded_structured;
 pub mod phcs;
 pub mod polars;
diff --git a/src/main/rust/src/implementations/multi_threaded_smol.rs b/src/main/rust/src/implementations/multi_threaded_smol.rs
new file mode 100644
index 0000000..85f8a7e
--- /dev/null
+++ b/src/main/rust/src/implementations/multi_threaded_smol.rs
@@ -0,0 +1,156 @@
+use smol::fs::File;
+use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
+
+use crate::models::station_measurements::StationMeasurements;
+use crate::utils::parse;
+use crate::utils::parse::hashstr;
+use std::collections::HashMap;
+use std::sync::mpsc;
+use std::thread;
+use std::time::Instant;
+
+/// Initial capacity passed to every station `HashMap` created in this module.
+const DEFAULT_HASHMAP_LENGTH: usize = 10000;
+
+/// Aggregates `measurements.txt` with one scoped worker thread per available core.
+///
+/// The file is cut into one byte range per core; every range ends right after a `\n`
+/// or at the end of the file. Each worker reads its own range through `smol`, aggregates
+/// it into a private map and sends that map over a channel. The thread running the scope
+/// merges the maps, prints the sorted stations and then the elapsed time in milliseconds.
+///
+/// # Panics
+///
+/// Panics if the number of available cores cannot be determined, if the file cannot be
+/// opened, seeked or read, or if a line contains no `;` separator.
+pub fn run() {
+    const FILE_PATH: &str = "../../../measurements.txt";
+    let now = Instant::now();
+    thread::scope(|s| {
+        let mut stations: HashMap<_, (String, StationMeasurements)> =
+            HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
+        let (tx, rx) = mpsc::channel();
+        let cores: usize = thread::available_parallelism().unwrap().into();
+        // `cores + 1` non-decreasing offsets; chunk `i` covers `bounds[i]..bounds[i + 1]`.
+        let bounds = smol::block_on(async {
+            let mut file = File::open(FILE_PATH)
+                .await
+                .expect("File measurements.txt not found");
+            let mut reader = BufReader::new(&mut file);
+            let file_length = reader.seek(SeekFrom::End(0)).await.unwrap();
+            // `cores` comes from a `NonZeroUsize`, so this never divides by zero.
+            let chunk_length = file_length / cores as u64;
+            let mut bounds = Vec::with_capacity(cores + 1);
+            bounds.push(0);
+            for i in 1..cores {
+                let mut reader = BufReader::new(&mut file);
+                let split_point = chunk_length * i as u64;
+                reader
+                    .seek(SeekFrom::Start(split_point))
+                    .await
+                    .expect("could not seek");
+                // Push the boundary forward to just past the next `\n` so that no
+                // line is cut in half.
+                let mut line = Vec::with_capacity(108);
+                let line_len = reader
+                    .read_until(b'\n', &mut line)
+                    .await
+                    .expect("could not read bytes");
+                bounds.push(split_point + line_len as u64);
+            }
+            bounds.push(file_length);
+            bounds
+        });
+        for window in bounds.windows(2) {
+            let tx = tx.clone();
+            let (start, end) = (window[0], window[1]);
+            s.spawn(move || {
+                smol::block_on(async {
+                    let mut file = File::open(FILE_PATH)
+                        .await
+                        .expect("File measurements.txt not found");
+                    let mut reader = BufReader::new(&mut file);
+                    reader.seek(SeekFrom::Start(start)).await.unwrap();
+                    let mut t_stations: HashMap<_, (String, StationMeasurements)> =
+                        HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
+                    let mut line = Vec::with_capacity(108);
+                    let mut currposition = start;
+                    // Test the range before reading: an empty chunk (`start == end`,
+                    // which happens when several split points fall into the same line)
+                    // must not read a line that belongs to a neighbouring chunk.
+                    while currposition < end {
+                        line.clear();
+                        let line_len = reader
+                            .read_until(b'\n', &mut line)
+                            .await
+                            .expect("could not read bytes");
+                        if line_len == 0 {
+                            break;
+                        }
+                        let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
+                        let hash = hashstr(station);
+                        // A final line without a trailing `\n` keeps all of its digits.
+                        let temp = parse::temp(temp.strip_suffix(b"\n").unwrap_or(temp));
+                        match t_stations.get_mut(&hash) {
+                            Some((_, measurements)) => {
+                                measurements.update(temp);
+                            }
+                            None => {
+                                // Allocate the name only for a station's first sighting.
+                                let station = String::from_utf8_lossy(station).into_owned();
+                                let measurements = StationMeasurements {
+                                    min: temp,
+                                    max: temp,
+                                    count: 1,
+                                    sum: temp,
+                                };
+                                t_stations.insert(hash, (station, measurements));
+                            }
+                        }
+                        currposition += line_len as u64;
+                    }
+                    let _ = tx.send(t_stations);
+                })
+            });
+        }
+        drop(tx);
+        while let Ok(t_stations) = rx.recv() {
+            // The received map is owned, so names and values move without cloning.
+            for (hash, (station, measurements)) in t_stations {
+                match stations.get_mut(&hash) {
+                    Some((_, joined_measurements)) => {
+                        joined_measurements.merge(&measurements);
+                    }
+                    None => {
+                        stations.insert(hash, (station, measurements));
+                    }
+                }
+            }
+        }
+        let mut stations: Vec<String> = stations
+            .values()
+            .map(|(station, measurements)| {
+                let measurements = measurements.to_string();
+                #[cfg(feature = "json")]
+                {
+                    format!("{{\"{station}\":\"{measurements}\"}}")
+                }
+                #[cfg(not(feature = "json"))]
+                {
+                    format!("{station}={measurements}")
+                }
+            })
+            .collect();
+        stations.sort_unstable();
+        let stations = stations.join(",");
+        #[cfg(feature = "json")]
+        {
+            println!("\n\n[{stations}]");
+        }
+        #[cfg(not(feature = "json"))]
+        {
+            println!("\n\n{{{stations}}}");
+        }
+        println!("\n\nTime={} ms", now.elapsed().as_millis());
+    });
+}