diff --git a/src/main/rust/Cargo.lock b/src/main/rust/Cargo.lock index e1545cb..b607136 100644 --- a/src/main/rust/Cargo.lock +++ b/src/main/rust/Cargo.lock @@ -45,6 +45,18 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" + [[package]] name = "argminmax" version = "0.6.2" @@ -146,6 +158,12 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.1.7" @@ -174,6 +192,58 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fbb260a053428790f3de475e304ff84cdbc4face759ea7a3e64c1edd938a7fc" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64b17d7ea74e9f833c7dbf2cbe4fb12ff26783eda4782a8975b72f895c9b4d99" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + [[package]] name = "comfy-table" version = "7.1.1" @@ -192,6 +262,42 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -257,6 +363,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "dyn-clone" version = "1.0.17" @@ -324,6 +436,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -347,6 +469,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "home" version = "0.5.9" @@ -389,6 +517,26 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -547,6 +695,7 @@ name = "onebrc" version = "0.1.0" dependencies = [ "bstr", + "criterion", "fast-float", "libc", "memchr", @@ -556,6 +705,12 @@ dependencies = [ "rustc-hash", ] +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "parking_lot" version = "0.12.3" @@ -600,6 +755,34 @@ dependencies = [ "array-init-cursor", ] +[[package]] +name = "plotters" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15b6eccb8484002195a3e44fe65a4ce8e93a625797a063735536fd59cb01cf3" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "414cec62c6634ae900ea1c56128dfe87cf63e7caece0852ec76aba307cebadb7" + +[[package]] +name = "plotters-svg" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81b30686a7d9c3e010b84284bdd26a29f2138574f52f5eb6f794fc0ad924e705" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polars" version = "0.36.2" @@ -1038,6 +1221,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1220,6 +1412,16 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "unicode-ident" version = "1.0.12" @@ -1238,6 +1440,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1298,6 +1510,16 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "web-sys" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1314,6 +1536,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/src/main/rust/Cargo.toml b/src/main/rust/Cargo.toml index ffa8c1d..b40b893 100644 --- a/src/main/rust/Cargo.toml +++ b/src/main/rust/Cargo.toml @@ -15,10 +15,37 @@ rayon = "1.10.0" rustc-hash = "2.0.0" libc = "0.2.155" +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports"] } + [features] json = [] unsafe = [] +[[bench]] +name = "reference_impl" +harness = false + +[[bench]] +name = "single_thread" +harness = false + +[[bench]] +name = "multi_threaded" +harness = false + +[[bench]] +name = "polars" +harness = false + +[[bench]] +name = "flare_flo" +harness = false + +[[bench]] +name = "phcs" +harness = false + [profile.release] lto = "fat" strip = "symbols" diff --git a/src/main/rust/benches/flare_flo.rs b/src/main/rust/benches/flare_flo.rs new file mode 100644 index 0000000..32abb85 --- /dev/null +++ b/src/main/rust/benches/flare_flo.rs @@ -0,0 +1,9 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use onebrc::implementations::flare_flo::run; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("flareflo", |b| {b.iter(|| run())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/benches/multi_threaded.rs b/src/main/rust/benches/multi_threaded.rs new file mode 100644 index 0000000..aadcb82 --- /dev/null +++ b/src/main/rust/benches/multi_threaded.rs @@ -0,0 +1,9 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use onebrc::implementations::multi_threaded::run; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("multithreaded", |b| {b.iter(|| run())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/benches/phcs.rs b/src/main/rust/benches/phcs.rs new file mode 100644 index 0000000..c6a227a --- /dev/null +++ b/src/main/rust/benches/phcs.rs @@ -0,0 +1,9 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use onebrc::implementations::phcs::run; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("phcs", |b| {b.iter(|| run())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/benches/polars.rs b/src/main/rust/benches/polars.rs new file mode 100644 index 0000000..56f454f --- /dev/null +++ b/src/main/rust/benches/polars.rs @@ -0,0 +1,8 @@ +use criterion::{Criterion, criterion_group, criterion_main}; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("polars", |b| {b.iter(|| /*run_polars()*/ ())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/benches/reference_impl.rs b/src/main/rust/benches/reference_impl.rs new file mode 100644 index 0000000..8a6b59c --- /dev/null +++ b/src/main/rust/benches/reference_impl.rs @@ -0,0 +1,9 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use onebrc::implementations::reference_impl::run; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("reference", |b| {b.iter(|| run())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/benches/single_thread.rs b/src/main/rust/benches/single_thread.rs new file mode 100644 index 0000000..06b8db8 --- /dev/null +++ b/src/main/rust/benches/single_thread.rs @@ -0,0 +1,9 @@ +use criterion::{Criterion, criterion_group, criterion_main}; +use onebrc::implementations::single_thread::run; + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("singlethread", |b| {b.iter(|| run())}); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/src/main/rust/src/bin/flare_flo.rs b/src/main/rust/src/bin/flare_flo.rs new file mode 100644 index 0000000..046f5bf --- /dev/null +++ b/src/main/rust/src/bin/flare_flo.rs @@ -0,0 +1,5 @@ +use onebrc::implementations::flare_flo::run; + +fn main() { + run(); +} diff --git a/src/main/rust/src/bin/multi_threaded.rs b/src/main/rust/src/bin/multi_threaded.rs index 84a3369..12d3a77 100644 --- a/src/main/rust/src/bin/multi_threaded.rs +++ b/src/main/rust/src/bin/multi_threaded.rs @@ -1,121 +1,5 @@ -#![feature(slice_split_once)] - -use std::collections::HashMap; -use std::io::{BufRead, Seek, SeekFrom}; -use std::sync::mpsc; -use std::time::Instant; -use std::{fs::File, io::BufReader, thread}; - -use onebrc::{hashstr, parse_temp, StationMeasurements}; - -const DEFAULT_HASHMAP_LENGTH: usize = 10000; +use onebrc::implementations::multi_threaded::run; fn main() { - const FILE_PATH: &str = "../../../measurements.txt"; - let now = Instant::now(); - thread::scope(|s| { - let mut stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let (tx, rx) = mpsc::channel(); - let cores = thread::available_parallelism().unwrap().into(); - let file = File::open(FILE_PATH).expect("File measurements.txt not found"); - let mut reader = BufReader::new(&file); - let file_length = reader.seek(SeekFrom::End(0)).unwrap(); - let chunk_length = file_length as usize / cores; - let mut bounds = Vec::with_capacity(cores + 1); - bounds.push(0); - for i in 1..cores { - let mut reader = BufReader::new(&file); - let mut byte_start = chunk_length * i; - reader - .seek(SeekFrom::Start(byte_start as u64)) - .expect("could not seek"); - let mut line = Vec::with_capacity(108); - let line_len = reader - .read_until(b'\n', &mut line) - .expect("could not read bytes"); - byte_start += line_len; - bounds.push(byte_start as u64); - } - bounds.push(file_length); - for i in 0..cores { - let tx = tx.clone(); - let mut currposition = *bounds.get(i).unwrap(); - let end = *bounds.get(i + 1).unwrap(); - s.spawn(move || { - let file = File::open(FILE_PATH).expect("File measurements.txt not found"); - let mut reader = BufReader::new(&file); - reader.seek(SeekFrom::Start(currposition)).unwrap(); - let mut t_stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let mut line = Vec::with_capacity(108); - loop { - let line_len = reader - .read_until(b'\n', &mut line) - .expect("could not read bytes"); - if line_len == 0 { - break; - } - let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); - let hash = hashstr(station); - let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; - let temp = parse_temp(temp.split_last().unwrap().1); - let measurements_option = t_stations.get_mut(&hash); - if let Some((_, measurements)) = measurements_option { - measurements.update(temp); - } else { - let measurements = StationMeasurements { - min: temp, - max: temp, - count: 1, - sum: temp, - }; - t_stations.insert(hash, (station, measurements)); - } - currposition += line_len as u64; - if currposition >= end { - break; - } - line.clear(); - } - let _ = tx.send(t_stations); - }); - } - drop(tx); - while let Ok(t_stations) = rx.recv() { - for (&hash, (station, measurements)) in t_stations.iter() { - let joined_measurements_options = stations.get_mut(&hash); - if let Some((_, joined_measurements)) = joined_measurements_options { - joined_measurements.merge(measurements); - } else { - stations.insert(hash, (station.to_owned(), *measurements)); - } - } - } - let mut stations: Vec = stations - .iter() - .map(|(_, (station, measurements))| { - let measurements = measurements.to_string(); - #[cfg(feature = "json")] - { - format!("{{\"{station}\":\"{measurements}\"}}") - } - #[cfg(not(feature = "json"))] - { - format!("{station}={measurements}") - } - }) - .collect(); - stations.sort(); - let stations = stations.join(","); - #[cfg(feature = "json")] - { - println!("\n\n[{stations}]"); - } - #[cfg(not(feature = "json"))] - { - println!("\n\n{{{stations}}}"); - } - println!("\n\nTime={} ms", now.elapsed().as_millis()); - }); + run(); } diff --git a/src/main/rust/src/bin/phcs.rs b/src/main/rust/src/bin/phcs.rs index 5f54945..cda59db 100644 --- a/src/main/rust/src/bin/phcs.rs +++ b/src/main/rust/src/bin/phcs.rs @@ -1,201 +1,5 @@ -use std::collections::HashMap; -use std::env; -use std::fmt; -use std::fs::File; -use std::io; -use std::io::prelude::*; -use std::thread::{self, Scope, ScopedJoinHandle}; +use onebrc::implementations::phcs::run; -use onebrc::mmap::Mmap; -use onebrc::mmap::MmapChunkIterator; - -// Defined in challenge spec -const MAX_STATIONS: usize = 10000; -const NUM_CONSUMERS: usize = 32; -const FIXED_POINT_DIVISOR: f64 = 10.0; - -struct StationData { - min_temp: i32, - max_temp: i32, - count: i32, - temp_sum: i32, -} - -impl fmt::Display for StationData { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{:.1}/{:.1}/{:.1}", - (self.min_temp as f64 / FIXED_POINT_DIVISOR), - self.get_mean(), - (self.max_temp as f64 / FIXED_POINT_DIVISOR) - ) - } -} - -/// Efficiently handles station statistics. Avoids using floating-point arithmetic to speed-up processing. -/// The mean is only calculated on demand, so we avoid calculating it as we read the file -impl StationData { - fn new(temp: i32) -> Self { - Self { - min_temp: temp, - max_temp: temp, - count: 1, - temp_sum: temp, - } - } - - fn to_bytes(&self) -> Vec { - format!( - "{:.1}/{:.1}/{:.1}", - (self.min_temp as f64 / FIXED_POINT_DIVISOR), - self.get_mean(), - (self.max_temp as f64 / FIXED_POINT_DIVISOR) - ) - .into_bytes() - } - - fn get_mean(&self) -> f64 { - (self.temp_sum as f64 / self.count as f64) / FIXED_POINT_DIVISOR - } - - fn update_from(&mut self, temp: i32) { - self.max_temp = self.max_temp.max(temp); - self.min_temp = self.min_temp.min(temp); - self.count += 1; - self.temp_sum += temp; - } - fn update_from_station(&mut self, src: &mut Self) { - self.max_temp = self.max_temp.max(src.max_temp); - self.min_temp = self.min_temp.min(src.min_temp); - self.temp_sum += src.temp_sum; - self.count += src.count; - } - - #[inline] - fn parse_temp(bytes: &[u8]) -> i32 { - let mut result: i32 = 0; - let mut negative: bool = false; - for &b in bytes { - match b { - b'0'..=b'9' => { - result = result * 10 + (b as i32 - b'0' as i32); - } - b'.' => {} - b'-' => { - negative = true; - } - _ => panic!("wrong format for temperature"), - } - } - if negative { - return -result; - } - result - } - - #[inline] - fn parse_data(line: &[u8]) -> (&[u8], i32) { - let semicolon_pos = line.iter().position(|&b| b == b';').unwrap(); - let name = &line[..semicolon_pos]; - let temp = &line[semicolon_pos + 1..]; - (name, Self::parse_temp(temp)) - } -} - -fn merge_hashmaps<'a>( - mut dest: HashMap<&'a [u8], StationData>, - src: HashMap<&'a [u8], StationData>, -) -> HashMap<&'a [u8], StationData> { - for (k, mut v) in src { - dest.entry(k) - .and_modify(|e| e.update_from_station(&mut v)) - .or_insert(v); - } - dest -} - -/// Parses a chunk of the input as StationData values. -fn process_chunk<'a>(current_chunk_slice: &'a [u8]) -> HashMap<&'a [u8], StationData> { - let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); - let mut start = 0; - while let Some(end) = current_chunk_slice[start..].iter().position(|&b| b == b'\n') { - let line = ¤t_chunk_slice[start..start + end]; - let (name, temp) = StationData::parse_data(line); - station_map - .entry(name) - .and_modify(|e| e.update_from(temp)) - .or_insert(StationData::new(temp)); - start += end + 1; // move to the start of the next line - } - // If we don't find a \n, process the remaining data - if start < current_chunk_slice.len() { - let line = ¤t_chunk_slice[start..]; - let (name, temp) = StationData::parse_data(line); - station_map - .entry(name) - .and_modify(|e| e.update_from(temp)) - .or_insert(StationData::new(temp)); - } - station_map -} - -fn process_mmap<'scope, 'env>( - mmap: Mmap<'env>, - s: &'scope Scope<'scope, 'env>, -) -> HashMap<&'env [u8], StationData> { - let mut handlers: Vec>> = Vec::new(); - - for chunk in MmapChunkIterator::new(mmap, NUM_CONSUMERS) { - let h = s.spawn(move || process_chunk(chunk)); - handlers.push(h); - } - - let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); - for h in handlers { - let inner_station = h.join().unwrap(); - station_map = merge_hashmaps(station_map, inner_station); - } - station_map -} - -fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Result<()> { - let mut stdout = io::stdout().lock(); - let mut buffer = Vec::new(); - - buffer.extend_from_slice(b"{"); - - let mut sorted_key_value_vec: Vec<_> = station_map.iter().collect(); - sorted_key_value_vec.sort_by_key(|e| e.0); - - for (i, (name, data)) in sorted_key_value_vec.iter().enumerate() { - if i > 0 { - buffer.extend_from_slice(b", "); - } - buffer.extend_from_slice(name); - buffer.extend_from_slice(b"="); - buffer.extend(data.to_bytes()); - } - - buffer.extend_from_slice(b"}"); - - stdout.write_all(&buffer) -} - -fn main() -> io::Result<()> { - // won't accept non-utf-8 args - let args: Vec = env::args().collect(); - let file_name = match args.get(2).clone() { - Some(fname) => fname, - None => "../../../measurements.txt", - }; - let f = File::open(file_name)?; - let mmap = Mmap::from_file(f); - - thread::scope(|s| { - let station_map = process_mmap(mmap, s); - write_output_to_stdout(station_map).unwrap(); - }); - - Ok(()) +fn main() { + let _ = run(); } diff --git a/src/main/rust/src/bin/polars.rs b/src/main/rust/src/bin/polars.rs index 9d81b89..b594166 100644 --- a/src/main/rust/src/bin/polars.rs +++ b/src/main/rust/src/bin/polars.rs @@ -1,35 +1,4 @@ -use polars::prelude::*; -use std::time::Instant; -use std::vec; - -fn run_polars() -> Result { - let now = Instant::now(); - - let f1: Field = Field::new("station", DataType::String); - let f2: Field = Field::new("measure", DataType::Float64); - let sc: Schema = Schema::from_iter(vec![f1, f2]); - - let q = LazyCsvReader::new("../../../measurements.txt") - .has_header(false) - .with_schema(Some(Arc::new(sc))) - .with_separator(b';') - .finish()? - .group_by(vec![col("station")]) - .agg(vec![ - col("measure").alias("min").min(), - col("measure").alias("mean").mean(), - col("measure").alias("max").max(), - ]) - .sort("station", Default::default()) - .with_streaming(true); - - let df = q.collect()?; - - println!("Time={} μs", now.elapsed().as_micros()); - - Ok(df) -} fn main() { - run_polars(); + // let _ = run_polars(); } diff --git a/src/main/rust/src/bin/reference_impl.rs b/src/main/rust/src/bin/reference_impl.rs new file mode 100644 index 0000000..ce595b2 --- /dev/null +++ b/src/main/rust/src/bin/reference_impl.rs @@ -0,0 +1,5 @@ +use onebrc::implementations::reference_impl::run; + +fn main() { + run(); +} diff --git a/src/main/rust/src/bin/single_thread.rs b/src/main/rust/src/bin/single_thread.rs index 8f6dda9..c309454 100644 --- a/src/main/rust/src/bin/single_thread.rs +++ b/src/main/rust/src/bin/single_thread.rs @@ -1,52 +1,5 @@ -#![feature(slice_split_once)] - -use std::collections::HashMap; -use std::fs::File; -use std::io::{BufRead, BufReader}; -use std::time::Instant; - -use onebrc::{hashstr, parse_temp, StationMeasurements}; - -const DEFAULT_HASHMAP_LENGTH: usize = 10000; +use onebrc::implementations::single_thread::run; fn main() { - let now = Instant::now(); - let mut stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - - let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); - let mut reader = BufReader::new(&file); - let mut line = Vec::with_capacity(108); - loop { - let line_len = reader.read_until(b'\n', &mut line).expect("could not read bytes"); - if line_len == 0 { - break; - } - let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); - let hash = hashstr(station); - let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; - let temp = parse_temp(temp.split_last().unwrap().1); - let measurements_option = stations.get_mut(&hash); - if let Some((_, measurements)) = measurements_option { - measurements.update(temp); - } else { - let measurements = StationMeasurements { - min: temp, - max: temp, - count: 1, - sum: temp, - }; - stations.insert(hash, (station, measurements)); - } - line.clear(); - } - let mut stations: Vec = stations.iter().map(|(_ , (station, measurements))| { - let measurements = measurements.to_string(); - format!("{station}={measurements}") - }).collect(); - stations.sort(); - let stations = stations.join(","); - println!("{{{stations}}}"); - println!("Time={} μs", now.elapsed().as_micros()); + run(); } - diff --git a/src/main/rust/src/implementations.rs b/src/main/rust/src/implementations.rs new file mode 100644 index 0000000..0dfe0e4 --- /dev/null +++ b/src/main/rust/src/implementations.rs @@ -0,0 +1,6 @@ +pub mod reference_impl; +pub mod single_thread; +pub mod multi_threaded; +pub mod polars; +pub mod flare_flo; +pub mod phcs; diff --git a/src/main/rust/src/bin/FlareFlo.rs b/src/main/rust/src/implementations/flare_flo.rs similarity index 97% rename from src/main/rust/src/bin/FlareFlo.rs rename to src/main/rust/src/implementations/flare_flo.rs index fd95f9c..b2f12ef 100644 --- a/src/main/rust/src/bin/FlareFlo.rs +++ b/src/main/rust/src/implementations/flare_flo.rs @@ -1,5 +1,3 @@ -#![feature(hash_raw_entry)] - use std::collections::HashMap; use std::env::args; use std::fs::File; @@ -95,7 +93,11 @@ impl Citymap { pub fn lookup(&mut self, lookup: &str) -> &mut City { let hash = hashstr(lookup); let builder = self.map.raw_entry_mut(); - &mut builder.from_key(&hash).or_insert(hash, (lookup.to_owned(), Default::default())).1.1 + &mut builder + .from_key(&hash) + .or_insert(hash, (lookup.to_owned(), Default::default())) + .1 + .1 } pub fn new() -> Self { Self { @@ -117,7 +119,7 @@ impl Citymap { } } -fn main() { +pub fn run() { let mut args = args(); let start = Instant::now(); diff --git a/src/main/rust/src/implementations/multi_threaded.rs b/src/main/rust/src/implementations/multi_threaded.rs new file mode 100644 index 0000000..109743f --- /dev/null +++ b/src/main/rust/src/implementations/multi_threaded.rs @@ -0,0 +1,120 @@ +use std::collections::HashMap; +use std::io::{BufRead, Seek, SeekFrom}; +use std::sync::mpsc; +use std::time::Instant; +use std::{fs::File, io::BufReader, thread}; +use crate::models::station_measurements::StationMeasurements; +use crate::utils::parse; +use crate::utils::parse::hashstr; + +const DEFAULT_HASHMAP_LENGTH: usize = 10000; + +pub fn run() { + const FILE_PATH: &str = "../../../measurements.txt"; + let now = Instant::now(); + thread::scope(|s| { + let mut stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + let file_length = reader.seek(SeekFrom::End(0)).unwrap(); + let chunk_length = file_length as usize / cores; + let mut bounds = Vec::with_capacity(cores + 1); + bounds.push(0); + for i in 1..cores { + let mut reader = BufReader::new(&file); + let mut byte_start = chunk_length * i; + reader + .seek(SeekFrom::Start(byte_start as u64)) + .expect("could not seek"); + let mut line = Vec::with_capacity(108); + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + byte_start += line_len; + bounds.push(byte_start as u64); + } + bounds.push(file_length); + for i in 0..cores { + let tx = tx.clone(); + let mut currposition = *bounds.get(i).unwrap(); + let end = *bounds.get(i + 1).unwrap(); + s.spawn(move || { + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + reader.seek(SeekFrom::Start(currposition)).unwrap(); + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let mut line = Vec::with_capacity(108); + loop { + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + if line_len == 0 { + break; + } + let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); + let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; + let temp = parse::temp(temp.split_last().unwrap().1); + let measurements_option = t_stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { + measurements.update(temp); + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(hash, (station, measurements)); + } + currposition += line_len as u64; + if currposition >= end { + break; + } + line.clear(); + } + let _ = tx.send(t_stations); + }); + } + drop(tx); + while let Ok(t_stations) = rx.recv() { + for (&hash, (station, measurements)) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(&hash); + if let Some((_, joined_measurements)) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(hash, (station.to_owned(), *measurements)); + } + } + } + let mut stations: Vec = stations + .iter() + .map(|(_, (station, measurements))| { + let measurements = measurements.to_string(); + #[cfg(feature = "json")] + { + format!("{{\"{station}\":\"{measurements}\"}}") + } + #[cfg(not(feature = "json"))] + { + format!("{station}={measurements}") + } + }) + .collect(); + stations.sort(); + let stations = stations.join(","); + #[cfg(feature = "json")] + { + println!("\n\n[{stations}]"); + } + #[cfg(not(feature = "json"))] + { + println!("\n\n{{{stations}}}"); + } + println!("\n\nTime={} ms", now.elapsed().as_millis()); + }); +} diff --git a/src/main/rust/src/implementations/phcs.rs b/src/main/rust/src/implementations/phcs.rs new file mode 100644 index 0000000..dd5eca6 --- /dev/null +++ b/src/main/rust/src/implementations/phcs.rs @@ -0,0 +1,203 @@ +use std::collections::HashMap; +use std::env; +use std::fmt; +use std::fs::File; +use std::io; +use std::io::prelude::*; +use std::thread::{self, Scope, ScopedJoinHandle}; + +use crate::models::mmap::{Mmap, MmapChunkIterator}; + +// Defined in challenge spec +const MAX_STATIONS: usize = 10000; +const NUM_CONSUMERS: usize = 32; +const FIXED_POINT_DIVISOR: f64 = 10.0; + +struct StationData { + min_temp: i32, + max_temp: i32, + count: i32, + temp_sum: i32, +} + +impl fmt::Display for StationData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{:.1}/{:.1}/{:.1}", + (self.min_temp as f64 / FIXED_POINT_DIVISOR), + self.get_mean(), + (self.max_temp as f64 / FIXED_POINT_DIVISOR) + ) + } +} + +/// Efficiently handles station statistics. Avoids using floating-point arithmetic to speed-up processing. +/// The mean is only calculated on demand, so we avoid calculating it as we read the file +impl StationData { + fn new(temp: i32) -> Self { + Self { + min_temp: temp, + max_temp: temp, + count: 1, + temp_sum: temp, + } + } + + fn to_bytes(&self) -> Vec { + format!( + "{:.1}/{:.1}/{:.1}", + (self.min_temp as f64 / FIXED_POINT_DIVISOR), + self.get_mean(), + (self.max_temp as f64 / FIXED_POINT_DIVISOR) + ) + .into_bytes() + } + + fn get_mean(&self) -> f64 { + (self.temp_sum as f64 / self.count as f64) / FIXED_POINT_DIVISOR + } + + fn update_from(&mut self, temp: i32) { + self.max_temp = self.max_temp.max(temp); + self.min_temp = self.min_temp.min(temp); + self.count += 1; + self.temp_sum += temp; + } + fn update_from_station(&mut self, src: &mut Self) { + self.max_temp = self.max_temp.max(src.max_temp); + self.min_temp = self.min_temp.min(src.min_temp); + self.temp_sum += src.temp_sum; + self.count += src.count; + } + + #[inline] + fn parse_temp(bytes: &[u8]) -> i32 { + let mut result: i32 = 0; + let mut negative: bool = false; + for &b in bytes { + match b { + b'0'..=b'9' => { + result = result * 10 + (b as i32 - b'0' as i32); + } + b'.' => {} + b'-' => { + negative = true; + } + _ => panic!("wrong format for temperature"), + } + } + if negative { + return -result; + } + result + } + + #[inline] + fn parse_data(line: &[u8]) -> (&[u8], i32) { + let semicolon_pos = line.iter().position(|&b| b == b';').unwrap(); + let name = &line[..semicolon_pos]; + let temp = &line[semicolon_pos + 1..]; + (name, Self::parse_temp(temp)) + } +} + +fn merge_hashmaps<'a>( + mut dest: HashMap<&'a [u8], StationData>, + src: HashMap<&'a [u8], StationData>, +) -> HashMap<&'a [u8], StationData> { + for (k, mut v) in src { + dest.entry(k) + .and_modify(|e| e.update_from_station(&mut v)) + .or_insert(v); + } + dest +} + +/// Parses a chunk of the input as StationData values. +fn process_chunk<'a>(current_chunk_slice: &'a [u8]) -> HashMap<&'a [u8], StationData> { + let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); + let mut start = 0; + while let Some(end) = current_chunk_slice[start..] + .iter() + .position(|&b| b == b'\n') + { + let line = ¤t_chunk_slice[start..start + end]; + let (name, temp) = StationData::parse_data(line); + station_map + .entry(name) + .and_modify(|e| e.update_from(temp)) + .or_insert(StationData::new(temp)); + start += end + 1; // move to the start of the next line + } + // If we don't find a \n, process the remaining data + if start < current_chunk_slice.len() { + let line = ¤t_chunk_slice[start..]; + let (name, temp) = StationData::parse_data(line); + station_map + .entry(name) + .and_modify(|e| e.update_from(temp)) + .or_insert(StationData::new(temp)); + } + station_map +} + +fn process_mmap<'scope, 'env>( + mmap: Mmap<'env>, + s: &'scope Scope<'scope, 'env>, +) -> HashMap<&'env [u8], StationData> { + let mut handlers: Vec>> = Vec::new(); + + for chunk in MmapChunkIterator::new(mmap, NUM_CONSUMERS) { + let h = s.spawn(move || process_chunk(chunk)); + handlers.push(h); + } + + let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); + for h in handlers { + let inner_station = h.join().unwrap(); + station_map = merge_hashmaps(station_map, inner_station); + } + station_map +} + +fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Result<()> { + let mut stdout = io::stdout().lock(); + let mut buffer = Vec::new(); + + buffer.extend_from_slice(b"{"); + + let mut sorted_key_value_vec: Vec<_> = station_map.iter().collect(); + sorted_key_value_vec.sort_by_key(|e| e.0); + + for (i, (name, data)) in sorted_key_value_vec.iter().enumerate() { + if i > 0 { + buffer.extend_from_slice(b", "); + } + buffer.extend_from_slice(name); + buffer.extend_from_slice(b"="); + buffer.extend(data.to_bytes()); + } + + buffer.extend_from_slice(b"}"); + + stdout.write_all(&buffer) +} + +pub fn run() -> io::Result<()> { + // won't accept non-utf-8 args + let args: Vec = env::args().collect(); + let file_name = match args.get(2).clone() { + Some(fname) => fname, + None => "../../../measurements.txt", + }; + let f = File::open(file_name)?; + let mmap = Mmap::from_file(f); + + thread::scope(|s| { + let station_map = process_mmap(mmap, s); + write_output_to_stdout(station_map).unwrap(); + }); + + Ok(()) +} diff --git a/src/main/rust/src/implementations/polars.rs b/src/main/rust/src/implementations/polars.rs new file mode 100644 index 0000000..303a9ae --- /dev/null +++ b/src/main/rust/src/implementations/polars.rs @@ -0,0 +1,31 @@ +use polars::prelude::*; +use std::time::Instant; +use std::vec; + +pub fn run_polars() -> Result { + let now = Instant::now(); + + let f1: Field = Field::new("station", DataType::String); + let f2: Field = Field::new("measure", DataType::Float64); + let sc: Schema = Schema::from_iter(vec![f1, f2]); + + let q = LazyCsvReader::new("../../../measurements.txt") + .has_header(false) + .with_schema(Some(Arc::new(sc))) + .with_separator(b';') + .finish()? + .group_by(vec![col("station")]) + .agg(vec![ + col("measure").alias("min").min(), + col("measure").alias("mean").mean(), + col("measure").alias("max").max(), + ]) + .sort("station", Default::default()) + .with_streaming(true); + + let df = q.collect()?; + + println!("Time={} μs", now.elapsed().as_micros()); + + Ok(df) +} \ No newline at end of file diff --git a/src/main/rust/src/bin/referenceImpl.rs b/src/main/rust/src/implementations/reference_impl.rs similarity index 99% rename from src/main/rust/src/bin/referenceImpl.rs rename to src/main/rust/src/implementations/reference_impl.rs index 66c25b0..bdc2c6a 100644 --- a/src/main/rust/src/bin/referenceImpl.rs +++ b/src/main/rust/src/implementations/reference_impl.rs @@ -1,9 +1,9 @@ use bstr::{BStr, ByteSlice}; use memmap::MmapOptions; -use rustc_hash::FxHashMap as HashMap; -use std::{fmt::Display, fs::File}; -use std::time::Instant; use rayon::prelude::*; +use rustc_hash::FxHashMap as HashMap; +use std::time::Instant; +use std::{fmt::Display, fs::File}; #[derive(Debug)] struct State { @@ -67,7 +67,7 @@ fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { } } -fn main() { +pub fn run() { let now = Instant::now(); let cores: usize = std::thread::available_parallelism().unwrap().into(); let path = match std::env::args().skip(1).next() { diff --git a/src/main/rust/src/implementations/single_thread.rs b/src/main/rust/src/implementations/single_thread.rs new file mode 100644 index 0000000..2367974 --- /dev/null +++ b/src/main/rust/src/implementations/single_thread.rs @@ -0,0 +1,55 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::time::Instant; +use crate::models::station_measurements::StationMeasurements; +use crate::utils::parse; +use crate::utils::parse::hashstr; + +const DEFAULT_HASHMAP_LENGTH: usize = 10000; + +pub fn run() { + let now = Instant::now(); + let mut stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + + let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + let mut line = Vec::with_capacity(108); + loop { + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + if line_len == 0 { + break; + } + let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); + let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; + let temp = parse::temp(temp.split_last().unwrap().1); + let measurements_option = stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { + measurements.update(temp); + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + stations.insert(hash, (station, measurements)); + } + line.clear(); + } + let mut stations: Vec = stations + .iter() + .map(|(_, (station, measurements))| { + let measurements = measurements.to_string(); + format!("{station}={measurements}") + }) + .collect(); + stations.sort(); + let stations = stations.join(","); + println!("{{{stations}}}"); + println!("Time={} μs", now.elapsed().as_micros()); +} diff --git a/src/main/rust/src/lib.rs b/src/main/rust/src/lib.rs index 393c15e..3aaa843 100644 --- a/src/main/rust/src/lib.rs +++ b/src/main/rust/src/lib.rs @@ -1,212 +1,8 @@ #![feature(slice_as_chunks)] +#![feature(portable_simd)] +#![feature(slice_split_once)] +#![feature(hash_raw_entry)] -pub mod mmap; - -use std::fmt::Display; - -#[derive(Copy, Clone)] -pub struct StationMeasurements { - pub min: isize, - pub max: isize, - pub count: isize, - pub sum: isize, -} - -impl StationMeasurements { - pub fn update(&mut self, v: isize) { - self.min = self.min.min(v); - self.max = self.max.max(v); - self.count += 1; - self.sum += v; - } - - pub fn merge(&mut self, other: &Self) { - self.min = self.min.min(other.min); - self.max = self.max.max(other.max); - self.count += other.count; - self.sum += other.sum; - } -} - -impl Display for StationMeasurements { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let min = self.min as f64 / 10.0; - let max = self.max as f64 / 10.0; - let avg = (self.sum as f64 / self.count as f64) / 10.0; - write!(f, "{min}/{avg:.1}/{max}") - } -} - -pub fn format_nums(num: usize) -> String { - num.to_string() - .as_bytes() - .rchunks(3) - .rev() - .map(std::str::from_utf8) - .collect::, _>>() - .unwrap() - .join("_") -} - -#[inline] -pub const fn get_digit(b: u8) -> isize { - // wrapping_sub('0' as u32) same as - 48 but less magical - (b as isize).wrapping_sub('0' as isize) -} - -#[inline] -pub fn parse_temp(bytes: &[u8]) -> isize { - let is_negative = bytes[0] == b'-'; - let as_decimal = match (is_negative, bytes.len()) { - (true, 4) => get_digit(bytes[1]) * 10 + get_digit(bytes[3]), - (true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]), - (false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]), - (false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]), - _x => panic!( - "could not parse temp: is_negative = {is_negative}, length = {}", - bytes.len() - ), - }; - if is_negative { - -as_decimal - } else { - as_decimal - } -} - -#[inline] -pub fn hashstr(bytes: &[u8]) -> usize { - let mut hash = 0; - let (chunks, remainder) = bytes.as_chunks::<8>(); - for &chunk in chunks { - hash += usize::from_be_bytes(chunk); - } - let mut r = [0_u8; 8]; - r[0] = remainder.len() as u8; - let mut idx = 1; - for &byte in remainder { - r[idx] = byte; - idx += 1; - } - hash += usize::from_be_bytes(r); - hash -} - -// not really faster -#[inline] -pub fn new_parse_temp(bytes: &[u8]) -> isize { - let (characteristic, mantissa) = unsafe { bytes.split_at_unchecked(bytes.len() - 2) }; - let mut sum = (mantissa[1] as u32).wrapping_sub('0' as u32); - let mut position = 10; - let mut idx = characteristic.len(); - while idx > 0 { - idx -= 1; - if characteristic[idx] == b'-' { - return sum as isize * -1; - } - sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position; - position *= 10; - } - sum as isize -} - -// using Bytes struct has more performance impact than the std read_until method which uses Vec instead of slice -// #[inline] -// pub fn read_bytes_until(bytes: &mut Bytes>, delimiter: u8) -> Option> { -// let mut buf: Vec = Vec::with_capacity(108); -// for byte in bytes { -// if byte.is_err() { -// panic!("Could not read byte"); -// } -// let byte = byte.unwrap(); -// if delimiter == byte { -// return Some(buf); -// } -// buf.push(byte); -// } -// None -// } - -#[inline] -pub fn get_pos(bytes: &[u8], find: u8) -> Option { - let chunks = bytes.windows(4); - for (pos, chunk) in chunks.enumerate() { - let inner_pos = get_pos_in_chunk(chunk, find); - if inner_pos < chunk.len() as u32 { - return Some(pos as u32 + inner_pos); - } - } - None -} - -#[inline] -fn get_pos_in_chunk(byte_chunk: &[u8], find: u8) -> u32 { - let find_hex = u32::from_be_bytes([find; 4]); - let x = u32::from_be_bytes(byte_chunk.try_into().unwrap()) ^ find_hex; - let mask = (x - 0x01010101) & (!x & (0x80808080)); - u32::leading_zeros(mask) >> 3 -} - -#[cfg(test)] -mod tests { - use crate::{get_pos, hashstr, new_parse_temp}; - - #[test] - fn test_new_parse_temp_max() { - let temp_max = new_parse_temp("99.9".as_bytes()); - assert_eq!(temp_max, 999); - } - - #[test] - fn test_new_parse_temp_min() { - let temp_min = new_parse_temp("-99.9".as_bytes()); - assert_eq!(temp_min, -999); - } - - #[test] - fn test_new_parse_temp_zero() { - let temp_0 = new_parse_temp("0.0".as_bytes()); - assert_eq!(temp_0, 0); - } - - #[test] - fn test_new_parse_temp_pos() { - let temp_10 = new_parse_temp("9.9".as_bytes()); - assert_eq!(temp_10, 99); - } - - #[test] - fn test_new_parse_temp_neg() { - let temp_neg_10 = new_parse_temp("-9.9".as_bytes()); - assert_eq!(temp_neg_10, -99); - } - - #[test] - fn test_hashstr() { - let hash_1 = hashstr(b"abcdefghijk"); - let hash_2 = hashstr(b"kjihgfedcba"); - let hash_3 = hashstr(b"abba"); - let hash_4 = hashstr(b"baab"); - - assert_ne!(hash_1, hash_2); - assert_ne!(hash_3, hash_4); - } - - #[test] - fn test_getpos() { - let semi_bytes = vec![ - 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, b';', 0_u8, 0_u8, - ]; - let semi_bytes = semi_bytes.as_slice(); - let pos = get_pos(semi_bytes, b';').unwrap(); - assert_eq!(pos, 8); - } - - #[test] - fn test_getpos_empty() { - let semi_bytes = vec![0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8]; - let semi_bytes = semi_bytes.as_slice(); - let pos = get_pos(semi_bytes, b';'); - assert_eq!(pos, None); - } -} +pub mod models; +pub mod utils; +pub mod implementations; diff --git a/src/main/rust/src/main.rs b/src/main/rust/src/main.rs index bf63228..ef6ca57 100644 --- a/src/main/rust/src/main.rs +++ b/src/main/rust/src/main.rs @@ -5,4 +5,4 @@ fn main() { // let reader = BufReader::new(file); // reader.lines().for_each(|_x| {()}); // println!("Time={} μs", now.elapsed().as_micros()); -} \ No newline at end of file +} diff --git a/src/main/rust/src/models.rs b/src/main/rust/src/models.rs new file mode 100644 index 0000000..4253ee4 --- /dev/null +++ b/src/main/rust/src/models.rs @@ -0,0 +1,2 @@ +pub mod station_measurements; +pub mod mmap; diff --git a/src/main/rust/src/mmap.rs b/src/main/rust/src/models/mmap.rs similarity index 98% rename from src/main/rust/src/mmap.rs rename to src/main/rust/src/models/mmap.rs index b10688d..054ce74 100644 --- a/src/main/rust/src/mmap.rs +++ b/src/main/rust/src/models/mmap.rs @@ -71,7 +71,7 @@ impl<'a> MmapChunkIterator<'a> { data, chunk_size: 1, } - .with_consumers(num_consumers) + .with_consumers(num_consumers) } } diff --git a/src/main/rust/src/models/station_measurements.rs b/src/main/rust/src/models/station_measurements.rs new file mode 100644 index 0000000..5c89597 --- /dev/null +++ b/src/main/rust/src/models/station_measurements.rs @@ -0,0 +1,34 @@ +use std::fmt::Display; + +#[derive(Copy, Clone)] +pub struct StationMeasurements { + pub min: isize, + pub max: isize, + pub count: isize, + pub sum: isize, +} + +impl StationMeasurements { + pub fn update(&mut self, v: isize) { + self.min = self.min.min(v); + self.max = self.max.max(v); + self.count += 1; + self.sum += v; + } + + pub fn merge(&mut self, other: &Self) { + self.min = self.min.min(other.min); + self.max = self.max.max(other.max); + self.count += other.count; + self.sum += other.sum; + } +} + +impl Display for StationMeasurements { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let min = self.min as f64 / 10.0; + let max = self.max as f64 / 10.0; + let avg = (self.sum as f64 / self.count as f64) / 10.0; + write!(f, "{min}/{avg:.1}/{max}") + } +} diff --git a/src/main/rust/src/utils.rs b/src/main/rust/src/utils.rs new file mode 100644 index 0000000..69e3b7c --- /dev/null +++ b/src/main/rust/src/utils.rs @@ -0,0 +1,13 @@ +pub mod byte_pos; +pub mod parse; + +pub fn format_nums(num: usize) -> String { + num.to_string() + .as_bytes() + .rchunks(3) + .rev() + .map(std::str::from_utf8) + .collect::, _>>() + .unwrap() + .join("_") +} diff --git a/src/main/rust/src/utils/byte_pos.rs b/src/main/rust/src/utils/byte_pos.rs new file mode 100644 index 0000000..67e4003 --- /dev/null +++ b/src/main/rust/src/utils/byte_pos.rs @@ -0,0 +1,42 @@ +#[inline] +pub fn get(bytes: &[u8], find: u8) -> Option { + let chunks = bytes.windows(4); + for (pos, chunk) in chunks.enumerate() { + let inner_pos = get_pos_in_chunk(chunk, find); + if inner_pos < chunk.len() as u32 { + return Some(pos as u32 + inner_pos); + } + } + None +} + +#[inline] +fn get_pos_in_chunk(byte_chunk: &[u8], find: u8) -> u32 { + let find_hex = u32::from_be_bytes([find; 4]); + let x = u32::from_be_bytes(byte_chunk.try_into().unwrap()) ^ find_hex; + let mask = (x - 0x01010101) & (!x & (0x80808080)); + u32::leading_zeros(mask) >> 3 +} + +#[cfg(test)] +mod tests { + use crate::utils::byte_pos::get; + + #[test] + fn test_getpos() { + let semi_bytes = vec![ + 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, b';', 0_u8, 0_u8, + ]; + let semi_bytes = semi_bytes.as_slice(); + let pos = get(semi_bytes, b';').unwrap(); + assert_eq!(pos, 8); + } + + #[test] + fn test_getpos_empty() { + let semi_bytes = vec![0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8]; + let semi_bytes = semi_bytes.as_slice(); + let pos = get(semi_bytes, b';'); + assert_eq!(pos, None); + } +} diff --git a/src/main/rust/src/utils/parse.rs b/src/main/rust/src/utils/parse.rs new file mode 100644 index 0000000..6f53c5e --- /dev/null +++ b/src/main/rust/src/utils/parse.rs @@ -0,0 +1,131 @@ +use std::simd::num::SimdUint; +use std::simd::{u8x4, Simd}; + +#[inline] +pub const fn get_digit(b: u8) -> isize { + // wrapping_sub('0' as u32) same as - 48 but less magical + (b as isize).wrapping_sub('0' as isize) +} + +#[inline] +pub fn temp(bytes: &[u8]) -> isize { + let is_negative = bytes[0] == b'-'; + let as_decimal = match (is_negative, bytes.len()) { + (true, 4) => get_digit(bytes[1]) * 10 + get_digit(bytes[3]), + (true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]), + (false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]), + (false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]), + _x => panic!( + "could not parse temp: is_negative = {is_negative}, length = {}", + bytes.len() + ), + }; + if is_negative { + -as_decimal + } else { + as_decimal + } +} + +// not really faster +#[inline] +pub fn temp_new(bytes: &[u8]) -> isize { + let (characteristic, mantissa) = unsafe { bytes.split_at_unchecked(bytes.len() - 2) }; + let mut sum = (mantissa[1] as u32).wrapping_sub('0' as u32); + let mut position = 10; + let mut idx = characteristic.len(); + while idx > 0 { + idx -= 1; + if characteristic[idx] == b'-' { + return sum as isize * -1; + } + sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position; + position *= 10; + } + sum as isize +} + +// slower +#[inline] +pub fn temp_simd(bytes: &[u8]) -> isize { + let is_negative = bytes[0] == b'-'; + let bytes = if is_negative { &bytes[1..] } else { bytes }; + let len = bytes.len(); + let idxs = Simd::from_array([5, len.wrapping_sub(4), len - 3, len - 1]); + let zeroes = u8x4::splat(b'0'); + let temp_simd = Simd::gather_or(bytes, idxs, zeroes); + let subbed = temp_simd - zeroes; + let subbed: Simd = subbed.cast(); + let mul = Simd::from_array([0, 100, 10, 1]); + let mulled = subbed * mul; + let sum: isize = mulled.to_array().iter().sum(); + if is_negative { + -sum + } else { + sum + } +} + +#[inline] +pub fn hashstr(bytes: &[u8]) -> usize { + let mut hash = 0; + let (chunks, remainder) = bytes.as_chunks::<8>(); + for &chunk in chunks { + hash += usize::from_be_bytes(chunk); + } + let mut r = [0_u8; 8]; + r[0] = remainder.len() as u8; + let mut idx = 1; + for &byte in remainder { + r[idx] = byte; + idx += 1; + } + hash += usize::from_be_bytes(r); + hash +} + +#[cfg(test)] +mod tests { + use crate::utils::parse::{hashstr, temp_new}; + + #[test] + fn test_temp_new_max() { + let temp_max = temp_new("99.9".as_bytes()); + assert_eq!(temp_max, 999); + } + + #[test] + fn test_temp_new_min() { + let temp_min = temp_new("-99.9".as_bytes()); + assert_eq!(temp_min, -999); + } + + #[test] + fn test_temp_new_zero() { + let temp_0 = temp_new("0.0".as_bytes()); + assert_eq!(temp_0, 0); + } + + #[test] + fn test_temp_new_pos() { + let temp_10 = temp_new("9.9".as_bytes()); + assert_eq!(temp_10, 99); + } + + #[test] + fn test_temp_new_neg() { + let temp_neg_10 = temp_new("-9.9".as_bytes()); + assert_eq!(temp_neg_10, -99); + } + + #[test] + fn test_hashstr() { + let hash_1 = hashstr(b"abcdefghijk"); + let hash_2 = hashstr(b"kjihgfedcba"); + let hash_3 = hashstr(b"abba"); + let hash_4 = hashstr(b"baab"); + + assert_ne!(hash_1, hash_2); + assert_ne!(hash_3, hash_4); + } +}