diff --git a/calculate_average_plbpietrz.sh b/calculate_average_plbpietrz.sh new file mode 100755 index 0000000..bcd76ad --- /dev/null +++ b/calculate_average_plbpietrz.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +java $JAVA_OPTS -Xmx99m --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_plbpietrz diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_plbpietrz.java b/src/main/java/dev/morling/onebrc/CalculateAverage_plbpietrz.java new file mode 100644 index 0000000..9fb3825 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_plbpietrz.java @@ -0,0 +1,273 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CalculateAverage_plbpietrz { + + private static final String FILE = "./measurements.txt"; + private static final int READ_SIZE = 1024; + private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); + + private static class TemperatureStats { + double min = 999, max = -999d; + double accumulated; + int count; + + public void update(double temp) { + this.min = Math.min(this.min, temp); + this.max = Math.max(this.max, temp); + this.accumulated += temp; + this.count++; + } + } + + private record FilePart(long pos, long size) { + } + + private static class WeatherStation { + private int length; + private int nameHash; + private byte[] nameBytes; + private String string; + + public WeatherStation() { + nameBytes = new byte[128]; + } + + public WeatherStation(WeatherStation station) { + this.nameBytes = Arrays.copyOf(station.nameBytes, station.length); + this.length = station.length; + this.nameHash = station.nameHash; + } + + @Override + public int hashCode() { + return nameHash; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o instanceof WeatherStation s) { + return this.nameHash == s.nameHash && Arrays.equals(this.nameBytes, 0, this.length, s.nameBytes, 0, s.length); + } + return false; + } + + @Override + public String toString() { + if (string == null) + string = new String(nameBytes, 0, length, Charset.defaultCharset()); + return string; + } + + public void appendByte(byte b) { + string = null; + nameBytes[length++] = b; + nameHash = nameHash * 31 + b; + } + + public void clear() { + this.length = 0; + this.nameHash = 0; + this.string = null; + } + + } + + public static void main(String[] args) throws IOException { + Path inputFilePath = Path.of(FILE); + Map results; + try (RandomAccessFile inputFile = new RandomAccessFile(inputFilePath.toFile(), "r")) { + var parsedBuffers = partitionInput(inputFile) + .stream() + .parallel() + .map(fp -> getMappedByteBuffer(fp, inputFile)) + .map(CalculateAverage_plbpietrz::parseBuffer); + results = parsedBuffers.flatMap(m -> m.entrySet().stream()) + .collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.reducing( + new TemperatureStats(), + Map.Entry::getValue, + CalculateAverage_plbpietrz::mergeTemperatureStats))); + try (PrintWriter pw = new PrintWriter(new BufferedOutputStream(System.out))) { + formatResults(pw, results); + } + } + } + + private static List partitionInput(RandomAccessFile inputFile) throws IOException { + List fileParts = new ArrayList<>(); + long fileLength = inputFile.length(); + + long blockSize = Math.min(fileLength, Math.max(READ_SIZE, fileLength / CPU_COUNT)); + + for (long start = 0, end; start < fileLength; start = end) { + end = findMinBlockOffset(inputFile, start, blockSize); + fileParts.add(new FilePart(start, end - start)); + } + return fileParts; + } + + private static long findMinBlockOffset(RandomAccessFile file, long startPosition, long minBlockSize) throws IOException { + long length = file.length(); + if (startPosition + minBlockSize < length) { + file.seek(startPosition + minBlockSize); + while (file.readByte() != '\n') { + } + return file.getFilePointer(); + } + else { + return length; + } + } + + private static MappedByteBuffer getMappedByteBuffer(FilePart fp, RandomAccessFile inputFile) { + try { + return inputFile.getChannel().map(FileChannel.MapMode.READ_ONLY, fp.pos, fp.size); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static Map parseBuffer(MappedByteBuffer buffer) { + byte[] readLong = new byte[READ_SIZE]; + byte[] temperature = new byte[32]; + int temperatureLineLenght = 0; + + int limit = buffer.limit(); + boolean readingName = true; + Map temperatures = new HashMap<>(); + WeatherStation station = new WeatherStation(); + + int bytesToRead = Math.min(READ_SIZE, limit - buffer.position()); + while (bytesToRead > 0) { + if (bytesToRead == READ_SIZE) { + buffer.get(readLong); + } + else { + for (int j = 0; j < bytesToRead; ++j) + readLong[j] = buffer.get(); + } + + for (int i = 0; i < bytesToRead; ++i) { + byte aChar = readLong[i]; + if (readingName) { + if (aChar != ';') { + if (aChar != '\n') { + station.appendByte(aChar); + } + } + else { + readingName = false; + } + } + else { + if (aChar != '\n') { + temperature[temperatureLineLenght++] = aChar; + } + else { + double temp = parseTemperature(temperature, temperatureLineLenght); + + if (!temperatures.containsKey(station)) { + temperatures.put(new WeatherStation(station), new TemperatureStats()); + } + TemperatureStats weatherStats = temperatures.get(station); + weatherStats.update(temp); + + station.clear(); + temperatureLineLenght = 0; + readingName = true; + } + } + } + + bytesToRead = Math.min(READ_SIZE, limit - buffer.position()); + } + return temperatures; + } + + private static double parseTemperature(byte[] temperature, int temperatureSize) { + double sign = 1; + double manitssa = 0; + double exponent = 1; + for (int i = 0; i < temperatureSize; ++i) { + byte c = temperature[i]; + switch (c) { + case '-': + sign = -1; + break; + case '.': + for (int j = i; j < temperatureSize - 1; ++j) + exponent *= 0.1; + break; + default: + manitssa = manitssa * 10 + (c - 48); + } + } + return sign * manitssa * exponent; + } + + private static TemperatureStats mergeTemperatureStats(TemperatureStats v1, TemperatureStats v2) { + TemperatureStats acc = new TemperatureStats(); + acc.min = Math.min(v1.min, v2.min); + acc.max = Math.max(v1.max, v2.max); + acc.accumulated = v1.accumulated + v2.accumulated; + acc.count = v1.count + v2.count; + return acc; + } + + private static void formatResults(PrintWriter pw, Map resultsMap) { + pw.print('{'); + var results = new ArrayList<>(resultsMap.entrySet()); + results.sort(Comparator.comparing(e -> e.getKey().toString())); + var iterator = results.iterator(); + while (iterator.hasNext()) { + var entry = iterator.next(); + TemperatureStats stats = entry.getValue(); + pw.printf("%s=%.1f/%.1f/%.1f", + entry.getKey(), + stats.min, + stats.accumulated / stats.count, + stats.max); + if ((iterator.hasNext())) + pw.print(", "); + } + pw.println('}'); + } + +}