From 2c23e30fe011cb07eb4dd4e52f16634476b5e0f4 Mon Sep 17 00:00:00 2001 From: Fabian Schmidt Date: Wed, 31 Jul 2024 09:27:02 +0200 Subject: [PATCH] hash stationnames myself for faster HashMap --- src/main/rust/Cargo.lock | 59 ++++++++++++++++++------ src/main/rust/src/bin/multi_threaded.rs | 27 +++++------ src/main/rust/src/bin/single_thread.rs | 14 +++--- src/main/rust/src/lib.rs | 61 ++++++++++++++++++++++++- 4 files changed, 127 insertions(+), 34 deletions(-) diff --git a/src/main/rust/Cargo.lock b/src/main/rust/Cargo.lock index a40635b..7fc8b04 100644 --- a/src/main/rust/Cargo.lock +++ b/src/main/rust/Cargo.lock @@ -12,7 +12,7 @@ dependencies = [ "getrandom", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -99,9 +99,9 @@ checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" [[package]] name = "bstr" -version = "1.9.1" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" +checksum = "40723b8fb387abc38f4f4a37c09073622e41dd12327033091ef8950659e6dc0c" dependencies = [ "memchr", "regex-automata", @@ -134,6 +134,12 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.6.1" @@ -142,9 +148,9 @@ checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "cc" -version = "1.1.6" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" dependencies = [ "jobserver", "libc", @@ -890,9 +896,12 @@ dependencies = [ [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" +dependencies = [ + "zerocopy 0.6.6", +] [[package]] name = "proc-macro2" @@ -1056,11 +1065,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -1223,9 +1233,9 @@ checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "wasi" @@ -1403,9 +1413,19 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "xxhash-rust" -version = "0.8.11" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63658493314859b4dfdf3fb8c1defd61587839def09582db50b8a4e93afca6bb" +checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" + +[[package]] +name = "zerocopy" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" +dependencies = [ + "byteorder", + "zerocopy-derive 0.6.6", +] [[package]] name = "zerocopy" @@ -1413,7 +1433,18 @@ version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy-derive" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.72", ] [[package]] diff --git a/src/main/rust/src/bin/multi_threaded.rs b/src/main/rust/src/bin/multi_threaded.rs index 0c6e82d..4cedf7b 100644 --- a/src/main/rust/src/bin/multi_threaded.rs +++ b/src/main/rust/src/bin/multi_threaded.rs @@ -10,7 +10,7 @@ use std::io::{BufRead, Seek, SeekFrom}; use std::sync::mpsc; use std::time::Instant; -use onebrc::parse_temp; +use onebrc::{hashstr, parse_temp, StationMeasurements}; const DEFAULT_HASHMAP_LENGTH: usize = 10000; @@ -18,7 +18,7 @@ fn main() { const FILE_PATH: &str = "../../../measurements.txt"; let now = Instant::now(); thread::scope(|s| { - let mut stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let mut stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); let (tx, rx) = mpsc::channel(); let cores = thread::available_parallelism().unwrap().into(); let file = File::open(FILE_PATH).expect("File measurements.txt not found"); @@ -40,12 +40,12 @@ fn main() { for i in 0..cores { let tx = tx.clone(); let mut currposition = *bounds.get(i).unwrap(); - let end = *bounds.get(i+1).unwrap(); + let end = *bounds.get(i+1).unwrap(); s.spawn(move || { let file = File::open(FILE_PATH).expect("File measurements.txt not found"); let mut reader = BufReader::new(&file); reader.seek(SeekFrom::Start(currposition)).unwrap(); - let mut t_stations: HashMap = + let mut t_stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); let mut line = Vec::with_capacity(108); loop { @@ -54,19 +54,20 @@ fn main() { break; } let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; let temp = parse_temp(temp.split_last().unwrap().1); - let measurements_option = t_stations.get_mut(&station); - if let Some(measurements) = measurements_option { + let measurements_option = t_stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { measurements.update(temp); } else { - let measurements = onebrc::StationMeasurements { + let measurements = StationMeasurements { min: temp, max: temp, count: 1, sum: temp, }; - t_stations.insert(station, measurements); + t_stations.insert(hash, (station, measurements)); } currposition += line_len as u64; if currposition >= end { @@ -79,16 +80,16 @@ fn main() { } drop(tx); while let Ok(t_stations) = rx.recv() { - for (station, measurements) in t_stations.iter() { - let joined_measurements_options = stations.get_mut(station.as_str()); - if let Some(joined_measurements) = joined_measurements_options { + for (&hash, (station, measurements)) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(&hash); + if let Some((_, joined_measurements)) = joined_measurements_options { joined_measurements.merge(measurements); } else { - stations.insert(station.to_owned(), *measurements); + stations.insert(hash, (station.to_owned(), *measurements)); } } } - let mut stations: Vec = stations.iter().map(|(station, measurements)| { + let mut stations: Vec = stations.iter().map(|(_, (station, measurements))| { let measurements = measurements.to_string(); #[cfg(feature = "json")] { diff --git a/src/main/rust/src/bin/single_thread.rs b/src/main/rust/src/bin/single_thread.rs index 2cc93ec..2e6d088 100644 --- a/src/main/rust/src/bin/single_thread.rs +++ b/src/main/rust/src/bin/single_thread.rs @@ -4,12 +4,13 @@ use std::collections::HashMap; use std::fs::File; use std::io::{BufRead, BufReader}; use std::time::Instant; +use onebrc::{hashstr, StationMeasurements}; const DEFAULT_HASHMAP_LENGTH: usize = 10000; fn main() { let now = Instant::now(); - let mut stations: HashMap = + let mut stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); @@ -21,23 +22,24 @@ fn main() { break; } let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let hash = hashstr(station); let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; let temp = onebrc::parse_temp(temp.split_last().unwrap().1); - let measurements_option = stations.get_mut(&station); - if let Some(measurements) = measurements_option { + let measurements_option = stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { measurements.update(temp); } else { - let measurements = onebrc::StationMeasurements { + let measurements = StationMeasurements { min: temp, max: temp, count: 1, sum: temp, }; - stations.insert(station, measurements); + stations.insert(hash, (station, measurements)); } line.clear(); } - let mut stations: Vec = stations.iter().map(|(station, measurements)| { + let mut stations: Vec = stations.iter().map(|(_ , (station, measurements))| { let measurements = measurements.to_string(); format!("{station}={measurements}") }).collect(); diff --git a/src/main/rust/src/lib.rs b/src/main/rust/src/lib.rs index a546884..6d1394b 100644 --- a/src/main/rust/src/lib.rs +++ b/src/main/rust/src/lib.rs @@ -1,3 +1,5 @@ +#![feature(slice_as_chunks)] + use std::fmt::Display; #[derive(Copy, Clone)] @@ -46,6 +48,7 @@ pub fn format_nums(num: usize) -> String { #[inline] pub const fn get_digit(b: u8) -> u32 { + // wrapping_sub('0' as u32) same as - 48 but less magical (b as u32).wrapping_sub('0' as u32) } @@ -66,6 +69,42 @@ pub fn parse_temp(bytes: &[u8]) -> isize { } } +#[inline] +pub fn hashstr(bytes: &[u8]) -> usize { + let mut hash = 0; + let (chunks, remainder) = bytes.as_chunks::<8>(); + for &chunk in chunks { + hash += usize::from_be_bytes(chunk); + } + let mut r = [0_u8; 8]; + r[0] = remainder.len() as u8; + let mut idx = 1; + for &byte in remainder { + r[idx] = byte; + idx += 1; + } + hash += usize::from_be_bytes(r); + hash +} + +// not really faster +#[inline] +pub fn new_parse_temp(bytes: &[u8]) -> isize { + let (characteristic, mantissa) = unsafe { bytes.split_at_unchecked(bytes.len() - 2) }; + let mut sum = (mantissa[1] as u32).wrapping_sub('0' as u32); + let mut position = 10; + let mut idx = characteristic.len(); + while idx > 0 { + idx -= 1; + if characteristic[idx] == b'-' { + return sum as isize * -1; + } + sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position; + position *= 10; + } + sum as isize +} + // using Bytes struct has more performance impact than the std read_until method which uses Vec instead of slice // #[inline] // pub fn read_bytes_until(bytes: &mut Bytes>, delimiter: u8) -> Option> { @@ -92,4 +131,24 @@ pub fn parse_line(line: &[u8]) -> (&[u8], &[u8]) { } let station = &line[0..idx]; (station, &line[(idx+1)..(line_len-1)]) -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use crate::new_parse_temp; + + #[test] + fn test_new_parse_temp() { + let temp_max = new_parse_temp("99.9".as_bytes()); + let temp_min = new_parse_temp("-99.9".as_bytes()); + let temp_0 = new_parse_temp("0.0".as_bytes()); + let temp_10 = new_parse_temp("10.0".as_bytes()); + let temp_neg_10 = new_parse_temp("-10.0".as_bytes()); + + assert_eq!(temp_max, 999); + assert_eq!(temp_min, -999); + assert_eq!(temp_0, 0); + assert_eq!(temp_10, 100); + assert_eq!(temp_neg_10, -100); + } +}