From 865188ccee45fb23242ba159102e29b741d59adf Mon Sep 17 00:00:00 2001 From: Jaroslav Bachorik Date: Sun, 14 Jan 2024 14:12:19 +0100 Subject: [PATCH] Small improvements (#379) --- calculate_average_jbachorik.sh | 2 +- prepare_jbachorik.sh | 2 +- .../onebrc/CalculateAverage_jbachorik.java | 120 +++++++++--------- 3 files changed, 61 insertions(+), 63 deletions(-) diff --git a/calculate_average_jbachorik.sh b/calculate_average_jbachorik.sh index d4d8e91..7ea136d 100755 --- a/calculate_average_jbachorik.sh +++ b/calculate_average_jbachorik.sh @@ -15,5 +15,5 @@ # limitations under the License. # -JAVA_OPTS="-XX:+UseInlineCaches -XX:CICompilerCount=4 -Xmx512m -XX:+UseInlineCaches -XX:MaxInlineSize=512 -XX:FreqInlineSize=800 -XX:InlineSmallCode=190 -XX:CompileThreshold=2" +JAVA_OPTS="-Xmx512m -Xms512m" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_jbachorik $@ diff --git a/prepare_jbachorik.sh b/prepare_jbachorik.sh index d2a3c6b..f83a3ff 100755 --- a/prepare_jbachorik.sh +++ b/prepare_jbachorik.sh @@ -16,4 +16,4 @@ # source "$HOME/.sdkman/bin/sdkman-init.sh" -sdk use java 21.0.1-tem 1>&2 +sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jbachorik.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jbachorik.java index 3eaa6cc..4455b0c 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jbachorik.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jbachorik.java @@ -50,11 +50,11 @@ public class CalculateAverage_jbachorik { // bb.get(offset, bytes); // String str = new String(bytes); - if (this.len != len || this.v0 != v0 || this.v1 != v1) { + if (((this.len ^ len) | (this.v0 ^ v0) | (this.v1 ^ v1)) != 0) { return false; } - for (int i = 0; i < (len / 8); i++) { - if (bb.getLong(this.offset + i * 8) != bb.getLong(offset + i * 8)) { + for (int i = 0; i < len - 8; i += 8) { + if (bb.getLong(this.offset + i) != bb.getLong(offset + i)) { return false; } } @@ -125,32 +125,26 @@ public class CalculateAverage_jbachorik { } } - private static final int BUCKETS = 1264532; - private static final int BUCKET_SIZE = 4; + private static final int BUCKETS = 65536; + private static final int BUCKET_SIZE = 16; private final StatsHolder[][] map = new StatsHolder[BUCKETS][BUCKET_SIZE]; - public Stats getOrInsert(ByteBuffer buffer, int offset, int len, int hash, long v0, long v1) { - int idx = Math.abs(hash % BUCKETS); - + public Stats getOrInsert(ByteBuffer buffer, int offset, int len, int idx, long v0, long v1) { StatsHolder[] bucket = map[idx]; - if (bucket[0] == null) { - Stats stats = new Stats(); - bucket[0] = new StatsHolder(new Key(buffer, offset, len, v0, v1, hash), stats); - return stats; - } int bucketOffset = 0; - while (bucketOffset < BUCKET_SIZE && bucket[bucketOffset] != null && !bucket[bucketOffset].key.equals(offset, len, v0, v1)) { + do { + StatsHolder statsHolder = bucket[bucketOffset]; + if (statsHolder == null) { + Stats stats = new Stats(); + bucket[bucketOffset] = new StatsHolder(new Key(buffer, offset, len, v0, v1, idx), stats); + return stats; + } + if (statsHolder.key.equals(offset, len, v0, v1)) { + return statsHolder.stats; + } bucketOffset++; - } - assert (bucketOffset <= BUCKET_SIZE); - if (bucket[bucketOffset] != null) { - return bucket[bucketOffset].stats; - } - else { - Stats stats = new Stats(); - bucket[bucketOffset] = new StatsHolder(new Key(buffer, offset, len, v0, v1, hash), stats); - return stats; - } + } while (bucketOffset < BUCKET_SIZE - 1); + throw new Error("Bucket overflow"); } public void forEach(BiConsumer consumer) { @@ -168,19 +162,19 @@ public class CalculateAverage_jbachorik { private static final long semiPattern = compilePattern((byte) ';'); public static void main(String[] args) throws Exception { - int workers = Runtime.getRuntime().availableProcessors() - 1; + int workers = Runtime.getRuntime().availableProcessors(); if (args.length == 1) { workers = Integer.parseInt(args[0]); } Map map = new TreeMap<>(); File f = new File("measurements.txt"); - ExecutorService workerPool = Executors.newFixedThreadPool(workers); - ExecutorService mergerPool = Executors.newSingleThreadExecutor(); + try (FileInputStream fis = new FileInputStream(f)) { FileChannel fc = fis.getChannel(); int granularity = 32 * 1024 * 1024; int targetWorkers = Math.min(Math.max(1, (int) (fc.size() / granularity)), workers); long chunkSize = fc.size() / targetWorkers; + ExecutorService workerPool = Executors.newFixedThreadPool(workers); // System.out.println("Chunk size: " + chunkSize + ", workers: " + targetWorkers); for (ByteBuffer bb : mmap(fc, (int) chunkSize)) { workerPool.submit(() -> { @@ -200,8 +194,6 @@ public class CalculateAverage_jbachorik { } workerPool.shutdown(); workerPool.awaitTermination(1, TimeUnit.HOURS); - mergerPool.shutdown(); - mergerPool.awaitTermination(1, TimeUnit.HOURS); } finally { // System.out.println("Keys: " + map.size()); @@ -209,21 +201,9 @@ public class CalculateAverage_jbachorik { } } - private static int longHash(long l, int h) { - if (l == 0) { - return h; - } - h = (int) (31 * 31 * 31 * 31 * 31 * 31 * 31 * 31 * h - + 31 * 31 * 31 * 31 * 31 * 31 * 31 * ((l >> 56 & 0xFF)) - + 31 * 31 * 31 * 31 * 31 * 31 * ((l >> 48 & 0xFF)) - + 31 * 31 * 31 * 31 * 31 * ((l >> 40 & 0xFF)) - + 31 * 31 * 31 * 31 * ((l >> 32 & 0xFF)) - + 31 * 31 * 31 * ((l >> 24 & 0xFF)) - + 31 * 31 * ((l >> 16) & 0xFF) - + 31 * ((l >> 8) & 0xFF) - + (l & 0xFF)); - return h; - } + // unrolled FNV-1a 64 bit hash + private static final long fnv64OffsetBasis = 0xCBF29CE484222325L; + private static final long fnv64Prime = 0x100000001B3L; private static StatsMap processChunk(ByteBuffer bb) { StatsMap map = new StatsMap(); @@ -233,7 +213,7 @@ public class CalculateAverage_jbachorik { int readLimit = limit - 8; long v0 = 0; long v1 = 0; - int hashCode = 0; + long hashCode = fnv64OffsetBasis; int lastNewLine = -1; while (offset < limit) { @@ -249,24 +229,34 @@ public class CalculateAverage_jbachorik { if (x != 0) { long value = 0; int valueLen = 0; - int pos = 7 - (Long.numberOfTrailingZeros(x) >>> 3); + int pos = Long.numberOfLeadingZeros(x) >>> 3; int yoffset = offset; int semiPos = firstInstance(v1, semiPattern); - if (semiPos == 8 || semiPos >= pos) { + if (semiPos >= pos) { yoffset -= 8; semiPos = firstInstance(v0, semiPattern); // semiPos will be at least 3 (new line is in the upper word and the value has at most 5 bytes) - long mask = semiPos == 0 ? 0L : (0xFFFFFFFFFFFFFFFFL << (8 - semiPos) * 8); - long newlineMask = pos == 0 ? 0L : (0xFFFFFFFFFFFFFFFFL << ((8 - pos) * 8)); + // a 64 bit value can not be rotated by 64 bits to 'clear' the bits + // instead, it must be rotated by at most 56 bits and then, if 64 bit rotation was requested, by 8 bits more + int rot2 = (8 - semiPos) >>> 3; + int rot1 = (7 - semiPos) + (~rot2 & 0x1); + long mask = ((0xFFFFFFFFFFFFFFFFL << (rot1 * 8)) << (rot2 * 8)); + rot2 = (8 - pos) >>> 3; + rot1 = (7 - pos) + (~rot2 & 0x1); + long newlineMask = ((0xFFFFFFFFFFFFFFFFL << (rot1 * 8)) << (rot2 * 8)); value = semiPos == 7 ? 0L : (v0 << (semiPos + 1) * 8); value |= ((v1 & newlineMask) >> (7 - semiPos) * 8); + // right-align the value bytes + // getting the number of trailing zeros is the easiest way to figure out the shift + // should be sufficiently fast but ... int zeros = (Long.numberOfTrailingZeros(value) >>> 3); value = value >>> zeros * 8; valueLen = 8 - zeros; v0 = v0 & mask; } else { - hashCode += longHash(v0, hashCode); + hashCode ^= v0; + hashCode *= fnv64Prime; long valMask = (0xFFFFFFFFFFFFFFFFL << (7 - semiPos) * 8); v0 = v1 & valMask; value = v1 & ~valMask; @@ -274,26 +264,33 @@ public class CalculateAverage_jbachorik { valueLen = pos - semiPos - 1; } v1 = 0; - hashCode += longHash(v0, hashCode); + hashCode ^= v0; + hashCode *= fnv64Prime; + int len = (yoffset + semiPos - 1) - lastNewLine; - hashCode += longHash(len, hashCode); + hashCode ^= len; + hashCode *= fnv64Prime; // byte[] strBuf = new byte[len]; // bb.get(lastNewLine + 1, strBuf); // String str = new String(strBuf); // System.out.println("===> " + str + ": " + Long.toHexString(value) + " :: " + fastParse(value, valueLen)); - map.getOrInsert(bb, lastNewLine + 1, len, hashCode, v0, v1).add(fastParse(value, valueLen)); + // projection of the hash code to 32 bits -> 65k buckets + long idx = ((hashCode & 0xFFFFFFFF00000000L) >> 32) ^ (hashCode & 0x00000000FFFFFFFFL); + idx = ((idx & 0x00000000FFFF0000L) >> 16) ^ (idx & 0x000000000000FFFFL); + map.getOrInsert(bb, lastNewLine + 1, len, (int) idx, v0, v1).add(fastParse(value, valueLen)); offset += pos + 1; lastNewLine = offset - 1; // reset the previous value v0 = 0; // reset the hash - hashCode = 0; + hashCode = fnv64OffsetBasis; } else { offset += 8; - hashCode += longHash(v0, hashCode); + hashCode ^= v0; + hashCode *= fnv64Prime; v0 = v1; } } @@ -305,10 +302,12 @@ public class CalculateAverage_jbachorik { private static int fastParse(long word, int len) { assert (len <= 5); - long singChar = (word >> ((len - 1) * 8)); - long sign = singChar & 0x2dL; - int negative = (int) ((sign ^ 0x2d) & 0xff) == 0 ? -1 : 1; - int shift = (8 - len - Math.min(negative, 0)) * 8; + int signChar = (int) (word >> ((len - 1) * 8)) & 0xFF; + int sign = signChar ^ 0x2d; + int base = ~(sign | -sign); + int offset = (base >> 7) & 0x01; + int multiplier = -(~((sign - 1) >> 31) | 0x1); + int shift = (8 - len + offset) * 8; long mask = (0xFFFFFFFFFFFFFFFFL >>> shift); word = (word ^ fastParserMask) & mask; @@ -319,8 +318,7 @@ public class CalculateAverage_jbachorik { int v4 = 100 * ((int) (word >> 24) & 0xff); // v5 is either the sign or not used - int val = ((v1 + v3 + v4) * negative); - return val; + return ((v1 + v3 + v4) * multiplier); } private static ByteBuffer[] mmap(FileChannel fc, int splitSize) throws Exception {