ddimtirov - supporting hash collisions, should have fixed #101
* ddimtirov - supporting hash collisions, should have fixed #101 * Make life easier for Windows user who need to use WSL to run the tests
This commit is contained in:
parent
e8b2d2d7b4
commit
14918bb306
14
.gitattributes
vendored
Normal file
14
.gitattributes
vendored
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
*.java text
|
||||||
|
*.md text
|
||||||
|
*.yml text
|
||||||
|
*.xml text
|
||||||
|
*.gradle text
|
||||||
|
*.properties text
|
||||||
|
mvnw text eol=lf
|
||||||
|
*.sh text eol=lf
|
||||||
|
*.bat text eol=crlf
|
||||||
|
*.cmd text eol=crlf
|
||||||
|
*.jar binary
|
||||||
|
|
||||||
|
src/test/resources/samples/*.txt text eol=lf
|
||||||
|
src/test/resources/samples/*.out text eol=lf
|
@ -17,5 +17,7 @@
|
|||||||
|
|
||||||
# --enable-preview to use the new memory mapped segments
|
# --enable-preview to use the new memory mapped segments
|
||||||
# We don't allocate much, so just give it 1G heap and turn off GC; the AlwaysPreTouch was suggested by the ergonomics
|
# We don't allocate much, so just give it 1G heap and turn off GC; the AlwaysPreTouch was suggested by the ergonomics
|
||||||
JAVA_OPTS="--enable-preview -Xms1g -Xmx1g -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -XX:+AlwaysPreTouch"
|
JAVA_OPTS="--enable-preview -da -dsa -Xms1g -Xmx1g -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -XX:+AlwaysPreTouch"
|
||||||
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ddimtirov
|
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ddimtirov
|
||||||
|
|
||||||
|
# & "$Env:JAVA_HOME/bin/java" --enable-preview -Xms1g -Xmx1g -XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -XX:+AlwaysPreTouch --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ddimtirov
|
||||||
|
@ -26,165 +26,245 @@ import java.nio.file.Path;
|
|||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.TreeMap;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
// gunnar morling - 2:10
|
|
||||||
// roy van rijn - 1:01
|
|
||||||
// 0:37
|
|
||||||
|
|
||||||
public class CalculateAverage_ddimtirov {
|
public class CalculateAverage_ddimtirov {
|
||||||
private static final String FILE = "./measurements.txt";
|
private static final String FILE = "./measurements.txt";
|
||||||
|
private static final int MAX_STATIONS = 100_000;
|
||||||
|
private static final int MAX_STATION_NAME_LENGTH = 100;
|
||||||
|
private static final int MAX_UTF8_CODEPOINT_SIZE = 3;
|
||||||
|
|
||||||
private static final int HASH_NO_CLASH_MODULUS = 49999;
|
|
||||||
private static final int OFFSET_MIN = 0;
|
private static final int OFFSET_MIN = 0;
|
||||||
private static final int OFFSET_MAX = 1;
|
private static final int OFFSET_MAX = 1;
|
||||||
private static final int OFFSET_COUNT = 2;
|
private static final int OFFSET_COUNT = 2;
|
||||||
|
|
||||||
@SuppressWarnings("RedundantSuppression")
|
private static final boolean assertions = CalculateAverage_ddimtirov.class.desiredAssertionStatus();
|
||||||
public static void main(String[] args) throws IOException {
|
private static final Map<String, LongAdder> hashCollisionOccurrences = new ConcurrentHashMap<>();
|
||||||
var path = Path.of(FILE);
|
|
||||||
var start = Instant.now();
|
|
||||||
var desiredSegmentsCount = Runtime.getRuntime().availableProcessors();
|
|
||||||
|
|
||||||
|
@SuppressWarnings("RedundantSuppression")
|
||||||
|
public static void main(String[] args) throws IOException, InterruptedException {
|
||||||
|
var path = Path.of(args.length>0 ? args[0] : FILE);
|
||||||
|
Instant start = null;// Instant.now();
|
||||||
|
|
||||||
|
var desiredSegmentsCount = Runtime.getRuntime().availableProcessors();
|
||||||
var fileSegments = FileSegment.forFile(path, desiredSegmentsCount);
|
var fileSegments = FileSegment.forFile(path, desiredSegmentsCount);
|
||||||
|
|
||||||
var trackers = fileSegments.stream().parallel().map(fileSegment -> {
|
var loaders = new ThreadGroup("Loaders");
|
||||||
|
var trackers = Collections.synchronizedList(new ArrayList<Tracker>());
|
||||||
|
var threads = fileSegments.stream().map(fileSegment -> Thread // manually start thread per segment
|
||||||
|
.ofPlatform()
|
||||||
|
.group(loaders)
|
||||||
|
.name(STR."Segment \{fileSegment}")
|
||||||
|
.start(() -> {
|
||||||
try (var fileChannel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) {
|
try (var fileChannel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) {
|
||||||
var tracker = new Tracker();
|
var tracker = new Tracker();
|
||||||
var memorySegment = fileChannel.map(FileChannel.MapMode.READ_ONLY, fileSegment.start(), fileSegment.size(), Arena.ofConfined());
|
var memorySegment = fileChannel.map(FileChannel.MapMode.READ_ONLY, fileSegment.start(), fileSegment.size(), Arena.ofConfined());
|
||||||
tracker.processSegment(memorySegment);
|
tracker.processSegment(memorySegment);
|
||||||
return tracker;
|
trackers.add(tracker);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}).toList();
|
})
|
||||||
|
).toList();
|
||||||
|
|
||||||
var result = summarizeTrackers(trackers);
|
for (Thread thread : threads) thread.join();
|
||||||
|
assert trackers.size() == threads.size();
|
||||||
|
assert trackers.size() <= desiredSegmentsCount;
|
||||||
|
|
||||||
|
var result = summarizeTrackers(trackers.toArray(Tracker[]::new));
|
||||||
System.out.println(result);
|
System.out.println(result);
|
||||||
|
|
||||||
// noinspection ConstantValue
|
// noinspection ConstantValue
|
||||||
if (start != null)
|
if (start != null) {
|
||||||
System.err.println(Duration.between(start, Instant.now()));
|
System.err.println(Duration.between(start, Instant.now()));
|
||||||
assert Files.readAllLines(Path.of("measurements_result.txt")).getFirst().equals(result);
|
if (assertions) System.err.printf("hash clashes: %s%n", hashCollisionOccurrences);
|
||||||
|
}
|
||||||
|
assert Files.readAllLines(Path.of("measurements.out")).getFirst().equals(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
record FileSegment(int index, long start, long size) {
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return STR."#\{index} [\{start}..\{start + size}] \{size} bytes";
|
||||||
}
|
}
|
||||||
|
|
||||||
record FileSegment(long start, long size) {
|
|
||||||
public static List<FileSegment> forFile(Path file, int desiredSegmentsCount) throws IOException {
|
public static List<FileSegment> forFile(Path file, int desiredSegmentsCount) throws IOException {
|
||||||
try (var raf = new RandomAccessFile(file.toFile(), "r")) {
|
try (var raf = new RandomAccessFile(file.toFile(), "r")) {
|
||||||
var segments = new ArrayList<FileSegment>();
|
var segments = new ArrayList<FileSegment>();
|
||||||
var fileSize = raf.length();
|
var fileSize = raf.length();
|
||||||
var segmentSize = fileSize / desiredSegmentsCount;
|
var segmentSize = Math.max(1024 * 1024, fileSize / desiredSegmentsCount);
|
||||||
for (int segmentIdx = 0; segmentIdx < desiredSegmentsCount; segmentIdx++) {
|
|
||||||
var segStart = segmentIdx * segmentSize;
|
|
||||||
var segEnd = (segmentIdx == desiredSegmentsCount - 1) ? fileSize : segStart + segmentSize;
|
|
||||||
segStart = findSegmentBoundary(raf, segmentIdx, 0, segStart, segEnd);
|
|
||||||
segEnd = findSegmentBoundary(raf, segmentIdx, desiredSegmentsCount - 1, segEnd, fileSize);
|
|
||||||
|
|
||||||
var segSize = segEnd - segStart;
|
var i = 1;
|
||||||
|
var prevEnd = 0L;
|
||||||
segments.add(new FileSegment(segStart, segSize));
|
while (prevEnd < fileSize-1) {
|
||||||
|
var start = prevEnd;
|
||||||
|
var end = findNewLineAfter(raf, prevEnd + segmentSize, fileSize);
|
||||||
|
segments.add(new FileSegment(i, start, end - start));
|
||||||
|
prevEnd = end;
|
||||||
}
|
}
|
||||||
return segments;
|
return segments;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long findSegmentBoundary(RandomAccessFile raf, int i, int skipForSegment, long location, long fileSize) throws IOException {
|
private static long findNewLineAfter(RandomAccessFile raf, long location, long fileSize) throws IOException {
|
||||||
if (i == skipForSegment) return location;
|
|
||||||
|
|
||||||
raf.seek(location);
|
raf.seek(location);
|
||||||
while (location < fileSize) {
|
while (location < fileSize) {
|
||||||
location++;
|
location++;
|
||||||
if (raf.read() == '\n') break;
|
int c = raf.read();
|
||||||
|
if (c == '\r' || c == '\n') break;
|
||||||
}
|
}
|
||||||
return location;
|
return Math.min(location, fileSize - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String summarizeTrackers(List<Tracker> trackers) {
|
static class Accumulator {
|
||||||
var result = new TreeMap<String, String>();
|
public final String name;
|
||||||
for (var i = 0; i < HASH_NO_CLASH_MODULUS; i++) {
|
public int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE, count;
|
||||||
String name = null;
|
public long sum;
|
||||||
|
|
||||||
var min = Integer.MAX_VALUE;
|
public Accumulator(String name) {
|
||||||
var max = Integer.MIN_VALUE;
|
this.name = name;
|
||||||
var sum = 0L;
|
|
||||||
var count = 0L;
|
|
||||||
for (Tracker tracker : trackers) {
|
|
||||||
if (tracker.names[i] == null)
|
|
||||||
continue;
|
|
||||||
if (name == null)
|
|
||||||
name = tracker.names[i];
|
|
||||||
|
|
||||||
var minn = tracker.minMaxCount[i * 3];
|
|
||||||
var maxx = tracker.minMaxCount[i * 3 + 1];
|
|
||||||
if (minn < min)
|
|
||||||
min = minn;
|
|
||||||
if (maxx > max)
|
|
||||||
max = maxx;
|
|
||||||
count += tracker.minMaxCount[i * 3 + 2];
|
|
||||||
sum += tracker.sums[i];
|
|
||||||
}
|
}
|
||||||
if (name == null)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
|
public void accumulate(int min, int max, int count, long sum) {
|
||||||
|
if (this.min > min)
|
||||||
|
this.min = min;
|
||||||
|
if (this.max < max)
|
||||||
|
this.max = max;
|
||||||
|
this.count += count;
|
||||||
|
this.sum += sum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
var mean = Math.round((double) sum / count) / 10.0;
|
var mean = Math.round((double) sum / count) / 10.0;
|
||||||
result.put(name, (min / 10.0) + "/" + mean + "/" + (max / 10.0));
|
return (min / 10.0) + "/" + mean + "/" + (max / 10.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String summarizeTrackers(Tracker[] trackers) {
|
||||||
|
var result = new TreeMap<String, Accumulator>();
|
||||||
|
|
||||||
|
for (var i = 0; i < Tracker.SIZE; i++) {
|
||||||
|
Accumulator acc = null;
|
||||||
|
|
||||||
|
for (Tracker tracker : trackers) {
|
||||||
|
var name = tracker.names[i];
|
||||||
|
if (name == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else if (acc == null || !name.equals(acc.name)) {
|
||||||
|
acc = result.computeIfAbsent(name, Accumulator::new);
|
||||||
|
}
|
||||||
|
acc.accumulate(
|
||||||
|
tracker.minMaxCount[i * 3 + OFFSET_MIN],
|
||||||
|
tracker.minMaxCount[i * 3 + OFFSET_MAX],
|
||||||
|
tracker.minMaxCount[i * 3 + OFFSET_COUNT],
|
||||||
|
tracker.sums[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return result.toString();
|
return result.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
static class Tracker {
|
static class Tracker {
|
||||||
private final int[] minMaxCount = new int[HASH_NO_CLASH_MODULUS * 3];
|
public static final int SIZE = MAX_STATIONS * 10;
|
||||||
private final long[] sums = new long[HASH_NO_CLASH_MODULUS];
|
|
||||||
private final String[] names = new String[HASH_NO_CLASH_MODULUS];
|
private static final int CORRECTION_0_TO_9 = '0' * 10 + '0';
|
||||||
|
private static final int CORRECTION_10_TO_99 = '0' * 100 + '0' * 10 + '0';
|
||||||
|
|
||||||
|
private final byte[] tempNameBytes = new byte[MAX_STATION_NAME_LENGTH * MAX_UTF8_CODEPOINT_SIZE];
|
||||||
|
private final int[] minMaxCount = new int[SIZE * 3];
|
||||||
|
private final long[] sums = new long[SIZE];
|
||||||
|
private final String[] names = new String[SIZE];
|
||||||
|
private final byte[][] nameBytes = new byte[SIZE][];
|
||||||
|
|
||||||
private void processSegment(MemorySegment memory) {
|
private void processSegment(MemorySegment memory) {
|
||||||
int position = 0;
|
|
||||||
long limit = memory.byteSize();
|
long limit = memory.byteSize();
|
||||||
while (position < limit) {
|
|
||||||
int pos = position;
|
var pos = 0;
|
||||||
|
|
||||||
|
// skip newlines so the chunk limit check can work correctly
|
||||||
|
while (pos < limit) {
|
||||||
|
byte c = memory.get(ValueLayout.JAVA_BYTE, pos);
|
||||||
|
if (c != '\r' && c != '\n')
|
||||||
|
break;
|
||||||
|
pos++;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pos < limit) {
|
||||||
byte b;
|
byte b;
|
||||||
|
|
||||||
int nameLength = 0, nameHash = 0;
|
int nameLength = 0, nameHash = 0;
|
||||||
while ((b = memory.get(ValueLayout.JAVA_BYTE, pos++)) != ';') {
|
while ((b = memory.get(ValueLayout.JAVA_BYTE, pos++)) != ';') {
|
||||||
|
tempNameBytes[nameLength++] = b;
|
||||||
nameHash = nameHash * 31 + b;
|
nameHash = nameHash * 31 + b;
|
||||||
nameLength++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int temperature = 0, sign = 1;
|
int sign;
|
||||||
outer: while ((b = memory.get(ValueLayout.JAVA_BYTE, pos++)) != '\n') {
|
if (memory.get(ValueLayout.JAVA_BYTE, pos) == '-') {
|
||||||
switch (b) {
|
|
||||||
case '\r':
|
|
||||||
pos++;
|
|
||||||
break outer;
|
|
||||||
case '.':
|
|
||||||
break;
|
|
||||||
case '-':
|
|
||||||
sign = -1;
|
sign = -1;
|
||||||
break;
|
pos++;
|
||||||
default:
|
|
||||||
var digit = b - '0';
|
|
||||||
assert digit >= 0 && digit <= 9;
|
|
||||||
temperature = 10 * temperature + digit;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processLine(nameHash, memory, position, nameLength, temperature * sign);
|
|
||||||
position = pos;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void processLine(int nameHash, MemorySegment buffer, int nameOffset, int nameLength, int temperature) {
|
|
||||||
var i = Math.abs(nameHash) % HASH_NO_CLASH_MODULUS;
|
|
||||||
|
|
||||||
if (names[i] == null) {
|
|
||||||
names[i] = parseName(buffer, nameOffset, nameLength);
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
assert parseName(buffer, nameOffset, nameLength).equals(names[i]) : parseName(buffer, nameOffset, nameLength) + "!=" + names[i];
|
sign = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int temperature; // between [-99.9 and 99.9], mapped to fixed point int (scaled by 10)
|
||||||
|
if (memory.get(ValueLayout.JAVA_BYTE, pos + 1) == '.') { // between -9.99 and 9.99
|
||||||
|
assert memory.get(ValueLayout.JAVA_BYTE, pos + 1) == '.';
|
||||||
|
temperature = memory.get(ValueLayout.JAVA_BYTE, pos) * 10 +
|
||||||
|
memory.get(ValueLayout.JAVA_BYTE, pos + 2) - CORRECTION_0_TO_9;
|
||||||
|
pos += 3; // #.# - 3 chars
|
||||||
|
}
|
||||||
|
else { // between [-99.9 and -9.99] OR [9.99 and 99.9]
|
||||||
|
assert memory.get(ValueLayout.JAVA_BYTE, pos + 2) == '.';
|
||||||
|
temperature = memory.get(ValueLayout.JAVA_BYTE, pos) * 100 +
|
||||||
|
memory.get(ValueLayout.JAVA_BYTE, pos + 1) * 10 +
|
||||||
|
memory.get(ValueLayout.JAVA_BYTE, pos + 3) - CORRECTION_10_TO_99;
|
||||||
|
pos += 4; // ##.# - 4 chars
|
||||||
|
}
|
||||||
|
|
||||||
|
processLine(nameHash, tempNameBytes, nameLength, temperature * sign);
|
||||||
|
|
||||||
|
// skip newlines so the chunk limit check can work correctly
|
||||||
|
while (pos < limit) {
|
||||||
|
byte c = memory.get(ValueLayout.JAVA_BYTE, pos);
|
||||||
|
if (c != '\r' && c != '\n')
|
||||||
|
break;
|
||||||
|
pos++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processLine(int nameHash, byte[] nameBytesBuffer, int nameLength, int temperature) {
|
||||||
|
var i = Math.abs(nameHash) % SIZE;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (names[i] == null) {
|
||||||
|
byte[] trimmedBytes = Arrays.copyOf(nameBytesBuffer, nameLength);
|
||||||
|
names[i] = new String(trimmedBytes, StandardCharsets.UTF_8);
|
||||||
|
nameBytes[i] = trimmedBytes;
|
||||||
|
minMaxCount[i*3 + OFFSET_MIN] = Integer.MAX_VALUE;
|
||||||
|
minMaxCount[i*3 + OFFSET_MAX] = Integer.MIN_VALUE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (nameBytes[i].length==nameLength && Arrays.equals(nameBytes[i], 0, nameLength, nameBytesBuffer, 0, nameLength)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (assertions) {
|
||||||
|
var key = new String(nameBytesBuffer, 0, nameLength, StandardCharsets.UTF_8);
|
||||||
|
hashCollisionOccurrences.computeIfAbsent(key, _ -> new LongAdder()).increment();
|
||||||
|
}
|
||||||
|
i = (i + 1) % SIZE;
|
||||||
|
}
|
||||||
|
if (assertions) {
|
||||||
|
var key = new String(nameBytesBuffer, 0, nameLength, StandardCharsets.UTF_8);
|
||||||
|
if (hashCollisionOccurrences.containsKey(key)) {
|
||||||
|
hashCollisionOccurrences.computeIfAbsent(STR."\{key}[\{i}]", _ -> new LongAdder()).increment();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sums[i] += temperature;
|
sums[i] += temperature;
|
||||||
@ -200,9 +280,5 @@ public class CalculateAverage_ddimtirov {
|
|||||||
minMaxCount[mmcIndex + OFFSET_COUNT]++;
|
minMaxCount[mmcIndex + OFFSET_COUNT]++;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String parseName(MemorySegment memory, int nameOffset, int nameLength) {
|
|
||||||
byte[] array = memory.asSlice(nameOffset, nameLength).toArray(ValueLayout.JAVA_BYTE);
|
|
||||||
return new String(array, StandardCharsets.UTF_8);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user