From 990483dcc8ed048add975e541f25d681a6aff8dc Mon Sep 17 00:00:00 2001 From: ags Date: Sat, 6 Jan 2024 19:13:38 +0000 Subject: [PATCH] first attempt --- calculate_average_ags313.sh | 22 ++ .../onebrc/CalculateAverage_ags313.java | 250 ++++++++++++++++++ 2 files changed, 272 insertions(+) create mode 100755 calculate_average_ags313.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_ags313.java diff --git a/calculate_average_ags313.sh b/calculate_average_ags313.sh new file mode 100755 index 0000000..3e07ce1 --- /dev/null +++ b/calculate_average_ags313.sh @@ -0,0 +1,22 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.1-graal 1>&2 + +JAVA_OPTS="-server -Xnoclassgc -Xmx1G" +time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_ags313 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_ags313.java b/src/main/java/dev/morling/onebrc/CalculateAverage_ags313.java new file mode 100644 index 0000000..9af3ca6 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_ags313.java @@ -0,0 +1,250 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import sun.misc.Unsafe; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.lang.reflect.Field; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.nio.channels.FileChannel; +import java.util.HashMap; +import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class CalculateAverage_ags313 { + + private static final int THREAD_COUNT = 8; + private static final int FUTURE_BUFFER = 1024; + private static final int ALLOCATION = 128 * 1024 * 1024; + + // private static final String FILE = "./measurementsCut.txt"; + private static final String FILE = "./measurements.txt"; + // private static final String FILE = "./src/test/resources/samples/measurements-1.txt"; + // private static final String FILE = "./src/test/resources/samples/measurements-20.txt"; + + private static final int NAME_LENGTH_LIMIT_BYTES = 128; + private static final int BUFFER_SIZE = 108; + private static final ExecutorService exec = Executors.newFixedThreadPool(THREAD_COUNT); + + /** + * 1B rows + * 8s multithreaded + * 44s single threaded + * + * ideas: + * 1) replace hashmap with something faster(er) + * 2) play with graal + **/ + + private static class Key implements Comparable { + private final byte[] value = new byte[NAME_LENGTH_LIMIT_BYTES]; + private int hashCode; + private int length = 0; + + // https://stackoverflow.com/questions/20952739/how-would-you-convert-a-string-to-a-64-bit-integer + public void accept(byte b) { + value[length] = b; + length += 1; + hashCode = hashCode * 10191 + b; + } + + @Override + public boolean equals(Object that) { + if (this == that) + return true; + if (that == null) + return false; + Key key = (Key) that; // not checking class, nothing else uses this + if (hashCode != key.hashCode) + return false; + if (length != key.length) + return false; + for (int i = 0; i < length; i++) { + if (UNSAFE.getByte(value, i) != UNSAFE.getByte(key.value, i)) { + return false; + } + } + + return true; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return new String(value, 0, length); + } + + void reset() { + length = 0; + hashCode = 0; + } + + @Override + public int compareTo(Key other) { + return toString().compareTo(other.toString()); + } + } + + private static HashMap readChunk(FileChannel from, long start, int bytesToRead) throws IOException { + HashMap result = new HashMap<>(1024 * 16); + + var bbuffer = from.map(FileChannel.MapMode.READ_ONLY, start, bytesToRead); + + var key = new Key(); + while (bbuffer.remaining() > 1) { + key.reset(); + boolean negative = false; + var temperature = 0; + + while (bbuffer.remaining() > 0) { + byte b = bbuffer.get(); + if (b == ';') { + break; + } + key.accept(b); + } + + loop2: while (bbuffer.remaining() > 0) { + byte b = bbuffer.get(); + switch (b) { + case '-': + negative = true; + break; + case '.': + temperature = (temperature * 10) + (bbuffer.get() - '0'); // single decimal + break; + case '\n': + break loop2; + default: + temperature = (temperature * 10) + (b - '0'); + } + } + + int measure = negative ? -temperature : temperature; + Stats stats = result.computeIfAbsent(key, c -> new Stats()); + if (stats.count == 0) { + key = new Key(); + } + + stats.count++; + stats.total += measure; + if (measure < stats.min) { + stats.min = measure; + } + if (measure > stats.max) { + stats.max = measure; + } + } + + return result; + } + + public static void main(String[] args) throws Exception { + var channel = new RandomAccessFile(FILE, "r").getChannel(); + long totalToRead = channel.size(); + + Future>[] futures = new Future[FUTURE_BUFFER]; + + long allocated = 0; + int chunkCounter = 0; + while (allocated < totalToRead) { + var start = allocated; + var bytesToReadInPass = Math.min(totalToRead - allocated, ALLOCATION); + + if (bytesToReadInPass < BUFFER_SIZE) { + // System.out.println("Want to read: " + bytesToReadInPass + ", starting buffer at: " + startBufferAt); + // System.out.println("Total: " + totalToRead + ", allocated: " + allocated + ", allocating: " + (correctedBytesToRead)); + futures[chunkCounter++] = exec.submit(() -> readChunk(channel, start, (int) bytesToReadInPass)); + allocated += bytesToReadInPass; + } + else { + var startBufferAt = Math.max(0, allocated + bytesToReadInPass - BUFFER_SIZE); + var buffer = channel.map(FileChannel.MapMode.READ_ONLY, startBufferAt, BUFFER_SIZE); + + // System.out.println("Want to read: " + bytesToReadInPass + ", starting buffer at: " + startBufferAt); + + var endOfLine = 0; + for (int i = 0; i < BUFFER_SIZE; i++) { + if (buffer.get() == '\n') { + endOfLine = i; + break; + } + } + long correctedBytesToRead = bytesToReadInPass - BUFFER_SIZE + endOfLine; + + // System.out.println("Total: " + totalToRead + ", allocated: " + allocated + ", allocating: " + (correctedBytesToRead)); + futures[chunkCounter++] = exec.submit(() -> readChunk(channel, start, (int) correctedBytesToRead)); + allocated += correctedBytesToRead + 1; + } + } + + var accumulator = new HashMap(1024 * 16); + for (int i = 0; i < chunkCounter; i++) { + var aMap = futures[i].get(); + aMap.forEach((key, value) -> accumulator.computeIfAbsent(key, __ -> new Stats()).merge(value)); + } + + exec.shutdown(); + + System.out.println(new TreeMap(accumulator)); + } + + static class Stats { + private int min = Integer.MAX_VALUE; + private int max = Integer.MIN_VALUE; + private long total; + private int count; + + Stats() { + } + + @Override + public String toString() { + return BigDecimal.valueOf(min / (10.0)).setScale(1, RoundingMode.HALF_UP) + "/" + + BigDecimal.valueOf(total / (10.0 * count)).setScale(1, RoundingMode.HALF_UP) + '/' + + BigDecimal.valueOf(max / (10.0)).setScale(1, RoundingMode.HALF_UP); + } + + void merge(Stats that) { + max = Math.max(this.max, that.max); + min = Math.min(this.min, that.min); + total += that.total; + count += that.count; + } + } + + private static final Unsafe UNSAFE = unsafe(); + + private static Unsafe unsafe() { + try { + Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); + theUnsafe.setAccessible(true); + return (Unsafe) theUnsafe.get(Unsafe.class); + } + catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } +}