moved rust implementation to /src/main/rust
This commit is contained in:
		
							
								
								
									
										1452
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
							
						
						
									
										1452
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
							
								
								
									
										19
									
								
								src/main/rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								src/main/rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,19 @@ | ||||
| [package] | ||||
| name = "rust" | ||||
| version = "0.1.0" | ||||
| edition = "2021" | ||||
|  | ||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||
|  | ||||
| [dependencies] | ||||
| bstr = "1.9.1" | ||||
| fast-float = "0.2.0" | ||||
| hashbrown = "0.14.3" | ||||
| memchr = "2.7.4" | ||||
| memmap = "0.7.0" | ||||
| polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} | ||||
| rayon = "1.10.0" | ||||
| rustc-hash = "2.0.0" | ||||
|  | ||||
| [build] | ||||
| rustflags = ["-C target-cpu=native"] | ||||
							
								
								
									
										108
									
								
								src/main/rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										108
									
								
								src/main/rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,108 @@ | ||||
| use std::{ | ||||
|     fs::File, | ||||
|     io::{BufRead, BufReader}, | ||||
|     sync::{Arc, Mutex}, | ||||
|     thread, | ||||
| }; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| #[derive(Clone, Copy)] | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     count: usize, | ||||
|     sum: f64, | ||||
| } | ||||
|  | ||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||
|  | ||||
| fn main() { | ||||
|     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||
|         Arc::new(Mutex::new(HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH))); | ||||
|  | ||||
|     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|  | ||||
|     let chunk_length = 1_000_000_000 / cores; | ||||
|     let mut handles = vec![]; | ||||
|     for i in 0..cores { | ||||
|         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|         let reader = BufReader::new(file); | ||||
|         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||
|         let stations_clone = stations.clone(); | ||||
|         let handle = thread::spawn(move || { | ||||
|             let mut t_stations: HashMap<String, StationMeasurements> = | ||||
|                 HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||
|             let mut line_num = 0; | ||||
|             for line in line_chunk { | ||||
|                 line_num += 1; | ||||
|                 let lineprint = chunk_length / 500; | ||||
|                 if line_num % lineprint == 0 { | ||||
|                     println!("Thread #{i}"); | ||||
|                     let formatted_line_num = format_nums(line_num); | ||||
|                     println!("Calculated {formatted_line_num} stations"); | ||||
|                 } | ||||
|                 let line = line.expect("could not read line"); | ||||
|                 if line_num < 10 { | ||||
|                     println!("{}", line); | ||||
|                 } | ||||
|                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||
|                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||
|                 let measurements_option = t_stations.get_mut(station); | ||||
|                 if let Some(measurements) = measurements_option { | ||||
|                     if temp < measurements.min { | ||||
|                         measurements.min = temp; | ||||
|                     } else if temp > measurements.max { | ||||
|                         measurements.max = temp; | ||||
|                     } | ||||
|                     measurements.count += 1; | ||||
|                     measurements.sum += temp; | ||||
|                 } else { | ||||
|                     let measurements = StationMeasurements { | ||||
|                         min: temp, | ||||
|                         max: temp, | ||||
|                         count: 1, | ||||
|                         sum: temp, | ||||
|                     }; | ||||
|                     t_stations.insert(station.to_owned(), measurements); | ||||
|                 } | ||||
|             } | ||||
|             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||
|             for (station, measurements) in t_stations.iter() { | ||||
|                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||
|                 if let Some(joined_measurements) = joined_measurements_options { | ||||
|                     if measurements.min < joined_measurements.min { | ||||
|                         joined_measurements.min = measurements.min; | ||||
|                     } else if measurements.max > joined_measurements.max { | ||||
|                         joined_measurements.max = measurements.max; | ||||
|                     } | ||||
|                     joined_measurements.count += measurements.count; | ||||
|                     joined_measurements.sum += measurements.sum; | ||||
|                 } else { | ||||
|                     stations_guard.insert(station.to_owned(), *measurements); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|         handles.push(handle); | ||||
|     } | ||||
|     for handle in handles { | ||||
|         handle.join().unwrap(); | ||||
|     } | ||||
|     for (station, measurments) in stations.lock().unwrap().iter() { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = measurments.sum / measurments.count as f64; | ||||
|         println!("{station}={min}/{max}/{avg:.2}"); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
| } | ||||
							
								
								
									
										35
									
								
								src/main/rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								src/main/rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | ||||
| use polars::prelude::*; | ||||
| use std::time::Instant; | ||||
| use std::vec; | ||||
|  | ||||
| fn run_polars() -> Result<DataFrame, PolarsError> { | ||||
|     let now = Instant::now(); | ||||
|  | ||||
|     let f1: Field = Field::new("station", DataType::String); | ||||
|     let f2: Field = Field::new("measure", DataType::Float64); | ||||
|     let sc: Schema = Schema::from_iter(vec![f1, f2]); | ||||
|  | ||||
|     let q = LazyCsvReader::new("../measurements.txt") | ||||
|         .has_header(false) | ||||
|         .with_schema(Some(Arc::new(sc))) | ||||
|         .with_separator(b';') | ||||
|         .finish()? | ||||
|         .group_by(vec![col("station")]) | ||||
|         .agg(vec![ | ||||
|             col("measure").alias("min").min(), | ||||
|             col("measure").alias("mean").mean(), | ||||
|             col("measure").alias("max").max(), | ||||
|         ]) | ||||
|         .sort("station", Default::default()) | ||||
|         .with_streaming(true); | ||||
|  | ||||
|     let df = q.collect()?; | ||||
|  | ||||
|     println!("Time={} μs", now.elapsed().as_micros()); | ||||
|  | ||||
|     Ok(df) | ||||
| } | ||||
|  | ||||
| fn main() { | ||||
|     run_polars(); | ||||
| } | ||||
							
								
								
									
										116
									
								
								src/main/rust/src/bin/referenceImpl.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								src/main/rust/src/bin/referenceImpl.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,116 @@ | ||||
| use bstr::{BStr, ByteSlice}; | ||||
| use memmap::MmapOptions; | ||||
| use rustc_hash::FxHashMap as HashMap; | ||||
| use std::{fmt::Display, fs::File}; | ||||
|  | ||||
| use rayon::prelude::*; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| struct State { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     count: u64, | ||||
|     sum: f64, | ||||
| } | ||||
|  | ||||
| impl Default for State { | ||||
|     fn default() -> Self { | ||||
|         Self { | ||||
|             min: f64::MAX, | ||||
|             max: f64::MIN, | ||||
|             count: 0, | ||||
|             sum: 0.0, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Display for State { | ||||
|     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||
|         let avg = self.sum / (self.count as f64); | ||||
|         write!(f, "{:.1}/{avg:.1}/{:.1}", self.min, self.max) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl State { | ||||
|     fn update(&mut self, v: f64) { | ||||
|         self.min = self.min.min(v); | ||||
|         self.max = self.max.max(v); | ||||
|         self.count += 1; | ||||
|         self.sum += v; | ||||
|     } | ||||
|  | ||||
|     fn merge(&mut self, other: &Self) { | ||||
|         self.min = self.min.min(other.min); | ||||
|         self.max = self.max.max(other.max); | ||||
|         self.count += other.count; | ||||
|         self.sum += other.sum; | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> { | ||||
|     let mut state: HashMap<&'a BStr, State> = Default::default(); | ||||
|     for line in i { | ||||
|         let (name, value) = line.split_once_str(&[b';']).unwrap(); | ||||
|         let value = fast_float::parse(value).unwrap(); | ||||
|         state.entry(name.into()).or_default().update(value); | ||||
|     } | ||||
|     state | ||||
| } | ||||
|  | ||||
| fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> { | ||||
|     make_map((&mem[start..end]).lines()) | ||||
| } | ||||
|  | ||||
| fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { | ||||
|     for (k, v) in b { | ||||
|         a.entry(k).or_default().merge(v); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn main() { | ||||
|     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     let path = match std::env::args().skip(1).next() { | ||||
|         Some(path) => path, | ||||
|         None => "measurements.txt".to_owned(), | ||||
|     }; | ||||
|     let file = File::open(path).unwrap(); | ||||
|     let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; | ||||
|  | ||||
|     let chunk_size = mmap.len() / cores; | ||||
|     let mut chunks: Vec<(usize, usize)> = vec![]; | ||||
|     let mut start = 0; | ||||
|     for _ in 0..cores { | ||||
|         let end = (start + chunk_size).min(mmap.len()); | ||||
|         let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { | ||||
|             Some(v) => v, | ||||
|             None => { | ||||
|                 assert_eq!(end, mmap.len()); | ||||
|                 0 | ||||
|             } | ||||
|         }; | ||||
|         let end = end + next_new_line; | ||||
|         chunks.push((start, end)); | ||||
|         start = end + 1; | ||||
|     } | ||||
|     let parts: Vec<_> = chunks | ||||
|         .par_iter() | ||||
|         .map(|r| solve_for_part(*r, &mmap)) | ||||
|         .collect(); | ||||
|  | ||||
|     let state: HashMap<&BStr, State> = parts.into_iter().fold(Default::default(), |mut a, b| { | ||||
|         merge(&mut a, &b); | ||||
|         a | ||||
|     }); | ||||
|  | ||||
|     let mut all: Vec<_> = state.into_iter().collect(); | ||||
|     all.sort_unstable_by(|a, b| a.0.cmp(&b.0)); | ||||
|     print!("{{"); | ||||
|     for (i, (name, state)) in all.into_iter().enumerate() { | ||||
|         if i == 0 { | ||||
|             print!("{name}={state}"); | ||||
|         } else { | ||||
|             print!(", {name}={state}"); | ||||
|         } | ||||
|     } | ||||
|     println!("}}"); | ||||
| } | ||||
							
								
								
									
										72
									
								
								src/main/rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								src/main/rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | ||||
| use std::{ | ||||
|     fs::File, | ||||
|     io::{BufRead, BufReader}, | ||||
| }; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     temps: Vec<f64>, | ||||
| } | ||||
|  | ||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||
|  | ||||
| fn main() { | ||||
|     let mut stations: HashMap<String, StationMeasurements> = | ||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||
|  | ||||
|     let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|     let reader = BufReader::new(file); | ||||
|     let mut line_num = 0; | ||||
|     for line_result in reader.lines() { | ||||
|         line_num += 1; | ||||
|         if line_num % 250000 == 0 { | ||||
|             let formatted_line_num = format_nums(line_num); | ||||
|             println!("Calculated {formatted_line_num} stations"); | ||||
|         } | ||||
|         let line = line_result.expect("could not read line"); | ||||
|         let (station, temp) = line.split_once(';').unwrap(); | ||||
|         let temp = temp.parse().unwrap(); | ||||
|         let measurements_option = stations.get_mut(station); | ||||
|         if let Some(measurements) = measurements_option { | ||||
|             if temp < measurements.min { | ||||
|                 measurements.min = temp; | ||||
|             } else if temp > measurements.max { | ||||
|                 measurements.max = temp; | ||||
|             } | ||||
|             measurements.temps.push(temp); | ||||
|         } else { | ||||
|             let measurements = StationMeasurements { | ||||
|                 min: temp, | ||||
|                 max: temp, | ||||
|                 temps: vec![temp], | ||||
|             }; | ||||
|             stations.insert(station.to_owned(), measurements); | ||||
|         } | ||||
|     } | ||||
|     for (station, measurments) in stations { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = avg(measurments.temps); | ||||
|         println!("{station}={min}/{max}/{avg}"); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn avg(temps: Vec<f64>) -> f64 { | ||||
|     let num_temps = temps.len() as f64; | ||||
|     let sum_temps: f64 = temps.iter().sum(); | ||||
|     sum_temps / num_temps | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
| } | ||||
							
								
								
									
										106
									
								
								src/main/rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										106
									
								
								src/main/rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,106 @@ | ||||
| use std::{ | ||||
|     fs::File, | ||||
|     io::{BufRead, BufReader}, | ||||
|     sync::{Arc, Mutex}, | ||||
|     thread, | ||||
| }; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| #[derive(Clone, Copy)] | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     count: usize, | ||||
|     sum: f64, | ||||
| } | ||||
|  | ||||
| fn main() { | ||||
|     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||
|         Arc::new(Mutex::new(HashMap::new())); | ||||
|  | ||||
|     //let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     let cores: usize = 4; | ||||
|  | ||||
|     let chunk_length = 1_000_000_000 / cores; | ||||
|     let mut handles = vec![]; | ||||
|     for i in 0..cores { | ||||
|         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|         let reader = BufReader::new(file); | ||||
|         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||
|         let stations_clone = stations.clone(); | ||||
|         let handle = thread::spawn(move || { | ||||
|             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||
|             let mut line_num = 0; | ||||
|             for line in line_chunk { | ||||
|                 line_num += 1; | ||||
|                 let lineprint = chunk_length / 500; | ||||
|                 if line_num % lineprint == 0 { | ||||
|                     println!("Thread #{i}"); | ||||
|                     let formatted_line_num = format_nums(line_num); | ||||
|                     println!("Calculated {formatted_line_num} stations"); | ||||
|                 } | ||||
|                 let line = line.expect("could not read line"); | ||||
|                 if line_num < 10 { | ||||
|                     println!("{}", line); | ||||
|                 } | ||||
|                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||
|                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||
|                 let measurements_option = t_stations.get_mut(station); | ||||
|                 if let Some(measurements) = measurements_option { | ||||
|                     if temp < measurements.min { | ||||
|                         measurements.min = temp; | ||||
|                     } else if temp > measurements.max { | ||||
|                         measurements.max = temp; | ||||
|                     } | ||||
|                     measurements.count += 1; | ||||
|                     measurements.sum += temp; | ||||
|                 } else { | ||||
|                     let measurements = StationMeasurements { | ||||
|                         min: temp, | ||||
|                         max: temp, | ||||
|                         count: 1, | ||||
|                         sum: temp, | ||||
|                     }; | ||||
|                     t_stations.insert(station.to_owned(), measurements); | ||||
|                 } | ||||
|             } | ||||
|             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||
|             for (station, measurements) in t_stations.iter() { | ||||
|                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||
|                 if let Some(joined_measurements) = joined_measurements_options { | ||||
|                     if measurements.min < joined_measurements.min { | ||||
|                         joined_measurements.min = measurements.min; | ||||
|                     } else if measurements.max > joined_measurements.max { | ||||
|                         joined_measurements.max = measurements.max; | ||||
|                     } | ||||
|                     joined_measurements.count += measurements.count; | ||||
|                     joined_measurements.sum += measurements.sum; | ||||
|                 } else { | ||||
|                     stations_guard.insert(station.to_owned(), *measurements); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|         handles.push(handle); | ||||
|     } | ||||
|     for handle in handles { | ||||
|         handle.join().unwrap(); | ||||
|     } | ||||
|     for (station, measurments) in stations.lock().unwrap().iter() { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = measurments.sum / measurments.count as f64; | ||||
|         println!("{station}={min}/{max}/{avg:.2}"); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
| } | ||||
		Reference in New Issue
	
	Block a user