diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java index d2a7e66..0758703 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java @@ -31,7 +31,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; /** * Changelog (based on Macbook Pro Intel i7 6-cores 2.6GHz): @@ -123,13 +122,31 @@ public class CalculateAverage_jincongho { // } // scalar implementation + // public static int hashCode(final MemorySegment array, final long offset, final short length) { + // final long limit = offset + length; + // int h = 1; + // for (long i = offset; i < limit; i++) { + // h = 31 * h + UNSAFE.getByte(array.address() + i); + // } + // return h; + // } + + // fxhash public static int hashCode(final MemorySegment array, final long offset, final short length) { - final long limit = offset + length; - int h = 1; - for (long i = offset; i < limit; i++) { - h = 31 * h + UNSAFE.getByte(array.address() + i); + final int seed = 0x9E3779B9; + final int rotate = 5; + + int x, y; + if (length >= Integer.BYTES) { + x = UNSAFE.getInt(array.address() + offset); + y = UNSAFE.getInt(array.address() + offset + length - Integer.BYTES); } - return h; + else { + x = UNSAFE.getByte(array.address() + offset); + y = UNSAFE.getByte(array.address() + offset + length - Byte.BYTES); + } + + return (Integer.rotateLeft(x * seed, rotate) ^ y) * seed; } /** Vectorized Key Comparison **/ @@ -209,7 +226,7 @@ public class CalculateAverage_jincongho { } else { index = (index + 1) & KEY_MASK; - keyOffset += KEY_SIZE; + keyOffset = KEYS.address() + (index * KEY_SIZE); } } @@ -254,7 +271,7 @@ public class CalculateAverage_jincongho { * Measurement Aggregation (for all partitions) * Simple Concurrent Hash Table so all partitions can merge concurrently */ - protected static class ResultAggr extends ConcurrentHashMap { + protected static class ResultAggr extends HashMap { public static class ByteKey implements Comparable { private final MemorySegment data; @@ -270,10 +287,8 @@ public class CalculateAverage_jincongho { @Override public boolean equals(Object other) { - if (length != ((ByteKey) other).length) - return false; - - return !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES); + return (length == ((ByteKey) other).length) + && !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES); } @Override @@ -311,8 +326,8 @@ public class CalculateAverage_jincongho { } - public ResultAggr(int initialCapacity, float loadFactor, int concurrencyLevel) { - super(initialCapacity, loadFactor, concurrencyLevel); + public ResultAggr(int initialCapacity, float loadFactor) { + super(initialCapacity, loadFactor); } public Map toSorted() { @@ -326,9 +341,9 @@ public class CalculateAverage_jincongho { private final MemorySegment data; private long offset; private final long limit; - private final ResultAggr result; + private final PartitionAggr result; - public Partition(MemorySegment data, long offset, long limit, ResultAggr result) { + public Partition(MemorySegment data, long offset, long limit, PartitionAggr result) { this.data = data; this.offset = offset; this.limit = limit; @@ -338,7 +353,7 @@ public class CalculateAverage_jincongho { @Override public void run() { // measurement parsing - PartitionAggr aggr = new PartitionAggr(); + final PartitionAggr aggr = this.result; // main loop (vectorized) final long loopLimit = limit - (VectorUtils.BYTE_SPECIES.length() * Math.ceilDiv(100, VectorUtils.BYTE_SPECIES.length()) + Long.BYTES); @@ -402,7 +417,7 @@ public class CalculateAverage_jincongho { } // measurement result collection - aggr.mergeTo(result); + // aggr.mergeTo(result); } } @@ -435,15 +450,25 @@ public class CalculateAverage_jincongho { // partition aggregation var threadList = new Thread[processors]; - ResultAggr result = new ResultAggr(1 << 14, 1, processors); + PartitionAggr[] partAggrs = new PartitionAggr[processors]; for (int i = 0; i < processors; i++) { - threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], result)); + if (partition[i] == data.byteSize()) + break; + + partAggrs[i] = new PartitionAggr(); + threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], partAggrs[i])); threadList[i].start(); } - for (var thread : threadList) { - thread.join(); - } + // result + ResultAggr result = new ResultAggr(1 << 14, 1); + for (int i = 0; i < processors; i++) { + if (partition[i] == data.byteSize()) + break; + + threadList[i].join(); + partAggrs[i].mergeTo(result); + } System.out.println(result.toSorted()); }