diff --git a/calculate_average_zerninv.sh b/calculate_average_zerninv.sh new file mode 100755 index 0000000..1cc4197 --- /dev/null +++ b/calculate_average_zerninv.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_zerninv \ No newline at end of file diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java b/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java new file mode 100644 index 0000000..8aa1a95 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_zerninv.java @@ -0,0 +1,199 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +public class CalculateAverage_zerninv { + private static final String FILE = "./measurements.txt"; + private static final int MIN_FILE_SIZE = 1024 * 1024; + private static final char DELIMITER = ';'; + private static final char LINE_SEPARATOR = '\n'; + private static final char ZERO = '0'; + private static final char NINE = '9'; + private static final char MINUS = '-'; + + public static void main(String[] args) throws IOException { + var results = new HashMap(); + try (var channel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) { + var fileSize = channel.size(); + var cores = Runtime.getRuntime().availableProcessors(); + var chunks = cores - 1; + var maxChunkSize = fileSize < MIN_FILE_SIZE ? fileSize : Math.min(fileSize / chunks, Integer.MAX_VALUE); + var chunkOffsets = splitByChunks(channel, maxChunkSize); + + var executor = Executors.newFixedThreadPool(cores); + List>> fResults = new ArrayList<>(); + for (int i = 1; i < chunkOffsets.size(); i++) { + final long prev = chunkOffsets.get(i - 1); + final long curr = chunkOffsets.get(i); + fResults.add(executor.submit(() -> calcForChunk(channel, prev, curr))); + } + + fResults.forEach(f -> { + try { + f.get().forEach((key, value) -> { + var result = results.get(key); + if (result != null) { + result.merge(value); + } + else { + results.put(key, value); + } + }); + } + catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + }); + executor.shutdown(); + } + System.out.println(new TreeMap<>(results)); + } + + private static List splitByChunks(FileChannel channel, long maxChunkSize) throws IOException { + long size = channel.size(); + List result = new ArrayList<>(); + long current = 0; + result.add(current); + while (current < size) { + var mbb = channel.map(FileChannel.MapMode.READ_ONLY, current, Math.min(size - current, maxChunkSize)); + int position = mbb.limit() - 1; + while (mbb.get(position) != LINE_SEPARATOR) { + position--; + } + current += position + 1; + result.add(current); + } + return result; + } + + private static Map calcForChunk(FileChannel channel, long begin, long end) throws IOException { + var results = new HashMap(10_000); + var mbb = channel.map(FileChannel.MapMode.READ_ONLY, begin, end - begin); + int cityOffset, hashCode, temperatureOffset, temperature; + byte b; + + while (mbb.hasRemaining()) { + cityOffset = mbb.position(); + hashCode = 0; + while ((b = mbb.get()) != DELIMITER) { + hashCode = 31 * hashCode + b; + } + + temperatureOffset = mbb.position(); + CityWrapper city = new CityWrapper(mbb, cityOffset, temperatureOffset - cityOffset - 1, hashCode); + + temperature = 0; + while ((b = mbb.get()) != LINE_SEPARATOR) { + if (b >= ZERO && b <= NINE) { + temperature = temperature * 10 + (b - ZERO); + } + } + if (mbb.get(temperatureOffset) == MINUS) { + temperature *= -1; + } + + var result = results.get(city); + if (result != null) { + result.addTemperature(temperature); + } + else { + results.put(city, new MeasurementAggregation().addTemperature(temperature)); + } + } + return results.entrySet() + .stream() + .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)); + } + + private static final class MeasurementAggregation { + private long sum; + private int count; + private int min = Integer.MAX_VALUE; + private int max = Integer.MIN_VALUE; + + public MeasurementAggregation addTemperature(int temperature) { + sum += temperature; + count++; + min = Math.min(temperature, min); + max = Math.max(temperature, max); + return this; + } + + public void merge(MeasurementAggregation o) { + if (o == null) { + return; + } + sum += o.sum; + count += o.count; + min = Math.min(min, o.min); + max = Math.max(max, o.max); + } + + @Override + public String toString() { + return String.format("%.1f/%.1f/%.1f", min / 10d, sum / 10d / count, max / 10d); + } + } + + private record CityWrapper(MappedByteBuffer mbb, int begin, int size, int hash) { + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CityWrapper that = (CityWrapper) o; + if (hash != that.hash || size != that.size) { + return false; + } + for (int i = 0; i < size; i++) { + if (mbb.get(begin + i) != mbb.get(that.begin + i)) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public String toString() { + byte[] arr = new byte[size]; + for (int i = 0; i < size; i++) { + arr[i] = mbb.get(begin + i); + } + return new String(arr); + } + } +}