From fe1053b74a7db2c72ffe06807da491b323598151 Mon Sep 17 00:00:00 2001 From: Fabian Schmidt Date: Tue, 23 Jul 2024 13:23:26 +0200 Subject: [PATCH] Multi threaded works now but it's slower than single threaded... --- rust/src/bin/multi_threaded.rs | 105 +++++++++++++++++++++++++++++++++ rust/src/bin/single_thread.rs | 16 ++++- rust/src/main.rs | 55 ++++++++++++++--- 3 files changed, 166 insertions(+), 10 deletions(-) create mode 100644 rust/src/bin/multi_threaded.rs diff --git a/rust/src/bin/multi_threaded.rs b/rust/src/bin/multi_threaded.rs new file mode 100644 index 0000000..1d07386 --- /dev/null +++ b/rust/src/bin/multi_threaded.rs @@ -0,0 +1,105 @@ +use std::{ + fs::File, + io::{BufRead, BufReader}, + sync::{Arc, Mutex}, + thread, +}; + +use hashbrown::HashMap; + +#[derive(Clone, Copy)] +struct StationMeasurements { + min: f64, + max: f64, + count: usize, + sum: f64, +} + +fn main() { + let stations: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + + let cores: usize = std::thread::available_parallelism().unwrap().into(); + + let chunk_length = 1_000_000_000 / cores; + let mut handles = vec![]; + for i in 0..cores { + let file = File::open("../measurements.txt").expect("File measurements.txt not found"); + let reader = BufReader::new(file); + let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); + let stations_clone = stations.clone(); + let handle = thread::spawn(move || { + let mut t_stations: HashMap = HashMap::new(); + let mut line_num = 0; + for line in line_chunk { + line_num += 1; + let lineprint = chunk_length / 500; + if line_num % lineprint == 0 { + println!("Thread #{i}"); + let formatted_line_num = format_nums(line_num); + println!("Calculated {formatted_line_num} stations"); + } + let line = line.expect("could not read line"); + if line_num < 10 { + println!("{}", line); + } + let (station, temp) = line.split_once(';').expect("Error while splitting"); + let temp = temp.parse().expect("Error while parsing temperature"); + let measurements_option = t_stations.get_mut(station); + if let Some(measurements) = measurements_option { + if temp < measurements.min { + measurements.min = temp; + } else if temp > measurements.max { + measurements.max = temp; + } + measurements.count += 1; + measurements.sum += temp; + } else { + let measurements = StationMeasurements { + min: temp, + max: temp, + count: 1, + sum: temp, + }; + t_stations.insert(station.to_owned(), measurements); + } + } + let mut stations_guard = stations_clone.lock().expect("Error while locking"); + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations_guard.get_mut(station.as_str()); + if let Some(joined_measurements) = joined_measurements_options { + if measurements.min < joined_measurements.min { + joined_measurements.min = measurements.min; + } else if measurements.max > joined_measurements.max { + joined_measurements.max = measurements.max; + } + joined_measurements.count += measurements.count; + joined_measurements.sum += measurements.sum; + } else { + stations_guard.insert(station.to_owned(), *measurements); + } + } + }); + handles.push(handle); + } + for handle in handles { + handle.join().unwrap(); + } + for (station, measurments) in stations.lock().unwrap().iter() { + let min = measurments.min; + let max = measurments.max; + let avg = measurments.sum / measurments.count as f64; + println!("{station}={min}/{max}/{avg:.2}"); + } +} + +fn format_nums(num: usize) -> String { + num.to_string() + .as_bytes() + .rchunks(3) + .rev() + .map(std::str::from_utf8) + .collect::, _>>() + .unwrap() + .join("_") +} diff --git a/rust/src/bin/single_thread.rs b/rust/src/bin/single_thread.rs index 63f02e2..4f07e84 100644 --- a/rust/src/bin/single_thread.rs +++ b/rust/src/bin/single_thread.rs @@ -19,8 +19,9 @@ fn main() { let mut line_num = 0; for line_result in reader.lines() { line_num += 1; - if line_num % 50000 == 0 { - println!("Calculated {line_num} stations"); + if line_num % 250000 == 0 { + let formatted_line_num = format_nums(line_num); + println!("Calculated {formatted_line_num} stations"); } let line = line_result.expect("could not read line"); let (station, temp) = line.split_once(';').unwrap(); @@ -55,3 +56,14 @@ fn avg(temps: Vec) -> f64 { let sum_temps: f64 = temps.iter().sum(); sum_temps / num_temps } + +fn format_nums(num: usize) -> String { + num.to_string() + .as_bytes() + .rchunks(3) + .rev() + .map(std::str::from_utf8) + .collect::, _>>() + .unwrap() + .join("_") +} diff --git a/rust/src/main.rs b/rust/src/main.rs index 0c2f5db..e7d54e3 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -1,11 +1,13 @@ use std::{ - collections::HashMap, fs::File, io::{BufRead, BufReader}, sync::{Arc, Mutex}, thread, }; +use hashbrown::HashMap; + +#[derive(Clone, Copy)] struct StationMeasurements { min: f64, max: f64, @@ -17,23 +19,34 @@ fn main() { let stations: Arc>> = Arc::new(Mutex::new(HashMap::new())); - let cores: usize = std::thread::available_parallelism().unwrap().into(); + //let cores: usize = std::thread::available_parallelism().unwrap().into(); + let cores: usize = 4; - let file = File::open("../measurements.txt").expect("File measurements.txt not found"); let chunk_length = 1_000_000_000 / cores; let mut handles = vec![]; for i in 0..cores { - let reader = BufReader::new(file.try_clone().unwrap()); + let file = File::open("../measurements.txt").expect("File measurements.txt not found"); + let reader = BufReader::new(file); let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); let stations_clone = stations.clone(); let handle = thread::spawn(move || { + let mut t_stations: HashMap = HashMap::new(); + let mut line_num = 0; for line in line_chunk { + line_num += 1; + let lineprint = chunk_length / 500; + if line_num % lineprint == 0 { + println!("Thread #{i}"); + let formatted_line_num = format_nums(line_num); + println!("Calculated {formatted_line_num} stations"); + } let line = line.expect("could not read line"); - println!("Thread #{i}: {line}"); + if line_num < 10 { + println!("{}", line); + } let (station, temp) = line.split_once(';').expect("Error while splitting"); let temp = temp.parse().expect("Error while parsing temperature"); - let mut stations_guard = stations_clone.lock().expect("Error while locking thread"); - let measurements_option = stations_guard.get_mut(station); + let measurements_option = t_stations.get_mut(station); if let Some(measurements) = measurements_option { if temp < measurements.min { measurements.min = temp; @@ -49,7 +62,22 @@ fn main() { count: 1, sum: temp, }; - stations_guard.insert(station.to_owned(), measurements); + t_stations.insert(station.to_owned(), measurements); + } + } + let mut stations_guard = stations_clone.lock().expect("Error while locking"); + for (station, measurements) in t_stations.iter() { + let joined_measurements_options = stations_guard.get_mut(station.as_str()); + if let Some(joined_measurements) = joined_measurements_options { + if measurements.min < joined_measurements.min { + joined_measurements.min = measurements.min; + } else if measurements.max > joined_measurements.max { + joined_measurements.max = measurements.max; + } + joined_measurements.count += measurements.count; + joined_measurements.sum += measurements.sum; + } else { + stations_guard.insert(station.to_owned(), *measurements); } } }); @@ -65,3 +93,14 @@ fn main() { println!("{station}={min}/{max}/{avg:.2}"); } } + +fn format_nums(num: usize) -> String { + num.to_string() + .as_bytes() + .rchunks(3) + .rev() + .map(std::str::from_utf8) + .collect::, _>>() + .unwrap() + .join("_") +}