From 455b85c5af1cba9ddb6e6dc686c091d1e000a432 Mon Sep 17 00:00:00 2001 From: karthikeyan97 Date: Wed, 17 Jan 2024 03:16:11 +0530 Subject: [PATCH] karthikeyan97 implementation (#417) Co-authored-by: Karthikeyans --- calculate_average_karthikeyan97.sh | 19 + .../CalculateAverage_karthikeyan97.java | 382 ++++++++++++++++++ 2 files changed, 401 insertions(+) create mode 100755 calculate_average_karthikeyan97.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java diff --git a/calculate_average_karthikeyan97.sh b/calculate_average_karthikeyan97.sh new file mode 100755 index 0000000..a6bd728 --- /dev/null +++ b/calculate_average_karthikeyan97.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="-Xms20480m -Xmx40960m " +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_karthikeyan97 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java new file mode 100644 index 0000000..c17e927 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java @@ -0,0 +1,382 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import sun.misc.Unsafe; + +import static java.util.stream.Collectors.*; + +import java.io.FileInputStream; + +import java.io.RandomAccessFile; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +public class CalculateAverage_karthikeyan97 { + + private static final String FILE = "./measurements.txt"; + + private record Measurement(modifiedbytearray station, double value) { + } + + private record customPair(String stationName, MeasurementAggregator agg) { + } + + private static class MeasurementAggregator { + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private long sum; + private long count; + + public String toString() { + return new StringBuffer(14) + .append(round(min)) + .append("/") + .append(round((1.0 * sum) / count)) + .append("/") + .append(round(max)).toString(); + } + + private double round(double value) { + return Math.round(value) / 10.0; + } + } + + public static void main(String[] args) throws Exception { + // long start = System.nanoTime(); + System.setSecurityManager(null); + Collector, MeasurementAggregator, MeasurementAggregator> collector = Collector.of( + MeasurementAggregator::new, + (a, m) -> { + MeasurementAggregator agg = m.getValue(); + if (a.min >= agg.min) { + a.min = agg.min; + } + if (a.max <= agg.max) { + a.max = agg.max; + } + a.max = Math.max(a.max, m.getValue().max); + a.sum += m.getValue().sum; + a.count += m.getValue().count; + }, + (agg1, agg2) -> { + if (agg1.min <= agg2.min) { + agg2.min = agg1.min; + } + if (agg1.max >= agg2.max) { + agg2.max = agg1.max; + } + agg2.sum = agg1.sum + agg2.sum; + agg2.count = agg1.count + agg2.count; + + return agg2; + }, + agg -> agg); + + RandomAccessFile raf = new RandomAccessFile(FILE, "rw"); + long length = raf.length(); + int cores = length > 1000 ? Runtime.getRuntime().availableProcessors() : 1; + long boundary[][] = new long[cores][2]; + long segments = length / (cores); + long before = -1; + for (int i = 0; i < cores - 1; i++) { + boundary[i][0] = before + 1; + byte[] b = new byte[107]; + if (before + segments - 107 > 0) { + raf.seek(before + segments - 107); + } + else { + raf.seek(0); + } + while (raf.read() != '\n') { + } + boundary[i][1] = raf.getChannel().position() - 1; + before = boundary[i][1]; + } + boundary[cores - 1][0] = before + 1; + boundary[cores - 1][1] = length - 1; + + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe) f.get(null); + + int pageSize = unsafe.pageSize() * 10; + + System.out.println(new TreeMap((Arrays.stream(boundary).parallel().map(i -> { + FileInputStream fileInputStream = null; + try { + fileInputStream = new FileInputStream(FILE); + FileChannel fileChannel = fileInputStream.getChannel(); + HashMap resultmap = new HashMap<>(12000, 100); + + ByteBuffer buffer = ByteBuffer.allocateDirect(pageSize); + + fileChannel.position(i[0]); + int bytesReading = 0; + double num = 0; + int sign = 1; + boolean isNumber = false; + byte bi; + modifiedbytearray stationName = null; + int hascode = 1; + int ctr = 0; + byte[] arr = new byte[100]; + int arrptr = 0; + int seglen = (int) (i[1] - i[0] + 1); + while (bytesReading < seglen) { + buffer.clear(); + int bytesRead = fileChannel.read(buffer); + if ((bytesReading + bytesRead) <= seglen) { + if (bytesRead < 0) { + bytesRead = 0; + } + } + else { + bytesRead = (seglen - bytesReading); + } + buffer.flip(); + int bytesptr = 0; + byte[] bufferArr = new byte[bytesRead]; + buffer.get(bufferArr); + while (bytesptr < bytesRead) { + bytesReading += 1; + bi = bufferArr[bytesptr++]; + if (ctr > 0) { + arr[arrptr++] = bi; + hascode = 31 * hascode + bi; + ctr--; + } + else { + if (bi >= 240) { + arr[arrptr++] = bi; + hascode = 31 * hascode + bi; + ctr = 3; + } + else if (bi >= 224) { + arr[arrptr++] = bi; + hascode = 31 * hascode + bi; + ctr = 2; + } + else if (bi >= 192) { + arr[arrptr++] = bi; + hascode = 31 * hascode + bi; + ctr = 1; + } + else if (bi == 59) { + isNumber = true; + stationName = new modifiedbytearray(arr, arrptr, hascode); + arr = new byte[100]; + arrptr = 0; + hascode = 1; + } + else if (bi == 10) { + hascode = 1; + isNumber = false; + MeasurementAggregator agg = resultmap.get(stationName); + num *= sign; + if (agg == null) { + agg = new MeasurementAggregator(); + agg.min = num; + agg.max = num; + agg.sum = (long) (num); + agg.count = 1; + resultmap.put(stationName, agg); + } + else { + if (agg.min >= num) { + agg.min = num; + } + if (agg.max <= num) { + agg.max = num; + } + agg.sum += (long) (num); + agg.count++; + } + num = 0; + sign = 1; + } + else { + hascode = 31 * hascode + bi; + if (isNumber) { + switch (bi) { + case 0x2E: + break; + case 0x2D: + sign = -1; + break; + default: + num = num * 10 + (bi - 0x30); + } + } + else { + arr[arrptr++] = bi; + } + } + } + } + } + /* + * while (bytesReading < (i[1] - i[0] + 1) && buffer.position() < buffer.limit()) { + * buffer.clear(); + * bytesRead = fileChannel.read(buffer); + * buffer.flip(); + * while (bytesReading <= (i[1] - i[0]) && buffer.position() < buffer.limit()) { + * bytesReading += 1; + * bi = buffer.get(); + * String s; + * if (ctr > 0) { + * hascode = 31 * hascode + bi; + * ctr--; + * } + * else { + * if (bi >= 240) { + * ctr = 3; + * } + * else if (bi >= 224) { + * ctr = 2; + * } + * else if (bi >= 192) { + * ctr = 1; + * } + * else if (bi == 59) { + * isNumber = true; + * System.out.println(buffer); + * stationName = new modifiedbytearray(bbstart, buffer.position() - 1, hascode, buffer); + * hascode = 1; + * bbstart = buffer.position(); + * } + * else if (bi == 10) { + * hascode = 1; + * isNumber = false; + * MeasurementAggregator agg = resultmap.get(stationName); + * if (agg == null) { + * agg = new MeasurementAggregator(); + * agg.min = num * sign; + * agg.max = num * sign; + * agg.sum = (long) (num * sign); + * agg.count = 1; + * resultmap.put(stationName, agg); + * } + * else { + * agg.min = Math.min(agg.min, num * sign); + * agg.max = Math.max(agg.max, num * sign); + * agg.sum += (long) (num * sign); + * agg.count++; + * } + * num = 1; + * bbstart = buffer.position(); + * } + * else { + * hascode = 31 * hascode + bi; + * if (isNumber) { + * switch (bi) { + * case 0x2E: + * break; + * case 0x2D: + * num = num * -1; + * break; + * default: + * num = num * 10 + (bi - 0x30); + * } + * } + * } + * } + * } + * } + */ + return resultmap; + } + catch (Exception e) { + e.printStackTrace(); + } + return null; + }).flatMap(e -> e.entrySet().stream()).collect(groupingBy(e -> e.getKey(), collector)))) { + @Override + public Object put(Object key, Object value) { + return super.put(((modifiedbytearray) key).getStationName(), value); + } + }); + + /* + * .map(a -> { + * return a.stream().parallel().collect(groupingBy(m -> m.station(), collector)); + * }).flatMap(m -> m.entrySet() + * .stream() + */ + // Get the FileChannel from the FileInputStream + + // System.out.println("time taken:" + (System.nanoTime() - start) / 1000000); + // System.out.println(measurements); + } + +} + +class modifiedbytearray { + private int length; + private byte[] arr; + public int hashcode; + + modifiedbytearray(byte[] arr, int length, int hashcode) { + this.arr = arr; + this.length = length; + this.hashcode = hashcode; + } + + public String getStationName() { + return new String(this.getArr(), 0, length, StandardCharsets.UTF_8); + } + + public byte[] getArr() { + return this.arr; + } + + @Override + public String toString() { + return getStationName(); + } + + @Override + public boolean equals(Object obj) { + modifiedbytearray b = (modifiedbytearray) obj; + return Arrays.equals(this.getArr(), 0, length, b.arr, 0, b.length); + } + + public int getHashcode() { + return hashcode; + } + + @Override + public int hashCode() { + return hashcode; + } +}