diff --git a/calculate_average_yavuztas.sh b/calculate_average_yavuztas.sh new file mode 100755 index 0000000..8c351c2 --- /dev/null +++ b/calculate_average_yavuztas.sh @@ -0,0 +1,22 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.1-graal 1>&2 + +JAVA_OPTS="-Xms1g -Xmx1g" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_yavuztas diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_yavuztas.java b/src/main/java/dev/morling/onebrc/CalculateAverage_yavuztas.java new file mode 100644 index 0000000..bef902e --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_yavuztas.java @@ -0,0 +1,287 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +public class CalculateAverage_yavuztas { + + private static final Path FILE = Path.of("./measurements.txt"); + + static class Measurement { + private double min; + private double max; + private double sum; + private int count = 1; + + public Measurement(double initial) { + this.min = initial; + this.max = initial; + this.sum = initial; + } + + public String toString() { + return round(this.min) + "/" + round(this.sum / this.count) + "/" + round(this.max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } + + static class KeyBuffer { + + ByteBuffer value; + int hash; + + public KeyBuffer(ByteBuffer buffer) { + this.value = buffer; + this.hash = buffer.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + + final KeyBuffer keyBuffer = (KeyBuffer) o; + if (o == null || getClass() != o.getClass() || this.hash != keyBuffer.hash) + return false; + + return this.value.equals(keyBuffer.value); + } + + @Override + public int hashCode() { + return this.hash; + } + + @Override + public String toString() { + final int limit = this.value.limit(); + final byte[] bytes = new byte[limit]; + this.value.get(bytes); + return new String(bytes, 0, limit, StandardCharsets.UTF_8); + } + } + + static class FixedRegionDataAccessor { + + static final byte SEMI_COLON = 59; // ';' + static final byte LINE_BREAK = 10; // '\n' + + final byte[] workBuffer = new byte[256]; // assuming max 256 bytes for a row is enough + + long startPos; + long size; + ByteBuffer buffer; + int position; // relative + + public FixedRegionDataAccessor(long startPos, long size, ByteBuffer buffer) { + this.startPos = startPos; + this.size = size; + this.buffer = buffer; + } + + void traverse(BiConsumer consumer) { + + int semiColonPos = 0; + int lineBreakPos = 0; + while (this.buffer.hasRemaining()) { + + while ((this.workBuffer[0] = this.buffer.get()) != LINE_BREAK) { + if (this.workBuffer[0] == SEMI_COLON) { // save semicolon pos + semiColonPos = this.buffer.position(); // semicolon exclusive + } + } + // found linebreak + lineBreakPos = this.buffer.position(); + + this.buffer.position(this.position); // set back to line start + final int length1 = semiColonPos - this.position; // station length + final int length2 = lineBreakPos - semiColonPos; // temperature length + + final ByteBuffer station = getRef(length1); // read station + final String temperature = readString(length2); // read temperature + + this.position = lineBreakPos; // skip to line end + + consumer.accept(new KeyBuffer(station), Double.parseDouble(temperature)); + } + } + + Map accumulate(Map initial) { + + traverse((station, temperature) -> { + initial.compute(station, (k, m) -> { + if (m == null) { + return new Measurement(temperature); + } + // aggregate + m.min = Math.min(m.min, temperature); + m.max = Math.max(m.max, temperature); + m.sum += temperature; + m.count++; + return m; + }); + }); + + return initial; + } + + String readString(int length) { + this.buffer.get(this.workBuffer, 0, length); + return new String(this.workBuffer, 0, length - 1, // strip the last char + StandardCharsets.UTF_8); + } + + ByteBuffer getRef(int length) { + final ByteBuffer slice = this.buffer.slice().limit(length - 1); + skip(this.buffer, length); + return slice; + } + + static void skip(ByteBuffer buffer, int length) { + final int pos = buffer.position(); + buffer.position(pos + length); + } + + } + + static class FastDataReader implements Closeable { + + private final FixedRegionDataAccessor[] accessors; + private final ExecutorService mergerThread; + private final ExecutorService accessorPool; + + public FastDataReader(Path path) throws IOException { + var concurrency = Runtime.getRuntime().availableProcessors(); + final long fileSize = Files.size(path); + long regionSize = fileSize / concurrency; + + if (regionSize > Integer.MAX_VALUE) { + // TODO multiply concurrency and try again + throw new IllegalArgumentException("Bigger than integer!"); + } + // handling extreme cases + if (regionSize <= 256) { // small file, no need concurrency + concurrency = 1; + regionSize = fileSize; + } + + long startPosition = 0; + this.accessors = new FixedRegionDataAccessor[concurrency]; + for (int i = 0; i < concurrency - 1; i++) { + // map regions + try (final FileChannel channel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) { + final long maxSize = startPosition + regionSize > fileSize ? fileSize - startPosition : regionSize; + final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, startPosition, maxSize); + this.accessors[i] = new FixedRegionDataAccessor(startPosition, maxSize, buffer); + // adjust positions back and forth until we find a linebreak! + final int closestPos = findClosestLineEnd((int) maxSize - 1, buffer); + buffer.limit(closestPos + 1); + startPosition += closestPos + 1; + } + } + // map the last region + try (final FileChannel channel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) { + final long maxSize = fileSize - startPosition; // last region will take the rest + final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, startPosition, maxSize); + this.accessors[concurrency - 1] = new FixedRegionDataAccessor(startPosition, maxSize, buffer); + } + // create executors + this.mergerThread = Executors.newSingleThreadExecutor(); + this.accessorPool = Executors.newFixedThreadPool(concurrency); + } + + void readAndCollect(Map output) { + for (final FixedRegionDataAccessor accessor : this.accessors) { + this.accessorPool.submit(() -> { + final Map partial = accessor.accumulate(new HashMap<>(1 << 10, 1)); // aka 1k + this.mergerThread.submit(() -> mergeMaps(output, partial)); + }); + } + } + + @Override + public void close() { + try { + this.accessorPool.shutdown(); + this.accessorPool.awaitTermination(60, TimeUnit.SECONDS); + this.mergerThread.shutdown(); + this.mergerThread.awaitTermination(60, TimeUnit.SECONDS); + } + catch (Exception e) { + this.accessorPool.shutdownNow(); + this.mergerThread.shutdownNow(); + } + } + + /** + * Scans the given buffer to the left + */ + private static int findClosestLineEnd(int regionSize, ByteBuffer buffer) { + int position = regionSize; + int left = regionSize; + while (buffer.get(position) != FixedRegionDataAccessor.LINE_BREAK) { + position = --left; + } + return position; + } + + private static Map mergeMaps(Map map1, Map map2) { + map2.forEach((s, measurement) -> { + map1.merge(s, measurement, (m1, m2) -> { + m1.min = Math.min(m1.min, m2.min); + m1.max = Math.max(m1.max, m2.max); + m1.sum += m2.sum; + m1.count += m2.count; + return m1; + }); + }); + + return map1; + } + + } + + public static void main(String[] args) throws IOException, InterruptedException { + final Map output = new HashMap<>(1 << 10, 1); // aka 1k + try (final FastDataReader reader = new FastDataReader(FILE)) { + reader.readAndCollect(output); + } + + final TreeMap sorted = new TreeMap<>(); + output.forEach((s, measurement) -> sorted.put(s.toString(), measurement)); + System.out.println(sorted); + } + +}