1brc/src/main/java/dev/morling/onebrc/CalculateAverage_twobiers.java

296 lines
20 KiB
Java
Raw Normal View History

/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import static java.util.stream.Collectors.groupingBy;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.StreamSupport;
public class CalculateAverage_twobiers {
/** Input file, resolved relative to the current working directory. */
private static final String FILE = "./measurements.txt";
/** Shared downstream collector (it has no instance state) that aggregates each station's values. */
private static final FastAveragingCollector FAST_AVERAGING_COLLECTOR = new FastAveragingCollector();
/**
 * Downstream collector that aggregates the values of {@link Measurement}s and renders them as
 * {@code min/mean/max}, each rounded to one decimal place.
 *
 * <p>The mutable result container is a {@code double[4]} holding the running sum, the count, the
 * maximum and the minimum. Maximum and minimum are only meaningful once the count is positive; an
 * empty container holds zeros in those slots.
 */
private static class FastAveragingCollector implements Collector<Measurement, double[], String> {
    private static final int SUM = 0;
    private static final int COUNT = 1;
    private static final int MAX = 2;
    private static final int MIN = 3;

    @Override
    public Supplier<double[]> supplier() {
        return () -> new double[4];
    }

    @Override
    public BiConsumer<double[], Measurement> accumulator() {
        return (a, t) -> {
            double val = t.value();
            a[SUM] += val;
            a[COUNT]++;
            // The first value seeds max/min; the zero-initialized slots are not real data.
            if (val > a[MAX] || a[COUNT] == 1) {
                a[MAX] = val;
            }
            if (val < a[MIN] || a[COUNT] == 1) {
                a[MIN] = val;
            }
        };
    }

    @Override
    public BinaryOperator<double[]> combiner() {
        return (a, b) -> {
            // An empty container still holds placeholder zeros in MAX/MIN. Comparing against them
            // would, e.g., turn an all-negative maximum into 0.0, so empty sides are skipped.
            if (b[COUNT] == 0) {
                return a;
            }
            if (a[COUNT] == 0) {
                return b;
            }
            a[SUM] += b[SUM];
            a[COUNT] += b[COUNT];
            if (b[MAX] > a[MAX]) {
                a[MAX] = b[MAX];
            }
            if (b[MIN] < a[MIN]) {
                a[MIN] = b[MIN];
            }
            return a;
        };
    }

    @Override
    public Function<double[], String> finisher() {
        return a -> {
            double mean = (a[COUNT] == 0) ? 0.0d : round(a[SUM] / a[COUNT]);
            // min/max are rounded like the mean so that a parsed "-0.0" prints as 0.0 and any
            // binary floating-point residue from parsing cannot leak into the output.
            return round(a[MIN]) + "/" + mean + "/" + round(a[MAX]);
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /** Rounds to one decimal place; ties are rounded towards positive infinity (Math.round). */
    private static double round(double value) {
        return Math.round(value * 10.0) / 10.0;
    }
}
/**
 * Iterates over a file channel as a sequence of read-only memory-mapped chunks of at most
 * {@link #CHUNK_SIZE} bytes.
 *
 * <p>Every chunk except the last is trimmed so that it ends right after a {@code '\n'}, so no line
 * is split across two chunks. The last chunk is returned untrimmed, which allows the file's final
 * line to lack a trailing newline.
 */
private static class FileChannelIterator implements Iterator<ByteBuffer> {
    // The 1BRC input is roughly 14 GB; 1 MB chunks keep each individual mapping small.
    private static final long CHUNK_SIZE = 1L << 20;

    private final FileChannel fileChannel;
    private final long size;
    /** Offset of the first byte not yet handed out in a chunk. */
    private long bytesRead = 0;

    public FileChannelIterator(FileChannel fileChannel) {
        this.fileChannel = fileChannel;
        try {
            this.size = fileChannel.size();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return bytesRead < size;
    }

    /**
     * Maps the next chunk.
     *
     * @return a buffer positioned at 0 whose limit ends on a line boundary (or at end of file)
     * @throws NoSuchElementException if the whole file has already been returned
     * @throws IllegalStateException if a non-final chunk contains no line break at all
     * @throws UncheckedIOException if mapping the file region fails
     */
    @Override
    public ByteBuffer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long length = Math.min(CHUNK_SIZE, size - bytesRead);
        MappedByteBuffer chunk;
        try {
            chunk = fileChannel.map(MapMode.READ_ONLY, bytesRead, length);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int end = chunk.limit();
        boolean lastChunk = bytesRead + length == size;
        if (!lastChunk) {
            // Back up to just after the last newline so the next chunk starts on a fresh line.
            while (end > 0 && chunk.get(end - 1) != '\n') {
                end--;
            }
            if (end == 0) {
                throw new IllegalStateException(
                        "No line break within " + length + " bytes at offset " + bytesRead);
            }
            chunk.limit(end);
        }
        bytesRead += end;
        return chunk;
    }
}
/**
 * One parsed input line: the station name (the text before the {@code ';'}) together with the
 * numeric value that follows it.
 */
private record Measurement(String station, double value) {
    // Plain data carrier: accessors, equals, hashCode and toString are all record-generated.
}
/**
* Entry point: memory-maps {@code ./measurements.txt} chunk by chunk, parses the chunks in a
* parallel stream and prints, per station, {@code min/mean/max} as a map sorted by station name.
*
* @param args unused
* @throws IOException if the measurements file cannot be opened, or closing it fails
*/
public static void main(String[] args) throws IOException {
try (
var file = new RandomAccessFile(FILE, "r");
var channel = file.getChannel();) {
// Each mapped chunk is parsed into Measurement records; these are grouped by station into a
// TreeMap (natural String order), with FAST_AVERAGING_COLLECTOR producing each map value.
TreeMap<String, String> measurements = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(new FileChannelIterator(channel),
Spliterator.IMMUTABLE),
true)
.flatMap(a -> parseMeasurements(a).stream())
.collect(
groupingBy(
Measurement::station,
TreeMap::new,
FAST_AVERAGING_COLLECTOR));
System.out.println(measurements);
// Sanity check against the expected output for the author's own generated input file. It only
// runs with assertions enabled (-ea) and will not hold for other input files.
// NOTE(review): in this copy the expected-output literal below is cut off and wrapped across
// lines, which is not valid Java; restore the full single-line literal from the upstream source.
assert measurements.toString().equals(
"{Abha=-30.1/18.0/72.2, Abidjan=-23.4/26.0/79.9, Abéché=-17.3/29.4/80.5, Accra=-24.3/26.4/77.9, Addis Ababa=-33.6/16.0/65.9, Adelaide=-33.7/17.3/71.1, Aden=-21.2/29.1/81.2, Ahvaz=-30.2/25.4/80.2, Albuquerque=-39.1/14.0/60.9, Alexandra=-36.6/11.0/62.9, Alexandria=-30.4/20.0/69.8, Algiers=-33.1/18.2/68.4, Alice Springs=-27.2/21.0/72.1, Almaty=-40.5/10.0/58.5, Amsterdam=-38.9/10.2/58.9, Anadyr=-58.2/-6.9/39.9, Anchorage=-45.3/2.8/54.3, Andorra la Vella=-42.9/9.8/60.6, Ankara=-36.7/12.0/63.7, Antananarivo=-30.9/17.9/70.8, Antsiranana=-23.0/25.2/75.4, Arkhangelsk=-48.2/1.3/51.4, Ashgabat=-30.4/17.1/64.9, Asmara=-32.6/15.6/65.4, Assab=-21.2/30.5/78.7, Astana=-45.5/3.5/49.3, Athens=-28.5/19.2/68.0, Atlanta=-33.3/17.0/69.7, Auckland=-33.6/15.2/65.0, Austin=-32.1/20.7/68.5, Baghdad=-29.6/22.8/71.6, Baguio=-29.9/19.5/68.7, Baku=-32.1/15.1/66.8, Baltimore=-36.4/13.1/62.8, Bamako=-27.0/27.8/82.0, Bangkok=-20.5/28.6/79.0, Bangui=-24.2/26.0/73.8, Banjul=-27.0/26.0/75.4, Barcelona=-36.1/18.2/74.0, Bata=-25.9/25.1/73.7, Batumi=-36.8/14.0/65.7, Beijing=-34.2/12.9/65.9, Beirut=-28.9/20.9/68.5, Belgrade=-35.7/12.5/65.5, Belize City=-24.8/26.7/75.3, Benghazi=-28.6/19.9/68.9, Bergen=-48.8/7.7/58.5, Berlin=-48.1/10.3/62.5, Bilbao=-34.5/14.7/65.2, Birao=-22.7/26.5/77.9, Bishkek=-37.2/11.3/62.1, Bissau=-24.1/27.0/82.6, Blantyre=-29.2/22.2/68.9, Bloemfontein=-32.1/15.6/66.1, Boise=-36.6/11.4/62.2, Bordeaux=-34.2/14.2/63.0, Bosaso=-20.8/30.0/77.2, Boston=-39.8/10.9/58.4, Bouaké=-24.8/26.0/76.8, Bratislava=-40.5/10.5/62.6, Brazzaville=-29.3/25.0/74.6, Bridgetown=-25.0/27.0/78.4, Brisbane=-27.2/21.4/69.7, Brussels=-37.0/10.5/56.6, Bucharest=-48.7/10.8/60.8, Budapest=-39.6/11.3/61.6, Bujumbura=-26.3/23.8/73.8, Bulawayo=-32.8/18.9/68.5, Burnie=-35.3/13.1/64.4, Busan=-34.8/15.0/62.8, Cabo San Lucas=-25.0/23.9/71.6, Cairns=-23.1/25.0/74.3, Cairo=-27.6/21.4/68.6, Calgary=-46.9/4.4/56.2, Canberra=-36.9/13.1/63.4, Cape Town=-33.1/16.2/63.5, Changsha=-33.3/17.4/64.8, Charlotte=-33.7/16.1/63.5, Chiang 
Mai=-26.4/25.8/76.2, Chicago=-39.7/9.8/60.6, Chihuahua=-33.0/18.6/72.4, Chittagong=-20.8/25.9/76.1, Chișinău=-39.5/10.2/59.2, Chongqing=-30.8/18.6/71.4, Christchurch=-37.9/12.2/62.1, City of San Marino=-40.0/11.8/64.1, Colombo=-27.2/27.4/78.0, Columbus=-39.6/11.7/63.3, Conakry=-29.0/26.4/77.7, Copenhagen=-41.2/9.1/59.8, Cotonou=-24.6/27.2/80.1, Cracow=-39.9/9.3/63.2, Da Lat=-29.9/17.9/66.6, Da Nang=-23.4/25.8/74.5, Dakar=-27.8/24.0/74.8, Dallas=-33.8/19.0/69.2, Damascus=-33.5/17.0/68.5, Dampier=-25.0/26.4/75.8, Dar es Salaam=-22.6/25.8/78.6, Darwin=-22.0/27.6/75.4, Denpasar=-24.0/23.7/76.5, Denver=-38.2/10.4/60.4, Detroit=-38.0/10.0/60.0, Dhaka=-24.9/25.9/75.7, Dikson=-59.6/-11.1/39.0, Dili=-23.6/26.6/76.5, Djibouti=-18.9/29.9/77.8, Dodoma=-26.5/22.7/73.5, Dolisie=-24.4/24.0/71.9, Douala=-23.0/26.7/77.2, Dubai=-20.8/26.9/79.8, Dublin=-38.2/9.8/62.2, Dunedin=-39.3/11.1/61.3, Durban=-26.3/20.6/73.5, Dushanbe=-33.2/14.7/66.6, Edinburgh=-45.9/9.3/58.6, Edmonton=-41.7/4.2/56.0, El Paso=-29.6/18.1/66.5, Entebbe=-27.0/21.0/72.7, Erbil=-31.8/19.5/71.9, Erzurum=-47.0/5.1/55.4, Fairbanks=-48.9/-2.3/47.3, Fianarantsoa=-33.2/17.9/68.1, Flores, Petén=-20.9/26.4/74.6, Frankfurt=-39.5/10.6/58.5, Fresno=-31.8/17.9/65.9, Fukuoka=-33.1/17.0/70.7, Gaborone=-29.0/21.0/73.1, Gabès=-34.3/19.5/70.3, Gagnoa=-24.6/26.0/80.8, Gangtok=-36.3/15.2/67.1, Garissa=-21.1/29.3/80.8, Garoua=-23.0/28.3/79.0, George Town=-23.4/27.9/78.1, Ghanzi=-33.3/21.4/73.2, Gjoa Haven=-64.9/-14.4/36.4, Guadalajara=-27.8/20.9/72.2, Guangzhou=-27.1/22.4/71.1, Guatemala City=-28.9/20.4/77.1, Halifax=-43.4/7.5/57.7, Hamburg=-41.5/9.7/65.0, Hamilton=-35.1/13.8/69.3, Hanga Roa=-29.1/20.5/70.8, Hanoi=-25.8/23.6/75.2, Harare=-33.2/18.4/69.1, Harbin=-45.1/5.0/55.0, Hargeisa=-33.4/21.7/71.1, Hat Yai=-22.5/27.0/74.3, Havana=-24.9/25.2/74.1, Helsinki=-42.3/5.9/55.2, Heraklion=-30.4/18.9/67.2, Hiroshima=-34.0/16.3/67.0, Ho Chi Minh City=-18.5/27.4/77.0, Hobart=-36.9/12.7/59.6, Hong Kong=-25.9/23.3/79.9, 
Honiara=-21.5/26.5/75.6, Honolulu=-22.9/25.4/73.7, Houston=-29.9/20.8/69.7, Ifrane=-36.8/11
}
}
/**
 * Parses every {@code station;value} line between the buffer's current position and its limit.
 *
 * <p>Station names are decoded as UTF-8 regardless of the platform default charset. The final
 * line may omit its trailing {@code '\n'}. On return the buffer's position equals its limit.
 *
 * <p>Most of the code here is derived from @bjhara's implementation:
 * https://github.com/gunnarmorling/1brc/pull/10
 *
 * @param byteBuffer buffer holding whole lines; its position is consumed
 * @return the parsed measurements, in input order
 * @throws IllegalArgumentException if the last line in the buffer has no {@code ';'} separator
 */
private static List<Measurement> parseMeasurements(ByteBuffer byteBuffer) {
    var measurements = new ArrayList<Measurement>(100_000);
    final int limit = byteBuffer.limit();
    // Scratch space for copying out name and value bytes. 1BRC station names may be up to
    // 100 bytes, so start at 128 and grow on demand instead of overflowing.
    byte[] buffer = new byte[128];
    while (byteBuffer.position() < limit) {
        final int start = byteBuffer.position();
        int separatorPosition = start;
        while (separatorPosition != limit && byteBuffer.get(separatorPosition) != ';') {
            separatorPosition++;
        }
        if (separatorPosition == limit) {
            throw new IllegalArgumentException("Missing ';' in line starting at buffer offset " + start);
        }
        // The line end is searched from the separator onwards, so it is always after it.
        int endOfLinePosition = separatorPosition;
        while (endOfLinePosition != limit && byteBuffer.get(endOfLinePosition) != '\n') {
            endOfLinePosition++;
        }
        int nameLength = separatorPosition - start;
        int valueLength = endOfLinePosition - separatorPosition - 1;
        int needed = Math.max(nameLength, valueLength);
        if (needed > buffer.length) {
            buffer = new byte[Math.max(needed, buffer.length * 2)];
        }

        byteBuffer.get(buffer, 0, nameLength);
        String key = new String(buffer, 0, nameLength, StandardCharsets.UTF_8);
        byteBuffer.get(); // Skip ';'
        byteBuffer.get(buffer, 0, valueLength);
        double value = fastParseDouble(buffer, valueLength);
        if (byteBuffer.position() < limit) {
            byteBuffer.get(); // Skip '\n' (absent on a final unterminated line)
        }
        measurements.add(new Measurement(key, value));
    }
    return measurements;
}
/**
 * Parses an ASCII decimal number such as {@code -12.3} from the first {@code length} bytes of
 * {@code bytes}.
 *
 * <p>Digits are accumulated into an unscaled {@code long}. A {@code '-'} anywhere marks the result
 * negative, and a {@code '.'} starts counting fractional digits. Every other byte is ignored. The
 * scaling to a {@code double} is delegated to {@code asDouble}.
 */
private static double fastParseDouble(byte[] bytes, int length) {
    long mantissa = 0;
    boolean isNegative = false;
    // Stays negative until a '.' is seen, so input without a decimal point gets no scaling.
    int fractionDigits = Integer.MIN_VALUE;
    int idx = 0;
    while (idx < length) {
        final byte b = bytes[idx++];
        switch (b) {
            case '-' -> isNegative = true;
            case '.' -> fractionDigits = 0;
            default -> {
                if (b >= '0' && b <= '9') {
                    mantissa = mantissa * 10 + (b - '0');
                    fractionDigits++;
                }
            }
        }
    }
    return asDouble(mantissa, 0, isNegative, fractionDigits);
}
/**
* Scales an unscaled decimal integer to a double, computing approximately
* {@code (negative ? -1 : 1) * value / 10^decimalPlaces * 2^exp}.
*
* <p>Only integer arithmetic is used until the final {@link Math#scalb}. Division by 10 is split
* into a halving (tracked in {@code exp}) and an integer division by 5. {@code value} is kept
* left-shifted near the top of the {@code long} range so that little precision is lost.
*
* @param value unscaled digits; NOTE(review): the remainder arithmetic appears to assume a
*        non-negative value (the sign is passed separately) — confirm before reusing elsewhere
* @param exp initial binary exponent (power of two) applied to the result
* @param negative whether to negate the final result
* @param decimalPlaces number of fractional decimal digits in {@code value}; values {@code <= 0}
*        mean no decimal scaling is applied
* @return the scaled, signed value
*/
private static double asDouble(long value, int exp, boolean negative, int decimalPlaces) {
// Normalize: shift value left as far as possible without overflowing (trying shifts of
// 32, 16, 8, 4, 2 and 1 bits), compensating in exp, so the divisions below keep many
// significant bits.
if (decimalPlaces > 0 && value < Long.MAX_VALUE / 2) {
if (value < Long.MAX_VALUE / (1L << 32)) {
exp -= 32;
value <<= 32;
}
if (value < Long.MAX_VALUE / (1L << 16)) {
exp -= 16;
value <<= 16;
}
if (value < Long.MAX_VALUE / (1L << 8)) {
exp -= 8;
value <<= 8;
}
if (value < Long.MAX_VALUE / (1L << 4)) {
exp -= 4;
value <<= 4;
}
if (value < Long.MAX_VALUE / (1L << 2)) {
exp -= 2;
value <<= 2;
}
if (value < Long.MAX_VALUE / (1L << 1)) {
exp -= 1;
value <<= 1;
}
}
// Divide by 10 once per fractional digit: exp-- halves, and the integer division does the /5.
for (; decimalPlaces > 0; decimalPlaces--) {
exp--;
// Remainder dropped by the integer division; added back (scaled) below.
long mod = value % 5;
value /= 5;
// modDiv tracks the total left shift applied during re-normalization (a power of two).
int modDiv = 1;
if (value < Long.MAX_VALUE / (1L << 4)) {
exp -= 4;
value <<= 4;
modDiv <<= 4;
}
if (value < Long.MAX_VALUE / (1L << 2)) {
exp -= 2;
value <<= 2;
modDiv <<= 2;
}
if (value < Long.MAX_VALUE / (1L << 1)) {
exp -= 1;
value <<= 1;
modDiv <<= 1;
}
// Restore the fractional part of value / 5 at the new scale: truncated for intermediate
// digits, rounded up ((x + 4) / 5 == ceil(x / 5) for x >= 0) on the last digit.
if (decimalPlaces > 1) {
value += modDiv * mod / 5;
}
else {
value += (modDiv * mod + 4) / 5;
}
}
// The long -> double conversion rounds to 53 bits; scalb then applies the binary exponent.
final double d = Math.scalb((double) value, exp);
return negative ? -d : d;
}
}