From 30987d778c4be47f975af275f9cb121cb3149fe5 Mon Sep 17 00:00:00 2001 From: Jesse Van Rooy Date: Sun, 14 Jan 2024 19:09:58 +0100 Subject: [PATCH] CalculateAverage_JesseVanRooy (Submission 1) (#335) * Submission #1 * Submission #1 (Fixed casing of file names) * Submission #1 (Added executable to Git permissions) * Submission 1 (Fixed incorrect map size) * Submission 1 (Fixed output problems on Windows) --- calculate_average_JesseVanRooy.sh | 20 ++ .../onebrc/CalculateAverage_JesseVanRooy.java | 256 ++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100755 calculate_average_JesseVanRooy.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_JesseVanRooy.java diff --git a/calculate_average_JesseVanRooy.sh b/calculate_average_JesseVanRooy.sh new file mode 100755 index 0000000..c680e97 --- /dev/null +++ b/calculate_average_JesseVanRooy.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="--enable-preview -XX:-TieredCompilation -Dsun.stdout.encoding=UTF-8" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_JesseVanRooy diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_JesseVanRooy.java b/src/main/java/dev/morling/onebrc/CalculateAverage_JesseVanRooy.java new file mode 100644 index 0000000..ba0475e --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_JesseVanRooy.java @@ -0,0 +1,256 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import sun.misc.Unsafe; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.lang.reflect.Field; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.*; +import java.util.stream.IntStream; + +//Disclaimer: The idea from the segmentation into #core amount of chunks came from previously submitted solutions. +public class CalculateAverage_JesseVanRooy { + + private static final String FILE = "./measurements.txt"; + + private static final ValueLayout.OfByte DATA_LAYOUT = ValueLayout.JAVA_BYTE; + + private static final Unsafe UNSAFE = initUnsafe(); + + private static Unsafe initUnsafe() { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + return (Unsafe) theUnsafe.get(Unsafe.class); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public static class Result { + long nameStart; + long nameSize; + String name; + int min; + int max; + long sum; + int count; + + double min() { + return min / 10.0; + } + + double max() { + return max / 10.0; + } + + double mean() { + return (sum / 10.0) / count; + } + } + + public static class ThreadResult { + Result[] results; + } + + static final int MAP_SIZE = 16384; + static final int MAP_MASK = MAP_SIZE - 1; + static final int VALUE_CAPACITY = 10000; + + static void process(MemorySegment memorySegment, ThreadResult threadResult) { + // initialize hash table + final int[] keys = new int[MAP_SIZE]; + Arrays.fill(keys, -1); + final Result[] values = new Result[MAP_SIZE]; + + // pre-create the result objects + final Result[] preCreatedResults = new Result[VALUE_CAPACITY]; + int usedPreCreatedResults = 0; + for (int i = 0; i < VALUE_CAPACITY; i++) + preCreatedResults[i] = new Result(); + + // load address info + final long size = memorySegment.byteSize(); + final long address = memorySegment.address(); + final long end = address + size; + + for (long index = address; index < end;) { + final long nameStart = index; + + byte next = UNSAFE.getByte(index); + + // hash the city name + int hash = 0; + while (next != ';') { + hash = (hash * 33) + next; + + index++; + next = UNSAFE.getByte(index); + } + + final long nameEnd = index; + + // skip the separator + index++; + next = UNSAFE.getByte(index); + + // check for negative + boolean negative = next == '-'; + if (negative) { + index++; + next = UNSAFE.getByte(index); + } + + // count the temperature + int temperature = next - '0'; + index++; + next = UNSAFE.getByte(index); + + if (next != '.') { + temperature = (temperature * 10) + (next - '0'); + index++; + } + + // skip the . + index++; + next = UNSAFE.getByte(index); + + // add the last digit to temperature + temperature = (temperature * 10) + (next - '0'); + index++; + + // negate the temperature if needed + if (negative) { + temperature = -temperature; + } + + // skip the newline + index++; + + // insert into map + for (int i = hash; i < hash + MAP_SIZE; i++) { + int mapIndex = i & MAP_MASK; + if (keys[mapIndex] == -1) { + Result result = preCreatedResults[usedPreCreatedResults++]; + result.nameStart = nameStart; + result.nameSize = nameEnd - nameStart; + result.min = temperature; + result.max = temperature; + result.sum = temperature; + result.count = 1; + + keys[mapIndex] = hash; + values[mapIndex] = result; + break; + } + if (keys[mapIndex] == hash) { + Result result = values[mapIndex]; + result.min = Math.min(result.min, temperature); + result.max = Math.max(result.max, temperature); + result.sum += temperature; + result.count++; + break; + } + } + } + + threadResult.results = Arrays.stream(values).filter(Objects::nonNull).toArray(Result[]::new); + + for (Result result : threadResult.results) { + result.name = new String(memorySegment.asSlice(result.nameStart - address, result.nameSize).toArray(DATA_LAYOUT)); + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + int numberOfChunks = Runtime.getRuntime().availableProcessors(); + + try (var fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) { + + long fileSize = fileChannel.size(); + MemorySegment allData = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()); + + long segmentSize = (fileSize + numberOfChunks - 1) / numberOfChunks; + long[] segmentBounds = new long[numberOfChunks + 1]; + + segmentBounds[0] = 0; + for (int i = 1; i < numberOfChunks; i++) { + long chunkAddress = i * segmentSize; + while (chunkAddress < fileSize && allData.getAtIndex(DATA_LAYOUT, chunkAddress++) != '\n') { + } + segmentBounds[i] = Math.min(chunkAddress, fileSize); + } + segmentBounds[numberOfChunks] = fileSize; + + ThreadResult[] threadResults = IntStream.range(0, numberOfChunks) + .parallel() + .mapToObj(i -> { + long size = segmentBounds[i + 1] - segmentBounds[i]; + long offset = segmentBounds[i]; + MemorySegment segment = allData.asSlice(offset, size); + ThreadResult result = new ThreadResult(); + process(segment, result); + return result; + }) + .toArray(ThreadResult[]::new); + + HashMap combinedResults = new HashMap<>(1024); + + for (int i = 0; i < numberOfChunks; i++) { + for (Result result : threadResults[i].results) { + if (!combinedResults.containsKey(result.name)) { + Result newResult = new Result(); + newResult.name = result.name; + newResult.min = result.min; + newResult.max = result.max; + newResult.sum = result.sum; + newResult.count = result.count; + combinedResults.put(result.name, newResult); + } + else { + Result existingResult = combinedResults.get(result.name); + existingResult.min = Math.min(existingResult.min, result.min); + existingResult.max = Math.max(existingResult.max, result.max); + existingResult.sum += result.sum; + existingResult.count += result.count; + } + } + } + + Result[] sortedResults = combinedResults.values().toArray(Result[]::new); + Arrays.sort(sortedResults, Comparator.comparing(result -> result.name)); + + System.out.print("{"); + + for (int i = 0; i < sortedResults.length; i++) { + Result sortedResult = sortedResults[i]; + if (i != 0) { + System.out.print(", "); + } + System.out.printf(Locale.US, "%s=%.1f/%.1f/%.1f", sortedResult.name, sortedResult.min(), sortedResult.mean(), sortedResult.max()); + } + + System.out.printf("}\n"); + } + } +}