Compare commits
27 Commits
b1d7ebaaea...main
| SHA1 |
|---|
| fdd92dd5f7 |
| 45b3014cbb |
| 98cd6e930c |
| 40a8d6d929 |
| 0ea10a3c1b |
| 3dbc9c32d1 |
| 4d586c809e |
| 5bb2363eee |
| eb2ed15e33 |
| dfcc8562e6 |
| 212e595a7e |
| 7b8943976f |
| aaa11c7b94 |
| 07a8e7fc69 |
| 0aa9d8be86 |
| b1c064a92f |
| ea06a600ce |
| ac5c45f8d5 |
| b8f589096f |
| c306083192 |
| a45ddd2dc0 |
| e832475fc3 |
| 608cbb59e5 |
| 53ea542f36 |
| d246c54cd9 |
| 2a89d061a0 |
| 7add8793a5 |
91 src/main/julia/main.jl Normal file
@@ -0,0 +1,91 @@
using Mmap

mutable struct StationMeasurements
    min::Float64
    max::Float64
    sum::Float64
    count::Int64
end

function update(sm, temp::Float64)
    # Compare against the struct fields; a bare `min`/`max` would reference
    # the Base functions instead of the stored values.
    if temp < sm.min
        sm.min = temp
    elseif temp > sm.max
        sm.max = temp
    end
    sm.sum += temp
    sm.count += 1
end

function print_measurements(stations::Dict{String,StationMeasurements})
    sorted_keys = sort(collect(keys(stations)))
    print("{")
    sm_vec = []
    for city in sorted_keys
        sm = stations[city]
        min = round(sm.min; digits=1)
        max = round(sm.max; digits=1)
        avg = round((sm.sum / sm.count); digits=1)
        push!(sm_vec, "$city=$min/$avg/$max")
    end
    joined = join(sm_vec, ", ")
    print(joined)
    print("}")
end

function merge(stations_vec::Vector{Dict{String,StationMeasurements}})
    merged = Dict{String,StationMeasurements}()
    for stations in stations_vec
        for (city, sm) in stations
            if haskey(merged, city)
                merged_sm = merged[city]
                sm.min = ifelse(merged_sm.min < sm.min, merged_sm.min, sm.min)
                sm.max = ifelse(merged_sm.max > sm.max, merged_sm.max, sm.max)
                sm.sum += merged_sm.sum
                sm.count += merged_sm.count
            end
            # Write the (possibly merged) entry back so the result is kept.
            merged[city] = sm
        end
    end
    merged
end

function process_chunk(data, chunk)
    stations = Dict{String,StationMeasurements}()
    for i in eachindex(chunk)
        if i == 1
            continue
        end
        # chunk[i-1] is the previous newline's index, so start one byte after it.
        # Note: the file's first line and lines spanning chunk boundaries are skipped.
        line = String(data[chunk[i-1]+1:chunk[i]-1])
        station, temp_str = rsplit(line, ";")
        temp = parse(Float32, temp_str)
        if haskey(stations, station)
            sm = stations[station]
            sm.min = ifelse(temp < sm.min, temp, sm.min)
            sm.max = ifelse(temp > sm.max, temp, sm.max)
            sm.sum += temp
            sm.count += 1
        else
            stations[station] = StationMeasurements(temp, temp, temp, 1)
        end
    end
    stations
end

function main()
    open("../../../measurements.txt", "r") do f
        sz = Base.stat(f).size
        data = mmap(f, Vector{UInt8}, sz)
        idxs = findall(isequal(0x0a), data)
        idxs_chunks = collect(Iterators.partition(idxs, length(idxs) ÷ Threads.nthreads()))
        tasks = map(idxs_chunks) do chunk
            Threads.@spawn process_chunk(data, chunk)
        end
        stations_vec = fetch.(tasks)
        stations = merge(stations_vec)
        print_measurements(stations)
    end
end

main()
57 src/main/lua/main.lua Normal file
@@ -0,0 +1,57 @@
local file_path = "../../../measurements.txt"

local file = io.open(file_path, "rb")
if not file then
    print("Unable to open file")
    return
end

local function split_semi(inputstr)
    local t = {}
    for str in string.gmatch(inputstr, "([^;]+)") do
        table.insert(t, str)
    end
    return t
end

local stations = {}

local iterations = 1

for line in file:lines("*l") do
    if iterations % 1000000 == 0 then
        -- Clear the screen and print progress as a percentage of the 1e9 rows.
        io.write("\x1b[J\x1b[H")
        io.write(iterations / 10000000)
        io.write("\n")
    end
    local split_line = split_semi(line)
    local station = split_line[1]
    local temp_str = string.gsub(split_line[2], "[ ]", "")
    local temp = tonumber(temp_str)
    if stations[station] == nil then
        stations[station] = { min = temp, max = temp, sum = temp, count = 1 }
    else
        if temp < stations[station].min then
            stations[station].min = temp
        elseif temp > stations[station].max then
            stations[station].max = temp
        end
        stations[station].sum = stations[station].sum + temp
        stations[station].count = stations[station].count + 1
    end
    iterations = iterations + 1
end

local keys = {}
for k in pairs(stations) do table.insert(keys, k) end
table.sort(keys)

local fstations = {}
io.write("{")
for _, station in ipairs(keys) do
    -- sum accumulates raw degree values, so the average is simply sum/count
    -- (min and max are stored unscaled too).
    local avg = (stations[station].sum / stations[station].count)
    local res_str = string.format("%s=%.1f/%.1f/%.1f", station, stations[station].min, avg, stations[station].max)
    table.insert(fstations, res_str)
end
io.write(table.concat(fstations, ","))
print("}")
1310 src/main/rust/Cargo.lock (generated) — file diff suppressed because it is too large
@@ -6,22 +6,26 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-bstr = "1.9.1"
+bstr = "1.10.0"
 fast-float = "0.2.0"
+memchr = "2.7.4"
-memmap = "0.7.0"
-polars = { version = "0.36.2", features = ["csv", "lazy", "nightly", "streaming"]}
+memmap2 = "0.9.4"
 rayon = "1.10.0"
 rustc-hash = "2.0.0"
-libc = "0.2.155"
-smol = "2.0.0"
+libc = "0.2.158"
+smol = "2.0.1"
 easy-parallel = "3.3.1"
+clap = { version = "4.5.13", features = ["derive"] }
+colored = "2.1.0"
+ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false }

 [dev-dependencies]
-criterion = { version = "0.5", features = ["html_reports"] }
+criterion = { version = "0.5.1", features = ["html_reports"] }

 [features]
 json = []
 unsafe = []
+no_pdep = []

 [[bench]]
 name = "reference_impl"
@@ -36,7 +40,7 @@ name = "multi_threaded"
 harness = false

 [[bench]]
-name = "polars"
+name = "multi_threaded_smol"
 harness = false

 [[bench]]
@@ -51,3 +55,8 @@ harness = false
 lto = "fat"
 strip = "symbols"
 panic = "abort"
+
+[profile.flamegraph]
+inherits = "release"
+debug = true
+strip = "none"
@@ -1,8 +1,8 @@
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{criterion_group, criterion_main, Criterion};
 use onebrc::implementations::flare_flo::run;

 pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("flareflo", |b| {b.iter(|| run())});
+    c.bench_function("flareflo", |b| b.iter(run));
 }

 criterion_group!(benches, criterion_benchmark);
9 src/main/rust/benches/libraries.rs Normal file
@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::libraries::run;

pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("libraries", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
@@ -1,8 +1,8 @@
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{criterion_group, criterion_main, Criterion};
 use onebrc::implementations::multi_threaded::run;

 pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("multithreaded", |b| {b.iter(|| run())});
+    c.bench_function("multithreaded", |b| b.iter(run));
 }

 criterion_group!(benches, criterion_benchmark);
9 src/main/rust/benches/multi_threaded_smol.rs Normal file
@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded_smol::run;

pub fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("multithreadedsmol", |b| b.iter(run));
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
@@ -1,8 +1,8 @@
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{criterion_group, criterion_main, Criterion};
 use onebrc::implementations::phcs::run;

 pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("phcs", |b| {b.iter(|| run())});
+    c.bench_function("phcs", |b| b.iter(run));
 }

 criterion_group!(benches, criterion_benchmark);
@@ -1,8 +0,0 @@
-use criterion::{Criterion, criterion_group, criterion_main};
-
-pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("polars", |b| {b.iter(|| /*run_polars()*/ ())});
-}
-
-criterion_group!(benches, criterion_benchmark);
-criterion_main!(benches);
@@ -1,8 +1,8 @@
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{criterion_group, criterion_main, Criterion};
 use onebrc::implementations::reference_impl::run;

 pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("reference", |b| {b.iter(|| run())});
+    c.bench_function("reference", |b| b.iter(run));
 }

 criterion_group!(benches, criterion_benchmark);
@@ -1,8 +1,8 @@
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{criterion_group, criterion_main, Criterion};
 use onebrc::implementations::single_thread::run;

 pub fn criterion_benchmark(c: &mut Criterion) {
-    c.bench_function("singlethread", |b| {b.iter(|| run())});
+    c.bench_function("singlethread", |b| b.iter(run));
 }

 criterion_group!(benches, criterion_benchmark);
@@ -1,4 +0,0 @@
-
-fn main() {
-    // let _ = run_polars();
-}
91 src/main/rust/src/bin/rgk.rs Normal file
@@ -0,0 +1,91 @@
use clap::Parser;
use colored::Colorize;
use memmap2::Mmap;
use onebrc::implementations::rgk::{
    find_city_names, format, run_parallel, to_str, Args, Record, S,
};
use std::thread::available_parallelism;

fn main() {
    let args = Args::parse();

    let start = std::time::Instant::now();
    let filename = args
        .input
        .unwrap_or("../../../measurements.txt".to_string());
    let mmap: Mmap;
    let data;
    {
        let file = std::fs::File::open(filename).unwrap();
        mmap = unsafe { Mmap::map(&file).unwrap() };
        data = &*mmap;
    }

    // Guaranteed to be aligned for SIMD.
    let offset = unsafe { data.align_to::<S>().0.len() };
    let data = &data[offset..];

    // Build a perfect hash function on the cities found in the first 4 MB of input.
    let names = find_city_names(&data[..4000000]);

    if args.stats {
        eprintln!("Num cities: {}", names.len());
        let mut lens = vec![0; 102];
        for n in &names {
            if *n.last().unwrap() == b';' {
                continue;
            }
            lens[n.len()] += 1;
        }
        for (len, count) in lens.iter().enumerate() {
            if *count != 0 {
                eprintln!("{}: {}", len, count);
            }
        }
    }

    let phf = run_parallel(
        data,
        &names,
        args.threads
            .unwrap_or(available_parallelism().unwrap().into()),
    );

    if args.print {
        print!("{{");
        let mut first = true;

        let mut keys = phf.keys.clone();
        keys.sort_by(|kl, kr| to_str(kl).cmp(to_str(kr)));

        for name in &keys {
            if *name.last().unwrap() != b';' {
                continue;
            }
            let namepos = &name[..name.len() - 1];

            let rpos = phf.index(namepos);
            let rneg = phf.index(name);
            let (min, avg, max) = Record::merge_pos_neg(rpos, rneg);

            if !first {
                print!(", ");
            }
            first = false;

            print!(
                "{}={}/{}/{}",
                to_str(namepos),
                format(min),
                format(avg),
                format(max)
            );
        }
        println!("}}");
    }

    eprintln!(
        "total: {}",
        format!("{:>5.2?}", start.elapsed()).bold().green()
    );
}
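The print loop above only walks keys that end in b';': those are the negative-temperature accumulators, and stripping the trailing ';' recovers the matching positive key. A minimal standalone sketch of the separator trick, with a hypothetical input line (not taken from the repo):

fn main() {
    // For "Oslo;-12.3": when the byte after ';' is '-', the separator index is
    // pushed one to the right, so the key becomes "Oslo;" and the remaining
    // digits "12.3" are tallied as a positive value in the negative record.
    let line = b"Oslo;-12.3";
    let mut sep = line.iter().position(|&b| b == b';').unwrap();
    sep += (line[sep + 1] == b'-') as usize;
    let name = &line[..sep];
    let digits = &line[sep + 1..];
    assert_eq!(name, &b"Oslo;"[..]);
    assert_eq!(digits, &b"12.3"[..]);
}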
@@ -4,7 +4,7 @@ pub mod multi_threaded;
 pub mod multi_threaded_smol;
 pub mod multi_threaded_structured;
 pub mod phcs;
-pub mod polars;
 pub mod reference_impl;
+pub mod rgk;
 pub mod single_thread;
 pub mod smol;
@@ -105,7 +105,7 @@ impl Citymap {
         }
     }
     pub fn into_key_values(self) -> Vec<(String, City)> {
-        self.map.into_iter().map(|(_, s)| s).collect()
+        self.map.into_values().collect()
     }
     pub fn merge_with(&mut self, rhs: Self) {
         for (k, v) in rhs.map.into_iter() {
@@ -125,7 +125,7 @@ pub fn run() {
     let start = Instant::now();
     let input = "../../../measurements.txt";

-    let results = if args.find(|e| e == "st").is_some() {
+    let results = if args.any(|e| e == "st") {
         citymap_single_thread(input)
     } else {
         citymap_multi_threaded(input)
@@ -159,7 +159,6 @@ fn citymap_multi_threaded(path: &str) -> Citymap {
         threads.push(citymap_thread(path.to_owned(), range, i, sender.clone()));
     }
     let mut ranges = (0..cpus)
-        .into_iter()
         .map(|_| receiver.recv().unwrap())
         .collect::<Vec<_>>();
     ranges.sort_unstable_by_key(|e| e.start);
@@ -171,7 +170,7 @@ fn citymap_multi_threaded(path: &str) -> Citymap {
         }),
         "Ranges overlap or have gaps: {ranges:?}"
     );
-    let results = threads
+    threads
         .into_iter()
         .map(|e| e.join().unwrap())
         //.map(|e|dbg!(e))
@@ -179,8 +178,7 @@ fn citymap_multi_threaded(path: &str) -> Citymap {
             left.merge_with(right);
             left
         })
-        .unwrap();
-    results
+        .unwrap()
 }

 fn citymap_thread(
@@ -204,7 +202,7 @@ fn citymap_thread(
     head.truncate(len);

     for (i, &pos) in head.iter().enumerate() {
-        if pos == '\n' as u8 {
+        if pos == b'\n' {
             range.start += i as u64;
             break;
         }
@@ -218,7 +216,7 @@ fn citymap_thread(
     head.truncate(len);

     for (i, &pos) in head.iter().enumerate() {
-        if pos == '\n' as u8 {
+        if pos == b'\n' {
             range.end += i as u64;
             break;
         }
@@ -249,7 +247,7 @@ fn citymap_naive(input: &mut impl BufRead) -> Citymap {
         }

         // Skip over just newline strings that get created by the alignment process
-        if buf == &[b'\n'] {
+        if buf == b"\n" {
             continue;
         }
@@ -1,65 +1,58 @@
-use std::collections::HashMap;
-use std::io::{BufRead, Seek, SeekFrom};
+use crate::models::station_measurements::StationMeasurements;
+use crate::utils::{hash, parse};
+use memmap2::MmapOptions;
+use rustc_hash::{FxBuildHasher, FxHashMap as HashMap};
+use std::slice::from_raw_parts;
 use std::sync::mpsc;
 use std::time::Instant;
-use std::{fs::File, io::BufReader, thread};
-use memmap::MmapOptions;
-use crate::models::station_measurements::StationMeasurements;
-use crate::utils::parse;
-use crate::utils::parse::hashstr;
+use std::{fs::File, thread};

 const DEFAULT_HASHMAP_LENGTH: usize = 10000;

 pub fn run() {
-    const FILE_PATH: &str = "../../../measurements.txt";
     let now = Instant::now();
-    thread::scope(|s| {
-        let mut stations: HashMap<usize, (String, StationMeasurements)> =
-            HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
-        let (tx, rx) = mpsc::channel();
-        let cores = thread::available_parallelism().unwrap().into();
+    const FILE_PATH: &str = "../../../measurements.txt";
     let file = File::open(FILE_PATH).expect("File measurements.txt not found");
+    let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
+    let mmap_ptr = mmap.as_ptr();
+    let file_length = mmap.len();
+    let hasher = FxBuildHasher;
+    // Even if I could now just use the byte slice as a key, doing the hash is still faster
+    let mut stations: HashMap<u64, (&[u8], StationMeasurements)> =
+        HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
+    let (tx, rx) = mpsc::channel();
+    let cores = thread::available_parallelism().unwrap().into();
     let chunk_length = file_length / cores;
     let mut bounds = Vec::with_capacity(cores + 1);
-    bounds.push(0);
-    for i in 1..cores {
-        let mut reader = BufReader::new(&file);
-        let mut byte_start = chunk_length * i;
-        reader
-            .seek(SeekFrom::Start(byte_start as u64))
-            .expect("could not seek");
-        let mut line = Vec::with_capacity(108);
-        let line_len = reader
-            .read_until(b'\n', &mut line)
-            .expect("could not read bytes");
-        byte_start += line_len;
-        bounds.push(byte_start);
-    }
-    bounds.push(file_length);
+    let mut start = 0;
+    for _ in 0..cores {
+        let end = (start + chunk_length).min(mmap.len());
+        let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
+            Some(v) => v,
+            None => {
+                assert_eq!(end, mmap.len());
+                0
+            }
+        };
+        let end = end + next_new_line;
+        bounds.push((start, end));
+        start = end + 1;
+    }
+    thread::scope(|s| {
         for i in 0..cores {
             let tx = tx.clone();
-            let mut currposition = *bounds.get(i).unwrap();
-            let end = *bounds.get(i + 1).unwrap();
+            let (start, end) = *bounds.get(i).unwrap();
+            let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) };
             s.spawn(move || {
-                let file = File::open(FILE_PATH).expect("File measurements.txt not found");
-                let mut reader = BufReader::new(&file);
-                reader.seek(SeekFrom::Start(currposition as u64)).unwrap();
-                let mut t_stations: HashMap<usize, (String, StationMeasurements)> =
-                    HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
-                let mut line = Vec::with_capacity(108);
-                loop {
-                    let line_len = reader
-                        .read_until(b'\n', &mut line)
-                        .expect("could not read bytes");
-                    if line_len == 0 {
+                let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> =
+                    HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
+                for line in mmap_slice.split(|&byte| byte == b'\n') {
+                    if line.is_empty() {
                         break;
                     }
                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
-                    let hash = hashstr(station);
-                    let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) };
-                    let temp = parse::temp(temp.split_last().unwrap().1);
+                    let hash = hash::bytes(station);
+                    let temp = parse::temp(temp);
                     let measurements_option = t_stations.get_mut(&hash);
                     if let Some((_, measurements)) = measurements_option {
                         measurements.update(temp);
@@ -72,29 +65,25 @@ pub fn run() {
                         };
                         t_stations.insert(hash, (station, measurements));
                     }
-                    currposition += line_len;
-                    if currposition >= end {
-                        break;
-                    }
-                    line.clear();
-                }
                 }
                 let _ = tx.send(t_stations);
             });
         }
         drop(tx);
         while let Ok(t_stations) = rx.recv() {
-            for (&hash, (station, measurements)) in t_stations.iter() {
-                let joined_measurements_options = stations.get_mut(&hash);
+            for (hash, (station, measurements)) in t_stations.iter() {
+                let joined_measurements_options = stations.get_mut(hash);
                 if let Some((_, joined_measurements)) = joined_measurements_options {
                     joined_measurements.merge(measurements);
                 } else {
-                    stations.insert(hash, (station.to_owned(), *measurements));
+                    stations.insert(*hash, (station, *measurements));
                }
            }
        }
        let mut stations: Vec<String> = stations
            .iter()
            .map(|(_, (station, measurements))| {
+                let station = unsafe { std::str::from_utf8_unchecked(station) };
                let measurements = measurements.to_string();
                #[cfg(feature = "json")]
                {
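The bounds loop above is the core of this rewrite: each tentative chunk end is pushed forward to the next newline (via memchr) so that no line straddles two workers. A safe, std-only sketch of the same idea, assuming the input ends with a final newline (the function name is illustrative, not part of the repo):

fn chunk_bounds(data: &[u8], workers: usize) -> Vec<(usize, usize)> {
    let chunk_len = data.len() / workers;
    let mut bounds = Vec::with_capacity(workers);
    let mut start = 0;
    for _ in 0..workers {
        if start >= data.len() {
            break;
        }
        let tentative = (start + chunk_len).min(data.len() - 1);
        // Extend the chunk to the next newline so no line is split across workers.
        let end = match data[tentative..].iter().position(|&b| b == b'\n') {
            Some(off) => tentative + off,
            None => data.len() - 1, // last chunk ends at the final newline
        };
        bounds.push((start, end));
        start = end + 1;
    }
    bounds
}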
@@ -1,11 +1,10 @@
+use crate::models::station_measurements::StationMeasurements;
+use crate::utils::{hash, parse};
 use std::collections::HashMap;
 use std::io::{BufRead, Seek, SeekFrom};
 use std::sync::mpsc;
 use std::time::Instant;
 use std::{fs::File, io::BufReader, thread};
-use crate::models::station_measurements::StationMeasurements;
-use crate::utils::parse;
-use crate::utils::parse::hashstr;

 const DEFAULT_HASHMAP_LENGTH: usize = 10000;

@@ -13,7 +12,7 @@ pub fn run() {
     const FILE_PATH: &str = "../../../measurements.txt";
     let now = Instant::now();
     thread::scope(|s| {
-        let mut stations: HashMap<usize, (String, StationMeasurements)> =
+        let mut stations: HashMap<u64, (String, StationMeasurements)> =
             HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
         let (tx, rx) = mpsc::channel();
         let cores = thread::available_parallelism().unwrap().into();
@@ -22,30 +21,29 @@ pub fn run() {
         let file_length = reader.seek(SeekFrom::End(0)).unwrap();
         let chunk_length = file_length as usize / cores;
         let mut bounds = Vec::with_capacity(cores + 1);
-        bounds.push(0);
-        for i in 1..cores {
+        let mut start = 0;
+        for i in 0..cores {
             let mut reader = BufReader::new(&file);
-            let mut byte_start = chunk_length * i;
+            let mut end = chunk_length * i;
             reader
-                .seek(SeekFrom::Start(byte_start as u64))
+                .seek(SeekFrom::Start(end as u64))
                 .expect("could not seek");
             let mut line = Vec::with_capacity(108);
             let line_len = reader
                 .read_until(b'\n', &mut line)
                 .expect("could not read bytes");
-            byte_start += line_len;
-            bounds.push(byte_start as u64);
+            end += line_len;
+            bounds.push((start, end));
+            start = end + 1;
         }
-        bounds.push(file_length);
         for i in 0..cores {
             let tx = tx.clone();
-            let mut currposition = *bounds.get(i).unwrap();
-            let end = *bounds.get(i + 1).unwrap();
+            let (mut currposition, end) = *bounds.get(i).unwrap();
             s.spawn(move || {
                 let file = File::open(FILE_PATH).expect("File measurements.txt not found");
                 let mut reader = BufReader::new(&file);
-                reader.seek(SeekFrom::Start(currposition)).unwrap();
-                let mut t_stations: HashMap<usize, (String, StationMeasurements)> =
+                reader.seek(SeekFrom::Start(currposition as u64)).unwrap();
+                let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
                 let mut line = Vec::with_capacity(108);
                 loop {
@@ -56,8 +54,8 @@ pub fn run() {
                         break;
                     }
                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
-                    let hash = hashstr(station);
-                    let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) };
+                    let hash = hash::bytes(station);
+                    let station = unsafe { std::str::from_utf8_unchecked(station) };
                     let temp = parse::temp(temp.split_last().unwrap().1);
                     let measurements_option = t_stations.get_mut(&hash);
                     if let Some((_, measurements)) = measurements_option {
@@ -69,9 +67,9 @@ pub fn run() {
                             count: 1,
                             sum: temp,
                         };
-                        t_stations.insert(hash, (station, measurements));
+                        t_stations.insert(hash, (station.to_string(), measurements));
                     }
-                    currposition += line_len as u64;
+                    currposition += line_len;
                     if currposition >= end {
                         break;
                     }
@@ -1,11 +1,10 @@
 use smol::fs::File;
 use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
-use std::collections::HashMap;
-
 use crate::models::station_measurements::StationMeasurements;
-use crate::utils::parse;
-use crate::utils::parse::hashstr;
+use std::collections::HashMap;
 use std::sync::mpsc;
+use crate::utils::{hash, parse};
+use easy_parallel::Parallel;
 use std::thread;
 use std::time::Instant;

@@ -14,10 +13,8 @@ const DEFAULT_HASHMAP_LENGTH: usize = 10000;
 pub fn run() {
     const FILE_PATH: &str = "../../../measurements.txt";
     let now = Instant::now();
-    thread::scope(|s| {
-        let mut stations: HashMap<usize, (String, StationMeasurements)> =
+    let mut stations: HashMap<u64, (String, StationMeasurements)> =
         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
-    let (tx, rx) = mpsc::channel();
     let cores = thread::available_parallelism().unwrap().into();
     let bounds = smol::block_on(async {
         let mut file = File::open(FILE_PATH)
@@ -46,18 +43,17 @@ pub fn run() {
         bounds.push(file_length);
         bounds
     });
-    for i in 0..cores {
-        let tx = tx.clone();
+    let t_stations_vec = Parallel::new()
+        .each(0..cores, |i| {
             let mut currposition = *bounds.get(i).unwrap();
             let end = *bounds.get(i + 1).unwrap();
-        s.spawn(move || {
             smol::block_on(async {
                 let mut file = File::open(FILE_PATH)
                     .await
                     .expect("File measurements.txt not found");
                 let mut reader = BufReader::new(&mut file);
                 reader.seek(SeekFrom::Start(currposition)).await.unwrap();
-                let mut t_stations: HashMap<usize, (String, StationMeasurements)> =
+                let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
                     HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
                 let mut line = Vec::with_capacity(108);
                 loop {
@@ -69,8 +65,8 @@ pub fn run() {
                         break;
                     }
                     let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
-                    let hash = hashstr(station);
-                    let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) };
+                    let hash = hash::bytes(station);
+                    let station = unsafe { std::str::from_utf8_unchecked(station) };
                     let temp = parse::temp(temp.split_last().unwrap().1);
                     let measurements_option = t_stations.get_mut(&hash);
                     if let Some((_, measurements)) = measurements_option {
@@ -82,7 +78,7 @@ pub fn run() {
                             count: 1,
                             sum: temp,
                         };
-                        t_stations.insert(hash, (station, measurements));
+                        t_stations.insert(hash, (station.to_string(), measurements));
                     }
                     currposition += line_len as u64;
                     if currposition >= end {
@@ -90,18 +86,17 @@ pub fn run() {
                     }
                     line.clear();
                 }
-                let _ = tx.send(t_stations);
+                t_stations
             })
         });
-    }
-    drop(tx);
-    while let Ok(t_stations) = rx.recv() {
-        for (&hash, (station, measurements)) in t_stations.iter() {
-            let joined_measurements_options = stations.get_mut(&hash);
+        .run();
+    for t_stations in t_stations_vec {
+        for (hash, (station, measurements)) in t_stations.iter() {
+            let joined_measurements_options = stations.get_mut(hash);
             if let Some((_, joined_measurements)) = joined_measurements_options {
                 joined_measurements.merge(measurements);
             } else {
-                stations.insert(hash, (station.to_owned(), *measurements));
+                stations.insert(*hash, (station.to_owned(), *measurements));
             }
         }
     }
@@ -130,5 +125,4 @@ pub fn run() {
         println!("\n\n{{{stations}}}");
     }
     println!("\n\nTime={} ms", now.elapsed().as_millis());
-    });
 }
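The mpsc channel plus manual thread spawning is replaced here by easy_parallel, which runs one closure per index and hands the return values back as a Vec. A minimal sketch of that pattern in isolation (the workload is made up for illustration):

use easy_parallel::Parallel;

fn main() {
    // One closure per index, each on its own thread; `run` collects the
    // per-thread results in index order, replacing the tx/rx plumbing.
    let partial_sums: Vec<u64> = Parallel::new()
        .each(0..4u64, |i| (0..1_000u64).filter(|x| x % 4 == i).sum())
        .run();
    let total: u64 = partial_sums.iter().sum();
    assert_eq!(total, (0..1_000u64).sum());
}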
@@ -1,12 +1,11 @@
-use std::collections::HashMap;
-use std::io::{Read, Seek, SeekFrom};
-use std::sync::mpsc;
-use std::time::Instant;
-use std::{fs::File, io::BufReader, thread};
-use std::ffi::CStr;
 use crate::models::station_measurements::StationMeasurements;
 use crate::utils::parse;
-use crate::utils::parse::hashstr;
+use memmap2::MmapOptions;
+use rustc_hash::{FxBuildHasher, FxHashMap};
+use std::ffi::CStr;
+use std::sync::mpsc;
+use std::time::Instant;
+use std::{fs::File, thread};

 const DEFAULT_HASHMAP_LENGTH: usize = 10000;

@@ -14,55 +13,59 @@ pub fn run() {
     print!("\x1b[J");
     const FILE_PATH: &str = "structured_measurements.txt";
     let now = Instant::now();
-    thread::scope(|s| {
-        let mut stations: HashMap<usize, (String, StationMeasurements)> =
-            HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
+    let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found");
+    let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
+    let file_length = mmap.len();
+    let hasher = FxBuildHasher;
+    let mut stations: FxHashMap<String, StationMeasurements> =
+        FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
     let (tx, rx) = mpsc::channel();
     let cores = thread::available_parallelism().unwrap().into();
-    let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found");
-    let mut reader = BufReader::new(&file);
-    let file_length = reader.seek(SeekFrom::End(0)).unwrap();
-    let chunk_length = file_length as usize / cores;
+    let chunk_length = file_length / cores;
     let mut bounds = Vec::with_capacity(cores + 1);
-    bounds.push(0);
+    let mut start = 0;
+    for _ in 0..cores {
+        let end = (start + chunk_length).min(mmap.len());
+        let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
+            Some(v) => v,
+            None => {
+                assert_eq!(end, mmap.len());
+                0
+            }
+        };
+        let end = end + next_new_line;
+        bounds.push((start, end));
+        start = end + 1;
+    }
+    thread::scope(|s| {
         for i in 0..cores {
             let tx = tx.clone();
-            let mut currposition = (i * chunk_length) as u64;
-            let end = ((i + 1) * chunk_length) as u64;
+            let (start, end) = *bounds.get(i).unwrap();
+            let mmap_slice = &mmap[start..end];
             s.spawn(move || {
-                let file = File::open(FILE_PATH).expect("File measurements.txt not found");
-                let mut reader = BufReader::new(&file);
-                reader.seek(SeekFrom::Start(currposition)).unwrap();
-                let mut t_stations: HashMap<usize, (String, StationMeasurements)> =
-                    HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
-                let mut line = [0u8; 107];
-                let mut line_num = 0;
-                loop {
+                let mut t_stations: FxHashMap<String, StationMeasurements> =
+                    FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
+                let lines = mmap_slice.chunks_exact(107);
+                for (line_num, line) in lines.enumerate() {
                     if line_num % 100000 == 0 {
                         print!("\x1b[{i};0Hlines: {line_num}");
                     }
-                    line_num += 1;
-                    let read_res = reader
-                        .read_exact(&mut line);
-                    match read_res {
-                        Ok(_) => (),
-                        Err(e) => match e.kind() {
-                            std::io::ErrorKind::UnexpectedEof => break,
-                            _ => panic!("Could not read")
-                        },
-                    };
                     let (station, temp) = unsafe { line.split_at_unchecked(100) };
-                    let hash = hashstr(station);
                     let station = {
                         if station[station.len() - 1] == 0u8 {
-                            unsafe { std::str::from_utf8_unchecked(CStr::from_bytes_until_nul(station).unwrap().to_bytes()) }
+                            unsafe {
+                                std::str::from_utf8_unchecked(
+                                    CStr::from_bytes_until_nul(station).unwrap().to_bytes(),
+                                )
+                            }
                         } else {
                             unsafe { std::str::from_utf8_unchecked(station) }
                         }
-                    };
+                    }
+                    .to_owned();
                     let temp = parse::temp_new(&temp[1..6]);
-                    let measurements_option = t_stations.get_mut(&hash);
-                    if let Some((_, measurements)) = measurements_option {
+                    let measurements_option = t_stations.get_mut(&station);
+                    if let Some(measurements) = measurements_option {
                         measurements.update(temp);
                     } else {
                         let measurements = StationMeasurements {
@@ -71,11 +74,7 @@ pub fn run() {
                             count: 1,
                             sum: temp,
                         };
-                        t_stations.insert(hash, (station.to_string(), measurements));
-                    }
-                    currposition += 107;
-                    if currposition >= end {
-                        break;
+                        t_stations.insert(station, measurements);
                     }
                 }
                 let _ = tx.send(t_stations);
@@ -83,18 +82,18 @@ pub fn run() {
         }
         drop(tx);
        while let Ok(t_stations) = rx.recv() {
-            for (&hash, (station, measurements)) in t_stations.iter() {
-                let joined_measurements_options = stations.get_mut(&hash);
-                if let Some((_, joined_measurements)) = joined_measurements_options {
+            for (station, measurements) in t_stations.iter() {
+                let joined_measurements_options = stations.get_mut(station);
+                if let Some(joined_measurements) = joined_measurements_options {
                    joined_measurements.merge(measurements);
                } else {
-                    stations.insert(hash, (station.to_owned(), *measurements));
+                    stations.insert(station.to_owned(), *measurements);
                }
            }
        }
        let mut stations: Vec<String> = stations
            .iter()
-            .map(|(_, (station, measurements))| {
+            .map(|(station, measurements)| {
                let measurements = measurements.to_string();
                #[cfg(feature = "json")]
                {
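The reader above depends on every record being exactly 107 bytes. A sketch of the layout as read off the code (split_at(100), temp[1..6], 107-byte chunks, and the writer's fixed offsets later in this diff); the helper is illustrative, not part of the repo:

const RECORD_LEN: usize = 107;

// [0..100)   station name, NUL-padded to 100 bytes
// [100]      b';'
// [101..106) temperature text, right-aligned in 5 bytes
// [106]      b'\n'
fn split_record(rec: &[u8; RECORD_LEN]) -> (&[u8], &[u8]) {
    debug_assert_eq!(rec[100], b';');
    debug_assert_eq!(rec[106], b'\n');
    (&rec[..100], &rec[101..106])
}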
@@ -115,7 +115,7 @@ fn merge_hashmaps<'a>(
 }

 /// Parses a chunk of the input as StationData values.
-fn process_chunk<'a>(current_chunk_slice: &'a [u8]) -> HashMap<&'a [u8], StationData> {
+fn process_chunk(current_chunk_slice: &[u8]) -> HashMap<&[u8], StationData> {
     let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS);
     let mut start = 0;
     while let Some(end) = current_chunk_slice[start..]
@@ -187,7 +187,7 @@ fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Resul
 pub fn run() -> io::Result<()> {
     // won't accept non-utf-8 args
     let args: Vec<String> = env::args().collect();
-    let file_name = match args.get(2).clone() {
+    let file_name = match args.get(2) {
         Some(fname) => fname,
         None => "../../../measurements.txt",
     };
@@ -1,31 +0,0 @@
-use polars::prelude::*;
-use std::time::Instant;
-use std::vec;
-
-pub fn run_polars() -> Result<DataFrame, PolarsError> {
-    let now = Instant::now();
-
-    let f1: Field = Field::new("station", DataType::String);
-    let f2: Field = Field::new("measure", DataType::Float64);
-    let sc: Schema = Schema::from_iter(vec![f1, f2]);
-
-    let q = LazyCsvReader::new("../../../measurements.txt")
-        .has_header(false)
-        .with_schema(Some(Arc::new(sc)))
-        .with_separator(b';')
-        .finish()?
-        .group_by(vec![col("station")])
-        .agg(vec![
-            col("measure").alias("min").min(),
-            col("measure").alias("mean").mean(),
-            col("measure").alias("max").max(),
-        ])
-        .sort("station", Default::default())
-        .with_streaming(true);
-
-    let df = q.collect()?;
-
-    println!("Time={} μs", now.elapsed().as_micros());
-
-    Ok(df)
-}
@@ -1,5 +1,5 @@
 use bstr::{BStr, ByteSlice};
-use memmap::MmapOptions;
+use memmap2::MmapOptions;
 use rayon::prelude::*;
 use rustc_hash::FxHashMap as HashMap;
 use std::time::Instant;
@@ -50,7 +50,7 @@ impl State {
 fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> {
     let mut state: HashMap<&'a BStr, State> = Default::default();
     for line in i {
-        let (name, value) = line.split_once_str(&[b';']).unwrap();
+        let (name, value) = line.split_once_str(b";").unwrap();
         let value = fast_float::parse(value).unwrap();
         state.entry(name.into()).or_default().update(value);
     }
@@ -58,7 +58,7 @@ fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> {
 }

 fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> {
-    make_map((&mem[start..end]).lines())
+    make_map((mem[start..end]).lines())
 }

 fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) {
@@ -70,7 +70,7 @@ fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) {
 pub fn run() {
     let now = Instant::now();
     let cores: usize = std::thread::available_parallelism().unwrap().into();
-    let path = match std::env::args().skip(1).next() {
+    let path = match std::env::args().nth(1) {
         Some(path) => path,
         None => "../../../measurements.txt".to_owned(),
     };
@@ -104,7 +104,7 @@ pub fn run() {
     });

     let mut all: Vec<_> = state.into_iter().collect();
-    all.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+    all.sort_unstable_by(|a, b| a.0.cmp(b.0));
     #[cfg(feature = "json")]
     {
         print!("[");
443 src/main/rust/src/implementations/rgk.rs Normal file
@@ -0,0 +1,443 @@
use ptr_hash::PtrHashParams;
use rustc_hash::FxHashSet;
use std::{
    simd::{cmp::SimdPartialEq, Simd},
    vec::Vec,
};

type V = i32;

type PtrHash = ptr_hash::DefaultPtrHash<ptr_hash::hash::FxHash, u64>;

pub struct Phf {
    pub ptr_hash: PtrHash,
    pub keys: Vec<Vec<u8>>,
    pub slots: Vec<Record>,
}

impl Phf {
    fn new(mut keys: Vec<Vec<u8>>) -> Self {
        keys.sort();

        let num_slots = keys.len() * 5 / 2;
        let params = ptr_hash::PtrHashParams {
            alpha: 0.9,
            c: 1.5,
            slots_per_part: num_slots,
            ..PtrHashParams::default()
        };

        let mut hashes: Vec<u64> = keys.iter().map(|key| hash_name(key)).collect();
        hashes.sort();
        for (x, y) in hashes.iter().zip(hashes.iter().skip(1)) {
            assert!(*x != *y, "DUPLICATE HASH");
        }

        let ptr_hash = PtrHash::new(&hashes, params);

        let slots = vec![Record::default(); num_slots];

        Self {
            ptr_hash,
            keys,
            slots,
        }
    }
    fn compute_index(&self, hash: u64) -> usize {
        self.ptr_hash.index_single_part(&hash)
    }
    fn get_index_mut(&mut self, idx: usize) -> &mut Record {
        &mut self.slots[idx]
    }
    fn index_hash_mut(&mut self, hash: u64) -> &mut Record {
        &mut self.slots[self.ptr_hash.index_single_part(&hash)]
    }
    pub fn index<'b>(&'b self, key: &[u8]) -> &'b Record {
        let hash = hash_name(key);
        &self.slots[self.compute_index(hash)]
    }
    fn index_mut<'b>(&'b mut self, key: &[u8]) -> &'b mut Record {
        self.index_hash_mut(hash_name(key))
    }
    fn merge(&mut self, r: Self) {
        // TODO: If key sets are equal or one is a subset of the other, merge
        // smaller into larger.
        let mut new_keys = vec![];
        let mut i1 = 0;
        let mut i2 = 0;
        while i1 < self.keys.len() && i2 < r.keys.len() {
            if self.keys[i1] == r.keys[i2] {
                new_keys.push(self.keys[i1].clone());
                i1 += 1;
                i2 += 1;
                continue;
            }
            if self.keys[i1] < r.keys[i2] {
                new_keys.push(self.keys[i1].clone());
                i1 += 1;
                continue;
            }
            if self.keys[i1] > r.keys[i2] {
                new_keys.push(r.keys[i2].clone());
                i2 += 1;
                continue;
            }
            panic!();
        }
        while i1 < self.keys.len() {
            new_keys.push(self.keys[i1].clone());
            i1 += 1;
        }
        while i2 < r.keys.len() {
            new_keys.push(r.keys[i2].clone());
            i2 += 1;
        }
        let mut new_phf = Self::new(new_keys);
        for key in &self.keys {
            new_phf.index_mut(key).merge(self.index(key));
        }
        for key in &r.keys {
            new_phf.index_mut(key).merge(r.index(key));
        }
        *self = new_phf;
    }
}

#[derive(Clone, Debug)]
#[repr(align(32))]
pub struct Record {
    pub count: u64,
    // Storing these as two u32 is nice, because they are read as a single u64.
    /// Byte representation of string ~b"bc.d" or ~b"\0c.d".
    pub min: u32,
    /// Byte representation of string b"bc.d" or b"\0c.d".
    pub max: u32,
    pub sum: u64,
}

impl Record {
    fn default() -> Self {
        Self {
            count: 0,
            min: 0,
            max: 0,
            sum: 0,
        }
    }
    fn add(&mut self, raw_value: u32, value: u64) {
        // assert2::debug_assert!(value < 1000);
        self.count += 1;
        self.sum += value;
        // See https://en.algorithmica.org/hpc/algorithms/argmin/
        if raw_value < self.min {
            self.min = raw_value;
        }
        if raw_value > self.max {
            self.max = raw_value;
        }
    }
    fn merge(&mut self, other: &Self) {
        self.count += other.count;
        self.sum += other.sum_to_val() as u64;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
    }
    fn sum_to_val(&self) -> V {
        let m = (1 << 21) - 1;
        ((self.sum & m) + 10 * ((self.sum >> 21) & m) + 100 * ((self.sum >> 42) & m)) as _
    }
    /// Return (min, avg, max)
    pub fn merge_pos_neg(pos: &Record, neg: &Record) -> (V, V, V) {
        let pos_sum = pos.sum as V;
        let neg_sum = neg.sum as V;
        let sum = pos_sum - neg_sum;
        let count = (pos.count + neg.count) as V;
        // round to nearest
        let avg = (sum + count / 2).div_floor(count);

        let pos_max = raw_to_value(pos.max);
        let neg_max = -raw_to_value(neg.min);
        let max = pos_max.max(neg_max);

        let pos_min = raw_to_value(pos.min);
        let neg_min = -raw_to_value(neg.max);
        let min = pos_min.min(neg_min);

        (min, avg, max)
    }
}

/// Reads raw bytes and masks the ; and the b'0'=0x30.
/// Returns something of the form 0x0b0c..0d or 0x000c..0d
fn parse_to_raw(data: &[u8], start: usize, end: usize) -> u32 {
    let raw = u32::from_be_bytes(unsafe { *data.get_unchecked(start..).as_ptr().cast() });
    raw >> (8 * (4 - (end - start)))
}

fn raw_to_pdep(raw: u32) -> u64 {
    #[cfg(feature = "no_pdep")]
    {
        let raw = raw as u64;
        (raw & 15) | ((raw & (15 << 16)) << (21 - 16)) | ((raw & (15 << 24)) << (42 - 24))
    }
    #[cfg(not(feature = "no_pdep"))]
    {
        let mask = 0x0f0f000f;
        let raw = raw & mask;
        // input 0011bbbb0011cccc........0011dddd
        // 0b    bbbb     xxxxcccc     yyyyyyyyyyyydddd // Deposit here
        // 0b    1111         1111                 1111 // Mask out trash using &
        let pdep = 0b0000000000000000001111000000000000011111111000001111111111111111u64;
        unsafe { core::arch::x86_64::_pdep_u64(raw as u64, pdep) }
    }
}

fn raw_to_value(v: u32) -> V {
    let mask = 0x0f0f000f;
    let bytes = (v & mask).to_be_bytes();
    // s = bc.d
    let b = bytes[0] as V;
    let c = bytes[1] as V;
    let d = bytes[3] as V;
    b as V * 100 * (bytes[0] != 0) as V + c as V * 10 + d as V
}

pub fn format(v: V) -> String {
    format!("{:.1}", v as f64 / 10.0)
}

#[allow(unused)]
fn hash_name(name: &[u8]) -> u64 {
    // Hash the first and last 8 bytes.
    // TODO: More robust hash that actually uses all characters.
    let head: [u8; 8] = unsafe { *name.get_unchecked(..8).split_first_chunk().unwrap().0 };
    let tail: [u8; 8] = unsafe {
        *name
            .get_unchecked(name.len().wrapping_sub(8)..)
            .split_first_chunk()
            .unwrap()
            .0
    };
    let shift = 64usize.saturating_sub(8 * name.len());
    let khead = u64::from_ne_bytes(head) << shift;
    let ktail = u64::from_ne_bytes(tail) >> shift;
    khead.wrapping_add(ktail)
}

/// Number of SIMD lanes. AVX2 has 256 bits, so 32 lanes.
const L: usize = 32;
/// The Simd type.
pub type S = Simd<u8, L>;

#[derive(Copy, Clone)]
struct State {
    start: usize,
    sep: usize,
    end: usize,
}

/// Find the regions between \n and ; (names) and between ; and \n (values),
/// and calls `callback` for each line.
#[inline(always)]
fn iter_lines<'a>(
    mut data: &'a [u8],
    mut callback: impl FnMut(&'a [u8], State, State, State, State),
) {
    // Make sure that the out-of-bounds reads we do are OK.
    data = &data[..data.len() - 32];

    let sep = S::splat(b';');
    let end = S::splat(b'\n');

    let find = |last: usize, sep: S| {
        let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
        let eq = sep.simd_eq(simd).to_bitmask() as u32;
        let offset = eq.trailing_zeros() as usize;
        last + offset
    };
    // Modified to be able to search regions longer than 32.
    let find_long = |mut last: usize, sep: S| {
        let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
        let mut eq = sep.simd_eq(simd).to_bitmask() as u32;
        if eq == 0 {
            while eq == 0 {
                last += 32;
                let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
                eq = sep.simd_eq(simd).to_bitmask() as u32;
            }
        }
        let offset = eq.trailing_zeros() as usize;
        last + offset
    };

    let init_state = |idx: usize| {
        let first_end = find_long(idx, end);
        State {
            start: first_end + 1,
            sep: first_end + 1,
            end: 0,
        }
    };

    let mut state0 = init_state(0);
    let mut state1 = init_state(data.len() / 4);
    let mut state2 = init_state(2 * data.len() / 4);
    let mut state3 = init_state(3 * data.len() / 4);

    // Duplicate each line for each input state.
    macro_rules! step {
        [$($s:expr),*] => {
            $($s.sep = find_long($s.sep + 1, sep) ;)*
            $($s.end = find($s.sep + 1, end) ;)*
            callback(data, $($s, )*);
            $($s.start = $s.end + 1;)*
        }
    }

    while state3.start < data.len() {
        step!(state0, state1, state2, state3);
    }
}

fn run(data: &[u8], keys: &[Vec<u8>]) -> Phf {
    // Each thread has its own accumulator.
    let mut h = Phf::new(keys.to_vec());
    iter_lines(
        data,
        |data, mut s0: State, mut s1: State, mut s2: State, mut s3: State| {
            unsafe {
                // If value is negative, extend name by one character.
                s0.sep += (data.get_unchecked(s0.sep + 1) == &b'-') as usize;
                let name0 = data.get_unchecked(s0.start..s0.sep);

                s1.sep += (data.get_unchecked(s1.sep + 1) == &b'-') as usize;
                let name1 = data.get_unchecked(s1.start..s1.sep);

                s2.sep += (data.get_unchecked(s2.sep + 1) == &b'-') as usize;
                let name2 = data.get_unchecked(s2.start..s2.sep);

                s3.sep += (data.get_unchecked(s3.sep + 1) == &b'-') as usize;
                let name3 = data.get_unchecked(s3.start..s3.sep);

                let raw0 = parse_to_raw(data, s0.sep + 1, s0.end);
                let raw1 = parse_to_raw(data, s1.sep + 1, s1.end);
                let raw2 = parse_to_raw(data, s2.sep + 1, s2.end);
                let raw3 = parse_to_raw(data, s3.sep + 1, s3.end);

                let h0 = hash_name(name0);
                let h1 = hash_name(name1);
                let h2 = hash_name(name2);
                let h3 = hash_name(name3);

                let idx0 = h.compute_index(h0);
                let idx1 = h.compute_index(h1);
                let idx2 = h.compute_index(h2);
                let idx3 = h.compute_index(h3);

                h.get_index_mut(idx0).add(raw0, raw_to_pdep(raw0));
                h.get_index_mut(idx1).add(raw1, raw_to_pdep(raw1));
                h.get_index_mut(idx2).add(raw2, raw_to_pdep(raw2));
                h.get_index_mut(idx3).add(raw3, raw_to_pdep(raw3));
            }
        },
    );
    h
}

pub fn run_parallel(data: &[u8], keys: &[Vec<u8>], num_threads: usize) -> Phf {
    if num_threads == 0 {
        return run(data, keys);
    }

    let phf = std::sync::Mutex::new(Phf::new(keys.to_vec()));

    // Spawn one thread per core.
    std::thread::scope(|s| {
        let chunks = data.chunks(data.len() / num_threads + 1);
        for chunk in chunks {
            s.spawn(|| {
                // Each thread has its own accumulator.
                let thread_phf = run(chunk, keys);

                // Merge results.
                phf.lock().unwrap().merge(thread_phf);
            });
        }
    });

    phf.into_inner().unwrap()
}

pub fn to_str(name: &[u8]) -> &str {
    std::str::from_utf8(name).unwrap()
}

/// Returns a list of city names found in data.
/// Each city is returned twice, once as `<city>` and once as `<city>;`,
/// with the latter being used to accumulate negative temperatures.
#[inline(never)]
pub fn find_city_names(data: &[u8]) -> Vec<Vec<u8>> {
    let mut cities = FxHashSet::default();

    let mut callback = |data: &[u8], state: State| {
        let State { start, sep, .. } = state;
        let name = unsafe { data.get_unchecked(start..sep) };
        cities.insert(name.to_vec());

        // Do the same for the name with ; appended.
        let name = unsafe { data.get_unchecked(start..sep + 1) };
        cities.insert(name.to_vec());
    };
    iter_lines(data, |d, s0, s1, s2, s3| {
        flatten_callback(d, s0, s1, s2, s3, &mut callback)
    });

    let mut cities: Vec<_> = cities.into_iter().collect();
    cities.sort();
    cities
}

fn flatten_callback<'a>(
    data: &'a [u8],
    s0: State,
    s1: State,
    s2: State,
    s3: State,
    callback: &mut impl FnMut(&'a [u8], State),
) {
    callback(data, s0);
    callback(data, s1);
    callback(data, s2);
    callback(data, s3);
}

#[derive(clap::Parser)]
pub struct Args {
    pub input: Option<String>,

    #[clap(short = 'j', long)]
    pub threads: Option<usize>,

    #[clap(long)]
    pub print: bool,

    #[clap(long)]
    pub stats: bool,
}

#[cfg(test)]
mod test {
    #[test]
    fn parse_raw() {
        use super::*;
        let d = b"12.3";
        let raw = parse_to_raw(d, 0, 4);
        let v = raw_to_value(raw);
        assert_eq!(v, 123);

        let d = b"12.3";
        let raw = parse_to_raw(d, 1, 4);
        let v = raw_to_value(raw);
        assert_eq!(v, 23);
    }
}
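raw_to_pdep and sum_to_val above form a pair: each temperature's three digits are deposited into separate 21-bit lanes of the running sum, and the lanes are only combined into 100*b + 10*c + d once at the end, so roughly 2^21 samples can be added before carries would cross lanes. A minimal standalone sketch of the no_pdep arithmetic, checked against the b"12.3" case from the test above:

fn lanes(raw: u32) -> u64 {
    // The digits b, c, d sit in the low nibbles of three bytes of the raw
    // big-endian word "bc.d"; spread them into bits 42.., 21.. and 0..
    let raw = raw as u64;
    (raw & 15) | ((raw & (15 << 16)) << (21 - 16)) | ((raw & (15 << 24)) << (42 - 24))
}

fn decode(sum: u64) -> i64 {
    let m = (1u64 << 21) - 1;
    ((sum & m) + 10 * ((sum >> 21) & m) + 100 * ((sum >> 42) & m)) as i64
}

fn main() {
    let raw = u32::from_be_bytes(*b"12.3"); // what parse_to_raw yields for "12.3"
    assert_eq!(decode(lanes(raw)), 123); // tenths of a degree
    assert_eq!(decode(lanes(raw) + lanes(raw)), 246); // sums accumulate per lane
}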
@@ -1,16 +1,15 @@
+use crate::models::station_measurements::StationMeasurements;
+use crate::utils::{hash, parse};
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::time::Instant;
-use crate::models::station_measurements::StationMeasurements;
-use crate::utils::parse;
-use crate::utils::parse::hashstr;

 const DEFAULT_HASHMAP_LENGTH: usize = 10000;

 pub fn run() {
     let now = Instant::now();
-    let mut stations: HashMap<usize, (String, StationMeasurements)> =
+    let mut stations: HashMap<u64, (String, StationMeasurements)> =
         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);

     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
@@ -24,8 +23,8 @@ pub fn run() {
             break;
         }
         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
-        let hash = hashstr(station);
-        let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) };
+        let hash = hash::bytes(station);
+        let station = unsafe { std::str::from_utf8_unchecked(station) };
         let temp = parse::temp(temp.split_last().unwrap().1);
         let measurements_option = stations.get_mut(&hash);
         if let Some((_, measurements)) = measurements_option {
@@ -37,7 +36,7 @@ pub fn run() {
                 count: 1,
                 sum: temp,
             };
-            stations.insert(hash, (station, measurements));
+            stations.insert(hash, (station.to_string(), measurements));
         }
         line.clear();
     }
@@ -51,5 +50,5 @@ pub fn run() {
     stations.sort();
     let stations = stations.join(",");
     println!("{{{stations}}}");
-    println!("Time={} μs", now.elapsed().as_micros());
+    println!("Time={} ms", now.elapsed().as_millis());
 }
@@ -2,8 +2,7 @@ use smol::fs::File;
 use smol::io::{AsyncBufReadExt, BufReader};

 use crate::models::station_measurements::StationMeasurements;
-use crate::utils::parse;
-use crate::utils::parse::hashstr;
+use crate::utils::{hash, parse};
 use std::collections::HashMap;
 use std::time::Instant;

@@ -11,7 +10,7 @@ const DEFAULT_HASHMAP_LENGTH: usize = 10000;

 pub fn run() {
     let now = Instant::now();
-    let mut stations: HashMap<usize, (String, StationMeasurements)> =
+    let mut stations: HashMap<u64, (String, StationMeasurements)> =
         HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);

     smol::block_on(async {
@@ -29,8 +28,8 @@ pub fn run() {
             break;
         }
         let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
-        let hash = hashstr(station);
-        let station = unsafe { String::from_utf8_unchecked(Vec::from(station)) };
+        let hash = hash::bytes(station);
+        let station = unsafe { std::str::from_utf8_unchecked(station) };
         let temp = parse::temp(temp.split_last().unwrap().1);
         let measurements_option = stations.get_mut(&hash);
         if let Some((_, measurements)) = measurements_option {
@@ -42,7 +41,7 @@ pub fn run() {
                 count: 1,
                 sum: temp,
             };
-            stations.insert(hash, (station, measurements));
+            stations.insert(hash, (station.to_string(), measurements));
         }
         line.clear();
     }
@@ -2,7 +2,8 @@
 #![feature(portable_simd)]
 #![feature(slice_split_once)]
 #![feature(hash_raw_entry)]
+#![feature(int_roundings)]

-pub mod implementations;
 pub mod models;
 pub mod utils;
+pub mod implementations;
@@ -1,4 +1,3 @@
-
 fn main() {
     // let now = Instant::now();
     // let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
@@ -1,2 +1,2 @@
-pub mod station_measurements;
 pub mod mmap;
+pub mod station_measurements;
@@ -12,7 +12,7 @@ pub struct Mmap<'a> {

 /// To properly dispose of the mmap we have to manually call munmap.
 /// So implementing drop for this smart-pointer type is necessary.
-impl<'a> Drop for Mmap<'a> {
+impl Drop for Mmap<'_> {
     fn drop(&mut self) {
         unsafe {
             munmap(
@@ -25,7 +25,7 @@ impl<'a> Drop for Mmap<'a> {

 // anti-pattern for non-smart pointer types.
 // ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html
-impl<'a> Deref for Mmap<'a> {
+impl Deref for Mmap<'_> {
     type Target = [u8];

     fn deref(&self) -> &Self::Target {
@@ -50,7 +50,7 @@ impl<'a> Mmap<'a> {
             // We can advise the kernel on how we intend to use the mmap.
             // But this did not improve my read performance in a meaningful way
             madvise(m, size, MADV_WILLNEED);
-            return Self::new(std::slice::from_raw_parts(m as *const u8, size));
+            Self::new(std::slice::from_raw_parts(m as *const u8, size))
         }
     }
 }
@@ -1,4 +1,5 @@
 pub mod byte_pos;
+pub mod hash;
 pub mod parse;
 pub mod write_structured_measurements;
40 src/main/rust/src/utils/hash.rs Normal file
@@ -0,0 +1,40 @@
#[inline]
pub fn bytes(bytes: &[u8]) -> u64 {
    if cfg!(not(debug_assertions)) {
        // inspired by https://curiouscoding.nl/posts/1brc/
        let head: [u8; 8] = unsafe { bytes.get_unchecked(..8).as_chunks::<8>().0[0] };
        let tail: [u8; 8] = unsafe { bytes.get_unchecked(bytes.len() - 8..).as_chunks::<8>().0[0] };
        let shift = 64usize.saturating_sub(8 * bytes.len());
        let khead = u64::from_ne_bytes(head) << shift;
        let ktail = u64::from_ne_bytes(tail) >> shift;
        khead + ktail
    } else {
        // debug friendly but slow
        let mut head = [0u8; 8];
        let mut tail = [0u8; 8];
        let end = bytes.len().min(8);
        let start = bytes.len().saturating_sub(8);
        head[..end].copy_from_slice(&bytes[..end]);
        tail[..end].copy_from_slice(&bytes[start..]);
        let shift = 64usize.saturating_sub(8 * bytes.len());
        let khead = u64::from_ne_bytes(head) << shift;
        let ktail = u64::from_ne_bytes(tail) >> shift;
        khead.wrapping_add(ktail)
    }
}

#[cfg(test)]
mod tests {
    use crate::utils::hash;

    #[test]
    fn test_hashstr() {
        let hash_1 = hash::bytes(b"abcdefghijk");
        let hash_2 = hash::bytes(b"kjihgfedcba");
        let hash_3 = hash::bytes(b"abba");
        let hash_4 = hash::bytes(b"baab");

        assert_ne!(hash_1, hash_2);
        assert_ne!(hash_3, hash_4);
    }
}
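Since hash::bytes only mixes the first and last 8 bytes of a name, any two names longer than 16 bytes that agree on both ends collide; the hash is chosen for speed on this dataset, not for robustness. An illustrative test of that blind spot (not part of the repo's test suite):

#[test]
fn head_tail_collision() {
    // Only head and tail enter the hash, so differing middle bytes collide.
    let a = hash::bytes(b"aaaaaaaa_X_bbbbbbbb");
    let b = hash::bytes(b"aaaaaaaa_Y_bbbbbbbb");
    assert_eq!(a, b);
}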
@@ -69,27 +69,9 @@ pub fn temp_simd(bytes: &[u8]) -> isize {
     }
 }

-#[inline]
-pub fn hashstr(bytes: &[u8]) -> usize {
-    let mut hash = 0;
-    let (chunks, remainder) = bytes.as_chunks::<8>();
-    for &chunk in chunks {
-        hash += usize::from_be_bytes(chunk);
-    }
-    let mut r = [0_u8; 8];
-    r[0] = remainder.len() as u8;
-    let mut idx = 1;
-    for &byte in remainder {
-        r[idx] = byte;
-        idx += 1;
-    }
-    hash += usize::from_be_bytes(r);
-    hash
-}
-
 #[cfg(test)]
 mod tests {
-    use crate::utils::parse::{hashstr, temp_new};
+    use crate::utils::parse::temp_new;

     #[test]
     fn test_temp_new_max() {
@@ -120,15 +102,4 @@ mod tests {
         let temp_neg_10 = temp_new("-9.9".as_bytes());
         assert_eq!(temp_neg_10, -99);
     }
-
-    #[test]
-    fn test_hashstr() {
-        let hash_1 = hashstr(b"abcdefghijk");
-        let hash_2 = hashstr(b"kjihgfedcba");
-        let hash_3 = hashstr(b"abba");
-        let hash_4 = hashstr(b"baab");
-
-        assert_ne!(hash_1, hash_2);
-        assert_ne!(hash_3, hash_4);
-    }
-}
+}
@@ -3,7 +3,8 @@ use std::io::{BufRead, BufReader, BufWriter, Write};

 pub fn write_structured_measurements() {
     let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
-    let structured_file = File::create_new("structured_measurements.txt").expect("Could not create file");
+    let structured_file =
+        File::create_new("structured_measurements.txt").expect("Could not create file");
     let mut reader = BufReader::new(&file);
     let mut writer = BufWriter::new(&structured_file);
     let mut line = Vec::with_capacity(107);
@@ -27,7 +28,9 @@ pub fn write_structured_measurements() {
         write_line[100] = b';';
         write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp);
         write_line[106] = b'\n';
-        writer.write_all(write_line.as_slice()).expect("Could not write");
+        writer
+            .write_all(write_line.as_slice())
+            .expect("Could not write");
         line.clear();
     }
 }