diff --git a/src/main/rust/Cargo.toml b/src/main/rust/Cargo.toml index 7099609..653aea3 100644 --- a/src/main/rust/Cargo.toml +++ b/src/main/rust/Cargo.toml @@ -14,6 +14,9 @@ memmap = "0.7.0" rayon = "1.10.0" rustc-hash = "2.0.0" +[features] +json = [] + [profile.release] lto = "fat" strip = "symbols" diff --git a/src/main/rust/src/bin/fast_but_wrong.rs b/src/main/rust/src/bin/fast_but_wrong.rs new file mode 100644 index 0000000..a0dd387 --- /dev/null +++ b/src/main/rust/src/bin/fast_but_wrong.rs @@ -0,0 +1,106 @@ +use std::{ + fs::File, + io::BufReader, + thread, +}; +use std::collections::HashMap; +use std::io::{Read, Seek, SeekFrom}; +use std::sync::mpsc; +use onebrc::{parse_line, parse_temp, read_bytes_until}; + +const DEFAULT_HASHMAP_LENGTH: usize = 10000; + +fn main() { + const FILE_PATH: &str = "../../../measurements.txt"; + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + thread::scope(|s| { + let mut stations: HashMap = HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + let (tx, rx) = mpsc::channel(); + let cores = thread::available_parallelism().unwrap().into(); + let mut reader = BufReader::new(&file); + let file_length = reader.seek(SeekFrom::End(0)).unwrap(); + let chunk_length = file_length as usize / cores; + reader.seek(SeekFrom::Start(0)).unwrap(); + for i in 0..cores { + let tx = tx.clone(); + s.spawn(move || { + let file = File::open(FILE_PATH).expect("File measurements.txt not found"); + let mut reader = BufReader::new(&file); + let mut currposition = (chunk_length * i) as u64; + let end = currposition + chunk_length as u64; + reader.seek(SeekFrom::Start(currposition)).unwrap(); + let mut bytes = reader.bytes(); + + while let Some(byte) = bytes.next() { + match byte { + Ok(byte) => { + if byte == b'\n' { + break; + } + } + Err(_) => { panic!("could not go to next") } + } + } + + let mut t_stations: HashMap = + HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); + + while let Some(line) = read_bytes_until(&mut bytes, b'\n') { + let (station, temp) = parse_line(&line); + let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; + let temp = parse_temp(temp); + let measurements_option = t_stations.get_mut(&station); + if let Some(measurements) = measurements_option { + measurements.update(temp); + } else { + let measurements = onebrc::StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(station, measurements); + } + currposition += line.len() as u64; + if currposition > end { + break; + } + } + let _ = tx.send(t_stations); + }); + } + drop(tx); + while let Ok(t_stations) = rx.recv() { + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations.get_mut(station.as_str()); + if let Some(joined_measurements) = joined_measurements_options { + joined_measurements.merge(measurements); + } else { + stations.insert(station.to_owned(), *measurements); + } + } + } + let mut stations: Vec = stations.iter().map(|(station, measurements)| { + let measurements = measurements.to_string(); + #[cfg(feature = "json")] + { + format!("{{\"{station}\":\"{measurements}\"}}") + } + #[cfg(not(feature = "json"))] + { + format!("{station}={measurements}") + } + }).collect(); + stations.sort(); + let stations = stations.join(","); + #[cfg(feature = "json")] + { + println!("\n\n[{stations}]"); + } + #[cfg(not(feature = "json"))] + { + println!("\n\n{{{stations}}}"); + } + // println!("\n\nTime={} ms", now.elapsed().as_millis()); + }); +} diff --git a/src/main/rust/src/bin/referenceImpl.rs b/src/main/rust/src/bin/referenceImpl.rs index 5569fc9..9c43a8c 100644 --- a/src/main/rust/src/bin/referenceImpl.rs +++ b/src/main/rust/src/bin/referenceImpl.rs @@ -104,13 +104,28 @@ fn main() { let mut all: Vec<_> = state.into_iter().collect(); all.sort_unstable_by(|a, b| a.0.cmp(&b.0)); - print!("{{"); - for (i, (name, state)) in all.into_iter().enumerate() { - if i == 0 { - print!("{name}={state}"); - } else { - print!(", {name}={state}"); + #[cfg(feature = "json")] + { + print!("["); + for (i, (name, state)) in all.into_iter().enumerate() { + if i == 0 { + print!("{{\"{name}\":\"{state}\"}}"); + } else { + print!(", {{\"{name}\":\"{state}\"}}"); + } } + println!("]"); + } + #[cfg(not(feature = "json"))] + { + print!("{{"); + for (i, (name, state)) in all.into_iter().enumerate() { + if i == 0 { + print!("{name}={state}"); + } else { + print!(", {name}={state}"); + } + } + println!("}}"); } - println!("}}"); } diff --git a/src/main/rust/src/bin/single_thread.rs b/src/main/rust/src/bin/single_thread.rs index 26add47..88512b6 100644 --- a/src/main/rust/src/bin/single_thread.rs +++ b/src/main/rust/src/bin/single_thread.rs @@ -13,9 +13,8 @@ fn main() { HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); - let mut bytes = BufReader::new(file).bytes(); - while let Some(line_result) = read_bytes_until(&mut bytes, b'\n') { - let line = line_result.expect("could not read line"); + let mut bytes = BufReader::new(&file).bytes(); + while let Some(line) = read_bytes_until(&mut bytes, b'\n') { let (station, temp) = parse_line(&line); let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; let temp = onebrc::parse_temp(temp); diff --git a/src/main/rust/src/lib.rs b/src/main/rust/src/lib.rs index 9d44b10..10d3484 100644 --- a/src/main/rust/src/lib.rs +++ b/src/main/rust/src/lib.rs @@ -59,7 +59,7 @@ pub fn parse_temp(bytes: &[u8]) -> isize { (true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]), (false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]), (false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]), - _x => panic!(), + _x => panic!("could not parse temp: is_negative = {is_negative}, length = {}", bytes.len()), }; if is_negative { -(as_decimal as isize) @@ -69,7 +69,7 @@ pub fn parse_temp(bytes: &[u8]) -> isize { } #[inline] -pub fn read_bytes_until(bytes: &mut Bytes>, delimiter: u8) -> Option> { +pub fn read_bytes_until(bytes: &mut Bytes>, delimiter: u8) -> Option<[u8; 108]> { // 108 max length of line in bytes let mut buf: [u8; 108] = [b'#'; 108]; let mut idx = 0; @@ -79,7 +79,7 @@ pub fn read_bytes_until(bytes: &mut Bytes>, delimiter: u8) -> Op } let byte = byte.unwrap(); if delimiter == byte { - return Some(Ok(buf)); + return Some(buf); } buf[idx] = byte; idx += 1;