From 12ae36ade18fba051b0ce7af70e7d7364a28becf Mon Sep 17 00:00:00 2001
From: Nils Semmelrock <42540177+nstng@users.noreply.github.com>
Date: Thu, 4 Jan 2024 23:31:47 +0100
Subject: [PATCH] Adding Nils Semmelrock's submission

nothing fancy, just work on chunks in parallel and optimize bottlenecks
---
 calculate_average_nstng.sh                    |  19 ++
 .../onebrc/CalculateAverage_nstng.java        | 299 ++++++++++++++++++
 2 files changed, 318 insertions(+)
 create mode 100755 calculate_average_nstng.sh
 create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java

diff --git a/calculate_average_nstng.sh b/calculate_average_nstng.sh
new file mode 100755
index 0000000..fccca03
--- /dev/null
+++ b/calculate_average_nstng.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+#
+#  Copyright 2023 The original authors
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+JAVA_OPTS="-Xmx16G"
+time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_nstng
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java b/src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java
new file mode 100644
index 0000000..ab032e6
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java
@@ -0,0 +1,299 @@
+/*
+ *  Copyright 2023 The original authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Computes min/mean/max per weather station from {@code ./measurements.txt}.
+ *
+ * <p>The file is split into line-aligned chunks, each chunk is aggregated by its own
+ * worker thread, and the per-chunk results are merged and printed sorted by station name.
+ * Every line is expected to look like {@code name;value}, where the value has one or two
+ * integer digits, exactly one decimal place and an optional leading minus sign.
+ */
+public class CalculateAverage_nstng {
+
+    private static final File FILE = new File("./measurements.txt");
+
+    // 413 possible cities - used to pre-size the per-chunk maps
+    private static final int EXPECTED_CITIES = 413;
+
+    // a chunk is loaded into one byte array; the headroom below the int range is there
+    // because a chunk is extended to the next line break
+    private static final int MAX_CHUNK_BYTES = Integer.MAX_VALUE - (1 << 16);
+
+    // min, max and sum are kept in tenths of a degree so that all arithmetic is exact
+    private static final class MeasurementAggregator {
+        private int min;
+        private int max;
+        private long sum;
+        private long count;
+
+        MeasurementAggregator(int tenths) {
+            this.min = tenths;
+            this.max = tenths;
+            this.sum = tenths;
+            this.count = 1;
+        }
+
+        void add(int tenths) {
+            min = Math.min(min, tenths);
+            max = Math.max(max, tenths);
+            sum += tenths;
+            count++;
+        }
+
+        // folds the values of other into this aggregator and returns this
+        MeasurementAggregator merge(MeasurementAggregator other) {
+            min = Math.min(min, other.min);
+            max = Math.max(max, other.max);
+            sum += other.sum;
+            count += other.count;
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return "%s/%s/%s".formatted(min / 10.0, round(sum / 10.0 / count), max / 10.0);
+        }
+
+        private static double round(double value) {
+            return Math.round(value * 10.0) / 10.0;
+        }
+    }
+
+    /**
+     * Aggregates {@code ./measurements.txt} and prints the result to standard out.
+     *
+     * @param args ignored
+     * @throws InterruptedException if the main thread is interrupted while waiting for a worker
+     * @throws IllegalStateException if a worker fails; nothing is printed in that case
+     */
+    public static void main(String[] args) throws InterruptedException {
+        long[] bounds = findChunkBounds();
+        int numChunks = bounds.length - 1;
+
+        // one worker thread per chunk
+        ExecutorService workers = Executors.newFixedThreadPool(numChunks);
+        List<Future<Map<BytesKey, MeasurementAggregator>>> pending = new ArrayList<>(numChunks);
+        Map<String, MeasurementAggregator> resultMap = new TreeMap<>();
+        try {
+            for (int i = 0; i < numChunks; i++) {
+                long from = bounds[i];
+                long to = bounds[i + 1];
+                pending.add(workers.submit(() -> processChunk(from, to)));
+            }
+
+            // merge results
+            for (Future<Map<BytesKey, MeasurementAggregator>> chunkResult : pending) {
+                chunkResult.get().forEach((city, agg) ->
+                        resultMap.merge(city.asString(), agg, MeasurementAggregator::merge));
+            }
+        }
+        catch (ExecutionException e) {
+            // a worker failed - do not print a partial (and therefore wrong) result
+            throw new IllegalStateException("Failed to process " + FILE, e.getCause());
+        }
+        finally {
+            workers.shutdownNow();
+        }
+        System.out.println(resultMap);
+    }
+
+    // splits the file into chunks of roughly equal size; every chunk ends right behind a
+    // line break (or at the end of the file) - chunk i covers [result[i], result[i + 1])
+    private static long[] findChunkBounds() {
+        try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
+            long fileLength = raf.length();
+            int availCpus = Runtime.getRuntime().availableProcessors();
+            // make sure to use at least num of chunks so that byte chunks fit into an array
+            int minChunks = (int) Math.ceil(fileLength / (double) MAX_CHUNK_BYTES);
+            int numChunks = Math.max(minChunks, availCpus);
+            long chunkSize = fileLength / numChunks;
+
+            long[] bounds = new long[numChunks + 1];
+            for (int i = 1; i <= numChunks; i++) {
+                raf.seek(Math.min(bounds[i - 1] + chunkSize, fileLength));
+                readUntilLineBreak(raf);
+                bounds[i] = raf.getFilePointer();
+            }
+            return bounds;
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    // reads the bytes [fromPos, toPos) of the file and aggregates all lines in them
+    private static Map<BytesKey, MeasurementAggregator> processChunk(long fromPos, long toPos)
+            throws IOException {
+        Map<BytesKey, MeasurementAggregator> map = new HashMap<>(EXPECTED_CITIES);
+        // do not work on empty chunks
+        if (toPos <= fromPos) {
+            return map;
+        }
+
+        byte[] byteChunk = new byte[Math.toIntExact(toPos - fromPos)];
+        try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
+            raf.seek(fromPos);
+            // read() may return before the array is filled, readFully() does not
+            raf.readFully(byteChunk);
+        }
+
+        // start on the last digit: skip the final line break, which is missing only if
+        // the file does not end with one
+        int last = byteChunk.length - 1;
+        if (byteChunk[last] == '\n') {
+            last--;
+        }
+        IndexedByteArray chunk = new IndexedByteArray(byteChunk, last);
+
+        // reads one line from right to left in each iteration
+        while (chunk.pos > 0) {
+            int mes = getMeasurement(chunk);
+            BytesKey cityKey = getCityKey(chunk);
+            MeasurementAggregator agg = map.get(cityKey);
+            if (agg == null) {
+                map.put(cityKey, new MeasurementAggregator(mes));
+            }
+            else {
+                agg.add(mes);
+            }
+        }
+        return map;
+    }
+
+    // did not know how much of a bottleneck new String(byteArray) can be :)
+    // use the byte array representation of a city as a HashMap key
+    // needs to be wrapped so that hashCode/equals can be defined
+    private static BytesKey getCityKey(IndexedByteArray chunk) {
+        // on entry chunk.pos is on the last byte of the city name
+        int end = chunk.pos + 1;
+        while (chunk.pos >= 0 && chunk.getCurrent() != '\n') {
+            chunk.dec();
+        }
+        // we are on '\n' or out of bounds by -1 -> the name starts at chunk.pos + 1
+        byte[] cityBytes = Arrays.copyOfRange(chunk.array, chunk.pos + 1, end);
+        chunk.dec(); // move away from '\n' (or further out of bounds)
+
+        return new BytesKey(cityBytes);
+    }
+
+    // did not know how much of a bottleneck Double.parseDouble(s) can be :)
+    // parses a measurement from right to left and returns it in tenths ("-12.3" -> -123)
+    // massively uses that a measurement value is very restricted
+    // (<100, >-100, exactly one decimal place)
+    private static int getMeasurement(IndexedByteArray chunk) {
+        int mes = chunk.getCurrent() - '0'; // 10^-1 place
+        chunk.dec(2); // jump over '.'
+        mes += (chunk.getCurrent() - '0') * 10; // 10^0 place
+        chunk.dec();
+        int next = chunk.getCurrent();
+        if (next != ';' && next != '-') {
+            mes += (next - '0') * 100; // 10^1 place
+            chunk.dec();
+        }
+        if (chunk.getCurrent() == '-') {
+            mes = -mes;
+            chunk.dec();
+        }
+        chunk.dec(); // move away from ';'
+        return mes;
+    }
+
+    // just a thin wrapper around a byte array - makes handling return values and
+    // position updates more easy
+    private static final class IndexedByteArray {
+        private final byte[] array;
+        private int pos;
+
+        IndexedByteArray(byte[] array, int pos) {
+            this.array = array;
+            this.pos = pos;
+        }
+
+        void dec(int by) {
+            pos -= by;
+        }
+
+        void dec() {
+            pos--;
+        }
+
+        int getCurrent() {
+            return array[pos];
+        }
+    }
+
+    // moves the file pointer right behind the next '\n' (or to the end of the file)
+    private static void readUntilLineBreak(RandomAccessFile raf) throws IOException {
+        int c = raf.read();
+        while (c != -1 && c != '\n') {
+            c = raf.read();
+        }
+    }
+
+    // inspired by https://www.baeldung.com/java-map-key-byte-array
+    private static final class BytesKey {
+        private final byte[] array;
+        private final int hash;
+
+        BytesKey(byte[] array) {
+            this.array = array;
+            // pre-calculated hash; the original author found it collision free for the known
+            // cities (by trial and error :P) - equals() does not rely on that
+            int h = 1;
+            for (byte b : array) {
+                h = 11 * h + b;
+            }
+            this.hash = h;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            // cheap hash comparison first, the bytes decide
+            return o instanceof BytesKey other
+                    && hash == other.hash
+                    && Arrays.equals(array, other.array);
+        }
+
+        @Override
+        public int hashCode() {
+            return hash;
+        }
+
+        String asString() {
+            return new String(array, StandardCharsets.UTF_8);
+        }
+    }
+}