Add a solution using libraries to see how fast I can get. For now there is no difference in speed.
This commit is contained in:
		
							
								
								
									
										5
									
								
								src/main/rust/src/bin/libraries.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								src/main/rust/src/bin/libraries.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,5 @@ | |||||||
|  | use onebrc::implementations::libraries::run; | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     run(); | ||||||
|  | } | ||||||
| @@ -4,3 +4,4 @@ pub mod multi_threaded; | |||||||
| pub mod polars; | pub mod polars; | ||||||
| pub mod flare_flo; | pub mod flare_flo; | ||||||
| pub mod phcs; | pub mod phcs; | ||||||
|  | pub mod libraries; | ||||||
|   | |||||||
							
								
								
									
										121
									
								
								src/main/rust/src/implementations/libraries.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										121
									
								
								src/main/rust/src/implementations/libraries.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,121 @@ | |||||||
|  | use std::collections::HashMap; | ||||||
|  | use std::io::{BufRead, Seek, SeekFrom}; | ||||||
|  | use std::sync::mpsc; | ||||||
|  | use std::time::Instant; | ||||||
|  | use std::{fs::File, io::BufReader, thread}; | ||||||
|  | use memmap::MmapOptions; | ||||||
|  | use crate::models::station_measurements::StationMeasurements; | ||||||
|  | use crate::utils::parse; | ||||||
|  | use crate::utils::parse::hashstr; | ||||||
|  |  | ||||||
|  | const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||||
|  |  | ||||||
|  | pub fn run() { | ||||||
|  |     const FILE_PATH: &str = "../../../measurements.txt"; | ||||||
|  |     let now = Instant::now(); | ||||||
|  |     thread::scope(|s| { | ||||||
|  |         let mut stations: HashMap<usize, (String, StationMeasurements)> = | ||||||
|  |             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|  |         let (tx, rx) = mpsc::channel(); | ||||||
|  |         let cores = thread::available_parallelism().unwrap().into(); | ||||||
|  |         let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | ||||||
|  |         let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; | ||||||
|  |         let file_length = mmap.len(); | ||||||
|  |         let chunk_length = file_length / cores; | ||||||
|  |         let mut bounds = Vec::with_capacity(cores + 1); | ||||||
|  |         bounds.push(0); | ||||||
|  |         for i in 1..cores { | ||||||
|  |             let mut reader = BufReader::new(&file); | ||||||
|  |             let mut byte_start = chunk_length * i; | ||||||
|  |             reader | ||||||
|  |                 .seek(SeekFrom::Start(byte_start as u64)) | ||||||
|  |                 .expect("could not seek"); | ||||||
|  |             let mut line = Vec::with_capacity(108); | ||||||
|  |             let line_len = reader | ||||||
|  |                 .read_until(b'\n', &mut line) | ||||||
|  |                 .expect("could not read bytes"); | ||||||
|  |             byte_start += line_len; | ||||||
|  |             bounds.push(byte_start); | ||||||
|  |         } | ||||||
|  |         bounds.push(file_length); | ||||||
|  |         for i in 0..cores { | ||||||
|  |             let tx = tx.clone(); | ||||||
|  |             let mut currposition = *bounds.get(i).unwrap(); | ||||||
|  |             let end = *bounds.get(i + 1).unwrap(); | ||||||
|  |             s.spawn(move || { | ||||||
|  |                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | ||||||
|  |                 let mut reader = BufReader::new(&file); | ||||||
|  |                 reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); | ||||||
|  |                 let mut t_stations: HashMap<usize, (String, StationMeasurements)> = | ||||||
|  |                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|  |                 let mut line = Vec::with_capacity(108); | ||||||
|  |                 loop { | ||||||
|  |                     let line_len = reader | ||||||
|  |                         .read_until(b'\n', &mut line) | ||||||
|  |                         .expect("could not read bytes"); | ||||||
|  |                     if line_len == 0 { | ||||||
|  |                         break; | ||||||
|  |                     } | ||||||
|  |                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|  |                     let hash = hashstr(station); | ||||||
|  |                     let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; | ||||||
|  |                     let temp = parse::temp(temp.split_last().unwrap().1); | ||||||
|  |                     let measurements_option = t_stations.get_mut(&hash); | ||||||
|  |                     if let Some((_, measurements)) = measurements_option { | ||||||
|  |                         measurements.update(temp); | ||||||
|  |                     } else { | ||||||
|  |                         let measurements = StationMeasurements { | ||||||
|  |                             min: temp, | ||||||
|  |                             max: temp, | ||||||
|  |                             count: 1, | ||||||
|  |                             sum: temp, | ||||||
|  |                         }; | ||||||
|  |                         t_stations.insert(hash, (station, measurements)); | ||||||
|  |                     } | ||||||
|  |                     currposition += line_len; | ||||||
|  |                     if currposition >= end { | ||||||
|  |                         break; | ||||||
|  |                     } | ||||||
|  |                     line.clear(); | ||||||
|  |                 } | ||||||
|  |                 let _ = tx.send(t_stations); | ||||||
|  |             }); | ||||||
|  |         } | ||||||
|  |         drop(tx); | ||||||
|  |         while let Ok(t_stations) = rx.recv() { | ||||||
|  |             for (&hash, (station, measurements)) in t_stations.iter() { | ||||||
|  |                 let joined_measurements_options = stations.get_mut(&hash); | ||||||
|  |                 if let Some((_, joined_measurements)) = joined_measurements_options { | ||||||
|  |                     joined_measurements.merge(measurements); | ||||||
|  |                 } else { | ||||||
|  |                     stations.insert(hash, (station.to_owned(), *measurements)); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         let mut stations: Vec<String> = stations | ||||||
|  |             .iter() | ||||||
|  |             .map(|(_, (station, measurements))| { | ||||||
|  |                 let measurements = measurements.to_string(); | ||||||
|  |                 #[cfg(feature = "json")] | ||||||
|  |                 { | ||||||
|  |                     format!("{{\"{station}\":\"{measurements}\"}}") | ||||||
|  |                 } | ||||||
|  |                 #[cfg(not(feature = "json"))] | ||||||
|  |                 { | ||||||
|  |                     format!("{station}={measurements}") | ||||||
|  |                 } | ||||||
|  |             }) | ||||||
|  |             .collect(); | ||||||
|  |         stations.sort(); | ||||||
|  |         let stations = stations.join(","); | ||||||
|  |         #[cfg(feature = "json")] | ||||||
|  |         { | ||||||
|  |             println!("\n\n[{stations}]"); | ||||||
|  |         } | ||||||
|  |         #[cfg(not(feature = "json"))] | ||||||
|  |         { | ||||||
|  |             println!("\n\n{{{stations}}}"); | ||||||
|  |         } | ||||||
|  |         println!("\n\nTime={} ms", now.elapsed().as_millis()); | ||||||
|  |     }); | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user