diff --git a/calculate_average_spullara.sh b/calculate_average_spullara.sh index 8d98cc4..a1502dd 100755 --- a/calculate_average_spullara.sh +++ b/calculate_average_spullara.sh @@ -17,5 +17,6 @@ JAVA_OPTS="" +sdk use java 21.0.1-graal time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_spullara diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java b/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java index f6fdec4..0b70946 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java @@ -27,13 +27,10 @@ import java.util.Arrays; import java.util.List; import java.util.TreeMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.function.Supplier; import java.util.stream.Collectors; public class CalculateAverage_spullara { - private static final String FILE = "./measurements.txt"; + private static final String FILE = "./measurements.txt"; /* * My results on this computer: @@ -44,189 +41,172 @@ public class CalculateAverage_spullara { * */ - public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { - var filename = args.length == 0 ? FILE : args[0]; - var file = new File(filename); - long start = System.currentTimeMillis(); + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { + long start = System.currentTimeMillis(); + var filename = args.length == 0 ? FILE : args[0]; + var file = new File(filename); - var totalLines = new AtomicInteger(); - var results = getFileSegments(file).stream().map(segment -> { - var resultMap = new ByteArrayToResultMap(); - long segmentEnd = segment.end(); - try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) { - var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start()); - var buffer = new byte[64]; - int lines = 0; - int startLine; - int limit = bb.limit(); - while ((startLine = bb.position()) < limit) { - int currentPosition = startLine; - byte b; - int offset = 0; - while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') { - buffer[offset++] = b; - } - int temp = 0; - int negative = 1; - outer: - while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != '\n') { - switch (b) { - case '-': - negative = -1; - case '.': - break; - case '\r': - currentPosition++; - break outer; - default: - temp = 10 * temp + (b - '0'); + var resultsMap = getFileSegments(file).stream().map(segment -> { + var resultMap = new ByteArrayToResultMap(); + long segmentEnd = segment.end(); + try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) { + var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start()); + // Up to 100 characters for a city name + var buffer = new byte[100]; + int startLine; + int limit = bb.limit(); + while ((startLine = bb.position()) < limit) { + int currentPosition = startLine; + byte b; + int offset = 0; + int hash = 0; + while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') { + buffer[offset++] = b; + hash = 31 * hash + b; + } + int temp; + int negative = 1; + // Inspired by @yemreinci to unroll this even further + if (bb.get(currentPosition) == '-') { + negative = -1; + currentPosition++; + } + if (bb.get(currentPosition + 1) == '.') { + temp = negative * ((bb.get(currentPosition) - '0') * 10 + (bb.get(currentPosition + 2) - '0')); + currentPosition += 3; + } + else { + temp = negative * ((bb.get(currentPosition) - '0') * 100 + ((bb.get(currentPosition + 1) - '0') * 10 + (bb.get(currentPosition + 3) - '0'))); + currentPosition += 4; + } + if (bb.get(currentPosition) == '\r') { + currentPosition++; + } + currentPosition++; + resultMap.putOrMerge(buffer, 0, offset, temp / 10.0, hash); + bb.position(currentPosition); + } + return resultMap; } - } - temp *= negative; - double finalTemp = temp / 10.0; - resultMap.putOrMerge(buffer, 0, offset, - () -> new Result(finalTemp), - measurement -> merge(measurement, finalTemp, finalTemp, finalTemp, 1)); - lines++; - bb.position(currentPosition); + catch (IOException e) { + throw new RuntimeException(e); + } + }).parallel().flatMap(partition -> partition.getAll().stream()) + .collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new)); + + System.out.println(resultsMap); + } + + private static List getFileSegments(File file) throws IOException { + int numberOfSegments = Runtime.getRuntime().availableProcessors(); + long fileSize = file.length(); + long segmentSize = fileSize / numberOfSegments; + List segments = new ArrayList<>(numberOfSegments); + // Pointless to split small files + if (segmentSize < 1_000_000) { + segments.add(new FileSegment(0, fileSize)); + return segments; } - totalLines.addAndGet(lines); - return resultMap; - } catch (IOException e) { - throw new RuntimeException(e); - } - }).parallel().toList(); + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + for (int i = 0; i < numberOfSegments; i++) { + long segStart = i * segmentSize; + long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize; + segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd); + segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize); - var resultMap = results.stream() - .flatMap(partition -> partition.getAll().stream()) - .collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new)); - - System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms"); - System.out.println("Lines processed: " + totalLines); - System.out.println(resultMap); - } - - private static List getFileSegments(File file) throws IOException { - int numberOfSegments = Runtime.getRuntime().availableProcessors(); - long fileSize = file.length(); - long segmentSize = fileSize / numberOfSegments; - List segments = new ArrayList<>(); - try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { - for (int i = 0; i < numberOfSegments; i++) { - long segStart = i * segmentSize; - long segEnd = (i == numberOfSegments - 1) ? fileSize : segStart + segmentSize; - segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd); - segEnd = findSegment(i, numberOfSegments - 1, randomAccessFile, segEnd, fileSize); - - segments.add(new FileSegment(segStart, segEnd)); - } + segments.add(new FileSegment(segStart, segEnd)); + } + } + return segments; } - return segments; - } - private static Result merge(Result v, Result value) { - return merge(v, value.min, value.max, value.sum, value.count); - } - - private static Result merge(Result v, double value, double value1, double value2, long value3) { - v.min = Math.min(v.min, value); - v.max = Math.max(v.max, value1); - v.sum += value2; - v.count += value3; - return v; - } - - private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException { - if (i != skipSegment) { - raf.seek(location); - while (location < fileSize) { - location++; - if (raf.read() == '\n') - break; - } + private static Result merge(Result v, Result value) { + return merge(v, value.min, value.max, value.sum, value.count); + } + + private static Result merge(Result v, double value, double value1, double value2, long value3) { + v.min = Math.min(v.min, value); + v.max = Math.max(v.max, value1); + v.sum += value2; + v.count += value3; + return v; + } + + private static long findSegment(int i, int skipSegment, RandomAccessFile raf, long location, long fileSize) throws IOException { + if (i != skipSegment) { + raf.seek(location); + while (location < fileSize) { + location++; + if (raf.read() == '\n') + break; + } + } + return location; } - return location; - } } class Result { - double min, max, sum; - long count; + double min, max, sum; + long count; - Result(double value) { - min = max = sum = value; - this.count = 1; - } + Result(double value) { + min = max = sum = value; + this.count = 1; + } - @Override - public String toString() { - return round(min) + "/" + round(sum / count) + "/" + round(max); - } + @Override + public String toString() { + return round(min) + "/" + round(sum / count) + "/" + round(max); + } - double round(double v) { - return Math.round(v * 10.0) / 10.0; - } + double round(double v) { + return Math.round(v * 10.0) / 10.0; + } } -record Pair(int slot, Result slotValue) { -} + record Entry(byte[] key, Result value) { + } -record Entry(byte[] key, Result value) { -} - -record FileSegment(long start, long end) { -} + record FileSegment(long start, long end) { + } class ByteArrayToResultMap { - public static final int MAPSIZE = 1024*128; - Result[] slots = new Result[MAPSIZE]; - byte[][] keys = new byte[MAPSIZE][]; + public static final int MAPSIZE = 1024 * 128; + Result[] slots = new Result[MAPSIZE]; + byte[][] keys = new byte[MAPSIZE][]; - private int hashCode(byte[] a, int fromIndex, int length) { - int result = 0; - int end = fromIndex + length; - for (int i = fromIndex; i < end; i++) { - result = 31 * result + a[i]; + public void putOrMerge(byte[] key, int offset, int size, double temp, int hash) { + int slot = hash & (slots.length - 1); + var slotValue = slots[slot]; + // Linear probe for open slot + while (slotValue != null && (keys[slot].length != size || !Arrays.equals(keys[slot], 0, size, key, offset, size))) { + slot = (slot + 1) & (slots.length - 1); + slotValue = slots[slot]; + } + Result value = slotValue; + if (value == null) { + slots[slot] = new Result(temp); + byte[] bytes = new byte[size]; + System.arraycopy(key, offset, bytes, 0, size); + keys[slot] = bytes; + } else { + value.min = Math.min(value.min, temp); + value.max = Math.max(value.max, temp); + value.sum += temp; + value.count += 1; + } } - return result; - } - private Pair getPair(byte[] key, int offset, int size) { - int hash = hashCode(key, offset, size); - int slot = hash & (slots.length - 1); - var slotValue = slots[slot]; - // Linear probe for open slot - while (slotValue != null && (keys[slot].length != size || !Arrays.equals(keys[slot], 0, size, key, offset, size))) { - slot = (slot + 1) & (slots.length - 1); - slotValue = slots[slot]; + // Get all pairs + public List getAll() { + List result = new ArrayList<>(slots.length); + for (int i = 0; i < slots.length; i++) { + Result slotValue = slots[i]; + if (slotValue != null) { + result.add(new Entry(keys[i], slotValue)); + } + } + return result; } - return new Pair(slot, slotValue); - } - - public void putOrMerge(byte[] key, int offset, int size, Supplier supplier, Consumer merge) { - Pair result = getPair(key, offset, size); - Result value = result.slotValue(); - if (value == null) { - int slot = result.slot(); - slots[slot] = supplier.get(); - byte[] bytes = new byte[size]; - System.arraycopy(key, offset, bytes, 0, size); - keys[slot] = bytes; - } else { - merge.accept(value); - } - } - - // Get all pairs - public List getAll() { - List result = new ArrayList<>(); - for (int i = 0; i < slots.length; i++) { - Result slotValue = slots[i]; - if (slotValue != null) { - result.add(new Entry(keys[i], slotValue)); - } - } - return result; - } } \ No newline at end of file