diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java index ecf2b70..9ab7a22 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java @@ -15,8 +15,10 @@ */ package dev.morling.onebrc; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -26,18 +28,27 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; public class CalculateAverage_PanagiotisDrakatos { + private static final String FILE = "./measurements.txt"; + private static final long SEGMENT_SIZE = 4 * 1024 * 1024; + private static final long COMMA_PATTERN = 0x3B3B3B3B3B3B3B3BL; + private static final long DOT_BITS = 0x10101000; + private static final long MAGIC_MULTIPLIER = (100 * 0x1000000 + 10 * 0x10000 + 1); + private static TreeMap sortedCities; public static void main(String[] args) throws IOException { SeekableByteRead(FILE); System.out.println(sortedCities); + boolean DEBUG = true; } private static void SeekableByteRead(String path) throws IOException { - FileInputStream fileInputStream = new FileInputStream(FILE); + FileInputStream fileInputStream = new FileInputStream(new File(FILE)); FileChannel fileChannel = fileInputStream.getChannel(); - Optional> optimistic = SplitSeekableByteChannel(fileChannel) + Optional> optimistic = getFileSegments(new File(FILE), fileChannel) + .stream() + .map(CalculateAverage_PanagiotisDrakatos::SplitSeekableByteChannel) .parallel() .map(CalculateAverage_PanagiotisDrakatos::MappingByteBufferToData) .reduce(CalculateAverage_PanagiotisDrakatos::combineMaps); @@ -46,37 +57,53 @@ public class CalculateAverage_PanagiotisDrakatos { } - private static Stream SplitSeekableByteChannel(FileChannel channel) throws IOException { - return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator() { - private static final long MAP_SIZE = 1024 * 1024 * 10L; + record FileSegment(long start, long end, FileChannel fileChannel) { + } - private long position = 0; - private long length = channel.size(); - - @Override - public boolean hasNext() { - while (position < length) { - return true; - } - return false; + private static List getFileSegments(final File file, final FileChannel fileChannel) throws IOException { + final int numberOfSegments = Runtime.getRuntime().availableProcessors(); + final long fileSize = file.length(); + final long segmentSize = fileSize / numberOfSegments; + final List segments = new ArrayList<>(); + if (segmentSize < 1000) { + segments.add(new FileSegment(0, fileSize, fileChannel)); + return segments; + } + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + long segStart = 0; + long segEnd = segmentSize; + while (segStart < fileSize) { + segEnd = findSegment(randomAccessFile, segEnd, fileSize); + segments.add(new FileSegment(segStart, segEnd, fileChannel)); + segStart = segEnd; // Just re-use the end and go from there. + segEnd = Math.min(fileSize, segEnd + segmentSize); } + } + return segments; + } - @Override - public ByteBuffer next() { - try { - MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, Math.min(MAP_SIZE, length - position)); - int end = buffer.limit() - 1; - while (buffer.get(end) != '\n') { - end--; - } - position += end + 1; - return buffer.slice(0, end); - } - catch (IOException e) { - throw new RuntimeException(e); - } + private static long findSegment(RandomAccessFile raf, long location, final long fileSize) throws IOException { + raf.seek(location); + while (location < fileSize) { + location++; + if (raf.read() == '\n') + return location; + } + return location; + } + + private static ByteBuffer SplitSeekableByteChannel(FileSegment segment) { + try { + MappedByteBuffer buffer = segment.fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.end - segment.start()); + int end = buffer.limit() - 1; + while (buffer.get(end) != '\n') { + end--; } - }, Spliterator.IMMUTABLE), false); + return buffer.slice(0, end); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } } public static ByteBuffer concat(ByteBuffer[] buffers) {