diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java index 65528c4..96ecf08 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java @@ -32,7 +32,7 @@ public class CalculateAverage_isolgpus { public static final int HISTOGRAMS_LENGTH = 1024 * 32; public static final int HISTOGRAMS_MASK = HISTOGRAMS_LENGTH - 1; - public static final int THREAD_COUNT = 8; + public static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors(); private static final String FILE = "./measurements.txt"; public static final byte SEPERATOR = 59; public static final byte OFFSET = 48; @@ -46,7 +46,7 @@ public class CalculateAverage_isolgpus { File file = Paths.get(FILE).toFile(); long length = file.length(); - long chunksCount = Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE)); + long chunksCount = length < 8_000_000 ? 1 : Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE)); long estimatedChunkSize = length / chunksCount; @@ -80,7 +80,7 @@ public class CalculateAverage_isolgpus { while (measurementCollectorFromChunk != null) { MeasurementCollector currentMergedResult = mergedResults.get(new String(measurementCollectorFromChunk.name)); if (currentMergedResult == null) { - currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name); + currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name, measurementCollectorFromChunk.nameSum); mergedResults.put(new String(currentMergedResult.name), currentMergedResult); } currentMergedResult.merge(measurementCollectorFromChunk); @@ -102,16 +102,10 @@ public class CalculateAverage_isolgpus { MappedByteBuffer r = channel.map(FileChannel.MapMode.READ_ONLY, seekStart, length); - byte[] nameBuffer = new byte[100]; boolean isNegative; byte[] valueBuffer = new byte[3]; MeasurementCollector[] measurementCollectors = new MeasurementCollector[HISTOGRAMS_LENGTH]; - int valueIndex = 0; - int nameBufferIndex = 0; - int nameSum = 0; - boolean parsingName = true; - long i = 0; - int hashResult = 0; + int i = 0; // seek to the start of the next message if (estimatedStart != 0) { @@ -123,38 +117,49 @@ public class CalculateAverage_isolgpus { try { - while (i <= lengthOfChunk || !parsingName) { + while (i <= lengthOfChunk) { + int nameSum = 0; + int hashResult = 0; + int nameStart; byte aChar; - if (parsingName) { + nameStart = i; + int nameBufferIndex = 0; + int valueIndex = 0; - while ((aChar = r.get()) != SEPERATOR) { - nameBuffer[nameBufferIndex++] = aChar; - nameSum += aChar; - hashResult = 31 * hashResult + aChar; + // optimistically assume that the name is at least 4 bytes + int firstInt = r.getInt(); + nameBufferIndex = 4; + nameSum = firstInt; + hashResult = 31 * firstInt; + + while ((aChar = r.get()) != SEPERATOR) { + nameSum += aChar; + // hash as we go, stolen after a discussion with palmr + hashResult = 31 * hashResult + aChar; + nameBufferIndex++; + + // oh no we read too much, do it the byte for byte way instead + if (aChar == NEW_LINE) { + r.position(i); + nameBufferIndex = 0; + nameSum = 0; + hashResult = 0; } - parsingName = false; - i += nameBufferIndex + 1; } - else { - isNegative = (aChar = r.get()) == NEGATIVE; - valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r); - byte decimalValue = r.get(); + i += nameBufferIndex + 1; - int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative); - // new line character - r.get(); + isNegative = (aChar = r.get()) == NEGATIVE; + valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r); - MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameBuffer, nameBufferIndex, nameSum); + int decimalValue = r.getShort() >> 8; - measurementCollector.feed(value); - i += valueIndex + (isNegative ? 4 : 3); - valueIndex = 0; - nameBufferIndex = 0; - nameSum = 0; - parsingName = true; - hashResult = 0; - } + int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative); + + MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameStart, nameBufferIndex, nameSum, r); + + measurementCollector.feed(value); + i += valueIndex + (isNegative ? 4 : 3); } } @@ -168,18 +173,22 @@ public class CalculateAverage_isolgpus { return measurementCollectors; } - private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, byte[] nameBuffer, int nameBufferIndex, - int nameSum) { + private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, int nameStart, int nameBufferLength, + int nameSum, MappedByteBuffer r) { MeasurementCollector measurementCollector = measurementCollectors[hash & HISTOGRAMS_MASK]; if (measurementCollector == null) { - measurementCollector = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex)); + byte[] nameBuffer = new byte[nameBufferLength]; + r.get(nameStart, nameBuffer, 0, nameBufferLength); + measurementCollector = new MeasurementCollector(nameBuffer, nameSum); measurementCollectors[hash & HISTOGRAMS_MASK] = measurementCollector; } else { // collision unhappy path, try to avoid - while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferIndex)) { + while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferLength)) { if (measurementCollector.link == null) { - measurementCollector.link = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex)); + byte[] nameBuffer = new byte[nameBufferLength]; + r.get(nameStart, nameBuffer, 0, nameBufferLength); + measurementCollector.link = new MeasurementCollector(nameBuffer, nameSum); measurementCollector = measurementCollector.link; break; } @@ -201,7 +210,7 @@ public class CalculateAverage_isolgpus { return incomingNameSum == existingNameSum; } - private static int resolveValue(int valueIndex, byte[] valueBuffer, byte decimalValue, boolean isNegative) { + private static int resolveValue(int valueIndex, byte[] valueBuffer, int decimalValue, boolean isNegative) { int value; if (valueIndex == 1) { value = ((valueBuffer[0] - OFFSET) * 10) + (decimalValue - OFFSET); @@ -238,13 +247,9 @@ public class CalculateAverage_isolgpus { private int min = Integer.MAX_VALUE; private int max = Integer.MIN_VALUE; - public MeasurementCollector(byte[] name) { + public MeasurementCollector(byte[] name, int nameSum) { this.name = name; - int nameSum = 0; - for (int i = 0; i < name.length; i++) { - nameSum += name[i]; - } this.nameSum = nameSum; } @@ -279,7 +284,6 @@ public class CalculateAverage_isolgpus { @Override public String toString() { - // Abha=-24.9/18.0/61.7 return name + "=" + min + "/" + mean + "/" + max; }