329 lines
9.9 KiB
Java
329 lines
9.9 KiB
Java
/*
|
|
* Copyright 2023 The original authors
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
package dev.morling.onebrc;
|
|
|
|
import java.io.IOException;
|
|
import java.io.RandomAccessFile;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.MappedByteBuffer;
|
|
import java.nio.channels.FileChannel;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.util.*;
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.function.Consumer;
|
|
import java.util.stream.Collectors;
|
|
|
|
import static java.lang.Math.round;
|
|
|
|
public class CalculateAverage_entangled90 {
|
|
|
|
private static final String FILE = "./measurements.txt";
|
|
|
|
public static final boolean DEBUG = false;
|
|
|
|
public static void main(String[] args) throws IOException {
|
|
Scanner scanner = new Scanner();
|
|
long start = System.currentTimeMillis();
|
|
PooledChunkProcessor chunkProcessor = new PooledChunkProcessor(Runtime.getRuntime().availableProcessors());
|
|
scanner.scan(FILE, chunkProcessor, 0L, -1);
|
|
long finish = System.currentTimeMillis();
|
|
var map = chunkProcessor.result();
|
|
map.printResults();
|
|
if (DEBUG) {
|
|
System.out.println("Took: " + TimeUnit.MILLISECONDS.toSeconds(finish - start) + "seconds");
|
|
}
|
|
}
|
|
}
|
|
|
|
class AggregatedProcessor {
|
|
public double max = Double.NEGATIVE_INFINITY;
|
|
public double min = Double.POSITIVE_INFINITY;
|
|
private double sum = 0L;
|
|
public long count = 0L;
|
|
|
|
public void addMeasure(double value) {
|
|
max = Math.max(max, value);
|
|
min = Math.min(min, value);
|
|
sum += value;
|
|
count++;
|
|
}
|
|
|
|
public void combine(AggregatedProcessor processor) {
|
|
max = Math.max(max, processor.max);
|
|
min = Math.min(min, processor.min);
|
|
sum += processor.sum;
|
|
count += processor.count;
|
|
}
|
|
|
|
public double mean() {
|
|
return sum / count;
|
|
}
|
|
|
|
private static double round(double d) {
|
|
return Math.round(d * 10.0) / 10.0;
|
|
}
|
|
|
|
@Override
|
|
public String toString() {
|
|
return String.format(Locale.US, "%.1f/%.1f/%.1f", round(min), round(mean()), round(max));
|
|
}
|
|
}
|
|
|
|
class ProcessorMap {
|
|
Map<BytesWrapper, AggregatedProcessor> processors = new HashMap<>(1024);
|
|
|
|
public void printResults() {
|
|
// System.out.println("Processed: " + processors.entrySet().stream().mapToLong(e -> e.getValue().count).sum());
|
|
System.out.print("{");
|
|
System.out.print(
|
|
processors.entrySet().stream()
|
|
.sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ")));
|
|
System.out.println("}");
|
|
}
|
|
|
|
public void addMeasure(BytesWrapper city, double value) {
|
|
var processor = processors.get(city);
|
|
if (processor == null) {
|
|
processor = new AggregatedProcessor();
|
|
processors.put(city, processor);
|
|
}
|
|
processor.addMeasure(value);
|
|
}
|
|
|
|
private void combine(BytesWrapper city, AggregatedProcessor processor) {
|
|
var thisProcessor = processors.get(city);
|
|
if (thisProcessor == null) {
|
|
processors.put(city, processor);
|
|
}
|
|
else {
|
|
thisProcessor.combine(processor);
|
|
}
|
|
}
|
|
|
|
public static ProcessorMap combineAll(ProcessorMap... processors) {
|
|
var result = new ProcessorMap();
|
|
for (var p : processors) {
|
|
for (var entry : p.processors.entrySet()) {
|
|
result.combine(entry.getKey(), entry.getValue());
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
}
|
|
|
|
class PooledChunkProcessor implements Consumer<ByteBuffer> {
|
|
private final ArrayBlockingQueue<ByteBuffer> queue;
|
|
private final ProcessorMap[] results;
|
|
private final CountDownLatch latch;
|
|
|
|
private volatile boolean queueClosed = false;
|
|
|
|
public PooledChunkProcessor(int n) {
|
|
queue = new ArrayBlockingQueue<>(4 * 1024);
|
|
results = new ProcessorMap[n];
|
|
latch = new CountDownLatch(n);
|
|
for (int i = 0; i < n; i++) {
|
|
int finalI = i;
|
|
var t = new Thread(() -> {
|
|
var processor = new ChunkProcessor();
|
|
while (!Thread.interrupted()) {
|
|
try {
|
|
var element = queue.poll(1, TimeUnit.MILLISECONDS);
|
|
if (element != null)
|
|
processor.processChunk(element);
|
|
else if (queueClosed) {
|
|
if (CalculateAverage_entangled90.DEBUG) {
|
|
System.out.println("Queue closed, thread #" + finalI + " stopping");
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
catch (InterruptedException e) {
|
|
break;
|
|
}
|
|
}
|
|
results[finalI] = processor.processors;
|
|
latch.countDown();
|
|
});
|
|
t.start();
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void accept(ByteBuffer byteBuffer) {
|
|
try {
|
|
queue.put(byteBuffer);
|
|
}
|
|
catch (InterruptedException e) {
|
|
throw new RuntimeException(e);
|
|
}
|
|
}
|
|
|
|
public ProcessorMap result() {
|
|
try {
|
|
queueClosed = true;
|
|
latch.await();
|
|
}
|
|
catch (InterruptedException e) {
|
|
e.printStackTrace();
|
|
}
|
|
return ProcessorMap.combineAll(results);
|
|
}
|
|
}
|
|
|
|
class ChunkProcessor implements Consumer<ByteBuffer> {
|
|
ProcessorMap processors = new ProcessorMap();
|
|
|
|
public void processChunk(ByteBuffer bb) {
|
|
while (processRow(bb)) {
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public void accept(ByteBuffer byteBuffer) {
|
|
processChunk(byteBuffer);
|
|
}
|
|
|
|
// true if you can continue
|
|
// false if input is missing
|
|
public boolean processRow(ByteBuffer bb) {
|
|
int colonIdx = findSemiColon(bb);
|
|
if (colonIdx < 0) {
|
|
return false;
|
|
}
|
|
while (bb.get(bb.position()) == '\n') {
|
|
bb.get();
|
|
}
|
|
var wrapper = wrapperFromBB(bb, colonIdx - bb.position());
|
|
bb.position(colonIdx + 1);
|
|
double value = parseDoubleNewLine(bb);
|
|
processors.addMeasure(wrapper, value);
|
|
return true;
|
|
}
|
|
|
|
private static BytesWrapper wrapperFromBB(ByteBuffer bb, int length) {
|
|
var bytes = new byte[length];
|
|
bb.get(bytes);
|
|
return new BytesWrapper(bytes);
|
|
}
|
|
|
|
// don't advance bb
|
|
private int findSemiColon(ByteBuffer bb) {
|
|
for (int i = bb.position(); i < bb.limit(); i++) {
|
|
if (bb.get(i) == (byte) ';')
|
|
return i;
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
// parses double until new line and advances buffer
|
|
private static double parseDoubleNewLine(ByteBuffer bb) {
|
|
int result = 0;
|
|
int sign = 1;
|
|
byte c;
|
|
do {
|
|
c = bb.get();
|
|
switch (c) {
|
|
case '-':
|
|
sign = -1;
|
|
break;
|
|
case '.', ',', '\r', '\n':
|
|
break;
|
|
default:
|
|
result = result * 10 + (c - '0');
|
|
}
|
|
} while (c != '\n' && bb.position() < bb.limit());
|
|
return result * sign / 10.0;
|
|
}
|
|
}
|
|
|
|
class Scanner {
|
|
private static final int MAX_MAPPED_MEMORY = 32 * 1024 * 1024;
|
|
|
|
public void scan(String fileName, Consumer<ByteBuffer> consumer, long offset, long len) throws IOException {
|
|
try (var file = new RandomAccessFile(fileName, "r"); FileChannel channel = file.getChannel()) {
|
|
int chunkId = 0;
|
|
if (len < 0) {
|
|
len = file.length();
|
|
}
|
|
|
|
while (true) {
|
|
if (offset > 0) {
|
|
file.seek(offset);
|
|
}
|
|
MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, offset, Math.min(MAX_MAPPED_MEMORY, len - offset));
|
|
var limitIdx = findLastNewLine(bb);
|
|
bb.limit(limitIdx);
|
|
// get last newline from the end
|
|
consumer.accept(bb);
|
|
chunkId++;
|
|
offset += limitIdx;
|
|
if (CalculateAverage_entangled90.DEBUG && chunkId % 10 == 0) {
|
|
System.out.println(" read chunk " + chunkId + " at pointer " + offset + "(" + ((int) (offset * 100 / len)) + "%)");
|
|
}
|
|
|
|
if (offset >= len - 1) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
public int findLastNewLine(ByteBuffer bb) {
|
|
for (int i = bb.limit() - 1; i > bb.position(); i--) {
|
|
if (bb.get(i) == '\n') {
|
|
return i;
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
class BytesWrapper implements Comparable<BytesWrapper> {
|
|
private final byte[] bytes;
|
|
|
|
public BytesWrapper(byte[] bytes) {
|
|
this.bytes = bytes;
|
|
}
|
|
|
|
@Override
|
|
public boolean equals(Object obj) {
|
|
if (obj instanceof BytesWrapper) {
|
|
return Arrays.equals(bytes, ((BytesWrapper) obj).bytes);
|
|
}
|
|
else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
@Override
|
|
public int hashCode() {
|
|
return Arrays.hashCode(bytes);
|
|
}
|
|
|
|
@Override
|
|
public String toString() {
|
|
return new String(bytes);
|
|
}
|
|
|
|
@Override
|
|
public int compareTo(BytesWrapper bytesWrapper) {
|
|
return this.toString().compareTo(bytesWrapper.toString());
|
|
}
|
|
}
|