From 7add8793a570c6c7b2539e1096d554cd5166f393 Mon Sep 17 00:00:00 2001 From: Fabian Schmidt Date: Mon, 19 Aug 2024 10:39:19 +0200 Subject: [PATCH] managed to get library version on par with the reference implementation by using memmap2. I'm understanding scoped threads a little better now... I think The fixed line length solution is still just as slow, even using memmap --- src/main/rust/Cargo.lock | 23 ++-- src/main/rust/Cargo.toml | 2 +- .../rust/src/implementations/libraries.rs | 105 ++++++++---------- .../multi_threaded_structured.rs | 90 +++++++-------- .../src/implementations/reference_impl.rs | 2 +- 5 files changed, 100 insertions(+), 122 deletions(-) diff --git a/src/main/rust/Cargo.lock b/src/main/rust/Cargo.lock index c62bf6e..26271c9 100644 --- a/src/main/rust/Cargo.lock +++ b/src/main/rust/Cargo.lock @@ -838,16 +838,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "memmap" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "memmap2" version = "0.7.1" @@ -857,6 +847,15 @@ dependencies = [ "libc", ] +[[package]] +name = "memmap2" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" +dependencies = [ + "libc", +] + [[package]] name = "multiversion" version = "0.7.4" @@ -922,7 +921,7 @@ dependencies = [ "fast-float", "libc", "memchr", - "memmap", + "memmap2 0.9.4", "polars", "rayon", "rustc-hash", @@ -1148,7 +1147,7 @@ dependencies = [ "home", "itoa", "memchr", - "memmap2", + "memmap2 0.7.1", "num-traits", "once_cell", "percent-encoding", diff --git a/src/main/rust/Cargo.toml b/src/main/rust/Cargo.toml index da24f92..68fc60d 100644 --- a/src/main/rust/Cargo.toml +++ b/src/main/rust/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" bstr = "1.9.1" fast-float = "0.2.0" memchr = "2.7.4" -memmap = "0.7.0" +memmap2 = "0.9.4" polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} rayon = "1.10.0" rustc-hash = "2.0.0" diff --git a/src/main/rust/src/implementations/libraries.rs b/src/main/rust/src/implementations/libraries.rs index dab33a3..3285766 100644 --- a/src/main/rust/src/implementations/libraries.rs +++ b/src/main/rust/src/implementations/libraries.rs @@ -1,67 +1,57 @@ -use std::collections::HashMap; -use std::io::{BufRead, Seek, SeekFrom}; -use std::sync::mpsc; -use std::time::Instant; -use std::{fs::File, io::BufReader, thread}; -use memmap::MmapOptions; use crate::models::station_measurements::StationMeasurements; use crate::utils::parse; -use crate::utils::parse::hashstr; +use memmap2::MmapOptions; +use rustc_hash::{FxBuildHasher, FxHashMap as HashMap}; +use std::sync::mpsc; +use std::time::Instant; +use std::{fs::File, thread}; const DEFAULT_HASHMAP_LENGTH: usize = 10000; pub fn run() { - const FILE_PATH: &str = "../../../measurements.txt"; let now = Instant::now(); + const FILE_PATH: &str = "../../../measurements.txt"; + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; + let file_length = mmap.len(); + let hasher = FxBuildHasher::default(); + let mut stations: HashMap = + HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let chunk_length = file_length / cores; + let mut bounds = Vec::with_capacity(cores + 1); + let mut start = 0; + for _ in 0..cores { + let end = (start + chunk_length).min(mmap.len()); + let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { + Some(v) => v, + None => { + assert_eq!(end, mmap.len()); + 0 + } + }; + let end = end + next_new_line; + bounds.push((start, end)); + start = end + 1; + } thread::scope(|s| { - let mut stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let (tx, rx) = mpsc::channel(); - let cores = thread::available_parallelism().unwrap().into(); - let file = File::open(FILE_PATH).expect("File measurements.txt not found"); - let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; - let file_length = mmap.len(); - let chunk_length = file_length / cores; - let mut bounds = Vec::with_capacity(cores + 1); - bounds.push(0); - for i in 1..cores { - let mut reader = BufReader::new(&file); - let mut byte_start = chunk_length * i; - reader - .seek(SeekFrom::Start(byte_start as u64)) - .expect("could not seek"); - let mut line = Vec::with_capacity(108); - let line_len = reader - .read_until(b'\n', &mut line) - .expect("could not read bytes"); - byte_start += line_len; - bounds.push(byte_start); - } - bounds.push(file_length); for i in 0..cores { let tx = tx.clone(); - let mut currposition = *bounds.get(i).unwrap(); - let end = *bounds.get(i + 1).unwrap(); + let (start, end) = *bounds.get(i).unwrap(); + let mmap_slice = &mmap[start..end]; s.spawn(move || { - let file = File::open(FILE_PATH).expect("File measurements.txt not found"); - let mut reader = BufReader::new(&file); - reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); - let mut t_stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let mut line = Vec::with_capacity(108); - loop { - let line_len = reader - .read_until(b'\n', &mut line) - .expect("could not read bytes"); - if line_len == 0 { + let mut t_stations: HashMap = + HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); + for line in mmap_slice.split(|&byte| { byte == b'\n' }) { + if line.len() == 0 { break; } let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); - let hash = hashstr(station); let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; - let temp = parse::temp(temp.split_last().unwrap().1); - let measurements_option = t_stations.get_mut(&hash); - if let Some((_, measurements)) = measurements_option { + let temp = parse::temp(temp); + let measurements_option = t_stations.get_mut(&station); + if let Some(measurements) = measurements_option { measurements.update(temp); } else { let measurements = StationMeasurements { @@ -70,31 +60,26 @@ pub fn run() { count: 1, sum: temp, }; - t_stations.insert(hash, (station, measurements)); + t_stations.insert(station, measurements); } - currposition += line_len; - if currposition >= end { - break; - } - line.clear(); } let _ = tx.send(t_stations); }); } drop(tx); while let Ok(t_stations) = rx.recv() { - for (&hash, (station, measurements)) in t_stations.iter() { - let joined_measurements_options = stations.get_mut(&hash); - if let Some((_, joined_measurements)) = joined_measurements_options { + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(station); + if let Some(joined_measurements) = joined_measurements_options { joined_measurements.merge(measurements); } else { - stations.insert(hash, (station.to_owned(), *measurements)); + stations.insert(station.to_owned(), *measurements); } } } let mut stations: Vec = stations .iter() - .map(|(_, (station, measurements))| { + .map(|(station, measurements)| { let measurements = measurements.to_string(); #[cfg(feature = "json")] { diff --git a/src/main/rust/src/implementations/multi_threaded_structured.rs b/src/main/rust/src/implementations/multi_threaded_structured.rs index 4cdbf9a..03eca40 100644 --- a/src/main/rust/src/implementations/multi_threaded_structured.rs +++ b/src/main/rust/src/implementations/multi_threaded_structured.rs @@ -1,12 +1,11 @@ -use std::collections::HashMap; -use std::io::{Read, Seek, SeekFrom}; use std::sync::mpsc; use std::time::Instant; -use std::{fs::File, io::BufReader, thread}; +use std::{fs::File, thread}; use std::ffi::CStr; +use memmap2::MmapOptions; +use rustc_hash::{FxBuildHasher, FxHashMap}; use crate::models::station_measurements::StationMeasurements; use crate::utils::parse; -use crate::utils::parse::hashstr; const DEFAULT_HASHMAP_LENGTH: usize = 10000; @@ -14,55 +13,54 @@ pub fn run() { print!("\x1b[J"); const FILE_PATH: &str = "structured_measurements.txt"; let now = Instant::now(); + let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found"); + let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; + let file_length = mmap.len(); + let hasher = FxBuildHasher::default(); + let mut stations: FxHashMap = + FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let chunk_length = file_length / cores; + let mut bounds = Vec::with_capacity(cores + 1); + let mut start = 0; + for _ in 0..cores { + let end = (start + chunk_length).min(mmap.len()); + let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { + Some(v) => v, + None => { + assert_eq!(end, mmap.len()); + 0 + } + }; + let end = end + next_new_line; + bounds.push((start, end)); + start = end + 1; + } thread::scope(|s| { - let mut stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let (tx, rx) = mpsc::channel(); - let cores = thread::available_parallelism().unwrap().into(); - let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found"); - let mut reader = BufReader::new(&file); - let file_length = reader.seek(SeekFrom::End(0)).unwrap(); - let chunk_length = file_length as usize / cores; - let mut bounds = Vec::with_capacity(cores + 1); - bounds.push(0); for i in 0..cores { let tx = tx.clone(); - let mut currposition = (i * chunk_length) as u64; - let end = ((i + 1) * chunk_length) as u64; + let (start, end) = *bounds.get(i).unwrap(); + let mmap_slice = &mmap[start..end]; s.spawn(move || { - let file = File::open(FILE_PATH).expect("File measurements.txt not found"); - let mut reader = BufReader::new(&file); - reader.seek(SeekFrom::Start(currposition)).unwrap(); - let mut t_stations: HashMap = - HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); - let mut line = [0u8; 107]; - let mut line_num = 0; - loop { + let mut t_stations: FxHashMap = + FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); + let lines = mmap_slice.chunks_exact(107); + for (line_num, line) in lines.enumerate() { if line_num % 100000 == 0 { print!("\x1b[{i};0Hlines: {line_num}"); } - line_num += 1; - let read_res = reader - .read_exact(&mut line); - match read_res { - Ok(_) => (), - Err(e) => match e.kind() { - std::io::ErrorKind::UnexpectedEof => break, - _ => panic!("Could not read") - }, - }; let (station, temp) = unsafe { line.split_at_unchecked(100) }; - let hash = hashstr(station); let station = { if station[station.len() - 1] == 0u8 { unsafe { std::str::from_utf8_unchecked(CStr::from_bytes_until_nul(station).unwrap().to_bytes()) } } else { unsafe { std::str::from_utf8_unchecked(station) } } - }; + }.to_owned(); let temp = parse::temp_new(&temp[1..6]); - let measurements_option = t_stations.get_mut(&hash); - if let Some((_, measurements)) = measurements_option { + let measurements_option = t_stations.get_mut(&station); + if let Some(measurements) = measurements_option { measurements.update(temp); } else { let measurements = StationMeasurements { @@ -71,11 +69,7 @@ pub fn run() { count: 1, sum: temp, }; - t_stations.insert(hash, (station.to_string(), measurements)); - } - currposition += 107; - if currposition >= end { - break; + t_stations.insert(station, measurements); } } let _ = tx.send(t_stations); @@ -83,18 +77,18 @@ pub fn run() { } drop(tx); while let Ok(t_stations) = rx.recv() { - for (&hash, (station, measurements)) in t_stations.iter() { - let joined_measurements_options = stations.get_mut(&hash); - if let Some((_, joined_measurements)) = joined_measurements_options { + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(station); + if let Some(joined_measurements) = joined_measurements_options { joined_measurements.merge(measurements); } else { - stations.insert(hash, (station.to_owned(), *measurements)); + stations.insert(station.to_owned(), *measurements); } } } let mut stations: Vec = stations .iter() - .map(|(_, (station, measurements))| { + .map(|(station, measurements)| { let measurements = measurements.to_string(); #[cfg(feature = "json")] { diff --git a/src/main/rust/src/implementations/reference_impl.rs b/src/main/rust/src/implementations/reference_impl.rs index bdc2c6a..bd2b1e0 100644 --- a/src/main/rust/src/implementations/reference_impl.rs +++ b/src/main/rust/src/implementations/reference_impl.rs @@ -1,5 +1,5 @@ use bstr::{BStr, ByteSlice}; -use memmap::MmapOptions; +use memmap2::MmapOptions; use rayon::prelude::*; use rustc_hash::FxHashMap as HashMap; use std::time::Instant;