Compare commits

...

6 Commits

12 changed files with 87 additions and 28 deletions

57
src/main/lua/main.lua Normal file
View File

@@ -0,0 +1,57 @@
-- 1BRC-style aggregator: reads "station;temperature" lines from
-- measurements.txt and prints min/mean/max per station (see loop below).
local file_path = "../../../measurements.txt"
-- Binary mode ("rb") to avoid newline translation; the handle is consumed
-- by the file:lines() loop further down, so it must stay named `file`.
local file = io.open(file_path, "rb")
if not file then
-- Expected-failure convention: report and abort the chunk instead of error().
print("Unable to open file")
return
end
--- Split a string on ';' separators.
-- Matches runs of non-semicolon characters, so empty fields between
-- consecutive separators are skipped (same semantics as the pattern "[^;]+").
-- @tparam string inputstr the line to split
-- @treturn table array of the non-empty fields, in order
local function split_semi(inputstr)
    local parts = {}
    local pos = 1
    while true do
        local s, e = string.find(inputstr, "[^;]+", pos)
        if s == nil then
            break
        end
        parts[#parts + 1] = string.sub(inputstr, s, e)
        pos = e + 1
    end
    return parts
end
-- Single pass over the file: accumulate min/max/sum/count per station.
-- `stations` maps station name -> stats table and is consumed by the
-- output section below, so its name must not change.
local stations = {}
local iterations = 1
for line in file:lines("*l") do
    -- Lightweight progress display, refreshed every million lines.
    -- NOTE(review): trigger is 10^6 but the divisor is 10^7 — looks like a
    -- percentage readout for a 10^9-line input; confirm the intended scale.
    if iterations % 1000000 == 0 then
        io.write("\x1b[J\x1b[H")
        io.write(iterations / 10000000)
        io.write("\n")
    end
    local fields = split_semi(line)
    local name = fields[1]
    -- Strip stray spaces before numeric parsing (kept as two statements:
    -- gsub returns a second value that must not leak into tonumber).
    local cleaned = string.gsub(fields[2], "[ ]", "")
    local reading = tonumber(cleaned)
    local entry = stations[name]
    if entry == nil then
        stations[name] = { min = reading, max = reading, sum = reading, count = 1 }
    else
        -- Same branch shape as a fresh reading can't be both a new min
        -- and a new max, mirroring the original if/elseif.
        if reading < entry.min then
            entry.min = reading
        elseif reading > entry.max then
            entry.max = reading
        end
        entry.sum = entry.sum + reading
        entry.count = entry.count + 1
    end
    iterations = iterations + 1
end
-- Emit results as "{name=min/mean/max,...}" sorted alphabetically by station.
local keys = {}
for k in pairs(stations) do table.insert(keys, k) end
table.sort(keys)
local fstations = {}
io.write("{")
for _, station in ipairs(keys) do
    local s = stations[station]
    -- BUG FIX: temperatures were parsed by tonumber() as plain floats
    -- (e.g. "12.3" -> 12.3), yet the sum was divided by 10 before averaging
    -- while min and max were printed undivided — the mean came out 10x too
    -- small. The division by 10 belongs only to implementations that store
    -- readings as integer tenths.
    local avg = s.sum / s.count
    local res_str = string.format("%s=%.1f/%.1f/%.1f", station, s.min, avg, s.max)
    table.insert(fstations, res_str)
end
io.write(table.concat(fstations, ","))
print("}")

View File

@@ -17,7 +17,7 @@ smol = "2.0.1"
easy-parallel = "3.3.1"
clap = { version = "4.5.13", features = ["derive"] }
colored = "2.1.0"
ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default_features = false }
ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false }
[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::flare_flo::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("flareflo", |b| b.iter(|| run()));
c.bench_function("flareflo", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::libraries::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("libraries", |b| b.iter(|| run()));
c.bench_function("libraries", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("multithreaded", |b| b.iter(|| run()));
c.bench_function("multithreaded", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::multi_threaded_smol::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("multithreadedsmol", |b| b.iter(|| run()));
c.bench_function("multithreadedsmol", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::phcs::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("phcs", |b| b.iter(|| run()));
c.bench_function("phcs", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::reference_impl::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("reference", |b| b.iter(|| run()));
c.bench_function("reference", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
use onebrc::implementations::single_thread::run;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("singlethread", |b| b.iter(|| run()));
c.bench_function("singlethread", |b| b.iter(run));
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -2,6 +2,7 @@ use crate::models::station_measurements::StationMeasurements;
use crate::utils::{hash, parse};
use memmap2::MmapOptions;
use rustc_hash::{FxBuildHasher, FxHashMap as HashMap};
use std::slice::from_raw_parts;
use std::sync::mpsc;
use std::time::Instant;
use std::{fs::File, thread};
@@ -13,9 +14,11 @@ pub fn run() {
const FILE_PATH: &str = "../../../measurements.txt";
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
let mmap_ptr = mmap.as_ptr();
let file_length = mmap.len();
let hasher = FxBuildHasher;
let mut stations: HashMap<u64, (String, StationMeasurements)> =
// Even if I could now just use the byte slice as a key, doing the hash is still faster
let mut stations: HashMap<u64, (&[u8], StationMeasurements)> =
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
let (tx, rx) = mpsc::channel();
let cores = thread::available_parallelism().unwrap().into();
@@ -39,9 +42,9 @@ pub fn run() {
for i in 0..cores {
let tx = tx.clone();
let (start, end) = *bounds.get(i).unwrap();
let mmap_slice = &mmap[start..end];
let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) };
s.spawn(move || {
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> =
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
for line in mmap_slice.split(|&byte| byte == b'\n') {
if line.is_empty() {
@@ -49,7 +52,6 @@ pub fn run() {
}
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
let hash = hash::bytes(station);
let station = unsafe { std::str::from_utf8_unchecked(station) };
let temp = parse::temp(temp);
let measurements_option = t_stations.get_mut(&hash);
if let Some((_, measurements)) = measurements_option {
@@ -61,7 +63,7 @@ pub fn run() {
count: 1,
sum: temp,
};
t_stations.insert(hash, (station.to_string(), measurements));
t_stations.insert(hash, (station, measurements));
}
}
let _ = tx.send(t_stations);
@@ -74,13 +76,14 @@ pub fn run() {
if let Some((_, joined_measurements)) = joined_measurements_options {
joined_measurements.merge(measurements);
} else {
stations.insert(*hash, (station.to_owned(), *measurements));
stations.insert(*hash, (station, *measurements));
}
}
}
let mut stations: Vec<String> = stations
.iter()
.map(|(_, (station, measurements))| {
let station = unsafe { std::str::from_utf8_unchecked(station) };
let measurements = measurements.to_string();
#[cfg(feature = "json")]
{

View File

@@ -21,29 +21,28 @@ pub fn run() {
let file_length = reader.seek(SeekFrom::End(0)).unwrap();
let chunk_length = file_length as usize / cores;
let mut bounds = Vec::with_capacity(cores + 1);
bounds.push(0);
for i in 1..cores {
let mut start = 0;
for i in 0..cores {
let mut reader = BufReader::new(&file);
let mut byte_start = chunk_length * i;
let mut end = chunk_length * i;
reader
.seek(SeekFrom::Start(byte_start as u64))
.seek(SeekFrom::Start(end as u64))
.expect("could not seek");
let mut line = Vec::with_capacity(108);
let line_len = reader
.read_until(b'\n', &mut line)
.expect("could not read bytes");
byte_start += line_len;
bounds.push(byte_start as u64);
end += line_len;
bounds.push((start, end));
start = end + 1;
}
bounds.push(file_length);
for i in 0..cores {
let tx = tx.clone();
let mut currposition = *bounds.get(i).unwrap();
let end = *bounds.get(i + 1).unwrap();
let (mut currposition, end) = *bounds.get(i).unwrap();
s.spawn(move || {
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
let mut reader = BufReader::new(&file);
reader.seek(SeekFrom::Start(currposition)).unwrap();
reader.seek(SeekFrom::Start(currposition as u64)).unwrap();
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
let mut line = Vec::with_capacity(108);
@@ -70,7 +69,7 @@ pub fn run() {
};
t_stations.insert(hash, (station.to_string(), measurements));
}
currposition += line_len as u64;
currposition += line_len;
if currposition >= end {
break;
}

View File

@@ -12,7 +12,7 @@ pub struct Mmap<'a> {
/// To properly dispose of the mmap we have to manually call munmap.
/// So implementing drop for this smart-pointer type is necessary.
impl<'a> Drop for Mmap<'a> {
impl Drop for Mmap<'_> {
fn drop(&mut self) {
unsafe {
munmap(
@@ -25,7 +25,7 @@ impl<'a> Drop for Mmap<'a> {
// anti-pattern for non-smart pointer types.
// ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html
impl<'a> Deref for Mmap<'a> {
impl Deref for Mmap<'_> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
@@ -50,7 +50,7 @@ impl<'a> Mmap<'a> {
// We can advise the kernel on how we intend to use the mmap.
// But this did not improve my read performance in a meaningful way
madvise(m, size, MADV_WILLNEED);
return Self::new(std::slice::from_raw_parts(m as *const u8, size));
Self::new(std::slice::from_raw_parts(m as *const u8, size))
}
}
}