From b53212103b9860d1684fdd8f6bf9d89160208c8b Mon Sep 17 00:00:00 2001 From: Fabian Schmidt Date: Mon, 12 Aug 2024 10:48:07 +0200 Subject: [PATCH] Tried looking at what the performance would be if I read the data from a file where every line would have the same length ie. not having to read until eol. But despite not having to search the \n byte (or ; because every station name and temperatures are padded with null bytes) and having a fixed size array instead of vec this is slower (the normal read_until version is actually still just as fast, while the new one is 10x slower) --- .../rust/src/bin/multi_threaded_structured.rs | 5 + src/main/rust/src/implementations.rs | 1 + .../multi_threaded_structured.rs | 121 ++++++++++++++++++ src/main/rust/src/utils.rs | 1 + src/main/rust/src/utils/parse.rs | 5 +- .../utils/write_structured_measurements.rs | 33 +++++ 6 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 src/main/rust/src/bin/multi_threaded_structured.rs create mode 100644 src/main/rust/src/implementations/multi_threaded_structured.rs create mode 100644 src/main/rust/src/utils/write_structured_measurements.rs diff --git a/src/main/rust/src/bin/multi_threaded_structured.rs b/src/main/rust/src/bin/multi_threaded_structured.rs new file mode 100644 index 0000000..2757e0c --- /dev/null +++ b/src/main/rust/src/bin/multi_threaded_structured.rs @@ -0,0 +1,5 @@ +use onebrc::implementations::multi_threaded_structured::run; + +fn main() { + run(); +} diff --git a/src/main/rust/src/implementations.rs b/src/main/rust/src/implementations.rs index c6e6edc..e807a75 100644 --- a/src/main/rust/src/implementations.rs +++ b/src/main/rust/src/implementations.rs @@ -5,3 +5,4 @@ pub mod polars; pub mod flare_flo; pub mod phcs; pub mod libraries; +pub mod multi_threaded_structured; diff --git a/src/main/rust/src/implementations/multi_threaded_structured.rs b/src/main/rust/src/implementations/multi_threaded_structured.rs new file mode 100644 index 0000000..4cdbf9a --- /dev/null +++ b/src/main/rust/src/implementations/multi_threaded_structured.rs @@ -0,0 +1,121 @@ +use std::collections::HashMap; +use std::io::{Read, Seek, SeekFrom}; +use std::sync::mpsc; +use std::time::Instant; +use std::{fs::File, io::BufReader, thread}; +use std::ffi::CStr; +use crate::models::station_measurements::StationMeasurements; +use crate::utils::parse; +use crate::utils::parse::hashstr; + +const DEFAULT_HASHMAP_LENGTH: usize = 10000; + +pub fn run() { + print!("\x1b[J"); + const FILE_PATH: &str = "structured_measurements.txt"; + let now = Instant::now(); + thread::scope(|s| { + let mut stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found"); + let mut reader = BufReader::new(&file); + let file_length = reader.seek(SeekFrom::End(0)).unwrap(); + let chunk_length = file_length as usize / cores; + let mut bounds = Vec::with_capacity(cores + 1); + bounds.push(0); + for i in 0..cores { + let tx = tx.clone(); + let mut currposition = (i * chunk_length) as u64; + let end = ((i + 1) * chunk_length) as u64; + s.spawn(move || { + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + reader.seek(SeekFrom::Start(currposition)).unwrap(); + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let mut line = [0u8; 107]; + let mut line_num = 0; + loop { + if line_num % 100000 == 0 { + print!("\x1b[{i};0Hlines: {line_num}"); + } + line_num += 1; + let read_res = reader + .read_exact(&mut line); + match read_res { + Ok(_) => (), + Err(e) => match e.kind() { + std::io::ErrorKind::UnexpectedEof => break, + _ => panic!("Could not read") + }, + }; + let (station, temp) = unsafe { line.split_at_unchecked(100) }; + let hash = hashstr(station); + let station = { + if station[station.len() - 1] == 0u8 { + unsafe { std::str::from_utf8_unchecked(CStr::from_bytes_until_nul(station).unwrap().to_bytes()) } + } else { + unsafe { std::str::from_utf8_unchecked(station) } + } + }; + let temp = parse::temp_new(&temp[1..6]); + let measurements_option = t_stations.get_mut(&hash); + if let Some((_, measurements)) = measurements_option { + measurements.update(temp); + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(hash, (station.to_string(), measurements)); + } + currposition += 107; + if currposition >= end { + break; + } + } + let _ = tx.send(t_stations); + }); + } + drop(tx); + while let Ok(t_stations) = rx.recv() { + for (&hash, (station, measurements)) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(&hash); + if let Some((_, joined_measurements)) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(hash, (station.to_owned(), *measurements)); + } + } + } + let mut stations: Vec = stations + .iter() + .map(|(_, (station, measurements))| { + let measurements = measurements.to_string(); + #[cfg(feature = "json")] + { + format!("{{\"{station}\":\"{measurements}\"}}") + } + #[cfg(not(feature = "json"))] + { + format!("{station}={measurements}") + } + }) + .collect(); + stations.sort(); + let stations = stations.join(","); + #[cfg(feature = "json")] + { + println!("\n\n[{stations}]"); + } + #[cfg(not(feature = "json"))] + { + println!("\n\n{{{stations}}}"); + } + println!("\n\nTime={} ms", now.elapsed().as_millis()); + }); +} diff --git a/src/main/rust/src/utils.rs b/src/main/rust/src/utils.rs index 69e3b7c..b7342c1 100644 --- a/src/main/rust/src/utils.rs +++ b/src/main/rust/src/utils.rs @@ -1,5 +1,6 @@ pub mod byte_pos; pub mod parse; +pub mod write_structured_measurements; pub fn format_nums(num: usize) -> String { num.to_string() diff --git a/src/main/rust/src/utils/parse.rs b/src/main/rust/src/utils/parse.rs index 6f53c5e..9c1023b 100644 --- a/src/main/rust/src/utils/parse.rs +++ b/src/main/rust/src/utils/parse.rs @@ -37,7 +37,10 @@ pub fn temp_new(bytes: &[u8]) -> isize { while idx > 0 { idx -= 1; if characteristic[idx] == b'-' { - return sum as isize * -1; + return -(sum as isize); + } + if characteristic[idx] == 0u8 { + return sum as isize; } sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position; position *= 10; diff --git a/src/main/rust/src/utils/write_structured_measurements.rs b/src/main/rust/src/utils/write_structured_measurements.rs new file mode 100644 index 0000000..3a4995e --- /dev/null +++ b/src/main/rust/src/utils/write_structured_measurements.rs @@ -0,0 +1,33 @@ +use std::fs::File; +use std::io::{BufRead, BufReader, BufWriter, Write}; + +pub fn write_structured_measurements() { + let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); + let structured_file = File::create_new("structured_measurements.txt").expect("Could not create file"); + let mut reader = BufReader::new(&file); + let mut writer = BufWriter::new(&structured_file); + let mut line = Vec::with_capacity(107); + let mut line_num = 0; + loop { + line_num += 1; + let line_len = reader + .read_until(b'\n', &mut line) + .expect("could not read bytes"); + if line_len == 0 { + break; + } + if line_num % 100000 == 0 { + print!("\x1b[0Glines: {line_num}"); + } + let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); + let station_len = station.len(); + let temp_val_start = 107 - temp.len(); + let mut write_line = [0u8; 107]; + write_line[..station_len].clone_from_slice(station); + write_line[100] = b';'; + write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp); + write_line[106] = b'\n'; + writer.write_all(write_line.as_slice()).expect("Could not write"); + line.clear(); + } +}