Tried looking at what the performance would be if I read the data from a file where every line has the same length, i.e. without having to read until EOL. But despite not having to search for the \n byte (or ';', since every station name and temperature is padded with null bytes), and despite using a fixed-size array instead of a Vec, this is slower: the normal read_until version is still just as fast, while the new one is about 10x slower.

Fabian Schmidt 2024-08-12 10:48:07 +02:00
parent 8ffea918c4
commit b53212103b
6 changed files with 165 additions and 1 deletion
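
For context, here is a minimal, self-contained sketch of the two read strategies being compared, reduced to plain line counting. The 107-byte record size and the structured_measurements.txt file name match the writer added in this commit, but the benchmark driver itself is only an illustration and not code from the crate.

// Sketch only: compare delimiter scanning (read_until) against fixed-size
// records (read_exact) on the structured file written by this commit.
use std::fs::File;
use std::io::{BufRead, BufReader, ErrorKind, Read};

// Variable-length lines: scan for b'\n' on every record.
fn count_lines_read_until(path: &str) -> std::io::Result<u64> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut line = Vec::with_capacity(107);
    let mut count = 0;
    while reader.read_until(b'\n', &mut line)? != 0 {
        count += 1;
        line.clear();
    }
    Ok(count)
}

// Fixed-length records: no delimiter search, just read_exact into a stack buffer.
fn count_lines_read_exact(path: &str) -> std::io::Result<u64> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut record = [0u8; 107];
    let mut count = 0;
    loop {
        match reader.read_exact(&mut record) {
            Ok(()) => count += 1,
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
    }
    Ok(count)
}

fn main() -> std::io::Result<()> {
    let start = std::time::Instant::now();
    let n = count_lines_read_until("structured_measurements.txt")?;
    println!("read_until: {n} lines in {} ms", start.elapsed().as_millis());

    let start = std::time::Instant::now();
    let n = count_lines_read_exact("structured_measurements.txt")?;
    println!("read_exact: {n} lines in {} ms", start.elapsed().as_millis());
    Ok(())
}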

View File

@@ -0,0 +1,5 @@
use onebrc::implementations::multi_threaded_structured::run;

fn main() {
    run();
}

View File

@@ -5,3 +5,4 @@ pub mod polars;
 pub mod flare_flo;
 pub mod phcs;
 pub mod libraries;
+pub mod multi_threaded_structured;

View File

@@ -0,0 +1,121 @@
use std::collections::HashMap;
use std::io::{Read, Seek, SeekFrom};
use std::sync::mpsc;
use std::time::Instant;
use std::{fs::File, io::BufReader, thread};
use std::ffi::CStr;

use crate::models::station_measurements::StationMeasurements;
use crate::utils::parse;
use crate::utils::parse::hashstr;

const DEFAULT_HASHMAP_LENGTH: usize = 10000;

pub fn run() {
    print!("\x1b[J");
    const FILE_PATH: &str = "structured_measurements.txt";
    let now = Instant::now();
    thread::scope(|s| {
        let mut stations: HashMap<usize, (String, StationMeasurements)> =
            HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
        let (tx, rx) = mpsc::channel();
        let cores = thread::available_parallelism().unwrap().into();
        let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found");
        let mut reader = BufReader::new(&file);
        let file_length = reader.seek(SeekFrom::End(0)).unwrap();
        let chunk_length = file_length as usize / cores;
        let mut bounds = Vec::with_capacity(cores + 1);
        bounds.push(0);
        for i in 0..cores {
            let tx = tx.clone();
            let mut currposition = (i * chunk_length) as u64;
            let end = ((i + 1) * chunk_length) as u64;
            s.spawn(move || {
                let file =
                    File::open(FILE_PATH).expect("File structured_measurements.txt not found");
                let mut reader = BufReader::new(&file);
                reader.seek(SeekFrom::Start(currposition)).unwrap();
                let mut t_stations: HashMap<usize, (String, StationMeasurements)> =
                    HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
                // Fixed-size record: 100 bytes of null-padded station name, b';' at
                // offset 100, a left-null-padded temperature, and b'\n' at offset 106.
                let mut line = [0u8; 107];
                let mut line_num = 0;
                loop {
                    if line_num % 100000 == 0 {
                        print!("\x1b[{i};0Hlines: {line_num}");
                    }
                    line_num += 1;
                    let read_res = reader.read_exact(&mut line);
                    match read_res {
                        Ok(_) => (),
                        Err(e) => match e.kind() {
                            std::io::ErrorKind::UnexpectedEof => break,
                            _ => panic!("Could not read"),
                        },
                    };
                    let (station, temp) = unsafe { line.split_at_unchecked(100) };
                    let hash = hashstr(station);
                    let station = {
                        if station[station.len() - 1] == 0u8 {
                            // Name is shorter than 100 bytes: cut it at the first null byte.
                            unsafe {
                                std::str::from_utf8_unchecked(
                                    CStr::from_bytes_until_nul(station).unwrap().to_bytes(),
                                )
                            }
                        } else {
                            unsafe { std::str::from_utf8_unchecked(station) }
                        }
                    };
                    let temp = parse::temp_new(&temp[1..6]);
                    let measurements_option = t_stations.get_mut(&hash);
                    if let Some((_, measurements)) = measurements_option {
                        measurements.update(temp);
                    } else {
                        let measurements = StationMeasurements {
                            min: temp,
                            max: temp,
                            count: 1,
                            sum: temp,
                        };
                        t_stations.insert(hash, (station.to_string(), measurements));
                    }
                    currposition += 107;
                    if currposition >= end {
                        break;
                    }
                }
                let _ = tx.send(t_stations);
            });
        }
        drop(tx);
        // Merge the per-thread maps as each worker finishes.
        while let Ok(t_stations) = rx.recv() {
            for (&hash, (station, measurements)) in t_stations.iter() {
                let joined_measurements_options = stations.get_mut(&hash);
                if let Some((_, joined_measurements)) = joined_measurements_options {
                    joined_measurements.merge(measurements);
                } else {
                    stations.insert(hash, (station.to_owned(), *measurements));
                }
            }
        }
        let mut stations: Vec<String> = stations
            .iter()
            .map(|(_, (station, measurements))| {
                let measurements = measurements.to_string();
                #[cfg(feature = "json")]
                {
                    format!("{{\"{station}\":\"{measurements}\"}}")
                }
                #[cfg(not(feature = "json"))]
                {
                    format!("{station}={measurements}")
                }
            })
            .collect();
        stations.sort();
        let stations = stations.join(",");
        #[cfg(feature = "json")]
        {
            println!("\n\n[{stations}]");
        }
        #[cfg(not(feature = "json"))]
        {
            println!("\n\n{{{stations}}}");
        }
        println!("\n\nTime={} ms", now.elapsed().as_millis());
    });
}

View File

@@ -1,5 +1,6 @@
 pub mod byte_pos;
 pub mod parse;
+pub mod write_structured_measurements;
 
 pub fn format_nums(num: usize) -> String {
     num.to_string()

View File

@@ -37,7 +37,10 @@ pub fn temp_new(bytes: &[u8]) -> isize {
     while idx > 0 {
         idx -= 1;
         if characteristic[idx] == b'-' {
-            return sum as isize * -1;
+            return -(sum as isize);
+        }
+        if characteristic[idx] == 0u8 {
+            return sum as isize;
         }
         sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position;
         position *= 10;
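
The null-byte check added above lets temp_new skip the left padding of the fixed-width temperature field. As a standalone illustration of that format only (not the crate's temp_new, most of which lies outside this hunk), a parser for the padded 5-byte field could look like this:

// Illustration only: parse a 5-byte, left-null-padded temperature field taken
// from a 107-byte record (bytes 101..106), e.g. b"\0\04.2", b"\0-9.8", b"-99.9",
// into tenths of a degree.
fn parse_padded_temp(field: &[u8; 5]) -> isize {
    let mut tenths: isize = 0;
    let mut negative = false;
    for &byte in field {
        match byte {
            0 => continue,           // left padding
            b'-' => negative = true, // sign
            b'.' => continue,        // exactly one decimal digit follows
            digit => tenths = tenths * 10 + (digit - b'0') as isize,
        }
    }
    if negative { -tenths } else { tenths }
}

fn main() {
    assert_eq!(parse_padded_temp(b"\0\04.2"), 42);
    assert_eq!(parse_padded_temp(b"\0-9.8"), -98);
    assert_eq!(parse_padded_temp(b"-99.9"), -999);
    println!("ok");
}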

View File

@@ -0,0 +1,33 @@
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

pub fn write_structured_measurements() {
    let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
    let structured_file =
        File::create_new("structured_measurements.txt").expect("Could not create file");
    let mut reader = BufReader::new(&file);
    let mut writer = BufWriter::new(&structured_file);
    let mut line = Vec::with_capacity(107);
    let mut line_num = 0;
    loop {
        line_num += 1;
        let line_len = reader
            .read_until(b'\n', &mut line)
            .expect("could not read bytes");
        if line_len == 0 {
            break;
        }
        if line_num % 100000 == 0 {
            print!("\x1b[0Glines: {line_num}");
        }
        // Pad every record to a fixed 107 bytes: the station name is null-padded to
        // 100 bytes, b';' sits at offset 100, and the temperature (with its trailing
        // b'\n' from read_until) is right-aligned so the newline lands at offset 106.
        let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
        let station_len = station.len();
        let temp_val_start = 107 - temp.len();
        let mut write_line = [0u8; 107];
        write_line[..station_len].clone_from_slice(station);
        write_line[100] = b';';
        write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp);
        write_line[106] = b'\n';
        writer.write_all(write_line.as_slice()).expect("Could not write");
        line.clear();
    }
}
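
For completeness, one hypothetical way to wire the two new pieces together; the module paths follow the mod declarations in this commit, but this driver binary is not part of the diff:

// Hypothetical driver, not part of this commit: build the fixed-width file once,
// then run the structured multi-threaded implementation against it.
use onebrc::implementations::multi_threaded_structured::run;
use onebrc::utils::write_structured_measurements::write_structured_measurements;

fn main() {
    // Expects ../../../measurements.txt to exist and structured_measurements.txt
    // to not exist yet (File::create_new refuses to overwrite).
    write_structured_measurements();
    run();
}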