From 6eb8b49b7c4fdcfd7bda08feb0303343f5848b0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20F=C3=A9lix?= Date: Sat, 6 Jan 2024 11:27:30 +0100 Subject: [PATCH] Implementation of 1brc - felix19350 (#53) * Implementation of 1brc - felix19350 * Added license header * Fixed failing tests * Replaced parsing of doubles with a custom parser and integer arithmetic --------- Co-authored-by: Bruno Felix --- calculate_average_felix19350.sh | 20 ++ .../onebrc/CalculateAverage_felix19350.java | 237 ++++++++++++++++++ 2 files changed, 257 insertions(+) create mode 100755 calculate_average_felix19350.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_felix19350.java diff --git a/calculate_average_felix19350.sh b/calculate_average_felix19350.sh new file mode 100755 index 0000000..5e98ce1 --- /dev/null +++ b/calculate_average_felix19350.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="--enable-preview -XX:+UseParallelGC -Xms4g -Xmx4g" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_felix19350 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_felix19350.java b/src/main/java/dev/morling/onebrc/CalculateAverage_felix19350.java new file mode 100644 index 0000000..c54976d --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_felix19350.java @@ -0,0 +1,237 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class CalculateAverage_felix19350 { + private static final String FILE = "./measurements.txt"; + private static final int NEW_LINE_SEEK_BUFFER_LEN = 128; + + private static class ResultRow { + + private int min; + private int max; + private int sum; + private int count; + + public ResultRow(int initialValue) { + this.min = initialValue; + this.max = initialValue; + this.sum = initialValue; + this.count = 1; + } + + public void mergeValue(int value) { + this.min = Math.min(this.min, value); + this.max = Math.max(this.max, value); + this.sum += value; + this.count += 1; + } + + public String toString() { + return round(min / 10.0) + "/" + round(sum / 10.0 / count) + "/" + round(max / 10.0); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + + public void mergeResult(ResultRow value) { + min = Math.min(min, value.min); + max = Math.max(max, value.max); + sum += value.sum; + count += value.count; + } + } + + private record AverageAggregatorTask(MemorySegment memSegment) { + + public static Stream createStreamOf(List memorySegments) { + return memorySegments.stream().map(AverageAggregatorTask::new); + } + + public Map processChunk() { + final var result = new TreeMap(); + var offset = 0L; + var lineStart = 0L; + while (offset < memSegment.byteSize()) { + byte nextByte = memSegment.get(ValueLayout.OfByte.JAVA_BYTE, offset); + if ((char) nextByte == '\n') { + this.processLine(result, memSegment.asSlice(lineStart, (offset - lineStart)).asByteBuffer()); + lineStart = offset + ValueLayout.JAVA_BYTE.byteSize(); + } + offset += ValueLayout.OfByte.JAVA_BYTE.byteSize(); + } + + return result; + } + + private void processLine(Map result, ByteBuffer lineBytes) { + var separatorIdx = -1; + for (int i = 0; i < lineBytes.limit(); i++) { + if ((char) lineBytes.get() == ';') { + separatorIdx = i; + lineBytes.clear(); + break; + } + } + assert (separatorIdx > 0); + + var valueCapacity = lineBytes.capacity() - (separatorIdx + 1); + var cityBytes = new byte[separatorIdx]; + var valueBytes = new byte[valueCapacity]; + lineBytes.get(cityBytes, 0, separatorIdx); + lineBytes.get(separatorIdx + 1, valueBytes); + + var city = new String(cityBytes, StandardCharsets.UTF_8); + var value = parseInt(valueBytes); + + var latestValue = result.get(city); + if (latestValue != null) { + latestValue.mergeValue(value); + } else { + result.put(city, new ResultRow(value)); + } + } + + private static int parseInt(byte[] valueBytes) { + int multiplier = 1; + int digitValue = 0; + var numDigits = valueBytes.length-1; // there is always one decimal place + var ds = new int[]{1,10,100}; + + for (byte valueByte : valueBytes) { + switch ((char) valueByte) { + case '-': + multiplier = -1; + numDigits -= 1; + break; + case '.': + break; + default: + digitValue += ((int) valueByte - 48) * (ds[numDigits - 1]); + numDigits -= 1; + break;// TODO continue here + } + } + return multiplier*digitValue; + } + } + + public static void main(String[] args) throws IOException { + // memory map the files and divide by number of cores + var numProcessors = Runtime.getRuntime().availableProcessors(); + var memorySegments = calculateMemorySegments(numProcessors); + var tasks = AverageAggregatorTask.createStreamOf(memorySegments); + assert (memorySegments.size() == numProcessors); + + try (var pool = Executors.newFixedThreadPool(numProcessors)) { + var results = tasks + .parallel() + .map(task -> CompletableFuture.supplyAsync(task::processChunk, pool)) + .map(CompletableFuture::join) + .reduce(new TreeMap<>(), (partialMap, accumulator) -> { + partialMap.forEach((key, value) -> { + var prev = accumulator.get(key); + if (prev == null) { + accumulator.put(key, value); + } + else { + prev.mergeResult(value); + } + }); + return accumulator; + }); + + System.out.print("{"); + String output = results.keySet() + .stream() + .sorted() + .map(key -> key + "=" + results.get(key).toString()) + .collect(Collectors.joining(", ")); + System.out.print(output); + System.out.println("}"); + } + } + + private static List calculateMemorySegments(int numChunks) throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) { + var result = new ArrayList(numChunks); + var chunks = new ArrayList(numChunks); + + var fileSize = raf.length(); + var chunkSize = fileSize / numChunks; + + for (int i = 0; i < numChunks; i++) { + var previousChunkEnd = i == 0 ? 0L : chunks.get(i - 1)[1]; + if (previousChunkEnd >= fileSize) { + // There is a scenario for very small files where the number of chunks may be greater than + // the number of lines. + break; + } + var chunk = new long[]{ previousChunkEnd, 0 }; + if (i == (numChunks - 1)) { + // ensure the last chunk covers the full file + chunk[1] = fileSize; + } + else { + // all other chunks are end at a new line (\n) + var theoreticalEnd = previousChunkEnd + chunkSize; + var buffer = new byte[NEW_LINE_SEEK_BUFFER_LEN]; + raf.seek(theoreticalEnd); + raf.read(buffer, 0, NEW_LINE_SEEK_BUFFER_LEN); + + var newLineOffset = 0; + for (byte b : buffer) { + newLineOffset += 1; + if ((char) b == '\n') { + break; + } + } + chunk[1] = Math.min(fileSize, theoreticalEnd + newLineOffset); + } + + assert (chunk[0] >= 0L); + assert (chunk[0] <= fileSize); + assert (chunk[1] > chunk[0]); + assert (chunk[1] <= fileSize); + + var memMappedFile = raf.getChannel() + .map(FileChannel.MapMode.READ_ONLY, chunk[0], (chunk[1] - chunk[0]), Arena.ofAuto()); + memMappedFile.load(); + chunks.add(chunk); + result.add(memMappedFile); + } + return result; + } + } +}