Squashing a bunch of commits together. (#428)

Commit#2; Uplift of 7% using native byteorder from ByteBuffer.
Commit#1: Minor changes to formatting.

Co-authored-by: vemana <vemana.github@gmail.com>
This commit is contained in:
Vemana 2024-01-16 00:40:50 +05:30 committed by GitHub
parent 702d41df15
commit 6fe395cbae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -41,27 +41,24 @@ import java.util.stream.Collectors;
* remain readable for a majority of SWEs. At a high level, the approach relies on a few principles * remain readable for a majority of SWEs. At a high level, the approach relies on a few principles
* listed herein. * listed herein.
* *
* <p> * <p>[Exploit Parallelism] Distribute the work into Shards. Separate threads (one per core) process
* [Exploit Parallelism] Distribute the work into Shards. Separate threads (one per core) process
* Shards and follow it up by merging the results. parallelStream() is appealing but carries * Shards and follow it up by merging the results. parallelStream() is appealing but carries
* potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance * potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance
* is not ideal when trying to minimize the maximum worker latency. * is not ideal when trying to minimize the maximum worker latency.
* *
* <p> * <p>[Use ByteBuffers over MemorySegment] Each Shard is further divided in Chunks. This would've
* [Use ByteBuffers over MemorySegment] Each Shard is further divided in Chunks. This would've been * been unnecessary except that Shards are too big to be backed by ByteBuffers. Besides,
* unnecessary except that Shards are too big to be backed by ByteBuffers. Besides, MemorySegment * MemorySegment appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller
* appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller chunks. * chunks.
* *
* <p> * <p>[Straggler freedom] The optimization function here is to minimize the maximal worker thread
* [Straggler freedom] The optimization function here is to minimize the maximal worker thread
* completion. Law of large number averages means that all the threads will end up with similar * completion. Law of large number averages means that all the threads will end up with similar
* amounts of work and similar completion times; but, however ever so often there could be a bad * amounts of work and similar completion times; but, however ever so often there could be a bad
* sharding and more importantly, Cores are not created equal; some will be throttled more than * sharding and more importantly, Cores are not created equal; some will be throttled more than
* others. So, we have a shared {@code LazyShardQueue} that aims to distribute work to minimize the * others. So, we have a shared {@code LazyShardQueue} that aims to distribute work to minimize the
* latest completion time. * latest completion time.
* *
* <p> * <p>[Work Assignment with LazyShardQueue] The queue provides each thread with its next big-chunk
* [Work Assignment with LazyShardQueue] The queue provides each thread with its next big-chunk
* until X% of the work remains. Big-chunks belong to the thread and will not be provided to another * until X% of the work remains. Big-chunks belong to the thread and will not be provided to another
* thread. Then, it switches to providing small-chunk sizes. Small-chunks comprise the last X% of * thread. Then, it switches to providing small-chunk sizes. Small-chunks comprise the last X% of
* work and every thread can participate in completing the chunk. Even though the queue is shared * work and every thread can participate in completing the chunk. Even though the queue is shared
@ -69,27 +66,29 @@ import java.util.stream.Collectors;
* effectively a per-thread queue while processing big-chunks. The small-chunk phase uses an * effectively a per-thread queue while processing big-chunks. The small-chunk phase uses an
* AtomicLong to coordinate chunk allocation across threads. * AtomicLong to coordinate chunk allocation across threads.
* *
* <p> * <p>[Chunk processing] Chunk processing is typical. Process line by line. Find a hash function
* [Chunk processing] Chunk processing is typical. Process line by line. Find a hash function
* (polynomial hash fns are slow, but will work fine), hash the city name, resolve conflicts using * (polynomial hash fns are slow, but will work fine), hash the city name, resolve conflicts using
* linear probing and then accumulate the temperature into the appropriate hash slot. The key * linear probing and then accumulate the temperature into the appropriate hash slot. The key
* element then is how fast can you identify the hash slot, read the temperature and update the new * element then is how fast can you identify the hash slot, read the temperature and update the new
* temperature in the slot (i.e. min, max, count). * temperature in the slot (i.e. min, max, count).
* *
* <p> * <p>[Cache friendliness] 7502P and my machine (7950X) offer 4MB L3 cache/core. This means we can
* [Cache friendliness] 7502P and my machine (7950X) offer 4MB L3 cache/core. This means we can hope * hope to fit all our datastructures in L3 cache. Since SMT is turned on, the Runtime's available
* to fit all our datastructures in L3 cache. Since SMT is turned on, the Runtime's available
* processors will show twice the number of actual cores and so we get 2MB L3 cache/thread. To be * processors will show twice the number of actual cores and so we get 2MB L3 cache/thread. To be
* safe, we try to stay within 1.8 MB/thread and size our hashtable appropriately. * safe, we try to stay within 1.8 MB/thread and size our hashtable appropriately.
* *
* <p> * <p>[Native ByteOrder is MUCH better] There was almost a 10% lift by reading ints from bytebuffers
* [Allocation] Since MemorySegment seemed slower than ByteBuffers, backing Chunks by bytebuffers * using native byteorder . It so happens that both the eval machine (7502P) and my machine 7950X
* use native LITTLE_ENDIAN order, which again apparently is because X86[-64] is little-endian. But,
* by default, ByteBuffers use BIG_ENDIAN order, which appears to be a somewhat strange default from
* Java.
*
* <p>[Allocation] Since MemorySegment seemed slower than ByteBuffers, backing Chunks by bytebuffers
* was the logical option. Creating one ByteBuffer per chunk was no bueno because the system doesn't * was the logical option. Creating one ByteBuffer per chunk was no bueno because the system doesn't
* like it (JVM runs out of mapped file handle quota). Other than that, allocation in the hot path * like it (JVM runs out of mapped file handle quota). Other than that, allocation in the hot path
* was avoided. * was avoided.
* *
* <p> * <p>[General approach to fast hashing and temperature reading] Here, it helps to understand the
* [General approach to fast hashing and temperature reading] Here, it helps to understand the
* various bottlenecks in execution. One particular thing that I kept coming back to was to * various bottlenecks in execution. One particular thing that I kept coming back to was to
* understand the relative costs of instructions: See * understand the relative costs of instructions: See
* https://www.agner.org/optimize/instruction_tables.pdf It is helpful to think of hardware as a * https://www.agner.org/optimize/instruction_tables.pdf It is helpful to think of hardware as a
@ -102,24 +101,22 @@ import java.util.stream.Collectors;
* endPos" in a tight loop by breaking it into two pieces: one piece where the check will not be * endPos" in a tight loop by breaking it into two pieces: one piece where the check will not be
* needed and a tail piece where it will be needed. * needed and a tail piece where it will be needed.
* *
* <p> * <p>[Understand What Cores like]. Cores like to go straight and loop back. Despite good branch
* [Understand What Cores like]. Cores like to go straight and loop back. Despite good branch
* prediction, performance sucks with mispredicted branches. * prediction, performance sucks with mispredicted branches.
* *
* <p> * <p>[JIT] Java performance requires understanding the JIT. It is helpful to understand what the
* [JIT] Java performance requires understanding the JIT. It is helpful to understand what the JIT * JIT likes though it is still somewhat of a mystery to me. In general, it inlines small methods
* likes though it is still somewhat of a mystery to me. In general, it inlines small methods very * very well and after constant folding, it can optimize quite well across a reasonably deep call
* well and after constant folding, it can optimize quite well across a reasonably deep call chain. * chain. My experience with the JIT was that everything I tried to tune it made it worse except for
* My experience with the JIT was that everything I tried to tune it made it worse except for one * one parameter. I have a new-found respect for JIT - it likes and understands typical Java idioms.
* parameter. I have a new-found respect for JIT - it likes and understands typical Java idioms.
* *
* <p>[Tuning] Nothing was more insightful than actually playing with various tuning parameters. * <p>[Tuning] Nothing was more insightful than actually playing with various tuning parameters. I
* I can have all the theories but the hardware and JIT are giant blackboxes. I used a bunch of * can have all the theories but the hardware and JIT are giant blackboxes. I used a bunch of tools
* tools to optimize: (1) Command line parameters to tune big and small chunk sizes etc. This was * to optimize: (1) Command line parameters to tune big and small chunk sizes etc. This was also
* also very helpful in forming a mental model of the JIT. Sometimes, it would compile some methods * very helpful in forming a mental model of the JIT. Sometimes, it would compile some methods and
* and sometimes it would just run them interpreted since the compilation threshold wouldn't be * sometimes it would just run them interpreted since the compilation threshold wouldn't be reached
* reached for intermediate methods. (2) AsyncProfiler - this was the first line tool to understand * for intermediate methods. (2) AsyncProfiler - this was the first line tool to understand cache
* cache misses and cpu time to figure where to aim the next optimization effort. (3) JitWatch - * misses and cpu time to figure where to aim the next optimization effort. (3) JitWatch -
* invaluable for forming a mental model and attempting to tune the JIT. * invaluable for forming a mental model and attempting to tune the JIT.
* *
* <p>[Things that didn't work]. This is a looong list and the hit rate is quite low. In general, * <p>[Things that didn't work]. This is a looong list and the hit rate is quite low. In general,
@ -140,12 +137,6 @@ import java.util.stream.Collectors;
*/ */
public class CalculateAverage_vemana { public class CalculateAverage_vemana {
public static void checkArg(boolean condition) {
if (!condition) {
throw new IllegalArgumentException();
}
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// First process in large chunks without coordination among threads // First process in large chunks without coordination among threads
// Use chunkSizeBits for the large-chunk size // Use chunkSizeBits for the large-chunk size
@ -184,17 +175,25 @@ public class CalculateAverage_vemana {
// - hashtableSizeBits = \{hashtableSizeBits} // - hashtableSizeBits = \{hashtableSizeBits}
// """); // """);
System.out.println(new Runner( System.out.println(
new Runner(
Path.of("measurements.txt"), Path.of("measurements.txt"),
chunkSizeBits, chunkSizeBits,
commonChunkFraction, commonChunkFraction,
commonChunkSizeBits, commonChunkSizeBits,
hashtableSizeBits).getSummaryStatistics()); hashtableSizeBits)
.getSummaryStatistics());
} }
public interface LazyShardQueue { public record AggregateResult(Map<String, Stat> tempStats) {
ByteRange take(int shardIdx); @Override
public String toString() {
return this.tempStats().entrySet().stream()
.sorted(Entry.comparingByKey())
.map(entry -> "%s=%s".formatted(entry.getKey(), entry.getValue()))
.collect(Collectors.joining(", ", "{", "}"));
}
} }
// Mutable to avoid allocation // Mutable to avoid allocation
@ -285,6 +284,7 @@ public class CalculateAverage_vemana {
private void setByteBufferToRange(long start, long end) { private void setByteBufferToRange(long start, long end) {
try { try {
byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start); byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start);
byteBuffer.order(ByteOrder.nativeOrder());
} }
catch (IOException e) { catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -292,19 +292,23 @@ public class CalculateAverage_vemana {
} }
} }
public record Result(Map<String, Stat> tempStats) { public static final class Checks {
@Override public static void checkArg(boolean condition) {
public String toString() { if (!condition) {
return this.tempStats() throw new IllegalArgumentException();
.entrySet()
.stream()
.sorted(Entry.comparingByKey())
.map(entry -> "%s=%s".formatted(entry.getKey(), entry.getValue()))
.collect(Collectors.joining(", ", "{", "}"));
} }
} }
private Checks() {
}
}
public interface LazyShardQueue {
ByteRange take(int shardIdx);
}
public static class Runner { public static class Runner {
private final double commonChunkFraction; private final double commonChunkFraction;
@ -314,7 +318,10 @@ public class CalculateAverage_vemana {
private final int shardSizeBits; private final int shardSizeBits;
public Runner( public Runner(
Path inputFile, int chunkSizeBits, double commonChunkFraction, int commonChunkSizeBits, Path inputFile,
int chunkSizeBits,
double commonChunkFraction,
int commonChunkSizeBits,
int hashtableSizeBits) { int hashtableSizeBits) {
this.inputFile = inputFile; this.inputFile = inputFile;
this.shardSizeBits = chunkSizeBits; this.shardSizeBits = chunkSizeBits;
@ -323,16 +330,12 @@ public class CalculateAverage_vemana {
this.hashtableSizeBits = hashtableSizeBits; this.hashtableSizeBits = hashtableSizeBits;
} }
Result getSummaryStatistics() throws Exception { AggregateResult getSummaryStatistics() throws Exception {
int processors = Runtime.getRuntime().availableProcessors(); int processors = Runtime.getRuntime().availableProcessors();
LazyShardQueue shardQueue = new SerialLazyShardQueue( LazyShardQueue shardQueue = new SerialLazyShardQueue(
1L << shardSizeBits, 1L << shardSizeBits, inputFile, processors, commonChunkFraction, commonChunkSizeBits);
inputFile,
processors,
commonChunkFraction,
commonChunkSizeBits);
List<Future<Result>> results = new ArrayList<>(); List<Future<AggregateResult>> results = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool( ExecutorService executorService = Executors.newFixedThreadPool(
processors, processors,
runnable -> { runnable -> {
@ -345,8 +348,8 @@ public class CalculateAverage_vemana {
for (int i = 0; i < processors; i++) { for (int i = 0; i < processors; i++) {
final int I = i; final int I = i;
final Callable<Result> callable = () -> { final Callable<AggregateResult> callable = () -> {
Result result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard(); AggregateResult result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard();
finishTimes[I] = System.nanoTime(); finishTimes[I] = System.nanoTime();
return result; return result;
}; };
@ -356,7 +359,7 @@ public class CalculateAverage_vemana {
return executorService.submit(() -> merge(results)).get(); return executorService.submit(() -> merge(results)).get();
} }
private Result merge(List<Future<Result>> results) private AggregateResult merge(List<Future<AggregateResult>> results)
throws ExecutionException, InterruptedException { throws ExecutionException, InterruptedException {
Map<String, Stat> output = null; Map<String, Stat> output = null;
boolean[] isDone = new boolean[results.size()]; boolean[] isDone = new boolean[results.size()];
@ -374,20 +377,20 @@ public class CalculateAverage_vemana {
for (Entry<String, Stat> entry : results.get(i).get().tempStats().entrySet()) { for (Entry<String, Stat> entry : results.get(i).get().tempStats().entrySet()) {
output.compute( output.compute(
entry.getKey(), entry.getKey(),
(key, value) -> value == null ? entry.getValue() (key, value) -> value == null ? entry.getValue() : Stat.merge(value, entry.getValue()));
: Stat.merge(value, entry.getValue()));
} }
} }
} }
} }
} }
return new Result(output); return new AggregateResult(output);
} }
private void printFinishTimes(long[] finishTimes) { private void printFinishTimes(long[] finishTimes) {
Arrays.sort(finishTimes); Arrays.sort(finishTimes);
int n = finishTimes.length; int n = finishTimes.length;
System.err.println(STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms"); System.err.println(
STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms");
} }
} }
@ -405,23 +408,29 @@ public class CalculateAverage_vemana {
private final long[] nextStarts; private final long[] nextStarts;
public SerialLazyShardQueue( public SerialLazyShardQueue(
long chunkSize, Path filePath, int shards, double commonChunkFraction, long chunkSize,
Path filePath,
int shards,
double commonChunkFraction,
int commonChunkSizeBits) int commonChunkSizeBits)
throws IOException { throws IOException {
checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0); Checks.checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0);
var raf = new RandomAccessFile(filePath.toFile(), "r"); var raf = new RandomAccessFile(filePath.toFile(), "r");
this.fileSize = raf.length(); this.fileSize = raf.length();
// Common pool // Common pool
long commonPoolStart = Math.min( long commonPoolStart = Math.min(
roundToNearestHigherMultipleOf(chunkSize, (long) (fileSize * (1 - commonChunkFraction))), roundToNearestHigherMultipleOf(
chunkSize, (long) (fileSize * (1 - commonChunkFraction))),
fileSize); fileSize);
this.commonPool = new AtomicLong(commonPoolStart); this.commonPool = new AtomicLong(commonPoolStart);
this.commonChunkSize = 1L << commonChunkSizeBits; this.commonChunkSize = 1L << commonChunkSizeBits;
// Distribute chunks to shards // Distribute chunks to shards
this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
for (long i = 0, currentStart = 0, remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) { for (long i = 0,
currentStart = 0,
remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) {
long remainingShards = shards - i; long remainingShards = shards - i;
long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards; long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards;
// Shard i handles: [currentStart, currentStart + currentChunks * chunkSize) // Shard i handles: [currentStart, currentStart + currentChunks * chunkSize)
@ -479,7 +488,7 @@ public class CalculateAverage_vemana {
this.state = new ShardProcessorState(hashtableSizeBits); this.state = new ShardProcessorState(hashtableSizeBits);
} }
public Result processShard() { public AggregateResult processShard() {
ByteRange range; ByteRange range;
while ((range = shardQueue.take(threadIdx)) != null) { while ((range = shardQueue.take(threadIdx)) != null) {
processRange(range); processRange(range);
@ -497,7 +506,7 @@ public class CalculateAverage_vemana {
} }
} }
private Result result() { private AggregateResult result() {
return state.result(); return state.result();
} }
} }
@ -527,30 +536,30 @@ public class CalculateAverage_vemana {
x = Integer.reverseBytes(x); x = Integer.reverseBytes(x);
} }
byte a = (byte) (x >>> 24); byte a = (byte) (x >>> 0);
if (a == ';') { if (a == ';') {
nextPos += 1; nextPos += 1;
break; break;
} }
byte b = (byte) (x >>> 16); byte b = (byte) (x >>> 8);
if (b == ';') { if (b == ';') {
nextPos += 2; nextPos += 2;
hash = hash * 31 + ((0xFF000000 & x)); hash = hash * 31 + ((0xFF & x));
break; break;
} }
byte c = (byte) (x >>> 8); byte c = (byte) (x >>> 16);
if (c == ';') { if (c == ';') {
nextPos += 3; nextPos += 3;
hash = hash * 31 + ((0xFFFF0000 & x)); hash = hash * 31 + ((0xFFFF & x));
break; break;
} }
byte d = (byte) (x >>> 0); byte d = (byte) (x >>> 24);
if (d == ';') { if (d == ';') {
nextPos += 4; nextPos += 4;
hash = hash * 31 + ((0xFFFFFF00 & x)); hash = hash * 31 + ((0xFFFFFF & x));
break; break;
} }
@ -582,16 +591,12 @@ public class CalculateAverage_vemana {
} }
linearProbe( linearProbe(
cityLen, cityLen, hash & slotsMask, negative ? -temperature : temperature, mmb, originalPos);
hash & slotsMask,
negative ? -temperature : temperature,
mmb,
originalPos);
return nextPos; return nextPos;
} }
public Result result() { public AggregateResult result() {
int N = stats.length; int N = stats.length;
TreeMap<String, Stat> map = new TreeMap<>(); TreeMap<String, Stat> map = new TreeMap<>();
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
@ -599,7 +604,7 @@ public class CalculateAverage_vemana {
map.put(new String(cityNames[i]), stats[i]); map.put(new String(cityNames[i]), stats[i]);
} }
} }
return new Result(map); return new AggregateResult(map);
} }
private byte[] copyFrom(MappedByteBuffer mmb, int offsetInMmb, int len) { private byte[] copyFrom(MappedByteBuffer mmb, int offsetInMmb, int len) {
@ -642,6 +647,7 @@ public class CalculateAverage_vemana {
} }
} }
/** Represents aggregate stats. */
public static class Stat { public static class Stat {
public static Stat firstReading(int temp) { public static Stat firstReading(int temp) {