Compare commits
6 Commits
4d586c809e
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| fdd92dd5f7 | |||
| 45b3014cbb | |||
| 98cd6e930c | |||
| 40a8d6d929 | |||
| 0ea10a3c1b | |||
| 3dbc9c32d1 |
57
src/main/lua/main.lua
Normal file
57
src/main/lua/main.lua
Normal file
@@ -0,0 +1,57 @@
|
||||
local file_path = "../../../measurements.txt"
|
||||
|
||||
local file = io.open(file_path, "rb")
|
||||
if not file then
|
||||
print("Unable to open file")
|
||||
return
|
||||
end
|
||||
|
||||
local function split_semi(inputstr)
|
||||
local t = {}
|
||||
for str in string.gmatch(inputstr, "([^;]+)") do
|
||||
table.insert(t, str)
|
||||
end
|
||||
return t
|
||||
end
|
||||
|
||||
local stations = {}
|
||||
|
||||
local iterations = 1
|
||||
|
||||
for line in file:lines("*l") do
|
||||
if iterations % 1000000 == 0 then
|
||||
io.write("\x1b[J\x1b[H")
|
||||
io.write(iterations / 10000000)
|
||||
io.write("\n")
|
||||
end
|
||||
local split_line = split_semi(line)
|
||||
local station = split_line[1]
|
||||
local temp_str = string.gsub(split_line[2], "[ ]", "")
|
||||
local temp = tonumber(temp_str)
|
||||
if stations[station] == nil then
|
||||
stations[station] = { min = temp, max = temp, sum = temp, count = 1 }
|
||||
else
|
||||
if temp < stations[station].min then
|
||||
stations[station].min = temp
|
||||
elseif temp > stations[station].max then
|
||||
stations[station].max = temp
|
||||
end
|
||||
stations[station].sum = stations[station].sum + temp
|
||||
stations[station].count = stations[station].count + 1
|
||||
end
|
||||
iterations = iterations + 1
|
||||
end
|
||||
|
||||
local keys = {}
|
||||
for k in pairs(stations) do table.insert(keys, k) end
|
||||
table.sort(keys)
|
||||
|
||||
local fstations = {}
|
||||
io.write("{")
|
||||
for _, station in ipairs(keys) do
|
||||
local avg = ((stations[station].sum / 10) / stations[station].count)
|
||||
local res_str = string.format("%s=%.1f/%.1f/%.1f", station, stations[station].min, avg, stations[station].max)
|
||||
table.insert(fstations, res_str)
|
||||
end
|
||||
io.write(table.concat(fstations, ","))
|
||||
print("}")
|
||||
@@ -17,7 +17,7 @@ smol = "2.0.1"
|
||||
easy-parallel = "3.3.1"
|
||||
clap = { version = "4.5.13", features = ["derive"] }
|
||||
colored = "2.1.0"
|
||||
ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default_features = false }
|
||||
ptr_hash = { git = "https://github.com/ragnargrootkoerkamp/ptrhash", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion = { version = "0.5.1", features = ["html_reports"] }
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::flare_flo::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("flareflo", |b| b.iter(|| run()));
|
||||
c.bench_function("flareflo", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::libraries::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("libraries", |b| b.iter(|| run()));
|
||||
c.bench_function("libraries", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::multi_threaded::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("multithreaded", |b| b.iter(|| run()));
|
||||
c.bench_function("multithreaded", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::multi_threaded_smol::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("multithreadedsmol", |b| b.iter(|| run()));
|
||||
c.bench_function("multithreadedsmol", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::phcs::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("phcs", |b| b.iter(|| run()));
|
||||
c.bench_function("phcs", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::reference_impl::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("reference", |b| b.iter(|| run()));
|
||||
c.bench_function("reference", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,7 +2,7 @@ use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use onebrc::implementations::single_thread::run;
|
||||
|
||||
pub fn criterion_benchmark(c: &mut Criterion) {
|
||||
c.bench_function("singlethread", |b| b.iter(|| run()));
|
||||
c.bench_function("singlethread", |b| b.iter(run));
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::models::station_measurements::StationMeasurements;
|
||||
use crate::utils::{hash, parse};
|
||||
use memmap2::MmapOptions;
|
||||
use rustc_hash::{FxBuildHasher, FxHashMap as HashMap};
|
||||
use std::slice::from_raw_parts;
|
||||
use std::sync::mpsc;
|
||||
use std::time::Instant;
|
||||
use std::{fs::File, thread};
|
||||
@@ -13,9 +14,11 @@ pub fn run() {
|
||||
const FILE_PATH: &str = "../../../measurements.txt";
|
||||
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
|
||||
let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
|
||||
let mmap_ptr = mmap.as_ptr();
|
||||
let file_length = mmap.len();
|
||||
let hasher = FxBuildHasher;
|
||||
let mut stations: HashMap<u64, (String, StationMeasurements)> =
|
||||
// Even if I could now just use the byte slice as a key, doing the hash is still faster
|
||||
let mut stations: HashMap<u64, (&[u8], StationMeasurements)> =
|
||||
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let cores = thread::available_parallelism().unwrap().into();
|
||||
@@ -39,9 +42,9 @@ pub fn run() {
|
||||
for i in 0..cores {
|
||||
let tx = tx.clone();
|
||||
let (start, end) = *bounds.get(i).unwrap();
|
||||
let mmap_slice = &mmap[start..end];
|
||||
let mmap_slice = unsafe { from_raw_parts(mmap_ptr.add(start), end - start) };
|
||||
s.spawn(move || {
|
||||
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
|
||||
let mut t_stations: HashMap<u64, (&[u8], StationMeasurements)> =
|
||||
HashMap::with_capacity_and_hasher(DEFAULT_HASHMAP_LENGTH, hasher);
|
||||
for line in mmap_slice.split(|&byte| byte == b'\n') {
|
||||
if line.is_empty() {
|
||||
@@ -49,7 +52,6 @@ pub fn run() {
|
||||
}
|
||||
let (station, temp) = line.rsplit_once(|&byte| byte == b';').unwrap();
|
||||
let hash = hash::bytes(station);
|
||||
let station = unsafe { std::str::from_utf8_unchecked(station) };
|
||||
let temp = parse::temp(temp);
|
||||
let measurements_option = t_stations.get_mut(&hash);
|
||||
if let Some((_, measurements)) = measurements_option {
|
||||
@@ -61,7 +63,7 @@ pub fn run() {
|
||||
count: 1,
|
||||
sum: temp,
|
||||
};
|
||||
t_stations.insert(hash, (station.to_string(), measurements));
|
||||
t_stations.insert(hash, (station, measurements));
|
||||
}
|
||||
}
|
||||
let _ = tx.send(t_stations);
|
||||
@@ -74,13 +76,14 @@ pub fn run() {
|
||||
if let Some((_, joined_measurements)) = joined_measurements_options {
|
||||
joined_measurements.merge(measurements);
|
||||
} else {
|
||||
stations.insert(*hash, (station.to_owned(), *measurements));
|
||||
stations.insert(*hash, (station, *measurements));
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut stations: Vec<String> = stations
|
||||
.iter()
|
||||
.map(|(_, (station, measurements))| {
|
||||
let station = unsafe { std::str::from_utf8_unchecked(station) };
|
||||
let measurements = measurements.to_string();
|
||||
#[cfg(feature = "json")]
|
||||
{
|
||||
|
||||
@@ -21,29 +21,28 @@ pub fn run() {
|
||||
let file_length = reader.seek(SeekFrom::End(0)).unwrap();
|
||||
let chunk_length = file_length as usize / cores;
|
||||
let mut bounds = Vec::with_capacity(cores + 1);
|
||||
bounds.push(0);
|
||||
for i in 1..cores {
|
||||
let mut start = 0;
|
||||
for i in 0..cores {
|
||||
let mut reader = BufReader::new(&file);
|
||||
let mut byte_start = chunk_length * i;
|
||||
let mut end = chunk_length * i;
|
||||
reader
|
||||
.seek(SeekFrom::Start(byte_start as u64))
|
||||
.seek(SeekFrom::Start(end as u64))
|
||||
.expect("could not seek");
|
||||
let mut line = Vec::with_capacity(108);
|
||||
let line_len = reader
|
||||
.read_until(b'\n', &mut line)
|
||||
.expect("could not read bytes");
|
||||
byte_start += line_len;
|
||||
bounds.push(byte_start as u64);
|
||||
end += line_len;
|
||||
bounds.push((start, end));
|
||||
start = end + 1;
|
||||
}
|
||||
bounds.push(file_length);
|
||||
for i in 0..cores {
|
||||
let tx = tx.clone();
|
||||
let mut currposition = *bounds.get(i).unwrap();
|
||||
let end = *bounds.get(i + 1).unwrap();
|
||||
let (mut currposition, end) = *bounds.get(i).unwrap();
|
||||
s.spawn(move || {
|
||||
let file = File::open(FILE_PATH).expect("File measurements.txt not found");
|
||||
let mut reader = BufReader::new(&file);
|
||||
reader.seek(SeekFrom::Start(currposition)).unwrap();
|
||||
reader.seek(SeekFrom::Start(currposition as u64)).unwrap();
|
||||
let mut t_stations: HashMap<u64, (String, StationMeasurements)> =
|
||||
HashMap::with_capacity(DEFAULT_HASHMAP_LENGTH);
|
||||
let mut line = Vec::with_capacity(108);
|
||||
@@ -70,7 +69,7 @@ pub fn run() {
|
||||
};
|
||||
t_stations.insert(hash, (station.to_string(), measurements));
|
||||
}
|
||||
currposition += line_len as u64;
|
||||
currposition += line_len;
|
||||
if currposition >= end {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ pub struct Mmap<'a> {
|
||||
|
||||
/// To properly dispose of the mmap we have to manually call munmap.
|
||||
/// So implementing drop for this smart-pointer type is necessary.
|
||||
impl<'a> Drop for Mmap<'a> {
|
||||
impl Drop for Mmap<'_> {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
munmap(
|
||||
@@ -25,7 +25,7 @@ impl<'a> Drop for Mmap<'a> {
|
||||
|
||||
// anti-pattern for non-smart pointer types.
|
||||
// ref: https://rust-unofficial.github.io/patterns/anti_patterns/deref.html
|
||||
impl<'a> Deref for Mmap<'a> {
|
||||
impl Deref for Mmap<'_> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -50,7 +50,7 @@ impl<'a> Mmap<'a> {
|
||||
// We can advise the kernel on how we intend to use the mmap.
|
||||
// But this did not improve my read performance in a meaningful way
|
||||
madvise(m, size, MADV_WILLNEED);
|
||||
return Self::new(std::slice::from_raw_parts(m as *const u8, size));
|
||||
Self::new(std::slice::from_raw_parts(m as *const u8, size))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user