isolgpus: submission 2 - about a 25% improvement on submission 1. (#168)

* isolgpus: fix chunk sizing when not at 8 threads
use as many cores as are available
don't buffer the station name, only use it when we need it.
get rid of the main branch
move variables inside the loop

* isolgpus: optimistically assume we can read a whole int for the station name, but roll back if we get it wrong. This should be very beneficial on a dataset where station names are mostly over 4 chars

---------

Co-authored-by: Jamie Stansfield <jalstansfield@gmail.com>
This commit is contained in:
Jamie Stansfield
2024-01-07 19:55:30 +00:00
committed by GitHub
parent 22d8580ac8
commit dbd8ca4562

View File

@@ -32,7 +32,7 @@ public class CalculateAverage_isolgpus {
public static final int HISTOGRAMS_LENGTH = 1024 * 32; public static final int HISTOGRAMS_LENGTH = 1024 * 32;
public static final int HISTOGRAMS_MASK = HISTOGRAMS_LENGTH - 1; public static final int HISTOGRAMS_MASK = HISTOGRAMS_LENGTH - 1;
public static final int THREAD_COUNT = 8; public static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final String FILE = "./measurements.txt"; private static final String FILE = "./measurements.txt";
public static final byte SEPERATOR = 59; public static final byte SEPERATOR = 59;
public static final byte OFFSET = 48; public static final byte OFFSET = 48;
@@ -46,7 +46,7 @@ public class CalculateAverage_isolgpus {
File file = Paths.get(FILE).toFile(); File file = Paths.get(FILE).toFile();
long length = file.length(); long length = file.length();
long chunksCount = Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE)); long chunksCount = length < 8_000_000 ? 1 : Math.max(THREAD_COUNT, (int) Math.ceil(length / (double) MAX_CHUNK_SIZE));
long estimatedChunkSize = length / chunksCount; long estimatedChunkSize = length / chunksCount;
@@ -80,7 +80,7 @@ public class CalculateAverage_isolgpus {
while (measurementCollectorFromChunk != null) { while (measurementCollectorFromChunk != null) {
MeasurementCollector currentMergedResult = mergedResults.get(new String(measurementCollectorFromChunk.name)); MeasurementCollector currentMergedResult = mergedResults.get(new String(measurementCollectorFromChunk.name));
if (currentMergedResult == null) { if (currentMergedResult == null) {
currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name); currentMergedResult = new MeasurementCollector(measurementCollectorFromChunk.name, measurementCollectorFromChunk.nameSum);
mergedResults.put(new String(currentMergedResult.name), currentMergedResult); mergedResults.put(new String(currentMergedResult.name), currentMergedResult);
} }
currentMergedResult.merge(measurementCollectorFromChunk); currentMergedResult.merge(measurementCollectorFromChunk);
@@ -102,16 +102,10 @@ public class CalculateAverage_isolgpus {
MappedByteBuffer r = channel.map(FileChannel.MapMode.READ_ONLY, seekStart, length); MappedByteBuffer r = channel.map(FileChannel.MapMode.READ_ONLY, seekStart, length);
byte[] nameBuffer = new byte[100];
boolean isNegative; boolean isNegative;
byte[] valueBuffer = new byte[3]; byte[] valueBuffer = new byte[3];
MeasurementCollector[] measurementCollectors = new MeasurementCollector[HISTOGRAMS_LENGTH]; MeasurementCollector[] measurementCollectors = new MeasurementCollector[HISTOGRAMS_LENGTH];
int valueIndex = 0; int i = 0;
int nameBufferIndex = 0;
int nameSum = 0;
boolean parsingName = true;
long i = 0;
int hashResult = 0;
// seek to the start of the next message // seek to the start of the next message
if (estimatedStart != 0) { if (estimatedStart != 0) {
@@ -123,38 +117,49 @@ public class CalculateAverage_isolgpus {
try { try {
while (i <= lengthOfChunk || !parsingName) { while (i <= lengthOfChunk) {
int nameSum = 0;
int hashResult = 0;
int nameStart;
byte aChar; byte aChar;
if (parsingName) { nameStart = i;
int nameBufferIndex = 0;
int valueIndex = 0;
// optimistically assume that the name is at least 4 bytes
int firstInt = r.getInt();
nameBufferIndex = 4;
nameSum = firstInt;
hashResult = 31 * firstInt;
while ((aChar = r.get()) != SEPERATOR) { while ((aChar = r.get()) != SEPERATOR) {
nameBuffer[nameBufferIndex++] = aChar;
nameSum += aChar; nameSum += aChar;
// hash as we go, stolen after a discussion with palmr
hashResult = 31 * hashResult + aChar; hashResult = 31 * hashResult + aChar;
nameBufferIndex++;
// oh no we read too much, do it the byte for byte way instead
if (aChar == NEW_LINE) {
r.position(i);
nameBufferIndex = 0;
nameSum = 0;
hashResult = 0;
} }
parsingName = false; }
i += nameBufferIndex + 1; i += nameBufferIndex + 1;
}
else {
isNegative = (aChar = r.get()) == NEGATIVE; isNegative = (aChar = r.get()) == NEGATIVE;
valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r); valueIndex = readNumber(isNegative, valueBuffer, valueIndex, aChar, r);
byte decimalValue = r.get(); int decimalValue = r.getShort() >> 8;
int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative); int value = resolveValue(valueIndex, valueBuffer, decimalValue, isNegative);
// new line character
r.get();
MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameBuffer, nameBufferIndex, nameSum); MeasurementCollector measurementCollector = resolveMeasurementCollector(measurementCollectors, hashResult, nameStart, nameBufferIndex, nameSum, r);
measurementCollector.feed(value); measurementCollector.feed(value);
i += valueIndex + (isNegative ? 4 : 3); i += valueIndex + (isNegative ? 4 : 3);
valueIndex = 0;
nameBufferIndex = 0;
nameSum = 0;
parsingName = true;
hashResult = 0;
}
} }
} }
@@ -168,18 +173,22 @@ public class CalculateAverage_isolgpus {
return measurementCollectors; return measurementCollectors;
} }
private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, byte[] nameBuffer, int nameBufferIndex, private static MeasurementCollector resolveMeasurementCollector(MeasurementCollector[] measurementCollectors, int hash, int nameStart, int nameBufferLength,
int nameSum) { int nameSum, MappedByteBuffer r) {
MeasurementCollector measurementCollector = measurementCollectors[hash & HISTOGRAMS_MASK]; MeasurementCollector measurementCollector = measurementCollectors[hash & HISTOGRAMS_MASK];
if (measurementCollector == null) { if (measurementCollector == null) {
measurementCollector = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex)); byte[] nameBuffer = new byte[nameBufferLength];
r.get(nameStart, nameBuffer, 0, nameBufferLength);
measurementCollector = new MeasurementCollector(nameBuffer, nameSum);
measurementCollectors[hash & HISTOGRAMS_MASK] = measurementCollector; measurementCollectors[hash & HISTOGRAMS_MASK] = measurementCollector;
} }
else { else {
// collision unhappy path, try to avoid // collision unhappy path, try to avoid
while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferIndex)) { while (!nameEquals(measurementCollector.name, measurementCollector.nameSum, nameSum, nameBufferLength)) {
if (measurementCollector.link == null) { if (measurementCollector.link == null) {
measurementCollector.link = new MeasurementCollector(Arrays.copyOf(nameBuffer, nameBufferIndex)); byte[] nameBuffer = new byte[nameBufferLength];
r.get(nameStart, nameBuffer, 0, nameBufferLength);
measurementCollector.link = new MeasurementCollector(nameBuffer, nameSum);
measurementCollector = measurementCollector.link; measurementCollector = measurementCollector.link;
break; break;
} }
@@ -201,7 +210,7 @@ public class CalculateAverage_isolgpus {
return incomingNameSum == existingNameSum; return incomingNameSum == existingNameSum;
} }
private static int resolveValue(int valueIndex, byte[] valueBuffer, byte decimalValue, boolean isNegative) { private static int resolveValue(int valueIndex, byte[] valueBuffer, int decimalValue, boolean isNegative) {
int value; int value;
if (valueIndex == 1) { if (valueIndex == 1) {
value = ((valueBuffer[0] - OFFSET) * 10) + (decimalValue - OFFSET); value = ((valueBuffer[0] - OFFSET) * 10) + (decimalValue - OFFSET);
@@ -238,13 +247,9 @@ public class CalculateAverage_isolgpus {
private int min = Integer.MAX_VALUE; private int min = Integer.MAX_VALUE;
private int max = Integer.MIN_VALUE; private int max = Integer.MIN_VALUE;
public MeasurementCollector(byte[] name) { public MeasurementCollector(byte[] name, int nameSum) {
this.name = name; this.name = name;
int nameSum = 0;
for (int i = 0; i < name.length; i++) {
nameSum += name[i];
}
this.nameSum = nameSum; this.nameSum = nameSum;
} }
@@ -279,7 +284,6 @@ public class CalculateAverage_isolgpus {
@Override @Override
public String toString() { public String toString() {
// Abha=-24.9/18.0/61.7
return name + "=" + min + "/" + mean + "/" + max; return name + "=" + min + "/" + mean + "/" + max;
} }