Compare commits
	
		
			27 Commits
		
	
	
		
			b1d7ebaaea
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| fdd92dd5f7 | |||
| 45b3014cbb | |||
| 98cd6e930c | |||
| 40a8d6d929 | |||
| 0ea10a3c1b | |||
| 3dbc9c32d1 | |||
| 4d586c809e | |||
| 5bb2363eee | |||
| eb2ed15e33 | |||
| dfcc8562e6 | |||
| 212e595a7e | |||
| 7b8943976f | |||
| aaa11c7b94 | |||
| 07a8e7fc69 | |||
| 0aa9d8be86 | |||
| b1c064a92f | |||
| ea06a600ce | |||
| ac5c45f8d5 | |||
| b8f589096f | |||
| c306083192 | |||
| a45ddd2dc0 | |||
| e832475fc3 | |||
| 608cbb59e5 | |||
| 53ea542f36 | |||
| d246c54cd9 | |||
| 2a89d061a0 | |||
| 7add8793a5 | 
							
								
								
									
										91
									
								
								src/main/julia/main.jl
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										91
									
								
								src/main/julia/main.jl
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,91 @@ | |||||||
|  | using Mmap | ||||||
|  |  | ||||||
|  | mutable struct StationMeasurements | ||||||
|  |     min::Float64 | ||||||
|  |     max::Float64 | ||||||
|  |     sum::Float64 | ||||||
|  |     count::Int64 | ||||||
|  | end | ||||||
|  |  | ||||||
|  | function update(sm, temp::Float64) | ||||||
|  |     if temp < min | ||||||
|  |         sm.min = temp | ||||||
|  |     elseif temp > max | ||||||
|  |         sm.max = temp | ||||||
|  |     end | ||||||
|  |     sm.sum += temp | ||||||
|  |     sm.count += 1 | ||||||
|  | end | ||||||
|  |  | ||||||
|  | function print_measurements(stations::Dict{String,StationMeasurements}) | ||||||
|  |     sorted_keys = sort(collect(keys(stations))) | ||||||
|  |     print("{") | ||||||
|  |     sm_vec = [] | ||||||
|  |     for city in sorted_keys | ||||||
|  |         sm = stations[city] | ||||||
|  |         min = round(sm.min; digits=1) | ||||||
|  |         max = round(sm.max; digits=1) | ||||||
|  |         avg = round((sm.sum / sm.count); digits=1) | ||||||
|  |         push!(sm_vec, "$city=$min/$avg/$max") | ||||||
|  |     end | ||||||
|  |     joined = join(sm_vec, ", ") | ||||||
|  |     print(joined) | ||||||
|  |     print("}") | ||||||
|  | end | ||||||
|  |  | ||||||
|  | function merge(stations_vec::Vector{Dict{String,StationMeasurements}}) | ||||||
|  |     merged = Dict{String,StationMeasurements}() | ||||||
|  |     for stations in stations_vec | ||||||
|  |         for (city, sm) in stations | ||||||
|  |             if haskey(merged, city) | ||||||
|  |                 merged_sm = merged[city] | ||||||
|  |                 sm.min = ifelse(merged_sm.min < sm.min, merged_sm.min, sm.min) | ||||||
|  |                 sm.max = ifelse(merged_sm.max > sm.max, merged_sm.max, sm.max) | ||||||
|  |                 sm.sum += merged_sm.sum | ||||||
|  |                 sm.count += merged_sm.count | ||||||
|  |             else | ||||||
|  |                 merged[city] = sm | ||||||
|  |             end | ||||||
|  |         end | ||||||
|  |     end | ||||||
|  |     merged | ||||||
|  | end | ||||||
|  |  | ||||||
|  | function process_chunk(data, chunk) | ||||||
|  |     stations = Dict{String,StationMeasurements}() | ||||||
|  |     for i in eachindex(chunk) | ||||||
|  |         if i == 1 | ||||||
|  |             continue | ||||||
|  |         end | ||||||
|  |         line = String(data[chunk[i-1]:chunk[i]-1]) | ||||||
|  |         station, temp_str = rsplit(line, ";") | ||||||
|  |         temp = parse(Float32, temp_str) | ||||||
|  |         if haskey(stations, station) | ||||||
|  |             sm = stations[station] | ||||||
|  |             sm.min = ifelse(temp < sm.min, temp, sm.min) | ||||||
|  |             sm.max = ifelse(temp > sm.max, temp, sm.max) | ||||||
|  |             sm.sum += temp | ||||||
|  |             sm.count += 1 | ||||||
|  |         else | ||||||
|  |             stations[station] = StationMeasurements(temp, temp, temp, 1) | ||||||
|  |         end | ||||||
|  |     end | ||||||
|  |     stations | ||||||
|  | end | ||||||
|  |  | ||||||
|  | function main() | ||||||
|  |     open("../../../measurements.txt", "r") do f | ||||||
|  |         sz = Base.stat(f).size | ||||||
|  |         data = mmap(f, Vector{UInt8}, sz) | ||||||
|  |         idxs = findall(isequal(0x0a), data) | ||||||
|  |         idxs_chunks = collect(Iterators.partition(idxs, length(idxs) ÷ Threads.nthreads())) | ||||||
|  |         tasks = map(idxs_chunks) do chunk | ||||||
|  |             Threads.@spawn process_chunk(data, chunk) | ||||||
|  |         end | ||||||
|  |         stations_vec = fetch.(tasks) | ||||||
|  |         stations = merge(stations_vec) | ||||||
|  |         print_measurements(stations) | ||||||
|  |     end | ||||||
|  | end | ||||||
|  |  | ||||||
|  | main() | ||||||
							
								
								
									
										57
									
								
								src/main/lua/main.lua
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										57
									
								
								src/main/lua/main.lua
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,57 @@ | |||||||
|  | local file_path = "../../../measurements.txt" | ||||||
|  |  | ||||||
|  | local file = io.open(file_path, "rb") | ||||||
|  | if not file then | ||||||
|  | 	print("Unable to open file") | ||||||
|  | 	return | ||||||
|  | end | ||||||
|  |  | ||||||
|  | local function split_semi(inputstr) | ||||||
|  | 	local t = {} | ||||||
|  | 	for str in string.gmatch(inputstr, "([^;]+)") do | ||||||
|  | 		table.insert(t, str) | ||||||
|  | 	end | ||||||
|  | 	return t | ||||||
|  | end | ||||||
|  |  | ||||||
|  | local stations = {} | ||||||
|  |  | ||||||
|  | local iterations = 1 | ||||||
|  |  | ||||||
|  | for line in file:lines("*l") do | ||||||
|  | 	if iterations % 1000000 == 0 then | ||||||
|  | 		io.write("\x1b[J\x1b[H") | ||||||
|  | 		io.write(iterations / 10000000) | ||||||
|  | 		io.write("\n") | ||||||
|  | 	end | ||||||
|  | 	local split_line = split_semi(line) | ||||||
|  | 	local station = split_line[1] | ||||||
|  | 	local temp_str = string.gsub(split_line[2], "[ ]", "") | ||||||
|  | 	local temp = tonumber(temp_str) | ||||||
|  | 	if stations[station] == nil then | ||||||
|  | 		stations[station] = { min = temp, max = temp, sum = temp, count = 1 } | ||||||
|  | 	else | ||||||
|  | 		if temp < stations[station].min then | ||||||
|  | 			stations[station].min = temp | ||||||
|  | 		elseif temp > stations[station].max then | ||||||
|  | 			stations[station].max = temp | ||||||
|  | 		end | ||||||
|  | 		stations[station].sum = stations[station].sum + temp | ||||||
|  | 		stations[station].count = stations[station].count + 1 | ||||||
|  | 	end | ||||||
|  | 	iterations = iterations + 1 | ||||||
|  | end | ||||||
|  |  | ||||||
|  | local keys = {} | ||||||
|  | for k in pairs(stations) do table.insert(keys, k) end | ||||||
|  | table.sort(keys) | ||||||
|  |  | ||||||
|  | local fstations = {} | ||||||
|  | io.write("{") | ||||||
|  | for _, station in ipairs(keys) do | ||||||
|  | 	local avg = ((stations[station].sum / 10) / stations[station].count) | ||||||
|  | 	local res_str = string.format("%s=%.1f/%.1f/%.1f", station, stations[station].min, avg, stations[station].max) | ||||||
|  | 	table.insert(fstations, res_str) | ||||||
|  | end | ||||||
|  | io.write(table.concat(fstations, ",")) | ||||||
|  | print("}") | ||||||
							
								
								
									
										1310
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1310
									
								
								src/main/rust/Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| @@ -6,22 +6,26 @@ edition = "2021" | |||||||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||||||
|  |  | ||||||
| [dependencies] | [dependencies] | ||||||
| bstr = "1.9.1" | bstr = "1.10.0" | ||||||
| fast-float = "0.2.0" | fast-float = "0.2.0" | ||||||
| memchr = "2.7.4" | memchr = "2.7.4" | ||||||
| memmap = "0.7.0" | memmap2 = "0.9.4" | ||||||
| polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]} |  | ||||||
| rayon = "1.10.0" | rayon = "1.10.0" | ||||||
| rustc-hash = "2.0.0" | rustc-hash = "2.0.0" | ||||||
| libc = "0.2.155" | libc = "0.2.158" | ||||||
| smol = "2.0.0" | smol = "2.0.1" | ||||||
|  | easy-parallel = "3.3.1" | ||||||
|  | clap = { version = "4.5.13", features = ["derive"] } | ||||||
|  | colored = "2.1.0" | ||||||
|  | ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false } | ||||||
|  |  | ||||||
| [dev-dependencies] | [dev-dependencies] | ||||||
| criterion = { version = "0.5", features = ["html_reports"] } | criterion = { version = "0.5.1", features = ["html_reports"] } | ||||||
|  |  | ||||||
| [features] | [features] | ||||||
| json = [] | json = [] | ||||||
| unsafe = [] | unsafe = [] | ||||||
|  | no_pdep = [] | ||||||
|  |  | ||||||
| [[bench]] | [[bench]] | ||||||
| name = "reference_impl" | name = "reference_impl" | ||||||
| @@ -36,7 +40,7 @@ name = "multi_threaded" | |||||||
| harness = false | harness = false | ||||||
|  |  | ||||||
| [[bench]] | [[bench]] | ||||||
| name = "polars" | name = "multi_threaded_smol" | ||||||
| harness = false | harness = false | ||||||
|  |  | ||||||
| [[bench]] | [[bench]] | ||||||
| @@ -51,3 +55,8 @@ harness = false | |||||||
| lto = "fat" | lto = "fat" | ||||||
| strip = "symbols" | strip = "symbols" | ||||||
| panic = "abort" | panic = "abort" | ||||||
|  |  | ||||||
|  | [profile.flamegraph] | ||||||
|  | inherits = "release" | ||||||
|  | debug = true | ||||||
|  | strip = "none" | ||||||
|   | |||||||
| @@ -1,8 +1,8 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
| use onebrc::implementations::flare_flo::run; | use onebrc::implementations::flare_flo::run; | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|     c.bench_function("flareflo", |b| {b.iter(|| run())}); |     c.bench_function("flareflo", |b| b.iter(run)); | ||||||
| } | } | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); | criterion_group!(benches, criterion_benchmark); | ||||||
|   | |||||||
							
								
								
									
										9
									
								
								src/main/rust/benches/libraries.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								src/main/rust/benches/libraries.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,9 @@ | |||||||
|  | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
|  | use onebrc::implementations::libraries::run; | ||||||
|  |  | ||||||
|  | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|  |     c.bench_function("libraries", |b| b.iter(run)); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | criterion_group!(benches, criterion_benchmark); | ||||||
|  | criterion_main!(benches); | ||||||
| @@ -1,8 +1,8 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
| use onebrc::implementations::multi_threaded::run; | use onebrc::implementations::multi_threaded::run; | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|     c.bench_function("multithreaded", |b| {b.iter(|| run())}); |     c.bench_function("multithreaded", |b| b.iter(run)); | ||||||
| } | } | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); | criterion_group!(benches, criterion_benchmark); | ||||||
|   | |||||||
							
								
								
									
										9
									
								
								src/main/rust/benches/multi_threaded_smol.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										9
									
								
								src/main/rust/benches/multi_threaded_smol.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,9 @@ | |||||||
|  | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
|  | use onebrc::implementations::multi_threaded_smol::run; | ||||||
|  |  | ||||||
|  | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|  |     c.bench_function("multithreadedsmol", |b| b.iter(run)); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | criterion_group!(benches, criterion_benchmark); | ||||||
|  | criterion_main!(benches); | ||||||
| @@ -1,8 +1,8 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
| use onebrc::implementations::phcs::run; | use onebrc::implementations::phcs::run; | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|     c.bench_function("phcs", |b| {b.iter(|| run())}); |     c.bench_function("phcs", |b| b.iter(run)); | ||||||
| } | } | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); | criterion_group!(benches, criterion_benchmark); | ||||||
|   | |||||||
| @@ -1,8 +0,0 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; |  | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { |  | ||||||
|     c.bench_function("polars", |b| {b.iter(|| /*run_polars()*/ ())}); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); |  | ||||||
| criterion_main!(benches); |  | ||||||
| @@ -1,8 +1,8 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
| use onebrc::implementations::reference_impl::run; | use onebrc::implementations::reference_impl::run; | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|     c.bench_function("reference", |b| {b.iter(|| run())}); |     c.bench_function("reference", |b| b.iter(run)); | ||||||
| } | } | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); | criterion_group!(benches, criterion_benchmark); | ||||||
|   | |||||||
| @@ -1,8 +1,8 @@ | |||||||
| use criterion::{Criterion, criterion_group, criterion_main}; | use criterion::{criterion_group, criterion_main, Criterion}; | ||||||
| use onebrc::implementations::single_thread::run; | use onebrc::implementations::single_thread::run; | ||||||
|  |  | ||||||
| pub fn criterion_benchmark(c: &mut Criterion) { | pub fn criterion_benchmark(c: &mut Criterion) { | ||||||
|     c.bench_function("singlethread", |b| {b.iter(|| run())}); |     c.bench_function("singlethread", |b| b.iter(run)); | ||||||
| } | } | ||||||
|  |  | ||||||
| criterion_group!(benches, criterion_benchmark); | criterion_group!(benches, criterion_benchmark); | ||||||
|   | |||||||
| @@ -1,4 +0,0 @@ | |||||||
|  |  | ||||||
| fn main() { |  | ||||||
|     // let _ = run_polars(); |  | ||||||
| } |  | ||||||
							
								
								
									
										91
									
								
								src/main/rust/src/bin/rgk.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										91
									
								
								src/main/rust/src/bin/rgk.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,91 @@ | |||||||
|  | use clap::Parser; | ||||||
|  | use colored::Colorize; | ||||||
|  | use memmap2::Mmap; | ||||||
|  | use onebrc::implementations::rgk::{ | ||||||
|  |     find_city_names, format, run_parallel, to_str, Args, Record, S, | ||||||
|  | }; | ||||||
|  | use std::thread::available_parallelism; | ||||||
|  |  | ||||||
|  | fn main() { | ||||||
|  |     let args = Args::parse(); | ||||||
|  |  | ||||||
|  |     let start = std::time::Instant::now(); | ||||||
|  |     let filename = args | ||||||
|  |         .input | ||||||
|  |         .unwrap_or("../../../measurements.txt".to_string()); | ||||||
|  |     let mmap: Mmap; | ||||||
|  |     let data; | ||||||
|  |     { | ||||||
|  |         let file = std::fs::File::open(filename).unwrap(); | ||||||
|  |         mmap = unsafe { Mmap::map(&file).unwrap() }; | ||||||
|  |         data = &*mmap; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // Guaranteed to be aligned for SIMD. | ||||||
|  |     let offset = unsafe { data.align_to::<S>().0.len() }; | ||||||
|  |     let data = &data[offset..]; | ||||||
|  |  | ||||||
|  |     // Build a perfect hash function on the cities found in the first 100k characters. | ||||||
|  |     let names = find_city_names(&data[..4000000]); | ||||||
|  |  | ||||||
|  |     if args.stats { | ||||||
|  |         eprintln!("Num cities: {}", names.len()); | ||||||
|  |         let mut lens = vec![0; 102]; | ||||||
|  |         for n in &names { | ||||||
|  |             if *n.last().unwrap() == b';' { | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             lens[n.len()] += 1; | ||||||
|  |         } | ||||||
|  |         for (len, count) in lens.iter().enumerate() { | ||||||
|  |             if *count != 0 { | ||||||
|  |                 eprintln!("{}: {}", len, count); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let phf = run_parallel( | ||||||
|  |         data, | ||||||
|  |         &names, | ||||||
|  |         args.threads | ||||||
|  |             .unwrap_or(available_parallelism().unwrap().into()), | ||||||
|  |     ); | ||||||
|  |  | ||||||
|  |     if args.print { | ||||||
|  |         print!("{{"); | ||||||
|  |         let mut first = true; | ||||||
|  |  | ||||||
|  |         let mut keys = phf.keys.clone(); | ||||||
|  |         keys.sort_by(|kl, kr| to_str(kl).cmp(to_str(kr))); | ||||||
|  |  | ||||||
|  |         for name in &keys { | ||||||
|  |             if *name.last().unwrap() != b';' { | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             let namepos = &name[..name.len() - 1]; | ||||||
|  |  | ||||||
|  |             let rpos = phf.index(namepos); | ||||||
|  |             let rneg = phf.index(name); | ||||||
|  |             let (min, avg, max) = Record::merge_pos_neg(rpos, rneg); | ||||||
|  |  | ||||||
|  |             if !first { | ||||||
|  |                 print!(", "); | ||||||
|  |             } | ||||||
|  |             first = false; | ||||||
|  |  | ||||||
|  |             print!( | ||||||
|  |                 "{}={}/{}/{}", | ||||||
|  |                 to_str(namepos), | ||||||
|  |                 format(min), | ||||||
|  |                 format(avg), | ||||||
|  |                 format(max) | ||||||
|  |             ); | ||||||
|  |         } | ||||||
|  |         println!("}}"); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     eprintln!( | ||||||
|  |         "total: {}", | ||||||
|  |         format!("{:>5.2?}", start.elapsed()).bold().green() | ||||||
|  |     ); | ||||||
|  | } | ||||||
| @@ -4,7 +4,7 @@ pub mod multi_threaded; | |||||||
| pub mod multi_threaded_smol; | pub mod multi_threaded_smol; | ||||||
| pub mod multi_threaded_structured; | pub mod multi_threaded_structured; | ||||||
| pub mod phcs; | pub mod phcs; | ||||||
| pub mod polars; |  | ||||||
| pub mod reference_impl; | pub mod reference_impl; | ||||||
|  | pub mod rgk; | ||||||
| pub mod single_thread; | pub mod single_thread; | ||||||
| pub mod smol; | pub mod smol; | ||||||
|   | |||||||
| @@ -105,7 +105,7 @@ impl Citymap { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|     pub fn into_key_values(self) -> Vec<(String, City)> { |     pub fn into_key_values(self) -> Vec<(String, City)> { | ||||||
|         self.map.into_iter().map(|(_, s)| s).collect() |         self.map.into_values().collect() | ||||||
|     } |     } | ||||||
|     pub fn merge_with(&mut self, rhs: Self) { |     pub fn merge_with(&mut self, rhs: Self) { | ||||||
|         for (k, v) in rhs.map.into_iter() { |         for (k, v) in rhs.map.into_iter() { | ||||||
| @@ -125,7 +125,7 @@ pub fn run() { | |||||||
|     let start = Instant::now(); |     let start = Instant::now(); | ||||||
|     let input = "../../../measurements.txt"; |     let input = "../../../measurements.txt"; | ||||||
|  |  | ||||||
|     let results = if args.find(|e| e == "st").is_some() { |     let results = if args.any(|e| e == "st") { | ||||||
|         citymap_single_thread(input) |         citymap_single_thread(input) | ||||||
|     } else { |     } else { | ||||||
|         citymap_multi_threaded(input) |         citymap_multi_threaded(input) | ||||||
| @@ -159,7 +159,6 @@ fn citymap_multi_threaded(path: &str) -> Citymap { | |||||||
|         threads.push(citymap_thread(path.to_owned(), range, i, sender.clone())); |         threads.push(citymap_thread(path.to_owned(), range, i, sender.clone())); | ||||||
|     } |     } | ||||||
|     let mut ranges = (0..cpus) |     let mut ranges = (0..cpus) | ||||||
|         .into_iter() |  | ||||||
|         .map(|_| receiver.recv().unwrap()) |         .map(|_| receiver.recv().unwrap()) | ||||||
|         .collect::<Vec<_>>(); |         .collect::<Vec<_>>(); | ||||||
|     ranges.sort_unstable_by_key(|e| e.start); |     ranges.sort_unstable_by_key(|e| e.start); | ||||||
| @@ -171,7 +170,7 @@ fn citymap_multi_threaded(path: &str) -> Citymap { | |||||||
|         }), |         }), | ||||||
|         "Ranges overlap or have gaps: {ranges:?}" |         "Ranges overlap or have gaps: {ranges:?}" | ||||||
|     ); |     ); | ||||||
|     let results = threads |     threads | ||||||
|         .into_iter() |         .into_iter() | ||||||
|         .map(|e| e.join().unwrap()) |         .map(|e| e.join().unwrap()) | ||||||
|         //.map(|e|dbg!(e)) |         //.map(|e|dbg!(e)) | ||||||
| @@ -179,8 +178,7 @@ fn citymap_multi_threaded(path: &str) -> Citymap { | |||||||
|             left.merge_with(right); |             left.merge_with(right); | ||||||
|             left |             left | ||||||
|         }) |         }) | ||||||
|         .unwrap(); |         .unwrap() | ||||||
|     results |  | ||||||
| } | } | ||||||
|  |  | ||||||
| fn citymap_thread( | fn citymap_thread( | ||||||
| @@ -204,7 +202,7 @@ fn citymap_thread( | |||||||
|                     head.truncate(len); |                     head.truncate(len); | ||||||
|  |  | ||||||
|                     for (i, &pos) in head.iter().enumerate() { |                     for (i, &pos) in head.iter().enumerate() { | ||||||
|                         if pos == '\n' as u8 { |                         if pos == b'\n' { | ||||||
|                             range.start += i as u64; |                             range.start += i as u64; | ||||||
|                             break; |                             break; | ||||||
|                         } |                         } | ||||||
| @@ -218,7 +216,7 @@ fn citymap_thread( | |||||||
|                     head.truncate(len); |                     head.truncate(len); | ||||||
|  |  | ||||||
|                     for (i, &pos) in head.iter().enumerate() { |                     for (i, &pos) in head.iter().enumerate() { | ||||||
|                         if pos == '\n' as u8 { |                         if pos == b'\n' { | ||||||
|                             range.end += i as u64; |                             range.end += i as u64; | ||||||
|                             break; |                             break; | ||||||
|                         } |                         } | ||||||
| @@ -249,7 +247,7 @@ fn citymap_naive(input: &mut impl BufRead) -> Citymap { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Skip over just newline strings that get created by the alignment process |         // Skip over just newline strings that get created by the alignment process | ||||||
|         if buf == &[b'\n'] { |         if buf == b"\n" { | ||||||
|             continue; |             continue; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,65 +1,58 @@ | |||||||
| use std::collections::HashMap; | use crate::models::station_measurements::StationMeasurements; | ||||||
| use std::io::{BufRead, Seek, SeekFrom}; | use crate::utils::{hash, parse}; | ||||||
|  | use memmap2::MmapOptions; | ||||||
|  | use rustc_hash::{FxBuildHasher, FxHashMap as HashMap}; | ||||||
|  | use std::slice::from_raw_parts; | ||||||
| use std::sync::mpsc; | use std::sync::mpsc; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
| use std::{fs::File, io::BufReader, thread}; | use std::{fs::File, thread}; | ||||||
| use memmap::MmapOptions; |  | ||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::parse; |  | ||||||
| use crate::utils::parse::hashstr; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||||
|  |  | ||||||
| pub fn run() { | pub fn run() { | ||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |  | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|  |     const FILE_PATH: &str = "../../../measurements.txt"; | ||||||
|  |     let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | ||||||
|  |     let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; | ||||||
|  |     let mmap_ptr = mmap.as_ptr(); | ||||||
|  |     let file_length = mmap.len(); | ||||||
|  |     let hasher = FxBuildHasher; | ||||||
|  |     // Even if I could now just use the byte slice as a key, doing the hash is still faster | ||||||
|  |     let mut stations: HashMap<u64, (&[u8], StationMeasurements)> = | ||||||
|  |         HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); | ||||||
|  |     let (tx, rx) = mpsc::channel(); | ||||||
|  |     let cores = thread::available_parallelism().unwrap().into(); | ||||||
|  |     let chunk_length = file_length / cores; | ||||||
|  |     let mut bounds = Vec::with_capacity(cores + 1); | ||||||
|  |     let mut start = 0; | ||||||
|  |     for _ in 0..cores { | ||||||
|  |         let end = (start + chunk_length).min(mmap.len()); | ||||||
|  |         let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { | ||||||
|  |             Some(v) => v, | ||||||
|  |             None => { | ||||||
|  |                 assert_eq!(end, mmap.len()); | ||||||
|  |                 0 | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |         let end = end + next_new_line; | ||||||
|  |         bounds.push((start, end)); | ||||||
|  |         start = end + 1; | ||||||
|  |     } | ||||||
|     thread::scope(|s| { |     thread::scope(|s| { | ||||||
|         let mut stations: HashMap<usize, (String, StationMeasurements)> = |  | ||||||
|             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|         let (tx, rx) = mpsc::channel(); |  | ||||||
|         let cores = thread::available_parallelism().unwrap().into(); |  | ||||||
|         let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |  | ||||||
|         let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; |  | ||||||
|         let file_length = mmap.len(); |  | ||||||
|         let chunk_length = file_length / cores; |  | ||||||
|         let mut bounds = Vec::with_capacity(cores + 1); |  | ||||||
|         bounds.push(0); |  | ||||||
|         for i in 1..cores { |  | ||||||
|             let mut reader = BufReader::new(&file); |  | ||||||
|             let mut byte_start = chunk_length * i; |  | ||||||
|             reader |  | ||||||
|                 .seek(SeekFrom::Start(byte_start as u64)) |  | ||||||
|                 .expect("could not seek"); |  | ||||||
|             let mut line = Vec::with_capacity(108); |  | ||||||
|             let line_len = reader |  | ||||||
|                 .read_until(b'\n', &mut line) |  | ||||||
|                 .expect("could not read bytes"); |  | ||||||
|             byte_start += line_len; |  | ||||||
|             bounds.push(byte_start); |  | ||||||
|         } |  | ||||||
|         bounds.push(file_length); |  | ||||||
|         for i in 0..cores { |         for i in 0..cores { | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             let mut currposition = *bounds.get(i).unwrap(); |             let (start, end) = *bounds.get(i).unwrap(); | ||||||
|             let end = *bounds.get(i + 1).unwrap(); |             let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) }; | ||||||
|             s.spawn(move || { |             s.spawn(move || { | ||||||
|                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |                 let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> = | ||||||
|                 let mut reader = BufReader::new(&file); |                     HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); | ||||||
|                 reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); |                 for line in mmap_slice.split(|&byte| byte == b'\n') { | ||||||
|                 let mut t_stations: HashMap<usize, (String, StationMeasurements)> = |                     if line.is_empty() { | ||||||
|                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|                 let mut line = Vec::with_capacity(108); |  | ||||||
|                 loop { |  | ||||||
|                     let line_len = reader |  | ||||||
|                         .read_until(b'\n', &mut line) |  | ||||||
|                         .expect("could not read bytes"); |  | ||||||
|                     if line_len == 0 { |  | ||||||
|                         break; |                         break; | ||||||
|                     } |                     } | ||||||
|                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|                     let hash = hashstr(station); |                     let hash = hash::bytes(station); | ||||||
|                     let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; |                     let temp = parse::temp(temp); | ||||||
|                     let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |                     let measurements_option = t_stations.get_mut(&hash); | ||||||
|                     if let Some((_, measurements)) = measurements_option { |                     if let Some((_, measurements)) = measurements_option { | ||||||
|                         measurements.update(temp); |                         measurements.update(temp); | ||||||
| @@ -72,29 +65,25 @@ pub fn run() { | |||||||
|                         }; |                         }; | ||||||
|                         t_stations.insert(hash, (station, measurements)); |                         t_stations.insert(hash, (station, measurements)); | ||||||
|                     } |                     } | ||||||
|                     currposition += line_len; |  | ||||||
|                     if currposition >= end { |  | ||||||
|                         break; |  | ||||||
|                     } |  | ||||||
|                     line.clear(); |  | ||||||
|                 } |                 } | ||||||
|                 let _ = tx.send(t_stations); |                 let _ = tx.send(t_stations); | ||||||
|             }); |             }); | ||||||
|         } |         } | ||||||
|         drop(tx); |         drop(tx); | ||||||
|         while let Ok(t_stations) = rx.recv() { |         while let Ok(t_stations) = rx.recv() { | ||||||
|             for (&hash, (station, measurements)) in t_stations.iter() { |             for (hash, (station, measurements)) in t_stations.iter() { | ||||||
|                 let joined_measurements_options = stations.get_mut(&hash); |                 let joined_measurements_options = stations.get_mut(hash); | ||||||
|                 if let Some((_, joined_measurements)) = joined_measurements_options { |                 if let Some((_, joined_measurements)) = joined_measurements_options { | ||||||
|                     joined_measurements.merge(measurements); |                     joined_measurements.merge(measurements); | ||||||
|                 } else { |                 } else { | ||||||
|                     stations.insert(hash, (station.to_owned(), *measurements)); |                     stations.insert(*hash, (station, *measurements)); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         let mut stations: Vec<String> = stations |         let mut stations: Vec<String> = stations | ||||||
|             .iter() |             .iter() | ||||||
|             .map(|(_, (station, measurements))| { |             .map(|(_, (station, measurements))| { | ||||||
|  |                 let station = unsafe { std::str::from_utf8_unchecked(station) }; | ||||||
|                 let measurements = measurements.to_string(); |                 let measurements = measurements.to_string(); | ||||||
|                 #[cfg(feature = "json")] |                 #[cfg(feature = "json")] | ||||||
|                 { |                 { | ||||||
|   | |||||||
| @@ -1,11 +1,10 @@ | |||||||
|  | use crate::models::station_measurements::StationMeasurements; | ||||||
|  | use crate::utils::{hash, parse}; | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::io::{BufRead, Seek, SeekFrom}; | use std::io::{BufRead, Seek, SeekFrom}; | ||||||
| use std::sync::mpsc; | use std::sync::mpsc; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
| use std::{fs::File, io::BufReader, thread}; | use std::{fs::File, io::BufReader, thread}; | ||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::parse; |  | ||||||
| use crate::utils::parse::hashstr; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||||
|  |  | ||||||
| @@ -13,7 +12,7 @@ pub fn run() { | |||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |     const FILE_PATH: &str = "../../../measurements.txt"; | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|     thread::scope(|s| { |     thread::scope(|s| { | ||||||
|         let mut stations: HashMap<usize, (String, StationMeasurements)> = |         let mut stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|         let (tx, rx) = mpsc::channel(); |         let (tx, rx) = mpsc::channel(); | ||||||
|         let cores = thread::available_parallelism().unwrap().into(); |         let cores = thread::available_parallelism().unwrap().into(); | ||||||
| @@ -22,30 +21,29 @@ pub fn run() { | |||||||
|         let file_length = reader.seek(SeekFrom::End(0)).unwrap(); |         let file_length = reader.seek(SeekFrom::End(0)).unwrap(); | ||||||
|         let chunk_length = file_length as usize / cores; |         let chunk_length = file_length as usize / cores; | ||||||
|         let mut bounds = Vec::with_capacity(cores + 1); |         let mut bounds = Vec::with_capacity(cores + 1); | ||||||
|         bounds.push(0); |         let mut start = 0; | ||||||
|         for i in 1..cores { |         for i in 0..cores { | ||||||
|             let mut reader = BufReader::new(&file); |             let mut reader = BufReader::new(&file); | ||||||
|             let mut byte_start = chunk_length * i; |             let mut end = chunk_length * i; | ||||||
|             reader |             reader | ||||||
|                 .seek(SeekFrom::Start(byte_start as u64)) |                 .seek(SeekFrom::Start(end as u64)) | ||||||
|                 .expect("could not seek"); |                 .expect("could not seek"); | ||||||
|             let mut line = Vec::with_capacity(108); |             let mut line = Vec::with_capacity(108); | ||||||
|             let line_len = reader |             let line_len = reader | ||||||
|                 .read_until(b'\n', &mut line) |                 .read_until(b'\n', &mut line) | ||||||
|                 .expect("could not read bytes"); |                 .expect("could not read bytes"); | ||||||
|             byte_start += line_len; |             end += line_len; | ||||||
|             bounds.push(byte_start as u64); |             bounds.push((start, end)); | ||||||
|  |             start = end + 1; | ||||||
|         } |         } | ||||||
|         bounds.push(file_length); |  | ||||||
|         for i in 0..cores { |         for i in 0..cores { | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             let mut currposition = *bounds.get(i).unwrap(); |             let (mut currposition, end) = *bounds.get(i).unwrap(); | ||||||
|             let end = *bounds.get(i + 1).unwrap(); |  | ||||||
|             s.spawn(move || { |             s.spawn(move || { | ||||||
|                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); | ||||||
|                 let mut reader = BufReader::new(&file); |                 let mut reader = BufReader::new(&file); | ||||||
|                 reader.seek(SeekFrom::Start(currposition)).unwrap(); |                 reader.seek(SeekFrom::Start(currposition as u64)).unwrap(); | ||||||
|                 let mut t_stations: HashMap<usize, (String, StationMeasurements)> = |                 let mut t_stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|                 let mut line = Vec::with_capacity(108); |                 let mut line = Vec::with_capacity(108); | ||||||
|                 loop { |                 loop { | ||||||
| @@ -56,8 +54,8 @@ pub fn run() { | |||||||
|                         break; |                         break; | ||||||
|                     } |                     } | ||||||
|                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|                     let hash = hashstr(station); |                     let hash = hash::bytes(station); | ||||||
|                     let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; |                     let station = unsafe { std::str::from_utf8_unchecked(station) }; | ||||||
|                     let temp = parse::temp(temp.split_last().unwrap().1); |                     let temp = parse::temp(temp.split_last().unwrap().1); | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |                     let measurements_option = t_stations.get_mut(&hash); | ||||||
|                     if let Some((_, measurements)) = measurements_option { |                     if let Some((_, measurements)) = measurements_option { | ||||||
| @@ -69,9 +67,9 @@ pub fn run() { | |||||||
|                             count: 1, |                             count: 1, | ||||||
|                             sum: temp, |                             sum: temp, | ||||||
|                         }; |                         }; | ||||||
|                         t_stations.insert(hash, (station, measurements)); |                         t_stations.insert(hash, (station.to_string(), measurements)); | ||||||
|                     } |                     } | ||||||
|                     currposition += line_len as u64; |                     currposition += line_len; | ||||||
|                     if currposition >= end { |                     if currposition >= end { | ||||||
|                         break; |                         break; | ||||||
|                     } |                     } | ||||||
|   | |||||||
| @@ -1,11 +1,10 @@ | |||||||
| use smol::fs::File; | use smol::fs::File; | ||||||
| use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom}; | use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom}; | ||||||
|  | use std::collections::HashMap; | ||||||
|  |  | ||||||
| use crate::models::station_measurements::StationMeasurements; | use crate::models::station_measurements::StationMeasurements; | ||||||
| use crate::utils::parse; | use crate::utils::{hash, parse}; | ||||||
| use crate::utils::parse::hashstr; | use easy_parallel::Parallel; | ||||||
| use std::collections::HashMap; |  | ||||||
| use std::sync::mpsc; |  | ||||||
| use std::thread; | use std::thread; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
|  |  | ||||||
| @@ -14,121 +13,116 @@ const DEFAULT_HASHMAP_LENGTH: usize = 10000; | |||||||
| pub fn run() { | pub fn run() { | ||||||
|     const FILE_PATH: &str = "../../../measurements.txt"; |     const FILE_PATH: &str = "../../../measurements.txt"; | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|     thread::scope(|s| { |     let mut stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|         let mut stations: HashMap<usize, (String, StationMeasurements)> = |         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |     let cores = thread::available_parallelism().unwrap().into(); | ||||||
|         let (tx, rx) = mpsc::channel(); |     let bounds = smol::block_on(async { | ||||||
|         let cores = thread::available_parallelism().unwrap().into(); |         let mut file = File::open(FILE_PATH) | ||||||
|         let bounds = smol::block_on(async { |             .await | ||||||
|             let mut file = File::open(FILE_PATH) |             .expect("File measurements.txt not found"); | ||||||
|                 .await |         let mut reader = BufReader::new(&mut file); | ||||||
|                 .expect("File measurements.txt not found"); |         let file_length = reader.seek(SeekFrom::End(0)).await.unwrap(); | ||||||
|  |         let chunk_length = file_length as usize / cores; | ||||||
|  |         let mut bounds = Vec::with_capacity(cores + 1); | ||||||
|  |         bounds.push(0); | ||||||
|  |         for i in 1..cores { | ||||||
|             let mut reader = BufReader::new(&mut file); |             let mut reader = BufReader::new(&mut file); | ||||||
|             let file_length = reader.seek(SeekFrom::End(0)).await.unwrap(); |             let mut byte_start = chunk_length * i; | ||||||
|             let chunk_length = file_length as usize / cores; |             reader | ||||||
|             let mut bounds = Vec::with_capacity(cores + 1); |                 .seek(SeekFrom::Start(byte_start as u64)) | ||||||
|             bounds.push(0); |                 .await | ||||||
|             for i in 1..cores { |                 .expect("could not seek"); | ||||||
|                 let mut reader = BufReader::new(&mut file); |             let mut line = Vec::with_capacity(108); | ||||||
|                 let mut byte_start = chunk_length * i; |             let line_len = reader | ||||||
|                 reader |                 .read_until(b'\n', &mut line) | ||||||
|                     .seek(SeekFrom::Start(byte_start as u64)) |                 .await | ||||||
|                     .await |                 .expect("could not read bytes"); | ||||||
|                     .expect("could not seek"); |             byte_start += line_len; | ||||||
|                 let mut line = Vec::with_capacity(108); |             bounds.push(byte_start as u64); | ||||||
|                 let line_len = reader |         } | ||||||
|                     .read_until(b'\n', &mut line) |         bounds.push(file_length); | ||||||
|                     .await |         bounds | ||||||
|                     .expect("could not read bytes"); |     }); | ||||||
|                 byte_start += line_len; |     let t_stations_vec = Parallel::new() | ||||||
|                 bounds.push(byte_start as u64); |         .each(0..cores, |i| { | ||||||
|             } |  | ||||||
|             bounds.push(file_length); |  | ||||||
|             bounds |  | ||||||
|         }); |  | ||||||
|         for i in 0..cores { |  | ||||||
|             let tx = tx.clone(); |  | ||||||
|             let mut currposition = *bounds.get(i).unwrap(); |             let mut currposition = *bounds.get(i).unwrap(); | ||||||
|             let end = *bounds.get(i + 1).unwrap(); |             let end = *bounds.get(i + 1).unwrap(); | ||||||
|             s.spawn(move || { |             smol::block_on(async { | ||||||
|                 smol::block_on(async { |                 let mut file = File::open(FILE_PATH) | ||||||
|                     let mut file = File::open(FILE_PATH) |                     .await | ||||||
|  |                     .expect("File measurements.txt not found"); | ||||||
|  |                 let mut reader = BufReader::new(&mut file); | ||||||
|  |                 reader.seek(SeekFrom::Start(currposition)).await.unwrap(); | ||||||
|  |                 let mut t_stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|  |                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|  |                 let mut line = Vec::with_capacity(108); | ||||||
|  |                 loop { | ||||||
|  |                     let line_len = reader | ||||||
|  |                         .read_until(b'\n', &mut line) | ||||||
|                         .await |                         .await | ||||||
|                         .expect("File measurements.txt not found"); |                         .expect("could not read bytes"); | ||||||
|                     let mut reader = BufReader::new(&mut file); |                     if line_len == 0 { | ||||||
|                     reader.seek(SeekFrom::Start(currposition)).await.unwrap(); |                         break; | ||||||
|                     let mut t_stations: HashMap<usize, (String, StationMeasurements)> = |  | ||||||
|                         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|                     let mut line = Vec::with_capacity(108); |  | ||||||
|                     loop { |  | ||||||
|                         let line_len = reader |  | ||||||
|                             .read_until(b'\n', &mut line) |  | ||||||
|                             .await |  | ||||||
|                             .expect("could not read bytes"); |  | ||||||
|                         if line_len == 0 { |  | ||||||
|                             break; |  | ||||||
|                         } |  | ||||||
|                         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |  | ||||||
|                         let hash = hashstr(station); |  | ||||||
|                         let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; |  | ||||||
|                         let temp = parse::temp(temp.split_last().unwrap().1); |  | ||||||
|                         let measurements_option = t_stations.get_mut(&hash); |  | ||||||
|                         if let Some((_, measurements)) = measurements_option { |  | ||||||
|                             measurements.update(temp); |  | ||||||
|                         } else { |  | ||||||
|                             let measurements = StationMeasurements { |  | ||||||
|                                 min: temp, |  | ||||||
|                                 max: temp, |  | ||||||
|                                 count: 1, |  | ||||||
|                                 sum: temp, |  | ||||||
|                             }; |  | ||||||
|                             t_stations.insert(hash, (station, measurements)); |  | ||||||
|                         } |  | ||||||
|                         currposition += line_len as u64; |  | ||||||
|                         if currposition >= end { |  | ||||||
|                             break; |  | ||||||
|                         } |  | ||||||
|                         line.clear(); |  | ||||||
|                     } |                     } | ||||||
|                     let _ = tx.send(t_stations); |                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|                 }) |                     let hash = hash::bytes(station); | ||||||
|             }); |                     let station = unsafe { std::str::from_utf8_unchecked(station) }; | ||||||
|         } |                     let temp = parse::temp(temp.split_last().unwrap().1); | ||||||
|         drop(tx); |                     let measurements_option = t_stations.get_mut(&hash); | ||||||
|         while let Ok(t_stations) = rx.recv() { |                     if let Some((_, measurements)) = measurements_option { | ||||||
|             for (&hash, (station, measurements)) in t_stations.iter() { |                         measurements.update(temp); | ||||||
|                 let joined_measurements_options = stations.get_mut(&hash); |                     } else { | ||||||
|                 if let Some((_, joined_measurements)) = joined_measurements_options { |                         let measurements = StationMeasurements { | ||||||
|                     joined_measurements.merge(measurements); |                             min: temp, | ||||||
|                 } else { |                             max: temp, | ||||||
|                     stations.insert(hash, (station.to_owned(), *measurements)); |                             count: 1, | ||||||
|  |                             sum: temp, | ||||||
|  |                         }; | ||||||
|  |                         t_stations.insert(hash, (station.to_string(), measurements)); | ||||||
|  |                     } | ||||||
|  |                     currposition += line_len as u64; | ||||||
|  |                     if currposition >= end { | ||||||
|  |                         break; | ||||||
|  |                     } | ||||||
|  |                     line.clear(); | ||||||
|                 } |                 } | ||||||
|  |                 t_stations | ||||||
|  |             }) | ||||||
|  |         }) | ||||||
|  |         .run(); | ||||||
|  |     for t_stations in t_stations_vec { | ||||||
|  |         for (hash, (station, measurements)) in t_stations.iter() { | ||||||
|  |             let joined_measurements_options = stations.get_mut(hash); | ||||||
|  |             if let Some((_, joined_measurements)) = joined_measurements_options { | ||||||
|  |                 joined_measurements.merge(measurements); | ||||||
|  |             } else { | ||||||
|  |                 stations.insert(*hash, (station.to_owned(), *measurements)); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         let mut stations: Vec<String> = stations |     } | ||||||
|             .iter() |     let mut stations: Vec<String> = stations | ||||||
|             .map(|(_, (station, measurements))| { |         .iter() | ||||||
|                 let measurements = measurements.to_string(); |         .map(|(_, (station, measurements))| { | ||||||
|                 #[cfg(feature = "json")] |             let measurements = measurements.to_string(); | ||||||
|                 { |             #[cfg(feature = "json")] | ||||||
|                     format!("{{\"{station}\":\"{measurements}\"}}") |             { | ||||||
|                 } |                 format!("{{\"{station}\":\"{measurements}\"}}") | ||||||
|                 #[cfg(not(feature = "json"))] |             } | ||||||
|                 { |             #[cfg(not(feature = "json"))] | ||||||
|                     format!("{station}={measurements}") |             { | ||||||
|                 } |                 format!("{station}={measurements}") | ||||||
|             }) |             } | ||||||
|             .collect(); |         }) | ||||||
|         stations.sort(); |         .collect(); | ||||||
|         let stations = stations.join(","); |     stations.sort(); | ||||||
|         #[cfg(feature = "json")] |     let stations = stations.join(","); | ||||||
|         { |     #[cfg(feature = "json")] | ||||||
|             println!("\n\n[{stations}]"); |     { | ||||||
|         } |         println!("\n\n[{stations}]"); | ||||||
|         #[cfg(not(feature = "json"))] |     } | ||||||
|         { |     #[cfg(not(feature = "json"))] | ||||||
|             println!("\n\n{{{stations}}}"); |     { | ||||||
|         } |         println!("\n\n{{{stations}}}"); | ||||||
|         println!("\n\nTime={} ms", now.elapsed().as_millis()); |     } | ||||||
|     }); |     println!("\n\nTime={} ms", now.elapsed().as_millis()); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,12 +1,11 @@ | |||||||
| use std::collections::HashMap; |  | ||||||
| use std::io::{Read, Seek, SeekFrom}; |  | ||||||
| use std::sync::mpsc; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::{fs::File, io::BufReader, thread}; |  | ||||||
| use std::ffi::CStr; |  | ||||||
| use crate::models::station_measurements::StationMeasurements; | use crate::models::station_measurements::StationMeasurements; | ||||||
| use crate::utils::parse; | use crate::utils::parse; | ||||||
| use crate::utils::parse::hashstr; | use memmap2::MmapOptions; | ||||||
|  | use rustc_hash::{FxBuildHasher, FxHashMap}; | ||||||
|  | use std::ffi::CStr; | ||||||
|  | use std::sync::mpsc; | ||||||
|  | use std::time::Instant; | ||||||
|  | use std::{fs::File, thread}; | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||||
|  |  | ||||||
| @@ -14,55 +13,59 @@ pub fn run() { | |||||||
|     print!("\x1b[J"); |     print!("\x1b[J"); | ||||||
|     const FILE_PATH: &str = "structured_measurements.txt"; |     const FILE_PATH: &str = "structured_measurements.txt"; | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|  |     let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found"); | ||||||
|  |     let mmap = unsafe { MmapOptions::new().map(&file).unwrap() }; | ||||||
|  |     let file_length = mmap.len(); | ||||||
|  |     let hasher = FxBuildHasher; | ||||||
|  |     let mut stations: FxHashMap<String, StationMeasurements> = | ||||||
|  |         FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); | ||||||
|  |     let (tx, rx) = mpsc::channel(); | ||||||
|  |     let cores = thread::available_parallelism().unwrap().into(); | ||||||
|  |     let chunk_length = file_length / cores; | ||||||
|  |     let mut bounds = Vec::with_capacity(cores + 1); | ||||||
|  |     let mut start = 0; | ||||||
|  |     for _ in 0..cores { | ||||||
|  |         let end = (start + chunk_length).min(mmap.len()); | ||||||
|  |         let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) { | ||||||
|  |             Some(v) => v, | ||||||
|  |             None => { | ||||||
|  |                 assert_eq!(end, mmap.len()); | ||||||
|  |                 0 | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |         let end = end + next_new_line; | ||||||
|  |         bounds.push((start, end)); | ||||||
|  |         start = end + 1; | ||||||
|  |     } | ||||||
|     thread::scope(|s| { |     thread::scope(|s| { | ||||||
|         let mut stations: HashMap<usize, (String, StationMeasurements)> = |  | ||||||
|             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|         let (tx, rx) = mpsc::channel(); |  | ||||||
|         let cores = thread::available_parallelism().unwrap().into(); |  | ||||||
|         let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found"); |  | ||||||
|         let mut reader = BufReader::new(&file); |  | ||||||
|         let file_length = reader.seek(SeekFrom::End(0)).unwrap(); |  | ||||||
|         let chunk_length = file_length as usize / cores; |  | ||||||
|         let mut bounds = Vec::with_capacity(cores + 1); |  | ||||||
|         bounds.push(0); |  | ||||||
|         for i in 0..cores { |         for i in 0..cores { | ||||||
|             let tx = tx.clone(); |             let tx = tx.clone(); | ||||||
|             let mut currposition = (i * chunk_length) as u64; |             let (start, end) = *bounds.get(i).unwrap(); | ||||||
|             let end = ((i + 1) * chunk_length) as u64; |             let mmap_slice = &mmap[start..end]; | ||||||
|             s.spawn(move || { |             s.spawn(move || { | ||||||
|                 let file = File::open(FILE_PATH).expect("File measurements.txt not found"); |                 let mut t_stations: FxHashMap<String, StationMeasurements> = | ||||||
|                 let mut reader = BufReader::new(&file); |                     FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher); | ||||||
|                 reader.seek(SeekFrom::Start(currposition)).unwrap(); |                 let lines = mmap_slice.chunks_exact(107); | ||||||
|                 let mut t_stations: HashMap<usize, (String, StationMeasurements)> = |                 for (line_num, line) in lines.enumerate() { | ||||||
|                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |  | ||||||
|                 let mut line = [0u8; 107]; |  | ||||||
|                 let mut line_num = 0; |  | ||||||
|                 loop { |  | ||||||
|                     if line_num % 100000 == 0 { |                     if line_num % 100000 == 0 { | ||||||
|                         print!("\x1b[{i};0Hlines: {line_num}"); |                         print!("\x1b[{i};0Hlines: {line_num}"); | ||||||
|                     } |                     } | ||||||
|                     line_num += 1; |  | ||||||
|                     let read_res = reader |  | ||||||
|                         .read_exact(&mut line); |  | ||||||
|                     match read_res { |  | ||||||
|                         Ok(_) => (), |  | ||||||
|                         Err(e) => match e.kind() { |  | ||||||
|                             std::io::ErrorKind::UnexpectedEof => break, |  | ||||||
|                             _ => panic!("Could not read") |  | ||||||
|                         }, |  | ||||||
|                     }; |  | ||||||
|                     let (station, temp) = unsafe { line.split_at_unchecked(100) }; |                     let (station, temp) = unsafe { line.split_at_unchecked(100) }; | ||||||
|                     let hash = hashstr(station); |  | ||||||
|                     let station = { |                     let station = { | ||||||
|                         if station[station.len() - 1] == 0u8 { |                         if station[station.len() - 1] == 0u8 { | ||||||
|                             unsafe { std::str::from_utf8_unchecked(CStr::from_bytes_until_nul(station).unwrap().to_bytes()) } |                             unsafe { | ||||||
|  |                                 std::str::from_utf8_unchecked( | ||||||
|  |                                     CStr::from_bytes_until_nul(station).unwrap().to_bytes(), | ||||||
|  |                                 ) | ||||||
|  |                             } | ||||||
|                         } else { |                         } else { | ||||||
|                             unsafe { std::str::from_utf8_unchecked(station) } |                             unsafe { std::str::from_utf8_unchecked(station) } | ||||||
|                         } |                         } | ||||||
|                     }; |                     } | ||||||
|  |                     .to_owned(); | ||||||
|                     let temp = parse::temp_new(&temp[1..6]); |                     let temp = parse::temp_new(&temp[1..6]); | ||||||
|                     let measurements_option = t_stations.get_mut(&hash); |                     let measurements_option = t_stations.get_mut(&station); | ||||||
|                     if let Some((_, measurements)) = measurements_option { |                     if let Some(measurements) = measurements_option { | ||||||
|                         measurements.update(temp); |                         measurements.update(temp); | ||||||
|                     } else { |                     } else { | ||||||
|                         let measurements = StationMeasurements { |                         let measurements = StationMeasurements { | ||||||
| @@ -71,11 +74,7 @@ pub fn run() { | |||||||
|                             count: 1, |                             count: 1, | ||||||
|                             sum: temp, |                             sum: temp, | ||||||
|                         }; |                         }; | ||||||
|                         t_stations.insert(hash, (station.to_string(), measurements)); |                         t_stations.insert(station, measurements); | ||||||
|                     } |  | ||||||
|                     currposition += 107; |  | ||||||
|                     if currposition >= end { |  | ||||||
|                         break; |  | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|                 let _ = tx.send(t_stations); |                 let _ = tx.send(t_stations); | ||||||
| @@ -83,18 +82,18 @@ pub fn run() { | |||||||
|         } |         } | ||||||
|         drop(tx); |         drop(tx); | ||||||
|         while let Ok(t_stations) = rx.recv() { |         while let Ok(t_stations) = rx.recv() { | ||||||
|             for (&hash, (station, measurements)) in t_stations.iter() { |             for (station, measurements) in t_stations.iter() { | ||||||
|                 let joined_measurements_options = stations.get_mut(&hash); |                 let joined_measurements_options = stations.get_mut(station); | ||||||
|                 if let Some((_, joined_measurements)) = joined_measurements_options { |                 if let Some(joined_measurements) = joined_measurements_options { | ||||||
|                     joined_measurements.merge(measurements); |                     joined_measurements.merge(measurements); | ||||||
|                 } else { |                 } else { | ||||||
|                     stations.insert(hash, (station.to_owned(), *measurements)); |                     stations.insert(station.to_owned(), *measurements); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         let mut stations: Vec<String> = stations |         let mut stations: Vec<String> = stations | ||||||
|             .iter() |             .iter() | ||||||
|             .map(|(_, (station, measurements))| { |             .map(|(station, measurements)| { | ||||||
|                 let measurements = measurements.to_string(); |                 let measurements = measurements.to_string(); | ||||||
|                 #[cfg(feature = "json")] |                 #[cfg(feature = "json")] | ||||||
|                 { |                 { | ||||||
|   | |||||||
| @@ -115,7 +115,7 @@ fn merge_hashmaps<'a>( | |||||||
| } | } | ||||||
|  |  | ||||||
| /// Parses a chunk of the input as StationData values. | /// Parses a chunk of the input as StationData values. | ||||||
| fn process_chunk<'a>(current_chunk_slice: &'a [u8]) -> HashMap<&'a [u8], StationData> { | fn process_chunk(current_chunk_slice: &[u8]) -> HashMap<&[u8], StationData> { | ||||||
|     let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); |     let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS); | ||||||
|     let mut start = 0; |     let mut start = 0; | ||||||
|     while let Some(end) = current_chunk_slice[start..] |     while let Some(end) = current_chunk_slice[start..] | ||||||
| @@ -187,7 +187,7 @@ fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Resul | |||||||
| pub fn run() -> io::Result<()> { | pub fn run() -> io::Result<()> { | ||||||
|     // won't accept non-utf-8 args |     // won't accept non-utf-8 args | ||||||
|     let args: Vec<String> = env::args().collect(); |     let args: Vec<String> = env::args().collect(); | ||||||
|     let file_name = match args.get(2).clone() { |     let file_name = match args.get(2) { | ||||||
|         Some(fname) => fname, |         Some(fname) => fname, | ||||||
|         None => "../../../measurements.txt", |         None => "../../../measurements.txt", | ||||||
|     }; |     }; | ||||||
|   | |||||||
| @@ -1,31 +0,0 @@ | |||||||
| use polars::prelude::*; |  | ||||||
| use std::time::Instant; |  | ||||||
| use std::vec; |  | ||||||
|  |  | ||||||
| pub fn run_polars() -> Result<DataFrame, PolarsError> { |  | ||||||
|     let now = Instant::now(); |  | ||||||
|  |  | ||||||
|     let f1: Field = Field::new("station", DataType::String); |  | ||||||
|     let f2: Field = Field::new("measure", DataType::Float64); |  | ||||||
|     let sc: Schema = Schema::from_iter(vec![f1, f2]); |  | ||||||
|  |  | ||||||
|     let q = LazyCsvReader::new("../../../measurements.txt") |  | ||||||
|         .has_header(false) |  | ||||||
|         .with_schema(Some(Arc::new(sc))) |  | ||||||
|         .with_separator(b';') |  | ||||||
|         .finish()? |  | ||||||
|         .group_by(vec![col("station")]) |  | ||||||
|         .agg(vec![ |  | ||||||
|             col("measure").alias("min").min(), |  | ||||||
|             col("measure").alias("mean").mean(), |  | ||||||
|             col("measure").alias("max").max(), |  | ||||||
|         ]) |  | ||||||
|         .sort("station", Default::default()) |  | ||||||
|         .with_streaming(true); |  | ||||||
|  |  | ||||||
|     let df = q.collect()?; |  | ||||||
|  |  | ||||||
|     println!("Time={} μs", now.elapsed().as_micros()); |  | ||||||
|  |  | ||||||
|     Ok(df) |  | ||||||
| } |  | ||||||
| @@ -1,5 +1,5 @@ | |||||||
| use bstr::{BStr, ByteSlice}; | use bstr::{BStr, ByteSlice}; | ||||||
| use memmap::MmapOptions; | use memmap2::MmapOptions; | ||||||
| use rayon::prelude::*; | use rayon::prelude::*; | ||||||
| use rustc_hash::FxHashMap as HashMap; | use rustc_hash::FxHashMap as HashMap; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
| @@ -50,7 +50,7 @@ impl State { | |||||||
| fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> { | fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> { | ||||||
|     let mut state: HashMap<&'a BStr, State> = Default::default(); |     let mut state: HashMap<&'a BStr, State> = Default::default(); | ||||||
|     for line in i { |     for line in i { | ||||||
|         let (name, value) = line.split_once_str(&[b';']).unwrap(); |         let (name, value) = line.split_once_str(b";").unwrap(); | ||||||
|         let value = fast_float::parse(value).unwrap(); |         let value = fast_float::parse(value).unwrap(); | ||||||
|         state.entry(name.into()).or_default().update(value); |         state.entry(name.into()).or_default().update(value); | ||||||
|     } |     } | ||||||
| @@ -58,7 +58,7 @@ fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> { | |||||||
| } | } | ||||||
|  |  | ||||||
| fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> { | fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> { | ||||||
|     make_map((&mem[start..end]).lines()) |     make_map((mem[start..end]).lines()) | ||||||
| } | } | ||||||
|  |  | ||||||
| fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { | fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { | ||||||
| @@ -70,7 +70,7 @@ fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) { | |||||||
| pub fn run() { | pub fn run() { | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|     let cores: usize = std::thread::available_parallelism().unwrap().into(); |     let cores: usize = std::thread::available_parallelism().unwrap().into(); | ||||||
|     let path = match std::env::args().skip(1).next() { |     let path = match std::env::args().nth(1) { | ||||||
|         Some(path) => path, |         Some(path) => path, | ||||||
|         None => "../../../measurements.txt".to_owned(), |         None => "../../../measurements.txt".to_owned(), | ||||||
|     }; |     }; | ||||||
| @@ -104,7 +104,7 @@ pub fn run() { | |||||||
|     }); |     }); | ||||||
|  |  | ||||||
|     let mut all: Vec<_> = state.into_iter().collect(); |     let mut all: Vec<_> = state.into_iter().collect(); | ||||||
|     all.sort_unstable_by(|a, b| a.0.cmp(&b.0)); |     all.sort_unstable_by(|a, b| a.0.cmp(b.0)); | ||||||
|     #[cfg(feature = "json")] |     #[cfg(feature = "json")] | ||||||
|     { |     { | ||||||
|         print!("["); |         print!("["); | ||||||
|   | |||||||
							
								
								
									
										443
									
								
								src/main/rust/src/implementations/rgk.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										443
									
								
								src/main/rust/src/implementations/rgk.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,443 @@ | |||||||
|  | use ptr_hash::PtrHashParams; | ||||||
|  | use rustc_hash::FxHashSet; | ||||||
|  | use std::{ | ||||||
|  |     simd::{cmp::SimdPartialEq, Simd}, | ||||||
|  |     vec::Vec, | ||||||
|  | }; | ||||||
|  |  | ||||||
|  | type V = i32; | ||||||
|  |  | ||||||
|  | type PtrHash = ptr_hash::DefaultPtrHash<ptr_hash::hash::FxHash, u64>; | ||||||
|  |  | ||||||
|  | pub struct Phf { | ||||||
|  |     pub ptr_hash: PtrHash, | ||||||
|  |     pub keys: Vec<Vec<u8>>, | ||||||
|  |     pub slots: Vec<Record>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Phf { | ||||||
|  |     fn new(mut keys: Vec<Vec<u8>>) -> Self { | ||||||
|  |         keys.sort(); | ||||||
|  |  | ||||||
|  |         let num_slots = keys.len() * 5 / 2; | ||||||
|  |         let params = ptr_hash::PtrHashParams { | ||||||
|  |             alpha: 0.9, | ||||||
|  |             c: 1.5, | ||||||
|  |             slots_per_part: num_slots, | ||||||
|  |             ..PtrHashParams::default() | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         let mut hashes: Vec<u64> = keys.iter().map(|key| hash_name(key)).collect(); | ||||||
|  |         hashes.sort(); | ||||||
|  |         for (x, y) in hashes.iter().zip(hashes.iter().skip(1)) { | ||||||
|  |             assert!(*x != *y, "DUPLICATE HASH"); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         let ptr_hash = PtrHash::new(&hashes, params); | ||||||
|  |  | ||||||
|  |         let slots = vec![Record::default(); num_slots]; | ||||||
|  |  | ||||||
|  |         Self { | ||||||
|  |             ptr_hash, | ||||||
|  |             keys, | ||||||
|  |             slots, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     fn compute_index(&self, hash: u64) -> usize { | ||||||
|  |         self.ptr_hash.index_single_part(&hash) | ||||||
|  |     } | ||||||
|  |     fn get_index_mut(&mut self, idx: usize) -> &mut Record { | ||||||
|  |         &mut self.slots[idx] | ||||||
|  |     } | ||||||
|  |     fn index_hash_mut(&mut self, hash: u64) -> &mut Record { | ||||||
|  |         &mut self.slots[self.ptr_hash.index_single_part(&hash)] | ||||||
|  |     } | ||||||
|  |     pub fn index<'b>(&'b self, key: &[u8]) -> &'b Record { | ||||||
|  |         let hash = hash_name(key); | ||||||
|  |         &self.slots[self.compute_index(hash)] | ||||||
|  |     } | ||||||
|  |     fn index_mut<'b>(&'b mut self, key: &[u8]) -> &'b mut Record { | ||||||
|  |         self.index_hash_mut(hash_name(key)) | ||||||
|  |     } | ||||||
|  |     fn merge(&mut self, r: Self) { | ||||||
|  |         // TODO: If key sets are equal or one is a subset of the other, merge | ||||||
|  |         // smaller into larger. | ||||||
|  |         let mut new_keys = vec![]; | ||||||
|  |         let mut i1 = 0; | ||||||
|  |         let mut i2 = 0; | ||||||
|  |         while i1 < self.keys.len() && i2 < r.keys.len() { | ||||||
|  |             if self.keys[i1] == r.keys[i2] { | ||||||
|  |                 new_keys.push(self.keys[i1].clone()); | ||||||
|  |                 i1 += 1; | ||||||
|  |                 i2 += 1; | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             if self.keys[i1] < r.keys[i2] { | ||||||
|  |                 new_keys.push(self.keys[i1].clone()); | ||||||
|  |                 i1 += 1; | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             if self.keys[i1] > r.keys[i2] { | ||||||
|  |                 new_keys.push(r.keys[i2].clone()); | ||||||
|  |                 i2 += 1; | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             panic!(); | ||||||
|  |         } | ||||||
|  |         while i1 < self.keys.len() { | ||||||
|  |             new_keys.push(self.keys[i1].clone()); | ||||||
|  |             i1 += 1; | ||||||
|  |         } | ||||||
|  |         while i2 < r.keys.len() { | ||||||
|  |             new_keys.push(r.keys[i2].clone()); | ||||||
|  |             i2 += 1; | ||||||
|  |         } | ||||||
|  |         let mut new_phf = Self::new(new_keys); | ||||||
|  |         for key in &self.keys { | ||||||
|  |             new_phf.index_mut(key).merge(self.index(key)); | ||||||
|  |         } | ||||||
|  |         for key in &r.keys { | ||||||
|  |             new_phf.index_mut(key).merge(r.index(key)); | ||||||
|  |         } | ||||||
|  |         *self = new_phf; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(Clone, Debug)] | ||||||
|  | #[repr(align(32))] | ||||||
|  | pub struct Record { | ||||||
|  |     pub count: u64, | ||||||
|  |     // Storing these as two u32 is nice, because they are read as a single u64. | ||||||
|  |     /// Byte representation of string ~b"bc.d" or ~b"\0c.d". | ||||||
|  |     pub min: u32, | ||||||
|  |     /// Byte representation of string b"bc.d" or b"\0c.d". | ||||||
|  |     pub max: u32, | ||||||
|  |     pub sum: u64, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Record { | ||||||
|  |     fn default() -> Self { | ||||||
|  |         Self { | ||||||
|  |             count: 0, | ||||||
|  |             min: 0, | ||||||
|  |             max: 0, | ||||||
|  |             sum: 0, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     fn add(&mut self, raw_value: u32, value: u64) { | ||||||
|  |         // assert2::debug_assert!(value < 1000); | ||||||
|  |         self.count += 1; | ||||||
|  |         self.sum += value; | ||||||
|  |         // See https://en.algorithmica.org/hpc/algorithms/argmin/ | ||||||
|  |         if raw_value < self.min { | ||||||
|  |             self.min = raw_value; | ||||||
|  |         } | ||||||
|  |         if raw_value > self.max { | ||||||
|  |             self.max = raw_value; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     fn merge(&mut self, other: &Self) { | ||||||
|  |         self.count += other.count; | ||||||
|  |         self.sum += other.sum_to_val() as u64; | ||||||
|  |         self.min = self.min.min(other.min); | ||||||
|  |         self.max = self.max.max(other.max); | ||||||
|  |     } | ||||||
|  |     fn sum_to_val(&self) -> V { | ||||||
|  |         let m = (1 << 21) - 1; | ||||||
|  |         ((self.sum & m) + 10 * ((self.sum >> 21) & m) + 100 * ((self.sum >> 42) & m)) as _ | ||||||
|  |     } | ||||||
|  |     /// Return (min, avg, max) | ||||||
|  |     pub fn merge_pos_neg(pos: &Record, neg: &Record) -> (V, V, V) { | ||||||
|  |         let pos_sum = pos.sum as V; | ||||||
|  |         let neg_sum = neg.sum as V; | ||||||
|  |         let sum = pos_sum - neg_sum; | ||||||
|  |         let count = (pos.count + neg.count) as V; | ||||||
|  |         // round to nearest | ||||||
|  |         let avg = (sum + count / 2).div_floor(count); | ||||||
|  |  | ||||||
|  |         let pos_max = raw_to_value(pos.max); | ||||||
|  |         let neg_max = -raw_to_value(neg.min); | ||||||
|  |         let max = pos_max.max(neg_max); | ||||||
|  |  | ||||||
|  |         let pos_min = raw_to_value(pos.min); | ||||||
|  |         let neg_min = -raw_to_value(neg.max); | ||||||
|  |         let min = pos_min.min(neg_min); | ||||||
|  |  | ||||||
|  |         (min, avg, max) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Reads raw bytes and masks the ; and the b'0'=0x30. | ||||||
|  | /// Returns something of the form 0x0b0c..0d or 0x000c..0d | ||||||
|  | fn parse_to_raw(data: &[u8], start: usize, end: usize) -> u32 { | ||||||
|  |     let raw = u32::from_be_bytes(unsafe { *data.get_unchecked(start..).as_ptr().cast() }); | ||||||
|  |     raw >> (8 * (4 - (end - start))) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn raw_to_pdep(raw: u32) -> u64 { | ||||||
|  |     #[cfg(feature = "no_pdep")] | ||||||
|  |     { | ||||||
|  |         let raw = raw as u64; | ||||||
|  |         (raw & 15) | ((raw & (15 << 16)) << (21 - 16)) | ((raw & (15 << 24)) << (42 - 24)) | ||||||
|  |     } | ||||||
|  |     #[cfg(not(feature = "no_pdep"))] | ||||||
|  |     { | ||||||
|  |         let mask = 0x0f0f000f; | ||||||
|  |         let raw = raw & mask; | ||||||
|  |         // input                                     0011bbbb0011cccc........0011dddd | ||||||
|  |         //         0b                  bbbb             xxxxcccc     yyyyyyyyyyyydddd // Deposit here | ||||||
|  |         //         0b                  1111                 1111                 1111 // Mask out trash using & | ||||||
|  |         let pdep = 0b0000000000000000001111000000000000011111111000001111111111111111u64; | ||||||
|  |         unsafe { core::arch::x86_64::_pdep_u64(raw as u64, pdep) } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn raw_to_value(v: u32) -> V { | ||||||
|  |     let mask = 0x0f0f000f; | ||||||
|  |     let bytes = (v & mask).to_be_bytes(); | ||||||
|  |     // s = bc.d | ||||||
|  |     let b = bytes[0] as V; | ||||||
|  |     let c = bytes[1] as V; | ||||||
|  |     let d = bytes[3] as V; | ||||||
|  |     b as V * 100 * (bytes[0] != 0) as V + c as V * 10 + d as V | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub fn format(v: V) -> String { | ||||||
|  |     format!("{:.1}", v as f64 / 10.0) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[allow(unused)] | ||||||
|  | fn hash_name(name: &[u8]) -> u64 { | ||||||
|  |     // Hash the first and last 8 bytes. | ||||||
|  |     // TODO: More robust hash that actually uses all characters. | ||||||
|  |     let head: [u8; 8] = unsafe { *name.get_unchecked(..8).split_first_chunk().unwrap().0 }; | ||||||
|  |     let tail: [u8; 8] = unsafe { | ||||||
|  |         *name | ||||||
|  |             .get_unchecked(name.len().wrapping_sub(8)..) | ||||||
|  |             .split_first_chunk() | ||||||
|  |             .unwrap() | ||||||
|  |             .0 | ||||||
|  |     }; | ||||||
|  |     let shift = 64usize.saturating_sub(8 * name.len()); | ||||||
|  |     let khead = u64::from_ne_bytes(head) << shift; | ||||||
|  |     let ktail = u64::from_ne_bytes(tail) >> shift; | ||||||
|  |     khead.wrapping_add(ktail) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Number of SIMD lanes. AVX2 has 256 bits, so 32 lanes. | ||||||
|  | const L: usize = 32; | ||||||
|  | /// The Simd type. | ||||||
|  | pub type S = Simd<u8, L>; | ||||||
|  |  | ||||||
|  | #[derive(Copy, Clone)] | ||||||
|  | struct State { | ||||||
|  |     start: usize, | ||||||
|  |     sep: usize, | ||||||
|  |     end: usize, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Find the regions between \n and ; (names) and between ; and \n (values), | ||||||
|  | /// and calls `callback` for each line. | ||||||
|  | #[inline(always)] | ||||||
|  | fn iter_lines<'a>( | ||||||
|  |     mut data: &'a [u8], | ||||||
|  |     mut callback: impl FnMut(&'a [u8], State, State, State, State), | ||||||
|  | ) { | ||||||
|  |     // Make sure that the out-of-bounds reads we do are OK. | ||||||
|  |     data = &data[..data.len() - 32]; | ||||||
|  |  | ||||||
|  |     let sep = S::splat(b';'); | ||||||
|  |     let end = S::splat(b'\n'); | ||||||
|  |  | ||||||
|  |     let find = |last: usize, sep: S| { | ||||||
|  |         let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() }); | ||||||
|  |         let eq = sep.simd_eq(simd).to_bitmask() as u32; | ||||||
|  |         let offset = eq.trailing_zeros() as usize; | ||||||
|  |         last + offset | ||||||
|  |     }; | ||||||
|  |     // Modified to be able to search regions longer than 32. | ||||||
|  |     let find_long = |mut last: usize, sep: S| { | ||||||
|  |         let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() }); | ||||||
|  |         let mut eq = sep.simd_eq(simd).to_bitmask() as u32; | ||||||
|  |         if eq == 0 { | ||||||
|  |             while eq == 0 { | ||||||
|  |                 last += 32; | ||||||
|  |                 let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() }); | ||||||
|  |                 eq = sep.simd_eq(simd).to_bitmask() as u32; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         let offset = eq.trailing_zeros() as usize; | ||||||
|  |         last + offset | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     let init_state = |idx: usize| { | ||||||
|  |         let first_end = find_long(idx, end); | ||||||
|  |         State { | ||||||
|  |             start: first_end + 1, | ||||||
|  |             sep: first_end + 1, | ||||||
|  |             end: 0, | ||||||
|  |         } | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     let mut state0 = init_state(0); | ||||||
|  |     let mut state1 = init_state(data.len() / 4); | ||||||
|  |     let mut state2 = init_state(2 * data.len() / 4); | ||||||
|  |     let mut state3 = init_state(3 * data.len() / 4); | ||||||
|  |  | ||||||
|  |     // Duplicate each line for each input state. | ||||||
|  |     macro_rules! step { | ||||||
|  |         [$($s:expr),*] => { | ||||||
|  |             $($s.sep = find_long($s.sep + 1, sep) ;)* | ||||||
|  |                 $($s.end = find($s.sep + 1, end) ;)* | ||||||
|  |                 callback(data, $($s, )*); | ||||||
|  |                 $($s.start = $s.end + 1;)* | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     while state3.start < data.len() { | ||||||
|  |         step!(state0, state1, state2, state3); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn run(data: &[u8], keys: &[Vec<u8>]) -> Phf { | ||||||
|  |     // Each thread has its own accumulator. | ||||||
|  |     let mut h = Phf::new(keys.to_vec()); | ||||||
|  |     iter_lines( | ||||||
|  |         data, | ||||||
|  |         |data, mut s0: State, mut s1: State, mut s2: State, mut s3: State| { | ||||||
|  |             unsafe { | ||||||
|  |                 // If value is negative, extend name by one character. | ||||||
|  |                 s0.sep += (data.get_unchecked(s0.sep + 1) == &b'-') as usize; | ||||||
|  |                 let name0 = data.get_unchecked(s0.start..s0.sep); | ||||||
|  |  | ||||||
|  |                 s1.sep += (data.get_unchecked(s1.sep + 1) == &b'-') as usize; | ||||||
|  |                 let name1 = data.get_unchecked(s1.start..s1.sep); | ||||||
|  |  | ||||||
|  |                 s2.sep += (data.get_unchecked(s2.sep + 1) == &b'-') as usize; | ||||||
|  |                 let name2 = data.get_unchecked(s2.start..s2.sep); | ||||||
|  |  | ||||||
|  |                 s3.sep += (data.get_unchecked(s3.sep + 1) == &b'-') as usize; | ||||||
|  |                 let name3 = data.get_unchecked(s3.start..s3.sep); | ||||||
|  |  | ||||||
|  |                 let raw0 = parse_to_raw(data, s0.sep + 1, s0.end); | ||||||
|  |                 let raw1 = parse_to_raw(data, s1.sep + 1, s1.end); | ||||||
|  |                 let raw2 = parse_to_raw(data, s2.sep + 1, s2.end); | ||||||
|  |                 let raw3 = parse_to_raw(data, s3.sep + 1, s3.end); | ||||||
|  |  | ||||||
|  |                 let h0 = hash_name(name0); | ||||||
|  |                 let h1 = hash_name(name1); | ||||||
|  |                 let h2 = hash_name(name2); | ||||||
|  |                 let h3 = hash_name(name3); | ||||||
|  |  | ||||||
|  |                 let idx0 = h.compute_index(h0); | ||||||
|  |                 let idx1 = h.compute_index(h1); | ||||||
|  |                 let idx2 = h.compute_index(h2); | ||||||
|  |                 let idx3 = h.compute_index(h3); | ||||||
|  |  | ||||||
|  |                 h.get_index_mut(idx0).add(raw0, raw_to_pdep(raw0)); | ||||||
|  |                 h.get_index_mut(idx1).add(raw1, raw_to_pdep(raw1)); | ||||||
|  |                 h.get_index_mut(idx2).add(raw2, raw_to_pdep(raw2)); | ||||||
|  |                 h.get_index_mut(idx3).add(raw3, raw_to_pdep(raw3)); | ||||||
|  |             } | ||||||
|  |         }, | ||||||
|  |     ); | ||||||
|  |     h | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub fn run_parallel(data: &[u8], keys: &[Vec<u8>], num_threads: usize) -> Phf { | ||||||
|  |     if num_threads == 0 { | ||||||
|  |         return run(data, keys); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let phf = std::sync::Mutex::new(Phf::new(keys.to_vec())); | ||||||
|  |  | ||||||
|  |     // Spawn one thread per core. | ||||||
|  |     std::thread::scope(|s| { | ||||||
|  |         let chunks = data.chunks(data.len() / num_threads + 1); | ||||||
|  |         for chunk in chunks { | ||||||
|  |             s.spawn(|| { | ||||||
|  |                 // Each thread has its own accumulator. | ||||||
|  |                 let thread_phf = run(chunk, keys); | ||||||
|  |  | ||||||
|  |                 // Merge results. | ||||||
|  |                 phf.lock().unwrap().merge(thread_phf); | ||||||
|  |             }); | ||||||
|  |         } | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     phf.into_inner().unwrap() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | pub fn to_str(name: &[u8]) -> &str { | ||||||
|  |     std::str::from_utf8(name).unwrap() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// Returns a list of city names found in data. | ||||||
|  | /// Each city is returned twice, once as `<city>` and once as `<city>;`, | ||||||
|  | /// with the latter being used to accumulate negative temperatures. | ||||||
|  | #[inline(never)] | ||||||
|  | pub fn find_city_names(data: &[u8]) -> Vec<Vec<u8>> { | ||||||
|  |     let mut cities = FxHashSet::default(); | ||||||
|  |  | ||||||
|  |     let mut callback = |data: &[u8], state: State| { | ||||||
|  |         let State { start, sep, .. } = state; | ||||||
|  |         let name = unsafe { data.get_unchecked(start..sep) }; | ||||||
|  |         cities.insert(name.to_vec()); | ||||||
|  |  | ||||||
|  |         // Do the same for the name with ; appended. | ||||||
|  |         let name = unsafe { data.get_unchecked(start..sep + 1) }; | ||||||
|  |         cities.insert(name.to_vec()); | ||||||
|  |     }; | ||||||
|  |     iter_lines(data, |d, s0, s1, s2, s3| { | ||||||
|  |         flatten_callback(d, s0, s1, s2, s3, &mut callback) | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let mut cities: Vec<_> = cities.into_iter().collect(); | ||||||
|  |     cities.sort(); | ||||||
|  |     cities | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn flatten_callback<'a>( | ||||||
|  |     data: &'a [u8], | ||||||
|  |     s0: State, | ||||||
|  |     s1: State, | ||||||
|  |     s2: State, | ||||||
|  |     s3: State, | ||||||
|  |     callback: &mut impl FnMut(&'a [u8], State), | ||||||
|  | ) { | ||||||
|  |     callback(data, s0); | ||||||
|  |     callback(data, s1); | ||||||
|  |     callback(data, s2); | ||||||
|  |     callback(data, s3); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(clap::Parser)] | ||||||
|  | pub struct Args { | ||||||
|  |     pub input: Option<String>, | ||||||
|  |  | ||||||
|  |     #[clap(short = 'j', long)] | ||||||
|  |     pub threads: Option<usize>, | ||||||
|  |  | ||||||
|  |     #[clap(long)] | ||||||
|  |     pub print: bool, | ||||||
|  |  | ||||||
|  |     #[clap(long)] | ||||||
|  |     pub stats: bool, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(test)] | ||||||
|  | mod test { | ||||||
|  |     #[test] | ||||||
|  |     fn parse_raw() { | ||||||
|  |         use super::*; | ||||||
|  |         let d = b"12.3"; | ||||||
|  |         let raw = parse_to_raw(d, 0, 4); | ||||||
|  |         let v = raw_to_value(raw); | ||||||
|  |         assert_eq!(v, 123); | ||||||
|  |  | ||||||
|  |         let d = b"12.3"; | ||||||
|  |         let raw = parse_to_raw(d, 1, 4); | ||||||
|  |         let v = raw_to_value(raw); | ||||||
|  |         assert_eq!(v, 23); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -1,16 +1,15 @@ | |||||||
|  | use crate::models::station_measurements::StationMeasurements; | ||||||
|  | use crate::utils::{hash, parse}; | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
| use std::io::{BufRead, BufReader}; | use std::io::{BufRead, BufReader}; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
| use crate::models::station_measurements::StationMeasurements; |  | ||||||
| use crate::utils::parse; |  | ||||||
| use crate::utils::parse::hashstr; |  | ||||||
|  |  | ||||||
| const DEFAULT_HASHMAP_LENGTH: usize = 10000; | const DEFAULT_HASHMAP_LENGTH: usize = 10000; | ||||||
|  |  | ||||||
| pub fn run() { | pub fn run() { | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|     let mut stations: HashMap<usize, (String, StationMeasurements)> = |     let mut stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|  |  | ||||||
|     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); |     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||||
| @@ -24,8 +23,8 @@ pub fn run() { | |||||||
|             break; |             break; | ||||||
|         } |         } | ||||||
|         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|         let hash = hashstr(station); |         let hash = hash::bytes(station); | ||||||
|         let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; |         let station = unsafe { std::str::from_utf8_unchecked(station) }; | ||||||
|         let temp = parse::temp(temp.split_last().unwrap().1); |         let temp = parse::temp(temp.split_last().unwrap().1); | ||||||
|         let measurements_option = stations.get_mut(&hash); |         let measurements_option = stations.get_mut(&hash); | ||||||
|         if let Some((_, measurements)) = measurements_option { |         if let Some((_, measurements)) = measurements_option { | ||||||
| @@ -37,7 +36,7 @@ pub fn run() { | |||||||
|                 count: 1, |                 count: 1, | ||||||
|                 sum: temp, |                 sum: temp, | ||||||
|             }; |             }; | ||||||
|             stations.insert(hash, (station, measurements)); |             stations.insert(hash, (station.to_string(), measurements)); | ||||||
|         } |         } | ||||||
|         line.clear(); |         line.clear(); | ||||||
|     } |     } | ||||||
| @@ -51,5 +50,5 @@ pub fn run() { | |||||||
|     stations.sort(); |     stations.sort(); | ||||||
|     let stations = stations.join(","); |     let stations = stations.join(","); | ||||||
|     println!("{{{stations}}}"); |     println!("{{{stations}}}"); | ||||||
|     println!("Time={} μs", now.elapsed().as_micros()); |     println!("Time={} ms", now.elapsed().as_millis()); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -2,8 +2,7 @@ use smol::fs::File; | |||||||
| use smol::io::{AsyncBufReadExt, BufReader}; | use smol::io::{AsyncBufReadExt, BufReader}; | ||||||
|  |  | ||||||
| use crate::models::station_measurements::StationMeasurements; | use crate::models::station_measurements::StationMeasurements; | ||||||
| use crate::utils::parse; | use crate::utils::{hash, parse}; | ||||||
| use crate::utils::parse::hashstr; |  | ||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::time::Instant; | use std::time::Instant; | ||||||
|  |  | ||||||
| @@ -11,7 +10,7 @@ const DEFAULT_HASHMAP_LENGTH: usize = 10000; | |||||||
|  |  | ||||||
| pub fn run() { | pub fn run() { | ||||||
|     let now = Instant::now(); |     let now = Instant::now(); | ||||||
|     let mut stations: HashMap<usize, (String, StationMeasurements)> = |     let mut stations: HashMap<u64, (String, StationMeasurements)> = | ||||||
|         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); |         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH); | ||||||
|  |  | ||||||
|     smol::block_on(async { |     smol::block_on(async { | ||||||
| @@ -29,8 +28,8 @@ pub fn run() { | |||||||
|                 break; |                 break; | ||||||
|             } |             } | ||||||
|             let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); |             let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap(); | ||||||
|             let hash = hashstr(station); |             let hash = hash::bytes(station); | ||||||
|             let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) }; |             let station = unsafe { std::str::from_utf8_unchecked(station) }; | ||||||
|             let temp = parse::temp(temp.split_last().unwrap().1); |             let temp = parse::temp(temp.split_last().unwrap().1); | ||||||
|             let measurements_option = stations.get_mut(&hash); |             let measurements_option = stations.get_mut(&hash); | ||||||
|             if let Some((_, measurements)) = measurements_option { |             if let Some((_, measurements)) = measurements_option { | ||||||
| @@ -42,7 +41,7 @@ pub fn run() { | |||||||
|                     count: 1, |                     count: 1, | ||||||
|                     sum: temp, |                     sum: temp, | ||||||
|                 }; |                 }; | ||||||
|                 stations.insert(hash, (station, measurements)); |                 stations.insert(hash, (station.to_string(), measurements)); | ||||||
|             } |             } | ||||||
|             line.clear(); |             line.clear(); | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -2,7 +2,8 @@ | |||||||
| #![feature(portable_simd)] | #![feature(portable_simd)] | ||||||
| #![feature(slice_split_once)] | #![feature(slice_split_once)] | ||||||
| #![feature(hash_raw_entry)] | #![feature(hash_raw_entry)] | ||||||
|  | #![feature(int_roundings)] | ||||||
|  |  | ||||||
|  | pub mod implementations; | ||||||
| pub mod models; | pub mod models; | ||||||
| pub mod utils; | pub mod utils; | ||||||
| pub mod implementations; |  | ||||||
|   | |||||||
| @@ -1,4 +1,3 @@ | |||||||
|  |  | ||||||
| fn main() { | fn main() { | ||||||
|     // let now = Instant::now(); |     // let now = Instant::now(); | ||||||
|     // let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); |     // let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||||
|   | |||||||
| @@ -1,2 +1,2 @@ | |||||||
| pub mod station_measurements; |  | ||||||
| pub mod mmap; | pub mod mmap; | ||||||
|  | pub mod station_measurements; | ||||||
|   | |||||||
| @@ -12,7 +12,7 @@ pub struct Mmap<'a> { | |||||||
|  |  | ||||||
| /// To properly dispose of the mmap we have to manually call munmap. | /// To properly dispose of the mmap we have to manually call munmap. | ||||||
| /// So implementing drop for this smart-pointer type is necessary. | /// So implementing drop for this smart-pointer type is necessary. | ||||||
| impl<'a> Drop for Mmap<'a> { | impl Drop for Mmap<'_> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         unsafe { |         unsafe { | ||||||
|             munmap( |             munmap( | ||||||
| @@ -25,7 +25,7 @@ impl<'a> Drop for Mmap<'a> { | |||||||
|  |  | ||||||
| // anti-pattern for non-smart pointer types. | // anti-pattern for non-smart pointer types. | ||||||
| // ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html | // ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html | ||||||
| impl<'a> Deref for Mmap<'a> { | impl Deref for Mmap<'_> { | ||||||
|     type Target = [u8]; |     type Target = [u8]; | ||||||
|  |  | ||||||
|     fn deref(&self) -> &Self::Target { |     fn deref(&self) -> &Self::Target { | ||||||
| @@ -50,7 +50,7 @@ impl<'a> Mmap<'a> { | |||||||
|             // We can advise the kernel on how we intend to use the mmap. |             // We can advise the kernel on how we intend to use the mmap. | ||||||
|             // But this did not improve my read performance in a meaningful way |             // But this did not improve my read performance in a meaningful way | ||||||
|             madvise(m, size, MADV_WILLNEED); |             madvise(m, size, MADV_WILLNEED); | ||||||
|             return Self::new(std::slice::from_raw_parts(m as *const u8, size)); |             Self::new(std::slice::from_raw_parts(m as *const u8, size)) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,4 +1,5 @@ | |||||||
| pub mod byte_pos; | pub mod byte_pos; | ||||||
|  | pub mod hash; | ||||||
| pub mod parse; | pub mod parse; | ||||||
| pub mod write_structured_measurements; | pub mod write_structured_measurements; | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										40
									
								
								src/main/rust/src/utils/hash.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								src/main/rust/src/utils/hash.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,40 @@ | |||||||
|  | #[inline] | ||||||
|  | pub fn bytes(bytes: &[u8]) -> u64 { | ||||||
|  |     if cfg!(not(debug_assertions)) { | ||||||
|  |         // inspired by https://curiouscoding.nl/posts/1brc/ | ||||||
|  |         let head: [u8; 8] = unsafe { bytes.get_unchecked(..8).as_chunks::<8>().0[0] }; | ||||||
|  |         let tail: [u8; 8] = unsafe { bytes.get_unchecked(bytes.len() - 8..).as_chunks::<8>().0[0] }; | ||||||
|  |         let shift = 64usize.saturating_sub(8 * bytes.len()); | ||||||
|  |         let khead = u64::from_ne_bytes(head) << shift; | ||||||
|  |         let ktail = u64::from_ne_bytes(tail) >> shift; | ||||||
|  |         khead + ktail | ||||||
|  |     } else { | ||||||
|  |         // debug friendly but slow | ||||||
|  |         let mut head = [0u8; 8]; | ||||||
|  |         let mut tail = [0u8; 8]; | ||||||
|  |         let end = bytes.len().min(8); | ||||||
|  |         let start = bytes.len().saturating_sub(8); | ||||||
|  |         head[..end].copy_from_slice(&bytes[..end]); | ||||||
|  |         tail[..end].copy_from_slice(&bytes[start..]); | ||||||
|  |         let shift = 64usize.saturating_sub(8 * bytes.len()); | ||||||
|  |         let khead = u64::from_ne_bytes(head) << shift; | ||||||
|  |         let ktail = u64::from_ne_bytes(tail) >> shift; | ||||||
|  |         khead.wrapping_add(ktail) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(test)] | ||||||
|  | mod tests { | ||||||
|  |     use crate::utils::hash; | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn test_hashstr() { | ||||||
|  |         let hash_1 = hash::bytes(b"abcdefghijk"); | ||||||
|  |         let hash_2 = hash::bytes(b"kjihgfedcba"); | ||||||
|  |         let hash_3 = hash::bytes(b"abba"); | ||||||
|  |         let hash_4 = hash::bytes(b"baab"); | ||||||
|  |  | ||||||
|  |         assert_ne!(hash_1, hash_2); | ||||||
|  |         assert_ne!(hash_3, hash_4); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -69,27 +69,9 @@ pub fn temp_simd(bytes: &[u8]) -> isize { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[inline] |  | ||||||
| pub fn hashstr(bytes: &[u8]) -> usize { |  | ||||||
|     let mut hash = 0; |  | ||||||
|     let (chunks, remainder) = bytes.as_chunks::<8>(); |  | ||||||
|     for &chunk in chunks { |  | ||||||
|         hash += usize::from_be_bytes(chunk); |  | ||||||
|     } |  | ||||||
|     let mut r = [0_u8; 8]; |  | ||||||
|     r[0] = remainder.len() as u8; |  | ||||||
|     let mut idx = 1; |  | ||||||
|     for &byte in remainder { |  | ||||||
|         r[idx] = byte; |  | ||||||
|         idx += 1; |  | ||||||
|     } |  | ||||||
|     hash += usize::from_be_bytes(r); |  | ||||||
|     hash |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests { | mod tests { | ||||||
|     use crate::utils::parse::{hashstr, temp_new}; |     use crate::utils::parse::temp_new; | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_temp_new_max() { |     fn test_temp_new_max() { | ||||||
| @@ -120,15 +102,4 @@ mod tests { | |||||||
|         let temp_neg_10 = temp_new("-9.9".as_bytes()); |         let temp_neg_10 = temp_new("-9.9".as_bytes()); | ||||||
|         assert_eq!(temp_neg_10, -99); |         assert_eq!(temp_neg_10, -99); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |  | ||||||
|     fn test_hashstr() { |  | ||||||
|         let hash_1 = hashstr(b"abcdefghijk"); |  | ||||||
|         let hash_2 = hashstr(b"kjihgfedcba"); |  | ||||||
|         let hash_3 = hashstr(b"abba"); |  | ||||||
|         let hash_4 = hashstr(b"baab"); |  | ||||||
|  |  | ||||||
|         assert_ne!(hash_1, hash_2); |  | ||||||
|         assert_ne!(hash_3, hash_4); |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|   | |||||||
| @@ -3,7 +3,8 @@ use std::io::{BufRead, BufReader, BufWriter, Write}; | |||||||
|  |  | ||||||
| pub fn write_structured_measurements() { | pub fn write_structured_measurements() { | ||||||
|     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); |     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found"); | ||||||
|     let structured_file = File::create_new("structured_measurements.txt").expect("Could not create file"); |     let structured_file = | ||||||
|  |         File::create_new("structured_measurements.txt").expect("Could not create file"); | ||||||
|     let mut reader = BufReader::new(&file); |     let mut reader = BufReader::new(&file); | ||||||
|     let mut writer = BufWriter::new(&structured_file); |     let mut writer = BufWriter::new(&structured_file); | ||||||
|     let mut line = Vec::with_capacity(107); |     let mut line = Vec::with_capacity(107); | ||||||
| @@ -27,7 +28,9 @@ pub fn write_structured_measurements() { | |||||||
|         write_line[100] = b';'; |         write_line[100] = b';'; | ||||||
|         write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp); |         write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp); | ||||||
|         write_line[106] = b'\n'; |         write_line[106] = b'\n'; | ||||||
|         writer.write_all(write_line.as_slice()).expect("Could not write"); |         writer | ||||||
|  |             .write_all(write_line.as_slice()) | ||||||
|  |             .expect("Could not write"); | ||||||
|         line.clear(); |         line.clear(); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user