ddimtirov - lifted parallel mmapped i/o from Sam Pullara's implementation

commit 1923fc65a8
parent 57cfa54c68
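What changed, in one sentence: the single-threaded Reader/CharBuffer pipeline (the RecordReader and InputRecord classes removed below) is replaced by per-core file segments that are memory-mapped and processed in parallel, then merged. The following is a minimal standalone sketch of that pattern, not the contest entry itself: it assumes a ./measurements.txt input and substitutes trivial per-segment work (line counting) for the real aggregation, and the names ParallelMmapSketch and nextNewline are illustrative only.

// Sketch of the parallel mmapped i/o pattern this commit adopts: split the
// input into one segment per core, align each split point to a '\n' boundary
// so no record straddles two segments, map each segment read-only with
// FileChannel.map, and process the mapped buffers with a parallel stream.
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ParallelMmapSketch {
    record Segment(long start, long end) {}

    public static void main(String[] args) throws IOException {
        var path = Path.of("./measurements.txt");   // assumed input file
        int parts = Runtime.getRuntime().availableProcessors();

        // Cut the file into roughly equal chunks, moving each cut to the next newline.
        List<Segment> segments = new ArrayList<>();
        try (var raf = new RandomAccessFile(path.toFile(), "r")) {
            long size = raf.length(), chunk = size / parts, start = 0;
            for (int i = 0; i < parts; i++) {
                long end = (i == parts - 1) ? size : nextNewline(raf, start + chunk);
                segments.add(new Segment(start, end));
                start = end;
            }
        }

        // Each worker opens its own channel and maps only its slice read-only.
        // The per-segment work here is just line counting, standing in for the
        // real station/temperature aggregation.
        long totalLines = segments.stream().parallel().mapToLong(seg -> {
            try (var ch = FileChannel.open(path)) {
                var buf = ch.map(FileChannel.MapMode.READ_ONLY, seg.start(), seg.end() - seg.start());
                long lines = 0;
                while (buf.hasRemaining()) {
                    if (buf.get() == '\n') lines++;
                }
                return lines;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).sum();
        System.out.println(totalLines);
    }

    // Advance to just past the next '\n', clamped to the end of the file.
    private static long nextNewline(RandomAccessFile raf, long pos) throws IOException {
        long len = raf.length();
        if (pos >= len) return len;
        raf.seek(pos);
        while (pos < len) {
            pos++;
            if (raf.read() == '\n') break;
        }
        return pos;
    }
}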
CalculateAverage_ddimtirov.java
@@ -15,14 +15,18 @@
  */
 package dev.morling.onebrc;
 
 import java.io.*;
-import java.nio.CharBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TreeMap;
 
 // gunnar morling - 2:10
@@ -30,217 +34,167 @@ import java.util.TreeMap;
 // 0:37
 
 public class CalculateAverage_ddimtirov {
     private static final String FILE = "./measurements.txt";
 
+    private static final int HASH_NO_CLASH_MODULUS = 49999;
+    private static final int OFFSET_MIN = 0;
+    private static final int OFFSET_MAX = 1;
+    private static final int OFFSET_COUNT = 2;
+
     @SuppressWarnings("RedundantSuppression")
     public static void main(String[] args) throws IOException {
-        // var start = Instant.now();
-        Instant start = null;
-        var path = Path.of("./measurements.txt");
-        var bufferSize = 512 * 64; // 64 blocks
-        var tracker = new Tracker();
-        var charset = StandardCharsets.UTF_8;
+        var path = Path.of(FILE);
+        var start = Instant.now();
+        var desiredSegmentsCount = Runtime.getRuntime().availableProcessors();
 
-        // Files.lines() is optimized for files that can be indexed by int
-        // For larger files it falls back to buffered reader, which we now
-        // use directly to be able to tweak the buffer size.
-        try (var stream = Files.newInputStream(path); var reader = new InputStreamReader(stream, charset)) {
-            var buffered = new RecordReader(reader, bufferSize);
+        var segments = FileSegment.forFile(path, desiredSegmentsCount);
 
-            InputRecord record = null;
-            while (true) {
-                record = buffered.readRecord(record);
-                if (record==null) break;
-
-                tracker.process(record);
+        var trackers = segments.stream().parallel().map(segment -> {
+            try (var fileChannel = (FileChannel) Files.newByteChannel(path, StandardOpenOption.READ)) {
+                var tracker = new Tracker();
+                var segmentBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.size());
+                tracker.processSegment(segmentBuffer, segment.end());
+                return tracker;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
             }
-        }
+        }).toList();
 
-        System.out.println(tracker.stationsToMetrics());
+        var result = summarizeTrackers(trackers);
+        System.out.println(result);
 
         //noinspection ConstantValue
         if (start!=null) System.err.println(Duration.between(start, Instant.now()));
 
-        assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(tracker.stationsToMetrics().toString());
+        assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(result);
     }
 
-    /**
-     * <p>Reads records we can use to do the filtering from a stream of characters.
-     * Takes care of framing, and I/O buffering.
-     * This class in combination with {@link RecordReader} allows us to fully hide the
-     * I/O and internal representation.
-     *
-     * <p>If used with recycled {@link InputRecord} instances, this class allocates no memory
-     * after instantiation. The class is stateful, and the internals are not threadsafe
-     * - use from single thread or with proper synchronization.
-     */
-    static class RecordReader {
-        /**
-         * The source of input data
-         */
-        private final Readable input;
+    record FileSegment(long start, long end) {
+        public long size() { return end() - start(); }
 
-        /**
-         * Used for i/o buffering and record parsing.
-         * @see #buf
-         */
-        private final CharBuffer buffer;
-
-        /**
-         * <p>Cached backing array from {@link #buffer}.
-         * <p>This is optimization because {@link CharBuffer#array()} was showing on the CPU profile.
-         */
-        private final char[] buf;
-
-        public RecordReader(Readable input, int bufferSize) {
-            this.input = input;
-            buffer = CharBuffer.allocate(bufferSize).flip();
-            buf = buffer.array();
+        public static List<FileSegment> forFile(Path file, int desiredSegmentsCount) throws IOException {
+            try (var raf = new RandomAccessFile(file.toFile(), "r")) {
+                List<FileSegment> segments = new ArrayList<>();
+                long fileSize = raf.length();
+                long segmentSize = fileSize / desiredSegmentsCount;
+                for (int segmentIdx = 0; segmentIdx < desiredSegmentsCount; segmentIdx++) {
+                    long segStart = segmentIdx * segmentSize;
+                    long segEnd = (segmentIdx == desiredSegmentsCount - 1) ? fileSize : segStart + segmentSize;
+                    segStart = findSegmentBoundary(raf, segmentIdx, 0, segStart, segEnd);
+                    segEnd = findSegmentBoundary(raf, segmentIdx, desiredSegmentsCount - 1, segEnd, fileSize);
+
+                    segments.add(new FileSegment(segStart, segEnd));
+                }
+                return segments;
+            }
         }
 
-        public InputRecord readRecord(InputRecord recycled) throws IOException {
-            var record = parseRecord(recycled);
-            if (record!=null) return record;
+        private static long findSegmentBoundary(RandomAccessFile raf, int i, int skipForSegment, long location, long fileSize) throws IOException {
+            if (i == skipForSegment) return location;
 
-            if (input.read(buffer.compact())==-1) return null;
-            buffer.flip();
-
-            return parseRecord(recycled);
-        }
-
-        private InputRecord parseRecord(InputRecord recycled) {
-            var lim = buffer.limit();
-            if (buffer.isEmpty()) return null;
-
-            var nameOff = buffer.position();
-            var buff = buf;
-            while (buff[nameOff]=='\n' || buff[nameOff]=='\r' || buff[nameOff]==' ' ) {
-                nameOff++;
-                if (nameOff>=lim) return null;
+            raf.seek(location);
+            while (location < fileSize) {
+                location++;
+                if (raf.read() == '\n') break;
             }
-
-            var nameHash = 0;
-            var nameLen = 0;
-            while (buff[nameOff+nameLen]!=';') {
-                nameHash = nameHash*31 + buff[nameOff+nameLen];
-                nameLen++;
-                if (nameOff+nameLen>=lim) return null;
-            }
-
-            //noinspection DuplicateExpressions
-            assert new String(buf, nameOff, nameLen).hashCode()==nameHash
-                    : "'%s'@%d !-> %d".formatted(new String(buf, nameOff, nameLen), new String(buf, nameOff, nameLen).hashCode(), nameHash);
-
-            var valCursor = nameOff + nameLen +1;
-            int signum = 1;
-            var acc = 0;
-            while (true) {
-                if (valCursor >= lim) {
-                    return null;
-                }
-                char c = buff[valCursor++];
-                if (c == '\n' || c == '\r') {
-                    break;
-                }
-                if (c=='.') continue;
-                if (acc == 0) {
-                    if (c == '-') {
-                        signum = -1;
-                        continue;
-                    }
-                } else {
-                    acc *= 10;
-                }
-                var v = c - '0';
-                assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v);
-                acc += v;
-            }
-
-            buffer.position(valCursor);
-
-            var record = recycled!=null ? recycled : new InputRecord(buf);
-            record.init(nameHash, nameOff, nameLen, acc*signum);
-            return record;
+            return location;
         }
     }
 
+    private static String summarizeTrackers(List<Tracker> trackers) {
+        var result = new TreeMap<String, String>();
+        for (int i = 0; i < HASH_NO_CLASH_MODULUS; i++) {
+            String name = null;
+
+            int min = Integer.MAX_VALUE;
+            int max = Integer.MIN_VALUE;
+            long sum = 0;
+            long count = 0;
+            for (Tracker tracker : trackers) {
+                if (tracker.names[i]==null) continue;
+                if (name==null) name = tracker.names[i];
+
+                var minn = tracker.minMaxCount[i*3];
+                var maxx = tracker.minMaxCount[i*3+1];
+                if (minn<min) min = minn;
+                if (maxx>max) max = maxx;
+                count += tracker.minMaxCount[i*3+2];
+                sum += tracker.sums[i];
+            }
+            if (name==null) continue;
+
+            var mean = Math.round((double) sum / count) / 10.0;
+            result.put(name, (min/10.0) + "/" + mean + "/" + (max/10.0));
+        }
+        return result.toString();
+    }
+
     static class Tracker {
-        private static final int ADDRESS_NO_CLASH_MODULUS = 49999;
-        private static final int OFFSET_MIN = 0;
-        private static final int OFFSET_MAX = 1;
-        private static final int OFFSET_COUNT = 2;
+        private final int[] minMaxCount = new int[HASH_NO_CLASH_MODULUS * 3];
+        private final long[] sums = new long[HASH_NO_CLASH_MODULUS];
+        private final String[] names = new String[HASH_NO_CLASH_MODULUS];
+        private final byte[] nameThreadLocal = new byte[64];
 
-        private final int[] minMaxCount = new int[ADDRESS_NO_CLASH_MODULUS * 3];
-        private final long[] sums = new long[ADDRESS_NO_CLASH_MODULUS];
-        private final String[] names = new String[ADDRESS_NO_CLASH_MODULUS];
+        private void processSegment(MappedByteBuffer segmentBuffer, long segmentEnd) {
+            int startLine;
+            int limit = segmentBuffer.limit();
+            while ((startLine = segmentBuffer.position()) < limit) {
+                int pos = startLine;
+                byte b;
 
-        public void process(InputRecord r) {
-            var i = Math.abs(r.nameHash) % ADDRESS_NO_CLASH_MODULUS;
+                int nameLength = 0, nameHash = 0;
+                while (pos != segmentEnd && (b = segmentBuffer.get(pos++)) != ';') {
+                    nameHash = nameHash*31 + b;
+                    nameLength++;
+                }
 
-            if (names[i]==null) names[i] = r.name();
+                int temperature = 0, sign = 1;
+                outer:
+                while (pos != segmentEnd && (b = segmentBuffer.get(pos++)) != '\n') {
+                    switch (b) {
+                        case '\r' :
+                            pos++;
+                            break outer;
+                        case '.' :
+                            break;
+                        case '-' :
+                            sign = -1;
+                            break;
+                        default :
+                            var digit = b - '0';
+                            assert digit >= 0 && digit <= 9;
+                            temperature = 10 * temperature + digit;
+                    }
+                }
 
-            sums[i] += r.value;
+                processLine(nameHash, segmentBuffer, startLine, nameLength, temperature * sign);
+                segmentBuffer.position(pos);
+            }
+        }
+
+        public void processLine(int nameHash, MappedByteBuffer buffer, int nameOffset, int nameLength, int temperature) {
+            var i = Math.abs(nameHash) % HASH_NO_CLASH_MODULUS;
+
+            if (names[i]==null) {
+                names[i] = parseName(buffer, nameOffset, nameLength);
+            } else {
+                assert parseName(buffer, nameOffset, nameLength).equals(names[i]) : parseName(buffer, nameOffset, nameLength) + "!=" + names[i];
+            }
+
+            sums[i] += temperature;
 
             int mmcIndex = i * 3;
             var min = minMaxCount[mmcIndex + OFFSET_MIN];
             var max = minMaxCount[mmcIndex + OFFSET_MAX];
-            if (r.value < min) minMaxCount[mmcIndex + OFFSET_MIN] = r.value;
-            if (r.value > max) minMaxCount[mmcIndex + OFFSET_MAX] = r.value;
+            if (temperature < min) minMaxCount[mmcIndex + OFFSET_MIN] = temperature;
+            if (temperature > max) minMaxCount[mmcIndex + OFFSET_MAX] = temperature;
 
             minMaxCount[mmcIndex + OFFSET_COUNT]++;
         }
 
-        public Map<String, String> stationsToMetrics() {
-            var m = new TreeMap<String, String>();
-            for (int i = 0; i < names.length; i++) {
-                var name = names[i];
-                if (name==null) continue;
-
-                var min = minMaxCount[i*3] / 10.0;
-                var max = minMaxCount[i*3+1] / 10.0;
-                var count = minMaxCount[i*3+2];
-                var sum = sums[i];
-                var mean = Math.round((double) sum / count) / 10.0;
-
-                m.put(name, min + "/" + mean + "/" + max);
-            }
-            return m;
-        }
-
-    }
-
-    static class InputRecord {
-        private final char[] chars;
-        private int idOffset;
-        private int idLength;
-
-        public int value; // fixpoint scaled by 10
-        public int nameHash;
-
-        public InputRecord(char[] chars) {
-            this.chars = chars;
-        }
-
-        public void init(int nameHash, int nameOffset, int nameLength, int fixpointValue) {
-            assert nameOffset+nameLength<chars.length : String.format("idOffset+idLength=%d < chars.length=%d", nameOffset+nameLength, chars.length);
-
-            this.idOffset = nameOffset;
-            this.idLength = nameLength;
-            this.value = fixpointValue;
-            this.nameHash = nameHash;
-        }
-
-        public String name() {
-            return new String(chars, idOffset, idLength);
-        }
-
-        @Override
-        public String toString() {
-            return name() + ";" + (value / 10.0);
+        private String parseName(MappedByteBuffer buffer, int nameOffset, int nameLength) {
+            buffer.get(nameOffset, nameThreadLocal, 0, nameLength);
+            return new String(nameThreadLocal, 0, nameLength, StandardCharsets.UTF_8);
         }
     }
 }
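A side note on the temperature handling in both versions above: values are parsed into integers scaled by ten (the "fixpoint scaled by 10" comment on InputRecord.value; the new processSegment does the same with temperature and sign) and are only divided by 10.0 when formatting output. Below is a minimal sketch of that encoding, with parseTenths as a hypothetical helper rather than code from the entry.

// Sketch of the fixed-point temperature encoding: "-12.3" is accumulated
// digit by digit as -123 (the value times ten), skipping the '.' and tracking
// the sign, and is only turned back into a decimal when printed.
public class FixpointSketch {
    static int parseTenths(String s) {
        int sign = 1, acc = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '-') { sign = -1; continue; }
            if (c == '.') continue;
            acc = acc * 10 + (c - '0');
        }
        return acc * sign;
    }

    public static void main(String[] args) {
        int t = parseTenths("-12.3");   // -123
        System.out.println(t);          // prints -123
        System.out.println(t / 10.0);   // prints -12.3, as in the final output
    }
}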