diff --git a/calculate_average_sudhirtumati.sh b/calculate_average_sudhirtumati.sh new file mode 100755 index 0000000..fb31f86 --- /dev/null +++ b/calculate_average_sudhirtumati.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="--enable-preview -Xmx128m -XX:+UseSerialGC -XX:-TieredCompilation" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_sudhirtumati diff --git a/prepare_sudhirtumati.sh b/prepare_sudhirtumati.sh new file mode 100755 index 0000000..735bdab --- /dev/null +++ b/prepare_sudhirtumati.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.2-open 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_sudhirtumati.java b/src/main/java/dev/morling/onebrc/CalculateAverage_sudhirtumati.java new file mode 100644 index 0000000..813c561 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_sudhirtumati.java @@ -0,0 +1,304 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +public class CalculateAverage_sudhirtumati { + + private static final String FILE = "./measurements.txt"; + private static final int bufferSize = 8192; + private static final byte SEMICOLON = (byte) ';'; + private static final byte NEW_LINE = (byte) '\n'; + private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors(); + private static final Semaphore PERMITS = new Semaphore(THREAD_COUNT); + private static final MeasurementAggregator globalAggregator = new MeasurementAggregator(); + private static final Semaphore AGGREGATOR_PERMITS = new Semaphore(1); + private static final Map LOCATION_STORE = new ConcurrentHashMap<>(); + + public static void main(String[] args) throws IOException, InterruptedException { + CalculateAverage_sudhirtumati instance = new CalculateAverage_sudhirtumati(); + instance.chunkProcess(); + } + + private void chunkProcess() throws IOException, InterruptedException { + try (FileInputStream is = new FileInputStream(FILE); + FileChannel fc = is.getChannel()) { + for (int i = 0; i < THREAD_COUNT; i++) { + PERMITS.acquire(); + Thread t = new ChunkProcessingThread(i, fc); + t.setName(STR."T\{i}"); + t.start(); + } + do { + Thread.sleep(100); + } while (PERMITS.availablePermits() != THREAD_COUNT); + } + System.out.println(globalAggregator.getResult()); + } + + static class ChunkProcessingThread extends Thread { + + private int index; + private final FileChannel fc; + private final MeasurementAggregator aggregator; + + ChunkProcessingThread(int index, FileChannel fc) { + this.index = index; + this.fc = fc; + aggregator = new MeasurementAggregator(); + } + + @Override + public void run() { + ByteBuffer buffer = ByteBuffer.allocate(index == 0 ? bufferSize : bufferSize + 50); + long fcPosition = index == 0 ? 0 : (((long) index * bufferSize) - 50); + try { + while (fc.read(buffer, fcPosition) != -1) { + buffer.flip(); + if (index != 0 /* && fc.position() != bufferSize */) { + seekStartPos(buffer); + } + processBuffer(buffer); + index += THREAD_COUNT; + fcPosition = ((long) index * bufferSize) - 50L; + if (buffer.capacity() == 8192) { + buffer = ByteBuffer.allocate(bufferSize + 50); + } + buffer.position(0); + } + AGGREGATOR_PERMITS.acquire(); + globalAggregator.process(aggregator); + AGGREGATOR_PERMITS.release(); + } + catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + PERMITS.release(); + } + + private void processBuffer(ByteBuffer buffer) throws IOException { + int mStartMark = buffer.position(); + int tStartMark = -1; + int count = buffer.position(); + do { + byte b = buffer.get(count); + if (b == SEMICOLON) { + tStartMark = count; + } + else if (b == NEW_LINE) { + byte[] locArr = new byte[tStartMark - mStartMark]; + byte[] tempArr = new byte[count - tStartMark]; + buffer.get(mStartMark, locArr); + buffer.get(mStartMark + locArr.length + 1, tempArr); + aggregator.process(locArr, tempArr); + mStartMark = count + 1; + } + count++; + } while (count < buffer.limit()); + } + + private void seekStartPos(ByteBuffer buffer) { + int i = buffer.limit() > 50 ? 49 : buffer.limit() - 2; + for (; i >= 0; i--) { + if (buffer.get(i) == NEW_LINE) { + buffer.position(i + 1); + break; + } + } + } + } + + static final class MeasurementAggregator { + private static final long MAX_VALUE_DIVIDE_10 = Long.MAX_VALUE / 10; + private final Map store = new HashMap<>(); + + public void process(MeasurementAggregator other) { + other.store.forEach((k, v) -> { + Measurement m = store.get(k); + if (m == null) { + m = new Measurement(); + store.put(k, m); + } + m.process(v); + }); + } + + public void process(byte[] location, byte[] temperature) throws IOException { + Integer hashCode = Arrays.hashCode(location); + LOCATION_STORE.computeIfAbsent(hashCode, _ -> new String(location)); + // String loc = new String(location); + Measurement measurement = store.get(hashCode); + if (measurement == null) { + measurement = new Measurement(); + store.put(hashCode, measurement); + } + double tempD = parseDouble(temperature); + measurement.process(tempD); + } + + public double parseDouble(byte[] bytes) { + long value = 0; + int exp = 0; + boolean negative = false; + int decimalPlaces = Integer.MIN_VALUE; + int index = 0; + int ch = bytes[index]; + if (ch == '-') { + negative = true; + ch = bytes[++index]; + } + while (index < bytes.length) { + if (ch >= '0' && ch <= '9') { + while (value >= MAX_VALUE_DIVIDE_10) { + value >>>= 1; + exp++; + } + value = value * 10 + (ch - '0'); + decimalPlaces++; + + } + else if (ch == '.') { + decimalPlaces = 0; + } + if (index == bytes.length - 1) { + break; + } + else { + ch = bytes[++index]; + } + } + return asDouble(value, exp, negative, decimalPlaces); + } + + private static double asDouble(long value, int exp, boolean negative, int decimalPlaces) { + if (decimalPlaces > 0 && value < Long.MAX_VALUE / 2) { + if (value < Long.MAX_VALUE / (1L << 32)) { + exp -= 32; + value <<= 32; + } + if (value < Long.MAX_VALUE / (1L << 16)) { + exp -= 16; + value <<= 16; + } + if (value < Long.MAX_VALUE / (1L << 8)) { + exp -= 8; + value <<= 8; + } + if (value < Long.MAX_VALUE / (1L << 4)) { + exp -= 4; + value <<= 4; + } + if (value < Long.MAX_VALUE / (1L << 2)) { + exp -= 2; + value <<= 2; + } + if (value < Long.MAX_VALUE / (1L << 1)) { + exp -= 1; + value <<= 1; + } + } + for (; decimalPlaces > 0; decimalPlaces--) { + exp--; + long mod = value % 5; + value /= 5; + int modDiv = 1; + if (value < Long.MAX_VALUE / (1L << 4)) { + exp -= 4; + value <<= 4; + modDiv <<= 4; + } + if (value < Long.MAX_VALUE / (1L << 2)) { + exp -= 2; + value <<= 2; + modDiv <<= 2; + } + if (value < Long.MAX_VALUE / (1L << 1)) { + exp -= 1; + value <<= 1; + modDiv <<= 1; + } + if (decimalPlaces > 1) + value += modDiv * mod / 5; + else + value += (modDiv * mod + 4) / 5; + } + final double d = Math.scalb((double) value, exp); + return negative ? -d : d; + } + + public String getResult() { + Map sortedMap = new TreeMap<>(); + store.forEach((k, v) -> sortedMap.put(LOCATION_STORE.get(k), v)); + return sortedMap.toString(); + } + } + + static final class Measurement { + private double min = Double.POSITIVE_INFINITY; + private double max = Double.NEGATIVE_INFINITY; + private double sum; + private long count; + + public void process(double value) { + if (value < min) { + min = value; + } + if (value > max) { + max = value; + } + sum += value; + count++; + } + + public void process(Measurement other) { + if (other.min < min) { + this.min = other.min; + } + if (other.max > max) { + this.max = other.max; + } + this.sum += other.sum; + this.count += other.count; + } + + public String toString() { + ResultRow result = new ResultRow(min, sum, count, max); + return result.toString(); + } + } + + private record ResultRow(double min, double sum, double count, double max) { + + public String toString() { + return STR."\{round(min)}/\{round((Math.round(sum * 10.0) / 10.0) / count)}/\{round(max)}"; + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } + +}