Fastest yet with scaled integers instead of floats
This commit is contained in:
		
							
								
								
									
										1306
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1306
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -1,5 +1,5 @@ | ||||
| [package] | ||||
| name = "rust" | ||||
| name = "onebrc" | ||||
| version = "0.1.0" | ||||
| edition = "2021" | ||||
|  | ||||
| @@ -8,12 +8,9 @@ edition = "2021" | ||||
| [dependencies] | ||||
| bstr = "1.9.1" | ||||
| fast-float = "0.2.0" | ||||
| hashbrown = "0.14.3" | ||||
| memchr = "2.7.4" | ||||
| memmap = "0.7.0" | ||||
| polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} | ||||
| #polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} | ||||
| rayon = "1.10.0" | ||||
| rustc-hash = "2.0.0" | ||||
|  | ||||
| [build] | ||||
| rustflags = ["-C target-cpu=native"] | ||||
|   | ||||
| @@ -4,61 +4,40 @@ use std::{ | ||||
|     sync::{Arc, Mutex}, | ||||
|     thread, | ||||
| }; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| #[derive(Clone, Copy)] | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     count: usize, | ||||
|     sum: f64, | ||||
| } | ||||
| use std::collections::HashMap; | ||||
| use std::time::Instant; | ||||
|  | ||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||
|  | ||||
| fn main() { | ||||
|     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||
|     let now = Instant::now(); | ||||
|     let stations: Arc<Mutex<HashMap<String, onebrc::StationMeasurements>>> = | ||||
|         Arc::new(Mutex::new(HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH))); | ||||
|  | ||||
|     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     let cores: usize = thread::available_parallelism().unwrap().into(); | ||||
|  | ||||
|     let chunk_length = 1_000_000_000 / cores; | ||||
|     let mut handles = vec![]; | ||||
|     for i in 0..cores { | ||||
|         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|         let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||
|         let reader = BufReader::new(file); | ||||
|         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||
|         let stations_clone = stations.clone(); | ||||
|         let handle = thread::spawn(move || { | ||||
|             let mut t_stations: HashMap<String, StationMeasurements> = | ||||
|             let mut t_stations: HashMap<String, onebrc::StationMeasurements> = | ||||
|                 HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||
|             let mut line_num = 0; | ||||
|             for line in line_chunk { | ||||
|                 line_num += 1; | ||||
|                 let lineprint = chunk_length / 500; | ||||
|                 if line_num % lineprint == 0 { | ||||
|                     println!("Thread #{i}"); | ||||
|                     let formatted_line_num = format_nums(line_num); | ||||
|                     println!("Calculated {formatted_line_num} stations"); | ||||
|                 } | ||||
|  | ||||
|             let now_read_line = Instant::now(); | ||||
|             println!("Start reading lines in thread {i}"); | ||||
|             line_chunk.for_each(|line| { | ||||
|                 let line = line.expect("could not read line"); | ||||
|                 if line_num < 10 { | ||||
|                     println!("{}", line); | ||||
|                 } | ||||
|                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||
|                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||
|                 let temp = onebrc::parse_temp(temp.as_bytes()); | ||||
|                 let measurements_option = t_stations.get_mut(station); | ||||
|                 if let Some(measurements) = measurements_option { | ||||
|                     if temp < measurements.min { | ||||
|                         measurements.min = temp; | ||||
|                     } else if temp > measurements.max { | ||||
|                         measurements.max = temp; | ||||
|                     } | ||||
|                     measurements.count += 1; | ||||
|                     measurements.sum += temp; | ||||
|                     measurements.update(temp); | ||||
|                 } else { | ||||
|                     let measurements = StationMeasurements { | ||||
|                     let measurements = onebrc::StationMeasurements { | ||||
|                         min: temp, | ||||
|                         max: temp, | ||||
|                         count: 1, | ||||
| @@ -66,43 +45,32 @@ fn main() { | ||||
|                     }; | ||||
|                     t_stations.insert(station.to_owned(), measurements); | ||||
|                 } | ||||
|             } | ||||
|             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||
|             }); | ||||
|             println!("Time reading lines in thread {i}={} μs", now_read_line.elapsed().as_micros()); | ||||
|             let now_insert_line = Instant::now(); | ||||
|             println!("Start inserting lines in thread {i}"); | ||||
|             for (station, measurements) in t_stations.iter() { | ||||
|                 let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||
|                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||
|                 if let Some(joined_measurements) = joined_measurements_options { | ||||
|                     if measurements.min < joined_measurements.min { | ||||
|                         joined_measurements.min = measurements.min; | ||||
|                     } else if measurements.max > joined_measurements.max { | ||||
|                         joined_measurements.max = measurements.max; | ||||
|                     } | ||||
|                     joined_measurements.count += measurements.count; | ||||
|                     joined_measurements.sum += measurements.sum; | ||||
|                     joined_measurements.merge(measurements); | ||||
|                 } else { | ||||
|                     stations_guard.insert(station.to_owned(), *measurements); | ||||
|                 } | ||||
|             } | ||||
|             println!("Time inserting lines in thread {i}={} μs", now_insert_line.elapsed().as_micros()); | ||||
|         }); | ||||
|         handles.push(handle); | ||||
|     } | ||||
|     for handle in handles { | ||||
|         handle.join().unwrap(); | ||||
|     } | ||||
|     for (station, measurments) in stations.lock().unwrap().iter() { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = measurments.sum / measurments.count as f64; | ||||
|         println!("{station}={min}/{max}/{avg:.2}"); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
|     let mut stations: Vec<String> = stations.lock().unwrap().iter().map(|(&ref station, &ref measurements)| { | ||||
|         let measurements = measurements.to_string(); | ||||
|         format!("{station}={measurements}") | ||||
|     }).collect(); | ||||
|     stations.sort(); | ||||
|     let stations = stations.join(","); | ||||
|     println!("{{{stations}}}"); | ||||
|     println!("Time={} μs", now.elapsed().as_micros()); | ||||
| } | ||||
|   | ||||
| @@ -9,7 +9,7 @@ fn run_polars() -> Result<DataFrame, PolarsError> { | ||||
|     let f2: Field = Field::new("measure", DataType::Float64); | ||||
|     let sc: Schema = Schema::from_iter(vec![f1, f2]); | ||||
|  | ||||
|     let q = LazyCsvReader::new("../measurements.txt") | ||||
|     let q = LazyCsvReader::new("../../../measurements.txt") | ||||
|         .has_header(false) | ||||
|         .with_schema(Some(Arc::new(sc))) | ||||
|         .with_separator(b';') | ||||
|   | ||||
| @@ -68,7 +68,8 @@ fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { | ||||
| } | ||||
|  | ||||
| fn main() { | ||||
|     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     //let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     let cores: usize = 1; | ||||
|     let path = match std::env::args().skip(1).next() { | ||||
|         Some(path) => path, | ||||
|         None => "measurements.txt".to_owned(), | ||||
|   | ||||
| @@ -2,71 +2,43 @@ use std::{ | ||||
|     fs::File, | ||||
|     io::{BufRead, BufReader}, | ||||
| }; | ||||
| use std::collections::HashMap; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     temps: Vec<f64>, | ||||
| } | ||||
|  | ||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||
|  | ||||
| fn main() { | ||||
|     let mut stations: HashMap<String, StationMeasurements> = | ||||
|     let now = Instant::now(); | ||||
|     let mut stations: HashMap<String, onebrc::StationMeasurements> = | ||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||
|  | ||||
|     let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||
|     let reader = BufReader::new(file); | ||||
|     let mut line_num = 0; | ||||
|     for line_result in reader.lines() { | ||||
|         line_num += 1; | ||||
|         if line_num % 250000 == 0 { | ||||
|             let formatted_line_num = format_nums(line_num); | ||||
|             println!("Calculated {formatted_line_num} stations"); | ||||
|         } | ||||
|         let line = line_result.expect("could not read line"); | ||||
|         let (station, temp) = line.split_once(';').unwrap(); | ||||
|         let temp = temp.parse().unwrap(); | ||||
|         let temp = onebrc::parse_temp(temp.as_bytes()); | ||||
|         let measurements_option = stations.get_mut(station); | ||||
|         if let Some(measurements) = measurements_option { | ||||
|             if temp < measurements.min { | ||||
|                 measurements.min = temp; | ||||
|             } else if temp > measurements.max { | ||||
|                 measurements.max = temp; | ||||
|             } | ||||
|             measurements.temps.push(temp); | ||||
|             measurements.update(temp); | ||||
|         } else { | ||||
|             let measurements = StationMeasurements { | ||||
|             let measurements = onebrc::StationMeasurements { | ||||
|                 min: temp, | ||||
|                 max: temp, | ||||
|                 temps: vec![temp], | ||||
|                 count: 1, | ||||
|                 sum: temp, | ||||
|             }; | ||||
|             stations.insert(station.to_owned(), measurements); | ||||
|         } | ||||
|     } | ||||
|     for (station, measurments) in stations { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = avg(measurments.temps); | ||||
|         println!("{station}={min}/{max}/{avg}"); | ||||
|     } | ||||
|     let mut stations: Vec<String> = stations.iter().map(|(&ref station, &ref measurements)| { | ||||
|         let measurements = measurements.to_string(); | ||||
|         format!("{station}={measurements}") | ||||
|     }).collect(); | ||||
|     stations.sort(); | ||||
|     let stations = stations.join(","); | ||||
|     println!("{{{stations}}}"); | ||||
|     println!("Time={} μs", now.elapsed().as_micros()); | ||||
| } | ||||
|  | ||||
| fn avg(temps: Vec<f64>) -> f64 { | ||||
|     let num_temps = temps.len() as f64; | ||||
|     let sum_temps: f64 = temps.iter().sum(); | ||||
|     sum_temps / num_temps | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
| } | ||||
|   | ||||
							
								
								
									
										67
									
								
								src/main/rust/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										67
									
								
								src/main/rust/src/lib.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,67 @@ | ||||
| use std::fmt::Display; | ||||
|  | ||||
/// Running aggregate of the temperature readings recorded for one station.
///
/// Every value is an integer scaled by ten (`123` stands for `12.3`), which
/// keeps the accumulation in exact integer arithmetic; the scaling is undone
/// only when the aggregate is formatted through [`Display`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct StationMeasurements {
    /// Smallest reading seen so far (scaled by ten).
    pub min: isize,
    /// Largest reading seen so far (scaled by ten).
    pub max: isize,
    /// Number of readings folded into this aggregate.
    pub count: isize,
    /// Sum of all readings (scaled by ten).
    pub sum: isize,
}

impl StationMeasurements {
    /// Folds a single scaled reading `v` into the aggregate.
    pub fn update(&mut self, v: isize) {
        self.min = self.min.min(v);
        self.max = self.max.max(v);
        self.count += 1;
        self.sum += v;
    }

    /// Combines another aggregate into this one, as if every reading that
    /// went into `other` had been passed to [`StationMeasurements::update`].
    pub fn merge(&mut self, other: &Self) {
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
        self.count += other.count;
        self.sum += other.sum;
    }
}

impl Display for StationMeasurements {
    /// Writes `min/mean/max`, each unscaled and shown with one decimal place.
    ///
    /// A `count` of zero yields a `NaN` mean, because the division is done in
    /// floating point.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let min = self.min as f64 / 10.0;
        let max = self.max as f64 / 10.0;
        let avg = (self.sum as f64 / self.count as f64) / 10.0;
        // A fixed precision on all three fields keeps whole values such as
        // 5.0 from being rendered as a bare "5".
        write!(f, "{min:.1}/{avg:.1}/{max:.1}")
    }
}
|  | ||||
/// Renders `num` in decimal with an underscore between every group of three
/// digits, counted from the right (for example `1234567` becomes `1_234_567`).
pub fn format_nums(num: usize) -> String {
    let digits = num.to_string();
    let total = digits.len();
    let mut grouped = String::with_capacity(total + total / 3);
    for (idx, ch) in digits.chars().enumerate() {
        // A separator goes in front of any digit that has a multiple of three
        // digits remaining, except at the very start of the number.
        if idx != 0 && (total - idx) % 3 == 0 {
            grouped.push('_');
        }
        grouped.push(ch);
    }
    grouped
}
|  | ||||
/// Converts an ASCII digit byte to its numeric value.
///
/// The subtraction wraps, so no check is made that `b` is a digit: bytes
/// below `b'0'` wrap around to very large values, and bytes above `b'9'`
/// yield values greater than nine.
#[inline]
pub const fn get_digit(b: u8) -> u32 {
    const ASCII_ZERO: u32 = b'0' as u32;
    let code = b as u32;
    code.wrapping_sub(ASCII_ZERO)
}
|  | ||||
| #[inline] | ||||
| pub fn parse_temp(bytes: &[u8]) -> isize { | ||||
|     let is_negative = bytes[0] == b'-'; | ||||
|     let as_decimal = match (is_negative, bytes.len()) { | ||||
|         (true, 4) => get_digit(bytes[1]) * 10 + get_digit(bytes[3]), | ||||
|         (true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]), | ||||
|         (false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]), | ||||
|         (false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]), | ||||
|         _x => panic!(), | ||||
|     }; | ||||
|     if is_negative { | ||||
|         -(as_decimal as isize) | ||||
|     } else { | ||||
|         as_decimal as isize | ||||
|     } | ||||
| } | ||||
| @@ -1,106 +1,8 @@ | ||||
| use std::{ | ||||
|     fs::File, | ||||
|     io::{BufRead, BufReader}, | ||||
|     sync::{Arc, Mutex}, | ||||
|     thread, | ||||
| }; | ||||
|  | ||||
| use hashbrown::HashMap; | ||||
|  | ||||
| #[derive(Clone, Copy)] | ||||
| struct StationMeasurements { | ||||
|     min: f64, | ||||
|     max: f64, | ||||
|     count: usize, | ||||
|     sum: f64, | ||||
| } | ||||
|  | ||||
| fn main() { | ||||
|     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||
|         Arc::new(Mutex::new(HashMap::new())); | ||||
|  | ||||
|     //let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||
|     let cores: usize = 4; | ||||
|  | ||||
|     let chunk_length = 1_000_000_000 / cores; | ||||
|     let mut handles = vec![]; | ||||
|     for i in 0..cores { | ||||
|         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||
|         let reader = BufReader::new(file); | ||||
|         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||
|         let stations_clone = stations.clone(); | ||||
|         let handle = thread::spawn(move || { | ||||
|             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||
|             let mut line_num = 0; | ||||
|             for line in line_chunk { | ||||
|                 line_num += 1; | ||||
|                 let lineprint = chunk_length / 500; | ||||
|                 if line_num % lineprint == 0 { | ||||
|                     println!("Thread #{i}"); | ||||
|                     let formatted_line_num = format_nums(line_num); | ||||
|                     println!("Calculated {formatted_line_num} stations"); | ||||
|                 } | ||||
|                 let line = line.expect("could not read line"); | ||||
|                 if line_num < 10 { | ||||
|                     println!("{}", line); | ||||
|                 } | ||||
|                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||
|                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||
|                 let measurements_option = t_stations.get_mut(station); | ||||
|                 if let Some(measurements) = measurements_option { | ||||
|                     if temp < measurements.min { | ||||
|                         measurements.min = temp; | ||||
|                     } else if temp > measurements.max { | ||||
|                         measurements.max = temp; | ||||
|                     } | ||||
|                     measurements.count += 1; | ||||
|                     measurements.sum += temp; | ||||
|                 } else { | ||||
|                     let measurements = StationMeasurements { | ||||
|                         min: temp, | ||||
|                         max: temp, | ||||
|                         count: 1, | ||||
|                         sum: temp, | ||||
|                     }; | ||||
|                     t_stations.insert(station.to_owned(), measurements); | ||||
|                 } | ||||
|             } | ||||
|             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||
|             for (station, measurements) in t_stations.iter() { | ||||
|                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||
|                 if let Some(joined_measurements) = joined_measurements_options { | ||||
|                     if measurements.min < joined_measurements.min { | ||||
|                         joined_measurements.min = measurements.min; | ||||
|                     } else if measurements.max > joined_measurements.max { | ||||
|                         joined_measurements.max = measurements.max; | ||||
|                     } | ||||
|                     joined_measurements.count += measurements.count; | ||||
|                     joined_measurements.sum += measurements.sum; | ||||
|                 } else { | ||||
|                     stations_guard.insert(station.to_owned(), *measurements); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|         handles.push(handle); | ||||
|     } | ||||
|     for handle in handles { | ||||
|         handle.join().unwrap(); | ||||
|     } | ||||
|     for (station, measurments) in stations.lock().unwrap().iter() { | ||||
|         let min = measurments.min; | ||||
|         let max = measurments.max; | ||||
|         let avg = measurments.sum / measurments.count as f64; | ||||
|         println!("{station}={min}/{max}/{avg:.2}"); | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn format_nums(num: usize) -> String { | ||||
|     num.to_string() | ||||
|         .as_bytes() | ||||
|         .rchunks(3) | ||||
|         .rev() | ||||
|         .map(std::str::from_utf8) | ||||
|         .collect::<Result<Vec<&str>, _>>() | ||||
|         .unwrap() | ||||
|         .join("_") | ||||
|     // let now = Instant::now(); | ||||
|     // let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||
|     // let reader = BufReader::new(file); | ||||
|     // reader.lines().for_each(|_x| {()}); | ||||
|     // println!("Time={} μs", now.elapsed().as_micros()); | ||||
| } | ||||
		Reference in New Issue
	
	Block a user