serkan-ozal's 4th submission: (#645)
- split big regions into shared smaller tasks, so the workers complete their own tasks can pick up from the remaining instead of leaving its core idle - reduce number of executed instructions in the hot path
This commit is contained in:
parent
f4a0039a59
commit
7d52a37600
@ -26,7 +26,7 @@ if [[ ! "$(uname -s)" = "Darwin" ]]; then
|
|||||||
JAVA_OPTS="$JAVA_OPTS -XX:+UseTransparentHugePages"
|
JAVA_OPTS="$JAVA_OPTS -XX:+UseTransparentHugePages"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
CONFIGS="USE_SHARED_ARENA=true USE_SHARED_REGION=true CLOSE_STDOUT_ON_RESULT=true"
|
CONFIGS="USE_SHARED_ARENA=true USE_SHARED_REGION=true CLOSE_STDOUT_ON_RESULT=true REGION_COUNT=128"
|
||||||
|
|
||||||
#echo "Process started at $(date +%s%N | cut -b1-13)"
|
#echo "Process started at $(date +%s%N | cut -b1-13)"
|
||||||
eval "exec 3< <({ $CONFIGS java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_serkan_ozal; })"
|
eval "exec 3< <({ $CONFIGS java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_serkan_ozal; })"
|
||||||
|
@ -33,8 +33,10 @@ import java.util.ArrayList;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
@ -125,7 +127,9 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
arena = Arena.ofShared();
|
arena = Arena.ofShared();
|
||||||
region = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, arena);
|
region = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, arena);
|
||||||
}
|
}
|
||||||
// Split whole file into regions and start region processors to handle those regions
|
|
||||||
|
List<Task> tasks = new ArrayList<>(regionCount);
|
||||||
|
// Split whole file into regions and create tasks for each region
|
||||||
List<Future<Response>> futures = new ArrayList<>(regionCount);
|
List<Future<Response>> futures = new ArrayList<>(regionCount);
|
||||||
for (int i = 0; i < regionCount; i++) {
|
for (int i = 0; i < regionCount; i++) {
|
||||||
long endPos = Math.min(fileSize, startPos + regionSize);
|
long endPos = Math.min(fileSize, startPos + regionSize);
|
||||||
@ -134,11 +138,19 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
long closestLineEndPos = (i < regionCount - 1)
|
long closestLineEndPos = (i < regionCount - 1)
|
||||||
? findClosestLineEnd(fc, endPos, lineBuffer)
|
? findClosestLineEnd(fc, endPos, lineBuffer)
|
||||||
: fileSize;
|
: fileSize;
|
||||||
Request request = new Request(fc, arena, region, startPos, closestLineEndPos, result);
|
Task task = new Task(fc, region, startPos, closestLineEndPos);
|
||||||
|
tasks.add(task);
|
||||||
|
startPos = closestLineEndPos;
|
||||||
|
}
|
||||||
|
|
||||||
|
Queue<Task> sharedTasks = new ConcurrentLinkedQueue<>(tasks);
|
||||||
|
|
||||||
|
// Start region processors to process tasks for each region
|
||||||
|
for (int i = 0; i < concurrency; i++) {
|
||||||
|
Request request = new Request(arena, sharedTasks, result);
|
||||||
RegionProcessor regionProcessor = createRegionProcessor(request);
|
RegionProcessor regionProcessor = createRegionProcessor(request);
|
||||||
Future<Response> future = executor.submit(regionProcessor);
|
Future<Response> future = executor.submit(regionProcessor);
|
||||||
futures.add(future);
|
futures.add(future);
|
||||||
startPos = closestLineEndPos;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait processors to complete
|
// Wait processors to complete
|
||||||
@ -234,22 +246,14 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
*/
|
*/
|
||||||
private static class RegionProcessor implements Callable<Response> {
|
private static class RegionProcessor implements Callable<Response> {
|
||||||
|
|
||||||
private final FileChannel fc;
|
|
||||||
private final Arena arena;
|
private final Arena arena;
|
||||||
private final MemorySegment region;
|
private final Queue<Task> sharedTasks;
|
||||||
private final long start;
|
|
||||||
private final long end;
|
|
||||||
private final long size;
|
|
||||||
private final Result result;
|
private final Result result;
|
||||||
private OpenMap map;
|
private OpenMap map;
|
||||||
|
|
||||||
private RegionProcessor(Request request) {
|
private RegionProcessor(Request request) {
|
||||||
this.fc = request.fileChannel;
|
|
||||||
this.arena = request.arena;
|
this.arena = request.arena;
|
||||||
this.region = request.region;
|
this.sharedTasks = request.sharedTasks;
|
||||||
this.start = request.start;
|
|
||||||
this.end = request.end;
|
|
||||||
this.size = end - start;
|
|
||||||
this.result = request.result;
|
this.result = request.result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,14 +281,17 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
// If no shared global memory arena is used, create and use its own local memory arena
|
// If no shared global memory arena is used, create and use its own local memory arena
|
||||||
Arena a = arenaGiven ? arena : Arena.ofConfined();
|
Arena a = arenaGiven ? arena : Arena.ofConfined();
|
||||||
try {
|
try {
|
||||||
boolean regionGiven = region != null;
|
for (Task task = sharedTasks.poll(); task != null; task = sharedTasks.poll()) {
|
||||||
|
boolean regionGiven = task.region != null;
|
||||||
MemorySegment r = regionGiven
|
MemorySegment r = regionGiven
|
||||||
? region
|
? task.region
|
||||||
: fc.map(FileChannel.MapMode.READ_ONLY, start, size, a);
|
: task.fileChannel.map(FileChannel.MapMode.READ_ONLY, task.start, task.size, a);
|
||||||
long regionStart = regionGiven ? (r.address() + start) : r.address();
|
long regionStart = regionGiven ? (r.address() + task.start) : r.address();
|
||||||
long regionEnd = regionStart + size;
|
long regionEnd = regionStart + task.size;
|
||||||
|
|
||||||
doProcessRegion(r, r.address(), regionStart, regionEnd);
|
doProcessRegion(r, r.address(), regionStart, regionEnd);
|
||||||
|
}
|
||||||
|
|
||||||
if (VERBOSE) {
|
if (VERBOSE) {
|
||||||
System.out.println("[Processor-" + Thread.currentThread().getName() + "] Region processed at " + System.currentTimeMillis());
|
System.out.println("[Processor-" + Thread.currentThread().getName() + "] Region processed at " + System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
@ -358,21 +365,22 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
|
|
||||||
// Vectorized search for key/value separator
|
// Vectorized search for key/value separator
|
||||||
ByteVector keyVector = ByteVector.fromMemorySegment(BYTE_SPECIES, region, regionPtr - regionAddress, NATIVE_BYTE_ORDER);
|
ByteVector keyVector = ByteVector.fromMemorySegment(BYTE_SPECIES, region, regionPtr - regionAddress, NATIVE_BYTE_ORDER);
|
||||||
int keyValueSepOffset = keyVector.compare(VectorOperators.EQ, KEY_VALUE_SEPARATOR).firstTrue();
|
int keyLength = keyVector.compare(VectorOperators.EQ, KEY_VALUE_SEPARATOR).firstTrue();
|
||||||
// Check whether key/value separator is found in the first vector (city name is <= vector size)
|
// Check whether key/value separator is found in the first vector (city name is <= vector size)
|
||||||
if (keyValueSepOffset == vectorSize) {
|
if (keyLength != vectorSize) {
|
||||||
|
regionPtr += (keyLength + 1);
|
||||||
|
}
|
||||||
|
else {
|
||||||
regionPtr += vectorSize;
|
regionPtr += vectorSize;
|
||||||
keyValueSepOffset = 0;
|
|
||||||
for (; U.getByte(regionPtr) != KEY_VALUE_SEPARATOR; regionPtr++)
|
for (; U.getByte(regionPtr) != KEY_VALUE_SEPARATOR; regionPtr++)
|
||||||
;
|
;
|
||||||
|
keyLength = (int) (regionPtr - keyStartPtr);
|
||||||
|
regionPtr++;
|
||||||
// I have tried vectorized search for key/value separator in the remaining part,
|
// I have tried vectorized search for key/value separator in the remaining part,
|
||||||
// but since majority (99%) of the city names <= 16 bytes
|
// but since majority (99%) of the city names <= 16 bytes
|
||||||
// and other a few longer city names (have length < 16 and <= 32) not close to 32 bytes,
|
// and other a few longer city names (have length < 16 and <= 32) not close to 32 bytes,
|
||||||
// byte by byte search is better in terms of performance (according to my experiments) and simplicity.
|
// byte by byte search is better in terms of performance (according to my experiments) and simplicity.
|
||||||
}
|
}
|
||||||
regionPtr += keyValueSepOffset;
|
|
||||||
int keyLength = (int) (regionPtr - keyStartPtr);
|
|
||||||
regionPtr++;
|
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
// Put key and get map offset to put value
|
// Put key and get map offset to put value
|
||||||
@ -411,21 +419,32 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
*/
|
*/
|
||||||
private static final class Request {
|
private static final class Request {
|
||||||
|
|
||||||
private final FileChannel fileChannel;
|
|
||||||
private final Arena arena;
|
private final Arena arena;
|
||||||
|
private final Queue<Task> sharedTasks;
|
||||||
|
private final Result result;
|
||||||
|
|
||||||
|
private Request(Arena arena, Queue<Task> sharedTasks, Result result) {
|
||||||
|
this.arena = arena;
|
||||||
|
this.sharedTasks = sharedTasks;
|
||||||
|
this.result = result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class Task {
|
||||||
|
|
||||||
|
private final FileChannel fileChannel;
|
||||||
private final MemorySegment region;
|
private final MemorySegment region;
|
||||||
private final long start;
|
private final long start;
|
||||||
private final long end;
|
private final long end;
|
||||||
private final Result result;
|
private final long size;
|
||||||
|
|
||||||
private Request(FileChannel fileChannel, Arena arena, MemorySegment region,
|
private Task(FileChannel fileChannel, MemorySegment region, long start, long end) {
|
||||||
long start, long end, Result result) {
|
|
||||||
this.fileChannel = fileChannel;
|
this.fileChannel = fileChannel;
|
||||||
this.arena = arena;
|
|
||||||
this.region = region;
|
this.region = region;
|
||||||
this.start = start;
|
this.start = start;
|
||||||
this.end = end;
|
this.end = end;
|
||||||
this.result = result;
|
this.size = end - start;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -550,6 +569,8 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
// 128 bytes - total
|
// 128 bytes - total
|
||||||
|
|
||||||
private static final int ENTRY_SIZE = 128;
|
private static final int ENTRY_SIZE = 128;
|
||||||
|
private static final int ENTRY_SIZE_SHIFT = 7;
|
||||||
|
|
||||||
private static final int COUNT_OFFSET = 0;
|
private static final int COUNT_OFFSET = 0;
|
||||||
private static final int MIN_VALUE_OFFSET = 4;
|
private static final int MIN_VALUE_OFFSET = 4;
|
||||||
private static final int MAX_VALUE_OFFSET = 6;
|
private static final int MAX_VALUE_OFFSET = 6;
|
||||||
@ -563,12 +584,14 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
private static final int KEY_ARRAY_OFFSET = KEY_OFFSET - Unsafe.ARRAY_BYTE_BASE_OFFSET;
|
private static final int KEY_ARRAY_OFFSET = KEY_OFFSET - Unsafe.ARRAY_BYTE_BASE_OFFSET;
|
||||||
|
|
||||||
private final byte[] data;
|
private final byte[] data;
|
||||||
// Max number of unique keys are 10K, so 1 << 14 (16384) is long enough to hold offsets for all of them
|
private final long[] entryOffsets;
|
||||||
private final long[] entryOffsets = new long[1 << 14];
|
private int entryOffsetIdx;
|
||||||
private int entryOffsetIdx = 0;
|
|
||||||
|
|
||||||
private OpenMap() {
|
private OpenMap() {
|
||||||
this.data = new byte[MAP_SIZE];
|
this.data = new byte[MAP_SIZE];
|
||||||
|
// Max number of unique keys are 10K, so 1 << 14 (16384) is long enough to hold offsets for all of them
|
||||||
|
this.entryOffsets = new long[1 << 14];
|
||||||
|
this.entryOffsetIdx = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Credits: merykitty
|
// Credits: merykitty
|
||||||
@ -591,12 +614,12 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
// Calculate hash of key
|
// Calculate hash of key
|
||||||
int keyHash = calculateKeyHash(keyStartAddress, keyLength);
|
int keyHash = calculateKeyHash(keyStartAddress, keyLength);
|
||||||
// and get the position of the entry in the linear map based on calculated hash
|
// and get the position of the entry in the linear map based on calculated hash
|
||||||
int idx = keyHash & ENTRY_HASH_MASK;
|
int idx = (keyHash & ENTRY_HASH_MASK) << ENTRY_SIZE_SHIFT;
|
||||||
|
|
||||||
// Start searching from the calculated position
|
// Start searching from the calculated position
|
||||||
// and continue until find an available slot in case of hash collision
|
// and continue until find an available slot in case of hash collision
|
||||||
// TODO Prevent infinite loop if all the slots are in use for other keys
|
// TODO Prevent infinite loop if all the slots are in use for other keys
|
||||||
for (long entryOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET + (idx * ENTRY_SIZE);; entryOffset = (entryOffset + ENTRY_SIZE) & ENTRY_MASK) {
|
for (long entryOffset = Unsafe.ARRAY_BYTE_BASE_OFFSET + idx;; entryOffset = (entryOffset + ENTRY_SIZE) & ENTRY_MASK) {
|
||||||
int keySize = U.getInt(data, entryOffset + KEY_SIZE_OFFSET);
|
int keySize = U.getInt(data, entryOffset + KEY_SIZE_OFFSET);
|
||||||
// Check whether current index is empty (no another key is inserted yet)
|
// Check whether current index is empty (no another key is inserted yet)
|
||||||
if (keySize == 0) {
|
if (keySize == 0) {
|
||||||
@ -624,16 +647,15 @@ public class CalculateAverage_serkan_ozal {
|
|||||||
// Use vectorized search for the comparison of keys.
|
// Use vectorized search for the comparison of keys.
|
||||||
// Since majority of the city names >= 8 bytes and <= 16 bytes,
|
// Since majority of the city names >= 8 bytes and <= 16 bytes,
|
||||||
// this way is more efficient (according to my experiments) than any other comparisons (byte by byte or 2 longs).
|
// this way is more efficient (according to my experiments) than any other comparisons (byte by byte or 2 longs).
|
||||||
int keyCheckLength = Math.min(BYTE_SPECIES_SIZE, keyLength);
|
|
||||||
ByteVector entryKeyVector = ByteVector.fromArray(BYTE_SPECIES, data, keyStartArrayOffset);
|
ByteVector entryKeyVector = ByteVector.fromArray(BYTE_SPECIES, data, keyStartArrayOffset);
|
||||||
long eqMask = keyVector.compare(VectorOperators.EQ, entryKeyVector).toLong();
|
long eqMask = keyVector.compare(VectorOperators.EQ, entryKeyVector).toLong();
|
||||||
int eqCount = Long.numberOfTrailingZeros(~eqMask);
|
int eqCount = Long.numberOfTrailingZeros(~eqMask);
|
||||||
if (eqCount < keyCheckLength) {
|
if (eqCount >= keyLength) {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (keyCheckLength == keyLength) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
else if (keyLength <= BYTE_SPECIES_SIZE) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
keyCheckIdx = BYTE_SPECIES_SIZE;
|
keyCheckIdx = BYTE_SPECIES_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user