diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java index 285ab00..9183893 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java @@ -15,117 +15,232 @@ */ package dev.morling.onebrc; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; +import java.io.*; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; +import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collector; - -import static java.util.stream.Collectors.groupingByConcurrent; // gunnar morling - 2:10 // roy van rijn - 1:01 -// 0:53 +// 0:37 public class CalculateAverage_ddimtirov { - public record InputLine(String station, int value) { - public static InputLine fromLine(String line) { - int endOfText = line.indexOf(";"); - - String station = line.substring(0, endOfText); - - int startOfWhole = endOfText + 1; - int sign; - if (line.charAt(startOfWhole) == '-') { - sign = -1; - startOfWhole++; - } else { - sign = 1; - } - - int endOfWhole = line.lastIndexOf("."); - var whole = unsafeParsePositiveInt(line, startOfWhole, endOfWhole); - var decimal = unsafeParsePositiveInt(line,endOfWhole+1, line.length()); - int fixpoint10 = (whole * 10 + decimal) * sign; - - return new InputLine(station, fixpoint10); - } - - static int unsafeParsePositiveInt(String s, int start, int end) { - int acc = 0; - for (int i = start; i=0 && v<=9 : String.format("Character '%s', value %,d", c, v); - acc += v; - } - return acc; - } - } - - private static class OutputMetrics { - private int min = Integer.MAX_VALUE; - private int max = Integer.MIN_VALUE; - private long sum; - private long count; - - @SuppressWarnings("ManualMinMaxCalculation") - public OutputMetrics combine(OutputMetrics o) { - var r = new OutputMetrics(); - r.min = min < o.min ? min : o.min; - r.max = max > o.max ? max : o.max; - r.sum = sum + o.sum; - r.count = count + o.count; - return r; - } - - public void accumulate(InputLine m) { - if (m.value < min) min = m.value; - if (m.value > max) max = m.value; - sum += m.value; - count++; - } - - @Override - public String toString() { - var min = this.min / 10.0; - var mean = Math.round(this.sum / (double) count) / 10.0; - var max = this.max / 10.0; - return min + "/" + mean + "/" + max; - } - } + @SuppressWarnings("RedundantSuppression") public static void main(String[] args) throws IOException { -// var start = Instant.now(); +// var start = Instant.now(); Instant start = null; var path = Path.of("./measurements.txt"); - var buffer = 8192 * 20; + var bufferSize = 512 * 64; // 64 blocks + var tracker = new Tracker(); + var charset = StandardCharsets.UTF_8; // Files.lines() is optimized for files that can be indexed by int // For larger files it falls back to buffered reader, which we now // use directly to be able to tweak the buffer size. - try (var reader = new BufferedReader(new InputStreamReader(Files.newInputStream(path)), buffer)) { - var stationsToMetrics = reader.lines() - .map(InputLine::fromLine) - .parallel() - .collect(groupingByConcurrent(InputLine::station, Collector.of( - OutputMetrics::new, - OutputMetrics::accumulate, - OutputMetrics::combine, - OutputMetrics::toString - ))); - System.out.println(new TreeMap<>(stationsToMetrics)); - assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(new TreeMap<>(stationsToMetrics).toString()); + try (var stream = Files.newInputStream(path); var reader = new InputStreamReader(stream, charset)) { + var buffered = new RecordReader(reader, bufferSize); + + InputRecord record = null; + while (true) { + record = buffered.readRecord(record); + if (record==null) break; + + tracker.process(record); + } } + System.out.println(tracker.stationsToMetrics()); + //noinspection ConstantValue if (start!=null) System.err.println(Duration.between(start, Instant.now())); + + assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(tracker.stationsToMetrics().toString()); } + /** + *

Reads records we can use to do the filtering from a stream of characters. + * Takes care of framing, and I/O buffering. + * This class in combination with {@link RecordReader} allows us to fully hide the + * I/O and internal representation. + * + *

If used with recycled {@link InputRecord} instances, this class allocates no memory + * after instantiation. The class is stateful, and the internals are not threadsafe + * - use from single thread or with proper synchronization. + */ + static class RecordReader { + /** + * The source of input data + */ + private final Readable input; + + /** + * Used for i/o buffering and record parsing. + * @see #buf + */ + private final CharBuffer buffer; + + /** + *

Cached backing array from {@link #buffer}. + *

This is optimization because {@link CharBuffer#array()} was showing on the CPU profile. + */ + private final char[] buf; + + public RecordReader(Readable input, int bufferSize) { + this.input = input; + buffer = CharBuffer.allocate(bufferSize).flip(); + buf = buffer.array(); + } + + public InputRecord readRecord(InputRecord recycled) throws IOException { + var record = parseRecord(recycled); + if (record!=null) return record; + + if (input.read(buffer.compact())==-1) return null; + buffer.flip(); + + return parseRecord(recycled); + } + + + private InputRecord parseRecord(InputRecord recycled) { + var lim = buffer.limit(); + if (buffer.isEmpty()) return null; + + var nameOff = buffer.position(); + var buff = buf; + while (buff[nameOff]=='\n' || buff[nameOff]=='\r' || buff[nameOff]==' ' ) { + nameOff++; + if (nameOff>=lim) return null; + } + + var nameHash = 0; + var nameLen = 0; + while (buff[nameOff+nameLen]!=';') { + nameHash = nameHash*31 + buff[nameOff+nameLen]; + nameLen++; + if (nameOff+nameLen>=lim) return null; + } + + //noinspection DuplicateExpressions + assert new String(buf, nameOff, nameLen).hashCode()==nameHash + : "'%s'@%d !-> %d".formatted(new String(buf, nameOff, nameLen), new String(buf, nameOff, nameLen).hashCode(), nameHash); + + var valCursor = nameOff + nameLen +1; + int signum = 1; + var acc = 0; + while (true) { + if (valCursor >= lim) { + return null; + } + char c = buff[valCursor++]; + if (c == '\n' || c == '\r') { + break; + } + if (c=='.') continue; + if (acc == 0) { + if (c == '-') { + signum = -1; + continue; + } + } else { + acc *= 10; + } + var v = c - '0'; + assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v); + acc += v; + } + + buffer.position(valCursor); + + var record = recycled!=null ? recycled : new InputRecord(buf); + record.init(nameHash, nameOff, nameLen, acc*signum); + return record; + } + } + + + static class Tracker { + private static final int ADDRESS_NO_CLASH_MODULUS = 49999; + private static final int OFFSET_MIN = 0; + private static final int OFFSET_MAX = 1; + private static final int OFFSET_COUNT = 2; + + private final int[] minMaxCount = new int[ADDRESS_NO_CLASH_MODULUS * 3]; + private final long[] sums = new long[ADDRESS_NO_CLASH_MODULUS]; + private final String[] names = new String[ADDRESS_NO_CLASH_MODULUS]; + + public void process(InputRecord r) { + var i = Math.abs(r.nameHash) % ADDRESS_NO_CLASH_MODULUS; + + if (names[i]==null) names[i] = r.name(); + + sums[i] += r.value; + + int mmcIndex = i * 3; + var min = minMaxCount[mmcIndex + OFFSET_MIN]; + var max = minMaxCount[mmcIndex + OFFSET_MAX]; + if (r.value < min) minMaxCount[mmcIndex + OFFSET_MIN] = r.value; + if (r.value > max) minMaxCount[mmcIndex + OFFSET_MAX] = r.value; + + minMaxCount[mmcIndex + OFFSET_COUNT]++; + } + + + + public Map stationsToMetrics() { + var m = new TreeMap(); + for (int i = 0; i < names.length; i++) { + var name = names[i]; + if (name==null) continue; + + var min = minMaxCount[i*3] / 10.0; + var max = minMaxCount[i*3+1] / 10.0; + var count = minMaxCount[i*3+2]; + var sum = sums[i]; + var mean = Math.round((double) sum / count) / 10.0; + + m.put(name, min + "/" + mean + "/" + max); + } + return m; + } + + } + + static class InputRecord { + private final char[] chars; + private int idOffset; + private int idLength; + + public int value; // fixpoint scaled by 10 + public int nameHash; + + public InputRecord(char[] chars) { + this.chars = chars; + } + + public void init(int nameHash, int nameOffset, int nameLength, int fixpointValue) { + assert nameOffset+nameLength