10% improvement from parallelizing munmap(); jumps to around 12th from 16th based on local testing; no Unsafe; no bitwise tricks yet (#465)
* Squashing a bunch of commits together. Commit#2; Uplift of 7% using native byteorder from ByteBuffer. Commit#1: Minor changes to formatting. * Commit #4: Parallelize munmap() and reduce completion time further by 10%. As the jvm exits with exit(0) syscall, the kernel reclaims the memory mappings via munmap() call. Prior to this change. all the unmap() calls were happening right at the end as the JVM exited. This led to serial execution of about 350ms out of 2500 ms right at the end after each shard completed its work. We can parallelize it by exposing the Cleaner from MappedByteBuffer and then ensure that it is truly parallel execution of munmap() by using a non-blocking lock (SeqLock). The optimal strategy for when each thread must call unmap() is an interesting math problem with an exact solution and this code roughly reflects it. Commit #3: Tried out reading long at a time from bytebuffer and checking for presence of ';'.. it was slower compared to just reading int(). Removed the code for reading longs; just retaining the hasSemicolonByte(..) check code Commit #2: Introduce processLineSlow() and processRangeSlow() for the tial part. Commit #1: Create a separate tail piece of work for the last few lines to be processed separately from the main loop. This allows the main loop to read past its allocated range (by a 'long' if we reserve atleast 8 bytes for the tail piece of work.)
This commit is contained in:
parent
aee71b961d
commit
199d6415bb
@ -18,6 +18,8 @@
|
||||
# Basics
|
||||
JAVA_OPTS=""
|
||||
JAVA_OPTS="$JAVA_OPTS --enable-preview"
|
||||
JAVA_OPTS="$JAVA_OPTS --add-exports java.base/jdk.internal.ref=ALL-UNNAMED"
|
||||
JAVA_OPTS="$JAVA_OPTS --add-opens java.base/java.nio=ALL-UNNAMED"
|
||||
#JAVA_OPTS="$JAVA_OPTS --add-modules jdk.incubator.vector"
|
||||
#JAVA_OPTS="$JAVA_OPTS -XX:+UnlockDiagnosticVMOptions"
|
||||
|
||||
|
@ -17,21 +17,27 @@ package dev.morling.onebrc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.MappedByteBuffer;
|
||||
import java.nio.channels.FileChannel.MapMode;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -46,6 +52,20 @@ import java.util.stream.Collectors;
|
||||
* potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance
|
||||
* is not ideal when trying to minimize the maximum worker latency.
|
||||
*
|
||||
* <p>[Understand that unmapping is serial and runs in exit()]. This is very much about exploiting
|
||||
* parallelism. After adding tracing (plain old printfs), it was clear that the JVM was taking 400ms
|
||||
* (out of 1500ms) just to exit. Turns out that the kernel tries to unmap all the mappings as part
|
||||
* of the exit() call. Even strace wouldn't report this because the unmapping is running as part of
|
||||
* the exit() call. perf stat barely hinted at it, but we had more insights by actually running a
|
||||
* couple of experiments: reduce touched pages --> JVM shutdown latency went down; manually run
|
||||
* unmap() call to free up the ByteBuffers --> parallel execution doesn't help at all. From this it
|
||||
* was conclusive that unmap() executes serially and the 400ms was being spent purely unmapping.
|
||||
* Now, the challenge is to both (1) unmap a MappedByteBuffer (no such methods exposed) from code
|
||||
* rather than via exit() syscall and (2) do it in parallel without causing lock contention. For 1,
|
||||
* use Reflection and (2) is an interesting math problem with a provably optimal solution.
|
||||
* Parallelism in munmap() is achieved by using a fast lock that prevents two threads from
|
||||
* simultaneously cleaning (i.e. munmap()) the ByteBuffer.
|
||||
*
|
||||
* <p>[Use ByteBuffers over MemorySegment] Each Shard is further divided in Chunks. This would've
|
||||
* been unnecessary except that Shards are too big to be backed by ByteBuffers. Besides,
|
||||
* MemorySegment appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller
|
||||
@ -138,6 +158,14 @@ import java.util.stream.Collectors;
|
||||
public class CalculateAverage_vemana {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Tracing.recordAppStart();
|
||||
Runtime.getRuntime()
|
||||
.addShutdownHook(
|
||||
new Thread(
|
||||
() -> {
|
||||
Tracing.recordEvent("In Shutdown hook");
|
||||
}));
|
||||
|
||||
// First process in large chunks without coordination among threads
|
||||
// Use chunkSizeBits for the large-chunk size
|
||||
int chunkSizeBits = 20;
|
||||
@ -151,20 +179,32 @@ public class CalculateAverage_vemana {
|
||||
// Size of the hashtable (attempt to fit in L3)
|
||||
int hashtableSizeBits = 14;
|
||||
|
||||
if (args.length > 0) {
|
||||
chunkSizeBits = Integer.parseInt(args[0]);
|
||||
}
|
||||
int minReservedBytesAtFileTail = 9;
|
||||
|
||||
if (args.length > 1) {
|
||||
commonChunkFraction = Double.parseDouble(args[1]);
|
||||
}
|
||||
String inputFile = "measurements.txt";
|
||||
|
||||
if (args.length > 2) {
|
||||
commonChunkSizeBits = Integer.parseInt(args[2]);
|
||||
}
|
||||
|
||||
if (args.length > 3) {
|
||||
hashtableSizeBits = Integer.parseInt(args[3]);
|
||||
for (String arg : args) {
|
||||
String key = arg.substring(0, arg.indexOf('='));
|
||||
String value = arg.substring(key.length() + 1);
|
||||
switch (key) {
|
||||
case "chunkSizeBits":
|
||||
chunkSizeBits = Integer.parseInt(value);
|
||||
break;
|
||||
case "commonChunkFraction":
|
||||
commonChunkFraction = Double.parseDouble(value);
|
||||
break;
|
||||
case "commonChunkSizeBits":
|
||||
commonChunkSizeBits = Integer.parseInt(value);
|
||||
break;
|
||||
case "hashtableSizeBits":
|
||||
hashtableSizeBits = Integer.parseInt(value);
|
||||
break;
|
||||
case "inputfile":
|
||||
inputFile = value;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown argument: " + arg);
|
||||
}
|
||||
}
|
||||
|
||||
// System.err.println(STR."""
|
||||
@ -177,12 +217,15 @@ public class CalculateAverage_vemana {
|
||||
|
||||
System.out.println(
|
||||
new Runner(
|
||||
Path.of("measurements.txt"),
|
||||
Path.of(inputFile),
|
||||
chunkSizeBits,
|
||||
commonChunkFraction,
|
||||
commonChunkSizeBits,
|
||||
hashtableSizeBits)
|
||||
hashtableSizeBits,
|
||||
minReservedBytesAtFileTail)
|
||||
.getSummaryStatistics());
|
||||
|
||||
Tracing.recordEvent("After printing result");
|
||||
}
|
||||
|
||||
public record AggregateResult(Map<String, Stat> tempStats) {
|
||||
@ -202,7 +245,9 @@ public class CalculateAverage_vemana {
|
||||
private static final int BUF_SIZE = 1 << 30;
|
||||
|
||||
private final long fileSize;
|
||||
private final long maxEndPos; // Treat as if the file ends here
|
||||
private final RandomAccessFile raf;
|
||||
private final List<MappedByteBuffer> unclosedBuffers = new ArrayList<>();
|
||||
|
||||
// ***************** What this is doing and why *****************
|
||||
// Reading from ByteBuffer appears faster from MemorySegment, but ByteBuffer can only be
|
||||
@ -220,7 +265,6 @@ public class CalculateAverage_vemana {
|
||||
// tuning
|
||||
// - This enables (relatively) allocation free chunking implementation. Our chunking impl uses
|
||||
// fine grained chunking for the last say X% of work to avoid being hostage to stragglers
|
||||
|
||||
// The PUBLIC API
|
||||
public MappedByteBuffer byteBuffer;
|
||||
public int endInBuf; // where the chunk ends inside the buffer
|
||||
@ -230,8 +274,9 @@ public class CalculateAverage_vemana {
|
||||
private long bufferStart; // byteBuffer's begin coordinate
|
||||
|
||||
// Uninitialized; for mutability
|
||||
public ByteRange(RandomAccessFile raf) {
|
||||
public ByteRange(RandomAccessFile raf, long maxEndPos) {
|
||||
this.raf = raf;
|
||||
this.maxEndPos = maxEndPos;
|
||||
try {
|
||||
this.fileSize = raf.length();
|
||||
}
|
||||
@ -241,6 +286,20 @@ public class CalculateAverage_vemana {
|
||||
bufferEnd = bufferStart = -1;
|
||||
}
|
||||
|
||||
public void close(int shardIdx) {
|
||||
Tracing.recordWorkStart("cleaner", shardIdx);
|
||||
if (byteBuffer != null) {
|
||||
unclosedBuffers.add(byteBuffer);
|
||||
}
|
||||
for (MappedByteBuffer buf : unclosedBuffers) {
|
||||
close(buf);
|
||||
}
|
||||
unclosedBuffers.clear();
|
||||
bufferEnd = bufferStart = -1;
|
||||
byteBuffer = null;
|
||||
Tracing.recordWorkEnd("cleaner", shardIdx);
|
||||
}
|
||||
|
||||
public void setRange(long rangeStart, long rangeEnd) {
|
||||
if (rangeEnd + 1024 > bufferEnd || rangeStart < bufferStart) {
|
||||
bufferStart = rangeStart;
|
||||
@ -251,12 +310,15 @@ public class CalculateAverage_vemana {
|
||||
if (rangeStart > 0) {
|
||||
rangeStart = 1 + nextNewLine(rangeStart);
|
||||
}
|
||||
else {
|
||||
rangeStart = 0;
|
||||
}
|
||||
|
||||
if (rangeEnd < fileSize) {
|
||||
if (rangeEnd < maxEndPos) {
|
||||
rangeEnd = 1 + nextNewLine(rangeEnd);
|
||||
}
|
||||
else {
|
||||
rangeEnd = fileSize;
|
||||
rangeEnd = maxEndPos;
|
||||
}
|
||||
|
||||
startInBuf = (int) (rangeStart - bufferStart);
|
||||
@ -267,12 +329,24 @@ public class CalculateAverage_vemana {
|
||||
public String toString() {
|
||||
return STR."""
|
||||
ByteRange {
|
||||
bufferStart = \{bufferStart}
|
||||
bufferEnd = \{bufferEnd}
|
||||
startInBuf = \{startInBuf}
|
||||
endInBuf = \{endInBuf}
|
||||
}
|
||||
""";
|
||||
}
|
||||
|
||||
private void close(MappedByteBuffer buffer) {
|
||||
Method cleanerMethod = Reflection.findMethodNamed(buffer, "cleaner");
|
||||
cleanerMethod.setAccessible(true);
|
||||
Object cleaner = Reflection.invoke(buffer, cleanerMethod);
|
||||
|
||||
Method cleanMethod = Reflection.findMethodNamed(cleaner, "clean");
|
||||
cleanMethod.setAccessible(true);
|
||||
Reflection.invoke(cleaner, cleanMethod);
|
||||
}
|
||||
|
||||
private long nextNewLine(long pos) {
|
||||
int nextPos = (int) (pos - bufferStart);
|
||||
while (byteBuffer.get(nextPos) != '\n') {
|
||||
@ -282,6 +356,9 @@ public class CalculateAverage_vemana {
|
||||
}
|
||||
|
||||
private void setByteBufferToRange(long start, long end) {
|
||||
if (byteBuffer != null) {
|
||||
unclosedBuffers.add(byteBuffer);
|
||||
}
|
||||
try {
|
||||
byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start);
|
||||
byteBuffer.order(ByteOrder.nativeOrder());
|
||||
@ -306,15 +383,41 @@ public class CalculateAverage_vemana {
|
||||
|
||||
public interface LazyShardQueue {
|
||||
|
||||
void close(int shardIdx);
|
||||
|
||||
Optional<ByteRange> fileTailEndWork(int idx);
|
||||
|
||||
ByteRange take(int shardIdx);
|
||||
}
|
||||
|
||||
static final class Reflection {
|
||||
|
||||
static Method findMethodNamed(Object object, String name, Class... paramTypes) {
|
||||
try {
|
||||
return object.getClass().getMethod(name, paramTypes);
|
||||
}
|
||||
catch (NoSuchMethodException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
static Object invoke(Object receiver, Method method, Object... params) {
|
||||
try {
|
||||
return method.invoke(receiver, params);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Runner {
|
||||
|
||||
private final double commonChunkFraction;
|
||||
private final int commonChunkSizeBits;
|
||||
private final int hashtableSizeBits;
|
||||
private final Path inputFile;
|
||||
private final int minReservedBytesAtFileTail;
|
||||
private final int shardSizeBits;
|
||||
|
||||
public Runner(
|
||||
@ -322,45 +425,74 @@ public class CalculateAverage_vemana {
|
||||
int chunkSizeBits,
|
||||
double commonChunkFraction,
|
||||
int commonChunkSizeBits,
|
||||
int hashtableSizeBits) {
|
||||
int hashtableSizeBits,
|
||||
int minReservedBytesAtFileTail) {
|
||||
this.inputFile = inputFile;
|
||||
this.shardSizeBits = chunkSizeBits;
|
||||
this.commonChunkFraction = commonChunkFraction;
|
||||
this.commonChunkSizeBits = commonChunkSizeBits;
|
||||
this.hashtableSizeBits = hashtableSizeBits;
|
||||
this.minReservedBytesAtFileTail = minReservedBytesAtFileTail;
|
||||
}
|
||||
|
||||
AggregateResult getSummaryStatistics() throws Exception {
|
||||
int processors = Runtime.getRuntime().availableProcessors();
|
||||
int nThreads = Runtime.getRuntime().availableProcessors();
|
||||
LazyShardQueue shardQueue = new SerialLazyShardQueue(
|
||||
1L << shardSizeBits, inputFile, processors, commonChunkFraction, commonChunkSizeBits);
|
||||
1L << shardSizeBits,
|
||||
inputFile,
|
||||
nThreads,
|
||||
commonChunkFraction,
|
||||
commonChunkSizeBits,
|
||||
minReservedBytesAtFileTail);
|
||||
|
||||
List<Future<AggregateResult>> results = new ArrayList<>();
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(
|
||||
processors,
|
||||
nThreads,
|
||||
runnable -> {
|
||||
Thread thread = new Thread(runnable);
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
});
|
||||
|
||||
long[] finishTimes = new long[processors];
|
||||
|
||||
for (int i = 0; i < processors; i++) {
|
||||
final int I = i;
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
final int shardIdx = i;
|
||||
final Callable<AggregateResult> callable = () -> {
|
||||
AggregateResult result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard();
|
||||
finishTimes[I] = System.nanoTime();
|
||||
Tracing.recordWorkStart("shard", shardIdx);
|
||||
AggregateResult result = new ShardProcessor(shardQueue, hashtableSizeBits, shardIdx).processShard();
|
||||
Tracing.recordWorkEnd("shard", shardIdx);
|
||||
return result;
|
||||
};
|
||||
results.add(executorService.submit(callable));
|
||||
}
|
||||
// printFinishTimes(finishTimes);
|
||||
return executorService.submit(() -> merge(results)).get();
|
||||
Tracing.recordEvent("Basic push time");
|
||||
|
||||
AggregateResult result = executorService.submit(() -> merge(results)).get();
|
||||
|
||||
Tracing.recordEvent("Merge results received");
|
||||
|
||||
// Note that munmap() is serial and not parallel
|
||||
executorService.submit(
|
||||
() -> {
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
shardQueue.close(i);
|
||||
}
|
||||
});
|
||||
|
||||
Tracing.recordEvent("Waiting for executor shutdown");
|
||||
|
||||
executorService.shutdown();
|
||||
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
|
||||
Tracing.recordEvent("Executor terminated");
|
||||
Tracing.analyzeWorkThreads("cleaner", nThreads);
|
||||
Tracing.recordEvent("After cleaner finish printed");
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private AggregateResult merge(List<Future<AggregateResult>> results)
|
||||
throws ExecutionException, InterruptedException {
|
||||
Tracing.recordEvent("Merge start time");
|
||||
Map<String, Stat> output = null;
|
||||
boolean[] isDone = new boolean[results.size()];
|
||||
int remaining = results.size();
|
||||
@ -383,51 +515,55 @@ public class CalculateAverage_vemana {
|
||||
}
|
||||
}
|
||||
}
|
||||
Tracing.recordEvent("Merge end time");
|
||||
Tracing.analyzeWorkThreads("shard", results.size());
|
||||
return new AggregateResult(output);
|
||||
}
|
||||
|
||||
private void printFinishTimes(long[] finishTimes) {
|
||||
Arrays.sort(finishTimes);
|
||||
int n = finishTimes.length;
|
||||
System.err.println(
|
||||
STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms");
|
||||
}
|
||||
}
|
||||
|
||||
public static class SerialLazyShardQueue implements LazyShardQueue {
|
||||
|
||||
private static long roundToNearestHigherMultipleOf(long divisor, long value) {
|
||||
return (value + divisor - 1) / divisor * divisor;
|
||||
private static long roundToNearestLowerMultipleOf(long divisor, long value) {
|
||||
return value / divisor * divisor;
|
||||
}
|
||||
|
||||
private final ByteRange[] byteRanges;
|
||||
private final long chunkSize;
|
||||
private final long commonChunkSize;
|
||||
private final AtomicLong commonPool;
|
||||
private final long effectiveFileSize;
|
||||
private final long fileSize;
|
||||
private final long[] nextStarts;
|
||||
private final long[] perThreadData;
|
||||
private final RandomAccessFile raf;
|
||||
private final SeqLock seqLock;
|
||||
|
||||
public SerialLazyShardQueue(
|
||||
long chunkSize,
|
||||
Path filePath,
|
||||
int shards,
|
||||
double commonChunkFraction,
|
||||
int commonChunkSizeBits)
|
||||
int commonChunkSizeBits,
|
||||
int fileTailReservedBytes)
|
||||
throws IOException {
|
||||
Checks.checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0);
|
||||
var raf = new RandomAccessFile(filePath.toFile(), "r");
|
||||
Checks.checkArg(fileTailReservedBytes >= 0);
|
||||
this.raf = new RandomAccessFile(filePath.toFile(), "r");
|
||||
this.fileSize = raf.length();
|
||||
fileTailReservedBytes = fileTailReservedBytes == 0
|
||||
? 0
|
||||
: consumeToPreviousNewLineExclusive(raf, fileTailReservedBytes);
|
||||
this.effectiveFileSize = fileSize - fileTailReservedBytes;
|
||||
|
||||
// Common pool
|
||||
long commonPoolStart = Math.min(
|
||||
roundToNearestHigherMultipleOf(
|
||||
chunkSize, (long) (fileSize * (1 - commonChunkFraction))),
|
||||
fileSize);
|
||||
roundToNearestLowerMultipleOf(
|
||||
chunkSize, (long) (effectiveFileSize * (1 - commonChunkFraction))),
|
||||
effectiveFileSize);
|
||||
this.commonPool = new AtomicLong(commonPoolStart);
|
||||
this.commonChunkSize = 1L << commonChunkSizeBits;
|
||||
|
||||
// Distribute chunks to shards
|
||||
this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
|
||||
this.perThreadData = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
|
||||
for (long i = 0,
|
||||
currentStart = 0,
|
||||
remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) {
|
||||
@ -435,8 +571,17 @@ public class CalculateAverage_vemana {
|
||||
long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards;
|
||||
// Shard i handles: [currentStart, currentStart + currentChunks * chunkSize)
|
||||
int pos = (int) i << 4;
|
||||
nextStarts[pos] = currentStart;
|
||||
nextStarts[pos + 1] = currentStart + currentChunks * chunkSize;
|
||||
perThreadData[pos] = currentStart; // next chunk begin
|
||||
perThreadData[pos + 1] = currentStart + currentChunks * chunkSize; // shard end
|
||||
perThreadData[pos + 2] = currentChunks; // active chunks remaining
|
||||
// threshold below which need to shrink
|
||||
// 0.03 is a practical number but the optimal strategy is this:
|
||||
// Shard number N (1-based) should unmap as soon as it completes (R/(R+1))^N fraction of
|
||||
// its work, where R = relative speed of unmap compared to the computation.
|
||||
// For our problem, R ~ 75 because unmap unmaps 30GB/sec (but, it is serial) while
|
||||
// cores go through data at the rate of 400MB/sec.
|
||||
perThreadData[pos + 3] = (long) (currentChunks * (0.03 * (shards - i)));
|
||||
perThreadData[pos + 4] = 1;
|
||||
currentStart += currentChunks * chunkSize;
|
||||
remainingChunks -= currentChunks;
|
||||
}
|
||||
@ -444,53 +589,128 @@ public class CalculateAverage_vemana {
|
||||
|
||||
this.byteRanges = new ByteRange[shards << 4];
|
||||
for (int i = 0; i < shards; i++) {
|
||||
byteRanges[i << 4] = new ByteRange(raf);
|
||||
byteRanges[i << 4] = new ByteRange(raf, effectiveFileSize);
|
||||
}
|
||||
|
||||
this.seqLock = new SeqLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteRange take(int idx) {
|
||||
// Try for thread local range
|
||||
final int pos = idx << 4;
|
||||
long rangeStart = nextStarts[pos];
|
||||
final long chunkEnd = nextStarts[pos + 1];
|
||||
public void close(int shardIdx) {
|
||||
byteRanges[shardIdx << 4].close(shardIdx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ByteRange> fileTailEndWork(int idx) {
|
||||
if (idx == 0 && effectiveFileSize < fileSize) {
|
||||
ByteRange chunk = new ByteRange(raf, fileSize);
|
||||
chunk.setRange(
|
||||
effectiveFileSize == 0 ? 0 : effectiveFileSize - 1 /* will consume newline at eFS-1 */,
|
||||
fileSize);
|
||||
return Optional.of(chunk);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteRange take(int shardIdx) {
|
||||
// Try for thread local range
|
||||
final int pos = shardIdx << 4;
|
||||
long rangeStart = perThreadData[pos];
|
||||
final long chunkEnd = perThreadData[pos + 1];
|
||||
final long rangeEnd;
|
||||
|
||||
if (rangeStart < chunkEnd) {
|
||||
rangeEnd = rangeStart + chunkSize;
|
||||
nextStarts[pos] = rangeEnd;
|
||||
perThreadData[pos] = rangeEnd;
|
||||
perThreadData[pos + 2]--;
|
||||
}
|
||||
else {
|
||||
rangeStart = commonPool.getAndAdd(commonChunkSize);
|
||||
// If that's exhausted too, nothing remains!
|
||||
if (rangeStart >= fileSize) {
|
||||
if (rangeStart >= effectiveFileSize) {
|
||||
return null;
|
||||
}
|
||||
rangeEnd = rangeStart + commonChunkSize;
|
||||
}
|
||||
|
||||
if (perThreadData[pos + 2] <= perThreadData[pos + 3] && perThreadData[pos + 4] > 0) {
|
||||
if (attemptClose(shardIdx)) {
|
||||
perThreadData[pos + 4]--;
|
||||
}
|
||||
}
|
||||
|
||||
ByteRange chunk = byteRanges[pos];
|
||||
chunk.setRange(rangeStart, rangeEnd);
|
||||
return chunk;
|
||||
}
|
||||
|
||||
private boolean attemptClose(int shardIdx) {
|
||||
if (seqLock.acquire()) {
|
||||
byteRanges[shardIdx << 4].close(shardIdx);
|
||||
seqLock.release();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private int consumeToPreviousNewLineExclusive(RandomAccessFile raf, int minReservedBytes) {
|
||||
try {
|
||||
long pos = Math.max(raf.length() - minReservedBytes - 1, -1);
|
||||
if (pos < 0) {
|
||||
return (int) raf.length();
|
||||
}
|
||||
|
||||
long start = Math.max(pos - 512, 0);
|
||||
ByteBuffer buf = raf.getChannel().map(MapMode.READ_ONLY, start, pos + 1 - start);
|
||||
while (pos >= 0 && buf.get((int) (pos - start)) != '\n') {
|
||||
pos--;
|
||||
}
|
||||
pos++;
|
||||
return (int) (raf.length() - pos);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** A low-traffic non-blocking lock. */
|
||||
static class SeqLock {
|
||||
|
||||
private final AtomicBoolean isOccupied = new AtomicBoolean(false);
|
||||
|
||||
boolean acquire() {
|
||||
return !isOccupied.get() && isOccupied.compareAndSet(false, true);
|
||||
}
|
||||
|
||||
void release() {
|
||||
isOccupied.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ShardProcessor {
|
||||
|
||||
private final int shardIdx;
|
||||
private final LazyShardQueue shardQueue;
|
||||
private final ShardProcessorState state;
|
||||
private final int threadIdx;
|
||||
|
||||
public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int threadIdx) {
|
||||
public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int shardIdx) {
|
||||
this.shardQueue = shardQueue;
|
||||
this.threadIdx = threadIdx;
|
||||
this.shardIdx = shardIdx;
|
||||
this.state = new ShardProcessorState(hashtableSizeBits);
|
||||
}
|
||||
|
||||
public AggregateResult processShard() {
|
||||
return processShardReal();
|
||||
}
|
||||
|
||||
public AggregateResult processShardReal() {
|
||||
// First process the file tail work to give ourselves freedom to go past ranges in parsing
|
||||
shardQueue.fileTailEndWork(shardIdx).ifPresent(this::processRangeSlow);
|
||||
|
||||
ByteRange range;
|
||||
while ((range = shardQueue.take(threadIdx)) != null) {
|
||||
while ((range = shardQueue.take(shardIdx)) != null) {
|
||||
processRange(range);
|
||||
}
|
||||
return result();
|
||||
@ -506,6 +726,13 @@ public class CalculateAverage_vemana {
|
||||
}
|
||||
}
|
||||
|
||||
private void processRangeSlow(ByteRange range) {
|
||||
int nextPos = range.startInBuf;
|
||||
while (nextPos < range.endInBuf) {
|
||||
nextPos = state.processLineSlow(range.byteBuffer, nextPos);
|
||||
}
|
||||
}
|
||||
|
||||
private AggregateResult result() {
|
||||
return state.result();
|
||||
}
|
||||
@ -513,8 +740,9 @@ public class CalculateAverage_vemana {
|
||||
|
||||
public static class ShardProcessorState {
|
||||
|
||||
public static final long ONE_MASK = 0x0101010101010101L;
|
||||
private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
|
||||
|
||||
private static final long SEMICOLON_MASK = 0x3b3b3b3b3b3b3b3bL;
|
||||
private final byte[][] cityNames;
|
||||
private final int slotsMask;
|
||||
private final Stat[] stats;
|
||||
@ -545,21 +773,21 @@ public class CalculateAverage_vemana {
|
||||
byte b = (byte) (x >>> 8);
|
||||
if (b == ';') {
|
||||
nextPos += 2;
|
||||
hash = hash * 31 + ((0xFF & x));
|
||||
hash = hash * 31 + (0xFF & x);
|
||||
break;
|
||||
}
|
||||
|
||||
byte c = (byte) (x >>> 16);
|
||||
if (c == ';') {
|
||||
nextPos += 3;
|
||||
hash = hash * 31 + ((0xFFFF & x));
|
||||
hash = hash * 31 + (0xFFFF & x);
|
||||
break;
|
||||
}
|
||||
|
||||
byte d = (byte) (x >>> 24);
|
||||
if (d == ';') {
|
||||
nextPos += 4;
|
||||
hash = hash * 31 + ((0xFFFFFF & x));
|
||||
hash = hash * 31 + (0xFFFFFF & x);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -596,9 +824,47 @@ public class CalculateAverage_vemana {
|
||||
return nextPos;
|
||||
}
|
||||
|
||||
/** A slow version which is used only for the tail part of the file. */
|
||||
public int processLineSlow(MappedByteBuffer mmb, int nextPos) {
|
||||
int originalPos = nextPos;
|
||||
byte nextByte;
|
||||
int hash = 0;
|
||||
|
||||
outer: while (true) {
|
||||
int accumulated = 0;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
nextByte = mmb.get(nextPos++);
|
||||
if (nextByte == ';') {
|
||||
if (i > 0) {
|
||||
hash = hash * 31 + accumulated;
|
||||
}
|
||||
break outer;
|
||||
}
|
||||
else {
|
||||
accumulated |= ((int) nextByte << (8 * i));
|
||||
}
|
||||
}
|
||||
hash = hash * 31 + accumulated;
|
||||
}
|
||||
int cityLen = nextPos - 1 - originalPos;
|
||||
|
||||
int temperature = 0;
|
||||
boolean negative = mmb.get(nextPos) == '-';
|
||||
while ((nextByte = mmb.get(nextPos++)) != '\n') {
|
||||
if (nextByte != '-' && nextByte != '.') {
|
||||
temperature = temperature * 10 + (nextByte - '0');
|
||||
}
|
||||
}
|
||||
|
||||
linearProbe(
|
||||
cityLen, hash & slotsMask, negative ? -temperature : temperature, mmb, originalPos);
|
||||
|
||||
return nextPos;
|
||||
}
|
||||
|
||||
public AggregateResult result() {
|
||||
int N = stats.length;
|
||||
TreeMap<String, Stat> map = new TreeMap<>();
|
||||
Map<String, Stat> map = new LinkedHashMap<>(5_000);
|
||||
for (int i = 0; i < N; i++) {
|
||||
if (stats[i] != null) {
|
||||
map.put(new String(cityNames[i]), stats[i]);
|
||||
@ -624,6 +890,11 @@ public class CalculateAverage_vemana {
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean hasSemicolonByte(long value) {
|
||||
long a = value ^ SEMICOLON_MASK;
|
||||
return (((a - ONE_MASK) & ~a) & (0x8080808080808080L)) != 0;
|
||||
}
|
||||
|
||||
private void linearProbe(int len, int hash, int temp, MappedByteBuffer mmb, int offsetInMmb) {
|
||||
for (int i = hash;; i = (i + 1) & slotsMask) {
|
||||
var curBytes = cityNames[i];
|
||||
@ -633,11 +904,6 @@ public class CalculateAverage_vemana {
|
||||
return;
|
||||
}
|
||||
else {
|
||||
// Overall, this tradeoff seems better than Arrays.equals(..)
|
||||
// City name param is encoded as (mmb, offsetnInMmb, len)
|
||||
// This avoids copying it into a (previously allocated) byte[]
|
||||
// The downside is that we have to manually implement 'equals' and it can lose out
|
||||
// to vectorized 'equals'; but the trade off seems to work in this particular case
|
||||
if (len == curBytes.length && equals(curBytes, mmb, offsetInMmb, len)) {
|
||||
stats[i].mergeReading(temp);
|
||||
return;
|
||||
@ -695,4 +961,94 @@ public class CalculateAverage_vemana {
|
||||
return "%.1f/%.1f/%.1f".formatted(min / 10.0, sum / 10.0 / count, max / 10.0);
|
||||
}
|
||||
}
|
||||
|
||||
static class Tracing {
|
||||
|
||||
private static final long[] cleanerTimes = new long[1 << 6 << 1];
|
||||
private static final long[] threadTimes = new long[1 << 6 << 1];
|
||||
private static long startTime;
|
||||
|
||||
static void analyzeWorkThreads(String id, int nThreads) {
|
||||
printTimingsAnalysis(id + " Stats", nThreads, timingsArray(id));
|
||||
}
|
||||
|
||||
static void recordAppStart() {
|
||||
startTime = System.nanoTime();
|
||||
}
|
||||
|
||||
static void recordEvent(String event) {
|
||||
printEvent(event, System.nanoTime());
|
||||
}
|
||||
|
||||
static void recordWorkEnd(String id, int threadId) {
|
||||
timingsArray(id)[2 * threadId + 1] = System.nanoTime();
|
||||
}
|
||||
|
||||
static void recordWorkStart(String id, int threadId) {
|
||||
timingsArray(id)[2 * threadId] = System.nanoTime();
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private static void errPrint(String message) {
|
||||
System.err.println(message);
|
||||
}
|
||||
|
||||
private static void printEvent(String message, long nanoTime) {
|
||||
errPrint(STR."\{message} = \{(nanoTime - startTime) / 1_000_000}ms");
|
||||
}
|
||||
|
||||
private static void printTimingsAnalysis(String header, int nThreads, long[] timestamps) {
|
||||
long minDuration = Long.MAX_VALUE, maxDuration = Long.MIN_VALUE;
|
||||
long minBegin = Long.MAX_VALUE, maxCompletion = Long.MIN_VALUE;
|
||||
long maxBegin = Long.MIN_VALUE, minCompletion = Long.MAX_VALUE;
|
||||
|
||||
long[] durationsMs = new long[nThreads];
|
||||
long[] completionsMs = new long[nThreads];
|
||||
long[] beginMs = new long[nThreads];
|
||||
for (int i = 0; i < nThreads; i++) {
|
||||
long durationNs = timestamps[2 * i + 1] - timestamps[2 * i];
|
||||
durationsMs[i] = durationNs / 1_000_000;
|
||||
completionsMs[i] = (timestamps[2 * i + 1] - startTime) / 1_000_000;
|
||||
beginMs[i] = (timestamps[2 * i] - startTime) / 1_000_000;
|
||||
|
||||
minDuration = Math.min(minDuration, durationNs);
|
||||
maxDuration = Math.max(maxDuration, durationNs);
|
||||
|
||||
minBegin = Math.min(minBegin, timestamps[2 * i]);
|
||||
maxBegin = Math.max(maxBegin, timestamps[2 * i]);
|
||||
|
||||
maxCompletion = Math.max(maxCompletion, timestamps[2 * i + 1]);
|
||||
minCompletion = Math.min(minCompletion, timestamps[2 * i + 1]);
|
||||
}
|
||||
errPrint(
|
||||
STR."""
|
||||
-------------------------------------------------------------------------------------------
|
||||
\{header}
|
||||
-------------------------------------------------------------------------------------------
|
||||
Max duration = \{maxDuration / 1_000_000} ms
|
||||
Min duration = \{minDuration / 1_000_000} ms
|
||||
Timespan[max(end)-min(start)] = \{(maxCompletion - minBegin) / 1_000_000} ms
|
||||
Completion Timespan[max(end)-min(end)] = \{(maxCompletion - minCompletion) / 1_000_000} ms
|
||||
Begin Timespan[max(begin)-min(begin)] = \{(maxBegin - minBegin) / 1_000_000} ms
|
||||
Durations = \{toString(durationsMs)} in ms
|
||||
Begin Timestamps = \{toString(beginMs)} in ms
|
||||
Completion Timestamps = \{toString(completionsMs)} in ms
|
||||
""");
|
||||
}
|
||||
|
||||
private static long[] timingsArray(String id) {
|
||||
return switch (id) {
|
||||
case "cleaner" -> cleanerTimes;
|
||||
case "shard" -> threadTimes;
|
||||
default -> throw new RuntimeException("");
|
||||
};
|
||||
}
|
||||
|
||||
private static String toString(long[] array) {
|
||||
return Arrays.stream(array)
|
||||
.mapToObj(x -> String.format("%6d", x))
|
||||
.collect(Collectors.joining(", ", "[ ", " ]"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user