Compare commits
	
		
			8 Commits
		
	
	
		
			main
			...
			0adcf3dec5
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 0adcf3dec5 | ||
|  | b6e8b41bb1 | ||
|  | 18ea81eaaa | ||
|  | 180da512f6 | ||
|  | 14e55e8c5c | ||
|  | 9606b06313 | ||
|  | 6aea8d90cc | ||
|  | 9f257da903 | 
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -9,7 +9,7 @@ release.properties | |||||||
| .project | .project | ||||||
| .classpath | .classpath | ||||||
| .settings/ | .settings/ | ||||||
| bin/ | ./bin/ | ||||||
|  |  | ||||||
| # IntelliJ | # IntelliJ | ||||||
| .idea | .idea | ||||||
|   | |||||||
							
								
								
									
										1419
									
								
								rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
							
						
						
									
										1419
									
								
								rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
							
								
								
									
										10
									
								
								rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | |||||||
|  | [package] | ||||||
|  | name = "rust" | ||||||
|  | version = "0.1.0" | ||||||
|  | edition = "2021" | ||||||
|  |  | ||||||
|  | # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||||
|  |  | ||||||
|  | [dependencies] | ||||||
|  | hashbrown = "0.14.3" | ||||||
|  | polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} | ||||||
							
								
								
									
										105
									
								
								rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  |     sync::{Arc, Mutex}, | ||||||
|  |     thread, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
|  | #[derive(Clone, Copy)] | ||||||
|  | struct StationMeasurements { | ||||||
|  |     min: f64, | ||||||
|  |     max: f64, | ||||||
|  |     count: usize, | ||||||
|  |     sum: f64, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||||
|  |         Arc::new(Mutex::new(HashMap::new())); | ||||||
|  |  | ||||||
|  |     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||||
|  |  | ||||||
|  |     let chunk_length = 1_000_000_000 / cores; | ||||||
|  |     let mut handles = vec![]; | ||||||
|  |     for i in 0..cores { | ||||||
|  |         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||||
|  |         let reader = BufReader::new(file); | ||||||
|  |         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||||
|  |         let stations_clone = stations.clone(); | ||||||
|  |         let handle = thread::spawn(move || { | ||||||
|  |             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||||
|  |             let mut line_num = 0; | ||||||
|  |             for line in line_chunk { | ||||||
|  |                 line_num += 1; | ||||||
|  |                 let lineprint = chunk_length / 500; | ||||||
|  |                 if line_num % lineprint == 0 { | ||||||
|  |                     println!("Thread #{i}"); | ||||||
|  |                     let formatted_line_num = format_nums(line_num); | ||||||
|  |                     println!("Calculated {formatted_line_num} stations"); | ||||||
|  |                 } | ||||||
|  |                 let line = line.expect("could not read line"); | ||||||
|  |                 if line_num < 10 { | ||||||
|  |                     println!("{}", line); | ||||||
|  |                 } | ||||||
|  |                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||||
|  |                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||||
|  |                 let measurements_option = t_stations.get_mut(station); | ||||||
|  |                 if let Some(measurements) = measurements_option { | ||||||
|  |                     if temp < measurements.min { | ||||||
|  |                         measurements.min = temp; | ||||||
|  |                     } else if temp > measurements.max { | ||||||
|  |                         measurements.max = temp; | ||||||
|  |                     } | ||||||
|  |                     measurements.count += 1; | ||||||
|  |                     measurements.sum += temp; | ||||||
|  |                 } else { | ||||||
|  |                     let measurements = StationMeasurements { | ||||||
|  |                         min: temp, | ||||||
|  |                         max: temp, | ||||||
|  |                         count: 1, | ||||||
|  |                         sum: temp, | ||||||
|  |                     }; | ||||||
|  |                     t_stations.insert(station.to_owned(), measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||||
|  |             for (station, measurements) in t_stations.iter() { | ||||||
|  |                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||||
|  |                 if let Some(joined_measurements) = joined_measurements_options { | ||||||
|  |                     if measurements.min < joined_measurements.min { | ||||||
|  |                         joined_measurements.min = measurements.min; | ||||||
|  |                     } else if measurements.max > joined_measurements.max { | ||||||
|  |                         joined_measurements.max = measurements.max; | ||||||
|  |                     } | ||||||
|  |                     joined_measurements.count += measurements.count; | ||||||
|  |                     joined_measurements.sum += measurements.sum; | ||||||
|  |                 } else { | ||||||
|  |                     stations_guard.insert(station.to_owned(), *measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |         handles.push(handle); | ||||||
|  |     } | ||||||
|  |     for handle in handles { | ||||||
|  |         handle.join().unwrap(); | ||||||
|  |     } | ||||||
|  |     for (station, measurments) in stations.lock().unwrap().iter() { | ||||||
|  |         let min = measurments.min; | ||||||
|  |         let max = measurments.max; | ||||||
|  |         let avg = measurments.sum / measurments.count as f64; | ||||||
|  |         println!("{station}={min}/{max}/{avg:.2}"); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn format_nums(num: usize) -> String { | ||||||
|  |     num.to_string() | ||||||
|  |         .as_bytes() | ||||||
|  |         .rchunks(3) | ||||||
|  |         .rev() | ||||||
|  |         .map(std::str::from_utf8) | ||||||
|  |         .collect::<Result<Vec<&str>, _>>() | ||||||
|  |         .unwrap() | ||||||
|  |         .join("_") | ||||||
|  | } | ||||||
							
								
								
									
										35
									
								
								rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | |||||||
|  | use polars::prelude::*; | ||||||
|  | use std::time::Instant; | ||||||
|  | use std::vec; | ||||||
|  |  | ||||||
|  | fn run_polars() -> Result<DataFrame, PolarsError> { | ||||||
|  |     let now = Instant::now(); | ||||||
|  |  | ||||||
|  |     let f1: Field = Field::new("station", DataType::String); | ||||||
|  |     let f2: Field = Field::new("measure", DataType::Float64); | ||||||
|  |     let sc: Schema = Schema::from_iter(vec![f1, f2]); | ||||||
|  |  | ||||||
|  |     let q = LazyCsvReader::new("../measurements.txt") | ||||||
|  |         .has_header(false) | ||||||
|  |         .with_schema(Some(Arc::new(sc))) | ||||||
|  |         .with_separator(b';') | ||||||
|  |         .finish()? | ||||||
|  |         .group_by(vec![col("station")]) | ||||||
|  |         .agg(vec![ | ||||||
|  |             col("measure").alias("min").min(), | ||||||
|  |             col("measure").alias("mean").mean(), | ||||||
|  |             col("measure").alias("max").max(), | ||||||
|  |         ]) | ||||||
|  |         .sort("station", Default::default()) | ||||||
|  |         .with_streaming(true); | ||||||
|  |  | ||||||
|  |     let df = q.collect()?; | ||||||
|  |  | ||||||
|  |     println!("Time={} μs", now.elapsed().as_micros()); | ||||||
|  |  | ||||||
|  |     Ok(df) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     run_polars(); | ||||||
|  | } | ||||||
							
								
								
									
										69
									
								
								rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
|  | struct StationMeasurements { | ||||||
|  |     min: f64, | ||||||
|  |     max: f64, | ||||||
|  |     temps: Vec<f64>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let mut stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||||
|  |  | ||||||
|  |     let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||||
|  |     let reader = BufReader::new(file); | ||||||
|  |     let mut line_num = 0; | ||||||
|  |     for line_result in reader.lines() { | ||||||
|  |         line_num += 1; | ||||||
|  |         if line_num % 250000 == 0 { | ||||||
|  |             let formatted_line_num = format_nums(line_num); | ||||||
|  |             println!("Calculated {formatted_line_num} stations"); | ||||||
|  |         } | ||||||
|  |         let line = line_result.expect("could not read line"); | ||||||
|  |         let (station, temp) = line.split_once(';').unwrap(); | ||||||
|  |         let temp = temp.parse().unwrap(); | ||||||
|  |         let measurements_option = stations.get_mut(station); | ||||||
|  |         if let Some(measurements) = measurements_option { | ||||||
|  |             if temp < measurements.min { | ||||||
|  |                 measurements.min = temp; | ||||||
|  |             } else if temp > measurements.max { | ||||||
|  |                 measurements.max = temp; | ||||||
|  |             } | ||||||
|  |             measurements.temps.push(temp); | ||||||
|  |         } else { | ||||||
|  |             let measurements = StationMeasurements { | ||||||
|  |                 min: temp, | ||||||
|  |                 max: temp, | ||||||
|  |                 temps: vec![temp], | ||||||
|  |             }; | ||||||
|  |             stations.insert(station.to_owned(), measurements); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     for (station, measurments) in stations { | ||||||
|  |         let min = measurments.min; | ||||||
|  |         let max = measurments.max; | ||||||
|  |         let avg = avg(measurments.temps); | ||||||
|  |         println!("{station}={min}/{max}/{avg}"); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn avg(temps: Vec<f64>) -> f64 { | ||||||
|  |     let num_temps = temps.len() as f64; | ||||||
|  |     let sum_temps: f64 = temps.iter().sum(); | ||||||
|  |     sum_temps / num_temps | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn format_nums(num: usize) -> String { | ||||||
|  |     num.to_string() | ||||||
|  |         .as_bytes() | ||||||
|  |         .rchunks(3) | ||||||
|  |         .rev() | ||||||
|  |         .map(std::str::from_utf8) | ||||||
|  |         .collect::<Result<Vec<&str>, _>>() | ||||||
|  |         .unwrap() | ||||||
|  |         .join("_") | ||||||
|  | } | ||||||
							
								
								
									
										106
									
								
								rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										106
									
								
								rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,106 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  |     sync::{Arc, Mutex}, | ||||||
|  |     thread, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
|  | #[derive(Clone, Copy)] | ||||||
|  | struct StationMeasurements { | ||||||
|  |     min: f64, | ||||||
|  |     max: f64, | ||||||
|  |     count: usize, | ||||||
|  |     sum: f64, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||||
|  |         Arc::new(Mutex::new(HashMap::new())); | ||||||
|  |  | ||||||
|  |     //let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||||
|  |     let cores: usize = 4; | ||||||
|  |  | ||||||
|  |     let chunk_length = 1_000_000_000 / cores; | ||||||
|  |     let mut handles = vec![]; | ||||||
|  |     for i in 0..cores { | ||||||
|  |         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||||
|  |         let reader = BufReader::new(file); | ||||||
|  |         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||||
|  |         let stations_clone = stations.clone(); | ||||||
|  |         let handle = thread::spawn(move || { | ||||||
|  |             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||||
|  |             let mut line_num = 0; | ||||||
|  |             for line in line_chunk { | ||||||
|  |                 line_num += 1; | ||||||
|  |                 let lineprint = chunk_length / 500; | ||||||
|  |                 if line_num % lineprint == 0 { | ||||||
|  |                     println!("Thread #{i}"); | ||||||
|  |                     let formatted_line_num = format_nums(line_num); | ||||||
|  |                     println!("Calculated {formatted_line_num} stations"); | ||||||
|  |                 } | ||||||
|  |                 let line = line.expect("could not read line"); | ||||||
|  |                 if line_num < 10 { | ||||||
|  |                     println!("{}", line); | ||||||
|  |                 } | ||||||
|  |                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||||
|  |                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||||
|  |                 let measurements_option = t_stations.get_mut(station); | ||||||
|  |                 if let Some(measurements) = measurements_option { | ||||||
|  |                     if temp < measurements.min { | ||||||
|  |                         measurements.min = temp; | ||||||
|  |                     } else if temp > measurements.max { | ||||||
|  |                         measurements.max = temp; | ||||||
|  |                     } | ||||||
|  |                     measurements.count += 1; | ||||||
|  |                     measurements.sum += temp; | ||||||
|  |                 } else { | ||||||
|  |                     let measurements = StationMeasurements { | ||||||
|  |                         min: temp, | ||||||
|  |                         max: temp, | ||||||
|  |                         count: 1, | ||||||
|  |                         sum: temp, | ||||||
|  |                     }; | ||||||
|  |                     t_stations.insert(station.to_owned(), measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||||
|  |             for (station, measurements) in t_stations.iter() { | ||||||
|  |                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||||
|  |                 if let Some(joined_measurements) = joined_measurements_options { | ||||||
|  |                     if measurements.min < joined_measurements.min { | ||||||
|  |                         joined_measurements.min = measurements.min; | ||||||
|  |                     } else if measurements.max > joined_measurements.max { | ||||||
|  |                         joined_measurements.max = measurements.max; | ||||||
|  |                     } | ||||||
|  |                     joined_measurements.count += measurements.count; | ||||||
|  |                     joined_measurements.sum += measurements.sum; | ||||||
|  |                 } else { | ||||||
|  |                     stations_guard.insert(station.to_owned(), *measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |         handles.push(handle); | ||||||
|  |     } | ||||||
|  |     for handle in handles { | ||||||
|  |         handle.join().unwrap(); | ||||||
|  |     } | ||||||
|  |     for (station, measurments) in stations.lock().unwrap().iter() { | ||||||
|  |         let min = measurments.min; | ||||||
|  |         let max = measurments.max; | ||||||
|  |         let avg = measurments.sum / measurments.count as f64; | ||||||
|  |         println!("{station}={min}/{max}/{avg:.2}"); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn format_nums(num: usize) -> String { | ||||||
|  |     num.to_string() | ||||||
|  |         .as_bytes() | ||||||
|  |         .rchunks(3) | ||||||
|  |         .rev() | ||||||
|  |         .map(std::str::from_utf8) | ||||||
|  |         .collect::<Result<Vec<&str>, _>>() | ||||||
|  |         .unwrap() | ||||||
|  |         .join("_") | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user