From 40627f9aebb2365dde488f433f8993780e7fbf4d Mon Sep 17 00:00:00 2001 From: Fabian Schmidt Date: Fri, 2 Aug 2024 11:43:23 +0200 Subject: [PATCH] add solution using libraries to see how fast I can get. For now no difference --- src/main/rust/src/bin/libraries.rs | 5 + src/main/rust/src/implementations.rs | 1 + .../rust/src/implementations/libraries.rs | 121 ++++++++++++++++++ 3 files changed, 127 insertions(+) create mode 100644 src/main/rust/src/bin/libraries.rs create mode 100644 src/main/rust/src/implementations/libraries.rs diff --git a/src/main/rust/src/bin/libraries.rs b/src/main/rust/src/bin/libraries.rs new file mode 100644 index 0000000..55a333b --- /dev/null +++ b/src/main/rust/src/bin/libraries.rs @@ -0,0 +1,5 @@ +use onebrc::implementations::libraries::run; + +fn main() { + run(); +} \ No newline at end of file diff --git a/src/main/rust/src/implementations.rs b/src/main/rust/src/implementations.rs index 0dfe0e4..c6e6edc 100644 --- a/src/main/rust/src/implementations.rs +++ b/src/main/rust/src/implementations.rs @@ -4,3 +4,4 @@ pub mod multi_threaded; pub mod polars; pub mod flare_flo; pub mod phcs; +pub mod libraries; diff --git a/src/main/rust/src/implementations/libraries.rs b/src/main/rust/src/implementations/libraries.rs new file mode 100644 index 0000000..dab33a3 --- /dev/null +++ b/src/main/rust/src/implementations/libraries.rs @@ -0,0 +1,121 @@ +use std::collections::HashMap; +use std::io::{BufRead, Seek, SeekFrom}; +use std::sync::mpsc; +use std::time::Instant; +use std::{fs::File, io::BufReader, thread}; +use memmap::MmapOptions; +use crate::models::station_measurements::StationMeasurements; +use crate::utils::parse; +use crate::utils::parse::hashstr; + +const DEFAULT_HASHMAP_LENGTH: usize = 10000; + +pub fn run() { + const FILE_PATH: &str = "../../../measurements.txt"; + let now = Instant::now(); + thread::scope(|s| { + let mut stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; + let file_length = mmap.len(); + let chunk_length = file_length / cores; + let mut bounds = Vec::with_capacity(cores + 1); + bounds.push(0); + for i in 1..cores { + let mut reader = BufReader::new(&file); + let mut byte_start = chunk_length * i; + reader + .seek(SeekFrom::Start(byte_start as u64)) + .expect("could not seek"); + let mut line = Vec::with_capacity(108); + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + byte_start += line_len; + bounds.push(byte_start); + } + bounds.push(file_length); + for i in 0..cores { + let tx = tx.clone(); + let mut currposition = *bounds.get(i).unwrap(); + let end = *bounds.get(i + 1).unwrap(); + s.spawn(move || { + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let mut line = Vec::with_capacity(108); + loop { + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + if line_len == 0 { + break; + } + let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); + let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; + let temp = parse::temp(temp.split_last().unwrap().1); + let measurements_option = t_stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { + measurements.update(temp); + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(hash, (station, measurements)); + } + currposition += line_len; + if currposition >= end { + break; + } + line.clear(); + } + let _ = tx.send(t_stations); + }); + } + drop(tx); + while let Ok(t_stations) = rx.recv() { + for (&hash, (station, measurements)) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(&hash); + if let Some((_, joined_measurements)) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(hash, (station.to_owned(), *measurements)); + } + } + } + let mut stations: Vec = stations + .iter() + .map(|(_, (station, measurements))| { + let measurements = measurements.to_string(); + #[cfg(feature = "json")] + { + format!("{{\"{station}\":\"{measurements}\"}}") + } + #[cfg(not(feature = "json"))] + { + format!("{station}={measurements}") + } + }) + .collect(); + stations.sort(); + let stations = stations.join(","); + #[cfg(feature = "json")] + { + println!("\n\n[{stations}]"); + } + #[cfg(not(feature = "json"))] + { + println!("\n\n{{{stations}}}"); + } + println!("\n\nTime={} ms", now.elapsed().as_millis()); + }); +}