From 473791e188dabe1979b5bae818223aee590b01f5 Mon Sep 17 00:00:00 2001
From: Pratham
Date: Wed, 10 Jan 2024 16:40:27 -0500
Subject: [PATCH] first cut 1brc submission (#216)

* phd3 initial entry

* Optimize parsing doubles

* Remove redundant check

* Update calculate_average_phd3.sh

---------

Co-authored-by: Gunnar Morling
---
 calculate_average_phd3.sh                     |  20 ++
 .../morling/onebrc/CalculateAverage_phd3.java | 241 ++++++++++++++++++
 2 files changed, 261 insertions(+)
 create mode 100755 calculate_average_phd3.sh
 create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_phd3.java

diff --git a/calculate_average_phd3.sh b/calculate_average_phd3.sh
new file mode 100755
index 0000000..c1e16ed
--- /dev/null
+++ b/calculate_average_phd3.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# -agentpath:/Users/phd3/tools/async-profiler-2.9-macos/build/libasyncProfiler.so=start,event=cpu,file=profile.html
+JAVA_OPTS=""
+java $JAVA_OPTS -XX:+HeapDumpOnOutOfMemoryError --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_phd3
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_phd3.java b/src/main/java/dev/morling/onebrc/CalculateAverage_phd3.java
new file mode 100644
index 0000000..e3d1cdb
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_phd3.java
@@ -0,0 +1,241 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import static java.nio.charset.StandardCharsets.*;
+import static java.util.stream.Collectors.*;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+public class CalculateAverage_phd3 {
+
+    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
+    private static final String FILE = "./measurements.txt";
+    private static final long FILE_SIZE = new File(FILE).length();
+    private static final int CHUNK_SIZE = 65536 * 1024;
+    private static final int PADDING = 512;
+    private static final double[] POWERS_OF_10 = IntStream.range(0, 6).mapToDouble(x -> Math.pow(10.0, x)).toArray();
+
+    private static final Map<String, AggregationInfo> globalMap = new ConcurrentHashMap<>();
+
+    private record ResultRow(double min, double mean, double max) {
+
+        public String toString() {
+            return round(min) + "/" + round(mean) + "/" + round(max);
+        }
+
+        private double round(double value) {
+            return Math.round(value * 10.0) / 10.0;
+        }
+    };
+
+    public static ResultRow resultRow(AggregationInfo aggregationInfo) {
+        return new ResultRow(aggregationInfo.min, aggregationInfo.sum / aggregationInfo.count, aggregationInfo.max);
+    }
+
+    public static void main(String[] args) throws Exception {
+        long fileLength = new File(FILE).length();
+        int numChunks = (int) Math.ceil(fileLength * 1.0 / CHUNK_SIZE);
+        ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
+        BufferDataProvider provider = new RandomAccessBasedProvider(FILE, FILE_SIZE);
+        for (int chunkIndex = 0; chunkIndex < numChunks; chunkIndex++) {
+            executorService.submit(new Aggregator(chunkIndex, provider));
+        }
+
+        executorService.shutdown();
+        executorService.awaitTermination(10, TimeUnit.MINUTES);
+
+        Map<String, ResultRow> measurements = new TreeMap<>(globalMap.entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, e -> resultRow(e.getValue()))));
+
+        System.out.println(measurements);
+    }
+
+    private static class AggregationInfo {
+        double min = Double.POSITIVE_INFINITY;
+        double max = Double.NEGATIVE_INFINITY;
+        double sum;
+        long count;
+
+        public AggregationInfo update(AggregationInfo update) {
+            this.count += update.count;
+            this.sum += update.sum;
+            if (this.max < update.max) {
+                this.max = update.max;
+            }
+            if (this.min > update.min) {
+                this.min = update.min;
+            }
+            return this;
+        }
+
+        public AggregationInfo update(double value) {
+            this.count++;
+            this.sum += value;
+            if (this.max < value) {
+                this.max = value;
+            }
+            if (this.min > value) {
+                this.min = value;
+            }
+            return this;
+        }
+    }
+
+    private interface BufferDataProvider {
+        int read(byte[] buffer, long offset) throws Exception;
+    }
+
+    private static class RandomAccessBasedProvider implements BufferDataProvider {
+        private final String filePath;
+        private final long fileSize;
+
+        RandomAccessBasedProvider(String filePath, long fileSize) {
+            this.filePath = filePath;
+            this.fileSize = fileSize;
+        }
+
+        @Override
+        public int read(byte[] buffer, long offset) throws Exception {
+            RandomAccessFile file = null;
+            try {
+                file = new RandomAccessFile(filePath, "r");
+                file.seek(offset);
+                return file.read(buffer);
+            }
+            finally {
+                if (file != null) {
+                    file.close();
+                }
+            }
+        }
+    }
+
+    private static class Aggregator implements Runnable {
+        private final long startByte;
+        private final BufferDataProvider dataProvider;
+
+        public Aggregator(long chunkIndex, BufferDataProvider dataProvider) {
+            this.startByte = chunkIndex * CHUNK_SIZE;
+            this.dataProvider = dataProvider;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // offset for the last byte to be processed (excluded)
+                long endByte = Math.min(startByte + CHUNK_SIZE, FILE_SIZE);
+                // read a little more than needed to cover the next entry if needed
+                long bufferSize = endByte - startByte + ((endByte == FILE_SIZE) ? 0 : PADDING);
+                byte[] buffer = new byte[(int) bufferSize];
+                int bytes = dataProvider.read(buffer, startByte);
+                // Partial aggregation to avoid accessing the global concurrent map for every entry
+                Map<String, AggregationInfo> updated = processBuffer(
+                        buffer, startByte == 0, endByte - startByte);
+                // Full aggregation with the global map
+                updated.entrySet().forEach(entry -> {
+                    globalMap.compute(entry.getKey(), (k, v) -> {
+                        if (v == null) {
+                            return entry.getValue();
+                        }
+                        return v.update(entry.getValue());
+                    });
+                });
+            }
+            catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private static Map<String, AggregationInfo> processBuffer(byte[] buffer, boolean isFileStart, long nextChunkStart) {
+            int start = 0;
+            // Move to the next entry after '\n'. Don't do this if we're at the start of
+            // the file, to avoid missing the first entry.
+            if (!isFileStart) {
+                while (buffer[start] != '\n') {
+                    start++;
+                }
+                start += 1;
+            }
+
+            // local map for this thread, doesn't need thread safety
+            Map<String, AggregationInfo> chunkMap = new HashMap<>();
+            while (true) {
+                LineInfo lineInfo = getNextLine(buffer, start);
+                String key = new String(buffer, start, lineInfo.semicolonIndex - start);
+                double value = parseDouble(buffer, lineInfo.semicolonIndex + 1, lineInfo.nextStart - 1);
+                update(chunkMap, key, value);
+
+                if ((lineInfo.nextStart > nextChunkStart) || (lineInfo.nextStart >= buffer.length)) {
+                    // we are already at a point where the next line will be processed in the next chunk,
+                    // so the job is done here
+                    break;
+                }
+
+                start = lineInfo.nextStart();
+            }
+            return chunkMap;
+        }
+
+        private static double parseDouble(byte[] bytes, int offset, int end) {
+            boolean negative = (bytes[offset] == '-');
+            int current = negative ? offset + 1 : offset;
+            int preFloat = 0;
+            while (current < end && bytes[current] != '.') {
+                preFloat = (preFloat * 10) + (bytes[current++] - '0');
+            }
+            current++;
+            int postFloatStart = current;
+            int postFloat = 0;
+            while (current < end) {
+                postFloat = (postFloat * 10) + (bytes[current++] - '0');
+            }
+
+            return (preFloat + (postFloat / POWERS_OF_10[end - postFloatStart])) * (negative ? -1 : 1);
+        }
+
+        private static void update(Map<String, AggregationInfo> state, String key, double value) {
+            AggregationInfo info = state.computeIfAbsent(key, k -> new AggregationInfo());
+            info.update(value);
+        }
+
+        // identifies indexes of the next ';' and '\n', which are used to extract the entry key and value from a line
+        private static LineInfo getNextLine(byte[] buffer, int start) {
+            // caller guarantees that the access is in bounds, so no index check
+            while (buffer[start] != ';') {
+                start++;
+            }
+            int semicolonIndex = start;
+            // caller guarantees that the access is in bounds, so no index check
+            while (buffer[start] != '\n') {
+                start++;
+            }
+            return new LineInfo(semicolonIndex, start + 1);
+        }
+    }
+
+    private record LineInfo(int semicolonIndex, int nextStart) {
+    }
+}
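
Note (not part of the patch): the hand-rolled parseDouble above relies on the 1BRC input format, where every value is an ASCII temperature with a single '.' and at most one fractional digit run, terminated by the byte just before '\n'. A minimal standalone sketch to sanity-check that fast path against Double.parseDouble is below; the class name ParseDoubleCheck and the sample inputs are made up for this illustration, and the comparison uses a tolerance rather than exact equality since the fast path accumulates in binary floating point.

    import java.nio.charset.StandardCharsets;
    import java.util.stream.IntStream;

    public class ParseDoubleCheck {

        // same lookup table as the submission: 10^0 .. 10^5
        private static final double[] POWERS_OF_10 = IntStream.range(0, 6).mapToDouble(x -> Math.pow(10.0, x)).toArray();

        // mirrors the patch's parseDouble: optional '-', integer digits, '.', fractional digits
        static double parseDouble(byte[] bytes, int offset, int end) {
            boolean negative = (bytes[offset] == '-');
            int current = negative ? offset + 1 : offset;
            int preFloat = 0;
            while (current < end && bytes[current] != '.') {
                preFloat = (preFloat * 10) + (bytes[current++] - '0');
            }
            current++; // skip the '.'
            int postFloatStart = current;
            int postFloat = 0;
            while (current < end) {
                postFloat = (postFloat * 10) + (bytes[current++] - '0');
            }
            // scale the fractional digits by the number of places parsed
            return (preFloat + (postFloat / POWERS_OF_10[end - postFloatStart])) * (negative ? -1 : 1);
        }

        public static void main(String[] args) {
            for (String s : new String[]{ "12.3", "-0.7", "99.9", "-45.0" }) {
                byte[] b = s.getBytes(StandardCharsets.US_ASCII);
                double fast = parseDouble(b, 0, b.length);
                double ref = Double.parseDouble(s);
                System.out.printf("%s -> fast=%s ref=%s match=%b%n", s, fast, ref, Math.abs(fast - ref) < 1e-9);
            }
        }
    }

Run with `java ParseDoubleCheck`. Because the challenge rounds output to one decimal place, any sub-ulp drift between the fast path and Double.parseDouble is invisible in the final result, which is why the submission can skip the general-purpose parser entirely.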