diff --git a/calculate_average_vemana.sh b/calculate_average_vemana.sh
index b3437f2..06a911a 100755
--- a/calculate_average_vemana.sh
+++ b/calculate_average_vemana.sh
@@ -18,6 +18,8 @@
# Basics
JAVA_OPTS=""
JAVA_OPTS="$JAVA_OPTS --enable-preview"
+JAVA_OPTS="$JAVA_OPTS --add-exports java.base/jdk.internal.ref=ALL-UNNAMED"
+JAVA_OPTS="$JAVA_OPTS --add-opens java.base/java.nio=ALL-UNNAMED"
#JAVA_OPTS="$JAVA_OPTS --add-modules jdk.incubator.vector"
#JAVA_OPTS="$JAVA_OPTS -XX:+UnlockDiagnosticVMOptions"
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java b/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java
index d4f0a2f..8f690e3 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java
@@ -17,21 +17,27 @@ package dev.morling.onebrc;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -46,6 +52,20 @@ import java.util.stream.Collectors;
* potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance
* is not ideal when trying to minimize the maximum worker latency.
*
+ *
+ * [Understand that unmapping is serial and runs inside exit()]. This is very much about
+ * exploiting parallelism. After adding tracing (plain old printfs), it was clear that the JVM was
+ * taking 400ms (out of 1500ms) just to exit. It turns out the kernel unmaps all the mappings as
+ * part of the exit() syscall, so even strace doesn't report the cost; perf stat barely hinted at
+ * it. Two experiments settled it: reducing the number of touched pages brought JVM shutdown
+ * latency down, and manually invoking unmap() to free the ByteBuffers showed that running it from
+ * multiple threads doesn't help at all. Conclusion: unmap() executes serially, and the 400ms was
+ * spent purely on unmapping. The challenge, then, is to (1) unmap a MappedByteBuffer from code
+ * (no such method is exposed) rather than via the exit() syscall, and (2) overlap unmapping with
+ * computation without causing lock contention. (1) is solved with Reflection; (2) is an
+ * interesting math problem with a provably optimal solution. Since munmap() is serialized in the
+ * kernel anyway, a fast try-lock ensures no two threads waste time cleaning (i.e. munmap()-ing)
+ * ByteBuffers simultaneously.
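+ *
+ * A sketch of the per-buffer unmap that ByteRange.close() performs via reflection (this is why
+ * the launcher now adds --add-opens java.base/java.nio=ALL-UNNAMED):
+ *
+ * <pre>{@code
+ * Method cleanerMethod = buffer.getClass().getMethod("cleaner");
+ * cleanerMethod.setAccessible(true);
+ * Object cleaner = cleanerMethod.invoke(buffer);
+ * Method cleanMethod = cleaner.getClass().getMethod("clean");
+ * cleanMethod.setAccessible(true);
+ * cleanMethod.invoke(cleaner); // munmap() happens here, off the exit() path
+ * }</pre>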
+ *
*
* [Use ByteBuffers over MemorySegment] Each Shard is further divided into Chunks. This would've
* been unnecessary except that Shards are too big to be backed by ByteBuffers. Besides,
* MemorySegment appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller
@@ -138,6 +158,14 @@ import java.util.stream.Collectors;
public class CalculateAverage_vemana {
public static void main(String[] args) throws Exception {
+ Tracing.recordAppStart();
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ Tracing.recordEvent("In Shutdown hook");
+ }));
+
// First process in large chunks without coordination among threads
// Use chunkSizeBits for the large-chunk size
int chunkSizeBits = 20;
@@ -151,20 +179,32 @@ public class CalculateAverage_vemana {
// Size of the hashtable (attempt to fit in L3)
int hashtableSizeBits = 14;
- if (args.length > 0) {
- chunkSizeBits = Integer.parseInt(args[0]);
- }
+ int minReservedBytesAtFileTail = 9;
- if (args.length > 1) {
- commonChunkFraction = Double.parseDouble(args[1]);
- }
+ String inputFile = "measurements.txt";
- if (args.length > 2) {
- commonChunkSizeBits = Integer.parseInt(args[2]);
- }
-
- if (args.length > 3) {
- hashtableSizeBits = Integer.parseInt(args[3]);
+ for (String arg : args) {
+ String key = arg.substring(0, arg.indexOf('='));
+ String value = arg.substring(key.length() + 1);
+ switch (key) {
+ case "chunkSizeBits":
+ chunkSizeBits = Integer.parseInt(value);
+ break;
+ case "commonChunkFraction":
+ commonChunkFraction = Double.parseDouble(value);
+ break;
+ case "commonChunkSizeBits":
+ commonChunkSizeBits = Integer.parseInt(value);
+ break;
+ case "hashtableSizeBits":
+ hashtableSizeBits = Integer.parseInt(value);
+ break;
+ case "inputfile":
+ inputFile = value;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown argument: " + arg);
+ }
}
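+ // Example invocation (illustrative): arguments are key=value pairs, e.g.
+ //   ... CalculateAverage_vemana chunkSizeBits=20 inputfile=measurements.txt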
// System.err.println(STR."""
@@ -177,12 +217,15 @@ public class CalculateAverage_vemana {
System.out.println(
new Runner(
- Path.of("measurements.txt"),
+ Path.of(inputFile),
chunkSizeBits,
commonChunkFraction,
commonChunkSizeBits,
- hashtableSizeBits)
+ hashtableSizeBits,
+ minReservedBytesAtFileTail)
.getSummaryStatistics());
+
+ Tracing.recordEvent("After printing result");
}
public record AggregateResult(Map<String, Stat> tempStats) {
@@ -202,7 +245,9 @@ public class CalculateAverage_vemana {
private static final int BUF_SIZE = 1 << 30;
private final long fileSize;
+ private final long maxEndPos; // Treat as if the file ends here
private final RandomAccessFile raf;
+ private final List<MappedByteBuffer> unclosedBuffers = new ArrayList<>();
// ***************** What this is doing and why *****************
// Reading from ByteBuffer appears faster than from MemorySegment, but ByteBuffer can only be
@@ -220,7 +265,6 @@ public class CalculateAverage_vemana {
// tuning
// - This enables (relatively) allocation free chunking implementation. Our chunking impl uses
// fine grained chunking for the last say X% of work to avoid being hostage to stragglers
-
// The PUBLIC API
public MappedByteBuffer byteBuffer;
public int endInBuf; // where the chunk ends inside the buffer
@@ -230,8 +274,9 @@ public class CalculateAverage_vemana {
private long bufferStart; // byteBuffer's begin coordinate
// Uninitialized; for mutability
- public ByteRange(RandomAccessFile raf) {
+ public ByteRange(RandomAccessFile raf, long maxEndPos) {
this.raf = raf;
+ this.maxEndPos = maxEndPos;
try {
this.fileSize = raf.length();
}
@@ -241,6 +286,20 @@ public class CalculateAverage_vemana {
bufferEnd = bufferStart = -1;
}
+ public void close(int shardIdx) {
+ Tracing.recordWorkStart("cleaner", shardIdx);
+ if (byteBuffer != null) {
+ unclosedBuffers.add(byteBuffer);
+ }
+ for (MappedByteBuffer buf : unclosedBuffers) {
+ close(buf);
+ }
+ unclosedBuffers.clear();
+ bufferEnd = bufferStart = -1;
+ byteBuffer = null;
+ Tracing.recordWorkEnd("cleaner", shardIdx);
+ }
+
public void setRange(long rangeStart, long rangeEnd) {
if (rangeEnd + 1024 > bufferEnd || rangeStart < bufferStart) {
bufferStart = rangeStart;
@@ -251,12 +310,15 @@ public class CalculateAverage_vemana {
if (rangeStart > 0) {
rangeStart = 1 + nextNewLine(rangeStart);
}
+ else {
+ rangeStart = 0;
+ }
- if (rangeEnd < fileSize) {
+ if (rangeEnd < maxEndPos) {
rangeEnd = 1 + nextNewLine(rangeEnd);
}
else {
- rangeEnd = fileSize;
+ rangeEnd = maxEndPos;
}
startInBuf = (int) (rangeStart - bufferStart);
@@ -267,12 +329,24 @@ public class CalculateAverage_vemana {
public String toString() {
return STR."""
ByteRange {
+ bufferStart = \{bufferStart}
+ bufferEnd = \{bufferEnd}
startInBuf = \{startInBuf}
endInBuf = \{endInBuf}
}
""";
}
+ private void close(MappedByteBuffer buffer) {
+ Method cleanerMethod = Reflection.findMethodNamed(buffer, "cleaner");
+ cleanerMethod.setAccessible(true);
+ Object cleaner = Reflection.invoke(buffer, cleanerMethod);
+
+ Method cleanMethod = Reflection.findMethodNamed(cleaner, "clean");
+ cleanMethod.setAccessible(true);
+ Reflection.invoke(cleaner, cleanMethod);
+ }
+
private long nextNewLine(long pos) {
int nextPos = (int) (pos - bufferStart);
while (byteBuffer.get(nextPos) != '\n') {
@@ -282,6 +356,9 @@ public class CalculateAverage_vemana {
}
private void setByteBufferToRange(long start, long end) {
+ if (byteBuffer != null) {
+ unclosedBuffers.add(byteBuffer);
+ }
try {
byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start);
byteBuffer.order(ByteOrder.nativeOrder());
@@ -306,15 +383,41 @@ public class CalculateAverage_vemana {
public interface LazyShardQueue {
+ void close(int shardIdx);
+
+ Optional<ByteRange> fileTailEndWork(int idx);
+
ByteRange take(int shardIdx);
}
+ static final class Reflection {
+
+ static Method findMethodNamed(Object object, String name, Class<?>... paramTypes) {
+ try {
+ return object.getClass().getMethod(name, paramTypes);
+ }
+ catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static Object invoke(Object receiver, Method method, Object... params) {
+ try {
+ return method.invoke(receiver, params);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
public static class Runner {
private final double commonChunkFraction;
private final int commonChunkSizeBits;
private final int hashtableSizeBits;
private final Path inputFile;
+ private final int minReservedBytesAtFileTail;
private final int shardSizeBits;
public Runner(
@@ -322,45 +425,74 @@ public class CalculateAverage_vemana {
int chunkSizeBits,
double commonChunkFraction,
int commonChunkSizeBits,
- int hashtableSizeBits) {
+ int hashtableSizeBits,
+ int minReservedBytesAtFileTail) {
this.inputFile = inputFile;
this.shardSizeBits = chunkSizeBits;
this.commonChunkFraction = commonChunkFraction;
this.commonChunkSizeBits = commonChunkSizeBits;
this.hashtableSizeBits = hashtableSizeBits;
+ this.minReservedBytesAtFileTail = minReservedBytesAtFileTail;
}
AggregateResult getSummaryStatistics() throws Exception {
- int processors = Runtime.getRuntime().availableProcessors();
+ int nThreads = Runtime.getRuntime().availableProcessors();
LazyShardQueue shardQueue = new SerialLazyShardQueue(
- 1L << shardSizeBits, inputFile, processors, commonChunkFraction, commonChunkSizeBits);
+ 1L << shardSizeBits,
+ inputFile,
+ nThreads,
+ commonChunkFraction,
+ commonChunkSizeBits,
+ minReservedBytesAtFileTail);
List<Future<AggregateResult>> results = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(
- processors,
+ nThreads,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});
- long[] finishTimes = new long[processors];
-
- for (int i = 0; i < processors; i++) {
- final int I = i;
+ for (int i = 0; i < nThreads; i++) {
+ final int shardIdx = i;
final Callable<AggregateResult> callable = () -> {
- AggregateResult result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard();
- finishTimes[I] = System.nanoTime();
+ Tracing.recordWorkStart("shard", shardIdx);
+ AggregateResult result = new ShardProcessor(shardQueue, hashtableSizeBits, shardIdx).processShard();
+ Tracing.recordWorkEnd("shard", shardIdx);
return result;
};
results.add(executorService.submit(callable));
}
- // printFinishTimes(finishTimes);
- return executorService.submit(() -> merge(results)).get();
+ Tracing.recordEvent("Basic push time");
+
+ AggregateResult result = executorService.submit(() -> merge(results)).get();
+
+ Tracing.recordEvent("Merge results received");
+
+ // munmap() is serialized in the kernel, so running the closes in parallel would not help
+ executorService.submit(
+ () -> {
+ for (int i = 0; i < nThreads; i++) {
+ shardQueue.close(i);
+ }
+ });
+
+ Tracing.recordEvent("Waiting for executor shutdown");
+
+ executorService.shutdown();
+ executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+ Tracing.recordEvent("Executor terminated");
+ Tracing.analyzeWorkThreads("cleaner", nThreads);
+ Tracing.recordEvent("After cleaner finish printed");
+
+ return result;
}
private AggregateResult merge(List<Future<AggregateResult>> results)
throws ExecutionException, InterruptedException {
+ Tracing.recordEvent("Merge start time");
Map<String, Stat> output = null;
boolean[] isDone = new boolean[results.size()];
int remaining = results.size();
@@ -383,51 +515,55 @@ public class CalculateAverage_vemana {
}
}
}
+ Tracing.recordEvent("Merge end time");
+ Tracing.analyzeWorkThreads("shard", results.size());
return new AggregateResult(output);
}
-
- private void printFinishTimes(long[] finishTimes) {
- Arrays.sort(finishTimes);
- int n = finishTimes.length;
- System.err.println(
- STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms");
- }
}
public static class SerialLazyShardQueue implements LazyShardQueue {
- private static long roundToNearestHigherMultipleOf(long divisor, long value) {
- return (value + divisor - 1) / divisor * divisor;
+ private static long roundToNearestLowerMultipleOf(long divisor, long value) {
+ return value / divisor * divisor;
}
private final ByteRange[] byteRanges;
private final long chunkSize;
private final long commonChunkSize;
private final AtomicLong commonPool;
+ private final long effectiveFileSize;
private final long fileSize;
- private final long[] nextStarts;
+ private final long[] perThreadData;
+ private final RandomAccessFile raf;
+ private final SeqLock seqLock;
public SerialLazyShardQueue(
long chunkSize,
Path filePath,
int shards,
double commonChunkFraction,
- int commonChunkSizeBits)
+ int commonChunkSizeBits,
+ int fileTailReservedBytes)
throws IOException {
Checks.checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0);
- var raf = new RandomAccessFile(filePath.toFile(), "r");
+ Checks.checkArg(fileTailReservedBytes >= 0);
+ this.raf = new RandomAccessFile(filePath.toFile(), "r");
this.fileSize = raf.length();
+ fileTailReservedBytes = fileTailReservedBytes == 0
+ ? 0
+ : consumeToPreviousNewLineExclusive(raf, fileTailReservedBytes);
+ this.effectiveFileSize = fileSize - fileTailReservedBytes;
// Common pool
long commonPoolStart = Math.min(
- roundToNearestHigherMultipleOf(
- chunkSize, (long) (fileSize * (1 - commonChunkFraction))),
- fileSize);
+ roundToNearestLowerMultipleOf(
+ chunkSize, (long) (effectiveFileSize * (1 - commonChunkFraction))),
+ effectiveFileSize);
this.commonPool = new AtomicLong(commonPoolStart);
this.commonChunkSize = 1L << commonChunkSizeBits;
// Distribute chunks to shards
- this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
+ this.perThreadData = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
for (long i = 0,
currentStart = 0,
remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) {
@@ -435,8 +571,17 @@ public class CalculateAverage_vemana {
long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards;
// Shard i handles: [currentStart, currentStart + currentChunks * chunkSize)
int pos = (int) i << 4;
- nextStarts[pos] = currentStart;
- nextStarts[pos + 1] = currentStart + currentChunks * chunkSize;
+ perThreadData[pos] = currentStart; // next chunk begin
+ perThreadData[pos + 1] = currentStart + currentChunks * chunkSize; // shard end
+ perThreadData[pos + 2] = currentChunks; // active chunks remaining
+ // Threshold of remaining chunks below which this shard should start unmapping.
+ // 0.03 is a practical number, but the optimal strategy is this:
+ // Shard number N (1-based) should unmap as soon as it completes a (R/(R+1))^N fraction of
+ // its work, where R = the speed of unmap relative to the computation.
+ // For our problem, R ~ 75 because unmap frees 30GB/sec (though serially) while the cores
+ // go through data at the rate of 400MB/sec. See the worked example after this loop.
+ perThreadData[pos + 3] = (long) (currentChunks * (0.03 * (shards - i)));
+ perThreadData[pos + 4] = 1;
currentStart += currentChunks * chunkSize;
remainingChunks -= currentChunks;
}
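+ // Worked example of the formula above (illustrative arithmetic): with R = 75,
+ // R/(R+1) = 75/76 ~ 0.987, so shard 1 should start unmapping after completing ~98.7% of
+ // its chunks and shard 8 after (75/76)^8 ~ 90%, i.e. roughly 1/(R+1) ~ 1.3% of headroom
+ // per shard; the 0.03 above is a conservative practical stand-in.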
@@ -444,53 +589,128 @@ public class CalculateAverage_vemana {
this.byteRanges = new ByteRange[shards << 4];
for (int i = 0; i < shards; i++) {
- byteRanges[i << 4] = new ByteRange(raf);
+ byteRanges[i << 4] = new ByteRange(raf, effectiveFileSize);
}
+
+ this.seqLock = new SeqLock();
}
@Override
- public ByteRange take(int idx) {
- // Try for thread local range
- final int pos = idx << 4;
- long rangeStart = nextStarts[pos];
- final long chunkEnd = nextStarts[pos + 1];
+ public void close(int shardIdx) {
+ byteRanges[shardIdx << 4].close(shardIdx);
+ }
+ @Override
+ public Optional<ByteRange> fileTailEndWork(int idx) {
+ if (idx == 0 && effectiveFileSize < fileSize) {
+ ByteRange chunk = new ByteRange(raf, fileSize);
+ chunk.setRange(
+ effectiveFileSize == 0 ? 0 : effectiveFileSize - 1 /* will consume newline at eFS-1 */,
+ fileSize);
+ return Optional.of(chunk);
+ }
+ return Optional.empty();
+ }
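+ // Rationale (as the ShardProcessor comment notes, this buys freedom to read past range
+ // ends): the fast path reads several bytes at a time and may overrun a chunk's end, so the
+ // file's last few bytes are carved off and handled by the byte-at-a-time processLineSlow().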
+
+ @Override
+ public ByteRange take(int shardIdx) {
+ // Try for thread local range
+ final int pos = shardIdx << 4;
+ long rangeStart = perThreadData[pos];
+ final long chunkEnd = perThreadData[pos + 1];
final long rangeEnd;
if (rangeStart < chunkEnd) {
rangeEnd = rangeStart + chunkSize;
- nextStarts[pos] = rangeEnd;
+ perThreadData[pos] = rangeEnd;
+ perThreadData[pos + 2]--;
}
else {
rangeStart = commonPool.getAndAdd(commonChunkSize);
// If that's exhausted too, nothing remains!
- if (rangeStart >= fileSize) {
+ if (rangeStart >= effectiveFileSize) {
return null;
}
rangeEnd = rangeStart + commonChunkSize;
}
+ if (perThreadData[pos + 2] <= perThreadData[pos + 3] && perThreadData[pos + 4] > 0) {
+ if (attemptClose(shardIdx)) {
+ perThreadData[pos + 4]--;
+ }
+ }
+
ByteRange chunk = byteRanges[pos];
chunk.setRange(rangeStart, rangeEnd);
return chunk;
}
+
+ private boolean attemptClose(int shardIdx) {
+ if (seqLock.acquire()) {
+ byteRanges[shardIdx << 4].close(shardIdx);
+ seqLock.release();
+ return true;
+ }
+ return false;
+ }
+
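+ // Example (illustrative) for consumeToPreviousNewLineExclusive below: for a 10-byte file
+ // "aa\nbbbb\ncc" and minReservedBytes = 3, the scan starts at index 6 ('b'), stops at the
+ // '\n' at index 2, and returns 7: the reserved tail "bbbb\ncc" begins exactly at a line start.
+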
+ private int consumeToPreviousNewLineExclusive(RandomAccessFile raf, int minReservedBytes) {
+ try {
+ long pos = Math.max(raf.length() - minReservedBytes - 1, -1);
+ if (pos < 0) {
+ return (int) raf.length();
+ }
+
+ long start = Math.max(pos - 512, 0);
+ ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY, start, pos + 1 - start);
+ while (pos >= 0 && buf.get((int) (pos - start)) != '\n') {
+ pos--;
+ }
+ pos++;
+ return (int) (raf.length() - pos);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /** A low-traffic, non-blocking try-lock: acquire() fails fast instead of waiting. */
+ static class SeqLock {
+
+ private final AtomicBoolean isOccupied = new AtomicBoolean(false);
+
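+ // Usage sketch (mirrors attemptClose() above): acquire() is a try-lock; a caller that loses
+ // the race simply skips the cleanup and retries on a later take():
+ //   if (seqLock.acquire()) { closeBuffers(); seqLock.release(); }
+ // The plain read before compareAndSet keeps cache-line traffic down when the lock is busy.
+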
+ boolean acquire() {
+ return !isOccupied.get() && isOccupied.compareAndSet(false, true);
+ }
+
+ void release() {
+ isOccupied.set(false);
+ }
}
public static class ShardProcessor {
+ private final int shardIdx;
private final LazyShardQueue shardQueue;
private final ShardProcessorState state;
- private final int threadIdx;
- public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int threadIdx) {
+ public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int shardIdx) {
this.shardQueue = shardQueue;
- this.threadIdx = threadIdx;
+ this.shardIdx = shardIdx;
this.state = new ShardProcessorState(hashtableSizeBits);
}
public AggregateResult processShard() {
+ return processShardReal();
+ }
+
+ public AggregateResult processShardReal() {
+ // First process the file tail work to give ourselves freedom to go past ranges in parsing
+ shardQueue.fileTailEndWork(shardIdx).ifPresent(this::processRangeSlow);
+
ByteRange range;
- while ((range = shardQueue.take(threadIdx)) != null) {
+ while ((range = shardQueue.take(shardIdx)) != null) {
processRange(range);
}
return result();
@@ -506,6 +726,13 @@ public class CalculateAverage_vemana {
}
}
+ private void processRangeSlow(ByteRange range) {
+ int nextPos = range.startInBuf;
+ while (nextPos < range.endInBuf) {
+ nextPos = state.processLineSlow(range.byteBuffer, nextPos);
+ }
+ }
+
private AggregateResult result() {
return state.result();
}
@@ -513,8 +740,9 @@ public class CalculateAverage_vemana {
public static class ShardProcessorState {
+ public static final long ONE_MASK = 0x0101010101010101L;
private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
-
+ private static final long SEMICOLON_MASK = 0x3b3b3b3b3b3b3b3bL;
private final byte[][] cityNames;
private final int slotsMask;
private final Stat[] stats;
@@ -545,21 +773,21 @@ public class CalculateAverage_vemana {
byte b = (byte) (x >>> 8);
if (b == ';') {
nextPos += 2;
- hash = hash * 31 + ((0xFF & x));
+ hash = hash * 31 + (0xFF & x);
break;
}
byte c = (byte) (x >>> 16);
if (c == ';') {
nextPos += 3;
- hash = hash * 31 + ((0xFFFF & x));
+ hash = hash * 31 + (0xFFFF & x);
break;
}
byte d = (byte) (x >>> 24);
if (d == ';') {
nextPos += 4;
- hash = hash * 31 + ((0xFFFFFF & x));
+ hash = hash * 31 + (0xFFFFFF & x);
break;
}
@@ -596,9 +824,47 @@ public class CalculateAverage_vemana {
return nextPos;
}
+ /** A slow, byte-at-a-time version used only for the tail part of the file. */
+ public int processLineSlow(MappedByteBuffer mmb, int nextPos) {
+ int originalPos = nextPos;
+ byte nextByte;
+ int hash = 0;
+
+ outer: while (true) {
+ int accumulated = 0;
+ for (int i = 0; i < 4; i++) {
+ nextByte = mmb.get(nextPos++);
+ if (nextByte == ';') {
+ if (i > 0) {
+ hash = hash * 31 + accumulated;
+ }
+ break outer;
+ }
+ else {
+ accumulated |= ((int) nextByte << (8 * i));
+ }
+ }
+ hash = hash * 31 + accumulated;
+ }
+ int cityLen = nextPos - 1 - originalPos;
+
+ int temperature = 0;
+ boolean negative = mmb.get(nextPos) == '-';
+ while ((nextByte = mmb.get(nextPos++)) != '\n') {
+ if (nextByte != '-' && nextByte != '.') {
+ temperature = temperature * 10 + (nextByte - '0');
+ }
+ }
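+ // e.g. (illustrative) "-12.3": the digit loop accumulates 123, which 'negative' flips to
+ // -123; temperatures are carried as tenths of a degree throughout.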
+
+ linearProbe(
+ cityLen, hash & slotsMask, negative ? -temperature : temperature, mmb, originalPos);
+
+ return nextPos;
+ }
+
public AggregateResult result() {
int N = stats.length;
- TreeMap<String, Stat> map = new TreeMap<>();
+ Map<String, Stat> map = new LinkedHashMap<>(5_000);
for (int i = 0; i < N; i++) {
if (stats[i] != null) {
map.put(new String(cityNames[i]), stats[i]);
@@ -624,6 +890,11 @@ public class CalculateAverage_vemana {
return true;
}
+ private boolean hasSemicolonByte(long value) {
+ long a = value ^ SEMICOLON_MASK;
+ return (((a - ONE_MASK) & ~a) & (0x8080808080808080L)) != 0;
+ }
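+ // hasSemicolonByte() is the classic SWAR zero-byte test: the XOR turns every ';' byte into
+ // 0x00, and ((a - ONE_MASK) & ~a) & 0x8080808080808080L is non-zero exactly when some byte
+ // of 'a' is zero, i.e. when 'value' contains a ';'.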
+
private void linearProbe(int len, int hash, int temp, MappedByteBuffer mmb, int offsetInMmb) {
for (int i = hash;; i = (i + 1) & slotsMask) {
var curBytes = cityNames[i];
@@ -633,11 +904,6 @@ public class CalculateAverage_vemana {
return;
}
else {
- // Overall, this tradeoff seems better than Arrays.equals(..)
- // City name param is encoded as (mmb, offsetnInMmb, len)
- // This avoids copying it into a (previously allocated) byte[]
- // The downside is that we have to manually implement 'equals' and it can lose out
- // to vectorized 'equals'; but the trade off seems to work in this particular case
if (len == curBytes.length && equals(curBytes, mmb, offsetInMmb, len)) {
stats[i].mergeReading(temp);
return;
@@ -695,4 +961,94 @@ public class CalculateAverage_vemana {
return "%.1f/%.1f/%.1f".formatted(min / 10.0, sum / 10.0 / count, max / 10.0);
}
}
+
+ static class Tracing {
+
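+ // Two slots (work start, work end) per worker; 1 << 6 << 1 = 128 slots allows up to 64
+ // workers per timing array.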
+ private static final long[] cleanerTimes = new long[1 << 6 << 1];
+ private static final long[] threadTimes = new long[1 << 6 << 1];
+ private static long startTime;
+
+ static void analyzeWorkThreads(String id, int nThreads) {
+ printTimingsAnalysis(id + " Stats", nThreads, timingsArray(id));
+ }
+
+ static void recordAppStart() {
+ startTime = System.nanoTime();
+ }
+
+ static void recordEvent(String event) {
+ printEvent(event, System.nanoTime());
+ }
+
+ static void recordWorkEnd(String id, int threadId) {
+ timingsArray(id)[2 * threadId + 1] = System.nanoTime();
+ }
+
+ static void recordWorkStart(String id, int threadId) {
+ timingsArray(id)[2 * threadId] = System.nanoTime();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////////////////////////
+
+ private static void errPrint(String message) {
+ System.err.println(message);
+ }
+
+ private static void printEvent(String message, long nanoTime) {
+ errPrint(STR."\{message} = \{(nanoTime - startTime) / 1_000_000}ms");
+ }
+
+ private static void printTimingsAnalysis(String header, int nThreads, long[] timestamps) {
+ long minDuration = Long.MAX_VALUE, maxDuration = Long.MIN_VALUE;
+ long minBegin = Long.MAX_VALUE, maxCompletion = Long.MIN_VALUE;
+ long maxBegin = Long.MIN_VALUE, minCompletion = Long.MAX_VALUE;
+
+ long[] durationsMs = new long[nThreads];
+ long[] completionsMs = new long[nThreads];
+ long[] beginMs = new long[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ long durationNs = timestamps[2 * i + 1] - timestamps[2 * i];
+ durationsMs[i] = durationNs / 1_000_000;
+ completionsMs[i] = (timestamps[2 * i + 1] - startTime) / 1_000_000;
+ beginMs[i] = (timestamps[2 * i] - startTime) / 1_000_000;
+
+ minDuration = Math.min(minDuration, durationNs);
+ maxDuration = Math.max(maxDuration, durationNs);
+
+ minBegin = Math.min(minBegin, timestamps[2 * i]);
+ maxBegin = Math.max(maxBegin, timestamps[2 * i]);
+
+ maxCompletion = Math.max(maxCompletion, timestamps[2 * i + 1]);
+ minCompletion = Math.min(minCompletion, timestamps[2 * i + 1]);
+ }
+ errPrint(
+ STR."""
+ -------------------------------------------------------------------------------------------
+ \{header}
+ -------------------------------------------------------------------------------------------
+ Max duration = \{maxDuration / 1_000_000} ms
+ Min duration = \{minDuration / 1_000_000} ms
+ Timespan[max(end)-min(start)] = \{(maxCompletion - minBegin) / 1_000_000} ms
+ Completion Timespan[max(end)-min(end)] = \{(maxCompletion - minCompletion) / 1_000_000} ms
+ Begin Timespan[max(begin)-min(begin)] = \{(maxBegin - minBegin) / 1_000_000} ms
+ Durations = \{toString(durationsMs)} in ms
+ Begin Timestamps = \{toString(beginMs)} in ms
+ Completion Timestamps = \{toString(completionsMs)} in ms
+ """);
+ }
+
+ private static long[] timingsArray(String id) {
+ return switch (id) {
+ case "cleaner" -> cleanerTimes;
+ case "shard" -> threadTimes;
+ default -> throw new RuntimeException("Unknown timings id: " + id);
+ };
+ }
+
+ private static String toString(long[] array) {
+ return Arrays.stream(array)
+ .mapToObj(x -> String.format("%6d", x))
+ .collect(Collectors.joining(", ", "[ ", " ]"));
+ }
+ }
}