From 80328e4898112cd49ea53780d117afe821818428 Mon Sep 17 00:00:00 2001
From: Jason Nochlin <91577+hundredwatt@users.noreply.github.com>
Date: Thu, 11 Jan 2024 03:48:22 -0700
Subject: [PATCH] create fork hundredwatt (#279)

Co-authored-by: Jason Nochlin
---
 calculate_average_hundredwatt.sh              |  19 +
 prepare_hundredwatt.sh                        |  20 ++
 .../onebrc/CalculateAverage_hundredwatt.java  | 339 ++++++++++++++++++
 3 files changed, 378 insertions(+)
 create mode 100755 calculate_average_hundredwatt.sh
 create mode 100755 prepare_hundredwatt.sh
 create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_hundredwatt.java

diff --git a/calculate_average_hundredwatt.sh b/calculate_average_hundredwatt.sh
new file mode 100755
index 0000000..006dda8
--- /dev/null
+++ b/calculate_average_hundredwatt.sh
@@ -0,0 +1,19 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+JAVA_OPTS=""
+java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_hundredwatt
diff --git a/prepare_hundredwatt.sh b/prepare_hundredwatt.sh
new file mode 100755
index 0000000..90a0a42
--- /dev/null
+++ b/prepare_hundredwatt.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+source "$HOME/.sdkman/bin/sdkman-init.sh"
+sdk use java 21.0.1-graal
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_hundredwatt.java b/src/main/java/dev/morling/onebrc/CalculateAverage_hundredwatt.java
new file mode 100644
index 0000000..051de9c
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_hundredwatt.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class CalculateAverage_hundredwatt {
+    private static final String FILE = "./measurements.txt";
+    private static final int MAX_ROW_SIZE = 100 + 1 + 5 + 1; // 100 for city name, 1 for ;, 5 for temperature, 1 for \n
+    private static final int THREAD_COUNT = Math.min(8, Runtime.getRuntime().availableProcessors());
+    private static final long BUFFER_SIZE = 128 * 1024 * 1024; // 128MB
+    private static final long CHUNK_SIZE = BUFFER_SIZE / THREAD_COUNT;
+    private static final long FILE_CHUNK_SIZE = CHUNK_SIZE - MAX_ROW_SIZE;
+    public static final int TEMPERATURE_SLOTS = 5003; // prime number
+    private static final short[] TEMPERATURES = new short[TEMPERATURE_SLOTS];
+    private static final long PERFECT_HASH_SEED = -5353381455852817461L;
+
+    // Construct a perfect hash function mapping temperatures encoded as longs (e.g., 0x2d342e3000000000 for -4.3) to
+    // the corresponding short integer (e.g., -43).
+    static {
+        // Figure out encoding for all possible temperature values (1999 total)
+        long start = System.currentTimeMillis();
+        Map<Long, Short> decodeTemperatureMap = new HashMap<>();
+        for (short i = -999; i <= 999; i++) {
+            long word = 0;
+            int shift = 56;
+            if (i < 0) {
+                word |= ((long) '-') << shift;
+                shift -= 8;
+            }
+            if (Math.abs(i) >= 100) {
+                int hh = Math.abs(i) / 100;
+                int tt = (Math.abs(i) - hh * 100) / 10;
+
+                word |= ((long) (hh + '0')) << shift;
+                shift -= 8;
+                word |= ((long) (tt + '0')) << shift;
+            }
+            else {
+                int tt = Math.abs(i) / 10;
+                // convert to ascii
+                word |= ((long) (tt + '0')) << shift;
+            }
+            shift -= 8;
+            word |= ((long) '.') << shift;
+            shift -= 8;
+            int uu = Math.abs(i) % 10;
+            word |= ((long) (uu + '0')) << shift;
+
+            // 31302e3000000000
+            decodeTemperatureMap.put(word, i);
+        }
+
+        // Brute force to find seed:
+        // Random rand = new Random(System.nanoTime());
+        // int max = 0;
+        // int attempts = 0;
+        // while (true) {
+        // BitSet bs = new BitSet(5003);
+        // var seed = rand.nextLong();
+        // seed |= 0b1; // make sure it's odd
+        // for (var word : decodeTemperatureMap.keySet()) {
+        // var h = (word * seed) & ~(1L << 63);
+        // var pos = (int) (h % 5003);
+        // bs.set(pos);
+        // }
+        // ;
+        // var c = bs.cardinality();
+        // if (c == decodeTemperatureMap.size()) {
+        // System.out.println("seed: " + seed + " cardinality: " + c + " max cardinality: " + max);
+        // break;
+        // }
+        // max = Math.max(max, c);
+        // if (attempts % 100_000 == 0)
+        // System.out.println("seed: " + seed + " cardinality: " + c + " max cardinality: " + max);
+        // attempts++;
+        // }
+
+        decodeTemperatureMap.entrySet().stream().forEach(e -> {
+            var word = e.getKey();
+            var h = (word * PERFECT_HASH_SEED) & ~(1L << 63);
+            var pos = (int) (h % TEMPERATURE_SLOTS);
+            if (TEMPERATURES[pos] != 0)
+                throw new RuntimeException("collision at " + pos);
+            TEMPERATURES[pos] = e.getValue();
+        });
+        // System.out.println("Building table took " + (System.currentTimeMillis() - start) + "ms");
+    }
+
+    static class Record {
+        short min;
+        short max;
+        int sum;
+        int count;
+
+        public Record() {
+            this.min = Short.MAX_VALUE;
+            this.max = Short.MIN_VALUE;
+            this.sum = 0;
+            this.count = 0;
+        }
+
+        public void updateWith(short value) {
+            min = (short) Math.min(min, value);
+            max = (short) Math.max(max, value);
+            sum += value;
+            count++;
+        }
+
+        @Override
+        public String toString() {
+            return round(min / 10.0) + "/" + round(sum / 10.0 / count) + "/" + round(max / 10.0);
+        }
+
+        double round(double v) {
+            return Math.round(v * 10.0) / 10.0;
+        }
+    }
+
+    record Entry(long[] key, Record value) {
+    }
+
+    static class HashTable {
+        private static final int INITIAL_SIZE = 128 * 1024;
+        private static final float LOAD_FACTOR = 0.75f;
+        private static final int GROW_FACTOR = 4;
+        private final long[][] KEYS = new long[INITIAL_SIZE][];
+        private final Record[] VALUES = new Record[INITIAL_SIZE];
+        private int size = INITIAL_SIZE;
+
+        public HashTable() {
+            for (int i = 0; i < INITIAL_SIZE; i++) {
+                VALUES[i] = new Record();
+            }
+        }
+
+        public void putOrMerge(int hash, int length, long[] key, short value) {
+            int idx = hash & (size - 1);
+
+            // linear probing
+            int i = 0;
+            while (KEYS[idx] != null && (0 != Arrays.compareUnsigned(KEYS[idx], 0, KEYS[idx].length, key, 0, length))) {
+                i++;
+                idx = (idx + 1) & (size - 1);
+            }
+
+            if (KEYS[idx] == null) {
+                KEYS[idx] = Arrays.copyOf(key, length);
+            }
+
+            VALUES[idx].updateWith(value);
+        }
+
+        public List<Entry> getAll() {
+            List<Entry> result = new ArrayList<>(size);
+            for (int i = 0; i < size; i++) {
+                if (KEYS[i] != null) {
+                    result.add(new Entry(KEYS[i], VALUES[i]));
+                }
+            }
+            return result;
+        }
+    }
+
+    private static String keyToString(long[] key) {
+        ByteBuffer kb = ByteBuffer.allocate(8 * key.length);
+        Arrays.stream(key).forEach(kb::putLong);
+
+        // remove trailing '\0' bytes from kb and
+        // fix two ';' in word issue here (rather than in hot path)
+        byte b;
+        int limit = kb.position() - 8;
+        kb.position(limit);
+        while ((b = kb.get()) != 0 && b != ';') {
+            limit++;
+        }
+
+        kb.flip();
+        byte[] bytes = new byte[limit];
+        kb.get(bytes);
+
+        return new String(bytes).replace("\0", "");
+    }
+
+    private static Record merge(Record v, Record value) {
+        var record = new Record();
+        record.min = (short) Math.min(v.min, value.min);
+        record.max = (short) Math.max(v.max, value.max);
+        record.sum = v.sum + value.sum;
+        record.count = v.count + value.count;
+        return record;
+    }
+
+    private static int processChunk(ByteBuffer bb, HashTable hashTable, long start, long size) {
+        // Find first entry
+        while (start != 0 && bb.get() != '\n') {
+        }
+
+        long word;
+        long[] key = new long[13];
+        int offset;
+        long arg, hasvalue, op1, op2;
+        int position = bb.position();
+        long hash;
+        long temperature_hash;
+        int temperature_pos;
+        short temperature_value;
+
+        int i = 0;
+        int end = (int) (size - MAX_ROW_SIZE);
+        while (position < end) {
+            i++;
+            offset = -1;
+
+            // Parse city name
+            // First word
+            hash = key[++offset] = bb.getLong(position + offset * 8);
+
+            // From "Determine if a word has a byte equal to n"
+            // https://graphics.stanford.edu/~seander/bithacks.html#ValueInWord
+            arg = (key[offset]) ^ (0x0101010101010101L * (';'));
+            op1 = (arg - 0x0101010101010101L);
+            op2 = ~(arg);
+            hasvalue = (op1 & op2 & 0x8080808080808080L);
+
+            // Remaining words (if present)
+            while (hasvalue == 0) {
+                ++offset;
+                key[offset] = bb.getLong(position + offset * 8);
+                hash ^= key[offset];
+
+                arg = (key[offset]) ^ (0x0101010101010101L * (';'));
+                op1 = (arg - 0x0101010101010101L);
+                op2 = ~(arg);
+                hasvalue = (op1 & op2 & 0x8080808080808080L);
+            }
+            hash ^= key[offset]; // unset last word since it will be updated
+            key[offset] = key[offset] & (-hasvalue); // ';' == 0x3b and -hasvalue is something like 0xff8000..., we can ignore the 0x80 byte since 0x3b & 0x80 == 0
+            hash ^= key[offset];
+
+            position = position + offset * 8 + (Long.numberOfLeadingZeros(hasvalue)) / 8 + 1; // +1 for \n
+
+            // Parse temperature
+            word = bb.getLong(position);
+            arg = (word) ^ 0x0101010101010101L * ('\n');
+            op1 = (arg - 0x0101010101010101L);
+            op2 = ~(arg);
+            hasvalue = (op1 & op2 & 0x8080808080808080L);
+            word = word & ((-hasvalue));
+
+            // Perfect hash lookup for temperature
+            temperature_hash = (word * PERFECT_HASH_SEED) & ~(1L << 63);
+            temperature_pos = (int) (temperature_hash % TEMPERATURE_SLOTS);
+            temperature_value = TEMPERATURES[temperature_pos];
+
+            position = position + (Long.numberOfLeadingZeros(hasvalue)) / 8 + 1; // +1 for \n
+
+            int hash2 = (int) (hash ^ (hash >> 32));
+
+            hashTable.putOrMerge(hash2, offset + 1, key, temperature_value);
+        }
+        return i;
+    }
+
+    public static void main(String[] args) throws IOException {
+        final long fileSize = Files.size(Path.of(FILE));
+        // System.out.println("File size: " + fileSize);
+
+        // AtomicLong rowCount = new AtomicLong();
+
+        // Read file in chunks using striping
+        try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(FILE), StandardOpenOption.READ)) {
+            var r = IntStream.range(0, THREAD_COUNT + 1).mapToObj(stripe -> {
+                long start = stripe * FILE_CHUNK_SIZE;
+                var hashTable = new HashTable();
+
+                if (stripe == THREAD_COUNT) { // last thread
+                    try {
+                        // handle trailing bytes in file in jankiest way possible (for now hopefully :) )
+                        byte[] trailing = new byte[MAX_ROW_SIZE * 2];
+                        fileChannel.read(ByteBuffer.wrap(trailing), Math.max(0, fileSize - MAX_ROW_SIZE));
+                        var rc = processChunk(ByteBuffer.wrap(trailing), hashTable, Math.max(0, fileSize - MAX_ROW_SIZE),
+                                MAX_ROW_SIZE + Math.min(fileSize, MAX_ROW_SIZE));
+                        // rowCount.addAndGet(rc);
+                        return hashTable;
+
+                    }
+                    catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                while (start < fileSize) {
+                    long end = Math.min(start + CHUNK_SIZE, fileSize);
+                    MappedByteBuffer bb = null;
+                    try {
+                        bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, start, end - start);
+                    }
+                    catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    var rc = processChunk(bb, hashTable, start, end - start);
+
+                    // rowCount.addAndGet(rc);
+                    start += FILE_CHUNK_SIZE * THREAD_COUNT;
+                }
+
+                return hashTable;
+            }).parallel().flatMap(partition -> partition.getAll().stream())
+                    .collect(Collectors.toMap(e -> keyToString(e.key()), Entry::value, CalculateAverage_hundredwatt::merge, TreeMap::new));
+
+            System.out.println(r);
+            // System.out.println(rowCount.get());
+        }
+    }
+}
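
The static initializer in CalculateAverage_hundredwatt builds a perfect-hash table: each of the 1999 temperatures from -99.9 to 99.9 is encoded as the big-endian long of its ASCII text (left-aligned, remaining bytes zero), multiplied by a brute-forced odd seed, the sign bit is cleared, and the result is reduced modulo the prime 5003 to a slot holding the value in tenths of a degree. Below is a minimal sketch of the lookup side that reuses the submission's seed and table size; the encode() helper and class name are illustrative and not part of the patch.

public class TemperatureHashSketch {
    static final long SEED = -5353381455852817461L; // PERFECT_HASH_SEED from the submission
    static final int SLOTS = 5003; // TEMPERATURE_SLOTS from the submission

    // Illustrative helper: pack an ASCII temperature string left-aligned into a
    // big-endian long, the same layout bb.getLong() produces for a row's temperature bytes.
    static long encode(String s) {
        long word = 0;
        for (int i = 0; i < s.length(); i++) {
            word |= ((long) s.charAt(i)) << (56 - 8 * i);
        }
        return word;
    }

    // Same arithmetic as the submission's lookup: multiply, clear the sign bit, reduce mod 5003.
    static int slot(long word) {
        long h = (word * SEED) & ~(1L << 63);
        return (int) (h % SLOTS);
    }

    public static void main(String[] args) {
        long word = encode("-4.3"); // '-', '4', '.', '3' in the four high bytes
        System.out.println("slot for -4.3: " + slot(word));
    }
}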
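processChunk() locates the ';' and '\n' delimiters a word at a time using the "determine if a word has a byte equal to n" test from the Stanford bit-hacks page cited in the code, then converts the resulting mask into a byte index with Long.numberOfLeadingZeros. A standalone sketch of that test, assuming big-endian longs as returned by ByteBuffer.getLong; class and method names are illustrative.

public class SwarDelimiterSketch {
    // Returns a word with 0x80 set in every byte of `word` that equals `target`.
    static long hasByte(long word, byte target) {
        long arg = word ^ (0x0101010101010101L * (target & 0xFFL));
        long op1 = arg - 0x0101010101010101L;
        long op2 = ~arg;
        return op1 & op2 & 0x8080808080808080L;
    }

    public static void main(String[] args) {
        // "Abc;12.3" packed big-endian, as ByteBuffer.getLong would return it.
        long word = 0x4162633B31322E33L;
        long mask = hasByte(word, (byte) ';');
        // Index of the first matching byte, counted from the most significant byte.
        int index = Long.numberOfLeadingZeros(mask) / 8;
        System.out.println("';' found at byte " + index); // prints 3
    }
}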
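main() distributes work by striping rather than contiguous per-thread splits: stripe k maps CHUNK_SIZE-byte windows starting at k * FILE_CHUNK_SIZE and then advances by THREAD_COUNT * FILE_CHUNK_SIZE, so adjacent windows overlap by MAX_ROW_SIZE bytes while processChunk() skips to the first newline of a non-initial window and stops MAX_ROW_SIZE before the window's end. A small sketch that only prints the resulting windows; the constants mirror the submission (with THREAD_COUNT pinned to 8 here) and the file size is made up.

public class StripingSketch {
    static final int MAX_ROW_SIZE = 100 + 1 + 5 + 1;
    static final int THREAD_COUNT = 8; // the submission uses Math.min(8, availableProcessors())
    static final long BUFFER_SIZE = 128L * 1024 * 1024;
    static final long CHUNK_SIZE = BUFFER_SIZE / THREAD_COUNT;
    static final long FILE_CHUNK_SIZE = CHUNK_SIZE - MAX_ROW_SIZE;

    public static void main(String[] args) {
        long fileSize = 400L * 1024 * 1024; // hypothetical measurements.txt size
        for (int stripe = 0; stripe < THREAD_COUNT; stripe++) {
            // Each stripe walks its own windows, jumping by THREAD_COUNT * FILE_CHUNK_SIZE.
            for (long start = stripe * FILE_CHUNK_SIZE; start < fileSize; start += FILE_CHUNK_SIZE * THREAD_COUNT) {
                long end = Math.min(start + CHUNK_SIZE, fileSize);
                System.out.printf("stripe %d maps [%d, %d)%n", stripe, start, end);
            }
        }
    }
}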