From 82197d4482ec8a5086713b4bbad88be6e9b59003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksey=20Shipil=C3=ABv?= Date: Sun, 28 Jan 2024 22:49:34 +0100 Subject: [PATCH] shipilev: Amendments to version 4 (#627) * Amendments * One more locality touchup: no need to carry the entire name array --- calculate_average_shipilev.sh | 7 +- .../onebrc/CalculateAverage_shipilev.java | 156 +++++++++++------- 2 files changed, 99 insertions(+), 64 deletions(-) diff --git a/calculate_average_shipilev.sh b/calculate_average_shipilev.sh index 5d9f633..13a12cd 100755 --- a/calculate_average_shipilev.sh +++ b/calculate_average_shipilev.sh @@ -15,12 +15,11 @@ # limitations under the License. # -JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms64m -Xmx64m -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages --XX:-TieredCompilation -XX:CICompilerCount=2 -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields +JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms1g -Xmx1g -XX:-AlwaysPreTouch -XX:+UseTransparentHugePages +-XX:-TieredCompilation -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields -XX:CompileThreshold=2048 --add-opens java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED -XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=quiet -XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$ParsingTask::seqCompute -XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$MeasurementsMap::updateSlow --XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev::nameMatches --XX:CompileThreshold=2048" +-XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev\$Bucket::matches" java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_shipilev diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java index 4998986..1150f42 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java @@ -102,45 +102,12 @@ public class CalculateAverage_shipilev { // ========================= MEATY GRITTY PARTS: PARSE AND AGGREGATE ========================= - // Little helper method to compare the array with given bytebuffer range. - public static boolean nameMatches(Bucket bucket, ByteBuffer cand, int begin, int end) { - byte[] orig = bucket.name; - int origLen = orig.length; - int candLen = end - begin; - if (origLen != candLen) { - return false; - } - - // Check the tails first, to simplify the matches. - if (origLen >= 8) { - if (bucket.tail1 != cand.getLong(end - 8)) { - return false; - } - if (origLen >= 16) { - if (bucket.tail2 != cand.getLong(end - 16)) { - return false; - } - origLen -= 16; - } - else { - origLen -= 8; - } - } - - // Check the rest. - for (int i = 0; i < origLen; i++) { - if (orig[i] != cand.get(begin + i)) { - return false; - } - } - return true; - } - public static final class Bucket { - // Raw station name, its hash, and tails. - public final byte[] name; + // Raw station name, its hash, and prefixes. + public final byte[] nameTail; + public final int len; public final int hash; - public final long tail1, tail2; + public final int prefix1, prefix2; // Temperature values, in 10x scale. public long sum; @@ -148,10 +115,32 @@ public class CalculateAverage_shipilev { public int min; public int max; - public Bucket(byte[] name, long tail1, long tail2, int hash, int temp) { - this.name = name; - this.tail1 = tail1; - this.tail2 = tail2; + public Bucket(ByteBuffer slice, int begin, int end, int hash, int temp) { + len = end - begin; + + // Also pick up any prefixes to simplify future matches. + int tailStart = 0; + if (len >= 8) { + prefix1 = slice.getInt(begin + 0); + prefix2 = slice.getInt(begin + 4); + tailStart += 8; + } + else if (len >= 4) { + prefix1 = slice.getInt(begin + 0); + prefix2 = 0; + tailStart += 4; + } + else { + prefix1 = 0; + prefix2 = 0; + } + + // The rest goes to tail byte array. We are checking it names on hot-path. + // Therefore, it is convenient to keep allocation for names near the buckets. + int tailLen = len - tailStart; + nameTail = new byte[tailLen]; + slice.get(begin + tailStart, nameTail, 0, tailLen); + this.hash = hash; this.sum = temp; this.count = 1; @@ -159,6 +148,48 @@ public class CalculateAverage_shipilev { this.max = temp; } + // Little helper method to compare the array with given bytebuffer range. + public boolean matches(ByteBuffer cand, int begin, int end) { + int origLen = len; + int candLen = end - begin; + if (origLen != candLen) { + return false; + } + + // Check the prefixes first, to simplify the matches. + int tailStart = 0; + if (origLen >= 8) { + if (prefix1 != cand.getInt(begin)) { + return false; + } + if (prefix2 != cand.getInt(begin + 4)) { + return false; + } + tailStart += 8; + } + else if (origLen >= 4) { + if (prefix1 != cand.getInt(begin)) { + return false; + } + tailStart += 4; + } + + // Check the rest. + for (int i = 0; i < origLen - tailStart; i++) { + if (nameTail[i] != cand.get(begin + tailStart + i)) { + return false; + } + } + return true; + } + + public boolean matches(Bucket other) { + return len == other.len && + prefix1 == other.prefix1 && + prefix2 == other.prefix2 && + Arrays.equals(nameTail, other.nameTail); + } + public void merge(int value) { sum += value; count++; @@ -178,8 +209,19 @@ public class CalculateAverage_shipilev { } public Row toRow() { + // Reconstruct the name + ByteBuffer bb = ByteBuffer.allocate(len); + bb.order(ByteOrder.LITTLE_ENDIAN); + if (len >= 4) { + bb.putInt(prefix1); + } + if (len >= 8) { + bb.putInt(prefix2); + } + bb.put(nameTail); + return new Row( - new String(name), + new String(Arrays.copyOf(bb.array(), len)), Math.round((double) min) / 10.0, Math.round((double) sum / count) / 10.0, Math.round((double) max) / 10.0); @@ -205,21 +247,11 @@ public class CalculateAverage_shipilev { while (true) { Bucket cur = buckets[idx]; if (cur == null) { - // No bucket yet, lucky us. Lookup the name and create the bucket with it. - // We are checking the names on hot-path. Therefore, it is convenient - // to keep allocation for names near the buckets. - int len = end - begin; - byte[] copy = new byte[len]; - name.get(begin, copy, 0, len); - - // Also pick up any tail to simplify future matches. - long tail1 = (len < 8) ? 0 : name.getLong(begin + len - 8); - long tail2 = (len < 16) ? 0 : name.getLong(begin + len - 16); - - buckets[idx] = new Bucket(copy, tail1, tail2, hash, temp); + // No bucket yet, lucky us. Create the bucket with it. + buckets[idx] = new Bucket(name, begin, end, hash, temp); return; } - else if ((cur.hash == hash) && nameMatches(cur, name, begin, end)) { + else if ((cur.hash == hash) && cur.matches(name, begin, end)) { // Same as bucket fastpath. Check for collision by checking the full hash // first (since the index is truncated by map size), and then the exact name. cur.merge(temp); @@ -244,7 +276,7 @@ public class CalculateAverage_shipilev { buckets[idx] = other; break; } - else if ((cur.hash == other.hash) && Arrays.equals(cur.name, other.name)) { + else if ((cur.hash == other.hash) && cur.matches(other)) { cur.merge(other); break; } @@ -425,7 +457,7 @@ public class CalculateAverage_shipilev { // Time to update! Bucket bucket = buckets[nameHash & (MAP_SIZE - 1)]; - if ((bucket != null) && (nameHash == bucket.hash) && nameMatches(bucket, slice, nameBegin, nameEnd)) { + if ((bucket != null) && (nameHash == bucket.hash) && bucket.matches(slice, nameBegin, nameEnd)) { // Lucky fast path, existing bucket hit. Most of the time we complete here. bucket.merge(temp); } @@ -447,8 +479,8 @@ public class CalculateAverage_shipilev { // a given mmap slice, while there is still other work to do. This allows // us to unmap slices on the go. public static final class RootTask extends CountedCompleter { - public RootTask(CountedCompleter parent) { - super(parent); + public RootTask() { + super(null); } @Override @@ -517,8 +549,12 @@ public class CalculateAverage_shipilev { // ========================= Invocation ========================= public static void main(String[] args) throws Exception { + // Instantiate a separate FJP to match the parallelism accurately, without + // relying on common pool defaults. + ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors()); + // This little line carries the whole world - new RootTask(null).fork(); + pool.submit(new RootTask()); // While the root task is working, prepare what we need for the // end of the run. Go and try to report something to prepare the