From d5854d65e60b381d66e746eef1fa31c1770aee7c Mon Sep 17 00:00:00 2001 From: Aleksei <971356+bytesfellow@users.noreply.github.com> Date: Sun, 28 Jan 2024 18:06:18 +0100 Subject: [PATCH] Bytesfellow initial submittion (#619) * Latest snapshot (#1) preparing initial version * Improved performance to 20seconds (-9seconds from the previous version) (#2) improved performance a bit * Improved performance to 14 seconds (-6 seconds) (#3) improved performance to 14 seconds * sync branches (#4) * initial commit * some refactoring of methods * some fixes for partitioning * some fixes for partitioning * fixed hacky getcode for utf8 bytes * simplified getcode for partitioning * temp solution with syncing * temp solution with syncing * new stream processing * new stream processing * some improvements * cleaned stuff * run configuration * round buffer for the stream to pages * not using compute since it's slower than straightforward get/put. using own byte array equals. * using parallel gc * avoid copying bytes when creating a station object * formatting * Copy less arrays. Improved performance to 12.7 seconds (-2 seconds) (#5) * initial commit * some refactoring of methods * some fixes for partitioning * some fixes for partitioning * fixed hacky getcode for utf8 bytes * simplified getcode for partitioning * temp solution with syncing * temp solution with syncing * new stream processing * new stream processing * some improvements * cleaned stuff * run configuration * round buffer for the stream to pages * not using compute since it's slower than straightforward get/put. using own byte array equals. 
* using parallel gc * avoid copying bytes when creating a station object * formatting * some tuning to increase performance * some tuning to increase performance * avoid copying data; fast hashCode with slightly more collisions * avoid copying data; fast hashCode with slightly more collisions * cleanup (#6) * tidy up --- calculate_average_bytesfellow.sh | 19 + .../onebrc/CalculateAverage_bytesfellow.java | 557 ++++++++++++++++++ 2 files changed, 576 insertions(+) create mode 100755 calculate_average_bytesfellow.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_bytesfellow.java diff --git a/calculate_average_bytesfellow.sh b/calculate_average_bytesfellow.sh new file mode 100755 index 0000000..eb21169 --- /dev/null +++ b/calculate_average_bytesfellow.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+JAVA_OPTS="-Xms12g -Xmx12g -XX:+AlwaysPreTouch -XX:+UseParallelGC -XX:-OmitStackTraceInFastThrow "
+java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_bytesfellow
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_bytesfellow.java b/src/main/java/dev/morling/onebrc/CalculateAverage_bytesfellow.java
new file mode 100644
index 0000000..869b195
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_bytesfellow.java
@@ -0,0 +1,616 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+/**
+ * Reads {@code ./measurements.txt} and prints the minimum, mean and maximum value of every station.
+ * <p>
+ * The main thread reads the file in large blocks and cuts them into slices of whole lines. A pool of "scheduler"
+ * threads splits every slice into lines and routes each line to a partition chosen from the first character of
+ * the station name. Every partition aggregates its lines on its own single thread.
+ */
+public class CalculateAverage_bytesfellow {
+
+    public static final String CPU_CORES_1BRC_ENV_VARIABLE = "CPU_CORES_1BRC";
+    private static final byte Separator = ';';
+
+    // share of the available cores that is given to the scheduler pool
+    private static final double SchedulerCpuRatio = 0.4;
+
+    // the detected core count can be overridden through the CPU_CORES_1BRC environment variable
+    private static final int availableCpu = System.getenv(CPU_CORES_1BRC_ENV_VARIABLE) != null ? Integer.parseInt(System.getenv(CPU_CORES_1BRC_ENV_VARIABLE))
+            : Runtime.getRuntime().availableProcessors();
+
+    private static final int SchedulerPoolSize = Math.max((int) (availableCpu * SchedulerCpuRatio), 1);
+    private static final int SchedulerQueueSize = Math.min(SchedulerPoolSize * 3, 12);
+    private static final int PartitionsNumber = Math.max((availableCpu - SchedulerPoolSize), 1);
+    private static final int PartitionExecutorQueueSize = 1000;
+
+    private static final int InputStreamBlockSize = 4096;
+    private static final int InputStreamReadBufferLen = 250 * InputStreamBlockSize;
+
+    /**
+     * Aggregates the measurements of the stations routed to it. All aggregation runs on the partition's single
+     * worker thread, so the result map is a plain {@link HashMap}.
+     */
+    static class Partition {
+
+        private static final AtomicInteger cntr = new AtomicInteger(-1);
+        private final Map<Station, MeasurementAggregator> partitionResult = new HashMap<>(10000); // as per requirement we have not more than 10K keys
+        // number of scheduled batches that have not finished yet
+        private final AtomicInteger leftToExecute = new AtomicInteger(0);
+
+        private final String name = "partition-" + cntr.incrementAndGet();
+
+        private final Executor executor = new ThreadPoolExecutor(1, 1,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(PartitionExecutorQueueSize) { // some limit to avoid OOM
+                    @Override
+                    public boolean offer(Runnable runnable) {
+                        try {
+                            put(runnable); // block if limit was exceeded
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                        return true;
+                    }
+                }, r -> {
+                    Thread t = new Thread(r);
+                    t.setDaemon(true);
+                    t.setName(name);
+                    return t;
+                });
+
+        /**
+         * Queues the given lines of {@code slice} for aggregation on this partition's thread.
+         *
+         * @param slice bytes that the line positions refer to
+         * @param lines positions of the lines to aggregate; an empty list is ignored
+         */
+        public void scheduleToProcess(byte[] slice, List<LineParams> lines) {
+
+            if (lines.isEmpty()) {
+                return;
+            }
+
+            leftToExecute.incrementAndGet();
+            executor.execute(() -> {
+                try {
+                    for (int i = 0; i < lines.size(); i++) {
+                        Measurement measurement = getMeasurement(slice, lines.get(i));
+
+                        MeasurementAggregator measurementAggregator = partitionResult.get(measurement.station());
+                        if (measurementAggregator == null) {
+                            // the stored key owns a copy of the name, so the map does not reference the slice
+                            partitionResult.put(new Station(measurement.station()), new MeasurementAggregator().withMeasurement(measurement));
+                        }
+                        else {
+                            measurementAggregator.withMeasurement(measurement);
+                        }
+                    }
+                }
+                finally {
+                    // released even if a line fails to parse, otherwise showResults() would spin forever
+                    leftToExecute.decrementAndGet();
+                }
+            });
+        }
+
+        /** Decodes the name of every station of this partition into a {@link String}. */
+        public void materializeNames() {
+            partitionResult.keySet().forEach(Station::materializeName);
+        }
+
+        public Map<Station, MeasurementAggregator> getResult() {
+            return partitionResult;
+        }
+
+        /** @return {@code true} when no scheduled batch is pending or running */
+        public boolean allTasksCompleted() {
+            return leftToExecute.get() == 0;
+        }
+
+    }
+
+    /** Position of one line inside a slice: index of its first byte and its length without the line break. */
+    record LineParams(int start, int length) {
+    }
+
+    /**
+     * Splits slices into lines on the scheduler pool and hands every line to the partition that owns its station.
+     */
+    static class Partitioner {
+
+        private final List<Partition> allPartitions = new ArrayList<>();
+        private final int partitionsSize;
+
+        // number of accepted slices that are not yet completely handed over to the partitions
+        final AtomicInteger jobsScheduled = new AtomicInteger(0);
+
+        final Executor scheduler = new ThreadPoolExecutor(SchedulerPoolSize, SchedulerPoolSize,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(SchedulerQueueSize) { // some limit to avoid OOM
+                    @Override
+                    public boolean offer(Runnable runnable) {
+                        try {
+                            put(runnable); // preventing unlimited scheduling due to possible OOM
+                        }
+                        catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                        return true;
+                    }
+                }, r -> {
+                    Thread t = new Thread(r);
+                    t.setDaemon(true);
+                    t.setName("scheduler");
+                    return t;
+                });
+
+        Partitioner(int partitionsSize) {
+            IntStream.range(0, partitionsSize).forEach((i) -> allPartitions.add(new Partition()));
+            this.partitionsSize = partitionsSize;
+        }
+
+        private int partitionsSize() {
+            return partitionsSize;
+        }
+
+        /**
+         * Asynchronously splits {@code slice} into lines and schedules them on their partitions.
+         *
+         * @param slice lines separated by {@code '\n'}; the last line may or may not end with a line break
+         */
+        void processSlice(byte[] slice) {
+
+            jobsScheduled.incrementAndGet();
+
+            scheduler.execute(() -> {
+                try {
+                    List<List<LineParams>> partitionedLines = new ArrayList<>(partitionsSize());
+                    // rough initial capacity only, the lists grow on demand
+                    IntStream.range(0, partitionsSize()).forEach((p) -> partitionedLines.add(new ArrayList<>(slice.length / 407 / 2)));
+
+                    int start = 0;
+                    for (int i = 0; i < slice.length; i++) {
+                        boolean lineBreak = slice[i] == '\n';
+                        if (lineBreak || i == slice.length - 1) {
+                            // the line break itself never belongs to the line
+                            int lineLength = lineBreak ? i - start : i - start + 1;
+                            if (lineLength > 0) { // blank lines carry no measurement
+                                int charLen = Math.min(getUtf8CharNumberOfBytes(slice[start]), lineLength);
+                                int partition = computePartition(getPartitioningCode(slice, start, charLen));
+                                partitionedLines.get(partition).add(new LineParams(start, lineLength));
+                            }
+                            start = i + 1;
+                        }
+                    }
+
+                    processPartitionedBatch(slice, partitionedLines);
+                }
+                finally {
+                    // released even on failure, otherwise showResults() would spin forever
+                    jobsScheduled.decrementAndGet();
+                }
+            });
+
+        }
+
+        private void processPartitionedBatch(byte[] slice, List<List<LineParams>> partitionedLines) {
+            for (int i = 0; i < partitionedLines.size(); i++) {
+                allPartitions.get(i).scheduleToProcess(slice, partitionedLines.get(i));
+            }
+        }
+
+        private int computePartition(int code) {
+            return Math.abs(code % partitionsSize());
+        }
+
+        /**
+         * Sums the bytes of the first character of a line. Lines of the same station start with the same bytes,
+         * so they always get the same code and end up in the same partition.
+         */
+        private static int getPartitioningCode(byte[] line, int start, int utf8CharNumberOfBytes) {
+            // seems good enough
+            int code = 0;
+            for (int i = 0; i < utf8CharNumberOfBytes; i++) {
+                code += line[start + i];
+            }
+            return code;
+        }
+
+        /**
+         * Merges the results of all partitions into one map sorted by station name. Meant to be called after
+         * all scheduled work has completed.
+         */
+        SortedMap<Station, MeasurementAggregator> getAllResults() {
+            allPartitions.parallelStream().forEach(Partition::materializeNames);
+            SortedMap<Station, MeasurementAggregator> result = new TreeMap<>();
+            allPartitions.forEach((p) -> result.putAll(p.getResult()));
+            return result;
+        }
+
+        public boolean allTasksCompleted() {
+            return allPartitions.stream().allMatch(Partition::allTasksCompleted);
+        }
+
+    }
+
+    private static final String FILE = "./measurements.txt";
+
+    /**
+     * Station name kept as a range of UTF-8 bytes inside an array; the {@link String} form is created on demand.
+     */
+    public static class Station implements Comparable<Station> {
+
+        private final byte[] inputSlice;
+        private final int hash;
+
+        private final int startIdx;
+        private final int len;
+
+        private volatile String nameAsString;
+
+        /** Creates a view of {@code len} bytes of {@code inputSlice} starting at {@code startIdx}; nothing is copied. */
+        public Station(byte[] inputSlice, int startIdx, int len) {
+            this.inputSlice = inputSlice;
+            this.startIdx = startIdx;
+            this.len = len;
+            this.hash = hashcodeFast();
+        }
+
+        /** Creates a station that owns a private copy of the name bytes of {@code from}. */
+        public Station(Station from) {
+            this.inputSlice = Arrays.copyOfRange(from.inputSlice, from.startIdx, from.startIdx + from.len);
+            this.startIdx = 0;
+            this.len = from.len;
+            this.hash = from.hash;
+        }
+
+        // only the first four bytes are hashed: cheap, at the price of more collisions (equals() compares all bytes)
+        private int hashcodeFast() {
+            if (len == 0) {
+                return 0;
+            }
+            else if (len == 1) {
+                return inputSlice[startIdx] * 109;
+            }
+            else if (len == 2) {
+                return inputSlice[startIdx + 1] * 109 * 109 + inputSlice[startIdx];
+            }
+            else if (len == 3) {
+                return inputSlice[startIdx + 2] * 109 * 109 * 109 + inputSlice[startIdx + 1] * 109 * 109 + inputSlice[startIdx];
+            }
+            else {
+                return inputSlice[startIdx + 3] * 109 * 109 * 109 * 109 + inputSlice[startIdx + 2] * 109 * 109 * 109 + inputSlice[startIdx + 1] * 109 * 109
+                        + inputSlice[startIdx];
+            }
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Station station = (Station) o;
+
+            if (len != station.len) {
+                return false;
+            }
+
+            return Arrays.equals(inputSlice, startIdx, startIdx + len, station.inputSlice, station.startIdx, station.startIdx + len);
+        }
+
+        @Override
+        public int hashCode() {
+            return hash;
+        }
+
+        @Override
+        public int compareTo(Station o) {
+            return materializeName().compareTo(o.materializeName());
+        }
+
+        /** Returns the name as a {@link String}, decoding the bytes as UTF-8 on first use. */
+        public String materializeName() {
+            String name = nameAsString;
+            if (name == null) {
+                name = new String(inputSlice, startIdx, len, StandardCharsets.UTF_8);
+                nameAsString = name;
+            }
+            return name;
+        }
+
+        @Override
+        public String toString() {
+            return materializeName();
+        }
+    }
+
+    /** One parsed line; {@code value} is the number with its decimal point removed, i.e. in tenths. */
+    private record Measurement(Station station, long value) {
+    }
+
+    /** Printable aggregate of one station; {@code min}, {@code sum} and {@code max} are in tenths. */
+    private record ResultRow(long min, long sum, long count, long max) {
+
+        @Override
+        public String toString() {
+            return fakeDouble(min) + "/" + round((double) sum / (double) count / 10.0) + "/" + fakeDouble(max);
+        }
+
+        // renders a value given in tenths with exactly one fractional digit, without going through double
+        private String fakeDouble(long value) {
+            long positiveValue = value < 0 ? -value : value;
+            long wholePart = positiveValue / 10;
+            String positiveDouble = wholePart + "." + (positiveValue - wholePart * 10);
+
+            return (value < 0 ? "-" : "") + positiveDouble;
+        }
+
+        private double round(double value) {
+            return Math.round(value * 10.0) / 10.0;
+        }
+
+    }
+
+    /** Running min/max/sum/count of the measurements of one station. */
+    public static class MeasurementAggregator {
+        private long min = Long.MAX_VALUE;
+        private long max = Long.MIN_VALUE;
+        private long sum;
+        private long count;
+
+        MeasurementAggregator withMeasurement(Measurement m) {
+
+            min = Math.min(min, m.value());
+            max = Math.max(max, m.value());
+            sum += m.value();
+            count++;
+
+            return this;
+        }
+
+        @Override
+        public String toString() {
+            return new ResultRow(min, sum, count, max).toString();
+        }
+
+    }
+
+    /**
+     * Parses a decimal number while skipping the decimal point, e.g. {@code "-12.3"} yields {@code -123}.
+     *
+     * @param slice      bytes containing the number
+     * @param startIndex index of the first byte of the number (an optional {@code '-'} sign)
+     * @param len        number of bytes of the number
+     * @return the number with the decimal point removed
+     */
+    private static long parseToLongIgnoringDecimalPoint(byte[] slice, int startIndex, int len) {
+        boolean negative = slice[startIndex] == '-';
+        long value = 0;
+
+        for (int i = negative ? startIndex + 1 : startIndex; i < startIndex + len; i++) {
+            if (slice[i] == '.') {
+                continue;
+            }
+            value = multipleByTen(value) + digitAsLong(slice, i);
+        }
+
+        return negative ? -value : value;
+    }
+
+    private static long multipleByTen(long value) {
+        return (value << 3) + (value << 1);
+    }
+
+    private static long digitAsLong(byte[] digits, int position) {
+        return digits[position] - '0';
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        Partitioner partitioner = new Partitioner(PartitionsNumber);
+
+        try (FileInputStream fileInputStream = new FileInputStream(FILE)) {
+            parseStreamWithBytes(fileInputStream, InputStreamReadBufferLen, partitioner::processSlice);
+        }
+
+        showResults(partitioner);
+
+    }
+
+    /**
+     * Reads the stream block by block and passes its content on as slices made of whole lines.
+     * <p>
+     * The complete lines of every block are cut into roughly {@code SchedulerPoolSize} slices; each slice ends
+     * right before a line break, which is not part of the slice. Bytes after the last line break are kept and
+     * completed by the following reads. A last line without a trailing line break is passed on as a slice of its
+     * own, and a line longer than the buffer makes the buffer grow instead of being dropped.
+     *
+     * @param inputStream   stream to read until its end
+     * @param bufferLen     initial size of the read buffer in bytes, must be positive
+     * @param sliceConsumer receives every slice; each slice is a fresh array
+     * @throws IOException              if reading fails
+     * @throws IllegalArgumentException if {@code bufferLen} is not positive
+     */
+    static void parseStreamWithBytes(InputStream inputStream, int bufferLen, Consumer<byte[]> sliceConsumer) throws IOException {
+
+        if (bufferLen <= 0) {
+            throw new IllegalArgumentException("bufferLen must be positive: " + bufferLen);
+        }
+
+        byte[] buffer = new byte[bufferLen];
+        int carried = 0; // bytes of an unfinished line at the start of the buffer
+        int readLen;
+
+        while ((readLen = inputStream.read(buffer, carried, buffer.length - carried)) > -1) {
+            int filled = carried + readLen;
+            int completeLen = endOfLastLine(buffer, carried, filled);
+
+            if (completeLen > 0) {
+                emitSlices(buffer, completeLen, sliceConsumer);
+                // move the unfinished tail to the front, the next read appends to it
+                carried = filled - completeLen;
+                System.arraycopy(buffer, completeLen, buffer, 0, carried);
+            }
+            else {
+                carried = filled;
+                if (carried == buffer.length) {
+                    // a single line fills the whole buffer, make room for the rest of it
+                    buffer = Arrays.copyOf(buffer, buffer.length * 2);
+                }
+            }
+        }
+
+        if (carried > 0) {
+            // the input did not end with a line break
+            sliceConsumer.accept(Arrays.copyOf(buffer, carried));
+        }
+    }
+
+    // returns the index right after the last '\n' within [from, to), or 0 if that range holds no line break
+    private static int endOfLastLine(byte[] buffer, int from, int to) {
+        for (int i = to - 1; i >= from; i--) {
+            if (buffer[i] == '\n') {
+                return i + 1;
+            }
+        }
+        return 0;
+    }
+
+    // cuts buffer[0, completeLen), which ends with '\n', into slices of about completeLen / SchedulerPoolSize bytes
+    private static void emitSlices(byte[] buffer, int completeLen, Consumer<byte[]> sliceConsumer) {
+        int sliceSize = completeLen / SchedulerPoolSize;
+        int sliceStart = 0;
+        int j = Math.min(sliceSize, completeLen - 1);
+
+        while (sliceStart < completeLen && j < completeLen) {
+            if (buffer[j] == '\n') {
+                sliceConsumer.accept(Arrays.copyOfRange(buffer, sliceStart, j));
+                sliceStart = j + 1;
+                j = Math.min(sliceStart + sliceSize, completeLen - 1);
+            }
+            else {
+                j++;
+            }
+        }
+    }
+
+    /**
+     * Returns the length in bytes of the UTF-8 encoded character that starts with the given byte.
+     *
+     * @param firstByteOfChar first byte of an encoded character
+     * @return 2, 3 or 4 for the lead byte of a multi-byte character, 1 for any other byte
+     */
+    static int getUtf8CharNumberOfBytes(byte firstByteOfChar) {
+        // the lead byte patterns are 11110xxx, 1110xxxx and 110xxxxx, so each length needs its own mask
+        if ((firstByteOfChar & 0b11111000) == 0b11110000) {
+            return 4;
+        }
+        else if ((firstByteOfChar & 0b11110000) == 0b11100000) {
+            return 3;
+        }
+        else if ((firstByteOfChar & 0b11100000) == 0b11000000) {
+            return 2;
+        }
+        else {
+            return 1;
+        }
+    }
+
+    /**
+     * Waits until all scheduled work has finished and prints the merged result to standard output.
+     */
+    static void showResults(Partitioner partitioner) {
+
+        CountDownLatch c = new CountDownLatch(1);
+        partitioner.scheduler.execute(() -> {
+
+            try {
+                // check if any unprocessed slices
+                while (partitioner.jobsScheduled.get() > 0) {
+                    Thread.onSpinWait();
+                }
+
+                // check if anything left in partitions
+                while (!partitioner.allTasksCompleted()) {
+                    Thread.onSpinWait();
+                }
+
+                SortedMap<Station, MeasurementAggregator> result = partitioner.getAllResults();
+                System.out.println(result); // output aggregated measurements according to the requirement
+            }
+            catch (Exception e) {
+                // keep standard output clean, it carries the result only
+                System.err.println(e);
+            }
+            finally {
+                c.countDown(); // always release the waiting caller
+            }
+        });
+
+        try {
+            c.await();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    private static Measurement getMeasurement(byte[] slice, LineParams lineParams) {
+        int idx = lastIndexOfSeparator(slice, lineParams);
+        return new Measurement(
+                new Station(slice, lineParams.start(), idx - lineParams.start()),
+                parseToLongIgnoringDecimalPoint(slice, idx + 1, lineParams.start() + lineParams.length() - (idx + 1)));
+    }
+
+    /**
+     * Finds the separator between station name and value by searching backwards from the end of the line.
+     *
+     * @return index of the separator inside {@code slice}
+     * @throws IllegalArgumentException if the line has no separator at least three bytes before its last byte
+     */
+    private static int lastIndexOfSeparator(byte[] slice, LineParams lineParams) {
+        int lastIdx = lineParams.start() + lineParams.length() - 1;
+        // the shortest value is "d.d", hence the separator is at least three bytes away from the last byte
+        for (int i = lastIdx - 3; i >= lineParams.start(); i--) {
+            if (slice[i] == Separator) {
+                return i;
+            }
+        }
+        throw new IllegalArgumentException(
+                "No separator found in line: " + new String(slice, lineParams.start(), lineParams.length(), StandardCharsets.UTF_8));
+    }
+
+}