diff --git a/calculate_average_karthikeyan97.sh b/calculate_average_karthikeyan97.sh index a6bd728..bbad1c4 100755 --- a/calculate_average_karthikeyan97.sh +++ b/calculate_average_karthikeyan97.sh @@ -16,4 +16,14 @@ # JAVA_OPTS="-Xms20480m -Xmx40960m " -java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_karthikeyan97 + +if [ -f target/CalculateAverage_karthikeyan97_image ]; then + #echo "Picking up existing native image 'target/CalculateAverage_karthikeyan97_image', delete the file to select JVM mode." 1>&2 + target/CalculateAverage_karthikeyan97_image -Xms20480m -Xmx32768m +else + JAVA_OPTS="--enable-preview" + #echo "Chosing to run the app in JVM mode as no native image was found, use prepare_karthikeyan97.sh to generate." 1>&2 + java -Xms20480m -Xmx32768m --enable-preview --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_karthikeyan97 +fi + + diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java index c17e927..7014b12 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java @@ -22,9 +22,12 @@ import static java.util.stream.Collectors.*; import java.io.FileInputStream; import java.io.RandomAccessFile; +import java.lang.foreign.Arena; import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -33,6 +36,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Scanner; import java.util.Set; import java.util.TreeMap; import java.util.function.BiConsumer; @@ -44,8 +48,21 @@ import java.util.stream.Collectors; public class CalculateAverage_karthikeyan97 { + private static final Unsafe UNSAFE = initUnsafe(); + private static final String FILE = "./measurements.txt"; + private static Unsafe initUnsafe() { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + return (Unsafe) theUnsafe.get(Unsafe.class); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + private record Measurement(modifiedbytearray station, double value) { } @@ -53,18 +70,18 @@ public class CalculateAverage_karthikeyan97 { } private static class MeasurementAggregator { - private double min = Double.POSITIVE_INFINITY; - private double max = Double.NEGATIVE_INFINITY; + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; private long sum; private long count; public String toString() { return new StringBuffer(14) - .append(round(min)) + .append(round((1.0 * min))) .append("/") .append(round((1.0 * sum) / count)) .append("/") - .append(round(max)).toString(); + .append(round((1.0 * max))).toString(); } private double round(double value) { @@ -74,7 +91,7 @@ public class CalculateAverage_karthikeyan97 { public static void main(String[] args) throws Exception { // long start = System.nanoTime(); - System.setSecurityManager(null); + // System.setSecurityManager(null); Collector, MeasurementAggregator, MeasurementAggregator> collector = Collector.of( MeasurementAggregator::new, (a, m) -> { @@ -103,15 +120,17 @@ public class CalculateAverage_karthikeyan97 { }, agg -> agg); - RandomAccessFile raf = new RandomAccessFile(FILE, "rw"); + RandomAccessFile raf = new RandomAccessFile(FILE, "r"); + FileChannel fileChannel = raf.getChannel(); + final long mappedAddress = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, raf.length(), Arena.global()).address(); long length = raf.length(); - int cores = length > 1000 ? Runtime.getRuntime().availableProcessors() : 1; + final long endAddress = mappedAddress + length - 1; + int cores = length > 1000 ? Runtime.getRuntime().availableProcessors() * 2 : 1; long boundary[][] = new long[cores][2]; long segments = length / (cores); long before = -1; for (int i = 0; i < cores - 1; i++) { boundary[i][0] = before + 1; - byte[] b = new byte[107]; if (before + segments - 107 > 0) { raf.seek(before + segments - 107); } @@ -130,120 +149,92 @@ public class CalculateAverage_karthikeyan97 { f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); - int pageSize = unsafe.pageSize() * 10; + int l3Size = (13 * 1024 * 1024);// unsafe.l3Size(); System.out.println(new TreeMap((Arrays.stream(boundary).parallel().map(i -> { FileInputStream fileInputStream = null; try { - fileInputStream = new FileInputStream(FILE); - FileChannel fileChannel = fileInputStream.getChannel(); - HashMap resultmap = new HashMap<>(12000, 100); - - ByteBuffer buffer = ByteBuffer.allocateDirect(pageSize); - - fileChannel.position(i[0]); - int bytesReading = 0; - double num = 0; + int seglen = (int) (i[1] - i[0] + 1); + HashMap resultmap = new HashMap<>(1000); + long segstart = mappedAddress + i[0]; + int bytesRemaining = seglen; + long num = 0; int sign = 1; boolean isNumber = false; byte bi; modifiedbytearray stationName = null; - int hascode = 1; - int ctr = 0; - byte[] arr = new byte[100]; - int arrptr = 0; - int seglen = (int) (i[1] - i[0] + 1); - while (bytesReading < seglen) { - buffer.clear(); - int bytesRead = fileChannel.read(buffer); - if ((bytesReading + bytesRead) <= seglen) { - if (bytesRead < 0) { - bytesRead = 0; - } - } - else { - bytesRead = (seglen - bytesReading); - } - buffer.flip(); + int hascode = 5381; + while (bytesRemaining > 0) { int bytesptr = 0; - byte[] bufferArr = new byte[bytesRead]; - buffer.get(bufferArr); - while (bytesptr < bytesRead) { - bytesReading += 1; - bi = bufferArr[bytesptr++]; - if (ctr > 0) { - arr[arrptr++] = bi; - hascode = 31 * hascode + bi; - ctr--; - } - else { - if (bi >= 240) { - arr[arrptr++] = bi; - hascode = 31 * hascode + bi; - ctr = 3; - } - else if (bi >= 224) { - arr[arrptr++] = bi; - hascode = 31 * hascode + bi; - ctr = 2; - } - else if (bi >= 192) { - arr[arrptr++] = bi; - hascode = 31 * hascode + bi; - ctr = 1; + // int bytesread = buffer.remaining() > l3Size ? l3Size : buffer.remaining(); + // byte[] bufferArr = new byte[bytesread]; + // buffer.get(bufferArr); + int bbstart = 0; + int readSize = bytesRemaining > l3Size ? l3Size : bytesRemaining; + int actualReadSize = (segstart + readSize + 110 > endAddress || readSize + 110 > i[1]) ? readSize : readSize + 110; + byte[] readArr = new byte[actualReadSize]; + + UNSAFE.copyMemory(null, segstart, readArr, UNSAFE.ARRAY_BYTE_BASE_OFFSET, actualReadSize); + while (bytesptr < actualReadSize) { + bi = readArr[bytesptr++];// UNSAFE.getByte(segstart + bytesReading++); + if (!isNumber) { + if (bi >= 192) { + hascode = (hascode << 5) + hascode ^ bi; } else if (bi == 59) { isNumber = true; - stationName = new modifiedbytearray(arr, arrptr, hascode); - arr = new byte[100]; - arrptr = 0; - hascode = 1; - } - else if (bi == 10) { - hascode = 1; - isNumber = false; - MeasurementAggregator agg = resultmap.get(stationName); - num *= sign; - if (agg == null) { - agg = new MeasurementAggregator(); - agg.min = num; - agg.max = num; - agg.sum = (long) (num); - agg.count = 1; - resultmap.put(stationName, agg); + stationName = new modifiedbytearray(readArr, bbstart, bytesptr - 2, hascode & 0xFFFFFFFF); + bbstart = 0; + hascode = 5381; + if (bytesptr >= readSize) { + break; } - else { - if (agg.min >= num) { - agg.min = num; - } - if (agg.max <= num) { - agg.max = num; - } - agg.sum += (long) (num); - agg.count++; - } - num = 0; - sign = 1; } else { - hascode = 31 * hascode + bi; - if (isNumber) { - switch (bi) { - case 0x2E: - break; - case 0x2D: - sign = -1; - break; - default: - num = num * 10 + (bi - 0x30); + hascode = (hascode << 5) + hascode ^ bi; + } + } + else { + switch (bi) { + case 0x2E: + break; + case 0x2D: + sign = -1; + break; + case 10: + hascode = 5381; + isNumber = false; + bbstart = bytesptr; + MeasurementAggregator agg = resultmap.get(stationName); + num *= sign; + if (agg == null) { + agg = new MeasurementAggregator(); + agg.min = num; + agg.max = num; + agg.sum = (long) (num); + agg.count = 1; + resultmap.put(stationName, agg); } - } - else { - arr[arrptr++] = bi; - } + else { + if (agg.min >= num) { + agg.min = num; + } + if (agg.max <= num) { + agg.max = num; + } + agg.sum += (long) (num); + agg.count++; + } + num = 0; + sign = 1; + break; + default: + num = num * 10 + (bi - 0x30); } } } + bytesRemaining -= bytesptr; + segstart += bytesptr; } /* * while (bytesReading < (i[1] - i[0] + 1) && buffer.position() < buffer.limit()) { @@ -335,7 +326,7 @@ public class CalculateAverage_karthikeyan97 { */ // Get the FileChannel from the FileInputStream - // System.out.println("time taken:" + (System.nanoTime() - start) / 1000000); + // System.out.println("time taken1:" + (System.nanoTime() - start) / 1000000); // System.out.println(measurements); } @@ -343,17 +334,21 @@ public class CalculateAverage_karthikeyan97 { class modifiedbytearray { private int length; + private int start; + private int end; private byte[] arr; public int hashcode; - modifiedbytearray(byte[] arr, int length, int hashcode) { + modifiedbytearray(byte[] arr, int start, int end, int hashcode) { this.arr = arr; - this.length = length; + this.length = end - start + 1; + this.end = end; + this.start = start; this.hashcode = hashcode; } public String getStationName() { - return new String(this.getArr(), 0, length, StandardCharsets.UTF_8); + return new String(this.getArr(), start, length, StandardCharsets.UTF_8); } public byte[] getArr() { @@ -368,7 +363,7 @@ class modifiedbytearray { @Override public boolean equals(Object obj) { modifiedbytearray b = (modifiedbytearray) obj; - return Arrays.equals(this.getArr(), 0, length, b.arr, 0, b.length); + return Arrays.equals(this.getArr(), start, end, b.arr, b.start, b.end); } public int getHashcode() {