From d17619c95c1b0a90f03e860b169d11ed5ec255c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Algirdas=20Ra=C5=A1=C4=8Dius?= Date: Wed, 10 Jan 2024 23:35:37 +0200 Subject: [PATCH] Quick and dirty first version (#215) * Quick and dirty first version * Update script to new standard --- calculate_average_algirdasrascius.sh | 19 ++ .../CalculateAverage_algirdasrascius.java | 272 ++++++++++++++++++ 2 files changed, 291 insertions(+) create mode 100755 calculate_average_algirdasrascius.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java diff --git a/calculate_average_algirdasrascius.sh b/calculate_average_algirdasrascius.sh new file mode 100755 index 0000000..de79606 --- /dev/null +++ b/calculate_average_algirdasrascius.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_algirdasrascius diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java b/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java new file mode 100644 index 0000000..5ad4b12 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java @@ -0,0 +1,272 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +public class CalculateAverage_algirdasrascius { + + private static final String FILE = "./measurements.txt"; + + private static final int MAX_NAMES = 10000; + private static final int MAP_ENTRY_COUNT = 20011; // Prime exceeding double MAX_NAMES count + private static final int MAX_NAME_LENGTH_IN_CHARS = 100; + private static final int MAX_NAME_LENGTH_IN_BYTES = MAX_NAME_LENGTH_IN_CHARS * 4; + private static final int NAME_BUFFER_LENGTH = MAX_NAMES * MAX_NAME_LENGTH_IN_BYTES; + + private static class AggregatorMap { + private final AggregatorMapEntry[] entries = new AggregatorMapEntry[MAP_ENTRY_COUNT]; + private final byte[] nameBuffer = new byte[NAME_BUFFER_LENGTH]; + private int nameBufferEnd = 0; + + void add(byte[] buffer, int nameStart, int nameEnd, int nameHash, short value) { + getEntry(buffer, nameStart, nameEnd, nameHash).accumulate(value); + } + + void combineWith(AggregatorMap other) { + for (int i = 0; i < MAP_ENTRY_COUNT; i++) { + AggregatorMapEntry entry = other.entries[i]; + while (entry != null) { + getEntry(other.nameBuffer, entry.nameStart, entry.nameEnd, entry.nameHash).combineWith(entry); + entry = entry.nextEntry; + } + } + } + + void printResult() { + Map sortedMeasurements = new TreeMap<>(); + for (int i = 0; i < MAP_ENTRY_COUNT; i++) { + AggregatorMapEntry entry = entries[i]; + while (entry != null) { + String name = new String(nameBuffer, entry.nameStart, entry.nameEnd - entry.nameStart, StandardCharsets.UTF_8); + sortedMeasurements.put(name, entry.result()); + entry = entry.nextEntry; + } + } + System.out.println(sortedMeasurements); + } + + private AggregatorMapEntry getEntry(byte[] buffer, int nameStart, int nameEnd, int nameHash) { + int index = (nameHash & 0x7FFFFFFF) % MAP_ENTRY_COUNT; + AggregatorMapEntry firstEntry = entries[index]; + AggregatorMapEntry entry = firstEntry; + while (entry != null && (entry.nameHash != nameHash || !Arrays.equals(buffer, nameStart, nameEnd, nameBuffer, entry.nameStart, entry.nameEnd))) { + entry = entry.nextEntry; + } + if (entry == null) { + int entryNameStart = nameBufferEnd; + int nameLength = nameEnd - nameStart; + System.arraycopy(buffer, nameStart, nameBuffer, entryNameStart, nameLength); + nameBufferEnd += nameLength; + entry = new AggregatorMapEntry(entryNameStart, nameBufferEnd, nameHash, firstEntry); + entries[index] = entry; + } + return entry; + } + + } + + private static class AggregatorMapEntry { + private final int nameStart; + private final int nameEnd; + private final int nameHash; + private final AggregatorMapEntry nextEntry; + private short min = Short.MAX_VALUE; + private short max = Short.MIN_VALUE; + private long sum; + private int count; + + public AggregatorMapEntry(int nameStart, int nameEnd, int nameHash, AggregatorMapEntry nextEntry) { + this.nameStart = nameStart; + this.nameEnd = nameEnd; + this.nameHash = nameHash; + this.nextEntry = nextEntry; + } + + void accumulate(short value) { + if (min > value) { + min = value; + } + if (max < value) { + max = value; + } + sum += value; + count++; + } + + void combineWith(AggregatorMapEntry other) { + if (min > other.min) { + min = other.min; + } + if (max < other.max) { + max = other.max; + } + sum += other.sum; + count += other.count; + } + + ResultRow result() { + // return new ResultRow(min, (short) ((sum + count / 2) / count), max); + return new ResultRow(min, (short) Math.round(((double) sum) / count), max); + } + } + + private record ResultRow(short min, short mean, short max) { + public String toString() { + return round(min) + "/" + round(mean) + "/" + round(max); + } + + private String round(short value) { + return value >= 0 + ? ((value / 10) + "." + (value % 10)) + : ("-" + (-value / 10) + "." + (-value % 10)); + } + } + + private static final int READ_CHUNK_SIZE = 1024 * 1024; + private static final int MAX_LINE_SIZE = MAX_NAME_LENGTH_IN_BYTES + 10; + private static final int READ_BUFFER_SIZE = READ_CHUNK_SIZE + MAX_LINE_SIZE; + + private static class ReaderTask implements Callable { + private final AggregatorMap aggregatorMap = new AggregatorMap(); + private final byte[] buffer = new byte[READ_BUFFER_SIZE]; + private final ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); + private final FileChannel channel; + private final AtomicLong nextChunkPosition; + + public ReaderTask(FileChannel channel, AtomicLong nextChunkPosition) { + this.channel = channel; + this.nextChunkPosition = nextChunkPosition; + } + + @Override + public AggregatorMap call() throws Exception { + while (processChunk()) { + } + return aggregatorMap; + } + + private boolean processChunk() throws IOException { + long channelPosition = nextChunkPosition.getAndAdd(READ_CHUNK_SIZE); + int endIndex = 0; + + byteBuffer.rewind(); + do { + int bytesRead = channel.read(byteBuffer, channelPosition + endIndex); + if (bytesRead < 0) { + break; + } + endIndex += bytesRead; + } while (endIndex < READ_CHUNK_SIZE); + + int index = 0; + + // If this is not the first chunk skip till first line end + if (channelPosition != 0) { + byte v; + while (index < endIndex && buffer[index] != '\n') { + index++; + } + index++; + } + + if (endIndex > READ_CHUNK_SIZE + 1) { + endIndex = READ_CHUNK_SIZE + 1; + } + + while (index < endIndex) { + index = processLine(index); + } + + return endIndex > READ_CHUNK_SIZE; + } + + private int processLine(int index) { + // Read station name + byte v; + int nameStart = index; + int nameHash = 0; + while ((v = buffer[index]) != ';') { + index++; + nameHash = 31 * nameHash + v; + } + int nameEnd = index; + + // Skip ; + index++; + + // Read temperature value + v = buffer[index++]; + boolean negative = false; + if (v == '-') { + negative = true; + v = buffer[index++]; + } + int value = v - '0'; + v = buffer[index++]; + if (v != '.') { + value = value * 10 + (v - '0'); + index++; + } + v = buffer[index++]; + value = value * 10 + (v - '0'); + if (negative) { + value = -value; + } + + // Skip line feed + index++; + + // System.out.println(new String(buffer, nameStart, nameEnd - nameStart) + " = " + value); + aggregatorMap.add(buffer, nameStart, nameEnd, nameHash, (short) value); + return index; + } + + } + + public static void main(String[] args) throws Exception { + try (FileInputStream stream = new FileInputStream(FILE)) { + FileChannel channel = stream.getChannel(); + AtomicLong nextChunkPosition = new AtomicLong(0L); + + int threadCount = Runtime.getRuntime().availableProcessors() * 2; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + ExecutorCompletionService completionService = new ExecutorCompletionService<>(executorService); + for (int i = 0; i < threadCount; i++) { + completionService.submit(new ReaderTask(channel, nextChunkPosition)); + } + + AggregatorMap aggregatorMap = completionService.take().get(); + for (int i = 1; i < threadCount; i++) { + aggregatorMap.combineWith(completionService.take().get()); + } + aggregatorMap.printResult(); + executorService.close(); + } + } +}