From ff35a4628b7dd5d0b5050345d9de8840d2b3b053 Mon Sep 17 00:00:00 2001 From: Andrzej Nestoruk Date: Sun, 28 Jan 2024 22:59:04 +0100 Subject: [PATCH] anestoruk second attempt (#625) * initial implementation * few improvements and a cleanup (down to ~12s) * use array instead of hashmap for collecting partial results --- .../onebrc/CalculateAverage_anestoruk.java | 126 +++++++++--------- 1 file changed, 64 insertions(+), 62 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_anestoruk.java b/src/main/java/dev/morling/onebrc/CalculateAverage_anestoruk.java index add938f..293087e 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_anestoruk.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_anestoruk.java @@ -22,9 +22,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -69,14 +67,14 @@ public class CalculateAverage_anestoruk { TreeMap result = new TreeMap<>(); try (ExecutorService executor = Executors.newFixedThreadPool(cpus)) { - List>> futures = new ArrayList<>(); + List> futures = new ArrayList<>(); for (SegmentRange range : rangeList) { futures.add(supplyAsync(() -> process(range, segment), executor)); } - for (CompletableFuture> future : futures) { + for (CompletableFuture future : futures) { try { - Map partialResult = future.get(); - combine(result, partialResult); + Record[] partialResult = future.get(); + mergeResult(result, partialResult); } catch (InterruptedException | ExecutionException ex) { throw new RuntimeException(ex); @@ -87,20 +85,19 @@ public class CalculateAverage_anestoruk { System.out.println(result); } - private static Map process(SegmentRange range, MemorySegment segment) { - Map partialResult = new HashMap<>(1_000); - byte[] buffer = new byte[100]; + private static Record[] process(SegmentRange range, MemorySegment segment) { + Record[] records = new Record[1024 * 100]; + byte[] cityBuffer = new byte[100]; long offset = range.startOffset; byte b; while (offset < range.endOffset) { - int cityIdx = 0; + int cityLength = 0; + int hash = 0; while ((b = segment.get(JAVA_BYTE, offset++)) != ';') { - buffer[cityIdx++] = b; + cityBuffer[cityLength++] = b; + hash = hash * 31 + b; } - byte[] city = new byte[cityIdx]; - System.arraycopy(buffer, 0, city, 0, cityIdx); - ByteWrapper cityWrapper = new ByteWrapper(city); - + hash = Math.abs(hash); int value = 0; boolean negative; if ((b = segment.get(JAVA_BYTE, offset++)) == '-') { @@ -116,45 +113,77 @@ public class CalculateAverage_anestoruk { } } int temperature = negative ? -value : value; - - partialResult.compute(cityWrapper, (_, record) -> update(record, temperature)); + byte[] city = new byte[cityLength]; + System.arraycopy(cityBuffer, 0, city, 0, cityLength); + addResult(records, hash, city, temperature); + } + return records; + } + + private static void addResult(Record[] records, int hash, byte[] city, int temperature) { + int idx = hash % records.length; + Record record; + while ((record = records[idx]) != null) { + if (record.hash == hash && Arrays.equals(record.city, city)) { + record.add(temperature); + return; + } + idx = (idx + 1) % records.length; + } + records[idx] = new Record(hash, city, temperature); + } + + private static void mergeResult(TreeMap result, Record[] partialResult) { + for (Record partialRecord : partialResult) { + if (partialRecord == null) { + continue; + } + String cityName = new String(partialRecord.city, UTF_8); + result.compute(cityName, (_, record) -> { + if (record == null) { + return partialRecord; + } + record.merge(partialRecord); + return record; + }); } - return partialResult; } private record SegmentRange(long startOffset, long endOffset) { } - private record ByteWrapper(byte[] bytes) { - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ByteWrapper that = (ByteWrapper) o; - return Arrays.equals(bytes, that.bytes); - } - - @Override - public int hashCode() { - return Arrays.hashCode(bytes); - } - } - private static class Record { + private final int hash; + private final byte[] city; private int min; private int max; private long sum; private int count; - public Record(int temperature) { + public Record(int hash, byte[] city, int temperature) { + this.hash = hash; + this.city = city; this.min = temperature; this.max = temperature; this.sum = temperature; this.count = 1; } + public void add(int temperature) { + min = min(min, temperature); + max = max(max, temperature); + sum += temperature; + count++; + } + + public void merge(Record other) { + min = min(min, other.min); + max = max(max, other.max); + sum += other.sum; + count += other.count; + } + @Override public String toString() { return "%.1f/%.1f/%.1f".formatted( @@ -163,31 +192,4 @@ public class CalculateAverage_anestoruk { (max / 10.0)); } } - - private static Record update(Record record, int temperature) { - if (record == null) { - return new Record(temperature); - } - record.min = min(record.min, temperature); - record.max = max(record.max, temperature); - record.sum += temperature; - record.count++; - return record; - } - - private static void combine(TreeMap result, Map partialResult) { - partialResult.forEach((wrapper, partialRecord) -> { - String city = new String(wrapper.bytes, UTF_8); - result.compute(city, (_, record) -> { - if (record == null) { - return partialRecord; - } - record.min = min(record.min, partialRecord.min); - record.max = max(record.max, partialRecord.max); - record.sum += partialRecord.sum; - record.count += partialRecord.count; - return record; - }); - }); - } }