diff --git a/calculate_average_EduardoSaverin.sh b/calculate_average_EduardoSaverin.sh new file mode 100755 index 0000000..d94e7f7 --- /dev/null +++ b/calculate_average_EduardoSaverin.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="--enable-preview" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_EduardoSaverin diff --git a/prepare_EduardoSaverin.sh b/prepare_EduardoSaverin.sh new file mode 100755 index 0000000..4cda7b4 --- /dev/null +++ b/prepare_EduardoSaverin.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +# source "$HOME/.sdkman/bin/sdkman-init.sh" +# sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_EduardoSaverin.java b/src/main/java/dev/morling/onebrc/CalculateAverage_EduardoSaverin.java new file mode 100644 index 0000000..e33e4cf --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_EduardoSaverin.java @@ -0,0 +1,319 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import sun.misc.Unsafe; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.reflect.Field; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static java.nio.file.StandardOpenOption.READ; + +public class CalculateAverage_EduardoSaverin { + private static final Path FILE = Path.of("./measurements.txt"); + private static final int NO_OF_THREADS = Runtime.getRuntime().availableProcessors(); + private static final Unsafe UNSAFE = initUnsafe(); + private static final int FNV_32_OFFSET = 0x811c9dc5; + private static final int FNV_32_PRIME = 0x01000193; + private static final Map resultRowMap = new HashMap<>(); + private static final Lock lock = new ReentrantLock(); + + private static Unsafe initUnsafe() { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + return (Unsafe) theUnsafe.get(Unsafe.class); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + public record Chunk(long start, long length) { + } + + record MapEntry(String key, ResultRow row) { + } + + private static final class ResultRow { + private double min; + private double max; + private double sum; + private int count; + + private ResultRow(double v) { + this.min = v; + this.max = v; + this.sum = v; + this.count = 1; + } + + public String toString() { + return round(min) + "/" + round(sum / count) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value) / 10.0; + } + } + + /** + * 0xA - Represents New Line + * + * @param fileChannel + * @return + * @throws IOException + */ + static List getChunks(FileChannel fileChannel) throws IOException { + int numThreads = 1; + if (fileChannel.size() > 64000) { + numThreads = NO_OF_THREADS; + } + final long fileBytes = fileChannel.size(); + final long chunkSize = fileBytes / numThreads; + final List chunks = new ArrayList<>(numThreads); + final long mappedAddress = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileBytes, Arena.global()).address(); + long chunkStart = 0; + // Ensures that the chunk size does not exceed the remaining bytes in the file. + long chunkLength = Math.min(fileBytes - chunkStart - 1, chunkSize); + while (chunkStart < fileBytes) { + MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, chunkStart + chunkLength, + Math.min(Math.min(fileBytes - chunkStart - chunkLength, chunkLength), 100)); + // Until \n found + while (mappedByteBuffer.get() != 0xA) { + chunkLength++; + } + chunks.add(new Chunk(mappedAddress + chunkStart, chunkLength + 1)); + chunkStart += (chunkLength + 1); + chunkLength = Math.min(fileBytes - chunkStart - 1, chunkSize); + } + return chunks; + } + + static class SimplerHashMap { + final int MAPSIZE = 65536; + final ResultRow[] slots = new ResultRow[MAPSIZE]; + final byte[][] keys = new byte[MAPSIZE][]; + + public void putOrMerge(final byte[] key, final short length, final int hash, final int temp) { + int slot = hash; + ResultRow slotValue; + + // Doing Linear Probing if Collision + while ((slotValue = slots[slot]) != null && (keys[slot].length != length || !unsafeEquals(keys[slot], key, length))) { + slot++; + } + + // Existing Key + if (slotValue != null) { + slotValue.min = Math.min(slotValue.min, temp); + slotValue.max = Math.max(slotValue.max, temp); + slotValue.sum += temp; + slotValue.count++; + return; + } + + // New Key + slots[slot] = new ResultRow(temp); + byte[] bytes = new byte[length]; + System.arraycopy(key, 0, bytes, 0, length); + keys[slot] = bytes; + } + + static boolean unsafeEquals(final byte[] a, final byte[] b, final short length) { + // byte by byte comparisons are slow, so do as big chunks as possible + final int baseOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET; + + short i = 0; + // Double + for (; i < (length & -8); i += 8) { + if (UNSAFE.getDouble(a, i + baseOffset) != UNSAFE.getDouble(b, i + baseOffset)) { + return false; + } + } + + // Long + for (; i < (length & -8); i += 8) { + if (UNSAFE.getLong(a, i + baseOffset) != UNSAFE.getLong(b, i + baseOffset)) { + return false; + } + } + if (i == length) { + return true; + } + // Int + for (; i < (length - i & -4); i += 4) { + if (UNSAFE.getInt(a, i + baseOffset) != UNSAFE.getInt(b, i + baseOffset)) { + return false; + } + } + if (i == length) { + return true; + } + // Short + for (; i < (length - i & -2); i += 2) { + if (UNSAFE.getShort(a, i + baseOffset) != UNSAFE.getShort(b, i + baseOffset)) { + return false; + } + } + if (i == length) { + return true; + } + // Byte + for (; i < (length - i); i++) { + if (UNSAFE.getByte(a, i + baseOffset) != UNSAFE.getByte(b, i + baseOffset)) { + return false; + } + } + + return true; + } + + // Get all pairs + public List getAll() { + final List result = new ArrayList<>(slots.length); + for (int i = 0; i < slots.length; i++) { + ResultRow slotValue = slots[i]; + if (slotValue != null) { + result.add(new MapEntry(new String(keys[i], StandardCharsets.UTF_8), slotValue)); + } + } + return result; + } + } + + private static class Task implements Runnable { + + private final SimplerHashMap results; + private final Chunk chunk; + + public Task(Chunk chunk) { + this.results = new SimplerHashMap(); + this.chunk = chunk; + } + + @Override + public void run() { + // Max length of any city name + final byte[] nameBytes = new byte[100]; + short nameIndex = 0; + int ot; + int hash = FNV_32_OFFSET; + + long i = chunk.start; + final long cl = chunk.start + chunk.length; + while (i < cl) { + byte c; + // 0x3B is ; + while ((c = UNSAFE.getByte(i++)) != 0x3B) { + nameBytes[nameIndex++] = c; + // FNV-1a hash : https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function + hash ^= c; + hash *= FNV_32_PRIME; + } + + // Temperature just after Semicolon + c = UNSAFE.getByte(i++); + // 0x2D is Minus(-) + // Below you will see -48 which is used to convert from ASCII to Integer, 48 represents 0 in ASCII + if (c == 0x2D) { + // X.X or XX.X + if (UNSAFE.getByte(i + 3) == 0xA) { + ot = (UNSAFE.getByte(i++) - 48) * 10; + } + else { + ot = (UNSAFE.getByte(i++) - 48) * 100; + ot += (UNSAFE.getByte(i++) - 48) * 10; + } + // Now dot + i++; // Skipping Dot + ot += (UNSAFE.getByte(i++) - 48); + // Make Number Negative Since we detected (-) sign + ot = -ot; + } + else { + // X.X or XX.X + if (UNSAFE.getByte(i + 2) == 0xA) { + ot = (c - 48) * 10; + } + else { + ot = (c - 48) * 100; + ot += (UNSAFE.getByte(i++) - 48) * 10; + } + // Now dot + i++; // Skipping Dot + // Number after dot + ot += (UNSAFE.getByte(i++) - 48); + } + // Since Parsed Line, Next thing must be newline + i++; + hash &= 65535; + results.putOrMerge(nameBytes, nameIndex, hash, ot); + // Reset + nameIndex = 0; + hash = FNV_32_OFFSET; + } + List all = results.getAll(); + lock.lock(); + try { + for (MapEntry me : all) { + ResultRow rr; + ResultRow lr = me.row; + if ((rr = resultRowMap.get(me.key)) != null) { + rr.min = Math.min(rr.min, lr.min); + rr.max = Math.max(rr.max, lr.max); + rr.count += lr.count; + rr.sum += lr.sum; + } + else { + resultRowMap.put(me.key, lr); + } + } + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + lock.unlock(); + } + } + } + + public static void main(String[] args) throws IOException, InterruptedException { + FileChannel fileChannel = FileChannel.open(FILE, READ); + List chunks = getChunks(fileChannel); + List threads = new ArrayList<>(); + for (Chunk chunk : chunks) { + Thread thread = new Thread(new Task(chunk)); + thread.setPriority(Thread.MAX_PRIORITY); // Make this thread of highest priority + threads.add(thread); + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + System.out.println(new TreeMap<>(resultRowMap)); + } +}