diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java b/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java index 0ca1141..cd4e3d7 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java @@ -32,7 +32,7 @@ import java.util.concurrent.Future; public class CalculateAverage_zerninv { private static final String FILE = "./measurements.txt"; - private static final int MIN_CHUNK_SIZE = 1024 * 1024 * 16; + private static final int MIN_FILE_SIZE = 1024 * 1024 * 16; private static final char DELIMITER = ';'; private static final char LINE_SEPARATOR = '\n'; private static final char ZERO = '0'; @@ -48,10 +48,8 @@ public class CalculateAverage_zerninv { var memorySegment = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()); long address = memorySegment.address(); var cores = Runtime.getRuntime().availableProcessors(); - var chunkAmount = cores - 1; - // var maxChunkSize = Math.min(fileSize, MIN_CHUNK_SIZE); - var maxChunkSize = fileSize < MIN_CHUNK_SIZE ? fileSize : fileSize / chunkAmount; - var chunks = splitByChunks(address, address + fileSize, maxChunkSize); + var minChunkSize = fileSize < MIN_FILE_SIZE ? fileSize : fileSize / cores; + var chunks = splitByChunks(address, address + fileSize, minChunkSize); var executor = Executors.newFixedThreadPool(cores); List>> fResults = new ArrayList<>(); @@ -97,15 +95,13 @@ public class CalculateAverage_zerninv { } } - private static List splitByChunks(long address, long end, long maxChunkSize) { + private static List splitByChunks(long address, long end, long minChunkSize) { List result = new ArrayList<>(); result.add(address); while (address < end) { - long ptr = address + Math.min(end - address, maxChunkSize) - 1; - while (UNSAFE.getByte(ptr) != LINE_SEPARATOR) { - ptr--; + address += Math.min(end - address, minChunkSize); + while (address < end && UNSAFE.getByte(address++) != LINE_SEPARATOR) { } - address = ptr + 1; result.add(address); } return result; @@ -114,41 +110,41 @@ public class CalculateAverage_zerninv { private static Map calcForChunk(long offset, long end) { var results = new MeasurementContainer(); - long cityOffset, temperatureOffset; - int hashCode, temperature; + long cityOffset; + int hashCode, temperature, multiplier; byte cityNameSize, b; while (offset < end) { cityOffset = offset; hashCode = 0; while ((b = UNSAFE.getByte(offset++)) != DELIMITER) { - hashCode = 31 * hashCode + b; + hashCode = hashCode * 31 + b; } + cityNameSize = (byte) (offset - cityOffset - 1); - temperatureOffset = offset; - cityNameSize = (byte) (temperatureOffset - cityOffset - 1); - - temperature = 0; + multiplier = 1; + temperature = UNSAFE.getByte(offset++) - ZERO; + if (temperature == MINUS - ZERO) { + multiplier = -1; + temperature = 0; + } while ((b = UNSAFE.getByte(offset++)) != LINE_SEPARATOR) { if (b >= ZERO && b <= NINE) { temperature = temperature * 10 + (b - ZERO); } } - if (UNSAFE.getByte(temperatureOffset) == MINUS) { - temperature *= -1; - } - results.put(cityOffset, cityNameSize, hashCode, (short) temperature); + results.put(cityOffset, cityNameSize, hashCode, (short) (temperature * multiplier)); } return results.toStringMap(); } private static final class MeasurementAggregation { private long sum; - private long count; + private int count; private short min; private short max; - public MeasurementAggregation(long sum, long count, short min, short max) { + public MeasurementAggregation(long sum, int count, short min, short max) { this.sum = sum; this.count = count; this.min = min; @@ -174,14 +170,14 @@ public class CalculateAverage_zerninv { private static final class MeasurementContainer { private static final int SIZE = 1024 * 16; - private static final int ENTRY_SIZE = 8 + 1 + 4 + 8 + 8 + 2 + 2; + private static final int ENTRY_SIZE = 4 + 4 + 1 + 8 + 8 + 2 + 2; private static final int COUNT_OFFSET = 0; + private static final int HASH_OFFSET = 4; private static final int SIZE_OFFSET = 8; - private static final int HASH_OFFSET = 9; - private static final int ADDRESS_OFFSET = 13; - private static final int SUM_OFFSET = 21; - private static final int MIN_OFFSET = 29; - private static final int MAX_OFFSET = 31; + private static final int ADDRESS_OFFSET = 9; + private static final int SUM_OFFSET = 17; + private static final int MIN_OFFSET = 25; + private static final int MAX_OFFSET = 27; private final long address; @@ -195,11 +191,23 @@ public class CalculateAverage_zerninv { } public void put(long address, byte size, int hash, short value) { - long ptr = findAddress(address, size, hash); + int idx = Math.abs(hash % SIZE); + long ptr = this.address + idx * ENTRY_SIZE; + int count; - UNSAFE.putLong(ptr + COUNT_OFFSET, UNSAFE.getLong(ptr + COUNT_OFFSET) + 1); - UNSAFE.putByte(ptr + SIZE_OFFSET, size); + while ((count = UNSAFE.getInt(ptr + COUNT_OFFSET)) != 0) { + if (UNSAFE.getInt(ptr + HASH_OFFSET) == hash + && UNSAFE.getByte(ptr + SIZE_OFFSET) == size + && isEqual(UNSAFE.getLong(ptr + ADDRESS_OFFSET), address, size)) { + break; + } + idx = (idx + 1) % SIZE; + ptr = this.address + idx * ENTRY_SIZE; + } + + UNSAFE.putInt(ptr + COUNT_OFFSET, count + 1); UNSAFE.putInt(ptr + HASH_OFFSET, hash); + UNSAFE.putByte(ptr + SIZE_OFFSET, size); UNSAFE.putLong(ptr + ADDRESS_OFFSET, address); UNSAFE.putLong(ptr + SUM_OFFSET, UNSAFE.getLong(ptr + SUM_OFFSET) + value); @@ -213,12 +221,14 @@ public class CalculateAverage_zerninv { public Map toStringMap() { var result = new HashMap(); + int count; for (int i = 0; i < SIZE; i++) { long ptr = this.address + i * ENTRY_SIZE; - if (UNSAFE.getLong(ptr + COUNT_OFFSET) != 0) { + count = UNSAFE.getInt(ptr + COUNT_OFFSET); + if (count != 0) { var measurements = new MeasurementAggregation( UNSAFE.getLong(ptr + SUM_OFFSET), - UNSAFE.getLong(ptr + COUNT_OFFSET), + count, UNSAFE.getShort(ptr + MIN_OFFSET), UNSAFE.getShort(ptr + MAX_OFFSET)); var key = createString(UNSAFE.getLong(ptr + ADDRESS_OFFSET), UNSAFE.getByte(ptr + SIZE_OFFSET)); @@ -228,21 +238,6 @@ public class CalculateAverage_zerninv { return result; } - private long findAddress(long address, byte size, int hash) { - int idx = Math.abs(hash % SIZE); - long ptr = this.address + idx * ENTRY_SIZE; - while (UNSAFE.getLong(ptr + COUNT_OFFSET) != 0) { - if (UNSAFE.getByte(ptr + SIZE_OFFSET) == size - && UNSAFE.getInt(ptr + HASH_OFFSET) == hash - && isEqual(UNSAFE.getLong(ptr + ADDRESS_OFFSET), address, size)) { - break; - } - idx = (idx + 1) % SIZE; - ptr = this.address + idx * ENTRY_SIZE; - } - return ptr; - } - private boolean isEqual(long address, long address2, byte size) { for (int i = 0; i < size; i++) { if (UNSAFE.getByte(address + i) != UNSAFE.getByte(address2 + i)) {