diff --git a/calculate_average_entangled90.sh b/calculate_average_entangled90.sh new file mode 100755 index 0000000..f43c12d --- /dev/null +++ b/calculate_average_entangled90.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +JAVA_OPTS="" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_entangled90 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java b/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java new file mode 100644 index 0000000..8adce3c --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java @@ -0,0 +1,328 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static java.lang.Math.round; + +public class CalculateAverage_entangled90 { + + private static final String FILE = "./measurements.txt"; + + public static final boolean DEBUG = false; + + public static void main(String[] args) throws IOException { + Scanner scanner = new Scanner(); + long start = System.currentTimeMillis(); + PooledChunkProcessor chunkProcessor = new PooledChunkProcessor(Runtime.getRuntime().availableProcessors()); + scanner.scan(FILE, chunkProcessor, 0L, -1); + long finish = System.currentTimeMillis(); + var map = chunkProcessor.result(); + map.printResults(); + if (DEBUG) { + System.out.println("Took: " + TimeUnit.MILLISECONDS.toSeconds(finish - start) + "seconds"); + } + } +} + +class AggregatedProcessor { + public double max = Double.NEGATIVE_INFINITY; + public double min = Double.POSITIVE_INFINITY; + private double sum = 0L; + public long count = 0L; + + public void addMeasure(double value) { + max = Math.max(max, value); + min = Math.min(min, value); + sum += value; + count++; + } + + public void combine(AggregatedProcessor processor) { + max = Math.max(max, processor.max); + min = Math.min(min, processor.min); + sum += processor.sum; + count += processor.count; + } + + public double mean() { + return sum / count; + } + + private static double round(double d) { + return Math.round(d * 10.0) / 10.0; + } + + @Override + public String toString() { + return String.format(Locale.US, "%.1f/%.1f/%.1f", round(min), round(mean()), round(max)); + } +} + +class ProcessorMap { + Map processors = new HashMap<>(1024); + + public void printResults() { + System.out.println("Processed: " + processors.entrySet().stream().mapToLong(e -> e.getValue().count).sum()); + System.out.print("{"); + System.out.print( + processors.entrySet().stream() + .sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", "))); + System.out.println("}"); + } + + public void addMeasure(BytesWrapper city, double value) { + var processor = processors.get(city); + if (processor == null) { + processor = new AggregatedProcessor(); + processors.put(city, processor); + } + processor.addMeasure(value); + } + + private void combine(BytesWrapper city, AggregatedProcessor processor) { + var thisProcessor = processors.get(city); + if (thisProcessor == null) { + processors.put(city, processor); + } + else { + thisProcessor.combine(processor); + } + } + + public static ProcessorMap combineAll(ProcessorMap... processors) { + var result = new ProcessorMap(); + for (var p : processors) { + for (var entry : p.processors.entrySet()) { + result.combine(entry.getKey(), entry.getValue()); + } + } + return result; + } +} + +class PooledChunkProcessor implements Consumer { + private final ArrayBlockingQueue queue; + private final ProcessorMap[] results; + private final CountDownLatch latch; + + private volatile boolean queueClosed = false; + + public PooledChunkProcessor(int n) { + queue = new ArrayBlockingQueue<>(4 * 1024); + results = new ProcessorMap[n]; + latch = new CountDownLatch(n); + for (int i = 0; i < n; i++) { + int finalI = i; + var t = new Thread(() -> { + var processor = new ChunkProcessor(); + while (!Thread.interrupted()) { + try { + var element = queue.poll(1, TimeUnit.MILLISECONDS); + if (element != null) + processor.processChunk(element); + else if (queueClosed) { + if (CalculateAverage_entangled90.DEBUG) { + System.out.println("Queue closed, thread #" + finalI + " stopping"); + } + break; + } + } + catch (InterruptedException e) { + break; + } + } + results[finalI] = processor.processors; + latch.countDown(); + }); + t.start(); + } + } + + @Override + public void accept(ByteBuffer byteBuffer) { + try { + queue.put(byteBuffer); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public ProcessorMap result() { + try { + queueClosed = true; + latch.await(); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + return ProcessorMap.combineAll(results); + } +} + +class ChunkProcessor implements Consumer { + ProcessorMap processors = new ProcessorMap(); + + public void processChunk(ByteBuffer bb) { + while (processRow(bb)) { + } + } + + @Override + public void accept(ByteBuffer byteBuffer) { + processChunk(byteBuffer); + } + + // true if you can continue + // false if input is missing + public boolean processRow(ByteBuffer bb) { + int colonIdx = findSemiColon(bb); + if (colonIdx < 0) { + return false; + } + while (bb.get(bb.position()) == '\n') { + bb.get(); + } + var wrapper = wrapperFromBB(bb, colonIdx - bb.position()); + bb.position(colonIdx + 1); + double value = parseDoubleNewLine(bb); + processors.addMeasure(wrapper, value); + return true; + } + + private static BytesWrapper wrapperFromBB(ByteBuffer bb, int length) { + var bytes = new byte[length]; + bb.get(bytes); + return new BytesWrapper(bytes); + } + + // don't advance bb + private int findSemiColon(ByteBuffer bb) { + for (int i = bb.position(); i < bb.limit(); i++) { + if (bb.get(i) == (byte) ';') + return i; + } + return -1; + } + + // parses double until new line and advances buffer + private static double parseDoubleNewLine(ByteBuffer bb) { + int result = 0; + int sign = 1; + byte c; + do { + c = bb.get(); + switch (c) { + case '-': + sign = -1; + break; + case '.', ',', '\r', '\n': + break; + default: + result = result * 10 + (c - '0'); + } + } while (c != '\n' && bb.position() < bb.limit()); + return result * sign / 10.0; + } +} + +class Scanner { + private static final int MAX_MAPPED_MEMORY = 32 * 1024 * 1024; + + public void scan(String fileName, Consumer consumer, long offset, long len) throws IOException { + try (var file = new RandomAccessFile(fileName, "r"); FileChannel channel = file.getChannel()) { + int chunkId = 0; + if (len < 0) { + len = file.length(); + } + + while (true) { + if (offset > 0) { + file.seek(offset); + } + MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, offset, Math.min(MAX_MAPPED_MEMORY, len - offset)); + var limitIdx = findLastNewLine(bb); + bb.limit(limitIdx); + // get last newline from the end + consumer.accept(bb); + chunkId++; + offset += limitIdx; + if (CalculateAverage_entangled90.DEBUG && chunkId % 10 == 0) { + System.out.println(" read chunk " + chunkId + " at pointer " + offset + "(" + ((int) (offset * 100 / len)) + "%)"); + } + + if (offset >= len - 1) { + break; + } + } + } + } + + public int findLastNewLine(ByteBuffer bb) { + for (int i = bb.limit() - 1; i > bb.position(); i--) { + if (bb.get(i) == '\n') { + return i; + } + } + return -1; + } +} + +class BytesWrapper implements Comparable { + private final byte[] bytes; + + public BytesWrapper(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof BytesWrapper) { + return Arrays.equals(bytes, ((BytesWrapper) obj).bytes); + } + else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public String toString() { + return new String(bytes); + } + + @Override + public int compareTo(BytesWrapper bytesWrapper) { + return this.toString().compareTo(bytesWrapper.toString()); + } +} \ No newline at end of file