1brc/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java
2024-01-07 10:58:05 +01:00

329 lines
9.9 KiB
Java

/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.lang.Math.round;
public class CalculateAverage_entangled90 {
private static final String FILE = "./measurements.txt";
public static final boolean DEBUG = false;
public static void main(String[] args) throws IOException {
Scanner scanner = new Scanner();
long start = System.currentTimeMillis();
PooledChunkProcessor chunkProcessor = new PooledChunkProcessor(Runtime.getRuntime().availableProcessors());
scanner.scan(FILE, chunkProcessor, 0L, -1);
long finish = System.currentTimeMillis();
var map = chunkProcessor.result();
map.printResults();
if (DEBUG) {
System.out.println("Took: " + TimeUnit.MILLISECONDS.toSeconds(finish - start) + "seconds");
}
}
}
class AggregatedProcessor {
public double max = Double.NEGATIVE_INFINITY;
public double min = Double.POSITIVE_INFINITY;
private double sum = 0L;
public long count = 0L;
public void addMeasure(double value) {
max = Math.max(max, value);
min = Math.min(min, value);
sum += value;
count++;
}
public void combine(AggregatedProcessor processor) {
max = Math.max(max, processor.max);
min = Math.min(min, processor.min);
sum += processor.sum;
count += processor.count;
}
public double mean() {
return sum / count;
}
private static double round(double d) {
return Math.round(d * 10.0) / 10.0;
}
@Override
public String toString() {
return String.format(Locale.US, "%.1f/%.1f/%.1f", round(min), round(mean()), round(max));
}
}
class ProcessorMap {
Map<BytesWrapper, AggregatedProcessor> processors = new HashMap<>(1024);
public void printResults() {
// System.out.println("Processed: " + processors.entrySet().stream().mapToLong(e -> e.getValue().count).sum());
System.out.print("{");
System.out.print(
processors.entrySet().stream()
.sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ")));
System.out.println("}");
}
public void addMeasure(BytesWrapper city, double value) {
var processor = processors.get(city);
if (processor == null) {
processor = new AggregatedProcessor();
processors.put(city, processor);
}
processor.addMeasure(value);
}
private void combine(BytesWrapper city, AggregatedProcessor processor) {
var thisProcessor = processors.get(city);
if (thisProcessor == null) {
processors.put(city, processor);
}
else {
thisProcessor.combine(processor);
}
}
public static ProcessorMap combineAll(ProcessorMap... processors) {
var result = new ProcessorMap();
for (var p : processors) {
for (var entry : p.processors.entrySet()) {
result.combine(entry.getKey(), entry.getValue());
}
}
return result;
}
}
class PooledChunkProcessor implements Consumer<ByteBuffer> {
private final ArrayBlockingQueue<ByteBuffer> queue;
private final ProcessorMap[] results;
private final CountDownLatch latch;
private volatile boolean queueClosed = false;
public PooledChunkProcessor(int n) {
queue = new ArrayBlockingQueue<>(4 * 1024);
results = new ProcessorMap[n];
latch = new CountDownLatch(n);
for (int i = 0; i < n; i++) {
int finalI = i;
var t = new Thread(() -> {
var processor = new ChunkProcessor();
while (!Thread.interrupted()) {
try {
var element = queue.poll(1, TimeUnit.MILLISECONDS);
if (element != null)
processor.processChunk(element);
else if (queueClosed) {
if (CalculateAverage_entangled90.DEBUG) {
System.out.println("Queue closed, thread #" + finalI + " stopping");
}
break;
}
}
catch (InterruptedException e) {
break;
}
}
results[finalI] = processor.processors;
latch.countDown();
});
t.start();
}
}
@Override
public void accept(ByteBuffer byteBuffer) {
try {
queue.put(byteBuffer);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public ProcessorMap result() {
try {
queueClosed = true;
latch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
}
return ProcessorMap.combineAll(results);
}
}
class ChunkProcessor implements Consumer<ByteBuffer> {
ProcessorMap processors = new ProcessorMap();
public void processChunk(ByteBuffer bb) {
while (processRow(bb)) {
}
}
@Override
public void accept(ByteBuffer byteBuffer) {
processChunk(byteBuffer);
}
// true if you can continue
// false if input is missing
public boolean processRow(ByteBuffer bb) {
int colonIdx = findSemiColon(bb);
if (colonIdx < 0) {
return false;
}
while (bb.get(bb.position()) == '\n') {
bb.get();
}
var wrapper = wrapperFromBB(bb, colonIdx - bb.position());
bb.position(colonIdx + 1);
double value = parseDoubleNewLine(bb);
processors.addMeasure(wrapper, value);
return true;
}
private static BytesWrapper wrapperFromBB(ByteBuffer bb, int length) {
var bytes = new byte[length];
bb.get(bytes);
return new BytesWrapper(bytes);
}
// don't advance bb
private int findSemiColon(ByteBuffer bb) {
for (int i = bb.position(); i < bb.limit(); i++) {
if (bb.get(i) == (byte) ';')
return i;
}
return -1;
}
// parses double until new line and advances buffer
private static double parseDoubleNewLine(ByteBuffer bb) {
int result = 0;
int sign = 1;
byte c;
do {
c = bb.get();
switch (c) {
case '-':
sign = -1;
break;
case '.', ',', '\r', '\n':
break;
default:
result = result * 10 + (c - '0');
}
} while (c != '\n' && bb.position() < bb.limit());
return result * sign / 10.0;
}
}
class Scanner {
private static final int MAX_MAPPED_MEMORY = 32 * 1024 * 1024;
public void scan(String fileName, Consumer<ByteBuffer> consumer, long offset, long len) throws IOException {
try (var file = new RandomAccessFile(fileName, "r"); FileChannel channel = file.getChannel()) {
int chunkId = 0;
if (len < 0) {
len = file.length();
}
while (true) {
if (offset > 0) {
file.seek(offset);
}
MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, offset, Math.min(MAX_MAPPED_MEMORY, len - offset));
var limitIdx = findLastNewLine(bb);
bb.limit(limitIdx);
// get last newline from the end
consumer.accept(bb);
chunkId++;
offset += limitIdx;
if (CalculateAverage_entangled90.DEBUG && chunkId % 10 == 0) {
System.out.println(" read chunk " + chunkId + " at pointer " + offset + "(" + ((int) (offset * 100 / len)) + "%)");
}
if (offset >= len - 1) {
break;
}
}
}
}
public int findLastNewLine(ByteBuffer bb) {
for (int i = bb.limit() - 1; i > bb.position(); i--) {
if (bb.get(i) == '\n') {
return i;
}
}
return -1;
}
}
class BytesWrapper implements Comparable<BytesWrapper> {
private final byte[] bytes;
public BytesWrapper(byte[] bytes) {
this.bytes = bytes;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof BytesWrapper) {
return Arrays.equals(bytes, ((BytesWrapper) obj).bytes);
}
else {
return false;
}
}
@Override
public int hashCode() {
return Arrays.hashCode(bytes);
}
@Override
public String toString() {
return new String(bytes);
}
@Override
public int compareTo(BytesWrapper bytesWrapper) {
return this.toString().compareTo(bytesWrapper.toString());
}
}