diff --git a/calculate_average_adriacabeza.sh b/calculate_average_adriacabeza.sh
new file mode 100755
index 0000000..984ab03
--- /dev/null
+++ b/calculate_average_adriacabeza.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+#
+#  Copyright 2023 The original authors
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# sdk use java 21.0.1-graal 1>&2
+
+# No -XX:+UseEpsilonGC here: the parser allocates a String per row and Epsilon never reclaims memory.
+JAVA_OPTS="--enable-preview"
+java $JAVA_OPTS -classpath target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_adriacabeza
+
diff --git a/prepare_adriacabeza.sh b/prepare_adriacabeza.sh
new file mode 100755
index 0000000..f83a3ff
--- /dev/null
+++ b/prepare_adriacabeza.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+#
+#  Copyright 2023 The original authors
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# + +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_adriacabeza.java b/src/main/java/dev/morling/onebrc/CalculateAverage_adriacabeza.java new file mode 100644 index 0000000..56a2c4d --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_adriacabeza.java @@ -0,0 +1,223 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * This class calculates average measurements from a file in a parallelized manner. + */ +public class CalculateAverage_adriacabeza { + + private static final Path FILE_PATH = Paths.get("./measurements.txt"); + public static final int CITY_NAME_MAX_CHARACTERS = 128; + + /** + * Represents result containing a HashMap with city as key and ResultRow as value. 
+     */
+    private static class Result {
+
+        /**
+         * Running minimum, maximum, sum and count for one station.
+         * Values are kept as integers scaled by ten; {@link #toString()} divides by 10.0.
+         */
+        private static class StationData {
+            private int min;
+            private int max;
+            private int count;
+            // long rather than int: a few million large readings for one station overflow an int sum.
+            private long sum;
+
+            /**
+             * Starts an accumulator from its first reading.
+             *
+             * @param value first reading, scaled by ten
+             */
+            public StationData(int value) {
+                this.count = 1;
+                this.sum = value;
+                this.min = value;
+                this.max = value;
+            }
+
+            /**
+             * Copies another accumulator so that the copy can be mutated independently.
+             *
+             * @param other accumulator to copy
+             */
+            private StationData(StationData other) {
+                this.count = other.count;
+                this.sum = other.sum;
+                this.min = other.min;
+                this.max = other.max;
+            }
+
+            /**
+             * Adds one reading.
+             *
+             * @param value reading, scaled by ten
+             */
+            public void update(int value) {
+                this.count++;
+                this.sum += value;
+                this.min = Math.min(this.min, value);
+                this.max = Math.max(this.max, value);
+            }
+
+            /**
+             * Folds every reading already summarised by {@code other} into this accumulator.
+             *
+             * @param other accumulator to fold in; it is not modified
+             */
+            private void absorb(StationData other) {
+                this.count += other.count;
+                this.sum += other.sum;
+                this.min = Math.min(this.min, other.min);
+                this.max = Math.max(this.max, other.max);
+            }
+
+            /**
+             * @return {@code min/mean/max} with one decimal each, always using '.' as the decimal separator
+             */
+            @Override
+            public String toString() {
+                // Locale.ROOT keeps the output stable on machines whose default locale uses a decimal comma.
+                return String.format(java.util.Locale.ROOT, "%.1f/%.1f/%.1f",
+                        min / 10.0, sum / 10.0 / count, max / 10.0);
+            }
+        }
+
+        private final Map<String, StationData> resultMap;
+
+        /** Creates an empty result. */
+        public Result() {
+            this.resultMap = new HashMap<>();
+        }
+
+        /**
+         * @return the live, mutable map from city name to its accumulator
+         */
+        public Map<String, StationData> getResultMap() {
+            return resultMap;
+        }
+
+        /**
+         * Records one reading for a city, creating the city's accumulator on first sight.
+         *
+         * @param city  city name
+         * @param value reading, scaled by ten
+         */
+        public void addMeasurement(String city, int value) {
+            StationData existing = resultMap.get(city);
+            if (existing == null) {
+                resultMap.put(city, new StationData(value));
+            } else {
+                existing.update(value);
+            }
+        }
+
+        /**
+         * Folds all entries of {@code other} into this result.
+         * Entries for cities not yet present are copied, so {@code other} is never
+         * modified by this or by any later merge into this result.
+         *
+         * @param other result to fold in
+         */
+        public void merge(Result other) {
+            other.getResultMap().forEach((city, incoming) -> {
+                StationData existing = resultMap.get(city);
+                if (existing == null) {
+                    resultMap.put(city, new StationData(incoming));
+                } else {
+                    existing.absorb(incoming);
+                }
+            });
+        }
+
+        /**
+         * @return all cities sorted by name, rendered as {@code {city=min/mean/max, ...}}
+         */
+        @Override
+        public String toString() {
+            return this.resultMap.entrySet().stream()
+                    .sorted(Map.Entry.comparingByKey())
+                    .map(entry -> "%s=%s".formatted(entry.getKey(), entry.getValue()))
+                    .collect(Collectors.joining(", ", "{", "}"));
+        }
+    }
+
+    /**
+     * Finds the ending position in the file, ensuring it ends at the beginning of a line.
+     * Scanning starts at {@code position}; a position at or past the end of the file
+     * yields the file size.
+     *
+     * @param channel  File channel
+     * @param position Current position in the file
+     * @return Ending position at the beginning of a line
+     * @throws IOException If an I/O error occurs
+     */
+    private static long findEndPosition(FileChannel channel, long position) throws IOException {
+        final long fileSize = channel.size();
+        // Read a small window per call instead of issuing one read per byte.
+        final ByteBuffer window = ByteBuffer.allocate(256);
+
+        while (position < fileSize) {
+            window.clear();
+            int read = channel.read(window, position);
+            if (read <= 0) {
+                break; // nothing more could be read; fall back to the end of the file
+            }
+            for (int i = 0; i < read; i++) {
+                if (window.get(i) == '\n') {
+                    return position + i + 1; // the position immediately after the newline
+                }
+            }
+            position += read;
+        }
+
+        return fileSize; // no newline at or after the given position
+    }
+
+    /**
+     * Upper bound for the nominal chunk size. {@link FileChannel#map} rejects sizes above
+     * {@link Integer#MAX_VALUE}; the margin leaves room for the partial line that is
+     * appended to a chunk to make it end on a line boundary.
+     */
+    private static final long MAX_CHUNK_BYTES = Integer.MAX_VALUE - (1L << 20);
+
+    /**
+     * Gets the mapped byte buffers for parallel processing.
+     * The file is cut into consecutive chunks that each end right after a newline
+     * (or at the end of the file). More than {@code nProcessors} chunks are produced
+     * when an even split would exceed the size a single mapping can have.
+     *
+     * @param nProcessors Number of processors for parallelization; values below 1 are treated as 1
+     * @return List of MappedByteBuffers covering the whole file in order; empty for an empty file
+     * @throws IOException If an I/O error occurs
+     */
+    private static List<MappedByteBuffer> getMappedByteBuffers(int nProcessors) throws IOException {
+        try (FileChannel channel = FileChannel.open(FILE_PATH, StandardOpenOption.READ)) {
+            final long fileSize = channel.size();
+            final int parts = Math.max(1, nProcessors);
+            final long chunkSize = Math.min((fileSize + parts - 1) / parts, MAX_CHUNK_BYTES);
+
+            List<MappedByteBuffer> buffers = new ArrayList<>(parts);
+            long pos = 0;
+            // chunkSize is at least 1 whenever the file is non-empty, so every pass advances pos.
+            while (pos < fileSize) {
+                long endPosition = findEndPosition(channel, pos + chunkSize);
+                buffers.add(channel.map(FileChannel.MapMode.READ_ONLY, pos, endPosition - pos));
+                pos = endPosition;
+            }
+            return buffers;
+        }
+    }
+
+    /**
+     * Calculates average measurements from the file.
+     * Chunks are parsed in parallel, one partial result per chunk, and the partial
+     * results are then merged one after another on the calling thread.
+     *
+     * @param chunks buffers to parse; each is read from its current position up to its limit
+     * @return Result containing min/mean/max values for each city
+     */
+    private static Result calculateAverageMeasurements(List<MappedByteBuffer> chunks) {
+        Result total = new Result();
+        chunks.parallelStream()
+                .map(CalculateAverage_adriacabeza::parseChunk)
+                .toList()
+                .forEach(total::merge);
+        return total;
+    }
+
+    /**
+     * Parses every {@code name;value\n} record between the buffer's position and its limit.
+     * The value's digits are accumulated with the '.' skipped, so {@code 12.3} is recorded as 123.
+     *
+     * @param buffer chunk to consume; its position is advanced to its limit
+     * @return the measurements found in this chunk
+     */
+    private static Result parseChunk(ByteBuffer buffer) {
+        Result partial = new Result();
+        final int limit = buffer.limit();
+        // Reused for every record; a name longer than CITY_NAME_MAX_CHARACTERS bytes overflows it.
+        final byte[] name = new byte[CITY_NAME_MAX_CHARACTERS];
+
+        while (buffer.position() < limit) {
+            int nameLength = 0;
+            name[nameLength++] = buffer.get();
+            while (buffer.position() < limit) {
+                byte b = buffer.get();
+                if (b == ';') {
+                    break;
+                }
+                name[nameLength++] = b;
+            }
+            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
+            String city = new String(name, 0, nameLength, java.nio.charset.StandardCharsets.UTF_8);
+
+            int value = 0;
+            boolean negative = false;
+            while (buffer.position() < limit) {
+                byte b = buffer.get();
+                if (b == '\n') {
+                    break;
+                }
+                if (b == '-') {
+                    negative = true;
+                } else if (b >= '0' && b <= '9') {
+                    // Anything that is not a digit ('.', a stray '\r') is skipped.
+                    value = value * 10 + (b - '0');
+                }
+            }
+            partial.addMeasurement(city, negative ? -value : value);
+        }
+        return partial;
+    }
+
+    /**
+     * The main method to run the average measurements calculations program.
+     * Prints the result to standard output; on an I/O failure prints the error to
+     * standard error and exits with status 1.
+     *
+     * @param args Command line arguments. Not utilized in this program.
+     */
+    public static void main(String[] args) {
+        try {
+            // Split the file into line-aligned chunks, one per available processor where possible
+            var buffers = getMappedByteBuffers(Runtime.getRuntime().availableProcessors());
+
+            // Aggregate all chunks and print the combined result
+            System.out.println(calculateAverageMeasurements(buffers));
+        } catch (IOException e) {
+            System.err.println("Error processing file: " + e.getMessage());
+            // Signal the failure to the calling script instead of exiting with status 0.
+            System.exit(1);
+        }
+    }
+}