#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Launcher for the CalculateAverage_criccomini entry: runs the class from the
# project jar under `time` so the elapsed time is reported with the result.

# JVM flags: ZGC collector and a 700 MB initial heap.
# A variant kept for GC diagnostics: "-XX:+UseZGC -server -Xms700m -Xlog:gc"
JAVA_OPTS="-XX:+UseZGC -Xms700m"

# $JAVA_OPTS is deliberately unquoted so the shell splits it into separate flags.
time java $JAVA_OPTS \
    --class-path target/average-1.0.0-SNAPSHOT.jar \
    dev.morling.onebrc.CalculateAverage_criccomini
/*
 *  Copyright 2023 The original authors
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

/**
 * Reads {@code ./measurements.txt} (one {@code station;temperature} record per line) and prints,
 * for every station in sorted order, {@code min/mean/max} rounded to one decimal place.
 *
 * <p>The file is memory-mapped in slices of at most {@link #SEGMENT_SIZE} bytes. Every slice is
 * trimmed so it ends on a line break, aggregated on a worker thread into its own map, and the
 * per-slice maps are merged on the calling thread once all workers are done.
 */
public class CalculateAverage_criccomini {

    private static final String FILE = "./measurements.txt";

    /** Maximum number of bytes mapped per slice; small enough for {@code int} buffer indices. */
    private static final long SEGMENT_SIZE = 256_000_000;

    /** Upper bound on worker threads; the pool only starts as many threads as there are slices. */
    private static final int MAX_WORKERS = 128;

    /** Running min / max / sum / count for a single station. */
    private static class MeasurementAggregator {
        private double min = Double.POSITIVE_INFINITY;
        private double max = Double.NEGATIVE_INFINITY;
        private double sum;
        private long count;

        /** Folds one reading into this aggregate. */
        void add(double value) {
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
            count++;
        }

        /**
         * Folds another aggregate for the same station into this one.
         *
         * @return this instance, so the method can serve as a {@code Map.merge} remapping function
         */
        MeasurementAggregator combine(MeasurementAggregator other) {
            min = Math.min(min, other.min);
            max = Math.max(max, other.max);
            sum += other.sum;
            count += other.count;
            return this;
        }

        @Override
        public String toString() {
            return round(min) + "/" + round(sum / count) + "/" + round(max);
        }

        private static double round(double value) {
            return Math.round(value * 10.0) / 10.0;
        }
    }

    /**
     * Aggregates every record found in the first {@code length} bytes of {@code buffer}.
     *
     * <p>Lines are terminated by {@code '\n'}; a final line without a terminator is still
     * processed, and empty lines are skipped. The buffer's position is modified, so a buffer must
     * not be shared between concurrent calls.
     *
     * @param buffer mapped slice that starts at the beginning of a line
     * @param length number of leading bytes of {@code buffer} to scan
     * @return per-station aggregates for this slice
     * @throws IllegalArgumentException if a non-empty line contains no {@code ';'}
     * @throws NumberFormatException if the text after the last {@code ';'} is not a number
     */
    private static Map<String, MeasurementAggregator> processSegment(MappedByteBuffer buffer, int length) {
        Map<String, MeasurementAggregator> aggregates = new HashMap<>();
        int lineStart = 0;
        int separator = -1; // index of the last ';' seen on the current line, -1 if none yet
        for (int i = 0; i < length; ++i) {
            // Bytes of multi-byte UTF-8 sequences are negative and can never match ';' or '\n'.
            byte b = buffer.get(i);
            if (b == ';') {
                separator = i;
            }
            else if (b == '\n') {
                addRecord(aggregates, buffer, lineStart, separator, i);
                lineStart = i + 1;
                separator = -1;
            }
        }
        // The very last line of the file may lack a trailing line break.
        if (lineStart < length) {
            addRecord(aggregates, buffer, lineStart, separator, length);
        }
        return aggregates;
    }

    /** Parses the record occupying {@code [lineStart, lineEnd)} and adds it to {@code aggregates}. */
    private static void addRecord(Map<String, MeasurementAggregator> aggregates, MappedByteBuffer buffer,
                                  int lineStart, int separator, int lineEnd) {
        if (lineStart == lineEnd) {
            return; // blank line: nothing to record
        }
        if (separator < 0) {
            throw new IllegalArgumentException("Line without ';' at slice offset " + lineStart);
        }
        String station = readString(buffer, lineStart, separator);
        // Double.parseDouble ignores surrounding whitespace, so a trailing '\r' is tolerated.
        double temperature = Double.parseDouble(readString(buffer, separator + 1, lineEnd));
        aggregates.computeIfAbsent(station, s -> new MeasurementAggregator()).add(temperature);
    }

    /** Decodes the bytes in {@code [from, to)} of {@code buffer} as UTF-8. */
    private static String readString(MappedByteBuffer buffer, int from, int to) {
        byte[] bytes = new byte[to - from];
        buffer.position(from);
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Entry point: aggregates {@code ./measurements.txt} and prints the sorted result map.
     *
     * @param args ignored
     * @throws IOException if the file cannot be opened or mapped, or if a slice that is not the
     *         last one contains no line break
     * @throws InterruptedException if the calling thread is interrupted while waiting for a worker
     * @throws ExecutionException if a worker fails, e.g. on a malformed record
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(MAX_WORKERS);
        try (RandomAccessFile file = new RandomAccessFile(FILE, "r");
                FileChannel channel = file.getChannel()) {
            long fileSize = channel.size();
            long position = 0;
            List<Future<Map<String, MeasurementAggregator>>> futures = new ArrayList<>();
            while (position < fileSize) {
                // Keep the end offset as a long: the input may be far larger than 2 GiB.
                long end = Math.min(position + SEGMENT_SIZE, fileSize);
                int length = (int) (end - position);
                MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, length);
                if (end < fileSize) {
                    // Not the last slice: cut it after its final line break so no record is split.
                    while (length > 0 && buffer.get(length - 1) != '\n') {
                        --length;
                    }
                    if (length == 0) {
                        throw new IOException(
                                "No line break within " + SEGMENT_SIZE + " bytes at offset " + position);
                    }
                }
                position += length;
                int finalLength = length;
                futures.add(executor.submit(() -> processSegment(buffer, finalLength)));
            }

            // Merge results into a single TreeMap; Future.get() blocks until the slice is done.
            Map<String, MeasurementAggregator> aggregates = new TreeMap<>();
            for (Future<Map<String, MeasurementAggregator>> future : futures) {
                for (Map.Entry<String, MeasurementAggregator> entry : future.get().entrySet()) {
                    aggregates.merge(entry.getKey(), entry.getValue(), MeasurementAggregator::combine);
                }
            }
            System.out.println(aggregates);
        }
        finally {
            // On success every task has finished already; on failure this cancels the rest.
            executor.shutdownNow();
        }
    }
}