From b0d381c91e650e2dc799969ce38546c000646682 Mon Sep 17 00:00:00 2001 From: maeda6uiui <63230315+maeda6uiui@users.noreply.github.com> Date: Mon, 8 Jan 2024 18:22:03 +0900 Subject: [PATCH] Add code by maeda6uiui --- calculate_average_maeda6uiui.sh | 23 + .../onebrc/CalculateAverage_maeda6uiui.java | 485 ++++++++++++++++++ 2 files changed, 508 insertions(+) create mode 100755 calculate_average_maeda6uiui.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_maeda6uiui.java diff --git a/calculate_average_maeda6uiui.sh b/calculate_average_maeda6uiui.sh new file mode 100755 index 0000000..a261e90 --- /dev/null +++ b/calculate_average_maeda6uiui.sh @@ -0,0 +1,23 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +# source "$HOME/.sdkman/bin/sdkman-init.sh" +# sdk use java 21.0.1-graal 1>&2 + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_maeda6uiui diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_maeda6uiui.java b/src/main/java/dev/morling/onebrc/CalculateAverage_maeda6uiui.java new file mode 100644 index 0000000..244fc4c --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_maeda6uiui.java @@ -0,0 +1,485 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class CalculateAverage_maeda6uiui { + record RecordCollectorResult( + Map mins, + Map maxes, + Map sums, + Map counts) { + + } + + static class RecordCollector implements Callable { + private String inputFilepath; + private long startByteIndex; + private long numBytesToRead; + private char delimiter; + private int byteBufferSize; + private int bisBufferSize; + + private Map mins; + private Map maxes; + private Map sums; + private Map counts; + + public RecordCollector( + String inputFilepath, + long startByteIndex, + long numBytesToRead, + char delimiter, + int byteBufferSize, + int bisBufferSize) { + this.inputFilepath = inputFilepath; + this.startByteIndex = startByteIndex; + this.numBytesToRead = numBytesToRead; + this.delimiter = delimiter; + this.byteBufferSize = byteBufferSize; + this.bisBufferSize = bisBufferSize; + + mins = new HashMap<>(); + maxes = new HashMap<>(); + sums = new HashMap<>(); + counts = new HashMap<>(); + } + + private int byteToInt(byte b) { + return switch (b) { + case '0' -> 0; + case '1' -> 1; + case '2' -> 2; + case '3' -> 3; + case '4' -> 4; + case '5' -> 5; + case '6' -> 6; + case '7' -> 7; + case '8' -> 8; + case '9' -> 9; + default -> -1; + }; + } + + private double parseDouble(byte[] bs) { + // Get the sign + int valSign; + if (bs[0] == '-') { + valSign = -1; + } + else { + valSign = 1; + } + + // Get the dot position + int dotPos = -1; + for (int i = 0; i < bs.length; i++) { + if (bs[i] == '.') { + dotPos = i; + break; + } + } + if (dotPos == -1) { + return Double.NaN; + } + + // Get the integer part + int valIntPart; + int intPartStartIndex = (valSign == -1) ? 1 : 0; + int intPartLength = dotPos - intPartStartIndex; + + // One-digit value + if (intPartLength == 1) { + valIntPart = this.byteToInt(bs[dotPos - 1]); + } + // Two-digit value + else if (intPartLength == 2) { + int valTens = this.byteToInt(bs[dotPos - 2]); + int valOnes = this.byteToInt(bs[dotPos - 1]); + valIntPart = valTens * 10 + valOnes; + } + else { + return Double.NaN; + } + + // Get the decimal part + double valDecPart = this.byteToInt(bs[dotPos + 1]) * 0.1; + + return valSign * (valIntPart + valDecPart); + } + + @Override + public RecordCollectorResult call() { + // Start and end indices are most likely pointing to the middle of a line + // Therefore, actual start and end indices should be determined + // before proceeding to actual reading of the file + long actualStartByteIndex = -1; + long actualEndByteIndex = -1; + + try (var bis = new BufferedInputStream(new FileInputStream(inputFilepath))) { + int b; + int readCount = 0; + long firstLFPos; + + // If start index specified is 0, actual start index is also 0 + if (startByteIndex == 0) { + actualStartByteIndex = 0; + } + else { + // Skip until the preceding byte of the start index specified + bis.skipNBytes(startByteIndex - 1); + + // Get the preceding byte + b = bis.read(); + + // If the preceding byte is LF, + // actual start index is the start index specified + // because it is the start of a new line + if (b == '\n') { + actualStartByteIndex = startByteIndex; + } + } + + if (actualStartByteIndex != -1) { + // Skip until the end byte specified + bis.skipNBytes(numBytesToRead); + } + // Start index specified is pointing to the middle of a line + // In that case, actual start index is the one following the LF of that line + // (Start index of the next line) + else { + firstLFPos = startByteIndex; + while ((b = bis.read()) != -1) { + readCount++; + if (b == '\n') { + break; + } + + firstLFPos++; + } + actualStartByteIndex = firstLFPos + 1; + + // Skip until the end byte specified + bis.skipNBytes(numBytesToRead - readCount); + } + + // Actual end index is the first LF encountered + readCount = 0; + firstLFPos = startByteIndex + numBytesToRead; + while ((b = bis.read()) != -1) { + readCount++; + if (b == '\n') { + break; + } + + firstLFPos++; + } + actualEndByteIndex = firstLFPos; + } + catch (IOException e) { + System.err.println(e); + return null; + } + + // Get actual number of bytes to read + long actualNumBytesToRead = actualEndByteIndex - actualStartByteIndex + 1; + + // Read bytes from the range obtained above + try (var bis = new BufferedInputStream(new FileInputStream(inputFilepath), bisBufferSize)) { + // Skip until the start byte + bis.skipNBytes(actualStartByteIndex); + + final int EXTENSION_SIZE = 64; + var buffer = new byte[byteBufferSize]; + var extendedBuffer = new byte[byteBufferSize + EXTENSION_SIZE]; + + // Read bytes in chunk + long numTotalBytesRead = 0; + while (true) { + int chunkSize; + if (actualNumBytesToRead - numTotalBytesRead < byteBufferSize) { + chunkSize = (int) (actualNumBytesToRead - numTotalBytesRead); + } + else { + chunkSize = byteBufferSize; + } + + if (chunkSize <= 0) { + break; + } + + Arrays.fill(buffer, (byte) 0); + bis.read(buffer, 0, chunkSize); + numTotalBytesRead += chunkSize; + + // Copy read content to another buffer + Arrays.fill(extendedBuffer, (byte) 0); + System.arraycopy(buffer, 0, extendedBuffer, 0, chunkSize); + + // Read until next LF is found + // if end of buffer read above does not correspond to end of line + for (int i = 0; i < EXTENSION_SIZE; i++) { + int b = bis.read(); + if (b == -1) { + break; + } + else if (b == '\n') { + extendedBuffer[chunkSize + i] = '\n'; + numTotalBytesRead++; + break; + } + + extendedBuffer[chunkSize + i] = (byte) b; + numTotalBytesRead++; + } + + int currentDelimPos = -1; + int currentLFPos = -1; + int nextLineStartPos = 0; + + for (int i = 0; i < extendedBuffer.length; i++) { + if (extendedBuffer[i] == 0) { + break; + } + + if (extendedBuffer[i] == delimiter) { + currentDelimPos = i; + } + else if (extendedBuffer[i] == '\n') { + currentLFPos = i; + } + + if (currentLFPos != -1) { + // Error + if (currentDelimPos == -1) { + System.err.printf( + "Error near byte index %d\n", + actualStartByteIndex + numTotalBytesRead); + } + else { + String stationName = new String( + Arrays.copyOfRange(extendedBuffer, nextLineStartPos, currentDelimPos)); + + // Parse string to double by myself + // because Double.parseDouble() is slow... + double temperature = this.parseDouble( + Arrays.copyOfRange(extendedBuffer, currentDelimPos + 1, currentLFPos)); + + // Populate the maps + if (!mins.containsKey(stationName)) { + mins.put(stationName, temperature); + maxes.put(stationName, temperature); + sums.put(stationName, temperature); + counts.put(stationName, 1); + } + else { + double currentMin = mins.get(stationName); + double currentMax = maxes.get(stationName); + double currentSum = sums.get(stationName); + int currentCount = counts.get(stationName); + + if (temperature < currentMin) { + mins.put(stationName, temperature); + } + else if (temperature > currentMax) { + maxes.put(stationName, temperature); + } + + sums.put(stationName, currentSum + temperature); + counts.put(stationName, currentCount + 1); + } + } + + nextLineStartPos = currentLFPos + 1; + currentDelimPos = -1; + currentLFPos = -1; + } + } + } + } + catch (IOException e) { + System.err.println(e); + return null; + } + + return new RecordCollectorResult(mins, maxes, sums, counts); + } + } + + private static double round(double d) { + return Math.round(d * 10.0) / 10.0; + } + + public static void main(String[] args) { + final String INPUT_FILEPATH = "./measurements.txt"; + final int DESIRED_NUM_THREADS = 20; + final char DELIMITER = ';'; + final int BIS_BUFFER_SIZE = 1024 * 1024; + final int BYTE_BUFFER_SIZE = 1024; + final int MULTI_THREAD_NUM_LINES_THRESHOLD = DESIRED_NUM_THREADS * 10; + + // First get the number of total bytes in the input file + long numTotalBytes; + try { + numTotalBytes = Files.size(Paths.get(INPUT_FILEPATH)); + } + catch (IOException e) { + e.printStackTrace(); + return; + } + + // Make sure the input file has enough lines + // for this multithreading approach to work efficiently + int actualNumThreads = 1; + try (var br = new BufferedReader(new FileReader(INPUT_FILEPATH))) { + int lineCount = 0; + while (br.readLine() != null) { + lineCount++; + if (lineCount >= MULTI_THREAD_NUM_LINES_THRESHOLD) { + actualNumThreads = DESIRED_NUM_THREADS; + break; + } + } + } + catch (IOException e) { + e.printStackTrace(); + return; + } + + // Calculate the number of bytes each thread has to process + long numBytesToProcessPerThread = numTotalBytes / actualNumThreads; + long remainingNumBytesToProcess = numTotalBytes % actualNumThreads; + + var exec = Executors.newFixedThreadPool(actualNumThreads); + + var futures = new ArrayList>(); + for (int i = 0; i < actualNumThreads; i++) { + RecordCollector recordCollector; + if (i == actualNumThreads - 1) { + recordCollector = new RecordCollector( + INPUT_FILEPATH, + i * numBytesToProcessPerThread, + numBytesToProcessPerThread + remainingNumBytesToProcess, + DELIMITER, + BYTE_BUFFER_SIZE, + BIS_BUFFER_SIZE); + } + else { + recordCollector = new RecordCollector( + INPUT_FILEPATH, + i * numBytesToProcessPerThread, + numBytesToProcessPerThread, + DELIMITER, + BYTE_BUFFER_SIZE, + BIS_BUFFER_SIZE); + } + + Future future = exec.submit(recordCollector); + futures.add(future); + } + + // Consolidate results of each thread + var mins = new HashMap(); + var maxes = new HashMap(); + var sums = new HashMap(); + var counts = new HashMap(); + try { + for (var future : futures) { + RecordCollectorResult result = future.get(); + + result.mins.forEach((k, v) -> { + if (!mins.containsKey(k)) { + mins.put(k, v); + } + else { + mins.put(k, Double.min(v, mins.get(k))); + } + }); + result.maxes.forEach((k, v) -> { + if (!maxes.containsKey(k)) { + maxes.put(k, v); + } + else { + maxes.put(k, Double.max(v, maxes.get(k))); + } + }); + result.sums.forEach((k, v) -> { + if (!sums.containsKey(k)) { + sums.put(k, v); + } + else { + sums.put(k, Double.sum(v, sums.get(k))); + } + }); + result.counts.forEach((k, v) -> { + if (!counts.containsKey(k)) { + counts.put(k, v); + } + else { + counts.put(k, Integer.sum(v, counts.get(k))); + } + }); + } + } + catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + return; + } + finally { + exec.shutdown(); + } + + // Calculate means + var means = new HashMap(); + sums.forEach((k, v) -> means.put(k, v / counts.get(k))); + + // Sort station names + List sortedStationNames = means + .keySet() + .stream() + .sorted() + .toList(); + + // Create output string + var sb = new StringBuilder(); + + sb.append("{"); + sortedStationNames.forEach(stationName -> { + sb + .append(stationName) + .append("=") + .append(round(mins.get(stationName))) + .append("/") + .append(round(means.get(stationName))) + .append("/") + .append(round(maxes.get(stationName))) + .append(", "); + }); + sb.delete(sb.length() - 2, sb.length()); + sb.append("}"); + + // Print result string + System.out.println(sb); + } +} \ No newline at end of file