diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java b/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java index 54f13cb..e6e9632 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_tivrfoa.java @@ -38,34 +38,44 @@ import java.util.concurrent.atomic.AtomicInteger; * already, and maybe even 1st place for the 10k too. * See: https://github.com/gunnarmorling/1brc/pull/606 * - * But as I was already coding something, I'll submit just to - * see if it will be faster than his *previous* 10k time of - * 00:04.516 - * - * Changes: - * It's a similar idea of my previous solution, that if you split - * the chunks evenly, some threads might finish much faster and - * stay idle, so: - * 1) Create more chunks than threads, so the ones that finish first - * can do something; - * 2) Decrease chunk sizes as we get closer to the end of the file. + * As I was not able to make it faster ... so I'll make it slower, + * because my current solution should *not* stay at the top, as it added + * basically nothing. */ public class CalculateAverage_tivrfoa { private static final String FILE = "./measurements.txt"; - private static final int MIN_TEMP = -999; - private static final int MAX_TEMP = 999; + + private static final int MAX_CITIES = 10_000; + private static final int BUCKETS_LEN = 1 << 17; + private static final int LAST_BUCKET_ENTRY = BUCKETS_LEN - 1; + private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); + private static final AtomicInteger chunkIdx = new AtomicInteger(); + private static long[] chunks; + private static int numChunks; // Holding the current result for a single city. private static class Result { - long lastNameLong, secondLastNameLong; + long lastNameLong; long[] name; int count; short min, max; long sum; - private Result() { - this.min = MAX_TEMP; - this.max = MIN_TEMP; + private Result(short number, long nameAddress, byte nameLength, Scanner scanner) { + this.min = number; + this.max = number; + this.sum = number; + this.count = 1; + + name = new long[(nameLength / Long.BYTES) + 1]; + int pos = 0, i = 0; + for (; i < nameLength + 1 - Long.BYTES; i += Long.BYTES) { + name[pos++] = scanner.getLongAt(nameAddress + i); + } + + int remainingShift = (64 - (nameLength + 1 - i) << 3); + lastNameLong = (scanner.getLongAt(nameAddress + i) << remainingShift); + name[pos] = lastNameLong >> remainingShift; } public String toString() { @@ -88,6 +98,17 @@ public class CalculateAverage_tivrfoa { count += other.count; } + private void add(short number) { + if (number < min) { + min = number; + } + if (number > max) { + max = number; + } + sum += number; + count++; + } + public String calcName() { ByteBuffer bb = ByteBuffer.allocate(name.length * Long.BYTES).order(ByteOrder.nativeOrder()); bb.asLongBuffer().put(name); @@ -99,139 +120,95 @@ public class CalculateAverage_tivrfoa { } } - private static final int NUM_CPUS = Runtime.getRuntime().availableProcessors(); - private static final AtomicInteger chunkIdx = new AtomicInteger(); - private static long[] chunks; - private static int numChunks; + /** + * From: + * https://github.com/OpenHFT/Zero-Allocation-Hashing/blob/ea/src/main/java/net/openhft/hashing/XXH3.java + * + * Less collisions, but it will make the code slower. xD + * + * One interesting thing about Thomas' solution that I + * started to work with (d0a28599), is that it basically does not have + * any collision for the small data set (sometimes none!), but it + * has lots of collisions for the 10k, hence its poor performance. + * + */ + private static long XXH3_avalanche(long h64) { + h64 ^= h64 >>> 37; + h64 *= 0x165667919E3779F9L; + return h64 ^ (h64 >>> 32); + } private static final class SolveChunk extends Thread { - private long chunkStart, chunkEnd; - private Result[] results = new Result[10_000]; - private Result[] buckets = new Result[1 << 17]; + private int chunkStartIdx; + private Result[] results = new Result[MAX_CITIES]; + private Result[] buckets = new Result[BUCKETS_LEN]; private int resIdx = 0; - public SolveChunk(long chunkStart, long chunkEnd) { - this.chunkStart = chunkStart; - this.chunkEnd = chunkEnd; + public SolveChunk(int chunkStartIdx) { + this.chunkStartIdx = chunkStartIdx; } @Override public void run() { - parseLoop(); - int chunk = chunkIdx.getAndIncrement(); - if (chunk < numChunks) { - chunkStart = chunks[chunk]; - chunkEnd = chunks[chunk + 1]; - run(); - } - } + for (; chunkStartIdx < numChunks; chunkStartIdx = chunkIdx.getAndIncrement()) { + Scanner scanner = new Scanner(chunks[chunkStartIdx], chunks[chunkStartIdx + 1]); + long word = scanner.getLong(); + long pos = findDelimiter(word); + while (scanner.hasNext()) { + long nameAddress = scanner.pos(); + long hash = 0; - private void parseLoop() { - Scanner scanner = new Scanner(chunkStart, chunkEnd); - long word = scanner.getLong(); - long pos = findDelimiter(word); - while (scanner.hasNext()) { - long nameAddress = scanner.pos(); - long hash = 0; + while (true) { + if (pos != 0) { + pos = Long.numberOfTrailingZeros(pos) >>> 3; + scanner.add(pos); + word = mask(word, pos); + hash ^= XXH3_avalanche(word); + break; + } + else { + scanner.add(8); + hash ^= XXH3_avalanche(word); + } - // Search for ';', one long at a time. - if (pos != 0) { - pos = Long.numberOfTrailingZeros(pos) >>> 3; - scanner.add(pos); - word = mask(word, pos); - hash = word; - - int number = scanNumber(scanner); - long nextWord = scanner.getLong(); - long nextPos = findDelimiter(nextWord); - - Result existingResult = buckets[hashToIndex(hash, buckets)]; - if (existingResult != null && existingResult.lastNameLong == word) { - word = nextWord; - pos = nextPos; - record(existingResult, number); - continue; + word = scanner.getLong(); + pos = findDelimiter(word); + } + + byte nameLength = (byte) (scanner.pos() - nameAddress); + short number = scanNumber(scanner); + + int tableIndex = hashToIndex(hash); + outer: while (true) { + Result existingResult = buckets[tableIndex]; + if (existingResult == null) { + var newResult = new Result(number, nameAddress, nameLength, scanner); + buckets[tableIndex] = newResult; + results[resIdx++] = newResult; + break; + } + int i = 0; + int namePos = 0; + for (; i < nameLength + 1 - 8; i += 8) { + if (namePos >= existingResult.name.length || existingResult.name[namePos++] != scanner.getLongAt(nameAddress + i)) { + tableIndex = (tableIndex + 31) & (LAST_BUCKET_ENTRY); + continue outer; + } + } + + int remainingShift = (64 - (nameLength + 1 - i) << 3); + if (((existingResult.lastNameLong ^ (scanner.getLongAt(nameAddress + i) << remainingShift)) == 0)) { + existingResult.add(number); + break; + } + else { + tableIndex = (tableIndex + 31) & (LAST_BUCKET_ENTRY); + } } - scanner.setPos(nameAddress + pos); - } - else { - scanner.add(8); - hash = word; - long prevWord = word; word = scanner.getLong(); pos = findDelimiter(word); - if (pos != 0) { - pos = Long.numberOfTrailingZeros(pos) >>> 3; - scanner.add(pos); - word = mask(word, pos); - hash ^= word; - - Result existingResult = buckets[hashToIndex(hash, buckets)]; - if (existingResult != null && existingResult.lastNameLong == word && existingResult.secondLastNameLong == prevWord) { - int number = scanNumber(scanner); - word = scanner.getLong(); - pos = findDelimiter(word); - record(existingResult, number); - continue; - } - } - else { - scanner.add(8); - hash ^= word; - while (true) { - word = scanner.getLong(); - pos = findDelimiter(word); - if (pos != 0) { - pos = Long.numberOfTrailingZeros(pos) >>> 3; - scanner.add(pos); - word = mask(word, pos); - hash ^= word; - break; - } - else { - scanner.add(8); - hash ^= word; - } - } - } } - - // Save length of name for later. - int nameLength = (int) (scanner.pos() - nameAddress); - int number = scanNumber(scanner); - - // Final calculation for index into hash table. - int tableIndex = hashToIndex(hash, buckets); - outer: while (true) { - Result existingResult = buckets[tableIndex]; - if (existingResult == null) { - existingResult = newEntry(buckets, nameAddress, tableIndex, nameLength, scanner); - results[resIdx++] = existingResult; - } - // Check for collision. - int i = 0; - int namePos = 0; - for (; i < nameLength + 1 - 8; i += 8) { - if (namePos >= existingResult.name.length || existingResult.name[namePos++] != scanner.getLongAt(nameAddress + i)) { - tableIndex = (tableIndex + 31) & (buckets.length - 1); - continue outer; - } - } - - int remainingShift = (64 - (nameLength + 1 - i) << 3); - if (((existingResult.lastNameLong ^ (scanner.getLongAt(nameAddress + i) << remainingShift)) == 0)) { - record(existingResult, number); - break; - } - else { - // Collision error, try next. - tableIndex = (tableIndex + 31) & (buckets.length - 1); - } - } - - word = scanner.getLong(); - pos = findDelimiter(word); } } } @@ -247,77 +224,49 @@ public class CalculateAverage_tivrfoa { } } - public static void main(String[] args) throws Exception { - boolean runTrick = true; - for (var arg : args) { - if (arg.equals("--worker")) { - runTrick = false; - break; - } - } - if (runTrick) { - spawnWorker(); - return; - } - + public static void main(String[] args) throws InterruptedException, IOException { chunks = getSegments(NUM_CPUS); numChunks = chunks.length - 1; final SolveChunk[] threads = new SolveChunk[NUM_CPUS]; chunkIdx.set(NUM_CPUS); for (int i = 0; i < NUM_CPUS; i++) { - threads[i] = new SolveChunk(chunks[i], chunks[i + 1]); + threads[i] = new SolveChunk(i); threads[i].start(); } + System.out.println(getMap(threads)); + System.out.close(); + } + + private static TreeMap getMap(SolveChunk[] threads) throws InterruptedException { TreeMap map = new TreeMap<>(); - for (int i = 0; i < NUM_CPUS; ++i) { + threads[0].join(); + for (var r : threads[0].results) { + if (r == null) + break; + map.put(r.calcName(), r); + } + for (int i = 1; i < NUM_CPUS; ++i) { threads[i].join(); mergeIntoFinalMap(map, threads[i].results); } - System.out.println(map); - System.out.close(); + return map; } - private static void spawnWorker() throws IOException { - ProcessHandle.Info info = ProcessHandle.current().info(); - ArrayList workerCommand = new ArrayList<>(); - info.command().ifPresent(workerCommand::add); - info.arguments().ifPresent(args -> workerCommand.addAll(Arrays.asList(args))); - workerCommand.add("--worker"); - new ProcessBuilder() - .command(workerCommand) - .inheritIO() - .redirectOutput(ProcessBuilder.Redirect.PIPE) - .start() - .getInputStream() - .transferTo(System.out); - } - - private static int scanNumber(Scanner scanPtr) { + private static short scanNumber(Scanner scanPtr) { scanPtr.add(1); long numberWord = scanPtr.getLong(); int decimalSepPos = Long.numberOfTrailingZeros(~numberWord & 0x10101000); int number = convertIntoNumber(decimalSepPos, numberWord); scanPtr.add((decimalSepPos >>> 3) + 3); - return number; + return (short) number; } - private static void record(Result existingResult, int number) { - if (number < existingResult.min) { - existingResult.min = (short) number; - } - if (number > existingResult.max) { - existingResult.max = (short) number; - } - existingResult.sum += number; - existingResult.count++; - } - - private static int hashToIndex(long hash, Result[] results) { + private static int hashToIndex(long hash) { int hashAsInt = (int) (hash ^ (hash >>> 28)); int finalHash = (hashAsInt ^ (hashAsInt >>> 17)); - return (finalHash & (results.length - 1)); + return (finalHash & LAST_BUCKET_ENTRY); } private static long mask(long word, long pos) { @@ -346,28 +295,6 @@ public class CalculateAverage_tivrfoa { return tmp; } - private static Result newEntry(Result[] results, long nameAddress, int hash, int nameLength, Scanner scanner) { - Result r = new Result(); - results[hash] = r; - long[] name = new long[(nameLength / Long.BYTES) + 1]; - int pos = 0; - int i = 0; - for (; i < nameLength + 1 - Long.BYTES; i += Long.BYTES) { - name[pos++] = scanner.getLongAt(nameAddress + i); - } - - if (pos > 0) { - r.secondLastNameLong = name[pos - 1]; - } - - int remainingShift = (64 - (nameLength + 1 - i) << 3); - long lastWord = (scanner.getLongAt(nameAddress + i) << remainingShift); - r.lastNameLong = lastWord; - name[pos] = lastWord >> remainingShift; - r.name = name; - return r; - } - /** * - Split 70% of the file in even chunks for all cpus; * - Create smaller chunks for the remainder of the file.