From 7d52a37600359de931395e218c909eaaf901f690 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serkan=20=C3=96ZAL?= Date: Mon, 29 Jan 2024 23:27:06 +0300 Subject: [PATCH] serkan-ozal's 4th submission: (#645) - split big regions into smaller shared tasks, so that workers which complete their own tasks can pick up the remaining ones instead of leaving their cores idle - reduce the number of executed instructions in the hot path --- calculate_average_serkan-ozal.sh | 2 +- .../onebrc/CalculateAverage_serkan_ozal.java | 106 +++++++++++------- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/calculate_average_serkan-ozal.sh b/calculate_average_serkan-ozal.sh index c075fc2..cce366f 100755 --- a/calculate_average_serkan-ozal.sh +++ b/calculate_average_serkan-ozal.sh @@ -26,7 +26,7 @@ if [[ ! "$(uname -s)" = "Darwin" ]]; then JAVA_OPTS="$JAVA_OPTS -XX:+UseTransparentHugePages" fi -CONFIGS="USE_SHARED_ARENA=true USE_SHARED_REGION=true CLOSE_STDOUT_ON_RESULT=true" +CONFIGS="USE_SHARED_ARENA=true USE_SHARED_REGION=true CLOSE_STDOUT_ON_RESULT=true REGION_COUNT=128" #echo "Process started at $(date +%s%N | cut -b1-13)" eval "exec 3< <({ $CONFIGS java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_serkan_ozal; })" diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_serkan_ozal.java b/src/main/java/dev/morling/onebrc/CalculateAverage_serkan_ozal.java index 0ca1fe7..576dd08 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_serkan_ozal.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_serkan_ozal.java @@ -33,8 +33,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.TreeMap; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -125,7 +127,9 @@ public class 
CalculateAverage_serkan_ozal { arena = Arena.ofShared(); region = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, arena); } - // Split whole file into regions and start region processors to handle those regions + + List tasks = new ArrayList<>(regionCount); + // Split whole file into regions and create tasks for each region List> futures = new ArrayList<>(regionCount); for (int i = 0; i < regionCount; i++) { long endPos = Math.min(fileSize, startPos + regionSize); @@ -134,11 +138,19 @@ public class CalculateAverage_serkan_ozal { long closestLineEndPos = (i < regionCount - 1) ? findClosestLineEnd(fc, endPos, lineBuffer) : fileSize; - Request request = new Request(fc, arena, region, startPos, closestLineEndPos, result); + Task task = new Task(fc, region, startPos, closestLineEndPos); + tasks.add(task); + startPos = closestLineEndPos; + } + + Queue sharedTasks = new ConcurrentLinkedQueue<>(tasks); + + // Start region processors to process tasks for each region + for (int i = 0; i < concurrency; i++) { + Request request = new Request(arena, sharedTasks, result); RegionProcessor regionProcessor = createRegionProcessor(request); Future future = executor.submit(regionProcessor); futures.add(future); - startPos = closestLineEndPos; } // Wait processors to complete @@ -234,22 +246,14 @@ public class CalculateAverage_serkan_ozal { */ private static class RegionProcessor implements Callable { - private final FileChannel fc; private final Arena arena; - private final MemorySegment region; - private final long start; - private final long end; - private final long size; + private final Queue sharedTasks; private final Result result; private OpenMap map; private RegionProcessor(Request request) { - this.fc = request.fileChannel; this.arena = request.arena; - this.region = request.region; - this.start = request.start; - this.end = request.end; - this.size = end - start; + this.sharedTasks = request.sharedTasks; this.result = request.result; } @@ -277,14 +281,17 @@ public class 
CalculateAverage_serkan_ozal { // If no shared global memory arena is used, create and use its own local memory arena Arena a = arenaGiven ? arena : Arena.ofConfined(); try { - boolean regionGiven = region != null; - MemorySegment r = regionGiven - ? region - : fc.map(FileChannel.MapMode.READ_ONLY, start, size, a); - long regionStart = regionGiven ? (r.address() + start) : r.address(); - long regionEnd = regionStart + size; + for (Task task = sharedTasks.poll(); task != null; task = sharedTasks.poll()) { + boolean regionGiven = task.region != null; + MemorySegment r = regionGiven + ? task.region + : task.fileChannel.map(FileChannel.MapMode.READ_ONLY, task.start, task.size, a); + long regionStart = regionGiven ? (r.address() + task.start) : r.address(); + long regionEnd = regionStart + task.size; + + doProcessRegion(r, r.address(), regionStart, regionEnd); + } - doProcessRegion(r, r.address(), regionStart, regionEnd); if (VERBOSE) { System.out.println("[Processor-" + Thread.currentThread().getName() + "] Region processed at " + System.currentTimeMillis()); } @@ -358,21 +365,22 @@ public class CalculateAverage_serkan_ozal { // Vectorized search for key/value separator ByteVector keyVector = ByteVector.fromMemorySegment(BYTE_SPECIES, region, regionPtr - regionAddress, NATIVE_BYTE_ORDER); - int keyValueSepOffset = keyVector.compare(VectorOperators.EQ, KEY_VALUE_SEPARATOR).firstTrue(); + int keyLength = keyVector.compare(VectorOperators.EQ, KEY_VALUE_SEPARATOR).firstTrue(); // Check whether key/value separator is found in the first vector (city name is <= vector size) - if (keyValueSepOffset == vectorSize) { + if (keyLength != vectorSize) { + regionPtr += (keyLength + 1); + } + else { regionPtr += vectorSize; - keyValueSepOffset = 0; for (; U.getByte(regionPtr) != KEY_VALUE_SEPARATOR; regionPtr++) ; + keyLength = (int) (regionPtr - keyStartPtr); + regionPtr++; // I have tried vectorized search for key/value separator in the remaining part, // but since majority (99%) of 
the city names <= 16 bytes // and other a few longer city names (have length < 16 and <= 32) not close to 32 bytes, // byte by byte search is better in terms of performance (according to my experiments) and simplicity. } - regionPtr += keyValueSepOffset; - int keyLength = (int) (regionPtr - keyStartPtr); - regionPtr++; //////////////////////////////////////////////////////////////////////////////////////////////////////// // Put key and get map offset to put value @@ -411,21 +419,32 @@ public class CalculateAverage_serkan_ozal { */ private static final class Request { - private final FileChannel fileChannel; private final Arena arena; + private final Queue sharedTasks; + private final Result result; + + private Request(Arena arena, Queue sharedTasks, Result result) { + this.arena = arena; + this.sharedTasks = sharedTasks; + this.result = result; + } + + } + + private static final class Task { + + private final FileChannel fileChannel; private final MemorySegment region; private final long start; private final long end; - private final Result result; + private final long size; - private Request(FileChannel fileChannel, Arena arena, MemorySegment region, - long start, long end, Result result) { + private Task(FileChannel fileChannel, MemorySegment region, long start, long end) { this.fileChannel = fileChannel; - this.arena = arena; this.region = region; this.start = start; this.end = end; - this.result = result; + this.size = end - start; } } @@ -550,6 +569,8 @@ public class CalculateAverage_serkan_ozal { // 128 bytes - total private static final int ENTRY_SIZE = 128; + private static final int ENTRY_SIZE_SHIFT = 7; + private static final int COUNT_OFFSET = 0; private static final int MIN_VALUE_OFFSET = 4; private static final int MAX_VALUE_OFFSET = 6; @@ -563,12 +584,14 @@ public class CalculateAverage_serkan_ozal { private static final int KEY_ARRAY_OFFSET = KEY_OFFSET - Unsafe.ARRAY_BYTE_BASE_OFFSET; private final byte[] data; - // Max number of unique keys are 
10K, so 1 << 14 (16384) is long enough to hold offsets for all of them - private final long[] entryOffsets = new long[1 << 14]; - private int entryOffsetIdx = 0; + private final long[] entryOffsets; + private int entryOffsetIdx; private OpenMap() { this.data = new byte[MAP_SIZE]; + // Max number of unique keys are 10K, so 1 << 14 (16384) is long enough to hold offsets for all of them + this.entryOffsets = new long[1 << 14]; + this.entryOffsetIdx = 0; } // Credits: merykitty @@ -591,12 +614,12 @@ public class CalculateAverage_serkan_ozal { // Calculate hash of key int keyHash = calculateKeyHash(keyStartAddress, keyLength); // and get the position of the entry in the linear map based on calculated hash - int idx = keyHash & ENTRY_HASH_MASK; + int idx = (keyHash & ENTRY_HASH_MASK) << ENTRY_SIZE_SHIFT; // Start searching from the calculated position // and continue until find an available slot in case of hash collision // TODO Prevent infinite loop if all the slots are in use for other keys - for (long entryOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET + (idx * ENTRY_SIZE);; entryOffset = (entryOffset + ENTRY_SIZE) & ENTRY_MASK) { + for (long entryOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET + idx;; entryOffset = (entryOffset + ENTRY_SIZE) & ENTRY_MASK) { int keySize = U.getInt(data, entryOffset + KEY_SIZE_OFFSET); // Check whether current index is empty (no another key is inserted yet) if (keySize == 0) { @@ -624,16 +647,15 @@ public class CalculateAverage_serkan_ozal { // Use vectorized search for the comparison of keys. // Since majority of the city names >= 8 bytes and <= 16 bytes, // this way is more efficient (according to my experiments) than any other comparisons (byte by byte or 2 longs). 
- int keyCheckLength = Math.min(BYTE_SPECIES_SIZE, keyLength); ByteVector entryKeyVector = ByteVector.fromArray(BYTE_SPECIES, data, keyStartArrayOffset); long eqMask = keyVector.compare(VectorOperators.EQ, entryKeyVector).toLong(); int eqCount = Long.numberOfTrailingZeros(~eqMask); - if (eqCount < keyCheckLength) { - return false; - } - if (keyCheckLength == keyLength) { + if (eqCount >= keyLength) { return true; } + else if (keyLength <= BYTE_SPECIES_SIZE) { + return false; + } keyCheckIdx = BYTE_SPECIES_SIZE; }