diff --git a/calculate_average_gnabyl.sh b/calculate_average_gnabyl.sh
new file mode 100755
index 0000000..14c449a
--- /dev/null
+++ b/calculate_average_gnabyl.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Uncomment below to use sdk
+# source "$HOME/.sdkman/bin/sdkman-init.sh"
+# sdk use java 21.0.1-graal 1>&2
+
+JAVA_OPTS=""
+time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_gnabyl
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
new file mode 100644
index 0000000..7c9769e
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class CalculateAverage_gnabyl { + + private static final String FILE = "./measurements.txt"; + + private static final int NB_CHUNKS = 8; + + private static record Chunk(long start, int bytesCount, MappedByteBuffer mappedByteBuffer) { + } + + private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosition, int startSize) + throws IOException { + long currentPosition = startPosition + startSize - 1; + int realSize = startSize; + + if (currentPosition >= channel.size()) { + currentPosition = channel.size() - 1; + realSize = (int) (currentPosition - startPosition); + } + + while (currentPosition >= startPosition) { + channel.position(currentPosition); + byte byteValue = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, 1).get(); + if (byteValue == '\n') { + // found line break + break; + } + + realSize--; + currentPosition--; + } + return realSize; + } + + private static List readChunks(long nbChunks) throws IOException { + RandomAccessFile file = new RandomAccessFile(FILE, "rw"); + List res = new ArrayList<>(); + FileChannel channel = file.getChannel(); + long bytesCount = channel.size(); + long bytesPerChunk = bytesCount / nbChunks; + + // Memory map the file in read-only mode + // TODO: Optimize using threads + long currentPosition = 0; + for (int i = 0; i < nbChunks; i++) { + int startSize = (int) bytesPerChunk; + int realSize = startSize; + + if (i == nbChunks - 1) { + realSize = (int) (bytesCount - currentPosition); + MappedByteBuffer mappedByteBuffer = 
channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, + realSize); + + res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + break; + } + + // Adjust size so that it ends on a newline + realSize = reduceSizeToFitLineBreak(channel, currentPosition, startSize); + + MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, + realSize); + + res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + currentPosition += realSize; + } + + channel.close(); + file.close(); + + return res; + } + + private static class StationData { + private double sum, min, max; + private long count; + + public StationData(double value) { + this.count = 1; + this.sum = value; + this.min = value; + this.max = value; + } + + public void update(double value) { + this.count++; + this.sum += value; + this.min = Math.min(this.min, value); + this.max = Math.max(this.max, value); + } + + public double getMean() { + return sum / count; + } + + public double getMin() { + return min; + } + + public double getMax() { + return max; + } + + public void mergeWith(StationData other) { + this.sum += other.sum; + this.count += other.count; + this.min = Math.min(this.min, other.min); + this.max = Math.max(this.max, other.max); + } + + } + + static double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + + private static class ChunkResult { + private Map data; + + public ChunkResult() { + data = new HashMap<>(); + } + + public StationData getData(String name) { + return data.get(name); + } + + public void addStation(String name, double value) { + this.data.put(name, new StationData(value)); + } + + public void print() { + var stationNames = new ArrayList(this.data.keySet()); + Collections.sort(stationNames); + System.out.print("{"); + for (int i = 0; i < stationNames.size() - 1; i++) { + var name = stationNames.get(i); + var stationData = data.get(name); + System.out.printf("%s=%.1f/%.1f/%.1f, ", name, 
round(stationData.getMin()), + round(stationData.getMean()), + round(stationData.getMax())); + } + var name = stationNames.get(stationNames.size() - 1); + var stationData = data.get(name); + System.out.printf("%s=%.1f/%.1f/%.1f", name, round(stationData.getMin()), + round(stationData.getMean()), + round(stationData.getMax())); + System.out.println("}"); + } + + public void mergeWith(ChunkResult other) { + for (Map.Entry entry : other.data.entrySet()) { + String stationName = entry.getKey(); + StationData otherStationData = entry.getValue(); + StationData thisStationData = this.data.get(stationName); + + if (thisStationData == null) { + this.data.put(stationName, otherStationData); + } + else { + thisStationData.mergeWith(otherStationData); + } + } + } + } + + private static ChunkResult processChunk(Chunk chunk) { + ChunkResult result = new ChunkResult(); + + // Perform processing on the chunk data + byte[] data = new byte[chunk.bytesCount()]; + chunk.mappedByteBuffer().get(data); + + // Process each line + String stationName; + double value; + int iSplit, iEol; + StationData stationData; + long negative; + for (int offset = 0; offset < data.length; offset++) { + // Find station name + for (iSplit = offset; data[iSplit] != ';'; iSplit++) { + } + stationName = new String(data, offset, iSplit - offset, StandardCharsets.UTF_8); + + // Find value + iSplit++; + negative = 1; + value = 0; + for (iEol = iSplit; data[iEol] != '\n'; iEol++) { + if (data[iEol] == '-') { + negative = -1; + continue; + } + if (data[iEol] == '.') { + value = value + (data[iEol + 1] - 48) / 10.0; + iEol += 2; + break; + } + value = value * 10 + data[iEol] - 48; + } + value *= negative; + + // Init & count + stationData = result.getData(stationName); + + if (stationData == null) { + result.addStation(stationName, value); + } + else { + stationData.update(value); + } + + offset = iEol; + } + + return result; + } + + private static ChunkResult processAllChunks(List chunks) throws 
InterruptedException, ExecutionException { + // var globalRes = new ChunkResult(); + // for (var chunk : chunks) { + // var chunkRes = processChunk(chunk); + // globalRes.mergeWith(chunkRes); + // } + // return globalRes; + + List> computeTasks = new ArrayList<>(); + + for (Chunk chunk : chunks) { + computeTasks.add(CompletableFuture.supplyAsync(() -> processChunk(chunk))); + } + + ChunkResult globalRes = null; + + for (CompletableFuture completedTask : computeTasks) { + ChunkResult chunkRes = completedTask.get(); + if (globalRes == null) { + globalRes = completedTask.get(); + } + else { + globalRes.mergeWith(chunkRes); + } + } + + return globalRes; + } + + public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { + var chunks = readChunks(NB_CHUNKS); + var result = processAllChunks(chunks); + result.print(); + } +}