Compare commits

10 Commits: 6125ba4dfa ... 0adcf3dec5
| Author | SHA1 | Date |
|---|---|---|
|  | 0adcf3dec5 |  |
|  | b6e8b41bb1 |  |
|  | 18ea81eaaa |  |
|  | 180da512f6 |  |
|  | 14e55e8c5c |  |
|  | 9606b06313 |  |
|  | 6aea8d90cc |  |
|  | 9f257da903 |  |
|  | 3372b6b290 |  |
|  | dfec2cdbe6 |  |
							
								
								
									
.gitignore (vendored): 2 changes
@@ -9,7 +9,7 @@ release.properties
 .project
 .classpath
 .settings/
-bin/
+./bin/
 
 # IntelliJ
 .idea
README.md

@@ -533,6 +533,7 @@ A list of external resources such as blog posts and videos, discussing 1BRC and
 * [1BRC - What a Journey](https://www.esolutions.tech/1brc-what-a-journey), by Marius Staicu (blog post)
 * [One Billion Rows Challenge in Golang](https://www.bytesizego.com/blog/one-billion-row-challenge-go), by Shraddha Agrawal (blog post)
 * [The Billion Row Challenge (1BRC) - Step-by-step from 71s to 1.7s](https://questdb.io/blog/billion-row-challenge-step-by-step/) by Marko Topolnik (blog post)
+* [Entering The One Billion Row Challenge With GitHub Copilot](https://devblogs.microsoft.com/java/entering-the-one-billion-row-challenge-with-github-copilot/) by Antonio Goncalves (blog post)
 
 ## License
 
							
								
								
									
rust/Cargo.lock (generated, new file): 1419 lines

File diff suppressed because it is too large.
							
								
								
									
rust/Cargo.toml (new file): 10 lines

@@ -0,0 +1,10 @@
[package]
name = "rust"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hashbrown = "0.14.3"
polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]}
							
								
								
									
rust/src/bin/multi_threaded.rs (new file): 105 lines

@@ -0,0 +1,105 @@
use std::{
    fs::File,
    io::{BufRead, BufReader},
    sync::{Arc, Mutex},
    thread,
};

use hashbrown::HashMap;

#[derive(Clone, Copy)]
struct StationMeasurements {
    min: f64,
    max: f64,
    count: usize,
    sum: f64,
}

fn main() {
    let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let cores: usize = std::thread::available_parallelism().unwrap().into();

    let chunk_length = 1_000_000_000 / cores;
    let mut handles = vec![];
    for i in 0..cores {
        let file = File::open("../measurements.txt").expect("File measurements.txt not found");
        let reader = BufReader::new(file);
        let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length);
        let stations_clone = stations.clone();
        let handle = thread::spawn(move || {
            let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new();
            let mut line_num = 0;
            for line in line_chunk {
                line_num += 1;
                let lineprint = chunk_length / 500;
                if line_num % lineprint == 0 {
                    println!("Thread #{i}");
                    let formatted_line_num = format_nums(line_num);
                    println!("Calculated {formatted_line_num} stations");
                }
                let line = line.expect("could not read line");
                if line_num < 10 {
                    println!("{}", line);
                }
                let (station, temp) = line.split_once(';').expect("Error while splitting");
                let temp = temp.parse().expect("Error while parsing temperature");
                let measurements_option = t_stations.get_mut(station);
                if let Some(measurements) = measurements_option {
                    if temp < measurements.min {
                        measurements.min = temp;
                    } else if temp > measurements.max {
                        measurements.max = temp;
                    }
                    measurements.count += 1;
                    measurements.sum += temp;
                } else {
                    let measurements = StationMeasurements {
                        min: temp,
                        max: temp,
                        count: 1,
                        sum: temp,
                    };
                    t_stations.insert(station.to_owned(), measurements);
                }
            }
            let mut stations_guard = stations_clone.lock().expect("Error while locking");
            for (station, measurements) in t_stations.iter() {
                let joined_measurements_options = stations_guard.get_mut(station.as_str());
                if let Some(joined_measurements) = joined_measurements_options {
                    if measurements.min < joined_measurements.min {
                        joined_measurements.min = measurements.min;
                    } else if measurements.max > joined_measurements.max {
                        joined_measurements.max = measurements.max;
                    }
                    joined_measurements.count += measurements.count;
                    joined_measurements.sum += measurements.sum;
                } else {
                    stations_guard.insert(station.to_owned(), *measurements);
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    for (station, measurments) in stations.lock().unwrap().iter() {
        let min = measurments.min;
        let max = measurments.max;
        let avg = measurments.sum / measurments.count as f64;
        println!("{station}={min}/{max}/{avg:.2}");
    }
}

fn format_nums(num: usize) -> String {
    num.to_string()
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join("_")
}
							
								
								
									
rust/src/bin/polars.rs (new file): 35 lines

@@ -0,0 +1,35 @@
use polars::prelude::*;
use std::time::Instant;
use std::vec;

fn run_polars() -> Result<DataFrame, PolarsError> {
    let now = Instant::now();

    let f1: Field = Field::new("station", DataType::String);
    let f2: Field = Field::new("measure", DataType::Float64);
    let sc: Schema = Schema::from_iter(vec![f1, f2]);

    let q = LazyCsvReader::new("../measurements.txt")
        .has_header(false)
        .with_schema(Some(Arc::new(sc)))
        .with_separator(b';')
        .finish()?
        .group_by(vec![col("station")])
        .agg(vec![
            col("measure").alias("min").min(),
            col("measure").alias("mean").mean(),
            col("measure").alias("max").max(),
        ])
        .sort("station", Default::default())
        .with_streaming(true);

    let df = q.collect()?;

    println!("Time={} μs", now.elapsed().as_micros());

    Ok(df)
}

fn main() {
    run_polars();
}
							
								
								
									
rust/src/bin/single_thread.rs (new file): 69 lines

@@ -0,0 +1,69 @@
use std::{
    fs::File,
    io::{BufRead, BufReader},
};

use hashbrown::HashMap;

struct StationMeasurements {
    min: f64,
    max: f64,
    temps: Vec<f64>,
}

fn main() {
    let mut stations: HashMap<String, StationMeasurements> = HashMap::new();

    let file = File::open("../measurements.txt").expect("File measurements.txt not found");
    let reader = BufReader::new(file);
    let mut line_num = 0;
    for line_result in reader.lines() {
        line_num += 1;
        if line_num % 250000 == 0 {
            let formatted_line_num = format_nums(line_num);
            println!("Calculated {formatted_line_num} stations");
        }
        let line = line_result.expect("could not read line");
        let (station, temp) = line.split_once(';').unwrap();
        let temp = temp.parse().unwrap();
        let measurements_option = stations.get_mut(station);
        if let Some(measurements) = measurements_option {
            if temp < measurements.min {
                measurements.min = temp;
            } else if temp > measurements.max {
                measurements.max = temp;
            }
            measurements.temps.push(temp);
        } else {
            let measurements = StationMeasurements {
                min: temp,
                max: temp,
                temps: vec![temp],
            };
            stations.insert(station.to_owned(), measurements);
        }
    }
    for (station, measurments) in stations {
        let min = measurments.min;
        let max = measurments.max;
        let avg = avg(measurments.temps);
        println!("{station}={min}/{max}/{avg}");
    }
}

fn avg(temps: Vec<f64>) -> f64 {
    let num_temps = temps.len() as f64;
    let sum_temps: f64 = temps.iter().sum();
    sum_temps / num_temps
}

fn format_nums(num: usize) -> String {
    num.to_string()
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join("_")
}
							
								
								
									
rust/src/main.rs (new file): 106 lines

@@ -0,0 +1,106 @@
use std::{
    fs::File,
    io::{BufRead, BufReader},
    sync::{Arc, Mutex},
    thread,
};

use hashbrown::HashMap;

#[derive(Clone, Copy)]
struct StationMeasurements {
    min: f64,
    max: f64,
    count: usize,
    sum: f64,
}

fn main() {
    let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> =
        Arc::new(Mutex::new(HashMap::new()));

    //let cores: usize = std::thread::available_parallelism().unwrap().into();
    let cores: usize = 4;

    let chunk_length = 1_000_000_000 / cores;
    let mut handles = vec![];
    for i in 0..cores {
        let file = File::open("../measurements.txt").expect("File measurements.txt not found");
        let reader = BufReader::new(file);
        let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length);
        let stations_clone = stations.clone();
        let handle = thread::spawn(move || {
            let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new();
            let mut line_num = 0;
            for line in line_chunk {
                line_num += 1;
                let lineprint = chunk_length / 500;
                if line_num % lineprint == 0 {
                    println!("Thread #{i}");
                    let formatted_line_num = format_nums(line_num);
                    println!("Calculated {formatted_line_num} stations");
                }
                let line = line.expect("could not read line");
                if line_num < 10 {
                    println!("{}", line);
                }
                let (station, temp) = line.split_once(';').expect("Error while splitting");
                let temp = temp.parse().expect("Error while parsing temperature");
                let measurements_option = t_stations.get_mut(station);
                if let Some(measurements) = measurements_option {
                    if temp < measurements.min {
                        measurements.min = temp;
                    } else if temp > measurements.max {
                        measurements.max = temp;
                    }
                    measurements.count += 1;
                    measurements.sum += temp;
                } else {
                    let measurements = StationMeasurements {
                        min: temp,
                        max: temp,
                        count: 1,
                        sum: temp,
                    };
                    t_stations.insert(station.to_owned(), measurements);
                }
            }
            let mut stations_guard = stations_clone.lock().expect("Error while locking");
            for (station, measurements) in t_stations.iter() {
                let joined_measurements_options = stations_guard.get_mut(station.as_str());
                if let Some(joined_measurements) = joined_measurements_options {
                    if measurements.min < joined_measurements.min {
                        joined_measurements.min = measurements.min;
                    } else if measurements.max > joined_measurements.max {
                        joined_measurements.max = measurements.max;
                    }
                    joined_measurements.count += measurements.count;
                    joined_measurements.sum += measurements.sum;
                } else {
                    stations_guard.insert(station.to_owned(), *measurements);
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    for (station, measurments) in stations.lock().unwrap().iter() {
        let min = measurments.min;
        let max = measurments.max;
        let avg = measurments.sum / measurments.count as f64;
        println!("{station}={min}/{max}/{avg:.2}");
    }
}

fn format_nums(num: usize) -> String {
    num.to_string()
        .as_bytes()
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join("_")
}
Python measurements generator script

@@ -107,21 +107,23 @@ def build_test_data(weather_station_names, num_rows_to_create):
     hottest_temp = 99.9
     station_names_10k_max = random.choices(weather_station_names, k=10_000)
     batch_size = 10000 # instead of writing line by line to file, process a batch of stations and put it to disk
-    progress_step = max(1, (num_rows_to_create // batch_size) // 100)
+    chunks = num_rows_to_create // batch_size
     print('Building test data...')
 
     try:
         with open("../../../data/measurements.txt", 'w') as file:
-            for s in range(0,num_rows_to_create // batch_size):
+            progress = 0
+            for chunk in range(chunks):
 
                 batch = random.choices(station_names_10k_max, k=batch_size)
                 prepped_deviated_batch = '\n'.join([f"{station};{random.uniform(coldest_temp, hottest_temp):.1f}" for station in batch]) # :.1f should quicker than round on a large scale, because round utilizes mathematical operation
                 file.write(prepped_deviated_batch + '\n')
 
                 # Update progress bar every 1%
-                if s % progress_step == 0 or s == num_rows_to_create - 1:
-                    sys.stdout.write('\r')
-                    sys.stdout.write("[%-50s] %d%%" % ('=' * int((s + 1) / num_rows_to_create * 50), (s + 1) / num_rows_to_create * 100))
+                if (chunk + 1) * 100 // chunks != progress:
+                    progress = (chunk + 1) * 100 // chunks
+                    bars = '=' * (progress // 2)
+                    sys.stdout.write(f"\r[{bars:<50}] {progress}%")
                     sys.stdout.flush()
         sys.stdout.write('\n')
     except Exception as e: