From 08541525cd582c11e4c5f6bdb9b2cc5581425023 Mon Sep 17 00:00:00 2001 From: MahmoudFawzyKhalil <73137611+MahmoudFawzyKhalil@users.noreply.github.com> Date: Wed, 17 Jan 2024 22:15:34 +0200 Subject: [PATCH] MahmoudFawzyKhalil's implementation (#438) * Initial commit trying out multiple things * Clean up code * Fix rounding error to fix failing test --- calculate_average_MahmoudFawzyKhalil.sh | 19 ++ .../CalculateAverage_MahmoudFawzyKhalil.java | 190 ++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100755 calculate_average_MahmoudFawzyKhalil.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_MahmoudFawzyKhalil.java diff --git a/calculate_average_MahmoudFawzyKhalil.sh b/calculate_average_MahmoudFawzyKhalil.sh new file mode 100755 index 0000000..761d7e6 --- /dev/null +++ b/calculate_average_MahmoudFawzyKhalil.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="--enable-preview" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_MahmoudFawzyKhalil diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_MahmoudFawzyKhalil.java b/src/main/java/dev/morling/onebrc/CalculateAverage_MahmoudFawzyKhalil.java new file mode 100644 index 0000000..6eb426a --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_MahmoudFawzyKhalil.java @@ -0,0 +1,190 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.util.*; +import java.util.concurrent.ForkJoinPool; + +// Solution using project Panama and Map Reduce +public class CalculateAverage_MahmoudFawzyKhalil { + + private static final String FILE = "./measurements.txt"; + + public static void main(String[] args) throws Exception { + mapReduce(); + } + + private static void mapReduce() throws IOException { + var f = new File(FILE); + try (var raf = new RandomAccessFile(f, "r")) { + FileChannel channel = raf.getChannel(); + long fileSize = channel.size(); + MemorySegment ms = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()); + long chunkSize = fileSize / ForkJoinPool.commonPool().getParallelism(); + List chunks = getChunks(ms, chunkSize); + Map result = chunks.stream() + .parallel() + .map(c -> readChunkToMap(c, ms)) + .reduce(Collections.emptyMap(), (a, b) -> combine(a, b)); + System.out.println(new TreeMap<>(result)); + } + } + + private static List getChunks(MemorySegment ms, long chunkSize) { + List chunks = new ArrayList<>(32); + long start = 0; + long fileSize = ms.byteSize(); + long end = chunkSize; + + while (start < fileSize) { + byte b = ms.get(ValueLayout.JAVA_BYTE, end); + if (b == '\n') { + chunks.add(new Chunk(start, end)); + start = end + 1; + end = Math.min(end + chunkSize, fileSize - 2); + } + end++; + } + return chunks; + } + + private static Map readChunkToMap(Chunk chunk, MemorySegment ms) { + Map map = new HashMap<>(); + + long start = chunk.start(); + while (start < chunk.end()) { + long cityNameSize = 0; + while (ms.get(ValueLayout.JAVA_BYTE, start + cityNameSize) != ';') { + cityNameSize++; + } + + String cityName = readString(ms, start, cityNameSize); + start = start + cityNameSize + 1; + + long temperatureSize = 0; + while (ms.get(ValueLayout.JAVA_BYTE, start + temperatureSize) != '\n') { + temperatureSize++; + } + + String temperature = readString(ms, start, temperatureSize); + start = start + temperatureSize + 1; + + // System.out.println(STR."\{cityName};\{temperature}"); + addMeasurement(map, cityName, temperature); + } + + return map; + } + + // Credit goes to imrafaelmerino for combine function + private static Map combine(Map xs, Map ys) { + Map result = new HashMap<>(); + + for (var key : xs.keySet()) { + var m1 = xs.get(key); + var m2 = ys.get(key); + var combined = (m2 == null) ? m1 : (m1 == null) ? m2 : m1.combine(m2); + result.put(key, combined); + } + + for (var key : ys.keySet()) + result.putIfAbsent(key, ys.get(key)); + return result; + } + + private static String readString(MemorySegment ms, long start, long size) { + byte[] stringBytes = ms.asSlice(start, size) + .toArray(ValueLayout.JAVA_BYTE); + return new String(stringBytes); + } + + private static void addMeasurement(Map measurements, String station, String reading) { + measurements.compute(station, + (_, oldMeasurements) -> oldMeasurements == null ? MeasurementAggregate.of(reading) : oldMeasurements.update(reading)); + } + + record Chunk(long start, long end) { + } + + private static final class MeasurementAggregate { + private double min; + private double max; + private double sum; + private long count; + + private MeasurementAggregate(double min, double max, double sum, long count) { + this.min = min; + this.max = max; + this.sum = sum; + this.count = count; + } + + public static MeasurementAggregate of(String temperature) { + double measurement = Double.parseDouble(temperature); + return new MeasurementAggregate(measurement, measurement, measurement, 1); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (MeasurementAggregate) obj; + return Double.doubleToLongBits(this.min) == Double.doubleToLongBits(that.min) && + Double.doubleToLongBits(this.max) == Double.doubleToLongBits(that.max) && + Double.doubleToLongBits(this.sum) == Double.doubleToLongBits(that.sum) && + this.count == that.count; + } + + @Override + public int hashCode() { + return Objects.hash(min, max, sum, count); + } + + public MeasurementAggregate update(String part) { + double measurement = Double.parseDouble(part); + this.min = Math.min(this.min, measurement); + this.max = Math.max(this.max, measurement); + this.sum += measurement; + this.count++; + return this; + } + + public String toString() { + return min + "/" + round(round(sum) / count) + "/" + max; + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + + public MeasurementAggregate combine(MeasurementAggregate m2) { + return new MeasurementAggregate( + Math.min(this.min, m2.min), + Math.max(this.max, m2.max), + this.sum + m2.sum, + this.count + m2.count); + } + } +}