Fixed failing tests
This commit is contained in:
parent
6761d784a2
commit
6381aefcc1
@ -16,5 +16,5 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
JAVA_OPTS="-Xnoclassgc -Xms16G -Xmx16G"
|
JAVA_OPTS="-server -Xnoclassgc -Xms16G -Xmx16G"
|
||||||
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_fatroom
|
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_fatroom
|
||||||
|
@ -69,7 +69,7 @@ public class CalculateAverage_fatroom {
|
|||||||
long fileSize = file.length();
|
long fileSize = file.length();
|
||||||
long position = 0;
|
long position = 0;
|
||||||
|
|
||||||
List<Callable<Map<String, MeasurementAggregator>>> tasks = new ArrayList<>();
|
List<Callable<Map<Station, MeasurementAggregator>>> tasks = new LinkedList<>();
|
||||||
while (position < fileSize) {
|
while (position < fileSize) {
|
||||||
long end = Math.min(position + SEGMENT_LENGTH, fileSize);
|
long end = Math.min(position + SEGMENT_LENGTH, fileSize);
|
||||||
int length = (int) (end - position);
|
int length = (int) (end - position);
|
||||||
@ -88,42 +88,48 @@ public class CalculateAverage_fatroom {
|
|||||||
for (var future : executor.invokeAll(tasks)) {
|
for (var future : executor.invokeAll(tasks)) {
|
||||||
var segmentAggregates = future.get();
|
var segmentAggregates = future.get();
|
||||||
for (var entry : segmentAggregates.entrySet()) {
|
for (var entry : segmentAggregates.entrySet()) {
|
||||||
aggregates.merge(entry.getKey(), entry.getValue(), MeasurementAggregator::combineWith);
|
aggregates.merge(entry.getKey().toString(), entry.getValue(), MeasurementAggregator::combineWith);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
|
|
||||||
|
// no sense to wait longer than base case
|
||||||
|
executor.awaitTermination(5, TimeUnit.MINUTES);
|
||||||
|
|
||||||
System.out.println(aggregates);
|
System.out.println(aggregates);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, MeasurementAggregator> processBuffer(MappedByteBuffer source, int length) {
|
private static Map<Station, MeasurementAggregator> processBuffer(MappedByteBuffer source, int length) {
|
||||||
Map<String, MeasurementAggregator> aggregates = new HashMap<>(500);
|
|
||||||
String station;
|
Map<Station, MeasurementAggregator> aggregates = new HashMap<>(500);
|
||||||
byte[] buffer = new byte[64];
|
Station station;
|
||||||
|
byte[] buffer = new byte[200];
|
||||||
|
byte[] measurement = new byte[5];
|
||||||
int measurementLength;
|
int measurementLength;
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
|
int hash = 1;
|
||||||
for (int i = 0; i < length; ++i) {
|
for (int i = 0; i < length; ++i) {
|
||||||
byte b = source.get(i);
|
byte b = source.get(i);
|
||||||
|
hash = 31 * hash + b;
|
||||||
buffer[idx++] = b;
|
buffer[idx++] = b;
|
||||||
if (b == ';') {
|
if (b == ';') {
|
||||||
station = new String(buffer, 0, idx - 1, StandardCharsets.UTF_8);
|
station = new Station(hash, buffer, idx - 1);
|
||||||
measurementLength = 3;
|
measurementLength = 3;
|
||||||
source.position(i + 1);
|
measurement[0] = source.get(++i);
|
||||||
buffer[0] = source.get(++i);
|
measurement[1] = source.get(++i);
|
||||||
buffer[1] = source.get(++i);
|
measurement[2] = source.get(++i);
|
||||||
buffer[2] = source.get(++i);
|
measurement[3] = source.get(++i);
|
||||||
buffer[3] = source.get(++i);
|
if (measurement[3] != '\n') {
|
||||||
if (buffer[3] != '\n') {
|
|
||||||
measurementLength++;
|
measurementLength++;
|
||||||
buffer[4] = source.get(++i);
|
measurement[4] = source.get(++i);
|
||||||
if (buffer[4] != '\n') {
|
if (measurement[4] != '\n') {
|
||||||
i++;
|
i++;
|
||||||
measurementLength++;
|
measurementLength++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
aggregates.computeIfAbsent(station, s -> new MeasurementAggregator()).consume(parseMeasurement(buffer, measurementLength));
|
aggregates.computeIfAbsent(station, s -> new MeasurementAggregator()).consume(parseMeasurement(measurement, measurementLength));
|
||||||
idx = 0;
|
idx = 0;
|
||||||
|
hash = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return aggregates;
|
return aggregates;
|
||||||
@ -132,10 +138,38 @@ public class CalculateAverage_fatroom {
|
|||||||
static double parseMeasurement(byte[] source, int size) {
|
static double parseMeasurement(byte[] source, int size) {
|
||||||
int isNegativeSignPresent = ~(source[0] >> 4) & 1;
|
int isNegativeSignPresent = ~(source[0] >> 4) & 1;
|
||||||
int firstDigit = source[isNegativeSignPresent] - '0';
|
int firstDigit = source[isNegativeSignPresent] - '0';
|
||||||
int secondDigit = source[size - 3] - '0';
|
int secondDigit = source[size - 3];
|
||||||
int thirdDigit = source[size - 1] - '0';
|
int thirdDigit = source[size - 1];
|
||||||
int has4 = (size - isNegativeSignPresent) >> 2;
|
int has4 = (size - isNegativeSignPresent) >> 2;
|
||||||
int value = has4 * firstDigit * 100 + secondDigit * 10 + thirdDigit;
|
int value = has4 * firstDigit * 100 + secondDigit * 10 + thirdDigit - 528;
|
||||||
return -isNegativeSignPresent ^ value - isNegativeSignPresent;
|
return -isNegativeSignPresent ^ value - isNegativeSignPresent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class Station {
|
||||||
|
private byte[] bytes;
|
||||||
|
private int hash;
|
||||||
|
|
||||||
|
public Station(int hash, byte[] bytes, int length) {
|
||||||
|
this.bytes = new byte[length];
|
||||||
|
System.arraycopy(bytes, 0, this.bytes, 0, length);
|
||||||
|
this.hash = hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
Station station = (Station) o;
|
||||||
|
if (hash != station.hash)
|
||||||
|
return false;
|
||||||
|
return Arrays.equals(bytes, station.bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user