diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java index 9183893..5a9261b 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java @@ -15,14 +15,18 @@ */ package dev.morling.onebrc; + import java.io.*; -import java.nio.CharBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.time.Duration; import java.time.Instant; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.TreeMap; // gunnar morling - 2:10 @@ -30,217 +34,167 @@ import java.util.TreeMap; // 0:37 public class CalculateAverage_ddimtirov { + private static final String FILE = "./measurements.txt"; + private static final int HASH_NO_CLASH_MODULUS = 49999; + private static final int OFFSET_MIN = 0; + private static final int OFFSET_MAX = 1; + private static final int OFFSET_COUNT = 2; @SuppressWarnings("RedundantSuppression") public static void main(String[] args) throws IOException { -// var start = Instant.now(); - Instant start = null; - var path = Path.of("./measurements.txt"); - var bufferSize = 512 * 64; // 64 blocks - var tracker = new Tracker(); - var charset = StandardCharsets.UTF_8; + var path = Path.of(FILE); + var start = Instant.now(); + var desiredSegmentsCount = Runtime.getRuntime().availableProcessors(); - // Files.lines() is optimized for files that can be indexed by int - // For larger files it falls back to buffered reader, which we now - // use directly to be able to tweak the buffer size. - try (var stream = Files.newInputStream(path); var reader = new InputStreamReader(stream, charset)) { - var buffered = new RecordReader(reader, bufferSize); + var segments = FileSegment.forFile(path, desiredSegmentsCount); - InputRecord record = null; - while (true) { - record = buffered.readRecord(record); - if (record==null) break; - - tracker.process(record); + var trackers = segments.stream().parallel().map(segment -> { + try (var fileChannel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) { + var tracker = new Tracker(); + var segmentBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.size()); + tracker.processSegment(segmentBuffer, segment.end()); + return tracker; + } catch (IOException e) { + throw new RuntimeException(e); } - } + }).toList(); - System.out.println(tracker.stationsToMetrics()); + var result = summarizeTrackers(trackers); + System.out.println(result); //noinspection ConstantValue if (start!=null) System.err.println(Duration.between(start, Instant.now())); - - assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(tracker.stationsToMetrics().toString()); + assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(result); } - /** - *

Reads records we can use to do the filtering from a stream of characters. - * Takes care of framing, and I/O buffering. - * This class in combination with {@link RecordReader} allows us to fully hide the - * I/O and internal representation. - * - *

If used with recycled {@link InputRecord} instances, this class allocates no memory - * after instantiation. The class is stateful, and the internals are not threadsafe - * - use from single thread or with proper synchronization. - */ - static class RecordReader { - /** - * The source of input data - */ - private final Readable input; - /** - * Used for i/o buffering and record parsing. - * @see #buf - */ - private final CharBuffer buffer; + record FileSegment(long start, long end) { + public long size() { return end() - start(); } - /** - *

Cached backing array from {@link #buffer}. - *

This is optimization because {@link CharBuffer#array()} was showing on the CPU profile. - */ - private final char[] buf; + public static List forFile(Path file, int desiredSegmentsCount) throws IOException { + try (var raf = new RandomAccessFile(file.toFile(), "r")) { + List segments = new ArrayList<>(); + long fileSize = raf.length(); + long segmentSize = fileSize / desiredSegmentsCount; + for (int segmentIdx = 0; segmentIdx < desiredSegmentsCount; segmentIdx++) { + long segStart = segmentIdx * segmentSize; + long segEnd = (segmentIdx == desiredSegmentsCount - 1) ? fileSize : segStart + segmentSize; + segStart = findSegmentBoundary(raf, segmentIdx, 0, segStart, segEnd); + segEnd = findSegmentBoundary(raf, segmentIdx, desiredSegmentsCount - 1, segEnd, fileSize); - public RecordReader(Readable input, int bufferSize) { - this.input = input; - buffer = CharBuffer.allocate(bufferSize).flip(); - buf = buffer.array(); + segments.add(new FileSegment(segStart, segEnd)); + } + return segments; + } } - public InputRecord readRecord(InputRecord recycled) throws IOException { - var record = parseRecord(recycled); - if (record!=null) return record; + private static long findSegmentBoundary(RandomAccessFile raf, int i, int skipForSegment, long location, long fileSize) throws IOException { + if (i == skipForSegment) return location; - if (input.read(buffer.compact())==-1) return null; - buffer.flip(); - - return parseRecord(recycled); - } - - - private InputRecord parseRecord(InputRecord recycled) { - var lim = buffer.limit(); - if (buffer.isEmpty()) return null; - - var nameOff = buffer.position(); - var buff = buf; - while (buff[nameOff]=='\n' || buff[nameOff]=='\r' || buff[nameOff]==' ' ) { - nameOff++; - if (nameOff>=lim) return null; + raf.seek(location); + while (location < fileSize) { + location++; + if (raf.read() == '\n') break; } - - var nameHash = 0; - var nameLen = 0; - while (buff[nameOff+nameLen]!=';') { - nameHash = nameHash*31 + buff[nameOff+nameLen]; - nameLen++; - if (nameOff+nameLen>=lim) return null; - } - - //noinspection DuplicateExpressions - assert new String(buf, nameOff, nameLen).hashCode()==nameHash - : "'%s'@%d !-> %d".formatted(new String(buf, nameOff, nameLen), new String(buf, nameOff, nameLen).hashCode(), nameHash); - - var valCursor = nameOff + nameLen +1; - int signum = 1; - var acc = 0; - while (true) { - if (valCursor >= lim) { - return null; - } - char c = buff[valCursor++]; - if (c == '\n' || c == '\r') { - break; - } - if (c=='.') continue; - if (acc == 0) { - if (c == '-') { - signum = -1; - continue; - } - } else { - acc *= 10; - } - var v = c - '0'; - assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v); - acc += v; - } - - buffer.position(valCursor); - - var record = recycled!=null ? recycled : new InputRecord(buf); - record.init(nameHash, nameOff, nameLen, acc*signum); - return record; + return location; } } + private static String summarizeTrackers(List trackers) { + var result = new TreeMap(); + for (int i = 0; i < HASH_NO_CLASH_MODULUS; i++) { + String name = null; + + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + long sum = 0; + long count = 0; + for (Tracker tracker : trackers) { + if (tracker.names[i]==null) continue; + if (name==null) name = tracker.names[i]; + + var minn = tracker.minMaxCount[i*3]; + var maxx = tracker.minMaxCount[i*3+1]; + if (minnmax) max = maxx; + count += tracker.minMaxCount[i*3+2]; + sum += tracker.sums[i]; + } + if (name==null) continue; + + var mean = Math.round((double) sum / count) / 10.0; + result.put(name, (min/10.0) + "/" + mean + "/" + (max/10.0)); + } + return result.toString(); + } static class Tracker { - private static final int ADDRESS_NO_CLASH_MODULUS = 49999; - private static final int OFFSET_MIN = 0; - private static final int OFFSET_MAX = 1; - private static final int OFFSET_COUNT = 2; + private final int[] minMaxCount = new int[HASH_NO_CLASH_MODULUS * 3]; + private final long[] sums = new long[HASH_NO_CLASH_MODULUS]; + private final String[] names = new String[HASH_NO_CLASH_MODULUS]; + private final byte[] nameThreadLocal = new byte[64]; - private final int[] minMaxCount = new int[ADDRESS_NO_CLASH_MODULUS * 3]; - private final long[] sums = new long[ADDRESS_NO_CLASH_MODULUS]; - private final String[] names = new String[ADDRESS_NO_CLASH_MODULUS]; + private void processSegment(MappedByteBuffer segmentBuffer, long segmentEnd) { + int startLine; + int limit = segmentBuffer.limit(); + while ((startLine = segmentBuffer.position()) < limit) { + int pos = startLine; + byte b; - public void process(InputRecord r) { - var i = Math.abs(r.nameHash) % ADDRESS_NO_CLASH_MODULUS; + int nameLength = 0, nameHash = 0; + while (pos != segmentEnd && (b = segmentBuffer.get(pos++)) != ';') { + nameHash = nameHash*31 + b; + nameLength++; + } - if (names[i]==null) names[i] = r.name(); + int temperature = 0, sign = 1; + outer: + while (pos != segmentEnd && (b = segmentBuffer.get(pos++)) != '\n') { + switch (b) { + case '\r' : + pos++; + break outer; + case '.' : + break; + case '-' : + sign = -1; + break; + default : + var digit = b - '0'; + assert digit >= 0 && digit <= 9; + temperature = 10 * temperature + digit; + } + } - sums[i] += r.value; + processLine(nameHash, segmentBuffer, startLine, nameLength, temperature * sign); + segmentBuffer.position(pos); + } + } + + public void processLine(int nameHash, MappedByteBuffer buffer, int nameOffset, int nameLength, int temperature) { + var i = Math.abs(nameHash) % HASH_NO_CLASH_MODULUS; + + if (names[i]==null) { + names[i] = parseName(buffer, nameOffset, nameLength); + } else { + assert parseName(buffer, nameOffset, nameLength).equals(names[i]) : parseName(buffer, nameOffset, nameLength) + "!=" + names[i]; + } + + sums[i] += temperature; int mmcIndex = i * 3; var min = minMaxCount[mmcIndex + OFFSET_MIN]; var max = minMaxCount[mmcIndex + OFFSET_MAX]; - if (r.value < min) minMaxCount[mmcIndex + OFFSET_MIN] = r.value; - if (r.value > max) minMaxCount[mmcIndex + OFFSET_MAX] = r.value; + if (temperature < min) minMaxCount[mmcIndex + OFFSET_MIN] = temperature; + if (temperature > max) minMaxCount[mmcIndex + OFFSET_MAX] = temperature; minMaxCount[mmcIndex + OFFSET_COUNT]++; } - - - public Map stationsToMetrics() { - var m = new TreeMap(); - for (int i = 0; i < names.length; i++) { - var name = names[i]; - if (name==null) continue; - - var min = minMaxCount[i*3] / 10.0; - var max = minMaxCount[i*3+1] / 10.0; - var count = minMaxCount[i*3+2]; - var sum = sums[i]; - var mean = Math.round((double) sum / count) / 10.0; - - m.put(name, min + "/" + mean + "/" + max); - } - return m; - } - - } - - static class InputRecord { - private final char[] chars; - private int idOffset; - private int idLength; - - public int value; // fixpoint scaled by 10 - public int nameHash; - - public InputRecord(char[] chars) { - this.chars = chars; - } - - public void init(int nameHash, int nameOffset, int nameLength, int fixpointValue) { - assert nameOffset+nameLength