Compare commits
	
		
			8 Commits
		
	
	
		
			main
			...
			0adcf3dec5
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 0adcf3dec5 | ||
|  | b6e8b41bb1 | ||
|  | 18ea81eaaa | ||
|  | 180da512f6 | ||
|  | 14e55e8c5c | ||
|  | 9606b06313 | ||
|  | 6aea8d90cc | ||
|  | 9f257da903 | 
							
								
								
									
										1419
									
								
								rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
							
						
						
									
										1419
									
								
								rust/Cargo.lock
									
									
									
										generated
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
							
								
								
									
										10
									
								
								rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										10
									
								
								rust/Cargo.toml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,10 @@ | |||||||
|  | [package] | ||||||
|  | name = "rust" | ||||||
|  | version = "0.1.0" | ||||||
|  | edition = "2021" | ||||||
|  |  | ||||||
|  | # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||||
|  |  | ||||||
|  | [dependencies] | ||||||
|  | hashbrown = "0.14.3" | ||||||
|  | polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} | ||||||
							
								
								
									
										105
									
								
								rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								rust/src/bin/multi_threaded.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  |     sync::{Arc, Mutex}, | ||||||
|  |     thread, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
/// Running aggregate of temperature readings for one weather station:
/// tracked extrema plus sum/count so the mean can be computed at the end.
/// `Copy` lets per-thread entries be merged into the shared map by value.
#[derive(Clone, Copy)]
struct StationMeasurements {
    min: f64,
    max: f64,
    count: usize,
    sum: f64,
}
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||||
|  |         Arc::new(Mutex::new(HashMap::new())); | ||||||
|  |  | ||||||
|  |     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||||
|  |  | ||||||
|  |     let chunk_length = 1_000_000_000 / cores; | ||||||
|  |     let mut handles = vec![]; | ||||||
|  |     for i in 0..cores { | ||||||
|  |         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||||
|  |         let reader = BufReader::new(file); | ||||||
|  |         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||||
|  |         let stations_clone = stations.clone(); | ||||||
|  |         let handle = thread::spawn(move || { | ||||||
|  |             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||||
|  |             let mut line_num = 0; | ||||||
|  |             for line in line_chunk { | ||||||
|  |                 line_num += 1; | ||||||
|  |                 let lineprint = chunk_length / 500; | ||||||
|  |                 if line_num % lineprint == 0 { | ||||||
|  |                     println!("Thread #{i}"); | ||||||
|  |                     let formatted_line_num = format_nums(line_num); | ||||||
|  |                     println!("Calculated {formatted_line_num} stations"); | ||||||
|  |                 } | ||||||
|  |                 let line = line.expect("could not read line"); | ||||||
|  |                 if line_num < 10 { | ||||||
|  |                     println!("{}", line); | ||||||
|  |                 } | ||||||
|  |                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||||
|  |                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||||
|  |                 let measurements_option = t_stations.get_mut(station); | ||||||
|  |                 if let Some(measurements) = measurements_option { | ||||||
|  |                     if temp < measurements.min { | ||||||
|  |                         measurements.min = temp; | ||||||
|  |                     } else if temp > measurements.max { | ||||||
|  |                         measurements.max = temp; | ||||||
|  |                     } | ||||||
|  |                     measurements.count += 1; | ||||||
|  |                     measurements.sum += temp; | ||||||
|  |                 } else { | ||||||
|  |                     let measurements = StationMeasurements { | ||||||
|  |                         min: temp, | ||||||
|  |                         max: temp, | ||||||
|  |                         count: 1, | ||||||
|  |                         sum: temp, | ||||||
|  |                     }; | ||||||
|  |                     t_stations.insert(station.to_owned(), measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||||
|  |             for (station, measurements) in t_stations.iter() { | ||||||
|  |                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||||
|  |                 if let Some(joined_measurements) = joined_measurements_options { | ||||||
|  |                     if measurements.min < joined_measurements.min { | ||||||
|  |                         joined_measurements.min = measurements.min; | ||||||
|  |                     } else if measurements.max > joined_measurements.max { | ||||||
|  |                         joined_measurements.max = measurements.max; | ||||||
|  |                     } | ||||||
|  |                     joined_measurements.count += measurements.count; | ||||||
|  |                     joined_measurements.sum += measurements.sum; | ||||||
|  |                 } else { | ||||||
|  |                     stations_guard.insert(station.to_owned(), *measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |         handles.push(handle); | ||||||
|  |     } | ||||||
|  |     for handle in handles { | ||||||
|  |         handle.join().unwrap(); | ||||||
|  |     } | ||||||
|  |     for (station, measurments) in stations.lock().unwrap().iter() { | ||||||
|  |         let min = measurments.min; | ||||||
|  |         let max = measurments.max; | ||||||
|  |         let avg = measurments.sum / measurments.count as f64; | ||||||
|  |         println!("{station}={min}/{max}/{avg:.2}"); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
/// Formats `num` with `_` as a thousands separator,
/// e.g. `1234567` -> `"1_234_567"`.
fn format_nums(num: usize) -> String {
    let digits = num.to_string();
    // Group digits in threes from the right, then restore
    // left-to-right order before joining.
    let mut groups: Vec<&str> = digits
        .as_bytes()
        .rchunks(3)
        .map(|chunk| std::str::from_utf8(chunk).expect("ASCII digits are valid UTF-8"))
        .collect();
    groups.reverse();
    groups.join("_")
}
							
								
								
									
										35
									
								
								rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								rust/src/bin/polars.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,35 @@ | |||||||
|  | use polars::prelude::*; | ||||||
|  | use std::time::Instant; | ||||||
|  | use std::vec; | ||||||
|  |  | ||||||
/// Aggregates measurements.txt with Polars' lazy streaming engine:
/// groups by station, computes min/mean/max of the measurement column,
/// and sorts by station name. Prints elapsed wall time to stdout.
///
/// # Errors
/// Propagates any `PolarsError` from building or collecting the query.
fn run_polars() -> Result<DataFrame, PolarsError> {
    let now = Instant::now();

    // The input has no header row; declare the two ';'-separated columns
    // with explicit names and types.
    let f1: Field = Field::new("station", DataType::String);
    let f2: Field = Field::new("measure", DataType::Float64);
    let sc: Schema = Schema::from_iter(vec![f1, f2]);

    let q = LazyCsvReader::new("../measurements.txt")
        .has_header(false)
        .with_schema(Some(Arc::new(sc)))
        .with_separator(b';')
        .finish()?
        .group_by(vec![col("station")])
        .agg(vec![
            col("measure").alias("min").min(),
            col("measure").alias("mean").mean(),
            col("measure").alias("max").max(),
        ])
        .sort("station", Default::default())
        // Stream batches through the engine instead of materializing the
        // whole file at once.
        .with_streaming(true);

    let df = q.collect()?;

    // Wall-clock time for the whole build + collect, in microseconds.
    println!("Time={} μs", now.elapsed().as_micros());

    Ok(df)
}
|  |  | ||||||
|  | fn main() { | ||||||
|  |     run_polars(); | ||||||
|  | } | ||||||
							
								
								
									
										69
									
								
								rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								rust/src/bin/single_thread.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
/// Per-station aggregate for the single-threaded run: tracked extrema plus
/// every individual reading (the mean is computed from `temps` at print time,
/// so memory grows with the number of readings per station).
struct StationMeasurements {
    min: f64,
    max: f64,
    temps: Vec<f64>,
}
|  |  | ||||||
/// Single-threaded 1BRC aggregation: streams measurements.txt line by line,
/// tracking min/max and collecting every reading per station, then prints
/// `station=min/max/avg` for each entry (unsorted map iteration order).
fn main() {
    let mut stations: HashMap<String, StationMeasurements> = HashMap::new();

    let file = File::open("../measurements.txt").expect("File measurements.txt not found");
    let reader = BufReader::new(file);
    let mut line_num = 0;
    for line_result in reader.lines() {
        line_num += 1;
        // Progress output every 250k lines.
        if line_num % 250000 == 0 {
            let formatted_line_num = format_nums(line_num);
            println!("Calculated {formatted_line_num} stations");
        }
        let line = line_result.expect("could not read line");
        // Lines are "<station>;<temperature>".
        let (station, temp) = line.split_once(';').unwrap();
        let temp = temp.parse().unwrap();
        let measurements_option = stations.get_mut(station);
        if let Some(measurements) = measurements_option {
            // A single reading cannot be both a new min and a new max,
            // so else-if is sufficient here.
            if temp < measurements.min {
                measurements.min = temp;
            } else if temp > measurements.max {
                measurements.max = temp;
            }
            measurements.temps.push(temp);
        } else {
            let measurements = StationMeasurements {
                min: temp,
                max: temp,
                temps: vec![temp],
            };
            stations.insert(station.to_owned(), measurements);
        }
    }
    for (station, measurments) in stations {
        let min = measurments.min;
        let max = measurments.max;
        let avg = avg(measurments.temps);
        println!("{station}={min}/{max}/{avg}");
    }
}
|  |  | ||||||
/// Arithmetic mean of `temps`. An empty vector yields NaN (0.0 / 0.0);
/// callers always pass at least one reading.
fn avg(temps: Vec<f64>) -> f64 {
    let total: f64 = temps.iter().sum();
    total / temps.len() as f64
}
|  |  | ||||||
/// Formats `num` with `_` as a thousands separator, e.g. 1234567 -> "1_234_567".
fn format_nums(num: usize) -> String {
    num.to_string()
        .as_bytes()
        // Group digits in threes from the right, then restore left-to-right order.
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join("_")
}
							
								
								
									
										106
									
								
								rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										106
									
								
								rust/src/main.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,106 @@ | |||||||
|  | use std::{ | ||||||
|  |     fs::File, | ||||||
|  |     io::{BufRead, BufReader}, | ||||||
|  |     sync::{Arc, Mutex}, | ||||||
|  |     thread, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | use hashbrown::HashMap; | ||||||
|  |  | ||||||
/// Running aggregate of temperature readings for one weather station:
/// tracked extrema plus sum/count so the mean can be computed at the end.
/// `Copy` lets per-thread entries be merged into the shared map by value.
#[derive(Clone, Copy)]
struct StationMeasurements {
    min: f64,
    max: f64,
    count: usize,
    sum: f64,
}
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let stations: Arc<Mutex<HashMap<String, StationMeasurements>>> = | ||||||
|  |         Arc::new(Mutex::new(HashMap::new())); | ||||||
|  |  | ||||||
|  |     //let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||||
|  |     let cores: usize = 4; | ||||||
|  |  | ||||||
|  |     let chunk_length = 1_000_000_000 / cores; | ||||||
|  |     let mut handles = vec![]; | ||||||
|  |     for i in 0..cores { | ||||||
|  |         let file = File::open("../measurements.txt").expect("File measurements.txt not found"); | ||||||
|  |         let reader = BufReader::new(file); | ||||||
|  |         let line_chunk = reader.lines().skip(chunk_length * i).take(chunk_length); | ||||||
|  |         let stations_clone = stations.clone(); | ||||||
|  |         let handle = thread::spawn(move || { | ||||||
|  |             let mut t_stations: HashMap<String, StationMeasurements> = HashMap::new(); | ||||||
|  |             let mut line_num = 0; | ||||||
|  |             for line in line_chunk { | ||||||
|  |                 line_num += 1; | ||||||
|  |                 let lineprint = chunk_length / 500; | ||||||
|  |                 if line_num % lineprint == 0 { | ||||||
|  |                     println!("Thread #{i}"); | ||||||
|  |                     let formatted_line_num = format_nums(line_num); | ||||||
|  |                     println!("Calculated {formatted_line_num} stations"); | ||||||
|  |                 } | ||||||
|  |                 let line = line.expect("could not read line"); | ||||||
|  |                 if line_num < 10 { | ||||||
|  |                     println!("{}", line); | ||||||
|  |                 } | ||||||
|  |                 let (station, temp) = line.split_once(';').expect("Error while splitting"); | ||||||
|  |                 let temp = temp.parse().expect("Error while parsing temperature"); | ||||||
|  |                 let measurements_option = t_stations.get_mut(station); | ||||||
|  |                 if let Some(measurements) = measurements_option { | ||||||
|  |                     if temp < measurements.min { | ||||||
|  |                         measurements.min = temp; | ||||||
|  |                     } else if temp > measurements.max { | ||||||
|  |                         measurements.max = temp; | ||||||
|  |                     } | ||||||
|  |                     measurements.count += 1; | ||||||
|  |                     measurements.sum += temp; | ||||||
|  |                 } else { | ||||||
|  |                     let measurements = StationMeasurements { | ||||||
|  |                         min: temp, | ||||||
|  |                         max: temp, | ||||||
|  |                         count: 1, | ||||||
|  |                         sum: temp, | ||||||
|  |                     }; | ||||||
|  |                     t_stations.insert(station.to_owned(), measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             let mut stations_guard = stations_clone.lock().expect("Error while locking"); | ||||||
|  |             for (station, measurements) in t_stations.iter() { | ||||||
|  |                 let joined_measurements_options = stations_guard.get_mut(station.as_str()); | ||||||
|  |                 if let Some(joined_measurements) = joined_measurements_options { | ||||||
|  |                     if measurements.min < joined_measurements.min { | ||||||
|  |                         joined_measurements.min = measurements.min; | ||||||
|  |                     } else if measurements.max > joined_measurements.max { | ||||||
|  |                         joined_measurements.max = measurements.max; | ||||||
|  |                     } | ||||||
|  |                     joined_measurements.count += measurements.count; | ||||||
|  |                     joined_measurements.sum += measurements.sum; | ||||||
|  |                 } else { | ||||||
|  |                     stations_guard.insert(station.to_owned(), *measurements); | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |         handles.push(handle); | ||||||
|  |     } | ||||||
|  |     for handle in handles { | ||||||
|  |         handle.join().unwrap(); | ||||||
|  |     } | ||||||
|  |     for (station, measurments) in stations.lock().unwrap().iter() { | ||||||
|  |         let min = measurments.min; | ||||||
|  |         let max = measurments.max; | ||||||
|  |         let avg = measurments.sum / measurments.count as f64; | ||||||
|  |         println!("{station}={min}/{max}/{avg:.2}"); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
/// Formats `num` with `_` as a thousands separator, e.g. 1234567 -> "1_234_567".
fn format_nums(num: usize) -> String {
    num.to_string()
        .as_bytes()
        // Group digits in threes from the right, then restore left-to-right order.
        .rchunks(3)
        .rev()
        .map(std::str::from_utf8)
        .collect::<Result<Vec<&str>, _>>()
        .unwrap()
        .join("_")
}
| @@ -1,91 +0,0 @@ | |||||||
| using Mmap |  | ||||||
|  |  | ||||||
# Running aggregate of temperature readings for one weather station.
mutable struct StationMeasurements
    min::Float64   # lowest reading seen so far
    max::Float64   # highest reading seen so far
    sum::Float64   # sum of all readings (for the mean)
    count::Int64   # number of readings
end
|  |  | ||||||
# Folds one temperature reading into the running aggregate `sm`.
# Bug fix: the original compared `temp < min` / `temp > max`, i.e. against
# the Base functions `min`/`max` rather than the struct fields — a Float64
# vs. Function comparison that errors at runtime. Compare the fields.
function update(sm, temp::Float64)
    if temp < sm.min
        sm.min = temp
    elseif temp > sm.max
        sm.max = temp
    end
    sm.sum += temp
    sm.count += 1
end
|  |  | ||||||
# Prints all stations as "{city=min/avg/max, ...}", sorted by city name.
# NOTE(review): field order here is min/avg/max, while the Rust ports print
# min/max/avg — confirm which format the harness expects.
function print_measurements(stations::Dict{String,StationMeasurements})
    sorted_keys = sort(collect(keys(stations)))
    print("{")
    sm_vec = []
    for city in sorted_keys
        sm = stations[city]
        # Round to one decimal place for output.
        min = round(sm.min; digits=1)
        max = round(sm.max; digits=1)
        avg = round((sm.sum / sm.count); digits=1)
        push!(sm_vec, "$city=$min/$avg/$max")
    end
    joined = join(sm_vec, ", ")
    print(joined)
    print("}")
end
|  |  | ||||||
# Combines the per-thread station dictionaries into a single aggregate dict.
# Bug fix: the original folded the combined values into the loop-local `sm`
# but never stored `sm` back into `merged`, so every collision silently kept
# only the first thread's values. Mutate the entry already in `merged`
# instead (this also avoids mutating the input dictionaries).
function merge(stations_vec::Vector{Dict{String,StationMeasurements}})
    merged = Dict{String,StationMeasurements}()
    for stations in stations_vec
        for (city, sm) in stations
            if haskey(merged, city)
                merged_sm = merged[city]
                merged_sm.min = ifelse(sm.min < merged_sm.min, sm.min, merged_sm.min)
                merged_sm.max = ifelse(sm.max > merged_sm.max, sm.max, merged_sm.max)
                merged_sm.sum += sm.sum
                merged_sm.count += sm.count
            else
                merged[city] = sm
            end
        end
    end
    merged
end
|  |  | ||||||
# Aggregates one chunk of the mmapped file into a fresh station dict.
# `chunk` is a vector of newline byte offsets into `data`; line i is taken
# as data[chunk[i-1] .. chunk[i]-1], so the first offset serves only as the
# left boundary (hence the `i == 1` skip).
# NOTE(review): with this scheme each slice starts at a '\n' byte, and the
# line ending at a chunk's first offset is never parsed by any chunk —
# confirm no lines are dropped or corrupted at chunk seams.
function process_chunk(data, chunk)
    stations = Dict{String,StationMeasurements}()
    for i in eachindex(chunk)
        if i == 1
            continue
        end
        line = String(data[chunk[i-1]:chunk[i]-1])
        station, temp_str = rsplit(line, ";")
        # Parsed as Float32, then stored into Float64 fields (implicit widen).
        temp = parse(Float32, temp_str)
        if haskey(stations, station)
            sm = stations[station]
            sm.min = ifelse(temp < sm.min, temp, sm.min)
            sm.max = ifelse(temp > sm.max, temp, sm.max)
            sm.sum += temp
            sm.count += 1
        else
            stations[station] = StationMeasurements(temp, temp, temp, 1)
        end
    end
    stations
end
|  |  | ||||||
# Entry point: mmaps measurements.txt, splits it at newline offsets into
# one chunk per Julia thread, aggregates the chunks in parallel, merges
# the partial results, and prints the final table.
function main()
    open("../../../measurements.txt", "r") do f
        sz = Base.stat(f).size
        # Memory-map the whole file as raw bytes.
        data = mmap(f, Vector{UInt8}, sz)
        # Byte offsets of every '\n' (0x0a) in the file.
        idxs = findall(isequal(0x0a), data)
        # Partition the newline offsets into ~equal chunks, one per thread.
        idxs_chunks = collect(Iterators.partition(idxs, length(idxs) ÷ Threads.nthreads()))
        tasks = map(idxs_chunks) do chunk
            Threads.@spawn process_chunk(data, chunk)
        end
        # Wait for every worker, then combine the per-chunk dictionaries.
        stations_vec = fetch.(tasks)
        stations = merge(stations_vec)
        print_measurements(stations)
    end
end
|  |  | ||||||
| main() |  | ||||||
| @@ -1,57 +0,0 @@ | |||||||
-- Single-threaded Lua port of the 1BRC aggregation: reads measurements.txt
-- line by line, tracks min/max/sum/count per station, and prints the
-- sorted result table.
local file_path = "../../../measurements.txt"

local file = io.open(file_path, "rb")
if not file then
	print("Unable to open file")
	return
end

-- Splits a line on ';' and returns the parts as an array.
local function split_semi(inputstr)
	local t = {}
	for str in string.gmatch(inputstr, "([^;]+)") do
		table.insert(t, str)
	end
	return t
end

local stations = {}

local iterations = 1

for line in file:lines("*l") do
	-- Crude progress display every million lines (clears the terminal).
	-- NOTE(review): the divisor 10000000 prints a fraction of 10M rather
	-- than a line count — confirm the intended progress unit.
	if iterations % 1000000 == 0 then
		io.write("\x1b[J\x1b[H")
		io.write(iterations / 10000000)
		io.write("\n")
	end
	local split_line = split_semi(line)
	local station = split_line[1]
	-- Strip stray spaces before numeric conversion.
	local temp_str = string.gsub(split_line[2], "[ ]", "")
	local temp = tonumber(temp_str)
	if stations[station] == nil then
		stations[station] = { min = temp, max = temp, sum = temp, count = 1 }
	else
		if temp < stations[station].min then
			stations[station].min = temp
		elseif temp > stations[station].max then
			stations[station].max = temp
		end
		stations[station].sum = stations[station].sum + temp
		stations[station].count = stations[station].count + 1
	end
	iterations = iterations + 1
end

-- Sort station names for deterministic output order.
local keys = {}
for k in pairs(stations) do table.insert(keys, k) end
table.sort(keys)

local fstations = {}
io.write("{")
for _, station in ipairs(keys) do
	-- NOTE(review): the sum is divided by 10 before averaging; temps are
	-- parsed as decimals (not tenths) above, so this looks 10x too small —
	-- the other language ports do not divide by 10. Confirm.
	local avg = ((stations[station].sum / 10) / stations[station].count)
	local res_str = string.format("%s=%.1f/%.1f/%.1f", station, stations[station].min, avg, stations[station].max)
	table.insert(fstations, res_str)
end
io.write(table.concat(fstations, ","))
print("}")
							
								
								
									
										1536
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1536
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -1,62 +0,0 @@ | |||||||
| [package] |  | ||||||
| name = "onebrc" |  | ||||||
| version = "0.1.0" |  | ||||||
| edition = "2021" |  | ||||||
|  |  | ||||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html |  | ||||||
|  |  | ||||||
| [dependencies] |  | ||||||
| bstr = "1.10.0" |  | ||||||
| fast-float = "0.2.0" |  | ||||||
| memchr = "2.7.4" |  | ||||||
| memmap2 = "0.9.4" |  | ||||||
| rayon = "1.10.0" |  | ||||||
| rustc-hash = "2.0.0" |  | ||||||
| libc = "0.2.158" |  | ||||||
| smol = "2.0.1" |  | ||||||
| easy-parallel = "3.3.1" |  | ||||||
| clap = { version = "4.5.13", features = ["derive"] } |  | ||||||
| colored = "2.1.0" |  | ||||||
| ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false } |  | ||||||
|  |  | ||||||
| [dev-dependencies] |  | ||||||
| criterion = { version = "0.5.1", features = ["html_reports"] } |  | ||||||
|  |  | ||||||
| [features] |  | ||||||
| json = [] |  | ||||||
| unsafe = [] |  | ||||||
| no_pdep = [] |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "reference_impl" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "single_thread" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "multi_threaded" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "multi_threaded_smol" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "flare_flo" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [[bench]] |  | ||||||
| name = "phcs" |  | ||||||
| harness = false |  | ||||||
|  |  | ||||||
| [profile.release] |  | ||||||
| lto = "fat" |  | ||||||
| strip = "symbols" |  | ||||||
| panic = "abort" |  | ||||||
|  |  | ||||||
| [profile.flamegraph] |  | ||||||
| inherits = "release" |  | ||||||
| debug = true |  | ||||||
| strip = "none" |  | ||||||
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the flare_flo implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::flare_flo::run;

// Registers the benchmark under the id "flareflo".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("flareflo", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the libraries implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::libraries::run;

// Registers the benchmark under the id "libraries".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("libraries", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the multi_threaded implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded::run;

// Registers the benchmark under the id "multithreaded".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("multithreaded", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the multi_threaded_smol implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded_smol::run;

// Registers the benchmark under the id "multithreadedsmol".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("multithreadedsmol", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the phcs implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::phcs::run;

// Registers the benchmark under the id "phcs".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("phcs", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the reference implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::reference_impl::run;

// Registers the benchmark under the id "reference".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("reference", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,9 +0,0 @@ | |||||||
// Criterion harness benchmarking one full run of the single_thread implementation.
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::single_thread::run;

// Registers the benchmark under the id "singlethread".
pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("singlethread", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the flare_flo implementation once.
use onebrc::implementations::flare_flo::run;

fn main() {
    run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the libraries implementation once.
use onebrc::implementations::libraries::run;

fn main() {
    run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the multi_threaded implementation once.
use onebrc::implementations::multi_threaded::run;

fn main() {
    run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the multi_threaded_smol implementation once.
use onebrc::implementations::multi_threaded_smol::run;

fn main() {
    run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the multi_threaded_structured implementation once.
use onebrc::implementations::multi_threaded_structured::run;

fn main() {
    run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the phcs implementation once.
use onebrc::implementations::phcs::run;

fn main() {
    // run()'s Result is discarded via `let _`, so errors are not reported here.
    let _ = run();
}
| @@ -1,5 +0,0 @@ | |||||||
// Binary entry point: runs the reference implementation once.
use onebrc::implementations::reference_impl::run;

fn main() {
    run();
}
| @@ -1,91 +0,0 @@ | |||||||
| use clap::Parser; |  | ||||||
| use colored::Colorize; |  | ||||||
| use memmap2::Mmap; |  | ||||||
| use onebrc::implementations::rgk::{ |  | ||||||
|     find_city_names, format, run_parallel, to_str, Args, Record, S, |  | ||||||
| }; |  | ||||||
| use std::thread::available_parallelism; |  | ||||||
|  |  | ||||||
| fn main() { |  | ||||||
|     let args = Args::parse(); |  | ||||||
|  |  | ||||||
|     let start = std::time::Instant::now(); |  | ||||||
|     let filename = args |  | ||||||
|         .input |  | ||||||
|         .unwrap_or("../../../measurements.txt".to_string()); |  | ||||||
|     let mmap: Mmap; |  | ||||||
|     let data; |  | ||||||
|     { |  | ||||||
|         let file = std::fs::File::open(filename).unwrap(); |  | ||||||
|         mmap = unsafe { Mmap::map(&file).unwrap() }; |  | ||||||
|         data = &*mmap; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     // Guaranteed to be aligned for SIMD. |  | ||||||
|     let offset = unsafe { data.align_to::<S>().0.len() }; |  | ||||||
|     let data = &data[offset..]; |  | ||||||
|  |  | ||||||
|     // Build a perfect hash function on the cities found in the first 100k characters. |  | ||||||
|     let names = find_city_names(&data[..4000000]); |  | ||||||
|  |  | ||||||
|     if args.stats { |  | ||||||
|         eprintln!("Num cities: {}", names.len()); |  | ||||||
|         let mut lens = vec![0; 102]; |  | ||||||
|         for n in &names { |  | ||||||
|             if *n.last().unwrap() == b';' { |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             lens[n.len()] += 1; |  | ||||||
|         } |  | ||||||
|         for (len, count) in lens.iter().enumerate() { |  | ||||||
|             if *count != 0 { |  | ||||||
|                 eprintln!("{}: {}", len, count); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     let phf = run_parallel( |  | ||||||
|         data, |  | ||||||
|         &names, |  | ||||||
|         args.threads |  | ||||||
|             .unwrap_or(available_parallelism().unwrap().into()), |  | ||||||
|     ); |  | ||||||
|  |  | ||||||
|     if args.print { |  | ||||||
|         print!("{{"); |  | ||||||
|         let mut first = true; |  | ||||||
|  |  | ||||||
|         let mut keys = phf.keys.clone(); |  | ||||||
|         keys.sort_by(|kl, kr| to_str(kl).cmp(to_str(kr))); |  | ||||||
|  |  | ||||||
|         for name in &keys { |  | ||||||
|             if *name.last().unwrap() != b';' { |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             let namepos = &name[..name.len() - 1]; |  | ||||||
|  |  | ||||||
|             let rpos = phf.index(namepos); |  | ||||||
|             let rneg = phf.index(name); |  | ||||||
|             let (min, avg, max) = Record::merge_pos_neg(rpos, rneg); |  | ||||||
|  |  | ||||||
|             if !first { |  | ||||||
|                 print!(", "); |  | ||||||
|             } |  | ||||||
|             first = false; |  | ||||||
|  |  | ||||||
|             print!( |  | ||||||
|                 "{}={}/{}/{}", |  | ||||||
|                 to_str(namepos), |  | ||||||
|                 format(min), |  | ||||||
|                 format(avg), |  | ||||||
|                 format(max) |  | ||||||
|             ); |  | ||||||
|         } |  | ||||||
|         println!("}}"); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     eprintln!( |  | ||||||
|         "total: {}", |  | ||||||
|         format!("{:>5.2?}", start.elapsed()).bold().green() |  | ||||||
|     ); |  | ||||||
| } |  | ||||||
| @@ -1,5 +0,0 @@ | |||||||
| use onebrc::implementations::single_thread::run; |  | ||||||
|  |  | ||||||
| fn main() { |  | ||||||
|     run(); |  | ||||||
| } |  | ||||||
| @@ -1,5 +0,0 @@ | |||||||
| use onebrc::implementations::smol::run; |  | ||||||
|  |  | ||||||
| fn main() { |  | ||||||
|     run(); |  | ||||||
| } |  | ||||||
| @@ -1,10 +0,0 @@ | |||||||
| pub mod flare_flo; |  | ||||||
| pub mod libraries; |  | ||||||
| pub mod multi_threaded; |  | ||||||
| pub mod multi_threaded_smol; |  | ||||||
| pub mod multi_threaded_structured; |  | ||||||
| pub mod phcs; |  | ||||||
| pub mod reference_impl; |  | ||||||
| pub mod rgk; |  | ||||||
| pub mod single_thread; |  | ||||||
| pub mod smol; |  | ||||||
| @@ -1,286 +0,0 @@ | |||||||
| use std::collections::HashMap; |  | ||||||
| use std::env::args; |  | ||||||
| use std::fs::File; |  | ||||||
| use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; |  | ||||||
| use std::ops::{Neg, Range}; |  | ||||||
| use std::os::unix::fs::FileExt; |  | ||||||
| use std::sync::mpsc::{channel, Sender}; |  | ||||||
| use std::thread; |  | ||||||
| use std::thread::{available_parallelism, JoinHandle}; |  | ||||||
| use std::time::Instant; |  | ||||||
|  |  | ||||||
| #[derive(Copy, Clone, Debug)] |  | ||||||
| struct City { |  | ||||||
|     min: i64, |  | ||||||
|     max: i64, |  | ||||||
|     sum: i64, |  | ||||||
|     occurrences: u32, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl City { |  | ||||||
|     pub fn add_new(&mut self, input: &[u8]) { |  | ||||||
|         let mut val = 0; |  | ||||||
|         let mut is_neg = false; |  | ||||||
|         for &char in input { |  | ||||||
|             match char { |  | ||||||
|                 b'0'..=b'9' => { |  | ||||||
|                     val *= 10; |  | ||||||
|                     let digit = char - b'0'; |  | ||||||
|                     val += digit as i64; |  | ||||||
|                 } |  | ||||||
|                 b'-' => { |  | ||||||
|                     is_neg = true; |  | ||||||
|                 } |  | ||||||
|                 b'.' => {} |  | ||||||
|                 _ => { |  | ||||||
|                     panic!("encountered {} in value", char::from(char)) |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         if is_neg { |  | ||||||
|             val = val.neg(); |  | ||||||
|         } |  | ||||||
|         self.add_new_value(val); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn add_new_value(&mut self, new: i64) { |  | ||||||
|         self.min = self.min.min(new); |  | ||||||
|         self.max = self.max.max(new); |  | ||||||
|         self.sum += new; |  | ||||||
|         self.occurrences += 1; |  | ||||||
|     } |  | ||||||
|     pub fn min(&self) -> f64 { |  | ||||||
|         self.min as f64 / 10.0 |  | ||||||
|     } |  | ||||||
|     pub fn mean(&self) -> f64 { |  | ||||||
|         self.sum as f64 / self.occurrences as f64 / 10.0 |  | ||||||
|     } |  | ||||||
|     pub fn max(&self) -> f64 { |  | ||||||
|         self.max as f64 / 10.0 |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn add_result(&mut self, other: Self) { |  | ||||||
|         self.min = self.min.min(other.min); |  | ||||||
|         self.max = self.max.max(other.max); |  | ||||||
|         self.sum += other.sum; |  | ||||||
|         self.occurrences += other.occurrences; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Default for City { |  | ||||||
|     fn default() -> Self { |  | ||||||
|         Self { |  | ||||||
|             min: i64::MAX, |  | ||||||
|             max: i64::MIN, |  | ||||||
|             sum: 0, |  | ||||||
|             occurrences: 0, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[derive(Default, Clone, Debug)] |  | ||||||
| struct Citymap { |  | ||||||
|     // Length then values |  | ||||||
|     pub map: HashMap<u32, (String, City)>, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn hashstr(s: &str) -> u32 { |  | ||||||
|     let b = s.as_bytes(); |  | ||||||
|     u32::from_le_bytes([s.len() as u8, b[0], b[1], b[2]]) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Citymap { |  | ||||||
|     pub fn lookup(&mut self, lookup: &str) -> &mut City { |  | ||||||
|         let hash = hashstr(lookup); |  | ||||||
|         let builder = self.map.raw_entry_mut(); |  | ||||||
|         &mut builder |  | ||||||
|             .from_key(&hash) |  | ||||||
|             .or_insert(hash, (lookup.to_owned(), Default::default())) |  | ||||||
|             .1 |  | ||||||
|              .1 |  | ||||||
|     } |  | ||||||
|     pub fn new() -> Self { |  | ||||||
|         Self { |  | ||||||
|             map: Default::default(), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     pub fn into_key_values(self) -> Vec<(String, City)> { |  | ||||||
|         self.map.into_values().collect() |  | ||||||
|     } |  | ||||||
|     pub fn merge_with(&mut self, rhs: Self) { |  | ||||||
|         for (k, v) in rhs.map.into_iter() { |  | ||||||
|             self.map |  | ||||||
|                 .entry(k) |  | ||||||
|                 .and_modify(|lhs| { |  | ||||||
|                     lhs.1.add_result(v.1); |  | ||||||
|                 }) |  | ||||||
|                 .or_insert(v); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     let mut args = args(); |  | ||||||
|  |  | ||||||
|     let start = Instant::now(); |  | ||||||
|     let input = "../../../measurements.txt"; |  | ||||||
|  |  | ||||||
|     let results = if args.any(|e| e == "st") { |  | ||||||
|         citymap_single_thread(input) |  | ||||||
|     } else { |  | ||||||
|         citymap_multi_threaded(input) |  | ||||||
|     }; |  | ||||||
|  |  | ||||||
|     print_results(results); |  | ||||||
|  |  | ||||||
|     println!("{:?}", start.elapsed()); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn citymap_single_thread(path: &str) -> Citymap { |  | ||||||
|     let f = File::open(path).unwrap(); |  | ||||||
|  |  | ||||||
|     let mut buf = BufReader::with_capacity(10_usize.pow(8), f); |  | ||||||
|     citymap_naive(&mut buf) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn citymap_multi_threaded(path: &str) -> Citymap { |  | ||||||
|     let cpus = available_parallelism().unwrap().get(); |  | ||||||
|     let size = File::open(path).unwrap().metadata().unwrap().len(); |  | ||||||
|     let per_thread = size / cpus as u64; |  | ||||||
|  |  | ||||||
|     let mut index = 0; |  | ||||||
|     let mut threads = vec![]; |  | ||||||
|     let (sender, receiver) = channel(); |  | ||||||
|     for i in 0..cpus { |  | ||||||
|         let range = index..({ |  | ||||||
|             index += per_thread; |  | ||||||
|             index.min(size) |  | ||||||
|         }); |  | ||||||
|         threads.push(citymap_thread(path.to_owned(), range, i, sender.clone())); |  | ||||||
|     } |  | ||||||
|     let mut ranges = (0..cpus) |  | ||||||
|         .map(|_| receiver.recv().unwrap()) |  | ||||||
|         .collect::<Vec<_>>(); |  | ||||||
|     ranges.sort_unstable_by_key(|e| e.start); |  | ||||||
|     assert!( |  | ||||||
|         ranges.windows(2).all(|e| { |  | ||||||
|             let first = &e[0]; |  | ||||||
|             let second = &e[1]; |  | ||||||
|             first.end == second.start |  | ||||||
|         }), |  | ||||||
|         "Ranges overlap or have gaps: {ranges:?}" |  | ||||||
|     ); |  | ||||||
|     threads |  | ||||||
|         .into_iter() |  | ||||||
|         .map(|e| e.join().unwrap()) |  | ||||||
|         //.map(|e|dbg!(e)) |  | ||||||
|         .reduce(|mut left, right| { |  | ||||||
|             left.merge_with(right); |  | ||||||
|             left |  | ||||||
|         }) |  | ||||||
|         .unwrap() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn citymap_thread( |  | ||||||
|     path: String, |  | ||||||
|     mut range: Range<u64>, |  | ||||||
|     i: usize, |  | ||||||
|     range_feedback: Sender<Range<u64>>, |  | ||||||
| ) -> JoinHandle<Citymap> { |  | ||||||
|     thread::Builder::new() |  | ||||||
|         .name(format!("process_thread id: {i} assigned: {range:?}")) |  | ||||||
|         .spawn(move || { |  | ||||||
|             let mut file = File::open(path).unwrap(); |  | ||||||
|             //println!("Before: {range:?}"); |  | ||||||
|  |  | ||||||
|             // Perform alignment of buffer/range at the start |  | ||||||
|             { |  | ||||||
|                 // Skip head alignment for start of file |  | ||||||
|                 if range.start != 0 { |  | ||||||
|                     let mut head = vec![0; 50]; |  | ||||||
|                     let len = file.read_at(&mut head, range.start).unwrap(); |  | ||||||
|                     head.truncate(len); |  | ||||||
|  |  | ||||||
|                     for (i, &pos) in head.iter().enumerate() { |  | ||||||
|                         if pos == b'\n' { |  | ||||||
|                             range.start += i as u64; |  | ||||||
|                             break; |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 // tail alignment |  | ||||||
|                 { |  | ||||||
|                     let mut head = vec![0; 50]; |  | ||||||
|                     let len = file.read_at(&mut head, range.end).unwrap(); |  | ||||||
|                     head.truncate(len); |  | ||||||
|  |  | ||||||
|                     for (i, &pos) in head.iter().enumerate() { |  | ||||||
|                         if pos == b'\n' { |  | ||||||
|                             range.end += i as u64; |  | ||||||
|                             break; |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|             // Notify main about alignment |  | ||||||
|             range_feedback.send(range.clone()).unwrap(); |  | ||||||
|             // Ensure we remain within bounds of the designated file range |  | ||||||
|             file.seek(SeekFrom::Start(range.start)).unwrap(); |  | ||||||
|  |  | ||||||
|             let limited = BufReader::with_capacity(10_usize.pow(5), file); |  | ||||||
|             let mut buffered = limited.take(range.end - range.start); |  | ||||||
|             citymap_naive(&mut buffered) |  | ||||||
|         }) |  | ||||||
|         .unwrap() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn citymap_naive(input: &mut impl BufRead) -> Citymap { |  | ||||||
|     let mut map = Citymap::new(); |  | ||||||
|     let mut buf = Vec::with_capacity(50); |  | ||||||
|     loop { |  | ||||||
|         let read = input.read_until(b'\n', &mut buf).unwrap(); |  | ||||||
|         // Stream has finished |  | ||||||
|         if read == 0 { |  | ||||||
|             break; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         // Skip over just newline strings that get created by the alignment process |  | ||||||
|         if buf == b"\n" { |  | ||||||
|             continue; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         let mut city = None; |  | ||||||
|         let mut val = None; |  | ||||||
|         for (i, &char) in buf.iter().enumerate() { |  | ||||||
|             if char == b';' { |  | ||||||
|                 city = Some(&buf[0..i]); |  | ||||||
|                 val = Some(&buf[(i + 1)..(buf.len() - 1)]); |  | ||||||
|                 break; |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         #[cfg(not(feature = "unsafe"))] |  | ||||||
|         let entry = map.lookup(std::str::from_utf8(city.unwrap()).unwrap()); |  | ||||||
|  |  | ||||||
|         #[cfg(feature = "unsafe")] |  | ||||||
|         let entry = map.lookup(unsafe { std::str::from_utf8_unchecked(city.unwrap()) }); |  | ||||||
|  |  | ||||||
|         entry.add_new(val.unwrap()); |  | ||||||
|         buf.clear(); |  | ||||||
|     } |  | ||||||
|     map |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn print_results(map: Citymap) { |  | ||||||
|     let mut res = map.into_key_values(); |  | ||||||
|     res.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); |  | ||||||
|     print!("{{"); |  | ||||||
|     for (city, vals) in res { |  | ||||||
|         let min = vals.min(); |  | ||||||
|         let mean = vals.mean(); |  | ||||||
|         let max = vals.max(); |  | ||||||
|         print!("{city}={min:.1}/{mean:.1}/{max:.1}, ") |  | ||||||
|     } |  | ||||||
|     println!("}}"); |  | ||||||
| } |  | ||||||
| @@ -1,110 +0,0 @@ | |||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::{hash, parse}; |  | ||||||
| use memmap2::MmapOptions; |  | ||||||
| use rustc_hash::{FxBuildHasher, FxHashMap as HashMap}; |  | ||||||
| use std::slice::from_raw_parts; |  | ||||||
| use std::sync::mpsc; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::{fs::File, thread}; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     let now = Instant::now(); |  | ||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |  | ||||||
|     let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |  | ||||||
|     let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; |  | ||||||
|     let mmap_ptr = mmap.as_ptr(); |  | ||||||
|     let file_length = mmap.len(); |  | ||||||
|     let hasher = FxBuildHasher; |  | ||||||
|     // Even if I could now just use the byte slice as a key, doing the hash is still faster |  | ||||||
|     let mut stations: HashMap<u64, (&[u8], StationMeasurements)> = |  | ||||||
|         HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); |  | ||||||
|     let (tx, rx) = mpsc::channel(); |  | ||||||
|     let cores = thread::available_parallelism().unwrap().into(); |  | ||||||
|     let chunk_length = file_length / cores; |  | ||||||
|     let mut bounds = Vec::with_capacity(cores + 1); |  | ||||||
|     let mut start = 0; |  | ||||||
|     for _ in 0..cores { |  | ||||||
|         let end = (start + chunk_length).min(mmap.len()); |  | ||||||
|         let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { |  | ||||||
|             Some(v) => v, |  | ||||||
|             None => { |  | ||||||
|                 assert_eq!(end, mmap.len()); |  | ||||||
|                 0 |  | ||||||
|             } |  | ||||||
|         }; |  | ||||||
|         let end = end + next_new_line; |  | ||||||
|         bounds.push((start, end)); |  | ||||||
|         start = end + 1; |  | ||||||
|     } |  | ||||||
|     thread::scope(|s| { |  | ||||||
|         for i in 0..cores { |  | ||||||
|             let tx = tx.clone(); |  | ||||||
|             let (start, end) = *bounds.get(i).unwrap(); |  | ||||||
|             let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) }; |  | ||||||
|             s.spawn(move || { |  | ||||||
|                 let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> = |  | ||||||
|                     HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); |  | ||||||
|                 for line in mmap_slice.split(|&byte| byte == b'\n') { |  | ||||||
|                     if line.is_empty() { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|                     let hash = hash::bytes(station); |  | ||||||
|                     let temp = parse::temp(temp); |  | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |  | ||||||
|                     if let Some((_, measurements)) = measurements_option { |  | ||||||
|                         measurements.update(temp); |  | ||||||
|                     } else { |  | ||||||
|                         let measurements = StationMeasurements { |  | ||||||
|                             min: temp, |  | ||||||
|                             max: temp, |  | ||||||
|                             count: 1, |  | ||||||
|                             sum: temp, |  | ||||||
|                         }; |  | ||||||
|                         t_stations.insert(hash, (station, measurements)); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|                 let _ = tx.send(t_stations); |  | ||||||
|             }); |  | ||||||
|         } |  | ||||||
|         drop(tx); |  | ||||||
|         while let Ok(t_stations) = rx.recv() { |  | ||||||
|             for (hash, (station, measurements)) in t_stations.iter() { |  | ||||||
|                 let joined_measurements_options = stations.get_mut(hash); |  | ||||||
|                 if let Some((_, joined_measurements)) = joined_measurements_options { |  | ||||||
|                     joined_measurements.merge(measurements); |  | ||||||
|                 } else { |  | ||||||
|                     stations.insert(*hash, (station, *measurements)); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         let mut stations: Vec<String> = stations |  | ||||||
|             .iter() |  | ||||||
|             .map(|(_, (station, measurements))| { |  | ||||||
|                 let station = unsafe { std::str::from_utf8_unchecked(station) }; |  | ||||||
|                 let measurements = measurements.to_string(); |  | ||||||
|                 #[cfg(feature = "json")] |  | ||||||
|                 { |  | ||||||
|                     format!("{{\"{station}\":\"{measurements}\"}}") |  | ||||||
|                 } |  | ||||||
|                 #[cfg(not(feature = "json"))] |  | ||||||
|                 { |  | ||||||
|                     format!("{station}={measurements}") |  | ||||||
|                 } |  | ||||||
|             }) |  | ||||||
|             .collect(); |  | ||||||
|         stations.sort(); |  | ||||||
|         let stations = stations.join(","); |  | ||||||
|         #[cfg(feature = "json")] |  | ||||||
|         { |  | ||||||
|             println!("\n\n[{stations}]"); |  | ||||||
|         } |  | ||||||
|         #[cfg(not(feature = "json"))] |  | ||||||
|         { |  | ||||||
|             println!("\n\n{{{stations}}}"); |  | ||||||
|         } |  | ||||||
|         println!("\n\nTime={} ms", now.elapsed().as_millis()); |  | ||||||
|     }); |  | ||||||
| } |  | ||||||
| @@ -1,118 +0,0 @@ | |||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::{hash, parse}; |  | ||||||
| use std::collections::HashMap; |  | ||||||
| use std::io::{BufRead, Seek, SeekFrom}; |  | ||||||
| use std::sync::mpsc; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::{fs::File, io::BufReader, thread}; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |  | ||||||
|     let now = Instant::now(); |  | ||||||
|     thread::scope(|s| { |  | ||||||
|         let mut stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|         let (tx, rx) = mpsc::channel(); |  | ||||||
|         let cores = thread::available_parallelism().unwrap().into(); |  | ||||||
|         let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |  | ||||||
|         let mut reader = BufReader::new(&file); |  | ||||||
|         let file_length = reader.seek(SeekFrom::End(0)).unwrap(); |  | ||||||
|         let chunk_length = file_length as usize / cores; |  | ||||||
|         let mut bounds = Vec::with_capacity(cores + 1); |  | ||||||
|         let mut start = 0; |  | ||||||
|         for i in 0..cores { |  | ||||||
|             let mut reader = BufReader::new(&file); |  | ||||||
|             let mut end = chunk_length * i; |  | ||||||
|             reader |  | ||||||
|                 .seek(SeekFrom::Start(end as u64)) |  | ||||||
|                 .expect("could not seek"); |  | ||||||
|             let mut line = Vec::with_capacity(108); |  | ||||||
|             let line_len = reader |  | ||||||
|                 .read_until(b'\n', &mut line) |  | ||||||
|                 .expect("could not read bytes"); |  | ||||||
|             end += line_len; |  | ||||||
|             bounds.push((start, end)); |  | ||||||
|             start = end + 1; |  | ||||||
|         } |  | ||||||
|         for i in 0..cores { |  | ||||||
|             let tx = tx.clone(); |  | ||||||
|             let (mut currposition, end) = *bounds.get(i).unwrap(); |  | ||||||
|             s.spawn(move || { |  | ||||||
|                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |  | ||||||
|                 let mut reader = BufReader::new(&file); |  | ||||||
|                 reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); |  | ||||||
|                 let mut t_stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|                 let mut line = Vec::with_capacity(108); |  | ||||||
|                 loop { |  | ||||||
|                     let line_len = reader |  | ||||||
|                         .read_until(b'\n', &mut line) |  | ||||||
|                         .expect("could not read bytes"); |  | ||||||
|                     if line_len == 0 { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|                     let hash = hash::bytes(station); |  | ||||||
|                     let station = unsafe { std::str::from_utf8_unchecked(station) }; |  | ||||||
|                     let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |  | ||||||
|                     if let Some((_, measurements)) = measurements_option { |  | ||||||
|                         measurements.update(temp); |  | ||||||
|                     } else { |  | ||||||
|                         let measurements = StationMeasurements { |  | ||||||
|                             min: temp, |  | ||||||
|                             max: temp, |  | ||||||
|                             count: 1, |  | ||||||
|                             sum: temp, |  | ||||||
|                         }; |  | ||||||
|                         t_stations.insert(hash, (station.to_string(), measurements)); |  | ||||||
|                     } |  | ||||||
|                     currposition += line_len; |  | ||||||
|                     if currposition >= end { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     line.clear(); |  | ||||||
|                 } |  | ||||||
|                 let _ = tx.send(t_stations); |  | ||||||
|             }); |  | ||||||
|         } |  | ||||||
|         drop(tx); |  | ||||||
|         while let Ok(t_stations) = rx.recv() { |  | ||||||
|             for (&hash, (station, measurements)) in t_stations.iter() { |  | ||||||
|                 let joined_measurements_options = stations.get_mut(&hash); |  | ||||||
|                 if let Some((_, joined_measurements)) = joined_measurements_options { |  | ||||||
|                     joined_measurements.merge(measurements); |  | ||||||
|                 } else { |  | ||||||
|                     stations.insert(hash, (station.to_owned(), *measurements)); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         let mut stations: Vec<String> = stations |  | ||||||
|             .iter() |  | ||||||
|             .map(|(_, (station, measurements))| { |  | ||||||
|                 let measurements = measurements.to_string(); |  | ||||||
|                 #[cfg(feature = "json")] |  | ||||||
|                 { |  | ||||||
|                     format!("{{\"{station}\":\"{measurements}\"}}") |  | ||||||
|                 } |  | ||||||
|                 #[cfg(not(feature = "json"))] |  | ||||||
|                 { |  | ||||||
|                     format!("{station}={measurements}") |  | ||||||
|                 } |  | ||||||
|             }) |  | ||||||
|             .collect(); |  | ||||||
|         stations.sort(); |  | ||||||
|         let stations = stations.join(","); |  | ||||||
|         #[cfg(feature = "json")] |  | ||||||
|         { |  | ||||||
|             println!("\n\n[{stations}]"); |  | ||||||
|         } |  | ||||||
|         #[cfg(not(feature = "json"))] |  | ||||||
|         { |  | ||||||
|             println!("\n\n{{{stations}}}"); |  | ||||||
|         } |  | ||||||
|         println!("\n\nTime={} ms", now.elapsed().as_millis()); |  | ||||||
|     }); |  | ||||||
| } |  | ||||||
| @@ -1,128 +0,0 @@ | |||||||
| use smol::fs::File; |  | ||||||
| use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom}; |  | ||||||
| use std::collections::HashMap; |  | ||||||
|  |  | ||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::{hash, parse}; |  | ||||||
| use easy_parallel::Parallel; |  | ||||||
| use std::thread; |  | ||||||
| use std::time::Instant; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |  | ||||||
|     let now = Instant::now(); |  | ||||||
|     let mut stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|     let cores = thread::available_parallelism().unwrap().into(); |  | ||||||
|     let bounds = smol::block_on(async { |  | ||||||
|         let mut file = File::open(FILE_PATH) |  | ||||||
|             .await |  | ||||||
|             .expect("File measurements.txt not found"); |  | ||||||
|         let mut reader = BufReader::new(&mut file); |  | ||||||
|         let file_length = reader.seek(SeekFrom::End(0)).await.unwrap(); |  | ||||||
|         let chunk_length = file_length as usize / cores; |  | ||||||
|         let mut bounds = Vec::with_capacity(cores + 1); |  | ||||||
|         bounds.push(0); |  | ||||||
|         for i in 1..cores { |  | ||||||
|             let mut reader = BufReader::new(&mut file); |  | ||||||
|             let mut byte_start = chunk_length * i; |  | ||||||
|             reader |  | ||||||
|                 .seek(SeekFrom::Start(byte_start as u64)) |  | ||||||
|                 .await |  | ||||||
|                 .expect("could not seek"); |  | ||||||
|             let mut line = Vec::with_capacity(108); |  | ||||||
|             let line_len = reader |  | ||||||
|                 .read_until(b'\n', &mut line) |  | ||||||
|                 .await |  | ||||||
|                 .expect("could not read bytes"); |  | ||||||
|             byte_start += line_len; |  | ||||||
|             bounds.push(byte_start as u64); |  | ||||||
|         } |  | ||||||
|         bounds.push(file_length); |  | ||||||
|         bounds |  | ||||||
|     }); |  | ||||||
|     let t_stations_vec = Parallel::new() |  | ||||||
|         .each(0..cores, |i| { |  | ||||||
|             let mut currposition = *bounds.get(i).unwrap(); |  | ||||||
|             let end = *bounds.get(i + 1).unwrap(); |  | ||||||
|             smol::block_on(async { |  | ||||||
|                 let mut file = File::open(FILE_PATH) |  | ||||||
|                     .await |  | ||||||
|                     .expect("File measurements.txt not found"); |  | ||||||
|                 let mut reader = BufReader::new(&mut file); |  | ||||||
|                 reader.seek(SeekFrom::Start(currposition)).await.unwrap(); |  | ||||||
|                 let mut t_stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|                 let mut line = Vec::with_capacity(108); |  | ||||||
|                 loop { |  | ||||||
|                     let line_len = reader |  | ||||||
|                         .read_until(b'\n', &mut line) |  | ||||||
|                         .await |  | ||||||
|                         .expect("could not read bytes"); |  | ||||||
|                     if line_len == 0 { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|                     let hash = hash::bytes(station); |  | ||||||
|                     let station = unsafe { std::str::from_utf8_unchecked(station) }; |  | ||||||
|                     let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |  | ||||||
|                     if let Some((_, measurements)) = measurements_option { |  | ||||||
|                         measurements.update(temp); |  | ||||||
|                     } else { |  | ||||||
|                         let measurements = StationMeasurements { |  | ||||||
|                             min: temp, |  | ||||||
|                             max: temp, |  | ||||||
|                             count: 1, |  | ||||||
|                             sum: temp, |  | ||||||
|                         }; |  | ||||||
|                         t_stations.insert(hash, (station.to_string(), measurements)); |  | ||||||
|                     } |  | ||||||
|                     currposition += line_len as u64; |  | ||||||
|                     if currposition >= end { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     line.clear(); |  | ||||||
|                 } |  | ||||||
|                 t_stations |  | ||||||
|             }) |  | ||||||
|         }) |  | ||||||
|         .run(); |  | ||||||
|     for t_stations in t_stations_vec { |  | ||||||
|         for (hash, (station, measurements)) in t_stations.iter() { |  | ||||||
|             let joined_measurements_options = stations.get_mut(hash); |  | ||||||
|             if let Some((_, joined_measurements)) = joined_measurements_options { |  | ||||||
|                 joined_measurements.merge(measurements); |  | ||||||
|             } else { |  | ||||||
|                 stations.insert(*hash, (station.to_owned(), *measurements)); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     let mut stations: Vec<String> = stations |  | ||||||
|         .iter() |  | ||||||
|         .map(|(_, (station, measurements))| { |  | ||||||
|             let measurements = measurements.to_string(); |  | ||||||
|             #[cfg(feature = "json")] |  | ||||||
|             { |  | ||||||
|                 format!("{{\"{station}\":\"{measurements}\"}}") |  | ||||||
|             } |  | ||||||
|             #[cfg(not(feature = "json"))] |  | ||||||
|             { |  | ||||||
|                 format!("{station}={measurements}") |  | ||||||
|             } |  | ||||||
|         }) |  | ||||||
|         .collect(); |  | ||||||
|     stations.sort(); |  | ||||||
|     let stations = stations.join(","); |  | ||||||
|     #[cfg(feature = "json")] |  | ||||||
|     { |  | ||||||
|         println!("\n\n[{stations}]"); |  | ||||||
|     } |  | ||||||
|     #[cfg(not(feature = "json"))] |  | ||||||
|     { |  | ||||||
|         println!("\n\n{{{stations}}}"); |  | ||||||
|     } |  | ||||||
|     println!("\n\nTime={} ms", now.elapsed().as_millis()); |  | ||||||
| } |  | ||||||
| @@ -1,120 +0,0 @@ | |||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::parse; |  | ||||||
| use memmap2::MmapOptions; |  | ||||||
| use rustc_hash::{FxBuildHasher, FxHashMap}; |  | ||||||
| use std::ffi::CStr; |  | ||||||
| use std::sync::mpsc; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::{fs::File, thread}; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
pub fn run() {
    // Clear from the cursor to the end of the screen; worker threads later
    // draw progress lines at absolute positions with "\x1b[{row};0H".
    print!("\x1b[J");
    const FILE_PATH: &str = "structured_measurements.txt";
    let now = Instant::now();
    let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found");
    // SAFETY: mapping is `unsafe` because the file must not be mutated by
    // another process while mapped.
    let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
    let file_length = mmap.len();
    let hasher = FxBuildHasher;
    // Global station -> aggregate map, filled from the per-thread results.
    let mut stations: FxHashMap<String, StationMeasurements> =
        FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
    let (tx, rx) = mpsc::channel();
    let cores = thread::available_parallelism().unwrap().into();
    // Partition the file into one byte range per core; each range end is
    // pushed forward to the next '\n' so no record straddles two chunks.
    let chunk_length = file_length / cores;
    let mut bounds = Vec::with_capacity(cores + 1);
    let mut start = 0;
    for _ in 0..cores {
        let end = (start + chunk_length).min(mmap.len());
        let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
            Some(v) => v,
            None => {
                // No newline after `end` is only legal at end of file.
                assert_eq!(end, mmap.len());
                0
            }
        };
        let end = end + next_new_line;
        bounds.push((start, end));
        start = end + 1; // skip the newline itself
    }
    thread::scope(|s| {
        for i in 0..cores {
            let tx = tx.clone();
            let (start, end) = *bounds.get(i).unwrap();
            let mmap_slice = &mmap[start..end];
            s.spawn(move || {
                // Per-thread partial aggregation; sent over the channel below.
                let mut t_stations: FxHashMap<String, StationMeasurements> =
                    FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
                // Fixed-width records: 107 bytes per line = 100-byte station
                // field + 1 separator byte + 5 temperature bytes + '\n'.
                // NOTE(review): chunks_exact silently drops a trailing partial
                // chunk — assumes every chunk length is a multiple of 107;
                // confirm against how the bounds above are computed.
                let lines = mmap_slice.chunks_exact(107);
                for (line_num, line) in lines.enumerate() {
                    if line_num % 100000 == 0 {
                        // Progress line for this worker at screen row `i`.
                        print!("\x1b[{i};0Hlines: {line_num}");
                    }
                    // SAFETY: every `line` is exactly 107 bytes long
                    // (guaranteed by chunks_exact above), so index 100 is in
                    // bounds.
                    let (station, temp) = unsafe { line.split_at_unchecked(100) };
                    let station = {
                        if station[station.len() - 1] == 0u8 {
                            // NUL-padded name: keep the bytes before the first NUL.
                            unsafe {
                                std::str::from_utf8_unchecked(
                                    CStr::from_bytes_until_nul(station).unwrap().to_bytes(),
                                )
                            }
                        } else {
                            // Name fills the entire 100-byte field.
                            unsafe { std::str::from_utf8_unchecked(station) }
                        }
                    }
                    .to_owned();
                    // `temp` holds bytes 100..107 of the record; [1..6] skips
                    // the separator byte and excludes the trailing newline.
                    let temp = parse::temp_new(&temp[1..6]);
                    let measurements_option = t_stations.get_mut(&station);
                    if let Some(measurements) = measurements_option {
                        measurements.update(temp);
                    } else {
                        let measurements = StationMeasurements {
                            min: temp,
                            max: temp,
                            count: 1,
                            sum: temp,
                        };
                        t_stations.insert(station, measurements);
                    }
                }
                // Send only fails if the receiver hung up; safe to ignore.
                let _ = tx.send(t_stations);
            });
        }
        // Drop the original sender so rx.recv() errors out (ending the loop
        // below) once every worker's clone has been dropped.
        drop(tx);
        while let Ok(t_stations) = rx.recv() {
            // Merge this worker's partial map into the global map.
            for (station, measurements) in t_stations.iter() {
                let joined_measurements_options = stations.get_mut(station);
                if let Some(joined_measurements) = joined_measurements_options {
                    joined_measurements.merge(measurements);
                } else {
                    stations.insert(station.to_owned(), *measurements);
                }
            }
        }
        // Render one fragment per station ("name=stats", or a JSON object
        // when the `json` feature is enabled), then sort for stable output.
        let mut stations: Vec<String> = stations
            .iter()
            .map(|(station, measurements)| {
                let measurements = measurements.to_string();
                #[cfg(feature = "json")]
                {
                    format!("{{\"{station}\":\"{measurements}\"}}")
                }
                #[cfg(not(feature = "json"))]
                {
                    format!("{station}={measurements}")
                }
            })
            .collect();
        stations.sort();
        let stations = stations.join(",");
        #[cfg(feature = "json")]
        {
            println!("\n\n[{stations}]");
        }
        #[cfg(not(feature = "json"))]
        {
            println!("\n\n{{{stations}}}");
        }
        println!("\n\nTime={} ms", now.elapsed().as_millis());
    });
}
| @@ -1,203 +0,0 @@ | |||||||
| use std::collections::HashMap; |  | ||||||
| use std::env; |  | ||||||
| use std::fmt; |  | ||||||
| use std::fs::File; |  | ||||||
| use std::io; |  | ||||||
| use std::io::prelude::*; |  | ||||||
| use std::thread::{self, Scope, ScopedJoinHandle}; |  | ||||||
|  |  | ||||||
| use crate::models::mmap::{Mmap, MmapChunkIterator}; |  | ||||||
|  |  | ||||||
// Defined in challenge spec
const MAX_STATIONS: usize = 10000;
const NUM_CONSUMERS: usize = 32;
/// Temperatures are stored as tenths of a degree in `i32`; divide by this to
/// get whole degrees for display.
const FIXED_POINT_DIVISOR: f64 = 10.0;

/// Aggregate statistics (min/max/count/sum) for one weather station.
///
/// Temperatures are kept as fixed-point `i32` tenths of a degree so the hot
/// parsing path never touches floating point; the mean is only computed on
/// demand (`get_mean`).
struct StationData {
    min_temp: i32,
    max_temp: i32,
    count: i32,
    temp_sum: i32,
}

impl fmt::Display for StationData {
    /// Formats as "min/mean/max" with one decimal place each.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "{:.1}/{:.1}/{:.1}",
            (self.min_temp as f64 / FIXED_POINT_DIVISOR),
            self.get_mean(),
            (self.max_temp as f64 / FIXED_POINT_DIVISOR)
        )
    }
}

/// Efficiently handles station statistics. Avoids using floating-point arithmetic to speed-up processing.
/// The mean is only calculated on demand, so we avoid calculating it as we read the file
impl StationData {
    /// Creates an aggregate seeded with a single observation.
    fn new(temp: i32) -> Self {
        Self {
            min_temp: temp,
            max_temp: temp,
            count: 1,
            temp_sum: temp,
        }
    }

    /// Byte form of the `Display` rendering. Delegates to `to_string()`
    /// instead of duplicating the format string (previously copy-pasted).
    fn to_bytes(&self) -> Vec<u8> {
        self.to_string().into_bytes()
    }

    /// Mean temperature in degrees (sum / count, scaled out of fixed point).
    fn get_mean(&self) -> f64 {
        (self.temp_sum as f64 / self.count as f64) / FIXED_POINT_DIVISOR
    }

    /// Folds one observation into the aggregate.
    fn update_from(&mut self, temp: i32) {
        self.max_temp = self.max_temp.max(temp);
        self.min_temp = self.min_temp.min(temp);
        self.count += 1;
        self.temp_sum += temp;
    }

    /// Merges another station's aggregate into this one.
    ///
    /// Takes `&Self` — the source is only read. Existing `&mut` call sites
    /// keep working via the standard `&mut T -> &T` coercion.
    fn update_from_station(&mut self, src: &Self) {
        self.max_temp = self.max_temp.max(src.max_temp);
        self.min_temp = self.min_temp.min(src.min_temp);
        self.temp_sum += src.temp_sum;
        self.count += src.count;
    }

    /// Parses temperature text into fixed-point tenths, e.g. b"-12.3" -> -123.
    ///
    /// # Panics
    /// Panics on any byte other than ASCII digits, '.', or '-'.
    #[inline]
    fn parse_temp(bytes: &[u8]) -> i32 {
        let mut result: i32 = 0;
        let mut negative = false;
        for &b in bytes {
            match b {
                b'0'..=b'9' => {
                    result = result * 10 + (b as i32 - b'0' as i32);
                }
                b'.' => {}
                b'-' => {
                    negative = true;
                }
                _ => panic!("wrong format for temperature"),
            }
        }
        if negative {
            -result
        } else {
            result
        }
    }

    /// Splits a "name;temperature" record at the first ';' and parses the
    /// temperature. Panics if no ';' is present.
    #[inline]
    fn parse_data(line: &[u8]) -> (&[u8], i32) {
        let semicolon_pos = line.iter().position(|&b| b == b';').unwrap();
        let name = &line[..semicolon_pos];
        let temp = &line[semicolon_pos + 1..];
        (name, Self::parse_temp(temp))
    }
}
|  |  | ||||||
| fn merge_hashmaps<'a>( |  | ||||||
|     mut dest: HashMap<&'a [u8], StationData>, |  | ||||||
|     src: HashMap<&'a [u8], StationData>, |  | ||||||
| ) -> HashMap<&'a [u8], StationData> { |  | ||||||
|     for (k, mut v) in src { |  | ||||||
|         dest.entry(k) |  | ||||||
|             .and_modify(|e| e.update_from_station(&mut v)) |  | ||||||
|             .or_insert(v); |  | ||||||
|     } |  | ||||||
|     dest |  | ||||||
| } |  | ||||||
|  |  | ||||||
| /// Parses a chunk of the input as StationData values. |  | ||||||
| fn process_chunk(current_chunk_slice: &[u8]) -> HashMap<&[u8], StationData> { |  | ||||||
|     let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); |  | ||||||
|     let mut start = 0; |  | ||||||
|     while let Some(end) = current_chunk_slice[start..] |  | ||||||
|         .iter() |  | ||||||
|         .position(|&b| b == b'\n') |  | ||||||
|     { |  | ||||||
|         let line = ¤t_chunk_slice[start..start + end]; |  | ||||||
|         let (name, temp) = StationData::parse_data(line); |  | ||||||
|         station_map |  | ||||||
|             .entry(name) |  | ||||||
|             .and_modify(|e| e.update_from(temp)) |  | ||||||
|             .or_insert(StationData::new(temp)); |  | ||||||
|         start += end + 1; // move to the start of the next line |  | ||||||
|     } |  | ||||||
|     // If we don't find a \n, process the remaining data |  | ||||||
|     if start < current_chunk_slice.len() { |  | ||||||
|         let line = ¤t_chunk_slice[start..]; |  | ||||||
|         let (name, temp) = StationData::parse_data(line); |  | ||||||
|         station_map |  | ||||||
|             .entry(name) |  | ||||||
|             .and_modify(|e| e.update_from(temp)) |  | ||||||
|             .or_insert(StationData::new(temp)); |  | ||||||
|     } |  | ||||||
|     station_map |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn process_mmap<'scope, 'env>( |  | ||||||
|     mmap: Mmap<'env>, |  | ||||||
|     s: &'scope Scope<'scope, 'env>, |  | ||||||
| ) -> HashMap<&'env [u8], StationData> { |  | ||||||
|     let mut handlers: Vec<ScopedJoinHandle<HashMap<&[u8], StationData>>> = Vec::new(); |  | ||||||
|  |  | ||||||
|     for chunk in MmapChunkIterator::new(mmap, NUM_CONSUMERS) { |  | ||||||
|         let h = s.spawn(move || process_chunk(chunk)); |  | ||||||
|         handlers.push(h); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); |  | ||||||
|     for h in handlers { |  | ||||||
|         let inner_station = h.join().unwrap(); |  | ||||||
|         station_map = merge_hashmaps(station_map, inner_station); |  | ||||||
|     } |  | ||||||
|     station_map |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Result<()> { |  | ||||||
|     let mut stdout = io::stdout().lock(); |  | ||||||
|     let mut buffer = Vec::new(); |  | ||||||
|  |  | ||||||
|     buffer.extend_from_slice(b"{"); |  | ||||||
|  |  | ||||||
|     let mut sorted_key_value_vec: Vec<_> = station_map.iter().collect(); |  | ||||||
|     sorted_key_value_vec.sort_by_key(|e| e.0); |  | ||||||
|  |  | ||||||
|     for (i, (name, data)) in sorted_key_value_vec.iter().enumerate() { |  | ||||||
|         if i > 0 { |  | ||||||
|             buffer.extend_from_slice(b", "); |  | ||||||
|         } |  | ||||||
|         buffer.extend_from_slice(name); |  | ||||||
|         buffer.extend_from_slice(b"="); |  | ||||||
|         buffer.extend(data.to_bytes()); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     buffer.extend_from_slice(b"}"); |  | ||||||
|  |  | ||||||
|     stdout.write_all(&buffer) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub fn run() -> io::Result<()> { |  | ||||||
|     // won't accept non-utf-8 args |  | ||||||
|     let args: Vec<String> = env::args().collect(); |  | ||||||
|     let file_name = match args.get(2) { |  | ||||||
|         Some(fname) => fname, |  | ||||||
|         None => "../../../measurements.txt", |  | ||||||
|     }; |  | ||||||
|     let f = File::open(file_name)?; |  | ||||||
|     let mmap = Mmap::from_file(f); |  | ||||||
|  |  | ||||||
|     thread::scope(|s| { |  | ||||||
|         let station_map = process_mmap(mmap, s); |  | ||||||
|         write_output_to_stdout(station_map).unwrap(); |  | ||||||
|     }); |  | ||||||
|  |  | ||||||
|     Ok(()) |  | ||||||
| } |  | ||||||
| @@ -1,133 +0,0 @@ | |||||||
| use bstr::{BStr, ByteSlice}; |  | ||||||
| use memmap2::MmapOptions; |  | ||||||
| use rayon::prelude::*; |  | ||||||
| use rustc_hash::FxHashMap as HashMap; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::{fmt::Display, fs::File}; |  | ||||||
|  |  | ||||||
/// Running aggregate (min/max/count/sum) for a single station.
#[derive(Debug)]
struct State {
    min: f64,
    max: f64,
    count: u64,
    sum: f64,
}

impl Default for State {
    /// Neutral element for `merge`: extreme sentinels for min/max and zeroes
    /// for count/sum, so the first `update` overwrites them.
    fn default() -> Self {
        State {
            count: 0,
            sum: 0.0,
            min: f64::MAX,
            max: f64::MIN,
        }
    }
}

impl Display for State {
    /// Renders as "min/mean/max" with one decimal place each
    /// (the mean is NaN for an empty aggregate).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mean = self.sum / self.count as f64;
        write!(f, "{:.1}/{:.1}/{:.1}", self.min, mean, self.max)
    }
}

impl State {
    /// Folds one observation into the aggregate.
    fn update(&mut self, sample: f64) {
        self.count += 1;
        self.sum += sample;
        self.min = self.min.min(sample);
        self.max = self.max.max(sample);
    }

    /// Combines another aggregate into this one.
    fn merge(&mut self, rhs: &Self) {
        self.count += rhs.count;
        self.sum += rhs.sum;
        self.min = self.min.min(rhs.min);
        self.max = self.max.max(rhs.max);
    }
}
|  |  | ||||||
| fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> { |  | ||||||
|     let mut state: HashMap<&'a BStr, State> = Default::default(); |  | ||||||
|     for line in i { |  | ||||||
|         let (name, value) = line.split_once_str(b";").unwrap(); |  | ||||||
|         let value = fast_float::parse(value).unwrap(); |  | ||||||
|         state.entry(name.into()).or_default().update(value); |  | ||||||
|     } |  | ||||||
|     state |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> { |  | ||||||
|     make_map((mem[start..end]).lines()) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { |  | ||||||
|     for (k, v) in b { |  | ||||||
|         a.entry(k).or_default().merge(v); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
pub fn run() {
    let now = Instant::now();
    let cores: usize = std::thread::available_parallelism().unwrap().into();
    // Input path: first CLI argument, or the default relative location.
    let path = match std::env::args().nth(1) {
        Some(path) => path,
        None => "../../../measurements.txt".to_owned(),
    };
    let file = File::open(path).unwrap();
    // SAFETY: mapping is `unsafe` because the file must not be mutated by
    // another process while mapped.
    let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };

    // Partition the file into one byte range per core, extending each range
    // end to the next '\n' so no line straddles two chunks.
    let chunk_size = mmap.len() / cores;
    let mut chunks: Vec<(usize, usize)> = vec![];
    let mut start = 0;
    for _ in 0..cores {
        let end = (start + chunk_size).min(mmap.len());
        let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
            Some(v) => v,
            None => {
                // No newline after `end` is only legal at end of file.
                assert_eq!(end, mmap.len());
                0
            }
        };
        let end = end + next_new_line;
        chunks.push((start, end));
        start = end + 1; // skip the newline itself
    }
    // Aggregate each chunk in parallel via rayon...
    let parts: Vec<_> = chunks
        .par_iter()
        .map(|r| solve_for_part(*r, &mmap))
        .collect();

    // ...then fold the per-chunk maps into a single one.
    let state: HashMap<&BStr, State> = parts.into_iter().fold(Default::default(), |mut a, b| {
        merge(&mut a, &b);
        a
    });

    // Sort stations by name for deterministic output.
    let mut all: Vec<_> = state.into_iter().collect();
    all.sort_unstable_by(|a, b| a.0.cmp(b.0));
    #[cfg(feature = "json")]
    {
        // JSON-style output: [{"name":"min/mean/max"}, ...]
        print!("[");
        for (i, (name, state)) in all.into_iter().enumerate() {
            if i == 0 {
                print!("{{\"{name}\":\"{state}\"}}");
            } else {
                print!(", {{\"{name}\":\"{state}\"}}");
            }
        }
        println!("]");
    }
    #[cfg(not(feature = "json"))]
    {
        // Challenge output: {name=min/mean/max, ...}
        print!("{{");
        for (i, (name, state)) in all.into_iter().enumerate() {
            if i == 0 {
                print!("{name}={state}");
            } else {
                print!(", {name}={state}");
            }
        }
        println!("}}");
    }
    println!("\n\nTime={} ms", now.elapsed().as_millis());
}
| @@ -1,443 +0,0 @@ | |||||||
| use ptr_hash::PtrHashParams; |  | ||||||
| use rustc_hash::FxHashSet; |  | ||||||
| use std::{ |  | ||||||
|     simd::{cmp::SimdPartialEq, Simd}, |  | ||||||
|     vec::Vec, |  | ||||||
| }; |  | ||||||
|  |  | ||||||
| type V = i32; |  | ||||||
|  |  | ||||||
| type PtrHash = ptr_hash::DefaultPtrHash<ptr_hash::hash::FxHash, u64>; |  | ||||||
|  |  | ||||||
/// Perfect-hash table mapping station-name hashes to aggregation slots.
pub struct Phf {
    /// Minimal perfect hash function built over the name hashes.
    pub ptr_hash: PtrHash,
    /// Sorted station names — the key universe of the table.
    pub keys: Vec<Vec<u8>>,
    /// One record per slot, indexed through `ptr_hash`.
    pub slots: Vec<Record>,
}
|  |  | ||||||
| impl Phf { |  | ||||||
|     fn new(mut keys: Vec<Vec<u8>>) -> Self { |  | ||||||
|         keys.sort(); |  | ||||||
|  |  | ||||||
|         let num_slots = keys.len() * 5 / 2; |  | ||||||
|         let params = ptr_hash::PtrHashParams { |  | ||||||
|             alpha: 0.9, |  | ||||||
|             c: 1.5, |  | ||||||
|             slots_per_part: num_slots, |  | ||||||
|             ..PtrHashParams::default() |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         let mut hashes: Vec<u64> = keys.iter().map(|key| hash_name(key)).collect(); |  | ||||||
|         hashes.sort(); |  | ||||||
|         for (x, y) in hashes.iter().zip(hashes.iter().skip(1)) { |  | ||||||
|             assert!(*x != *y, "DUPLICATE HASH"); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         let ptr_hash = PtrHash::new(&hashes, params); |  | ||||||
|  |  | ||||||
|         let slots = vec![Record::default(); num_slots]; |  | ||||||
|  |  | ||||||
|         Self { |  | ||||||
|             ptr_hash, |  | ||||||
|             keys, |  | ||||||
|             slots, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     fn compute_index(&self, hash: u64) -> usize { |  | ||||||
|         self.ptr_hash.index_single_part(&hash) |  | ||||||
|     } |  | ||||||
|     fn get_index_mut(&mut self, idx: usize) -> &mut Record { |  | ||||||
|         &mut self.slots[idx] |  | ||||||
|     } |  | ||||||
|     fn index_hash_mut(&mut self, hash: u64) -> &mut Record { |  | ||||||
|         &mut self.slots[self.ptr_hash.index_single_part(&hash)] |  | ||||||
|     } |  | ||||||
|     pub fn index<'b>(&'b self, key: &[u8]) -> &'b Record { |  | ||||||
|         let hash = hash_name(key); |  | ||||||
|         &self.slots[self.compute_index(hash)] |  | ||||||
|     } |  | ||||||
|     fn index_mut<'b>(&'b mut self, key: &[u8]) -> &'b mut Record { |  | ||||||
|         self.index_hash_mut(hash_name(key)) |  | ||||||
|     } |  | ||||||
|     fn merge(&mut self, r: Self) { |  | ||||||
|         // TODO: If key sets are equal or one is a subset of the other, merge |  | ||||||
|         // smaller into larger. |  | ||||||
|         let mut new_keys = vec![]; |  | ||||||
|         let mut i1 = 0; |  | ||||||
|         let mut i2 = 0; |  | ||||||
|         while i1 < self.keys.len() && i2 < r.keys.len() { |  | ||||||
|             if self.keys[i1] == r.keys[i2] { |  | ||||||
|                 new_keys.push(self.keys[i1].clone()); |  | ||||||
|                 i1 += 1; |  | ||||||
|                 i2 += 1; |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             if self.keys[i1] < r.keys[i2] { |  | ||||||
|                 new_keys.push(self.keys[i1].clone()); |  | ||||||
|                 i1 += 1; |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             if self.keys[i1] > r.keys[i2] { |  | ||||||
|                 new_keys.push(r.keys[i2].clone()); |  | ||||||
|                 i2 += 1; |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             panic!(); |  | ||||||
|         } |  | ||||||
|         while i1 < self.keys.len() { |  | ||||||
|             new_keys.push(self.keys[i1].clone()); |  | ||||||
|             i1 += 1; |  | ||||||
|         } |  | ||||||
|         while i2 < r.keys.len() { |  | ||||||
|             new_keys.push(r.keys[i2].clone()); |  | ||||||
|             i2 += 1; |  | ||||||
|         } |  | ||||||
|         let mut new_phf = Self::new(new_keys); |  | ||||||
|         for key in &self.keys { |  | ||||||
|             new_phf.index_mut(key).merge(self.index(key)); |  | ||||||
|         } |  | ||||||
|         for key in &r.keys { |  | ||||||
|             new_phf.index_mut(key).merge(r.index(key)); |  | ||||||
|         } |  | ||||||
|         *self = new_phf; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
/// Aggregation slot for one station (see `Phf`); 32-byte aligned.
#[derive(Clone, Debug)]
#[repr(align(32))]
pub struct Record {
    /// Number of readings folded in so far.
    pub count: u64,
    // Storing these as two u32 is nice, because they are read as a single u64.
    /// Byte representation of string ~b"bc.d" or ~b"\0c.d".
    pub min: u32,
    /// Byte representation of string b"bc.d" or b"\0c.d".
    pub max: u32,
    /// Packed sum: three 21-bit lanes (units/tens/hundreds) — decoded by
    /// `sum_to_val`.
    pub sum: u64,
}
|  |  | ||||||
impl Record {
    /// Zero-initialised record (inherent fn, not the std `Default` trait).
    fn default() -> Self {
        Self {
            count: 0,
            // NOTE(review): starting `min` at 0 means `add`'s
            // `raw_value < self.min` can never fire for an unsigned raw; this
            // only works if min uses the complement encoding hinted at by the
            // field docs (`~b"bc.d"`) — confirm against call sites.
            min: 0,
            max: 0,
            sum: 0,
        }
    }
    /// Accumulates one reading: `raw_value` is the 4-byte raw form used for
    /// branchless min/max, `value` the lane-packed form added into `sum`.
    fn add(&mut self, raw_value: u32, value: u64) {
        // assert2::debug_assert!(value < 1000);
        self.count += 1;
        self.sum += value;
        // See https://en.algorithmica.org/hpc/algorithms/argmin/
        if raw_value < self.min {
            self.min = raw_value;
        }
        if raw_value > self.max {
            self.max = raw_value;
        }
    }
    /// Folds another record into this one.
    fn merge(&mut self, other: &Self) {
        self.count += other.count;
        // NOTE(review): `other.sum` is decoded via `sum_to_val` before being
        // added, while `self.sum` holds the packed lane encoding written by
        // `add` — mixing the two representations looks inconsistent; verify
        // which encoding `sum` is expected to hold after a merge.
        self.sum += other.sum_to_val() as u64;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
    }
    /// Decodes the three packed 21-bit lanes of `sum` into a plain
    /// fixed-point total: units + 10*tens + 100*hundreds.
    fn sum_to_val(&self) -> V {
        let m = (1 << 21) - 1;
        ((self.sum & m) + 10 * ((self.sum >> 21) & m) + 100 * ((self.sum >> 42) & m)) as _
    }
    /// Return (min, avg, max)
    ///
    /// Combines the records tracking the positive and negative readings of
    /// one station; negative values invert min/max order, hence the swaps.
    pub fn merge_pos_neg(pos: &Record, neg: &Record) -> (V, V, V) {
        let pos_sum = pos.sum as V;
        let neg_sum = neg.sum as V;
        let sum = pos_sum - neg_sum;
        let count = (pos.count + neg.count) as V;
        // round to nearest
        let avg = (sum + count / 2).div_floor(count);

        let pos_max = raw_to_value(pos.max);
        let neg_max = -raw_to_value(neg.min);
        let max = pos_max.max(neg_max);

        let pos_min = raw_to_value(pos.min);
        let neg_min = -raw_to_value(neg.max);
        let min = pos_min.min(neg_min);

        (min, avg, max)
    }
}
|  |  | ||||||
| /// Reads raw bytes and masks the ; and the b'0'=0x30. |  | ||||||
| /// Returns something of the form 0x0b0c..0d or 0x000c..0d |  | ||||||
/// Loads the temperature bytes `data[start..end]` into the low bytes of a
/// u32 (big-endian order), left-padded with zero bytes when fewer than four
/// bytes are taken — yielding the 0x0b0c..0d / 0x000c..0d shapes that
/// `raw_to_pdep` / `raw_to_value` mask down to digits.
fn parse_to_raw(data: &[u8], start: usize, end: usize) -> u32 {
    // NOTE(review): this unconditionally reads 4 bytes at `start`; if
    // start + 4 > data.len() the read is out of bounds. Presumably the input
    // buffer is padded — confirm at call sites before trusting this.
    let raw = u32::from_be_bytes(unsafe { *data.get_unchecked(start..).as_ptr().cast() });
    raw >> (8 * (4 - (end - start)))
}
|  |  | ||||||
/// Spreads the three digit nibbles of a raw temperature into separate 21-bit
/// lanes of a u64 so many readings can be summed without carries crossing
/// lanes (the lanes are decoded later by `Record::sum_to_val`).
fn raw_to_pdep(raw: u32) -> u64 {
    #[cfg(feature = "no_pdep")]
    {
        // Portable fallback: shift each digit nibble into its lane by hand.
        let raw = raw as u64;
        (raw & 15) | ((raw & (15 << 16)) << (21 - 16)) | ((raw & (15 << 24)) << (42 - 24))
    }
    #[cfg(not(feature = "no_pdep"))]
    {
        // Keep only the digit nibbles (hundreds, tens, units); drops the
        // '.' byte and the ASCII 0x30 bias.
        let mask = 0x0f0f000f;
        let raw = raw & mask;
        // input                                     0011bbbb0011cccc........0011dddd
        //         0b                  bbbb             xxxxcccc     yyyyyyyyyyyydddd // Deposit here
        //         0b                  1111                 1111                 1111 // Mask out trash using &
        let pdep = 0b0000000000000000001111000000000000011111111000001111111111111111u64;
        // x86_64 BMI2 `pdep`; non-x86 builds must use the `no_pdep` feature.
        unsafe { core::arch::x86_64::_pdep_u64(raw as u64, pdep) }
    }
}
|  |  | ||||||
| fn raw_to_value(v: u32) -> V { |  | ||||||
|     let mask = 0x0f0f000f; |  | ||||||
|     let bytes = (v & mask).to_be_bytes(); |  | ||||||
|     // s = bc.d |  | ||||||
|     let b = bytes[0] as V; |  | ||||||
|     let c = bytes[1] as V; |  | ||||||
|     let d = bytes[3] as V; |  | ||||||
|     b as V * 100 * (bytes[0] != 0) as V + c as V * 10 + d as V |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub fn format(v: V) -> String { |  | ||||||
|     format!("{:.1}", v as f64 / 10.0) |  | ||||||
| } |  | ||||||
|  |  | ||||||
#[allow(unused)]
fn hash_name(name: &[u8]) -> u64 {
    // Hash the first and last 8 bytes.
    // TODO: More robust hash that actually uses all characters.
    // SAFETY: names are subslices of a buffer with slack, so a full 8-byte
    // read at either end stays inside allocated memory even for short names.
    let head = unsafe { name.as_ptr().cast::<[u8; 8]>().read() };
    let tail = unsafe {
        name.as_ptr()
            .add(name.len().wrapping_sub(8))
            .cast::<[u8; 8]>()
            .read()
    };
    // For names shorter than 8 bytes, shift away bytes belonging to
    // neighbouring data.
    let shift = 64usize.saturating_sub(8 * name.len());
    let khead = u64::from_ne_bytes(head) << shift;
    let ktail = u64::from_ne_bytes(tail) >> shift;
    khead.wrapping_add(ktail)
}
|  |  | ||||||
/// Number of SIMD lanes. AVX2 has 256 bits, so 32 lanes.
const L: usize = 32;
/// The Simd type.
/// One lane per input byte; used for the ';' / '\n' scans in `iter_lines`.
pub type S = Simd<u8, L>;
|  |  | ||||||
/// Byte offsets delimiting one `name;value\n` line within the input buffer.
#[derive(Copy, Clone)]
struct State {
    // First byte of the station name.
    start: usize,
    // Position of the ';' separator (callers may bump it past a '-' sign).
    sep: usize,
    // Position of the terminating '\n'.
    end: usize,
}
|  |  | ||||||
/// Find the regions between \n and ; (names) and between ; and \n (values),
/// and calls `callback` for each line.
///
/// Runs four independent cursors, one starting in each quarter of the input,
/// so the scans can overlap at the instruction level.
#[inline(always)]
fn iter_lines<'a>(
    mut data: &'a [u8],
    mut callback: impl FnMut(&'a [u8], State, State, State, State),
) {
    // Make sure that the out-of-bounds reads we do are OK.
    // (Every find below loads a full 32-byte vector at an arbitrary offset.)
    data = &data[..data.len() - 32];

    let sep = S::splat(b';');
    let end = S::splat(b'\n');

    // First occurrence of `sep` at or after `last`; assumes it lies within
    // the next 32 bytes.
    let find = |last: usize, sep: S| {
        let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
        let eq = sep.simd_eq(simd).to_bitmask() as u32;
        let offset = eq.trailing_zeros() as usize;
        last + offset
    };
    // Modified to be able to search regions longer than 32.
    let find_long = |mut last: usize, sep: S| {
        let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
        let mut eq = sep.simd_eq(simd).to_bitmask() as u32;
        if eq == 0 {
            while eq == 0 {
                last += 32;
                let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
                eq = sep.simd_eq(simd).to_bitmask() as u32;
            }
        }
        let offset = eq.trailing_zeros() as usize;
        last + offset
    };

    // Start a cursor at the first full line at or after `idx` (it skips
    // forward to just past the next '\n').
    let init_state = |idx: usize| {
        let first_end = find_long(idx, end);
        State {
            start: first_end + 1,
            sep: first_end + 1,
            end: 0,
        }
    };

    let mut state0 = init_state(0);
    let mut state1 = init_state(data.len() / 4);
    let mut state2 = init_state(2 * data.len() / 4);
    let mut state3 = init_state(3 * data.len() / 4);

    // Duplicate each line for each input state.
    macro_rules! step {
        [$($s:expr),*] => {
            $($s.sep = find_long($s.sep + 1, sep) ;)*
                $($s.end = find($s.sep + 1, end) ;)*
                callback(data, $($s, )*);
                $($s.start = $s.end + 1;)*
        }
    }

    // All four cursors advance in lockstep until the last one runs out.
    // NOTE(review): cursors 0-2 keep stepping past their own quarter while
    // cursor 3 finishes, and lines near the trimmed 32-byte tail (and the
    // very first line before the first '\n') are not visited — confirm the
    // caller accounts for this over/under-coverage.
    while state3.start < data.len() {
        step!(state0, state1, state2, state3);
    }
}
|  |  | ||||||
/// Single-threaded pass: parse every line of `data` and fold it into a
/// fresh `Phf` accumulator built over `keys`.
fn run(data: &[u8], keys: &[Vec<u8>]) -> Phf {
    // Each thread has its own accumulator.
    let mut h = Phf::new(keys.to_vec());
    iter_lines(
        data,
        |data, mut s0: State, mut s1: State, mut s2: State, mut s3: State| {
            unsafe {
                // If value is negative, extend name by one character.
                // The key set contains "<city>;" entries (see
                // `find_city_names`), so negative readings accumulate under
                // the ';'-suffixed name and are recombined later.
                s0.sep += (data.get_unchecked(s0.sep + 1) == &b'-') as usize;
                let name0 = data.get_unchecked(s0.start..s0.sep);

                s1.sep += (data.get_unchecked(s1.sep + 1) == &b'-') as usize;
                let name1 = data.get_unchecked(s1.start..s1.sep);

                s2.sep += (data.get_unchecked(s2.sep + 1) == &b'-') as usize;
                let name2 = data.get_unchecked(s2.start..s2.sep);

                s3.sep += (data.get_unchecked(s3.sep + 1) == &b'-') as usize;
                let name3 = data.get_unchecked(s3.start..s3.sep);

                // Parse the digits between the separator (or '-') and '\n'.
                let raw0 = parse_to_raw(data, s0.sep + 1, s0.end);
                let raw1 = parse_to_raw(data, s1.sep + 1, s1.end);
                let raw2 = parse_to_raw(data, s2.sep + 1, s2.end);
                let raw3 = parse_to_raw(data, s3.sep + 1, s3.end);

                // Hash each name and map it to its perfect-hash slot.
                let h0 = hash_name(name0);
                let h1 = hash_name(name1);
                let h2 = hash_name(name2);
                let h3 = hash_name(name3);

                let idx0 = h.compute_index(h0);
                let idx1 = h.compute_index(h1);
                let idx2 = h.compute_index(h2);
                let idx3 = h.compute_index(h3);

                // Accumulate both the raw (for min/max) and the lane-packed
                // form (for overflow-free summing).
                h.get_index_mut(idx0).add(raw0, raw_to_pdep(raw0));
                h.get_index_mut(idx1).add(raw1, raw_to_pdep(raw1));
                h.get_index_mut(idx2).add(raw2, raw_to_pdep(raw2));
                h.get_index_mut(idx3).add(raw3, raw_to_pdep(raw3));
            }
        },
    );
    h
}
|  |  | ||||||
| pub fn run_parallel(data: &[u8], keys: &[Vec<u8>], num_threads: usize) -> Phf { |  | ||||||
|     if num_threads == 0 { |  | ||||||
|         return run(data, keys); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     let phf = std::sync::Mutex::new(Phf::new(keys.to_vec())); |  | ||||||
|  |  | ||||||
|     // Spawn one thread per core. |  | ||||||
|     std::thread::scope(|s| { |  | ||||||
|         let chunks = data.chunks(data.len() / num_threads + 1); |  | ||||||
|         for chunk in chunks { |  | ||||||
|             s.spawn(|| { |  | ||||||
|                 // Each thread has its own accumulator. |  | ||||||
|                 let thread_phf = run(chunk, keys); |  | ||||||
|  |  | ||||||
|                 // Merge results. |  | ||||||
|                 phf.lock().unwrap().merge(thread_phf); |  | ||||||
|             }); |  | ||||||
|         } |  | ||||||
|     }); |  | ||||||
|  |  | ||||||
|     phf.into_inner().unwrap() |  | ||||||
| } |  | ||||||
|  |  | ||||||
/// View a byte slice as UTF-8, panicking on invalid input.
pub fn to_str(name: &[u8]) -> &str {
    let parsed = std::str::from_utf8(name);
    parsed.unwrap()
}
|  |  | ||||||
| /// Returns a list of city names found in data. |  | ||||||
| /// Each city is returned twice, once as `<city>` and once as `<city>;`, |  | ||||||
| /// with the latter being used to accumulate negative temperatures. |  | ||||||
| #[inline(never)] |  | ||||||
| pub fn find_city_names(data: &[u8]) -> Vec<Vec<u8>> { |  | ||||||
|     let mut cities = FxHashSet::default(); |  | ||||||
|  |  | ||||||
|     let mut callback = |data: &[u8], state: State| { |  | ||||||
|         let State { start, sep, .. } = state; |  | ||||||
|         let name = unsafe { data.get_unchecked(start..sep) }; |  | ||||||
|         cities.insert(name.to_vec()); |  | ||||||
|  |  | ||||||
|         // Do the same for the name with ; appended. |  | ||||||
|         let name = unsafe { data.get_unchecked(start..sep + 1) }; |  | ||||||
|         cities.insert(name.to_vec()); |  | ||||||
|     }; |  | ||||||
|     iter_lines(data, |d, s0, s1, s2, s3| { |  | ||||||
|         flatten_callback(d, s0, s1, s2, s3, &mut callback) |  | ||||||
|     }); |  | ||||||
|  |  | ||||||
|     let mut cities: Vec<_> = cities.into_iter().collect(); |  | ||||||
|     cities.sort(); |  | ||||||
|     cities |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn flatten_callback<'a>( |  | ||||||
|     data: &'a [u8], |  | ||||||
|     s0: State, |  | ||||||
|     s1: State, |  | ||||||
|     s2: State, |  | ||||||
|     s3: State, |  | ||||||
|     callback: &mut impl FnMut(&'a [u8], State), |  | ||||||
| ) { |  | ||||||
|     callback(data, s0); |  | ||||||
|     callback(data, s1); |  | ||||||
|     callback(data, s2); |  | ||||||
|     callback(data, s3); |  | ||||||
| } |  | ||||||
|  |  | ||||||
// Command-line options. (Plain `//` comments on purpose: `///` doc comments
// on a clap derive would change the generated --help output.)
#[derive(clap::Parser)]
pub struct Args {
    // Positional input path; optional — presumably the caller falls back to
    // a default file when absent. TODO confirm against the binary's main.
    pub input: Option<String>,

    // Worker thread count (-j/--threads).
    #[clap(short = 'j', long)]
    pub threads: Option<usize>,

    // --print flag; presumably prints the aggregated results — confirm.
    #[clap(long)]
    pub print: bool,

    // --stats flag; presumably prints extra statistics — confirm.
    #[clap(long)]
    pub stats: bool,
}
|  |  | ||||||
#[cfg(test)]
mod test {
    // Round-trips a textual reading through parse_to_raw + raw_to_value.
    #[test]
    fn parse_raw() {
        use super::*;
        let d = b"12.3";
        let raw = parse_to_raw(d, 0, 4);
        let v = raw_to_value(raw);
        assert_eq!(v, 123);

        // Parsing the sub-range "2.3" drops the leading digit.
        // NOTE(review): this reads one byte past `d` (4-byte read at offset
        // 1 of a 4-byte array) — relies on allocation slack; confirm.
        let d = b"12.3";
        let raw = parse_to_raw(d, 1, 4);
        let v = raw_to_value(raw);
        assert_eq!(v, 23);
    }
}
| @@ -1,54 +0,0 @@ | |||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::{hash, parse}; |  | ||||||
| use std::collections::HashMap; |  | ||||||
| use std::fs::File; |  | ||||||
| use std::io::{BufRead, BufReader}; |  | ||||||
| use std::time::Instant; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     let now = Instant::now(); |  | ||||||
|     let mut stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|  |  | ||||||
|     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); |  | ||||||
|     let mut reader = BufReader::new(&file); |  | ||||||
|     let mut line = Vec::with_capacity(108); |  | ||||||
|     loop { |  | ||||||
|         let line_len = reader |  | ||||||
|             .read_until(b'\n', &mut line) |  | ||||||
|             .expect("could not read bytes"); |  | ||||||
|         if line_len == 0 { |  | ||||||
|             break; |  | ||||||
|         } |  | ||||||
|         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|         let hash = hash::bytes(station); |  | ||||||
|         let station = unsafe { std::str::from_utf8_unchecked(station) }; |  | ||||||
|         let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|         let measurements_option = stations.get_mut(&hash); |  | ||||||
|         if let Some((_, measurements)) = measurements_option { |  | ||||||
|             measurements.update(temp); |  | ||||||
|         } else { |  | ||||||
|             let measurements = StationMeasurements { |  | ||||||
|                 min: temp, |  | ||||||
|                 max: temp, |  | ||||||
|                 count: 1, |  | ||||||
|                 sum: temp, |  | ||||||
|             }; |  | ||||||
|             stations.insert(hash, (station.to_string(), measurements)); |  | ||||||
|         } |  | ||||||
|         line.clear(); |  | ||||||
|     } |  | ||||||
|     let mut stations: Vec<String> = stations |  | ||||||
|         .iter() |  | ||||||
|         .map(|(_, (station, measurements))| { |  | ||||||
|             let measurements = measurements.to_string(); |  | ||||||
|             format!("{station}={measurements}") |  | ||||||
|         }) |  | ||||||
|         .collect(); |  | ||||||
|     stations.sort(); |  | ||||||
|     let stations = stations.join(","); |  | ||||||
|     println!("{{{stations}}}"); |  | ||||||
|     println!("Time={} ms", now.elapsed().as_millis()); |  | ||||||
| } |  | ||||||
| @@ -1,60 +0,0 @@ | |||||||
| use smol::fs::File; |  | ||||||
| use smol::io::{AsyncBufReadExt, BufReader}; |  | ||||||
|  |  | ||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::{hash, parse}; |  | ||||||
| use std::collections::HashMap; |  | ||||||
| use std::time::Instant; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; |  | ||||||
|  |  | ||||||
| pub fn run() { |  | ||||||
|     let now = Instant::now(); |  | ||||||
|     let mut stations: HashMap<u64, (String, StationMeasurements)> = |  | ||||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|  |  | ||||||
|     smol::block_on(async { |  | ||||||
|         let mut file = File::open("../../../measurements.txt") |  | ||||||
|             .await |  | ||||||
|             .expect("Could not read file"); |  | ||||||
|         let mut reader = BufReader::new(&mut file); |  | ||||||
|         let mut line = Vec::with_capacity(108); |  | ||||||
|         loop { |  | ||||||
|             let line_len = reader |  | ||||||
|                 .read_until(b'\n', &mut line) |  | ||||||
|                 .await |  | ||||||
|                 .expect("could not read bytes"); |  | ||||||
|             if line_len == 0 { |  | ||||||
|                 break; |  | ||||||
|             } |  | ||||||
|             let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|             let hash = hash::bytes(station); |  | ||||||
|             let station = unsafe { std::str::from_utf8_unchecked(station) }; |  | ||||||
|             let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|             let measurements_option = stations.get_mut(&hash); |  | ||||||
|             if let Some((_, measurements)) = measurements_option { |  | ||||||
|                 measurements.update(temp); |  | ||||||
|             } else { |  | ||||||
|                 let measurements = StationMeasurements { |  | ||||||
|                     min: temp, |  | ||||||
|                     max: temp, |  | ||||||
|                     count: 1, |  | ||||||
|                     sum: temp, |  | ||||||
|                 }; |  | ||||||
|                 stations.insert(hash, (station.to_string(), measurements)); |  | ||||||
|             } |  | ||||||
|             line.clear(); |  | ||||||
|         } |  | ||||||
|         let mut stations: Vec<String> = stations |  | ||||||
|             .iter() |  | ||||||
|             .map(|(_, (station, measurements))| { |  | ||||||
|                 let measurements = measurements.to_string(); |  | ||||||
|                 format!("{station}={measurements}") |  | ||||||
|             }) |  | ||||||
|             .collect(); |  | ||||||
|         stations.sort(); |  | ||||||
|         let stations = stations.join(","); |  | ||||||
|         println!("{{{stations}}}"); |  | ||||||
|         println!("Time={} ms", now.elapsed().as_millis()); |  | ||||||
|     }) |  | ||||||
| } |  | ||||||
| @@ -1,9 +0,0 @@ | |||||||
//! Crate root: nightly feature gates and module layout for the 1BRC solutions.
#![feature(slice_as_chunks)]
#![feature(portable_simd)]
#![feature(slice_split_once)]
#![feature(hash_raw_entry)]
#![feature(int_roundings)]

// Alternative solution implementations.
pub mod implementations;
// Shared data types (mmap wrapper, per-station accumulators).
pub mod models;
// Hashing, parsing and formatting helpers.
pub mod utils;
| @@ -1,7 +0,0 @@ | |||||||
/// Entry point. Intentionally empty: the previous benchmarking scaffold
/// (timed BufReader line-read of measurements.txt) was already commented
/// out; the dead code has been removed.
fn main() {}
| @@ -1,2 +0,0 @@ | |||||||
| pub mod mmap; |  | ||||||
| pub mod station_measurements; |  | ||||||
| @@ -1,145 +0,0 @@ | |||||||
| use std::ops::Deref; |  | ||||||
| use std::os::fd::AsRawFd; |  | ||||||
| use std::ptr::null_mut; |  | ||||||
| use std::{fs::File, os::raw::c_void}; |  | ||||||
|  |  | ||||||
| use libc::{madvise, mmap, munmap, size_t, MADV_WILLNEED, MAP_FAILED, MAP_SHARED, PROT_READ}; |  | ||||||
|  |  | ||||||
/// Smart pointer type for a mmap. Handles munmap call.
pub struct Mmap<'a> {
    // The mapped region viewed as a byte slice; unmapped in `Drop`.
    mmap_slice: &'a [u8],
}
|  |  | ||||||
/// To properly dispose of the mmap we have to manually call munmap.
/// So implementing drop for this smart-pointer type is necessary.
impl Drop for Mmap<'_> {
    fn drop(&mut self) {
        unsafe {
            // munmap's return value is ignored: there is no useful
            // recovery from a failed unmap inside drop.
            munmap(
                self.mmap_slice.as_ptr() as *mut c_void,
                self.mmap_slice.len(),
            );
        }
    }
}
|  |  | ||||||
// anti-pattern for non-smart pointer types.
// ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html
// (Acceptable here: Mmap really is a smart pointer over the mapped bytes,
// so deref-to-slice is the intended usage.)
impl Deref for Mmap<'_> {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.mmap_slice
    }
}
|  |  | ||||||
impl<'a> Mmap<'a> {
    // Wrap an already-mapped region.
    fn new(data: &'a [u8]) -> Self {
        Self { mmap_slice: data }
    }

    /// Map the whole of `f` read-only and return the owning wrapper.
    ///
    /// # Panics
    /// Panics if metadata cannot be read or `mmap` fails.
    /// NOTE(review): a zero-length file would make this `mmap` call fail
    /// (length 0), so empty input panics — confirm that is acceptable.
    pub fn from_file(f: File) -> Self {
        let size = f.metadata().unwrap().len() as size_t;
        let prot = PROT_READ;
        let flags = MAP_SHARED;
        unsafe {
            let m = mmap(null_mut(), size, prot, flags, f.as_raw_fd(), 0);
            if m == MAP_FAILED {
                panic!("mmap failed");
            }
            // We can advise the kernel on how we intend to use the mmap.
            // But this did not improve my read performance in a meaningful way
            // (return value deliberately ignored).
            madvise(m, size, MADV_WILLNEED);
            Self::new(std::slice::from_raw_parts(m as *const u8, size))
        }
    }
}
|  |  | ||||||
/// Iterator that serves a mapped file in newline-aligned chunks, sized for
/// a number of consumers.
pub struct MmapChunkIterator<'a> {
    // Remaining unserved part of the mapping.
    data: Mmap<'a>,
    // Target chunk length in bytes (chunks are cut back to the last '\n').
    chunk_size: usize,
}
|  |  | ||||||
| impl<'a> MmapChunkIterator<'a> { |  | ||||||
|     fn with_consumers(mut self, consumers: usize) -> Self { |  | ||||||
|         self.chunk_size = self.data.len() / consumers; |  | ||||||
|         self |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn new(data: Mmap<'a>, num_consumers: usize) -> Self { |  | ||||||
|         Self { |  | ||||||
|             data, |  | ||||||
|             chunk_size: 1, |  | ||||||
|         } |  | ||||||
|         .with_consumers(num_consumers) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
impl<'a> IntoIterator for Mmap<'a> {
    type IntoIter = MmapChunkIterator<'a>;
    type Item = &'a [u8];

    // Default iteration uses chunk_size 1, so each `next()` yields a single
    // byte; use `MmapChunkIterator::new` to size chunks for real consumers.
    fn into_iter(self) -> Self::IntoIter {
        MmapChunkIterator {
            data: self,
            chunk_size: 1,
        }
    }
}
|  |  | ||||||
impl<'a> Iterator for MmapChunkIterator<'a> {
    type Item = &'a [u8];

    /// Yield the next chunk, cut back to the last newline it contains so a
    /// line never straddles two chunks. If the chunk holds no newline at
    /// all, it is yielded whole (splitting a long line mid-way).
    fn next(&mut self) -> Option<Self::Item> {
        if self.data.is_empty() {
            return None;
        }

        // Clamp to whatever data remains.
        let chunk_end = self.chunk_size.min(self.data.len());
        let chunk = &self.data[..chunk_end];

        // Find the last newline in the chunk
        let split_at = chunk
            .iter()
            .rposition(|&x| x == b'\n')
            .map(|i| i + 1)
            .unwrap_or(chunk_end);

        // Advance by consuming the served prefix from the mapping.
        let (result, rest) = self.data.mmap_slice.split_at(split_at);
        self.data.mmap_slice = rest;

        Some(result)
    }
}
|  |  | ||||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    // Unique path under the system temp dir, so parallel test runs don't
    // clobber each other and nothing is littered in the working directory
    // (the previous version wrote a fixed "test_file.txt" into cwd).
    fn unique_test_path() -> PathBuf {
        std::env::temp_dir().join(format!("mmap_test_{}.txt", std::process::id()))
    }

    fn create_test_file(path: &Path, content: &[u8]) {
        let mut file = File::create(path).unwrap();
        file.write_all(content).unwrap();
    }

    fn remove_test_file(path: &Path) {
        if path.exists() {
            fs::remove_file(path).unwrap();
        }
    }

    // Maps a small file and checks the mapped bytes match what was written.
    #[test]
    fn test_from_file() {
        let test_file_path = unique_test_path();
        let test_content = b"Hello, mmap!";
        create_test_file(&test_file_path, test_content);
        let file = File::open(&test_file_path).unwrap();

        let mmap = Mmap::from_file(file);

        assert_eq!(&*mmap, test_content);
        remove_test_file(&test_file_path);
    }
}
| @@ -1,37 +0,0 @@ | |||||||
| use std::fmt::Display; |  | ||||||
|  |  | ||||||
/// Running aggregate for one station. All temperatures are in tenths of a
/// degree (12.3 is stored as 123), as produced by `parse::temp`.
#[derive(Copy, Clone)]
pub struct StationMeasurements {
    pub min: isize,
    pub max: isize,
    pub count: isize,
    pub sum: isize,
}
|  |  | ||||||
| impl StationMeasurements { |  | ||||||
|     pub fn update(&mut self, v: isize) { |  | ||||||
|         self.min = self.min.min(v); |  | ||||||
|         self.max = self.max.max(v); |  | ||||||
|         self.count += 1; |  | ||||||
|         self.sum += v; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn merge(&mut self, other: &Self) { |  | ||||||
|         self.min = self.min.min(other.min); |  | ||||||
|         self.max = self.max.max(other.max); |  | ||||||
|         self.count += other.count; |  | ||||||
|         self.sum += other.sum; |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Display for StationMeasurements { |  | ||||||
|     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |  | ||||||
|         let min_pre = self.min / 10; |  | ||||||
|         let min_suf = (self.min % 10).abs(); |  | ||||||
|         let max_pre = self.max / 10; |  | ||||||
|         let max_suf = (self.max % 10).abs(); |  | ||||||
|         // casting here is faster |  | ||||||
|         let avg = (self.sum as f64 / self.count as f64) / 10.0; |  | ||||||
|         write!(f, "{min_pre}.{min_suf}/{avg:.1}/{max_pre}.{max_suf}") |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| @@ -1,15 +0,0 @@ | |||||||
| pub mod byte_pos; |  | ||||||
| pub mod hash; |  | ||||||
| pub mod parse; |  | ||||||
| pub mod write_structured_measurements; |  | ||||||
|  |  | ||||||
/// Render `num` with `_` separators every three digits
/// (e.g. 1234567 -> "1_234_567").
pub fn format_nums(num: usize) -> String {
    let digits = num.to_string();
    let mut groups: Vec<&str> = Vec::new();
    for chunk in digits.as_bytes().rchunks(3).rev() {
        // Chunks of ASCII digits are always valid UTF-8.
        groups.push(std::str::from_utf8(chunk).unwrap());
    }
    groups.join("_")
}
| @@ -1,42 +0,0 @@ | |||||||
| #[inline] |  | ||||||
| pub fn get(bytes: &[u8], find: u8) -> Option<u32> { |  | ||||||
|     let chunks = bytes.windows(4); |  | ||||||
|     for (pos, chunk) in chunks.enumerate() { |  | ||||||
|         let inner_pos = get_pos_in_chunk(chunk, find); |  | ||||||
|         if inner_pos < chunk.len() as u32 { |  | ||||||
|             return Some(pos as u32 + inner_pos); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|     None |  | ||||||
| } |  | ||||||
|  |  | ||||||
/// Index (0..=3) of the first byte equal to `find` in a 4-byte chunk, or 4
/// if absent. SWAR scan: XOR with `find` turns matches into zero bytes,
/// then a zero-byte mask is built and the most significant flagged lane
/// (lowest index, big-endian) is located with `leading_zeros`.
///
/// Fix: the previous formula `(x - 0x01010101) & !x & 0x80808080` lets a
/// borrow out of a genuine zero byte flag a *preceding* byte whose value is
/// 0x01, reporting a too-early position (e.g. the first ';' in ":;AB" came
/// back as index 0 instead of 1). The per-lane addition below cannot carry
/// across lanes, so every flagged high bit is an actual zero byte.
#[inline]
fn get_pos_in_chunk(byte_chunk: &[u8], find: u8) -> u32 {
    let find_hex = u32::from_be_bytes([find; 4]);
    let x = u32::from_be_bytes(byte_chunk.try_into().unwrap()) ^ find_hex;
    let mask = !(x & 0x7F7F7F7F).wrapping_add(0x7F7F7F7F) & !x & 0x80808080;
    u32::leading_zeros(mask) >> 3
}
|  |  | ||||||
#[cfg(test)]
mod tests {
    use crate::utils::byte_pos::get;

    // A ';' in the middle of a longer buffer is located at its byte index.
    #[test]
    fn test_getpos() {
        let semi_bytes = vec![
            0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, b';', 0_u8, 0_u8,
        ];
        let semi_bytes = semi_bytes.as_slice();
        let pos = get(semi_bytes, b';').unwrap();
        assert_eq!(pos, 8);
    }

    // A buffer that never contains the needle yields None.
    #[test]
    fn test_getpos_empty() {
        let semi_bytes = vec![0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8];
        let semi_bytes = semi_bytes.as_slice();
        let pos = get(semi_bytes, b';');
        assert_eq!(pos, None);
    }
}
| @@ -1,40 +0,0 @@ | |||||||
/// Hash a station name from its first and last 8 bytes.
///
/// The release path does two unchecked 8-byte loads (callers must provide
/// buffer slack around names shorter than 8 bytes); the debug path copies
/// into zero-padded buffers so it is safe on arbitrary slices.
///
/// Changes from the previous version:
/// - the release branch used nightly `as_chunks`; raw-pointer reads are
///   stable and load the same bytes,
/// - the release branch combined with unchecked `+`, which can trip
///   overflow checks and diverged from the debug branch's `wrapping_add`;
///   both branches now wrap.
#[inline]
pub fn bytes(bytes: &[u8]) -> u64 {
    if cfg!(not(debug_assertions)) {
        // inspired by https://curiouscoding.nl/posts/1brc/
        // SAFETY: relies on readable slack before/after short names.
        let head: [u8; 8] = unsafe { bytes.as_ptr().cast::<[u8; 8]>().read() };
        let tail: [u8; 8] = unsafe {
            bytes
                .as_ptr()
                .add(bytes.len().wrapping_sub(8))
                .cast::<[u8; 8]>()
                .read()
        };
        let shift = 64usize.saturating_sub(8 * bytes.len());
        let khead = u64::from_ne_bytes(head) << shift;
        let ktail = u64::from_ne_bytes(tail) >> shift;
        khead.wrapping_add(ktail)
    } else {
        // debug friendly but slow
        let mut head = [0u8; 8];
        let mut tail = [0u8; 8];
        let end = bytes.len().min(8);
        let start = bytes.len().saturating_sub(8);
        head[..end].copy_from_slice(&bytes[..end]);
        tail[..end].copy_from_slice(&bytes[start..]);
        let shift = 64usize.saturating_sub(8 * bytes.len());
        let khead = u64::from_ne_bytes(head) << shift;
        let ktail = u64::from_ne_bytes(tail) >> shift;
        khead.wrapping_add(ktail)
    }
}
|  |  | ||||||
#[cfg(test)]
mod tests {
    use crate::utils::hash;

    // Sanity check: reversed strings and mirrored short strings must not
    // collide under the head/tail hash.
    #[test]
    fn test_hashstr() {
        let hash_1 = hash::bytes(b"abcdefghijk");
        let hash_2 = hash::bytes(b"kjihgfedcba");
        let hash_3 = hash::bytes(b"abba");
        let hash_4 = hash::bytes(b"baab");

        assert_ne!(hash_1, hash_2);
        assert_ne!(hash_3, hash_4);
    }
}
| @@ -1,105 +0,0 @@ | |||||||
| use std::simd::num::SimdUint; |  | ||||||
| use std::simd::{u8x4, Simd}; |  | ||||||
|  |  | ||||||
/// Map an ASCII digit byte to its numeric value (b'0' -> 0 ... b'9' -> 9).
/// No validation: non-digit bytes produce out-of-range values.
#[inline]
pub const fn get_digit(b: u8) -> isize {
    // Subtract the '0' code point; a u8 widened to isize cannot overflow.
    (b as isize) - ('0' as isize)
}
|  |  | ||||||
/// Parse an ASCII temperature such as "-12.3" into tenths (-123).
///
/// # Panics
/// Panics on any shape other than `d.d`, `dd.d`, `-d.d`, `-dd.d`.
#[inline]
pub fn temp(bytes: &[u8]) -> isize {
    let is_negative = bytes[0] == b'-';
    let digits = if is_negative { &bytes[1..] } else { bytes };
    // Digit-at-index helper (inlined equivalent of `get_digit`).
    let digit = |i: usize| (digits[i] as isize).wrapping_sub('0' as isize);
    let magnitude = match digits.len() {
        3 => digit(0) * 10 + digit(2),
        4 => digit(0) * 100 + digit(1) * 10 + digit(3),
        _ => panic!(
            "could not parse temp: is_negative = {is_negative}, length = {}",
            bytes.len()
        ),
    };
    if is_negative {
        -magnitude
    } else {
        magnitude
    }
}
|  |  | ||||||
// not really faster
/// Alternative temperature parser: walk the integer part right-to-left,
/// accumulating place values, stopping at a '-' (negate) or a NUL pad.
#[inline]
pub fn temp_new(bytes: &[u8]) -> isize {
    // SAFETY: callers pass readings of at least 3 bytes ("d.d").
    let (int_part, frac) = unsafe { bytes.split_at_unchecked(bytes.len() - 2) };
    // frac is ".d"; frac[1] is the tenths digit.
    let mut total = (frac[1] as u32).wrapping_sub('0' as u32);
    let mut place = 10;
    for &b in int_part.iter().rev() {
        if b == b'-' {
            return -(total as isize);
        }
        if b == 0u8 {
            return total as isize;
        }
        total += (b as u32).wrapping_sub('0' as u32) * place;
        place *= 10;
    }
    total as isize
}
|  |  | ||||||
// slower than the scalar parsers in benchmarks
/// Parses a temperature (`d.d` / `dd.d`, optional leading `-`) into tenths
/// of a degree using a portable-SIMD gather over four byte lanes.
#[inline]
pub fn temp_simd(bytes: &[u8]) -> isize {
    let is_negative = bytes[0] == b'-';
    // Work on the digit bytes only; the sign is re-applied at the end.
    let bytes = if is_negative { &bytes[1..] } else { bytes };
    let len = bytes.len();
    // Lane -> source index:
    //   lane 0: deliberate out-of-bounds dummy (index 5; max digit slice is 4 bytes),
    //   lane 1: hundreds digit (len - 4; wraps out of bounds when len == 3, e.g. "9.9"),
    //   lane 2: tens digit (len - 3),
    //   lane 3: tenths digit after the '.' (len - 1).
    let idxs = Simd::from_array([5, len.wrapping_sub(4), len - 3, len - 1]);
    let zeroes = u8x4::splat(b'0');
    // gather_or substitutes b'0' for every out-of-bounds lane, so missing
    // digits subtract to 0 in the next step instead of faulting.
    let temp_simd = Simd::gather_or(bytes, idxs, zeroes);
    let subbed = temp_simd - zeroes;
    let subbed: Simd<isize, 4> = subbed.cast();
    // Positional weights; lane 0's weight is 0 so the dummy never contributes.
    let mul = Simd::from_array([0, 100, 10, 1]);
    let mulled = subbed * mul;
    let sum: isize = mulled.to_array().iter().sum();
    if is_negative {
        -sum
    } else {
        sum
    }
}
|  |  | ||||||
#[cfg(test)]
mod tests {
    // Cover all three scalar entry points; previously only `temp_new` had tests.
    use crate::utils::parse::{get_digit, temp, temp_new};

    #[test]
    fn test_get_digit() {
        assert_eq!(get_digit(b'0'), 0);
        assert_eq!(get_digit(b'9'), 9);
    }

    #[test]
    fn test_temp_max() {
        assert_eq!(temp("99.9".as_bytes()), 999);
    }

    #[test]
    fn test_temp_min() {
        assert_eq!(temp("-99.9".as_bytes()), -999);
    }

    #[test]
    fn test_temp_short_forms() {
        assert_eq!(temp("0.0".as_bytes()), 0);
        assert_eq!(temp("9.9".as_bytes()), 99);
        assert_eq!(temp("-9.9".as_bytes()), -99);
    }

    #[test]
    fn test_temp_new_max() {
        let temp_max = temp_new("99.9".as_bytes());
        assert_eq!(temp_max, 999);
    }

    #[test]
    fn test_temp_new_min() {
        let temp_min = temp_new("-99.9".as_bytes());
        assert_eq!(temp_min, -999);
    }

    #[test]
    fn test_temp_new_zero() {
        let temp_0 = temp_new("0.0".as_bytes());
        assert_eq!(temp_0, 0);
    }

    #[test]
    fn test_temp_new_pos() {
        let temp_10 = temp_new("9.9".as_bytes());
        assert_eq!(temp_10, 99);
    }

    #[test]
    fn test_temp_new_neg() {
        let temp_neg_10 = temp_new("-9.9".as_bytes());
        assert_eq!(temp_neg_10, -99);
    }
}
| @@ -1,36 +0,0 @@ | |||||||
| use std::fs::File; |  | ||||||
| use std::io::{BufRead, BufReader, BufWriter, Write}; |  | ||||||
|  |  | ||||||
| pub fn write_structured_measurements() { |  | ||||||
|     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); |  | ||||||
|     let structured_file = |  | ||||||
|         File::create_new("structured_measurements.txt").expect("Could not create file"); |  | ||||||
|     let mut reader = BufReader::new(&file); |  | ||||||
|     let mut writer = BufWriter::new(&structured_file); |  | ||||||
|     let mut line = Vec::with_capacity(107); |  | ||||||
|     let mut line_num = 0; |  | ||||||
|     loop { |  | ||||||
|         line_num += 1; |  | ||||||
|         let line_len = reader |  | ||||||
|             .read_until(b'\n', &mut line) |  | ||||||
|             .expect("could not read bytes"); |  | ||||||
|         if line_len == 0 { |  | ||||||
|             break; |  | ||||||
|         } |  | ||||||
|         if line_num % 100000 == 0 { |  | ||||||
|             print!("\x1b[0Glines: {line_num}"); |  | ||||||
|         } |  | ||||||
|         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|         let station_len = station.len(); |  | ||||||
|         let temp_val_start = 107 - temp.len(); |  | ||||||
|         let mut write_line = [0u8; 107]; |  | ||||||
|         write_line[..station_len].clone_from_slice(station); |  | ||||||
|         write_line[100] = b';'; |  | ||||||
|         write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp); |  | ||||||
|         write_line[106] = b'\n'; |  | ||||||
|         writer |  | ||||||
|             .write_all(write_line.as_slice()) |  | ||||||
|             .expect("Could not write"); |  | ||||||
|         line.clear(); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
		Reference in New Issue
	
	Block a user