From e7ab90e3ac3fa371eaf8a120191f639d6007db0e Mon Sep 17 00:00:00 2001 From: Anthony Goubard Date: Wed, 10 Jan 2024 23:09:21 +0100 Subject: [PATCH] Implementation CalculateAverage_japplis of 1BRC from Anthony Goubard (#271) * Implementation CalculateAverage_japplis of 1BRC from Anthony Goubard (japplis). Local performance (7 years old desktop i7-6700K - 8 cores - 16GB) 26 seconds. For reference, Jamie Stansfield (isolgpus) is 23 seconds on my machine and 11s in your results. I've added the nbactions.xml to the .gitignore file. When you add in NetBeans options like --enable-preview to actions like debug file or run file, it creates this file. * Implementation CalculateAverage_japplis of 1BRC from Anthony Goubard (japplis). Local performance (7 years old desktop i7-6700K - 8 cores - 16GB) 26 seconds. For reference, Jamie Stansfield (isolgpus) is 23 seconds on my machine and 11s in your results. I've added the nbactions.xml to the .gitignore file. When you add in NetBeans options like --enable-preview to actions like debug file or run file, it creates this file. 
second commit: Removed BufferedInputStream and replaced Measurement with IntSummaryStatistics (thanks davecom): still 23" but cleaner code --- .gitignore | 1 + calculate_average_japplis.sh | 19 ++ .../onebrc/CalculateAverage_japplis.java | 244 ++++++++++++++++++ 3 files changed, 264 insertions(+) create mode 100755 calculate_average_japplis.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_japplis.java diff --git a/.gitignore b/.gitignore index 6357f90..6a1ef02 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ bin/ # NetBeans nb-configuration.xml +nbactions.xml # Visual Studio Code .vscode diff --git a/calculate_average_japplis.sh b/calculate_average_japplis.sh new file mode 100755 index 0000000..0dae95c --- /dev/null +++ b/calculate_average_japplis.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +JAVA_OPTS="-Xmx2G" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_japplis $* diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_japplis.java b/src/main/java/dev/morling/onebrc/CalculateAverage_japplis.java new file mode 100644 index 0000000..881e9cf --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_japplis.java @@ -0,0 +1,244 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.*; + +/** + * Maybe not the fastest but trying to get the most readable code for the performance. 
/**
 * Computes the minimum, mean and maximum temperature of every city listed in a measurements file
 * (One Billion Row Challenge) and prints them sorted by city name.
 *
 * It allows:
 * - pass another file as argument
 * - the first lines can start with comments lines using '#'
 * - the temperatures can have more than one fraction digit but it needs to be constant in the file
 * - it does not require much RAM
 * - Java 8 as minimal Java version
 * - the last line of the file may or may not end with \n
 * Assumptions
 * - lines are separated by \n (no \r\n)
 * - a temperature, once its decimal point is dropped, fits in an int
 *
 * Changelog:
 * - First local attempt with FileReader and TreeMap: Way too long
 * - Switched to InputStream and ConcurrentHashMap: 23"
 * - Added Semaphore to avoid OOMException: 23"
 * - Replaced String with my own ByteText class: a bit slower (~10%)
 * - Replaced compute lambda call with synchronized(city.intern()): 43" (due to intern())
 * - Removed BufferedInputStream and replaced Measurement with IntSummaryStatistics (thanks davecom): still 23" but cleaner code
 * - Lines are now delimited by their line break instead of a length derived from the parsed value
 *   (a "-0.0" reading used to shift the start of the following line by one byte)
 *
 * @author Anthony Goubard - Japplis
 */
public class CalculateAverage_japplis {

    private static final String DEFAULT_MEASUREMENT_FILE = "measurements.txt";
    private static final int BUFFER_SIZE = 5 * 1024 * 1024; // 5 MB
    private static final int MAX_COMPUTE_THREADS = Runtime.getRuntime().availableProcessors();
    private static final byte[] NO_BYTES = new byte[0];

    /**
     * Number of fraction digits of the temperatures, detected on the first data line.
     * Stays at -1 until the first block has been inspected. Only touched by the reading thread.
     */
    private int precision = -1;

    /** Statistics per city; the temperatures are stored as integers (decimal point dropped). */
    private final Map<String, IntSummaryStatistics> cityMeasurementMap = new ConcurrentHashMap<>();

    /** Start of a line whose end is located in the next block. Only touched by the reading thread. */
    private byte[] previousBlockLastLine = NO_BYTES;

    /** Limits the number of blocks held in memory to the number of compute threads. */
    private final Semaphore readFileLock = new Semaphore(MAX_COMPUTE_THREADS);

    /**
     * Reads the file block by block and parses every block in a worker thread.
     *
     * @param measurementsFile the file containing one "city;temperature" entry per line
     * @throws Exception if the file cannot be read, if the thread is interrupted or if a line is malformed
     */
    private void parseTemperatures(File measurementsFile) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(MAX_COMPUTE_THREADS);
        try (InputStream measurementsFileIS = new FileInputStream(measurementsFile)) {
            List<Future<?>> parseBlockTasks = new ArrayList<>();
            int readCount = BUFFER_SIZE;
            while (readCount == BUFFER_SIZE) { // a block that is not full means the end of the file
                byte[] buffer = new byte[BUFFER_SIZE];
                readCount = readBlock(measurementsFileIS, buffer);
                if (readCount > 0) {
                    readFileLock.acquire(); // Wait if all threads are busy

                    // Process the block in a thread while the main thread continues to read the file
                    parseBlockTasks.add(threadPool.submit(parseTemperaturesBlock(buffer, readCount)));
                }
            }
            if (previousBlockLastLine.length > 0) {
                // The file ended exactly at the end of a full block, without a final line break
                readNextLine(0, previousBlockLastLine, previousBlockLastLine.length);
                previousBlockLastLine = NO_BYTES;
            }
            for (Future<?> parseBlockTask : parseBlockTasks) { // Wait for all tasks to finish
                parseBlockTask.get();
            }
        }
        finally {
            // The worker threads are not daemon threads: without this a failure would keep the JVM alive
            threadPool.shutdownNow();
        }
    }

    /**
     * Fills the buffer completely unless the end of the stream is reached first.
     * A single call to {@link InputStream#read(byte[])} is allowed to return fewer bytes than requested.
     *
     * @return the number of bytes stored in the buffer, 0 when the stream is exhausted
     */
    private static int readBlock(InputStream input, byte[] buffer) throws IOException {
        int total = 0;
        while (total < buffer.length) {
            int count = input.read(buffer, total, buffer.length - total);
            if (count < 0) {
                break;
            }
            total += count;
        }
        return total;
    }

    /**
     * Handles the lines shared with the neighbour blocks (on the calling thread) and creates the
     * task that parses all the lines fully contained in this block.
     *
     * @param buffer the block
     * @param readCount the number of valid bytes in the block
     * @return the task to run in a worker thread; it releases one permit of the semaphore when done
     */
    private Runnable parseTemperaturesBlock(byte[] buffer, int readCount) {
        int startIndex = handleSplitLine(buffer, readCount);
        boolean lastBlock = readCount < buffer.length;
        int endIndex = lastBlock ? readCount : keepPartialLastLine(buffer, startIndex, readCount);
        return () -> {
            try {
                int bufferIndex = startIndex;
                while (bufferIndex < endIndex) {
                    bufferIndex = readNextLine(bufferIndex, buffer, endIndex);
                }
            }
            finally {
                readFileLock.release(); // also on failure, otherwise the reading thread could wait forever
            }
        };
    }

    /**
     * Completes and parses the line started in the previous block, if any.
     *
     * @return the index of the first line that starts in this block
     */
    private int handleSplitLine(byte[] buffer, int readCount) {
        int startIndex = readFirstLines(buffer, readCount);
        if (previousBlockLastLine.length == 0) {
            return startIndex;
        }
        int lineEnd = 0;
        while (lineEnd < readCount && buffer[lineEnd] != '\n') {
            lineEnd++;
        }
        byte[] splitLine = Arrays.copyOf(previousBlockLastLine, previousBlockLastLine.length + lineEnd);
        System.arraycopy(buffer, 0, splitLine, previousBlockLastLine.length, lineEnd);
        if (lineEnd == readCount && readCount == buffer.length) {
            // Not a single line break in a full block: the line continues in the next block
            previousBlockLastLine = splitLine;
            return readCount;
        }
        previousBlockLastLine = NO_BYTES;
        readNextLine(0, splitLine, splitLine.length);
        return Math.min(lineEnd + 1, readCount);
    }

    /**
     * Stores the unfinished line at the end of a full block so that the next block can complete it.
     *
     * @return the index just after the last line break of the block, i.e. where the parsing has to stop
     */
    private int keepPartialLastLine(byte[] buffer, int startIndex, int readCount) {
        int tailStart = readCount;
        while (tailStart > startIndex && buffer[tailStart - 1] != '\n') {
            tailStart--;
        }
        if (tailStart < readCount) {
            previousBlockLastLine = Arrays.copyOfRange(buffer, tailStart, readCount);
        }
        return tailStart;
    }

    /**
     * On the first block of the file, skips the leading comment lines and detects the number of
     * fraction digits from the first data line. Does nothing for the other blocks.
     *
     * @return the index of the first data line of the block
     */
    private int readFirstLines(byte[] buffer, int readCount) {
        if (precision >= 0) {
            return 0; // not the first lines of the file
        }
        int bufferIndex = 0;
        while (bufferIndex < readCount && buffer[bufferIndex] == '#') { // read comments (like in weather_stations.csv)
            while (bufferIndex < readCount && buffer[bufferIndex++] != '\n') {
                // skip the comment line
            }
        }
        int startIndex = bufferIndex;
        int lineEnd = startIndex;
        while (lineEnd < readCount && buffer[lineEnd] != '\n') {
            lineEnd++;
        }
        // Search backwards and stop at ';' so that a dot in the city name is not taken into account
        int dotIndex = lineEnd - 1;
        while (dotIndex >= startIndex && buffer[dotIndex] != '.' && buffer[dotIndex] != ';') {
            dotIndex--;
        }
        boolean dotFound = dotIndex >= startIndex && buffer[dotIndex] == '.';
        precision = dotFound ? lineEnd - dotIndex - 1 : 0;
        return startIndex;
    }

    /**
     * Parses the line starting at the given index and registers its temperature.
     * The line ends at the next line break or at the limit, whichever comes first.
     *
     * @param bufferIndex the index of the first byte of the line
     * @param buffer the bytes containing the line
     * @param limit the index after the last byte that may be read
     * @return the index of the first byte of the next line
     * @throws IllegalArgumentException if the line is not empty and has no ';' or no valid temperature
     */
    private int readNextLine(int bufferIndex, byte[] buffer, int limit) {
        int separatorIndex = bufferIndex;
        while (separatorIndex < limit && buffer[separatorIndex] != ';' && buffer[separatorIndex] != '\n') {
            separatorIndex++;
        }
        if (separatorIndex == limit || buffer[separatorIndex] == '\n') {
            if (separatorIndex == bufferIndex) {
                return separatorIndex + 1; // empty line
            }
            String line = new String(buffer, bufferIndex, separatorIndex - bufferIndex, StandardCharsets.UTF_8);
            throw new IllegalArgumentException("Missing ';' in line: " + line);
        }
        String city = new String(buffer, bufferIndex, separatorIndex - bufferIndex, StandardCharsets.UTF_8);
        int lineEnd = separatorIndex + 1;
        while (lineEnd < limit && buffer[lineEnd] != '\n') {
            lineEnd++;
        }
        int temperature = readTemperature(buffer, separatorIndex + 1, lineEnd);
        addTemperature(city, temperature);
        return lineEnd + 1; // skip the line break
    }

    /**
     * Converts the text of a temperature to an integer by dropping the decimal point ("-12.3" gives -123).
     *
     * @param text the bytes containing the temperature
     * @param measurementIndex the index of the first byte of the temperature
     * @param lineEnd the index after the last byte of the temperature
     * @return the temperature multiplied by 10 to the power of the number of fraction digits
     * @throws IllegalArgumentException if the text is empty or contains something else than digits and dots
     */
    private int readTemperature(byte[] text, int measurementIndex, int lineEnd) {
        int index = measurementIndex;
        boolean negative = index < lineEnd && text[index] == '-';
        if (negative) {
            index++;
        }
        if (index >= lineEnd) {
            throw new IllegalArgumentException("Missing temperature after ';'");
        }
        int temperature = 0;
        for (; index < lineEnd; index++) {
            byte digitChar = text[index];
            if (digitChar >= '0' && digitChar <= '9') {
                temperature = temperature * 10 + (digitChar - '0');
            }
            else if (digitChar != '.') {
                String value = new String(text, measurementIndex, lineEnd - measurementIndex, StandardCharsets.UTF_8);
                throw new IllegalArgumentException("Invalid temperature: " + value);
            }
        }
        return negative ? -temperature : temperature;
    }

    /**
     * Adds a temperature to the statistics of a city.
     * The update is done inside compute() so that two threads never modify the same statistics at once.
     */
    private void addTemperature(String city, int temperature) {
        cityMeasurementMap.compute(city, (town, measurement) -> {
            if (measurement == null) {
                measurement = new IntSummaryStatistics();
            }
            measurement.accept(temperature);
            return measurement;
        });
    }

    /**
     * Prints "{city=min/mean/max, city=min/mean/max}" with the cities sorted by name,
     * or "{}" when no measurement has been read.
     */
    private void printTemperatureStatsByCity() {
        Map<String, IntSummaryStatistics> sortedCities = new TreeMap<>(cityMeasurementMap);
        StringBuilder result = new StringBuilder(sortedCities.size() * 40 + 2);
        result.append('{');
        String separator = "";
        for (Map.Entry<String, IntSummaryStatistics> cityStats : sortedCities.entrySet()) {
            result.append(separator);
            result.append(cityStats.getKey());
            result.append(getTemperatureStats(cityStats.getValue()));
            separator = ", ";
        }
        result.append('}');
        String temperaturesByCity = result.toString();
        System.out.println(temperaturesByCity);
    }

    /**
     * Formats the statistics of one city.
     *
     * @return the text "=min/mean/max"; the mean is rounded with {@link Math#round(double)}
     */
    private String getTemperatureStats(IntSummaryStatistics measurement) {
        StringBuilder stats = new StringBuilder(19);
        stats.append('=');
        appendTemperature(stats, measurement.getMin());
        stats.append('/');
        int average = (int) Math.round(measurement.getAverage());
        appendTemperature(stats, average);
        stats.append('/');
        appendTemperature(stats, measurement.getMax());
        return stats.toString();
    }

    /**
     * Appends a temperature stored as an integer with its decimal point back in place
     * (with one fraction digit: -5 gives "-0.5" and 120 gives "12.0").
     */
    private void appendTemperature(StringBuilder resultBuilder, int temperature) {
        StringBuilder digits = new StringBuilder(Long.toString(Math.abs((long) temperature)));
        while (digits.length() <= precision) { // at least one digit in front of the decimal point
            digits.insert(0, '0');
        }
        if (precision > 0) {
            digits.insert(digits.length() - precision, '.');
        }
        if (temperature < 0) {
            resultBuilder.append('-');
        }
        resultBuilder.append(digits);
    }

    /**
     * Prints the temperature statistics of the file given as single argument,
     * or of "measurements.txt" when no argument is given.
     *
     * @param args optionally the path of the measurements file
     * @throws Exception if the file cannot be read or parsed
     */
    public static final void main(String... args) throws Exception {
        CalculateAverage_japplis cityTemperaturesCalculator = new CalculateAverage_japplis();
        String measurementFile = args.length == 1 ? args[0] : DEFAULT_MEASUREMENT_FILE;
        cityTemperaturesCalculator.parseTemperatures(new File(measurementFile));
        cityTemperaturesCalculator.printTemperatureStatsByCity();
    }
}