Adding Nils Semmelrock's submission
nothing fancy, just work on chunks in parallel and optimize bottlenecks
This commit is contained in:
parent
1c74049991
commit
12ae36ade1
19
calculate_average_nstng.sh
Executable file
19
calculate_average_nstng.sh
Executable file
@ -0,0 +1,19 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
# Copyright 2023 The original authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
JAVA_OPTS="-Xmx16G"
|
||||
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_nstng
|
271
src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java
Normal file
271
src/main/java/dev/morling/onebrc/CalculateAverage_nstng.java
Normal file
@ -0,0 +1,271 @@
|
||||
/*
|
||||
* Copyright 2023 The original authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package dev.morling.onebrc;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
public class CalculateAverage_nstng {
|
||||
|
||||
private static final File FILE = new File("./measurements.txt");
|
||||
|
||||
private static class MeasurementAggregator {
|
||||
private double min;
|
||||
private double max;
|
||||
private long sum;
|
||||
private long count;
|
||||
|
||||
public MeasurementAggregator(double min, double max, long sum, long count) {
|
||||
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
this.sum = sum;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "%s/%s/%s".formatted(min, round(sum / 10.0 / count), max);
|
||||
}
|
||||
|
||||
private double round(double value) {
|
||||
return Math.round(value * 10.0) / 10.0;
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
HashMap<BytesKey, MeasurementAggregator>[] maps;
|
||||
Thread[] threads;
|
||||
try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
|
||||
// calc num threads to use
|
||||
int availCpus = Runtime.getRuntime().availableProcessors();
|
||||
// make sure to use at least num of threads so that byte chunks fit into an array
|
||||
long fileLength = raf.length();
|
||||
int minThreads = (int) Math.ceil(fileLength / (1.0 * Integer.MAX_VALUE));
|
||||
int numThreads = Math.max(minThreads, availCpus);
|
||||
|
||||
// create threads - each thread processes a chunk of the file
|
||||
// the chunks have all equal size (modulo take care that a chunk does not
|
||||
// start/end in the middle of a line)
|
||||
threads = new Thread[numThreads];
|
||||
maps = new HashMap[numThreads];
|
||||
long chunkSize = fileLength / numThreads;
|
||||
long lastChunkEnd = 0;
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
raf.seek(Math.min(lastChunkEnd + chunkSize, fileLength));
|
||||
readUntilLineBreak(raf);
|
||||
long chunkEnd = raf.getFilePointer();
|
||||
long finalLastChunkEnd = lastChunkEnd;
|
||||
// 413 possible cities
|
||||
maps[i] = new HashMap<>(413);
|
||||
HashMap<BytesKey, MeasurementAggregator> finalMap = maps[i];
|
||||
threads[i] = new Thread(() -> {
|
||||
try {
|
||||
// do not work on empty chunks
|
||||
if (chunkEnd - finalLastChunkEnd <= 0)
|
||||
return;
|
||||
// -1: left from found line break
|
||||
processChunk(finalLastChunkEnd, chunkEnd - 1, finalMap);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
lastChunkEnd = chunkEnd;
|
||||
}
|
||||
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// run it ...
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
// ... and wait for all threads to finish
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
// merge results
|
||||
TreeMap<String, MeasurementAggregator> resultMap = new TreeMap<>();
|
||||
for (HashMap<BytesKey, MeasurementAggregator> map : maps) {
|
||||
for (Map.Entry<BytesKey, MeasurementAggregator> entry : map.entrySet()) {
|
||||
String stringKey = entry.getKey().asString();
|
||||
MeasurementAggregator agg = resultMap.get(stringKey);
|
||||
if (agg == null) {
|
||||
resultMap.put(stringKey, entry.getValue());
|
||||
}
|
||||
else {
|
||||
agg.min = Math.min(entry.getValue().min, agg.min);
|
||||
agg.max = Math.max(entry.getValue().max, agg.max);
|
||||
agg.count = entry.getValue().count + agg.count;
|
||||
agg.sum = entry.getValue().sum + agg.sum;
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println(resultMap);
|
||||
}
|
||||
|
||||
private static void processChunk(long fromPos, long toPos, HashMap<BytesKey, MeasurementAggregator> map)
|
||||
throws IOException {
|
||||
|
||||
byte[] byteChunk = new byte[(int) (toPos - fromPos)];
|
||||
try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
|
||||
raf.seek(fromPos);
|
||||
raf.read(byteChunk);
|
||||
}
|
||||
IndexedByteArray chunk = new IndexedByteArray(byteChunk);
|
||||
|
||||
// reads one line from right to left in each iteration
|
||||
while (chunk.pos > 0) {
|
||||
double mes = getMeasurement(chunk);
|
||||
BytesKey cityKey = getCityKey(chunk);
|
||||
addOrUpdateEntry(map, cityKey, mes);
|
||||
}
|
||||
}
|
||||
|
||||
private static void addOrUpdateEntry(HashMap<BytesKey, MeasurementAggregator> map, BytesKey cityKey, double mes) {
|
||||
MeasurementAggregator agg = map.get(cityKey);
|
||||
if (agg == null) {
|
||||
map.put(cityKey, new MeasurementAggregator(mes, mes, (long) (mes * 10), 1));
|
||||
}
|
||||
else {
|
||||
agg.min = Math.min(agg.min, mes);
|
||||
agg.max = Math.max(agg.max, mes);
|
||||
agg.sum += (long) (mes * 10);
|
||||
agg.count++;
|
||||
}
|
||||
}
|
||||
|
||||
// did not know how much of a bottleneck new String(byteArray) can be :)
|
||||
// use the byte array representation of a city as a HashMap key
|
||||
// needs to be wrapped so that hashCode/equals can be defined
|
||||
private static BytesKey getCityKey(IndexedByteArray chunk) {
|
||||
// city name length >=3 known
|
||||
int cityLength = 3;
|
||||
chunk.dec(3);
|
||||
while (chunk.pos >= 0 && chunk.getCurrent() != '\n') {
|
||||
chunk.dec();
|
||||
cityLength++;
|
||||
}
|
||||
byte[] cityBytes = new byte[cityLength];
|
||||
// we are on '\n' or out of bounds by -1 -> srcPos is chunk.pos + 1
|
||||
System.arraycopy(chunk.array, chunk.pos + 1, cityBytes, 0, cityLength);
|
||||
chunk.dec(); // move away from '\n' (or further out of bounds)
|
||||
|
||||
return new BytesKey(cityBytes);
|
||||
}
|
||||
|
||||
// did not know how much of a bottleneck Double.parseDouble(s) can be :)
|
||||
// parses a double value from right to left
|
||||
// massively uses that a measurement value is very restricted
|
||||
// (<100, >-100, exactly one decimal place)
|
||||
private static double getMeasurement(IndexedByteArray chunk) {
|
||||
int asciiNumbersOffset = 48;
|
||||
double mes = (chunk.getCurrent() - asciiNumbersOffset) / 10.0; // 10^-1 place
|
||||
chunk.dec(2); // jump over '.'
|
||||
mes += chunk.getCurrent() - asciiNumbersOffset; // 10^0 place
|
||||
chunk.dec();
|
||||
switch (chunk.getCurrent()) {
|
||||
case '-' -> {
|
||||
mes *= -1; // <10 and negative
|
||||
chunk.dec();
|
||||
}
|
||||
case ';' -> {
|
||||
// do nothing - we will move to the left at the end
|
||||
}
|
||||
default -> {
|
||||
mes += (chunk.getCurrent() - asciiNumbersOffset) * 10; // 10^1 place
|
||||
chunk.dec();
|
||||
}
|
||||
}
|
||||
if (chunk.getCurrent() == '-') {
|
||||
mes *= -1; // >=10 and negative
|
||||
chunk.dec();
|
||||
}
|
||||
chunk.dec(); // move away from ';'
|
||||
return mes;
|
||||
}
|
||||
|
||||
// just a thin wrapper around a byte array - makes handling return values and
|
||||
// position updates more easy
|
||||
private static class IndexedByteArray {
|
||||
private final byte[] array;
|
||||
private int pos;
|
||||
|
||||
public IndexedByteArray(byte[] array) {
|
||||
|
||||
this.array = array;
|
||||
this.pos = array.length - 1;
|
||||
}
|
||||
|
||||
public void dec(int by) {
|
||||
pos -= by;
|
||||
}
|
||||
|
||||
public void dec() {
|
||||
pos--;
|
||||
}
|
||||
|
||||
public int getCurrent() {
|
||||
return array[pos];
|
||||
}
|
||||
}
|
||||
|
||||
private static void readUntilLineBreak(RandomAccessFile raf) throws IOException {
|
||||
boolean eor = false;
|
||||
|
||||
while (!eor) {
|
||||
int c = raf.read();
|
||||
if (c == -1 || c == '\n') {
|
||||
eor = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// inspired by https://www.baeldung.com/java-map-key-byte-array
|
||||
private static class BytesKey {
|
||||
private final byte[] array;
|
||||
private int hash;
|
||||
|
||||
public BytesKey(byte[] array) {
|
||||
this.array = array;
|
||||
// pre-calculated value with perfect hash function (by trial and error :P)
|
||||
// -> i.e., can be used also for equals
|
||||
this.hash = 1;
|
||||
for (byte b : array)
|
||||
this.hash = 11 * this.hash + b;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return hash == o.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
public String asString() {
|
||||
return new String(array);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user