diff --git a/calculate_average_semotpan.sh b/calculate_average_semotpan.sh new file mode 100755 index 0000000..ef7cdf1 --- /dev/null +++ b/calculate_average_semotpan.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_semotpan diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_semotpan.java b/src/main/java/dev/morling/onebrc/CalculateAverage_semotpan.java new file mode 100644 index 0000000..8ff122a --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_semotpan.java @@ -0,0 +1,199 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.nio.channels.FileChannel.MapMode.READ_ONLY; +import static java.nio.file.StandardOpenOption.READ; + +public class CalculateAverage_semotpan { + + private static final Path FILE = Path.of("./measurements.txt"); + + public static void main(String[] args) throws IOException, InterruptedException { + var spliterator = new SegmentSpliterator(FILE); + var summaryStations = IntStream.range(0, spliterator.segments) + .parallel() + .mapToObj(segment -> { + var buff = spliterator.buffSegment(segment); + return new SegmentReader(buff); + }) + .flatMap(segmentReader -> segmentReader.values().stream()) + .collect(Collectors.groupingByConcurrent(StationSummary::station)); + + // Complexity: O(stations * segments) + var output = new TreeMap(); + summaryStations.forEach((station, summaries) -> { + var summary = summaries.stream() + .reduce(new StationSummary(), (identity, acc) -> { + identity.min = Integer.min(identity.min, acc.min); + identity.max = Integer.max(identity.max, acc.max); + identity.sum += acc.sum; + identity.count += acc.count; + return identity; + }); + output.put(station, summary); + }); + + System.out.println(output); + } + + private static class StationSummary { + + public String station; + public int min, max, sum, count; + + StationSummary() { + min = Integer.MAX_VALUE; + max = Integer.MIN_VALUE; + } + + StationSummary(String station, int temperature) { + this.station = station; + this.min = this.max = this.sum = temperature; + this.count = 1; + } + + String station() { + return station; + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + + @Override + public String toString() { + return new StringBuilder() + .append(round((double) min * 0.1)).append('/') + .append(round((double) sum * 0.1 / count)) + .append('/').append(round((double) max * 0.1)) + .toString(); + } + } + + static class SegmentSpliterator { + + private int segments; + private final ByteBuffer[] buffSegments; + + SegmentSpliterator(Path file) throws IOException { + this.segments = Runtime.getRuntime().availableProcessors(); + var fileSize = Files.size(file); + var segmentSize = fileSize / segments; + + if (segmentSize <= (1 << 8)) { // small segment: 1 is enough + segments = 1; + } + + buffSegments = new ByteBuffer[segments]; + + var pos = 0L; + for (var s = 0; s < segments - 1; s++) { + try (var channel = (FileChannel) Files.newByteChannel(FILE, READ)) { + var buff = channel.map(READ_ONLY, pos, segmentSize); + pos = normalize(buff, (int) segmentSize - 1, pos); + buffSegments[s] = buff; + } + } + + // handle last segment + try (var channel = (FileChannel) Files.newByteChannel(FILE, READ)) { + var buff = channel.map(READ_ONLY, pos, fileSize - pos); + buffSegments[segments - 1] = buff; + } + } + + private long normalize(ByteBuffer buff, int relativePos, long pos) { + while (buff.get(relativePos) != '\n') { + relativePos--; + } + + buff.limit(relativePos + 1); + return pos + (relativePos + 1); + } + + ByteBuffer buffSegment(int index) { + return buffSegments[index]; + } + } + + static class SegmentReader { + + private final Map accumulator; + private final byte[] keyBuff = new byte[256]; + + SegmentReader(ByteBuffer byteBuffer) { + accumulator = new HashMap<>(); + read(byteBuffer); + } + + private void read(ByteBuffer buff) { + byte b; + for (int pos = 0, limit = buff.limit(); buff.hasRemaining(); buff.position(pos)) { + // parse station (key) + int offset = 0; + while ((b = buff.get(pos++)) != ';') { + keyBuff[offset++] = b; + } + + // parse temperature (value) as int (1.1 * 10 = 11) + int temp = 0; + int negative = 1; + while (pos != limit && (b = buff.get(pos++)) != '\n') { + switch (b) { + case '-': + negative = -1; + case '.': + continue; + default: + temp = 10 * temp + (b - '0'); + } + } + temp *= negative; + + // compute station + var station = new String(keyBuff, 0, offset); + var finalTemp = temp; + accumulator.compute(station, (key, summary) -> { + if (summary == null) { + return new StationSummary(station, finalTemp); + } + summary.min = Integer.min(summary.min, finalTemp); + summary.max = Integer.max(summary.max, finalTemp); + summary.sum += finalTemp; + summary.count += 1; + return summary; + }); + } + } + + Collection values() { + return accumulator.values(); + } + } +}