From 78b34156784777e0bb8bdfcee8461541e0824199 Mon Sep 17 00:00:00 2001 From: Keshavram Kuduwa <131107576+kuduwa-keshavram@users.noreply.github.com> Date: Sun, 7 Jan 2024 14:41:56 +0530 Subject: [PATCH] Optimised Code to use FileSegments with ByteBuffer (#184) * Keshavram Kuduwa's Submission * Resolves #102 and Code Optimizations * Resolves #102 and Code Optimizations * Optimised Code with Roy's Reference * Fixed Tests * Clean Up Code --------- Co-authored-by: Keshavram Kuduwa --- .../CalculateAverage_kuduwa_keshavram.java | 204 +++++++++++++----- 1 file changed, 156 insertions(+), 48 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java index d9da874..d7f1a61 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java @@ -15,53 +15,165 @@ */ package dev.morling.onebrc; +import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteOrder; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.DoubleSummaryStatistics; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.function.ToDoubleFunction; +import java.util.Spliterator; +import java.util.Spliterators; import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; public class CalculateAverage_kuduwa_keshavram { private static final String FILE = "./measurements.txt"; - private static final Function KEY_MAPPER = line -> { - int pivot = line.indexOf(";"); - return line.substring(0, pivot); - }; - private static final ToDoubleFunction VALUE_MAPPER = line -> { - int pivot = line.indexOf(";"); - return toDouble(line.substring(pivot + 1)); - }; + private static final long LEFT_SHIFT_EIGHT = Long.MAX_VALUE / (1L << 8); + private static final long LEFT_SHIFT_FOUR = Long.MAX_VALUE / (1L << 4); + private static final long LEFT_SHIFT_TWO = Long.MAX_VALUE / (1L << 2); + private static final long LEFT_SHIFT_ONE = Long.MAX_VALUE / (1L << 1); public static void main(String[] args) throws IOException, InterruptedException { - try (Stream lines = Files.lines(Path.of(FILE))) { - Map resultMap = lines - .parallel() - .collect( - Collectors.groupingBy(KEY_MAPPER, Collectors.summarizingDouble(VALUE_MAPPER))); - System.out.println( - resultMap.entrySet().stream() - .sorted(Map.Entry.comparingByKey()) - .map( - entry -> String.format( - "%s=%.1f/%.1f/%.1f", - entry.getKey(), - entry.getValue().getMin(), - entry.getValue().getAverage(), - entry.getValue().getMax())) - .collect(Collectors.joining(", ", "{", "}"))); + Map resultMap = getFileSegments(new File(FILE)).stream() + .parallel() + .flatMap( + segment -> { + try (FileChannel fileChannel = (FileChannel) Files.newByteChannel(Path.of(FILE), StandardOpenOption.READ)) { + MappedByteBuffer byteBuffer = fileChannel.map( + MapMode.READ_ONLY, segment.start, segment.end - segment.start); + byteBuffer.order(ByteOrder.nativeOrder()); + Iterator iterator = getMeasurementIterator(byteBuffer); + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL), true); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }) + .collect( + Collectors.groupingBy( + Measurement::city, Collectors.summarizingDouble(Measurement::temp))); + System.out.println( + resultMap.entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map( + entry -> String.format( + "%s=%.1f/%.1f/%.1f", + entry.getKey(), + entry.getValue().getMin(), + entry.getValue().getAverage(), + entry.getValue().getMax())) + .collect(Collectors.joining(", ", "{", "}"))); + } + + private static Iterator getMeasurementIterator(MappedByteBuffer byteBuffer) { + return new Iterator<>() { + + private int initialPosition; + + private int delimiterIndex; + + @Override + public boolean hasNext() { + boolean hasRemaining = byteBuffer.hasRemaining(); + if (hasRemaining) { + initialPosition = byteBuffer.position(); + delimiterIndex = 0; + while (true) { + byte b = byteBuffer.get(); + if (b == 59) { + break; + } + delimiterIndex++; + } + return true; + } + return false; + } + + @Override + public Measurement next() { + byteBuffer.position(initialPosition); + + byte[] city = new byte[delimiterIndex]; + for (int j = 0; j < delimiterIndex; j++) { + city[j] = byteBuffer.get(); + } + + byteBuffer.get(); + String temp = ""; + while (true) { + char c = (char) byteBuffer.get(); + if (c == '\n') { + break; + } + temp += c; + } + return new Measurement(new String(city), toDouble(temp)); + } + }; + } + + private record FileSegment(long start, long end) { + } + + private record Measurement(String city, double temp) { + } + + private static List getFileSegments(final File file) throws IOException { + final int numberOfSegments = Runtime.getRuntime().availableProcessors(); + final long fileSize = file.length(); + final long segmentSize = fileSize / numberOfSegments; + if (segmentSize < 1000) { + return List.of(new FileSegment(0, fileSize)); + } + + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + int lastSegment = numberOfSegments - 1; + return IntStream.range(0, numberOfSegments) + .mapToObj( + i -> { + long segStart = i * segmentSize; + long segEnd = (i == lastSegment) ? fileSize : segStart + segmentSize; + try { + segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd); + segEnd = findSegment(i, lastSegment, randomAccessFile, segEnd, fileSize); + } + catch (IOException e) { + throw new RuntimeException(e); + } + return new FileSegment(segStart, segEnd); + }) + .toList(); } } - private static final long MAX_VALUE_DIVIDE_10 = Long.MAX_VALUE / 10; + private static long findSegment( + final int i, final int skipSegment, RandomAccessFile raf, long location, final long fileSize) + throws IOException { + if (i != skipSegment) { + raf.seek(location); + while (location < fileSize) { + location++; + if (raf.read() == '\n') + return location; + } + } + return location; + } private static double toDouble(String num) { long value = 0; - int exp = 0; boolean negative = false; int decimalPlaces = Integer.MIN_VALUE; for (byte ch : num.getBytes()) { @@ -80,32 +192,26 @@ public class CalculateAverage_kuduwa_keshavram { } } - return asDouble(value, exp, negative, decimalPlaces); + return asDouble(value, negative, decimalPlaces); } - private static double asDouble(long value, int exp, boolean negative, int decimalPlaces) { + private static double asDouble(long value, boolean negative, int decimalPlaces) { + int exp = -48; + value <<= 48; if (decimalPlaces > 0 && value < Long.MAX_VALUE / 2) { - if (value < Long.MAX_VALUE / (1L << 32)) { - exp -= 32; - value <<= 32; - } - if (value < Long.MAX_VALUE / (1L << 16)) { - exp -= 16; - value <<= 16; - } - if (value < Long.MAX_VALUE / (1L << 8)) { + if (value < LEFT_SHIFT_EIGHT) { exp -= 8; value <<= 8; } - if (value < Long.MAX_VALUE / (1L << 4)) { + if (value < LEFT_SHIFT_FOUR) { exp -= 4; value <<= 4; } - if (value < Long.MAX_VALUE / (1L << 2)) { + if (value < LEFT_SHIFT_TWO) { exp -= 2; value <<= 2; } - if (value < Long.MAX_VALUE / (1L << 1)) { + if (value < LEFT_SHIFT_ONE) { exp -= 1; value <<= 1; } @@ -115,25 +221,27 @@ public class CalculateAverage_kuduwa_keshavram { long mod = value % 5; value /= 5; int modDiv = 1; - if (value < Long.MAX_VALUE / (1L << 4)) { + if (value < LEFT_SHIFT_FOUR) { exp -= 4; value <<= 4; modDiv <<= 4; } - if (value < Long.MAX_VALUE / (1L << 2)) { + if (value < LEFT_SHIFT_TWO) { exp -= 2; value <<= 2; modDiv <<= 2; } - if (value < Long.MAX_VALUE / (1L << 1)) { + if (value < LEFT_SHIFT_ONE) { exp -= 1; value <<= 1; modDiv <<= 1; } - if (decimalPlaces > 1) + if (decimalPlaces > 1) { value += modDiv * mod / 5; - else + } + else { value += (modDiv * mod + 4) / 5; + } } final double d = Math.scalb((double) value, exp); return negative ? -d : d;