Compare commits

...

63 Commits

Author SHA1 Message Date
fdd92dd5f7 Make it more like libraries.rs; tried reading the whole file into a vec, but it's very slow, maybe a slice is better 2024-12-31 12:58:04 +01:00
45b3014cbb Store station name as byte slice and only convert to string once at the end, but using u64 hash as key is still faster 2024-12-31 11:24:31 +01:00
98cd6e930c Make faster by splitting borrows of byte slice 2024-12-31 11:13:34 +01:00
40a8d6d929 Some cleanup 2024-12-31 10:48:10 +01:00
0ea10a3c1b Ultra slow lua solution (aka as slow as my naive rust solution) 2024-09-02 15:15:28 +02:00
3dbc9c32d1 Fix warning of deprecated key name default_features 2024-09-02 09:27:02 +02:00
4d586c809e Under 2 minutes, could probably start optimizing as I did for rust, but it's good enough for now 2024-08-30 15:33:40 +02:00
5bb2363eee mmapped file slightly faster 2024-08-30 14:23:46 +02:00
eb2ed15e33 First julia impl, very slow 2024-08-30 13:39:21 +02:00
dfcc8562e6 ran cargo clippy 2024-08-30 10:09:39 +02:00
212e595a7e ran cargo fmt 2024-08-30 10:00:00 +02:00
7b8943976f Revert last commit
Accidentally kept the hashing and when I removed it the solution slowed down again

This reverts commit aaa11c7b94.
2024-08-28 14:18:39 +02:00
aaa11c7b94 Worse than fx (despite me hashing the station names there too) better than vanilla 2024-08-28 14:07:16 +02:00
07a8e7fc69 add solution I found which helped me get faster; unfortunately the solution itself is incorrect (?) 2024-08-28 13:09:47 +02:00
0aa9d8be86 changed hash to add debug mode 2024-08-28 11:32:35 +02:00
b1c064a92f changed key hash 2024-08-28 09:37:48 +02:00
ea06a600ce changed key hash 2024-08-28 09:37:38 +02:00
ac5c45f8d5 fxhashmap faster after all... 2024-08-28 08:52:40 +02:00
b8f589096f extract hash into own module 2024-08-27 13:54:23 +02:00
c306083192 run cargo fmt 2024-08-27 13:24:14 +02:00
a45ddd2dc0 use easy_parallel from smol project instead of std::thread. No performance improvement, but easier 2024-08-27 13:23:51 +02:00
e832475fc3 stuff 2024-08-27 11:48:08 +02:00
608cbb59e5 managed to get faster again by searching hashmap using &str and only converting to String on insertion. Removed FxHashMap again 2024-08-27 10:57:23 +02:00
53ea542f36 update dependencies, create benches 2024-08-19 14:58:34 +02:00
d246c54cd9 removed polars example because it wouldn't compile anymore, even when updating to the latest version. Removing it also massively reduced the number of downloaded crates 2024-08-19 13:55:19 +02:00
2a89d061a0 Use FxHashMap for multi_threaded_smol.rs 2024-08-19 10:57:18 +02:00
7add8793a5 managed to get library version on par with the reference implementation by using memmap2. I'm understanding scoped threads a little better now... I think
The fixed line length solution is still just as slow, even using memmap
2024-08-19 10:39:19 +02:00
b1d7ebaaea Create multi threaded version using smol for async reading of the file, ~1 second faster 2024-08-13 14:14:42 +02:00
bbc89aa2b3 Create single threaded version using smol for async reading of the file, 46 -> 40 seconds 2024-08-13 13:50:49 +02:00
b53212103b Tried looking at what the performance would be if I read the data from a file where every line has the same length, i.e. no need to read until EOL. But despite not having to search for the \n byte (or ';', because every station name and temperature is padded with null bytes) and having a fixed-size array instead of a vec, this is slower (the normal read_until version is actually still just as fast, while the new one is 10x slower) 2024-08-12 10:48:07 +02:00
8ffea918c4 either it's slightly faster or it's a measurement error 2024-08-05 12:52:57 +02:00
3b3801ba0d Going back because compile times tripled 2024-08-05 11:22:08 +02:00
1c066ec113 accidentally committed bug 2024-08-05 10:54:59 +02:00
13c54a2811 FxHashMap made me faster, memmap makes me slower, guess I'm using it wrong 2024-08-05 10:53:17 +02:00
40627f9aeb add solution using libraries to see how fast I can get. For now no difference 2024-08-02 11:43:23 +02:00
45ae29d3cd organize differently, added criterion as dev-dependency for benchmarks 2024-08-01 15:02:24 +02:00
8eefe06e8b Formatting 2024-08-01 10:23:14 +02:00
34768d3ec1 forgot feature flag in last commit 2024-07-31 14:00:00 +02:00
0ffbff4cbf added 2 new implementations from users who commented on my reddit post as a comparison 2024-07-31 13:58:42 +02:00
25d20169aa some improvements by reducing the casts; implemented a get_pos function to get the position of a byte in a byte slice by bitmasking... slightly slower 2024-07-31 12:49:16 +02:00
2c23e30fe0 hash station names myself for a faster HashMap 2024-07-31 09:27:02 +02:00
da72f38e42 don't need own parse_line function 2024-07-29 12:09:05 +02:00
5aa94e67d1 use read_until method instead of custom function because it is faster than mine using the Bytes struct 2024-07-25 15:35:10 +02:00
c6b8273d65 applied clippy 2024-07-25 13:05:53 +02:00
e230a5ce2c My multi-threaded version is now faster than polars and takes less time to compile. It's a little more complex though 2024-07-25 10:03:37 +02:00
dcaca0cc65 managed to make my solution super fast... but it's also incorrect 2024-07-24 15:19:17 +02:00
16cf4ca2ca moved from reading String to reading bytes. A little faster, still need to implement for multithreaded solution 2024-07-23 16:30:50 +02:00
b4e3992c65 mostly made output a bit nicer 2024-07-23 13:23:26 +02:00
393f802741 remove the need for mutex by using channels 2024-07-23 13:23:26 +02:00
327fe8564e use scopes to allow using a mutex without an Arc
apply clippy to my implementations
2024-07-23 13:23:26 +02:00
14d608b209 Fastest yet with scaled integers instead of floats 2024-07-23 13:23:26 +02:00
2f82e8788d moved rust implementation to /src/main/rust 2024-07-23 13:23:26 +02:00
91adbf4c1c Actually I just needed to add target-cpu=native to [build] in Cargo.toml; did that, created the HashMap with capacity, and also added a reference implementation (which uses libraries, unlike my solution) 2024-07-23 13:23:26 +02:00
00096647ee The polars solution I saw on reddit a few months ago is also super slow. I'm thinking it might be my macbook 2024-07-23 13:23:26 +02:00
fe1053b74a Multi threaded works now but it's slower than single threaded... 2024-07-23 13:23:26 +02:00
3ffed9099c Still broken but it compiles. For some reason lines are not properly being read and only one thread is being run 2024-07-23 13:23:26 +02:00
65df621cf6 use hashbrown instead of std hashmap, no performance improvements 2024-07-23 13:23:26 +02:00
bd83b9bc2c not working multi threaded solution 2024-07-23 13:23:26 +02:00
15525282d6 modified .gitignore 2024-07-23 13:23:26 +02:00
6cc29fb645 single threaded solution in bin 2024-07-23 13:23:26 +02:00
6f548678f2 single threaded solution 2024-07-23 13:23:26 +02:00
Antonio Goncalves
3372b6b290 Adding Antonio Goncalves' blog post 2024-03-08 11:58:47 +01:00
Ruslan Kovtun
dfec2cdbe6 Fixes progress bar for create_measurements.py 2024-03-03 13:10:46 +01:00
45 changed files with 4057 additions and 6 deletions

.gitignore

@@ -9,7 +9,7 @@ release.properties
 .project
 .classpath
 .settings/
-bin/
+./bin/
 # IntelliJ
 .idea

README.md

@@ -533,6 +533,7 @@ A list of external resources such as blog posts and videos, discussing 1BRC and
 * [1BRC - What a Journey](https://www.esolutions.tech/1brc-what-a-journey), by Marius Staicu (blog post)
 * [One Billion Rows Challenge in Golang](https://www.bytesizego.com/blog/one-billion-row-challenge-go), by Shraddha Agrawal (blog post)
 * [The Billion Row Challenge (1BRC) - Step-by-step from 71s to 1.7s](https://questdb.io/blog/billion-row-challenge-step-by-step/) by Marko Topolnik (blog post)
+* [Entering The One Billion Row Challenge With GitHub Copilot](https://devblogs.microsoft.com/java/entering-the-one-billion-row-challenge-with-github-copilot/) by Antonio Goncalves (blog post)
 ## License

src/main/julia/main.jl

@@ -0,0 +1,91 @@
using Mmap
mutable struct StationMeasurements
min::Float64
max::Float64
sum::Float64
count::Int64
end
function update(sm::StationMeasurements, temp::Float64)
if temp < sm.min
sm.min = temp
elseif temp > sm.max
sm.max = temp
end
sm.sum += temp
sm.count += 1
end
function print_measurements(stations::Dict{String,StationMeasurements})
sorted_keys = sort(collect(keys(stations)))
print("{")
sm_vec = []
for city in sorted_keys
sm = stations[city]
min = round(sm.min; digits=1)
max = round(sm.max; digits=1)
avg = round((sm.sum / sm.count); digits=1)
push!(sm_vec, "$city=$min/$avg/$max")
end
joined = join(sm_vec, ", ")
print(joined)
print("}")
end
function merge(stations_vec::Vector{Dict{String,StationMeasurements}})
merged = Dict{String,StationMeasurements}()
for stations in stations_vec
for (city, sm) in stations
if haskey(merged, city)
merged_sm = merged[city]
merged_sm.min = ifelse(sm.min < merged_sm.min, sm.min, merged_sm.min)
merged_sm.max = ifelse(sm.max > merged_sm.max, sm.max, merged_sm.max)
merged_sm.sum += sm.sum
merged_sm.count += sm.count
else
merged[city] = sm
end
end
end
merged
end
function process_chunk(data, chunk)
stations = Dict{String,StationMeasurements}()
for i in eachindex(chunk)
if i == 1
continue
end
line = String(data[chunk[i-1]+1:chunk[i]-1]) # bytes strictly between the two newlines
station, temp_str = rsplit(line, ";")
temp = parse(Float32, temp_str)
if haskey(stations, station)
sm = stations[station]
sm.min = ifelse(temp < sm.min, temp, sm.min)
sm.max = ifelse(temp > sm.max, temp, sm.max)
sm.sum += temp
sm.count += 1
else
stations[station] = StationMeasurements(temp, temp, temp, 1)
end
end
stations
end
function main()
open("../../../measurements.txt", "r") do f
sz = Base.stat(f).size
data = mmap(f, Vector{UInt8}, sz)
idxs = findall(isequal(0x0a), data)
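# partition the newline positions so each thread gets a contiguous block of lines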
idxs_chunks = collect(Iterators.partition(idxs, length(idxs) ÷ Threads.nthreads()))
tasks = map(idxs_chunks) do chunk
Threads.@spawn process_chunk(data, chunk)
end
stations_vec = fetch.(tasks)
stations = merge(stations_vec)
print_measurements(stations)
end
end
main()

src/main/lua/main.lua

@@ -0,0 +1,57 @@
local file_path = "../../../measurements.txt"
local file = io.open(file_path, "rb")
if not file then
print("Unable to open file")
return
end
local function split_semi(inputstr)
local t = {}
for str in string.gmatch(inputstr, "([^;]+)") do
table.insert(t, str)
end
return t
end
local stations = {}
local iterations = 1
for line in file:lines("*l") do
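-- progress readout: for the one-billion-row input, iterations / 10000000 is the percent complete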
if iterations % 1000000 == 0 then
io.write("\x1b[J\x1b[H")
io.write(iterations / 10000000)
io.write("\n")
end
local split_line = split_semi(line)
local station = split_line[1]
local temp_str = string.gsub(split_line[2], "[ ]", "")
local temp = tonumber(temp_str)
if stations[station] == nil then
stations[station] = { min = temp, max = temp, sum = temp, count = 1 }
else
if temp < stations[station].min then
stations[station].min = temp
elseif temp > stations[station].max then
stations[station].max = temp
end
stations[station].sum = stations[station].sum + temp
stations[station].count = stations[station].count + 1
end
iterations = iterations + 1
end
local keys = {}
for k in pairs(stations) do table.insert(keys, k) end
table.sort(keys)
local fstations = {}
io.write("{")
for _, station in ipairs(keys) do
local avg = stations[station].sum / stations[station].count
local res_str = string.format("%s=%.1f/%.1f/%.1f", station, stations[station].min, avg, stations[station].max)
table.insert(fstations, res_str)
end
io.write(table.concat(fstations, ","))
print("}")

src/main/python/create_measurements.py

@@ -107,21 +107,23 @@ def build_test_data(weather_station_names, num_rows_to_create):
     hottest_temp = 99.9
     station_names_10k_max = random.choices(weather_station_names, k=10_000)
     batch_size = 10000 # instead of writing line by line to file, process a batch of stations and put it to disk
-    progress_step = max(1, (num_rows_to_create // batch_size) // 100)
+    chunks = num_rows_to_create // batch_size
     print('Building test data...')
     try:
         with open("../../../data/measurements.txt", 'w') as file:
-            for s in range(0,num_rows_to_create // batch_size):
+            progress = 0
+            for chunk in range(chunks):
                 batch = random.choices(station_names_10k_max, k=batch_size)
                 prepped_deviated_batch = '\n'.join([f"{station};{random.uniform(coldest_temp, hottest_temp):.1f}" for station in batch]) # :.1f should quicker than round on a large scale, because round utilizes mathematical operation
                 file.write(prepped_deviated_batch + '\n')
                 # Update progress bar every 1%
-                if s % progress_step == 0 or s == num_rows_to_create - 1:
-                    sys.stdout.write('\r')
-                    sys.stdout.write("[%-50s] %d%%" % ('=' * int((s + 1) / num_rows_to_create * 50), (s + 1) / num_rows_to_create * 100))
+                if (chunk + 1) * 100 // chunks != progress:
+                    progress = (chunk + 1) * 100 // chunks
+                    bars = '=' * (progress // 2)
+                    sys.stdout.write(f"\r[{bars:<50}] {progress}%")
                     sys.stdout.flush()
         sys.stdout.write('\n')
     except Exception as e:

src/main/rust/Cargo.lock
File diff suppressed because it is too large

src/main/rust/Cargo.toml

@@ -0,0 +1,62 @@
[package]
name = "onebrc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bstr = "1.10.0"
fast-float = "0.2.0"
memchr = "2.7.4"
memmap2 = "0.9.4"
rayon = "1.10.0"
rustc-hash = "2.0.0"
libc = "0.2.158"
smol = "2.0.1"
easy-parallel = "3.3.1"
clap = { version = "4.5.13", features = ["derive"] }
colored = "2.1.0"
ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false }
[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
[features]
json = []
unsafe = []
no_pdep = []
[[bench]]
name = "reference_impl"
harness = false
[[bench]]
name = "single_thread"
harness = false
[[bench]]
name = "multi_threaded"
harness = false
[[bench]]
name = "multi_threaded_smol"
harness = false
[[bench]]
name = "flare_flo"
harness = false
[[bench]]
name = "phcs"
harness = false
[profile.release]
lto = "fat"
strip = "symbols"
panic = "abort"
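# keep debug info in this profile so flamegraph output stays symbolized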
[profile.flamegraph]
inherits = "release"
debug = true
strip = "none"

src/main/rust/benches/flare_flo.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::flare_flo::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("flareflo", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/libraries.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::libraries::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("libraries", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/multi_threaded.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("multithreaded", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/multi_threaded_smol.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded_smol::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("multithreadedsmol", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/phcs.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::phcs::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("phcs", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/reference_impl.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::reference_impl::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("reference", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/benches/single_thread.rs

@@ -0,0 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::single_thread::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("singlethread", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

src/main/rust/src/bin/flare_flo.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::flare_flo::run;
fn main() {
run();
}

src/main/rust/src/bin/libraries.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::libraries::run;
fn main() {
run();
}

src/main/rust/src/bin/multi_threaded.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::multi_threaded::run;
fn main() {
run();
}

src/main/rust/src/bin/multi_threaded_smol.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::multi_threaded_smol::run;
fn main() {
run();
}

src/main/rust/src/bin/multi_threaded_structured.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::multi_threaded_structured::run;
fn main() {
run();
}

src/main/rust/src/bin/phcs.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::phcs::run;
fn main() {
let _ = run();
}

src/main/rust/src/bin/reference_impl.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::reference_impl::run;
fn main() {
run();
}

src/main/rust/src/bin/rgk.rs

@@ -0,0 +1,91 @@
use clap::Parser;
use colored::Colorize;
use memmap2::Mmap;
use onebrc::implementations::rgk::{
find_city_names, format, run_parallel, to_str, Args, Record, S,
};
use std::thread::available_parallelism;
fn main() {
let args = Args::parse();
let start = std::time::Instant::now();
let filename = args
.input
.unwrap_or("../../../measurements.txt".to_string());
let mmap: Mmap;
let data;
{
let file = std::fs::File::open(filename).unwrap();
mmap = unsafe { Mmap::map(&file).unwrap() };
data = &*mmap;
}
// Guaranteed to be aligned for SIMD.
let offset = unsafe { data.align_to::<S>().0.len() };
let data = &data[offset..];
// Build a perfect hash function on the cities found in the first 4 MB of the data.
let names = find_city_names(&data[..4000000]);
if args.stats {
eprintln!("Num cities: {}", names.len());
let mut lens = vec![0; 102];
for n in &names {
if *n.last().unwrap() == b';' {
continue;
}
lens[n.len()] += 1;
}
for (len, count) in lens.iter().enumerate() {
if *count != 0 {
eprintln!("{}: {}", len, count);
}
}
}
let phf = run_parallel(
data,
&names,
args.threads
.unwrap_or(available_parallelism().unwrap().into()),
);
if args.print {
print!("{{");
let mut first = true;
let mut keys = phf.keys.clone();
keys.sort_by(|kl, kr| to_str(kl).cmp(to_str(kr)));
for name in &keys {
if *name.last().unwrap() != b';' {
continue;
}
let namepos = &name[..name.len() - 1];
let rpos = phf.index(namepos);
let rneg = phf.index(name);
let (min, avg, max) = Record::merge_pos_neg(rpos, rneg);
if !first {
print!(", ");
}
first = false;
print!(
"{}={}/{}/{}",
to_str(namepos),
format(min),
format(avg),
format(max)
);
}
println!("}}");
}
eprintln!(
"total: {}",
format!("{:>5.2?}", start.elapsed()).bold().green()
);
}

src/main/rust/src/bin/single_thread.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::single_thread::run;
fn main() {
run();
}

src/main/rust/src/bin/smol.rs

@@ -0,0 +1,5 @@
use onebrc::implementations::smol::run;
fn main() {
run();
}

src/main/rust/src/implementations/mod.rs

@@ -0,0 +1,10 @@
pub mod flare_flo;
pub mod libraries;
pub mod multi_threaded;
pub mod multi_threaded_smol;
pub mod multi_threaded_structured;
pub mod phcs;
pub mod reference_impl;
pub mod rgk;
pub mod single_thread;
pub mod smol;

src/main/rust/src/implementations/flare_flo.rs

@@ -0,0 +1,286 @@
use std::collections::HashMap;
use std::env::args;
use std::fs::File;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::ops::{Neg, Range};
use std::os::unix::fs::FileExt;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::thread::{available_parallelism, JoinHandle};
use std::time::Instant;
#[derive(Copy, Clone, Debug)]
struct City {
min: i64,
max: i64,
sum: i64,
occurrences: u32,
}
impl City {
pub fn add_new(&mut self, input: &[u8]) {
let mut val = 0;
let mut is_neg = false;
for &char in input {
match char {
b'0'..=b'9' => {
val *= 10;
let digit = char - b'0';
val += digit as i64;
}
b'-' => {
is_neg = true;
}
b'.' => {}
_ => {
panic!("encountered {} in value", char::from(char))
}
}
}
if is_neg {
val = val.neg();
}
self.add_new_value(val);
}
pub fn add_new_value(&mut self, new: i64) {
self.min = self.min.min(new);
self.max = self.max.max(new);
self.sum += new;
self.occurrences += 1;
}
pub fn min(&self) -> f64 {
self.min as f64 / 10.0
}
pub fn mean(&self) -> f64 {
self.sum as f64 / self.occurrences as f64 / 10.0
}
pub fn max(&self) -> f64 {
self.max as f64 / 10.0
}
pub fn add_result(&mut self, other: Self) {
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
self.sum += other.sum;
self.occurrences += other.occurrences;
}
}
impl Default for City {
fn default() -> Self {
Self {
min: i64::MAX,
max: i64::MIN,
sum: 0,
occurrences: 0,
}
}
}
#[derive(Default, Clone, Debug)]
struct Citymap {
// Length then values
pub map: HashMap<u32, (String, City)>,
}
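// Cheap hash key: the name's length and its first three bytes packed into a u32.
// Assumes every station name is at least 3 bytes long; shorter names would panic on the indexing below.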
fn hashstr(s: &str) -> u32 {
let b = s.as_bytes();
u32::from_le_bytes([s.len() as u8, b[0], b[1], b[2]])
}
impl Citymap {
pub fn lookup(&mut self, lookup: &str) -> &mut City {
let hash = hashstr(lookup);
let builder = self.map.raw_entry_mut();
&mut builder
.from_key(&hash)
.or_insert(hash, (lookup.to_owned(), Default::default()))
.1
.1
}
pub fn new() -> Self {
Self {
map: Default::default(),
}
}
pub fn into_key_values(self) -> Vec<(String, City)> {
self.map.into_values().collect()
}
pub fn merge_with(&mut self, rhs: Self) {
for (k, v) in rhs.map.into_iter() {
self.map
.entry(k)
.and_modify(|lhs| {
lhs.1.add_result(v.1);
})
.or_insert(v);
}
}
}
pub fn run() {
let mut args = args();
let start = Instant::now();
let input = "../../../measurements.txt";
let results = if args.any(|e| e == "st") {
citymap_single_thread(input)
} else {
citymap_multi_threaded(input)
};
print_results(results);
println!("{:?}", start.elapsed());
}
fn citymap_single_thread(path: &str) -> Citymap {
let f = File::open(path).unwrap();
let mut buf = BufReader::with_capacity(10_usize.pow(8), f);
citymap_naive(&mut buf)
}
fn citymap_multi_threaded(path: &str) -> Citymap {
let cpus = available_parallelism().unwrap().get();
let size = File::open(path).unwrap().metadata().unwrap().len();
let per_thread = size / cpus as u64;
let mut index = 0;
let mut threads = vec![];
let (sender, receiver) = channel();
for i in 0..cpus {
let range = index..({
index += per_thread;
index.min(size)
});
threads.push(citymap_thread(path.to_owned(), range, i, sender.clone()));
}
let mut ranges = (0..cpus)
.map(|_| receiver.recv().unwrap())
.collect::<Vec<_>>();
ranges.sort_unstable_by_key(|e| e.start);
assert!(
ranges.windows(2).all(|e| {
let first = &e[0];
let second = &e[1];
first.end == second.start
}),
"Ranges overlap or have gaps: {ranges:?}"
);
threads
.into_iter()
.map(|e| e.join().unwrap())
//.map(|e|dbg!(e))
.reduce(|mut left, right| {
left.merge_with(right);
left
})
.unwrap()
}
fn citymap_thread(
path: String,
mut range: Range<u64>,
i: usize,
range_feedback: Sender<Range<u64>>,
) -> JoinHandle<Citymap> {
thread::Builder::new()
.name(format!("process_thread id: {i} assigned: {range:?}"))
.spawn(move || {
let mut file = File::open(path).unwrap();
//println!("Before: {range:?}");
// Perform alignment of buffer/range at the start
{
// Skip head alignment for start of file
if range.start != 0 {
let mut head = vec![0; 50];
let len = file.read_at(&mut head, range.start).unwrap();
head.truncate(len);
for (i, &pos) in head.iter().enumerate() {
if pos == b'\n' {
range.start += i as u64;
break;
}
}
}
// tail alignment
{
let mut head = vec![0; 50];
let len = file.read_at(&mut head, range.end).unwrap();
head.truncate(len);
for (i, &pos) in head.iter().enumerate() {
if pos == b'\n' {
range.end += i as u64;
break;
}
}
}
}
// Notify main about alignment
range_feedback.send(range.clone()).unwrap();
// Ensure we remain within bounds of the designated file range
file.seek(SeekFrom::Start(range.start)).unwrap();
let limited = BufReader::with_capacity(10_usize.pow(5), file);
let mut buffered = limited.take(range.end - range.start);
citymap_naive(&mut buffered)
})
.unwrap()
}
fn citymap_naive(input: &mut impl BufRead) -> Citymap {
let mut map = Citymap::new();
let mut buf = Vec::with_capacity(50);
loop {
let read = input.read_until(b'\n', &mut buf).unwrap();
// Stream has finished
if read == 0 {
break;
}
// Skip over just newline strings that get created by the alignment process
if buf == b"\n" {
continue;
}
let mut city = None;
let mut val = None;
for (i, &char) in buf.iter().enumerate() {
if char == b';' {
city = Some(&buf[0..i]);
val = Some(&buf[(i + 1)..(buf.len() - 1)]);
break;
}
}
#[cfg(not(feature = "unsafe"))]
let entry = map.lookup(std::str::from_utf8(city.unwrap()).unwrap());
#[cfg(feature = "unsafe")]
let entry = map.lookup(unsafe { std::str::from_utf8_unchecked(city.unwrap()) });
entry.add_new(val.unwrap());
buf.clear();
}
map
}
fn print_results(map: Citymap) {
let mut res = map.into_key_values();
res.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));
print!("{{");
for (city, vals) in res {
let min = vals.min();
let mean = vals.mean();
let max = vals.max();
print!("{city}={min:.1}/{mean:.1}/{max:.1}, ")
}
println!("}}");
}

src/main/rust/src/implementations/libraries.rs

@@ -0,0 +1,110 @@
use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use memmap2::MmapOptions;
use rustc_hash::{FxBuildHasher, FxHashMap as HashMap};
use std::slice::from_raw_parts;
use std::sync::mpsc;
use std::time::Instant;
use std::{fs::File, thread};
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
let now = Instant::now();
const FILE_PATH: &str = "../../../measurements.txt";
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
let mmap_ptr = mmap.as_ptr();
let file_length = mmap.len();
let hasher = FxBuildHasher;
// Even if I could now just use the byte slice as a key, doing the hash is still faster
let mut stations: HashMap<u64, (&[u8], StationMeasurements)> =
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
let (tx, rx) = mpsc::channel();
let cores = thread::available_parallelism().unwrap().into();
let chunk_length = file_length / cores;
let mut bounds = Vec::with_capacity(cores + 1);
let mut start = 0;
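// Carve the mmap into one chunk per core, extending each chunk to the next newline so no line straddles two threads.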
for _ in 0..cores {
let end = (start + chunk_length).min(mmap.len());
let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
Some(v) => v,
None => {
assert_eq!(end, mmap.len());
0
}
};
let end = end + next_new_line;
bounds.push((start, end));
start = end + 1;
}
thread::scope(|s| {
for i in 0..cores {
let tx = tx.clone();
let (start, end) = *bounds.get(i).unwrap();
let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) };
s.spawn(move || {
let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> =
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
for line in mmap_slice.split(|&byte| byte == b'\n') {
if line.is_empty() {
break;
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let hash = hash::bytes(station);
let temp = parse::temp(temp);
let measurements_option = t_stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
t_stations.insert(hash, (station, measurements));
}
}
let _ = tx.send(t_stations);
});
}
drop(tx);
while let Ok(t_stations) = rx.recv() {
for (hash, (station, measurements)) in t_stations.iter() {
let joined_measurements_options = stations.get_mut(hash);
if let Some((_, joined_measurements)) = joined_measurements_options {
joined_measurements.merge(measurements);
} else {
stations.insert(*hash, (station, *measurements));
}
}
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let station = unsafe { std::str::from_utf8_unchecked(station) };
let measurements = measurements.to_string();
#[cfg(feature = "json")]
{
format!("{{\"{station}\":\"{measurements}\"}}")
}
#[cfg(not(feature = "json"))]
{
format!("{station}={measurements}")
}
})
.collect();
stations.sort();
let stations = stations.join(",");
#[cfg(feature = "json")]
{
println!("\n\n[{stations}]");
}
#[cfg(not(feature = "json"))]
{
println!("\n\n{{{stations}}}");
}
println!("\n\nTime={} ms", now.elapsed().as_millis());
});
}

src/main/rust/src/implementations/multi_threaded.rs

@@ -0,0 +1,118 @@
use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use std::collections::HashMap;
use std::io::{BufRead, Seek, SeekFrom};
use std::sync::mpsc;
use std::time::Instant;
use std::{fs::File, io::BufReader, thread};
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
const FILE_PATH: &str = "../../../measurements.txt";
let now = Instant::now();
thread::scope(|s| {
let mut stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let (tx, rx) = mpsc::channel();
let cores = thread::available_parallelism().unwrap().into();
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
let mut reader = BufReader::new(&file);
let file_length = reader.seek(SeekFrom::End(0)).unwrap();
let chunk_length = file_length as usize / cores;
let mut bounds = Vec::with_capacity(cores + 1);
let mut start = 0;
for i in 0..cores {
let mut reader = BufReader::new(&file);
// chunk i nominally ends at (i + 1) * chunk_length; extend it to the end of the line it lands in
let mut end = chunk_length * (i + 1);
reader
.seek(SeekFrom::Start(end as u64))
.expect("could not seek");
let mut line = Vec::with_capacity(108);
let line_len = reader
.read_until(b'\n', &mut line)
.expect("could not read bytes");
end += line_len;
bounds.push((start, end));
// `end` already sits just past the trailing newline, so the next chunk starts exactly there
start = end;
}
for i in 0..cores {
let tx = tx.clone();
let (mut currposition, end) = *bounds.get(i).unwrap();
s.spawn(move || {
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
let mut reader = BufReader::new(&file);
reader.seek(SeekFrom::Start(currposition as u64)).unwrap();
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let mut line = Vec::with_capacity(108);
loop {
let line_len = reader
.read_until(b'\n', &mut line)
.expect("could not read bytes");
if line_len == 0 {
break;
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let hash = hash::bytes(station);
let station = unsafe { std::str::from_utf8_unchecked(station) };
let temp = parse::temp(temp.split_last().unwrap().1);
let measurements_option = t_stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
t_stations.insert(hash, (station.to_string(), measurements));
}
currposition += line_len;
if currposition >= end {
break;
}
line.clear();
}
let _ = tx.send(t_stations);
});
}
drop(tx);
while let Ok(t_stations) = rx.recv() {
for (&hash, (station, measurements)) in t_stations.iter() {
let joined_measurements_options = stations.get_mut(&hash);
if let Some((_, joined_measurements)) = joined_measurements_options {
joined_measurements.merge(measurements);
} else {
stations.insert(hash, (station.to_owned(), *measurements));
}
}
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let measurements = measurements.to_string();
#[cfg(feature = "json")]
{
format!("{{\"{station}\":\"{measurements}\"}}")
}
#[cfg(not(feature = "json"))]
{
format!("{station}={measurements}")
}
})
.collect();
stations.sort();
let stations = stations.join(",");
#[cfg(feature = "json")]
{
println!("\n\n[{stations}]");
}
#[cfg(not(feature = "json"))]
{
println!("\n\n{{{stations}}}");
}
println!("\n\nTime={} ms", now.elapsed().as_millis());
});
}

src/main/rust/src/implementations/multi_threaded_smol.rs

@@ -0,0 +1,128 @@
use smol::fs::File;
use smol::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use std::collections::HashMap;
use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use easy_parallel::Parallel;
use std::thread;
use std::time::Instant;
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
const FILE_PATH: &str = "../../../measurements.txt";
let now = Instant::now();
let mut stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let cores = thread::available_parallelism().unwrap().into();
let bounds = smol::block_on(async {
let mut file = File::open(FILE_PATH)
.await
.expect("File measurements.txt not found");
let mut reader = BufReader::new(&mut file);
let file_length = reader.seek(SeekFrom::End(0)).await.unwrap();
let chunk_length = file_length as usize / cores;
let mut bounds = Vec::with_capacity(cores + 1);
bounds.push(0);
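// bounds holds cores + 1 offsets: 0, each chunk boundary rounded up to the next newline, then EOF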
for i in 1..cores {
let mut reader = BufReader::new(&mut file);
let mut byte_start = chunk_length * i;
reader
.seek(SeekFrom::Start(byte_start as u64))
.await
.expect("could not seek");
let mut line = Vec::with_capacity(108);
let line_len = reader
.read_until(b'\n', &mut line)
.await
.expect("could not read bytes");
byte_start += line_len;
bounds.push(byte_start as u64);
}
bounds.push(file_length);
bounds
});
let t_stations_vec = Parallel::new()
.each(0..cores, |i| {
let mut currposition = *bounds.get(i).unwrap();
let end = *bounds.get(i + 1).unwrap();
smol::block_on(async {
let mut file = File::open(FILE_PATH)
.await
.expect("File measurements.txt not found");
let mut reader = BufReader::new(&mut file);
reader.seek(SeekFrom::Start(currposition)).await.unwrap();
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let mut line = Vec::with_capacity(108);
loop {
let line_len = reader
.read_until(b'\n', &mut line)
.await
.expect("could not read bytes");
if line_len == 0 {
break;
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let hash = hash::bytes(station);
let station = unsafe { std::str::from_utf8_unchecked(station) };
let temp = parse::temp(temp.split_last().unwrap().1);
let measurements_option = t_stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
t_stations.insert(hash, (station.to_string(), measurements));
}
currposition += line_len as u64;
if currposition >= end {
break;
}
line.clear();
}
t_stations
})
})
.run();
for t_stations in t_stations_vec {
for (hash, (station, measurements)) in t_stations.iter() {
let joined_measurements_options = stations.get_mut(hash);
if let Some((_, joined_measurements)) = joined_measurements_options {
joined_measurements.merge(measurements);
} else {
stations.insert(*hash, (station.to_owned(), *measurements));
}
}
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let measurements = measurements.to_string();
#[cfg(feature = "json")]
{
format!("{{\"{station}\":\"{measurements}\"}}")
}
#[cfg(not(feature = "json"))]
{
format!("{station}={measurements}")
}
})
.collect();
stations.sort();
let stations = stations.join(",");
#[cfg(feature = "json")]
{
println!("\n\n[{stations}]");
}
#[cfg(not(feature = "json"))]
{
println!("\n\n{{{stations}}}");
}
println!("\n\nTime={} ms", now.elapsed().as_millis());
}

src/main/rust/src/implementations/multi_threaded_structured.rs

@@ -0,0 +1,120 @@
use crate::models::station_measurements::StationMeasurements;
use crate::utils::parse;
use memmap2::MmapOptions;
use rustc_hash::{FxBuildHasher, FxHashMap};
use std::ffi::CStr;
use std::sync::mpsc;
use std::time::Instant;
use std::{fs::File, thread};
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
print!("\x1b[J");
const FILE_PATH: &str = "structured_measurements.txt";
let now = Instant::now();
let file = File::open(FILE_PATH).expect("File structured_measurements.txt not found");
let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
let file_length = mmap.len();
let hasher = FxBuildHasher;
let mut stations: FxHashMap<String, StationMeasurements> =
FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
let (tx, rx) = mpsc::channel();
let cores = thread::available_parallelism().unwrap().into();
let chunk_length = file_length / cores;
let mut bounds = Vec::with_capacity(cores + 1);
let mut start = 0;
for _ in 0..cores {
let end = (start + chunk_length).min(mmap.len());
let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
Some(v) => v,
None => {
assert_eq!(end, mmap.len());
0
}
};
let end = end + next_new_line;
bounds.push((start, end));
start = end + 1;
}
thread::scope(|s| {
for i in 0..cores {
let tx = tx.clone();
let (start, end) = *bounds.get(i).unwrap();
let mmap_slice = &mmap[start..end];
s.spawn(move || {
let mut t_stations: FxHashMap<String, StationMeasurements> =
FxHashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
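// Fixed-length records: a 100-byte null-padded name, ';', a 5-byte padded temperature, and a trailing '\n', so 107 bytes per line.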
let lines = mmap_slice.chunks_exact(107);
for (line_num, line) in lines.enumerate() {
if line_num % 100000 == 0 {
print!("\x1b[{i};0Hlines: {line_num}");
}
let (station, temp) = unsafe { line.split_at_unchecked(100) };
let station = {
if station[station.len() - 1] == 0u8 {
unsafe {
std::str::from_utf8_unchecked(
CStr::from_bytes_until_nul(station).unwrap().to_bytes(),
)
}
} else {
unsafe { std::str::from_utf8_unchecked(station) }
}
}
.to_owned();
let temp = parse::temp_new(&temp[1..6]);
let measurements_option = t_stations.get_mut(&station);
if let Some(measurements) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
t_stations.insert(station, measurements);
}
}
let _ = tx.send(t_stations);
});
}
drop(tx);
while let Ok(t_stations) = rx.recv() {
for (station, measurements) in t_stations.iter() {
let joined_measurements_options = stations.get_mut(station);
if let Some(joined_measurements) = joined_measurements_options {
joined_measurements.merge(measurements);
} else {
stations.insert(station.to_owned(), *measurements);
}
}
}
let mut stations: Vec<String> = stations
.iter()
.map(|(station, measurements)| {
let measurements = measurements.to_string();
#[cfg(feature = "json")]
{
format!("{{\"{station}\":\"{measurements}\"}}")
}
#[cfg(not(feature = "json"))]
{
format!("{station}={measurements}")
}
})
.collect();
stations.sort();
let stations = stations.join(",");
#[cfg(feature = "json")]
{
println!("\n\n[{stations}]");
}
#[cfg(not(feature = "json"))]
{
println!("\n\n{{{stations}}}");
}
println!("\n\nTime={} ms", now.elapsed().as_millis());
});
}

src/main/rust/src/implementations/phcs.rs

@@ -0,0 +1,203 @@
use std::collections::HashMap;
use std::env;
use std::fmt;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::thread::{self, Scope, ScopedJoinHandle};
use crate::models::mmap::{Mmap, MmapChunkIterator};
// Defined in challenge spec
const MAX_STATIONS: usize = 10000;
const NUM_CONSUMERS: usize = 32;
const FIXED_POINT_DIVISOR: f64 = 10.0;
struct StationData {
min_temp: i32,
max_temp: i32,
count: i32,
temp_sum: i32,
}
impl fmt::Display for StationData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{:.1}/{:.1}/{:.1}",
(self.min_temp as f64 / FIXED_POINT_DIVISOR),
self.get_mean(),
(self.max_temp as f64 / FIXED_POINT_DIVISOR)
)
}
}
/// Efficiently handles station statistics. Avoids using floating-point arithmetic to speed-up processing.
/// The mean is only calculated on demand, so we avoid calculating it as we read the file
impl StationData {
fn new(temp: i32) -> Self {
Self {
min_temp: temp,
max_temp: temp,
count: 1,
temp_sum: temp,
}
}
fn to_bytes(&self) -> Vec<u8> {
format!(
"{:.1}/{:.1}/{:.1}",
(self.min_temp as f64 / FIXED_POINT_DIVISOR),
self.get_mean(),
(self.max_temp as f64 / FIXED_POINT_DIVISOR)
)
.into_bytes()
}
fn get_mean(&self) -> f64 {
(self.temp_sum as f64 / self.count as f64) / FIXED_POINT_DIVISOR
}
fn update_from(&mut self, temp: i32) {
self.max_temp = self.max_temp.max(temp);
self.min_temp = self.min_temp.min(temp);
self.count += 1;
self.temp_sum += temp;
}
fn update_from_station(&mut self, src: &mut Self) {
self.max_temp = self.max_temp.max(src.max_temp);
self.min_temp = self.min_temp.min(src.min_temp);
self.temp_sum += src.temp_sum;
self.count += src.count;
}
#[inline]
fn parse_temp(bytes: &[u8]) -> i32 {
let mut result: i32 = 0;
let mut negative: bool = false;
for &b in bytes {
match b {
b'0'..=b'9' => {
result = result * 10 + (b as i32 - b'0' as i32);
}
b'.' => {}
b'-' => {
negative = true;
}
_ => panic!("wrong format for temperature"),
}
}
if negative {
return -result;
}
result
}
#[inline]
fn parse_data(line: &[u8]) -> (&[u8], i32) {
let semicolon_pos = line.iter().position(|&b| b == b';').unwrap();
let name = &line[..semicolon_pos];
let temp = &line[semicolon_pos + 1..];
(name, Self::parse_temp(temp))
}
}
fn merge_hashmaps<'a>(
mut dest: HashMap<&'a [u8], StationData>,
src: HashMap<&'a [u8], StationData>,
) -> HashMap<&'a [u8], StationData> {
for (k, mut v) in src {
dest.entry(k)
.and_modify(|e| e.update_from_station(&mut v))
.or_insert(v);
}
dest
}
/// Parses a chunk of the input as StationData values.
fn process_chunk(current_chunk_slice: &[u8]) -> HashMap<&[u8], StationData> {
let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS);
let mut start = 0;
while let Some(end) = current_chunk_slice[start..]
.iter()
.position(|&b| b == b'\n')
{
let line = &current_chunk_slice[start..start + end];
let (name, temp) = StationData::parse_data(line);
station_map
.entry(name)
.and_modify(|e| e.update_from(temp))
.or_insert(StationData::new(temp));
start += end + 1; // move to the start of the next line
}
// If we don't find a \n, process the remaining data
if start < current_chunk_slice.len() {
let line = &current_chunk_slice[start..];
let (name, temp) = StationData::parse_data(line);
station_map
.entry(name)
.and_modify(|e| e.update_from(temp))
.or_insert(StationData::new(temp));
}
station_map
}
fn process_mmap<'scope, 'env>(
mmap: Mmap<'env>,
s: &'scope Scope<'scope, 'env>,
) -> HashMap<&'env [u8], StationData> {
let mut handlers: Vec<ScopedJoinHandle<HashMap<&[u8], StationData>>> = Vec::new();
for chunk in MmapChunkIterator::new(mmap, NUM_CONSUMERS) {
let h = s.spawn(move || process_chunk(chunk));
handlers.push(h);
}
let mut station_map: HashMap<&[u8], StationData> = HashMap::with_capacity(MAX_STATIONS);
for h in handlers {
let inner_station = h.join().unwrap();
station_map = merge_hashmaps(station_map, inner_station);
}
station_map
}
fn write_output_to_stdout(station_map: HashMap<&[u8], StationData>) -> io::Result<()> {
let mut stdout = io::stdout().lock();
let mut buffer = Vec::new();
buffer.extend_from_slice(b"{");
let mut sorted_key_value_vec: Vec<_> = station_map.iter().collect();
sorted_key_value_vec.sort_by_key(|e| e.0);
for (i, (name, data)) in sorted_key_value_vec.iter().enumerate() {
if i > 0 {
buffer.extend_from_slice(b", ");
}
buffer.extend_from_slice(name);
buffer.extend_from_slice(b"=");
buffer.extend(data.to_bytes());
}
buffer.extend_from_slice(b"}");
stdout.write_all(&buffer)
}
pub fn run() -> io::Result<()> {
// won't accept non-utf-8 args
let args: Vec<String> = env::args().collect();
let file_name = match args.get(2) {
Some(fname) => fname,
None => "../../../measurements.txt",
};
let f = File::open(file_name)?;
let mmap = Mmap::from_file(f);
thread::scope(|s| {
let station_map = process_mmap(mmap, s);
write_output_to_stdout(station_map).unwrap();
});
Ok(())
}

src/main/rust/src/implementations/reference_impl.rs

@@ -0,0 +1,133 @@
use bstr::{BStr, ByteSlice};
use memmap2::MmapOptions;
use rayon::prelude::*;
use rustc_hash::FxHashMap as HashMap;
use std::time::Instant;
use std::{fmt::Display, fs::File};
#[derive(Debug)]
struct State {
min: f64,
max: f64,
count: u64,
sum: f64,
}
impl Default for State {
fn default() -> Self {
Self {
min: f64::MAX,
max: f64::MIN,
count: 0,
sum: 0.0,
}
}
}
impl Display for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let avg = self.sum / (self.count as f64);
write!(f, "{:.1}/{avg:.1}/{:.1}", self.min, self.max)
}
}
impl State {
fn update(&mut self, v: f64) {
self.min = self.min.min(v);
self.max = self.max.max(v);
self.count += 1;
self.sum += v;
}
fn merge(&mut self, other: &Self) {
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
self.count += other.count;
self.sum += other.sum;
}
}
fn make_map<'a>(i: impl Iterator<Item = &'a [u8]>) -> HashMap<&'a BStr, State> {
let mut state: HashMap<&'a BStr, State> = Default::default();
for line in i {
let (name, value) = line.split_once_str(b";").unwrap();
let value = fast_float::parse(value).unwrap();
state.entry(name.into()).or_default().update(value);
}
state
}
fn solve_for_part((start, end): (usize, usize), mem: &[u8]) -> HashMap<&BStr, State> {
make_map((mem[start..end]).lines())
}
fn merge<'a>(a: &mut HashMap<&'a BStr, State>, b: &HashMap<&'a BStr, State>) {
for (k, v) in b {
a.entry(k).or_default().merge(v);
}
}
pub fn run() {
let now = Instant::now();
let cores: usize = std::thread::available_parallelism().unwrap().into();
let path = match std::env::args().nth(1) {
Some(path) => path,
None => "../../../measurements.txt".to_owned(),
};
let file = File::open(path).unwrap();
let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
let chunk_size = mmap.len() / cores;
let mut chunks: Vec<(usize, usize)> = vec![];
let mut start = 0;
for _ in 0..cores {
let end = (start + chunk_size).min(mmap.len());
let next_new_line = match memchr::memchr(b'\n', &mmap[end..]) {
Some(v) => v,
None => {
assert_eq!(end, mmap.len());
0
}
};
let end = end + next_new_line;
chunks.push((start, end));
start = end + 1;
}
let parts: Vec<_> = chunks
.par_iter()
.map(|r| solve_for_part(*r, &mmap))
.collect();
let state: HashMap<&BStr, State> = parts.into_iter().fold(Default::default(), |mut a, b| {
merge(&mut a, &b);
a
});
let mut all: Vec<_> = state.into_iter().collect();
all.sort_unstable_by(|a, b| a.0.cmp(b.0));
#[cfg(feature = "json")]
{
print!("[");
for (i, (name, state)) in all.into_iter().enumerate() {
if i == 0 {
print!("{{\"{name}\":\"{state}\"}}");
} else {
print!(", {{\"{name}\":\"{state}\"}}");
}
}
println!("]");
}
#[cfg(not(feature = "json"))]
{
print!("{{");
for (i, (name, state)) in all.into_iter().enumerate() {
if i == 0 {
print!("{name}={state}");
} else {
print!(", {name}={state}");
}
}
println!("}}");
}
println!("\n\nTime={} ms", now.elapsed().as_millis());
}

src/main/rust/src/implementations/rgk.rs

@@ -0,0 +1,443 @@
use ptr_hash::PtrHashParams;
use rustc_hash::FxHashSet;
use std::{
simd::{cmp::SimdPartialEq, Simd},
vec::Vec,
};
type V = i32;
type PtrHash = ptr_hash::DefaultPtrHash<ptr_hash::hash::FxHash, u64>;
pub struct Phf {
pub ptr_hash: PtrHash,
pub keys: Vec<Vec<u8>>,
pub slots: Vec<Record>,
}
impl Phf {
fn new(mut keys: Vec<Vec<u8>>) -> Self {
keys.sort();
let num_slots = keys.len() * 5 / 2;
let params = ptr_hash::PtrHashParams {
alpha: 0.9,
c: 1.5,
slots_per_part: num_slots,
..PtrHashParams::default()
};
let mut hashes: Vec<u64> = keys.iter().map(|key| hash_name(key)).collect();
hashes.sort();
for (x, y) in hashes.iter().zip(hashes.iter().skip(1)) {
assert!(*x != *y, "DUPLICATE HASH");
}
let ptr_hash = PtrHash::new(&hashes, params);
let slots = vec![Record::default(); num_slots];
Self {
ptr_hash,
keys,
slots,
}
}
fn compute_index(&self, hash: u64) -> usize {
self.ptr_hash.index_single_part(&hash)
}
fn get_index_mut(&mut self, idx: usize) -> &mut Record {
&mut self.slots[idx]
}
fn index_hash_mut(&mut self, hash: u64) -> &mut Record {
&mut self.slots[self.ptr_hash.index_single_part(&hash)]
}
pub fn index<'b>(&'b self, key: &[u8]) -> &'b Record {
let hash = hash_name(key);
&self.slots[self.compute_index(hash)]
}
fn index_mut<'b>(&'b mut self, key: &[u8]) -> &'b mut Record {
self.index_hash_mut(hash_name(key))
}
fn merge(&mut self, r: Self) {
// TODO: If key sets are equal or one is a subset of the other, merge
// smaller into larger.
let mut new_keys = vec![];
let mut i1 = 0;
let mut i2 = 0;
while i1 < self.keys.len() && i2 < r.keys.len() {
if self.keys[i1] == r.keys[i2] {
new_keys.push(self.keys[i1].clone());
i1 += 1;
i2 += 1;
continue;
}
if self.keys[i1] < r.keys[i2] {
new_keys.push(self.keys[i1].clone());
i1 += 1;
continue;
}
if self.keys[i1] > r.keys[i2] {
new_keys.push(r.keys[i2].clone());
i2 += 1;
continue;
}
panic!();
}
while i1 < self.keys.len() {
new_keys.push(self.keys[i1].clone());
i1 += 1;
}
while i2 < r.keys.len() {
new_keys.push(r.keys[i2].clone());
i2 += 1;
}
let mut new_phf = Self::new(new_keys);
for key in &self.keys {
new_phf.index_mut(key).merge(self.index(key));
}
for key in &r.keys {
new_phf.index_mut(key).merge(r.index(key));
}
*self = new_phf;
}
}
#[derive(Clone, Debug)]
#[repr(align(32))]
pub struct Record {
pub count: u64,
// Storing these as two u32 is nice, because they are read as a single u64.
/// Byte representation of string ~b"bc.d" or ~b"\0c.d".
pub min: u32,
/// Byte representation of string b"bc.d" or b"\0c.d".
pub max: u32,
pub sum: u64,
}
impl Record {
fn default() -> Self {
Self {
count: 0,
min: 0,
max: 0,
sum: 0,
}
}
fn add(&mut self, raw_value: u32, value: u64) {
// assert2::debug_assert!(value < 1000);
self.count += 1;
self.sum += value;
// See https://en.algorithmica.org/hpc/algorithms/argmin/
if raw_value < self.min {
self.min = raw_value;
}
if raw_value > self.max {
self.max = raw_value;
}
}
fn merge(&mut self, other: &Self) {
self.count += other.count;
self.sum += other.sum_to_val() as u64;
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
}
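/// `sum` holds three digit lanes packed 21 bits apart (see `raw_to_pdep`); recombine them with their place values.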
fn sum_to_val(&self) -> V {
let m = (1 << 21) - 1;
((self.sum & m) + 10 * ((self.sum >> 21) & m) + 100 * ((self.sum >> 42) & m)) as _
}
/// Return (min, avg, max)
pub fn merge_pos_neg(pos: &Record, neg: &Record) -> (V, V, V) {
let pos_sum = pos.sum as V;
let neg_sum = neg.sum as V;
let sum = pos_sum - neg_sum;
let count = (pos.count + neg.count) as V;
// round to nearest
let avg = (sum + count / 2).div_floor(count);
let pos_max = raw_to_value(pos.max);
let neg_max = -raw_to_value(neg.min);
let max = pos_max.max(neg_max);
let pos_min = raw_to_value(pos.min);
let neg_min = -raw_to_value(neg.max);
let min = pos_min.min(neg_min);
(min, avg, max)
}
}
/// Reads raw bytes and masks the ; and the b'0'=0x30.
/// Returns something of the form 0x0b0c..0d or 0x000c..0d
fn parse_to_raw(data: &[u8], start: usize, end: usize) -> u32 {
let raw = u32::from_be_bytes(unsafe { *data.get_unchecked(start..).as_ptr().cast() });
raw >> (8 * (4 - (end - start)))
}
fn raw_to_pdep(raw: u32) -> u64 {
#[cfg(feature = "no_pdep")]
{
let raw = raw as u64;
(raw & 15) | ((raw & (15 << 16)) << (21 - 16)) | ((raw & (15 << 24)) << (42 - 24))
}
#[cfg(not(feature = "no_pdep"))]
{
let mask = 0x0f0f000f;
let raw = raw & mask;
// input 0011bbbb0011cccc........0011dddd
// 0b bbbb xxxxcccc yyyyyyyyyyyydddd // Deposit here
// 0b 1111 1111 1111 // Mask out trash using &
let pdep = 0b0000000000000000001111000000000000011111111000001111111111111111u64;
unsafe { core::arch::x86_64::_pdep_u64(raw as u64, pdep) }
}
}
fn raw_to_value(v: u32) -> V {
let mask = 0x0f0f000f;
let bytes = (v & mask).to_be_bytes();
// s = bc.d
let b = bytes[0] as V;
let c = bytes[1] as V;
let d = bytes[3] as V;
b as V * 100 * (bytes[0] != 0) as V + c as V * 10 + d as V
}
pub fn format(v: V) -> String {
format!("{:.1}", v as f64 / 10.0)
}
#[allow(unused)]
fn hash_name(name: &[u8]) -> u64 {
// Hash the first and last 8 bytes.
// TODO: More robust hash that actually uses all characters.
let head: [u8; 8] = unsafe { *name.get_unchecked(..8).split_first_chunk().unwrap().0 };
let tail: [u8; 8] = unsafe {
*name
.get_unchecked(name.len().wrapping_sub(8)..)
.split_first_chunk()
.unwrap()
.0
};
let shift = 64usize.saturating_sub(8 * name.len());
let khead = u64::from_ne_bytes(head) << shift;
let ktail = u64::from_ne_bytes(tail) >> shift;
khead.wrapping_add(ktail)
}
/// Number of SIMD lanes. AVX2 has 256 bits, so 32 lanes.
const L: usize = 32;
/// The Simd type.
pub type S = Simd<u8, L>;
#[derive(Copy, Clone)]
struct State {
start: usize,
sep: usize,
end: usize,
}
/// Find the regions between \n and ; (names) and between ; and \n (values),
/// and calls `callback` for each line.
#[inline(always)]
fn iter_lines<'a>(
mut data: &'a [u8],
mut callback: impl FnMut(&'a [u8], State, State, State, State),
) {
// Make sure that the out-of-bounds reads we do are OK.
data = &data[..data.len() - 32];
let sep = S::splat(b';');
let end = S::splat(b'\n');
let find = |last: usize, sep: S| {
let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
let eq = sep.simd_eq(simd).to_bitmask() as u32;
let offset = eq.trailing_zeros() as usize;
last + offset
};
// Modified to be able to search regions longer than 32.
let find_long = |mut last: usize, sep: S| {
let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
let mut eq = sep.simd_eq(simd).to_bitmask() as u32;
if eq == 0 {
while eq == 0 {
last += 32;
let simd = S::from_array(unsafe { *data.get_unchecked(last..).as_ptr().cast() });
eq = sep.simd_eq(simd).to_bitmask() as u32;
}
}
let offset = eq.trailing_zeros() as usize;
last + offset
};
let init_state = |idx: usize| {
let first_end = find_long(idx, end);
State {
start: first_end + 1,
sep: first_end + 1,
end: 0,
}
};
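// Four cursors sweep the four quarters of the data in lockstep, keeping independent work in flight (instruction-level parallelism).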
let mut state0 = init_state(0);
let mut state1 = init_state(data.len() / 4);
let mut state2 = init_state(2 * data.len() / 4);
let mut state3 = init_state(3 * data.len() / 4);
// Duplicate each line for each input state.
macro_rules! step {
[$($s:expr),*] => {
$($s.sep = find_long($s.sep + 1, sep) ;)*
$($s.end = find($s.sep + 1, end) ;)*
callback(data, $($s, )*);
$($s.start = $s.end + 1;)*
}
}
while state3.start < data.len() {
step!(state0, state1, state2, state3);
}
}
fn run(data: &[u8], keys: &[Vec<u8>]) -> Phf {
// Each thread has its own accumulator.
let mut h = Phf::new(keys.to_vec());
iter_lines(
data,
|data, mut s0: State, mut s1: State, mut s2: State, mut s3: State| {
unsafe {
// If value is negative, extend name by one character.
s0.sep += (data.get_unchecked(s0.sep + 1) == &b'-') as usize;
let name0 = data.get_unchecked(s0.start..s0.sep);
s1.sep += (data.get_unchecked(s1.sep + 1) == &b'-') as usize;
let name1 = data.get_unchecked(s1.start..s1.sep);
s2.sep += (data.get_unchecked(s2.sep + 1) == &b'-') as usize;
let name2 = data.get_unchecked(s2.start..s2.sep);
s3.sep += (data.get_unchecked(s3.sep + 1) == &b'-') as usize;
let name3 = data.get_unchecked(s3.start..s3.sep);
let raw0 = parse_to_raw(data, s0.sep + 1, s0.end);
let raw1 = parse_to_raw(data, s1.sep + 1, s1.end);
let raw2 = parse_to_raw(data, s2.sep + 1, s2.end);
let raw3 = parse_to_raw(data, s3.sep + 1, s3.end);
let h0 = hash_name(name0);
let h1 = hash_name(name1);
let h2 = hash_name(name2);
let h3 = hash_name(name3);
let idx0 = h.compute_index(h0);
let idx1 = h.compute_index(h1);
let idx2 = h.compute_index(h2);
let idx3 = h.compute_index(h3);
h.get_index_mut(idx0).add(raw0, raw_to_pdep(raw0));
h.get_index_mut(idx1).add(raw1, raw_to_pdep(raw1));
h.get_index_mut(idx2).add(raw2, raw_to_pdep(raw2));
h.get_index_mut(idx3).add(raw3, raw_to_pdep(raw3));
}
},
);
h
}
pub fn run_parallel(data: &[u8], keys: &[Vec<u8>], num_threads: usize) -> Phf {
if num_threads == 0 {
return run(data, keys);
}
let phf = std::sync::Mutex::new(Phf::new(keys.to_vec()));
// Spawn one thread per core.
std::thread::scope(|s| {
let chunks = data.chunks(data.len() / num_threads + 1);
for chunk in chunks {
s.spawn(|| {
// Each thread has its own accumulator.
let thread_phf = run(chunk, keys);
// Merge results.
phf.lock().unwrap().merge(thread_phf);
});
}
});
phf.into_inner().unwrap()
}
pub fn to_str(name: &[u8]) -> &str {
std::str::from_utf8(name).unwrap()
}
/// Returns a list of city names found in data.
/// Each city is returned twice, once as `<city>` and once as `<city>;`,
/// with the latter being used to accumulate negative temperatures.
#[inline(never)]
pub fn find_city_names(data: &[u8]) -> Vec<Vec<u8>> {
let mut cities = FxHashSet::default();
let mut callback = |data: &[u8], state: State| {
let State { start, sep, .. } = state;
let name = unsafe { data.get_unchecked(start..sep) };
cities.insert(name.to_vec());
// Do the same for the name with ; appended.
let name = unsafe { data.get_unchecked(start..sep + 1) };
cities.insert(name.to_vec());
};
iter_lines(data, |d, s0, s1, s2, s3| {
flatten_callback(d, s0, s1, s2, s3, &mut callback)
});
let mut cities: Vec<_> = cities.into_iter().collect();
cities.sort();
cities
}
fn flatten_callback<'a>(
data: &'a [u8],
s0: State,
s1: State,
s2: State,
s3: State,
callback: &mut impl FnMut(&'a [u8], State),
) {
callback(data, s0);
callback(data, s1);
callback(data, s2);
callback(data, s3);
}
#[derive(clap::Parser)]
pub struct Args {
pub input: Option<String>,
#[clap(short = 'j', long)]
pub threads: Option<usize>,
#[clap(long)]
pub print: bool,
#[clap(long)]
pub stats: bool,
}
#[cfg(test)]
mod test {
#[test]
fn parse_raw() {
use super::*;
let d = b"12.3";
let raw = parse_to_raw(d, 0, 4);
let v = raw_to_value(raw);
assert_eq!(v, 123);
let d = b"12.3";
let raw = parse_to_raw(d, 1, 4);
let v = raw_to_value(raw);
assert_eq!(v, 23);
}
}

src/main/rust/src/implementations/single_thread.rs

@@ -0,0 +1,54 @@
use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::time::Instant;
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
let now = Instant::now();
let mut stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
let mut reader = BufReader::new(&file);
let mut line = Vec::with_capacity(108);
loop {
let line_len = reader
.read_until(b'\n', &mut line)
.expect("could not read bytes");
if line_len == 0 {
break;
}
        let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
        let hash = hash::bytes(station);
        let station = unsafe { std::str::from_utf8_unchecked(station) };
        // split_last drops the trailing '\n' (assumes the file ends in '\n').
        let temp = parse::temp(temp.split_last().unwrap().1);
let measurements_option = stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
stations.insert(hash, (station.to_string(), measurements));
}
line.clear();
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let measurements = measurements.to_string();
format!("{station}={measurements}")
})
.collect();
stations.sort();
let stations = stations.join(",");
println!("{{{stations}}}");
println!("Time={} ms", now.elapsed().as_millis());
}


@@ -0,0 +1,60 @@
use smol::fs::File;
use smol::io::{AsyncBufReadExt, BufReader};
use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use std::collections::HashMap;
use std::time::Instant;
const DEFAULT_HASHMAP_LENGTH: usize = 10000;
pub fn run() {
let now = Instant::now();
let mut stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
smol::block_on(async {
let mut file = File::open("../../../measurements.txt")
.await
.expect("Could not read file");
let mut reader = BufReader::new(&mut file);
let mut line = Vec::with_capacity(108);
loop {
let line_len = reader
.read_until(b'\n', &mut line)
.await
.expect("could not read bytes");
if line_len == 0 {
break;
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let hash = hash::bytes(station);
let station = unsafe { std::str::from_utf8_unchecked(station) };
let temp = parse::temp(temp.split_last().unwrap().1);
let measurements_option = stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
measurements.update(temp);
} else {
let measurements = StationMeasurements {
min: temp,
max: temp,
count: 1,
sum: temp,
};
stations.insert(hash, (station.to_string(), measurements));
}
line.clear();
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let measurements = measurements.to_string();
format!("{station}={measurements}")
})
.collect();
stations.sort();
let stations = stations.join(",");
println!("{{{stations}}}");
println!("Time={} ms", now.elapsed().as_millis());
})
}

src/main/rust/src/lib.rs Normal file

@@ -0,0 +1,9 @@
#![feature(slice_as_chunks)]
#![feature(portable_simd)]
#![feature(slice_split_once)]
#![feature(hash_raw_entry)]
#![feature(int_roundings)]
pub mod implementations;
pub mod models;
pub mod utils;


@@ -0,0 +1,7 @@
fn main() {
// let now = Instant::now();
// let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
// let reader = BufReader::new(file);
// reader.lines().for_each(|_x| {()});
// println!("Time={} μs", now.elapsed().as_micros());
}


@@ -0,0 +1,2 @@
pub mod mmap;
pub mod station_measurements;


@@ -0,0 +1,145 @@
use std::ops::Deref;
use std::os::fd::AsRawFd;
use std::ptr::null_mut;
use std::{fs::File, os::raw::c_void};
use libc::{madvise, mmap, munmap, size_t, MADV_WILLNEED, MAP_FAILED, MAP_SHARED, PROT_READ};
/// Smart pointer type for a mmap. Handles munmap call.
pub struct Mmap<'a> {
mmap_slice: &'a [u8],
}
/// To dispose of the mmap properly we have to call munmap manually,
/// so implementing Drop for this smart-pointer type is necessary.
impl Drop for Mmap<'_> {
fn drop(&mut self) {
unsafe {
munmap(
self.mmap_slice.as_ptr() as *mut c_void,
self.mmap_slice.len(),
);
}
}
}
// Implementing Deref is an anti-pattern for non-smart-pointer types,
// but Mmap is a genuine smart pointer, so it is appropriate here.
// ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html
impl Deref for Mmap<'_> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.mmap_slice
}
}
impl<'a> Mmap<'a> {
fn new(data: &'a [u8]) -> Self {
Self { mmap_slice: data }
}
pub fn from_file(f: File) -> Self {
let size = f.metadata().unwrap().len() as size_t;
let prot = PROT_READ;
let flags = MAP_SHARED;
unsafe {
let m = mmap(null_mut(), size, prot, flags, f.as_raw_fd(), 0);
if m == MAP_FAILED {
panic!("mmap failed");
}
            // We can advise the kernel on how we intend to use the mmap,
            // but this did not improve read performance in a meaningful way.
madvise(m, size, MADV_WILLNEED);
Self::new(std::slice::from_raw_parts(m as *const u8, size))
}
}
}
pub struct MmapChunkIterator<'a> {
data: Mmap<'a>,
chunk_size: usize,
}
impl<'a> MmapChunkIterator<'a> {
fn with_consumers(mut self, consumers: usize) -> Self {
self.chunk_size = self.data.len() / consumers;
self
}
pub fn new(data: Mmap<'a>, num_consumers: usize) -> Self {
Self {
data,
chunk_size: 1,
}
.with_consumers(num_consumers)
}
}
impl<'a> IntoIterator for Mmap<'a> {
type IntoIter = MmapChunkIterator<'a>;
type Item = &'a [u8];
fn into_iter(self) -> Self::IntoIter {
MmapChunkIterator {
data: self,
chunk_size: 1,
}
}
}
impl<'a> Iterator for MmapChunkIterator<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
if self.data.is_empty() {
return None;
}
let chunk_end = self.chunk_size.min(self.data.len());
let chunk = &self.data[..chunk_end];
// Find the last newline in the chunk
let split_at = chunk
.iter()
.rposition(|&x| x == b'\n')
.map(|i| i + 1)
.unwrap_or(chunk_end);
let (result, rest) = self.data.mmap_slice.split_at(split_at);
self.data.mmap_slice = rest;
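        // Note: reassigning `mmap_slice` here shifts the pointer and length
        // that Drop later hands to munmap; once the iterator is exhausted the
        // munmap call gets a zero length and fails silently, so the mapping
        // is only released at process exit.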
Some(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::io::Write;
use std::path::Path;
fn create_test_file(path: &Path, content: &[u8]) {
let mut file = File::create(path).unwrap();
file.write_all(content).unwrap();
}
fn remove_test_file(path: &Path) {
if path.exists() {
fs::remove_file(path).unwrap();
}
}
#[test]
fn test_from_file() {
let test_file_path = Path::new("test_file.txt");
let test_content = b"Hello, mmap!";
create_test_file(test_file_path, test_content);
let file = File::open(test_file_path).unwrap();
let mmap = Mmap::from_file(file);
assert_eq!(&*mmap, test_content);
remove_test_file(test_file_path);
}
}


@@ -0,0 +1,37 @@
use std::fmt::Display;
#[derive(Copy, Clone)]
pub struct StationMeasurements {
pub min: isize,
pub max: isize,
pub count: isize,
pub sum: isize,
}
impl StationMeasurements {
pub fn update(&mut self, v: isize) {
self.min = self.min.min(v);
self.max = self.max.max(v);
self.count += 1;
self.sum += v;
}
pub fn merge(&mut self, other: &Self) {
self.min = self.min.min(other.min);
self.max = self.max.max(other.max);
self.count += other.count;
self.sum += other.sum;
}
}
impl Display for StationMeasurements {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // min/max/sum are stored as integer tenths of a degree.
        // Note: values in (-1.0, 0.0) lose their sign here, e.g. -5 tenths
        // prints as "0.5", because -5 / 10 == 0 in integer division.
        let min_pre = self.min / 10;
        let min_suf = (self.min % 10).abs();
        let max_pre = self.max / 10;
        let max_suf = (self.max % 10).abs();
        // computing the average in f64 is faster than integer math here
        let avg = (self.sum as f64 / self.count as f64) / 10.0;
write!(f, "{min_pre}.{min_suf}/{avg:.1}/{max_pre}.{max_suf}")
}
}
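// Sketch (not part of this diff): values are tenths of a degree, so min = -15
// with one further measurement of 123 prints as "-1.5/5.4/12.3".
#[cfg(test)]
mod display_sketch {
    use super::StationMeasurements;
    #[test]
    fn formats_tenths() {
        let mut m = StationMeasurements { min: -15, max: -15, count: 1, sum: -15 };
        m.update(123);
        assert_eq!(m.to_string(), "-1.5/5.4/12.3");
    }
}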


@@ -0,0 +1,15 @@
pub mod byte_pos;
pub mod hash;
pub mod parse;
pub mod write_structured_measurements;
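/// Groups digits in threes, e.g. format_nums(1234567) == "1_234_567".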
pub fn format_nums(num: usize) -> String {
num.to_string()
.as_bytes()
.rchunks(3)
.rev()
.map(std::str::from_utf8)
.collect::<Result<Vec<&str>, _>>()
.unwrap()
.join("_")
}


@@ -0,0 +1,42 @@
#[inline]
pub fn get(bytes: &[u8], find: u8) -> Option<u32> {
    let windows = bytes.windows(4);
    for (pos, chunk) in windows.enumerate() {
let inner_pos = get_pos_in_chunk(chunk, find);
if inner_pos < chunk.len() as u32 {
return Some(pos as u32 + inner_pos);
}
}
None
}
#[inline]
fn get_pos_in_chunk(byte_chunk: &[u8], find: u8) -> u32 {
let find_hex = u32::from_be_bytes([find; 4]);
let x = u32::from_be_bytes(byte_chunk.try_into().unwrap()) ^ find_hex;
let mask = (x - 0x01010101) & (!x & (0x80808080));
u32::leading_zeros(mask) >> 3
}
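// Worked example (sketch, not part of this diff) of the zero-byte trick above:
// XOR turns the sought byte into 0x00, `(x - 0x01010101) & !x & 0x80808080`
// then sets the high bit of exactly the bytes that were zero, and
// leading_zeros / 8 converts that to a byte index.
#[cfg(test)]
mod swar_sketch {
    #[test]
    fn high_bit_marks_the_match() {
        let x = u32::from_be_bytes(*b"a;cd") ^ u32::from_be_bytes([b';'; 4]);
        let mask = x.wrapping_sub(0x01010101) & (!x & 0x80808080);
        assert_eq!(mask.leading_zeros() >> 3, 1); // the ';' sits at index 1
    }
}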
#[cfg(test)]
mod tests {
use crate::utils::byte_pos::get;
#[test]
fn test_getpos() {
let semi_bytes = vec![
0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, b';', 0_u8, 0_u8,
];
let semi_bytes = semi_bytes.as_slice();
let pos = get(semi_bytes, b';').unwrap();
assert_eq!(pos, 8);
}
#[test]
fn test_getpos_empty() {
let semi_bytes = vec![0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8, 0_u8];
let semi_bytes = semi_bytes.as_slice();
let pos = get(semi_bytes, b';');
assert_eq!(pos, None);
}
}


@@ -0,0 +1,40 @@
#[inline]
pub fn bytes(bytes: &[u8]) -> u64 {
    if cfg!(not(debug_assertions)) {
        // inspired by https://curiouscoding.nl/posts/1brc/
        // For names shorter than 8 bytes these unchecked reads touch memory
        // outside the slice; the shifts below discard those bytes, which is
        // only sound because the input is a large mmap'd buffer.
        let head: [u8; 8] = unsafe { bytes.get_unchecked(..8).as_chunks::<8>().0[0] };
        let tail: [u8; 8] = unsafe { bytes.get_unchecked(bytes.len() - 8..).as_chunks::<8>().0[0] };
        let shift = 64usize.saturating_sub(8 * bytes.len());
        let khead = u64::from_ne_bytes(head) << shift;
        let ktail = u64::from_ne_bytes(tail) >> shift;
        // wrapping_add keeps release and debug behaviour identical even with
        // overflow checks enabled.
        khead.wrapping_add(ktail)
} else {
// debug friendly but slow
let mut head = [0u8; 8];
let mut tail = [0u8; 8];
let end = bytes.len().min(8);
let start = bytes.len().saturating_sub(8);
head[..end].copy_from_slice(&bytes[..end]);
tail[..end].copy_from_slice(&bytes[start..]);
let shift = 64usize.saturating_sub(8 * bytes.len());
let khead = u64::from_ne_bytes(head) << shift;
let ktail = u64::from_ne_bytes(tail) >> shift;
khead.wrapping_add(ktail)
}
}
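// Worked example (sketch, not part of this diff; assumes a little-endian
// target, as the `from_ne_bytes` reads above effectively do): the shift
// throws away the bytes read beyond the name, so only the name is hashed.
#[cfg(test)]
mod shift_sketch {
    #[test]
    fn shift_discards_out_of_range_bytes() {
        let shift = 64 - 8 * 3; // name "abc" is 3 bytes long
        let khead = u64::from_le_bytes(*b"abcXXXXX") << shift; // garbage after
        let ktail = u64::from_le_bytes(*b"YYYYYabc") >> shift; // garbage before
        assert_eq!(khead, u64::from_le_bytes(*b"abc\0\0\0\0\0") << shift);
        assert_eq!(ktail, u64::from_le_bytes(*b"\0\0\0\0\0abc") >> shift);
    }
}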
#[cfg(test)]
mod tests {
use crate::utils::hash;
#[test]
fn test_hashstr() {
let hash_1 = hash::bytes(b"abcdefghijk");
let hash_2 = hash::bytes(b"kjihgfedcba");
let hash_3 = hash::bytes(b"abba");
let hash_4 = hash::bytes(b"baab");
assert_ne!(hash_1, hash_2);
assert_ne!(hash_3, hash_4);
}
}


@@ -0,0 +1,105 @@
use std::simd::num::SimdUint;
use std::simd::{u8x4, Simd};
#[inline]
pub const fn get_digit(b: u8) -> isize {
    // wrapping_sub('0' as isize) is the same as subtracting 48, but less magical
(b as isize).wrapping_sub('0' as isize)
}
#[inline]
pub fn temp(bytes: &[u8]) -> isize {
let is_negative = bytes[0] == b'-';
    let as_decimal = match (is_negative, bytes.len()) {
        // "-9.9"
        (true, 4) => get_digit(bytes[1]) * 10 + get_digit(bytes[3]),
        // "-99.9"
        (true, 5) => get_digit(bytes[1]) * 100 + get_digit(bytes[2]) * 10 + get_digit(bytes[4]),
        // "9.9"
        (false, 3) => get_digit(bytes[0]) * 10 + get_digit(bytes[2]),
        // "99.9"
        (false, 4) => get_digit(bytes[0]) * 100 + get_digit(bytes[1]) * 10 + get_digit(bytes[3]),
_x => panic!(
"could not parse temp: is_negative = {is_negative}, length = {}",
bytes.len()
),
};
if is_negative {
-as_decimal
} else {
as_decimal
}
}
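// Sketch (not part of this diff): the four accepted shapes, one per match arm.
#[cfg(test)]
mod temp_shapes {
    use crate::utils::parse::temp;
    #[test]
    fn all_four_formats() {
        assert_eq!(temp(b"9.9"), 99);
        assert_eq!(temp(b"99.9"), 999);
        assert_eq!(temp(b"-9.9"), -99);
        assert_eq!(temp(b"-99.9"), -999);
    }
}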
// not really faster
#[inline]
pub fn temp_new(bytes: &[u8]) -> isize {
let (characteristic, mantissa) = unsafe { bytes.split_at_unchecked(bytes.len() - 2) };
let mut sum = (mantissa[1] as u32).wrapping_sub('0' as u32);
let mut position = 10;
let mut idx = characteristic.len();
while idx > 0 {
idx -= 1;
if characteristic[idx] == b'-' {
return -(sum as isize);
}
        // A 0 byte is the null padding used by the fixed-width structured
        // file (see write_structured_measurements), so the number is done.
        if characteristic[idx] == 0u8 {
return sum as isize;
}
sum += (characteristic[idx] as u32).wrapping_sub('0' as u32) * position;
position *= 10;
}
sum as isize
}
// slower
#[inline]
pub fn temp_simd(bytes: &[u8]) -> isize {
let is_negative = bytes[0] == b'-';
let bytes = if is_negative { &bytes[1..] } else { bytes };
let len = bytes.len();
    // Lane 0 always indexes out of range (as does lane 1 when there is no
    // hundreds digit); gather_or falls back to '0' there, contributing 0.
    let idxs = Simd::from_array([5, len.wrapping_sub(4), len - 3, len - 1]);
let zeroes = u8x4::splat(b'0');
let temp_simd = Simd::gather_or(bytes, idxs, zeroes);
let subbed = temp_simd - zeroes;
let subbed: Simd<isize, 4> = subbed.cast();
let mul = Simd::from_array([0, 100, 10, 1]);
let mulled = subbed * mul;
let sum: isize = mulled.to_array().iter().sum();
if is_negative {
-sum
} else {
sum
}
}
#[cfg(test)]
mod tests {
use crate::utils::parse::temp_new;
#[test]
fn test_temp_new_max() {
let temp_max = temp_new("99.9".as_bytes());
assert_eq!(temp_max, 999);
}
#[test]
fn test_temp_new_min() {
let temp_min = temp_new("-99.9".as_bytes());
assert_eq!(temp_min, -999);
}
#[test]
fn test_temp_new_zero() {
let temp_0 = temp_new("0.0".as_bytes());
assert_eq!(temp_0, 0);
}
#[test]
fn test_temp_new_pos() {
let temp_10 = temp_new("9.9".as_bytes());
assert_eq!(temp_10, 99);
}
#[test]
fn test_temp_new_neg() {
let temp_neg_10 = temp_new("-9.9".as_bytes());
assert_eq!(temp_neg_10, -99);
}
}


@@ -0,0 +1,36 @@
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};
pub fn write_structured_measurements() {
let file = File::open("../../../measurements.txt").expect("File measurements.txt not found");
let structured_file =
File::create_new("structured_measurements.txt").expect("Could not create file");
let mut reader = BufReader::new(&file);
let mut writer = BufWriter::new(&structured_file);
let mut line = Vec::with_capacity(107);
let mut line_num = 0;
loop {
line_num += 1;
let line_len = reader
.read_until(b'\n', &mut line)
.expect("could not read bytes");
if line_len == 0 {
break;
}
if line_num % 100000 == 0 {
            // "\x1b[0G" returns the cursor to the start of the line,
            // overwriting the previous progress count in place.
            print!("\x1b[0Glines: {line_num}");
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let station_len = station.len();
        // Fixed 107-byte records: station name in bytes 0..100 (null-padded),
        // ';' at byte 100, and the temperature (still carrying its '\n')
        // right-aligned to end at byte 106.
        let temp_val_start = 107 - temp.len();
let mut write_line = [0u8; 107];
write_line[..station_len].clone_from_slice(station);
write_line[100] = b';';
write_line[temp_val_start..temp_val_start + temp.len()].clone_from_slice(temp);
write_line[106] = b'\n';
writer
.write_all(write_line.as_slice())
.expect("Could not write");
line.clear();
}
}