diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java b/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java index c687031..5dcfcde 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java @@ -27,16 +27,15 @@ import java.util.stream.StreamSupport; public class CalculateAverage_palmr { private static final String FILE = "./measurements.txt"; private static final int CHUNK_SIZE = 1024 * 1024 * 10; // Trial and error showed ~10MB to be a good size on our machine - private static final int STATION_NAME_BUFFER_SIZE = 50; + private static final int STATION_NAME_BUFFER_SIZE = 128; private static final int THREAD_COUNT = Math.min(8, Runtime.getRuntime().availableProcessors()); private static final char SEPARATOR_CHAR = ';'; - private static final char NEWLINE_CHAR = '\n'; + private static final char END_OF_RECORD = '\n'; private static final char MINUS_CHAR = '-'; private static final char DECIMAL_POINT_CHAR = '.'; public static void main(String[] args) throws IOException { - @SuppressWarnings("resource") // It's faster to leak the file than be well-behaved final var file = new RandomAccessFile(FILE, "r"); final var channel = file.getChannel(); @@ -50,22 +49,33 @@ public class CalculateAverage_palmr { private record ThreadChunk(long startPoint, long endPoint, long size) { public static Spliterator chunk(final RandomAccessFile file, final int chunkCount) throws IOException { final var fileSize = file.length(); - final var idealChunkSize = fileSize / THREAD_COUNT; + final var idealChunkSize = Math.max(CHUNK_SIZE, fileSize / THREAD_COUNT); final var chunks = new CalculateAverage_palmr.ThreadChunk[chunkCount]; + var validChunks = 0; var startPoint = 0L; for (int i = 0; i < chunkCount; i++) { var endPoint = Math.min(startPoint + idealChunkSize, fileSize); - file.seek(endPoint); - while (endPoint < fileSize && file.readByte() != NEWLINE_CHAR) { - endPoint++; + if (startPoint + idealChunkSize < fileSize) + { + file.seek(endPoint); + while (endPoint++ < fileSize && file.readByte() != END_OF_RECORD) { + Thread.onSpinWait(); + } } + final var actualSize = endPoint - startPoint; - chunks[i] = new CalculateAverage_palmr.ThreadChunk(startPoint, endPoint, actualSize); - startPoint += actualSize; + if (actualSize > 1) { + chunks[i] = new CalculateAverage_palmr.ThreadChunk(startPoint, endPoint, actualSize); + startPoint += actualSize; + validChunks++; + } + else { + break; + } } - return Spliterators.spliterator(chunks, + return Spliterators.spliterator(chunks, 0, validChunks, Spliterator.ORDERED | Spliterator.DISTINCT | Spliterator.SORTED | @@ -101,7 +111,8 @@ public class CalculateAverage_palmr { if (currentChar == SEPARATOR_CHAR) { state.parsingValue = true; - } else if (currentChar == NEWLINE_CHAR) { + } + else if (currentChar == END_OF_RECORD) { if (state.stationPointerEnd != 0) { final var value = state.measurementValue * state.exponent; @@ -114,20 +125,24 @@ public class CalculateAverage_palmr { // reset state.reset(); - } else { + } + else { if (!state.parsingValue) { state.stationBuffer[state.stationPointerEnd++] = currentChar; state.signedHashCode = 31 * state.signedHashCode + (currentChar & 0xff); - } else { + } + else { if (currentChar == MINUS_CHAR) { state.exponent = -0.1; - } else if (currentChar != DECIMAL_POINT_CHAR) { + } + else if (currentChar != DECIMAL_POINT_CHAR) { state.measurementValue = state.measurementValue * 10 + (currentChar - '0'); } } } } - } catch (IOException e) { + } + catch (IOException e) { throw new RuntimeException(e); } } @@ -188,7 +203,7 @@ public class CalculateAverage_palmr { * IT also uses Linear probing for collision resolution, which given the minimal collision count should hold up well. */ private static class ByteArrayKeyedMap { - private final int BUCKET_COUNT = 0xFFF; // 413 unique stations in the data set, & 0xFFF ~= 399 (only 14 collisions (given our hashcode implementation)) + private final int BUCKET_COUNT = 0xFFFF; private final MeasurementAggregator[] buckets = new MeasurementAggregator[BUCKET_COUNT + 1]; private final List compactUnorderedBuckets = new ArrayList<>(413); @@ -203,7 +218,8 @@ public class CalculateAverage_palmr { } index++; index &= BUCKET_COUNT; - } else { + } + else { final var copiedKey = Arrays.copyOf(key, keyLength); MeasurementAggregator measurementAggregator = new MeasurementAggregator(copiedKey, keyHashCode); buckets[index] = measurementAggregator;