From a8bd6b58ce10678719aaa633171676503206e18e Mon Sep 17 00:00:00 2001 From: Elliot Barlas Date: Thu, 4 Jan 2024 12:06:19 -0800 Subject: [PATCH] Elliot Barlas: Use proper hash key collision detection scheme * Use open-addressing scheme to deal with hash table collisions. Reduce concurrency from 16 to 8. Use bit mask rather than mod operator to confine hash code to table range. * Properly handle file partitions that reside entirely within a line. * Reorder statements in doProcessBuffer. --- calculate_average_ebarlas.sh | 2 +- .../onebrc/CalculateAverage_ebarlas.java | 99 +++++++++++++------ 2 files changed, 69 insertions(+), 32 deletions(-) diff --git a/calculate_average_ebarlas.sh b/calculate_average_ebarlas.sh index 65cc651..73e286f 100755 --- a/calculate_average_ebarlas.sh +++ b/calculate_average_ebarlas.sh @@ -17,4 +17,4 @@ sdk use java 21.0.1-graalce JAVA_OPTS="" -time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ebarlas measurements.txt 16 +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ebarlas measurements.txt 8 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java b/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java index bdb9a8a..23b3403 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java @@ -21,13 +21,15 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.List; import java.util.TreeMap; public class CalculateAverage_ebarlas { - private static final int HASH_FACTOR = 278; - private static final int HASH_MOD = 3_487; + private static final int MAX_KEY_SIZE = 100; + private static final int HASH_FACTOR = 433; + private static final int HASH_TBL_SIZE = 16_383; // range of allowed hash values, inclusive public static void main(String[] args) throws IOException, InterruptedException { if (args.length != 2) { @@ -92,16 +94,11 @@ public class CalculateAverage_ebarlas { var current = partitions.get(i).stats; for (int j = 0; j < current.length; j++) { if (current[j] != null) { - var t = target[j]; - if (t == null) { - target[j] = current[j]; // copy ref from current to target - } - else { - t.min = Math.min(t.min, current[j].min); - t.max = Math.max(t.max, current[j].max); - t.sum += current[j].sum; - t.count += current[j].count; - } + var t = findInTable(target, current[j].hash, current[j].key, current[j].key.length); + t.min = Math.min(t.min, current[j].min); + t.max = Math.max(t.max, current[j].max); + t.sum += current[j].sum; + t.count += current[j].count; } } } @@ -114,7 +111,12 @@ public class CalculateAverage_ebarlas { var pPrev = partitions.get(i - 1); var merged = mergeFooterAndHeader(pPrev.footer, pNext.header); if (merged != null) { - doProcessBuffer(ByteBuffer.wrap(merged), true, pPrev.stats); // fold into prev partition + if (merged[merged.length - 1] == '\n') { // fold into prev partition + doProcessBuffer(ByteBuffer.wrap(merged), true, pPrev.stats); + } + else { // no newline appeared in partition, carry forward + pNext.footer = merged; + } } } } @@ -133,32 +135,36 @@ public class CalculateAverage_ebarlas { } private static Partition processBuffer(ByteBuffer buffer, boolean first) { - return doProcessBuffer(buffer, first, new Stats[HASH_MOD * 2]); + return doProcessBuffer(buffer, first, new Stats[HASH_TBL_SIZE + 1]); } private static Partition doProcessBuffer(ByteBuffer buffer, boolean first, Stats[] stats) { - var readingKey = true; - var keyHash = 0; - var keyStart = 0; - var negative = false; - var val = 0; var header = first ? null : readHeader(buffer); + var readingKey = true; // reading key or value? + var keyBuf = new byte[MAX_KEY_SIZE]; // buffer for key + var keyPos = 0; // current position in key buffer + var keyHash = 0; // accumulating hash of key + var keyStart = buffer.position(); // start of key in buffer used for footer calc + var negative = false; // is value negative? + var val = 0; // accumulating value Stats st = null; while (buffer.hasRemaining()) { var b = buffer.get(); if (readingKey) { - if (b == ';') { - var idx = HASH_MOD + keyHash % HASH_MOD; - st = stats[idx]; - if (st == null) { - var key = new byte[buffer.position() - keyStart - 1]; - buffer.get(keyStart, key, 0, key.length); - st = stats[idx] = new Stats(key); - } - readingKey = false; + if (b != ';') { + keyHash = HASH_FACTOR * keyHash + b; + keyBuf[keyPos++] = b; } else { - keyHash = HASH_FACTOR * keyHash + b; + var idx = keyHash & HASH_TBL_SIZE; + st = stats[idx]; + if (st == null) { // nothing in table, eagerly claim spot + st = stats[idx] = newStats(keyBuf, keyPos, keyHash); + } + else if (!Arrays.equals(st.key, 0, st.key.length, keyBuf, 0, keyPos)) { + st = findInTable(stats, keyHash, keyBuf, keyPos); + } + readingKey = false; } } else { @@ -173,6 +179,7 @@ public class CalculateAverage_ebarlas { val = 0; negative = false; keyStart = buffer.position(); + keyPos = 0; } else if (b == '-') { negative = true; @@ -186,6 +193,25 @@ public class CalculateAverage_ebarlas { return new Partition(header, footer, stats); } + private static Stats findInTable(Stats[] stats, int hash, byte[] key, int len) { // open-addressing scan + var idx = hash & HASH_TBL_SIZE; + var st = stats[idx]; + while (st != null && !Arrays.equals(st.key, 0, st.key.length, key, 0, len)) { + idx = (idx + 1) % (HASH_TBL_SIZE + 1); + st = stats[idx]; + } + if (st != null) { + return st; + } + return stats[idx] = newStats(key, len, hash); + } + + private static Stats newStats(byte[] buffer, int len, int hash) { + var k = new byte[len]; + System.arraycopy(buffer, 0, k, 0, len); + return new Stats(k, hash); + } + private static byte[] readFooter(ByteBuffer buffer, int lineStart) { // read from line start to current pos (end-of-input) var footer = new byte[buffer.position() - lineStart]; buffer.get(lineStart, footer, 0, footer.length); @@ -200,18 +226,29 @@ public class CalculateAverage_ebarlas { return header; } - record Partition(byte[] header, byte[] footer, Stats[] stats) { + private static class Partition { + byte[] header; + byte[] footer; + Stats[] stats; + + Partition(byte[] header, byte[] footer, Stats[] stats) { + this.header = header; + this.footer = footer; + this.stats = stats; + } } private static class Stats { // min, max, and sum values are modeled with integral types that represent tenths of a unit final byte[] key; + final int hash; int min = Integer.MAX_VALUE; int max = Integer.MIN_VALUE; long sum; long count; - Stats(byte[] key) { + Stats(byte[] key, int hash) { this.key = key; + this.hash = hash; } } }