From f11fad2a688f5c07595c4c7636b39d79b5a877de Mon Sep 17 00:00:00 2001 From: Vemana Date: Sun, 14 Jan 2024 18:37:03 +0530 Subject: [PATCH] Submission #5 [No bitwise tricks nor Unsafe yet; 13th place on leaderboard in local testing using evaluate2.sh] (#209) * Linear probe for city indexing. Beats current leader spullara 2.2 vs 3.8 elapsed time. * Straightforward impl using bytebuffers. Turns out memorysegments were slower than used mappedbytebuffers. * A initial submit-worthy entry Comparison to select entries (averaged over 3 runs) * spullara 1.66s [5th on leaderboard currently] * vemana (this submission) 1.65s * artsiomkorzun 1.64s [4th on leaderboard currently] Tests: PASS Impl Class: dev.morling.onebrc.CalculateAverage_vemana Machine specs * 16 core Ryzen 7950X * 128GB RAM Description * Decompose the full file into Shards of memory mapped files and process each independently, outputting a TreeMap: City -> Statistics * Compose the final answer by merging the individual TreeMap outputs * Select 1 Thread per available processor as reported by the JVM * Size to fit all datastructure in 0.5x L3 cache (4MB/core on the evaluation machines) * Use linear probing hash table, with identity of city name = byte[] and hash code computed inline * Avoid all allocation in the hot path and instead use method parameters. So, instead of passing a single Object param called Point(x, y, z), pass 3 parameters for each of its components. It is ugly, but this challenge is so far from Java's idioms anyway * G1GC seems to want to interfere; use ParallelGC instead (just a quick and dirty hack) Things tried that did not work * MemorySegments are actually slower than MappedByteBuffers * Trying to inline everything: not needed; the JIT compiler is pretty good * Playing with JIT compiler flags didn't yield clear wins. In particular, was surprised that using a max level of 3 and reducing compilation threshold did nothing.. when the jit logs print that none of the methods reach level 4 and stay there for long * Hand-coded implementation of Array.equals(..) using readLong(..) & bitmask_based_on_length from a bytebuffer instead of byte by byte * Further tuning to compile loop methods: timings are now consistenctly ahead of artsiomkorzun in 4th place. There are methods on the data path that were being interpreted for far too long. For example, the method that takes a byte range and simply calls one method per line was taking a disproportionate amount of time. Using `-XX:+AlwaysCompileLoopMethods` option improved completion time by 4%. ============= vemana =============== [20:55:22] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh vemana; done; Using java version 21.0.1-graal in this shell. real 0m1.581s user 0m34.166s sys 0m1.435s Using java version 21.0.1-graal in this shell. real 0m1.593s user 0m34.629s sys 0m1.470s Using java version 21.0.1-graal in this shell. real 0m1.632s user 0m35.893s sys 0m1.340s Using java version 21.0.1-graal in this shell. real 0m1.596s user 0m33.074s sys 0m1.386s Using java version 21.0.1-graal in this shell. real 0m1.611s user 0m35.516s sys 0m1.438s ============= artsiomkorzun =============== [20:56:12] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh artsiomkorzun; done; Using java version 21.0.1-graal in this shell. real 0m1.669s user 0m38.043s sys 0m1.287s Using java version 21.0.1-graal in this shell. real 0m1.679s user 0m37.840s sys 0m1.400s Using java version 21.0.1-graal in this shell. real 0m1.657s user 0m37.607s sys 0m1.298s Using java version 21.0.1-graal in this shell. real 0m1.643s user 0m36.852s sys 0m1.392s Using java version 21.0.1-graal in this shell. real 0m1.644s user 0m36.951s sys 0m1.279s ============= spullara =============== [20:57:55] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh spullara; done; Using java version 21.0.1-graal in this shell. real 0m1.676s user 0m37.404s sys 0m1.386s Using java version 21.0.1-graal in this shell. real 0m1.652s user 0m36.509s sys 0m1.486s Using java version 21.0.1-graal in this shell. real 0m1.665s user 0m36.451s sys 0m1.506s Using java version 21.0.1-graal in this shell. real 0m1.671s user 0m36.917s sys 0m1.371s Using java version 21.0.1-graal in this shell. real 0m1.634s user 0m35.624s sys 0m1.573s ========================== Running Tests ====================== [21:17:57] [lsv@vemana]$ ./runTests.sh vemana Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10000-unique-keys.txt Using java version 21.0.1-graal in this shell. real 0m0.150s user 0m1.035s sys 0m0.117s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10.txt Using java version 21.0.1-graal in this shell. real 0m0.114s user 0m0.789s sys 0m0.116s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-1.txt Using java version 21.0.1-graal in this shell. real 0m0.115s user 0m0.948s sys 0m0.075s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-20.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.926s sys 0m0.066s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-2.txt Using java version 21.0.1-graal in this shell. real 0m0.110s user 0m0.734s sys 0m0.078s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-3.txt Using java version 21.0.1-graal in this shell. real 0m0.114s user 0m0.870s sys 0m0.095s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-boundaries.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.843s sys 0m0.084s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-complex-utf8.txt Using java version 21.0.1-graal in this shell. real 0m0.121s user 0m0.852s sys 0m0.171s * Improve by a few % more; now, convincingly faster than 6th place submission. So far, only algorithms and tuning; no bitwise tricks yet. Improve chunking implementation to avoid allocation and allow finegrained chunking for the last X% of work. Work now proceeds in two stages: big chunk stage and small chunk stage. This is to avoid straggler threads holding up result merging. Tests pass [07:14:49] [lsv@vemana]$ ./test.sh vemana Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10000-unique-keys.txt Using java version 21.0.1-graal in this shell. real 0m0.152s user 0m0.973s sys 0m0.107s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.840s sys 0m0.060s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-1.txt Using java version 21.0.1-graal in this shell. real 0m0.107s user 0m0.681s sys 0m0.085s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-20.txt Using java version 21.0.1-graal in this shell. real 0m0.105s user 0m0.894s sys 0m0.068s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-2.txt Using java version 21.0.1-graal in this shell. real 0m0.099s user 0m0.895s sys 0m0.068s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-3.txt Using java version 21.0.1-graal in this shell. real 0m0.098s user 0m0.813s sys 0m0.050s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-boundaries.txt Using java version 21.0.1-graal in this shell. real 0m0.095s user 0m0.777s sys 0m0.087s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-complex-utf8.txt Using java version 21.0.1-graal in this shell. real 0m0.112s user 0m0.904s sys 0m0.069s * Merge results from finished threads instead of waiting for all threads to finish. Not a huge difference overall but no reason to wait. Also experiment with a few other compiler flags and attempt to use jitwatch to understand what the jit is doing. * Move to prepare_*.sh format and run evaluate2.sh locally. Shows 7th place in leaderboard | # | Result (m:s.ms) | Implementation | JDK | Submitter | Notes | |---|-----------------|--------------------|-----|---------------|-----------| | 1 | 00:01.588 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)| 21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) | GraalVM native binary | | 2 | 00:01.866 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)| 21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) | | | 3 | 00:01.904 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)| 21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) | | | | 00:02.398 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)| 21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) | | | | 00:02.724 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)| 21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) | | | | 00:02.771 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)| 21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) | | | | 00:02.842 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)| 21.0.1-graal | [Vemana](https://github.com/vemana) | | | | 00:02.902 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)| 21.0.1-graal | [Sam Pullara](https://github.com/spullara) | | | | 00:02.906 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)| 21.0.1-graal | [artsiomkorzun](https://github.com/artsiomkorzun) | | | | 00:02.970 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)| 21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) | | * Tune chunksize to get another 2% improvement for 8 processors as used by the evaluation script. * Read int at a time for city name length detection; speeds up by 2% in local testing. * Improve reading temperature double by exiting loop quicker; no major tricks (like reading an int) yet, but good for 5th place on leaderboard in local testing. This small change has caused a surprising gain in performance by about 4%. I didn't expect such a big change, but perhaps in combination with the earlier change to read int by int for the city name, temperature reading is dominating that aspect of the time. Also, perhaps the quicker exit (as soon as you see '.' instead of reading until '\n') means you get to simply skip reading the '\n' across each of the lines. Since the lines are on average like 15 characters, it may be that avoiding reading the \n is a meaningful saving. Or maybe the JIT found a clever optimization for reading the temperature. Or maybe it is simply the case that the number of multiplications is now down to 2 from the previous 3 is what's causing the performance gain? | # | Result (m:s.ms) | Implementation | JDK | Submitter | Notes | |---|-----------------|--------------------|-----|---------------|-----------| | 1 | 00:01.531 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)| 21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) | GraalVM native binary | | 2 | 00:01.794 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)| 21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) | | | 3 | 00:01.956 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)| 21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) | | | | 00:02.346 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)| 21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) | | | | 00:02.673 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)| 21.0.1-graal | [Subrahmanyam](https://github.com/vemana) | | | | 00:02.689 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)| 21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) | | | | 00:02.785 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)| 21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) | | | | 00:02.926 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)| 21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) | | | | 00:02.928 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)| 21.0.1-graal | [Artsiom Korzun](https://github.com/artsiomkorzun) | | | | 00:02.932 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)| 21.0.1-graal | [Sam Pullara](https://github.com/spullara) | | * Reduce one multiplication when temperature is +ve. * Linear probe for city indexing. Beats current leader spullara 2.2 vs 3.8 elapsed time. * Straightforward impl using bytebuffers. Turns out memorysegments were slower than used mappedbytebuffers. * A initial submit-worthy entry Comparison to select entries (averaged over 3 runs) * spullara 1.66s [5th on leaderboard currently] * vemana (this submission) 1.65s * artsiomkorzun 1.64s [4th on leaderboard currently] Tests: PASS Impl Class: dev.morling.onebrc.CalculateAverage_vemana Machine specs * 16 core Ryzen 7950X * 128GB RAM Description * Decompose the full file into Shards of memory mapped files and process each independently, outputting a TreeMap: City -> Statistics * Compose the final answer by merging the individual TreeMap outputs * Select 1 Thread per available processor as reported by the JVM * Size to fit all datastructure in 0.5x L3 cache (4MB/core on the evaluation machines) * Use linear probing hash table, with identity of city name = byte[] and hash code computed inline * Avoid all allocation in the hot path and instead use method parameters. So, instead of passing a single Object param called Point(x, y, z), pass 3 parameters for each of its components. It is ugly, but this challenge is so far from Java's idioms anyway * G1GC seems to want to interfere; use ParallelGC instead (just a quick and dirty hack) Things tried that did not work * MemorySegments are actually slower than MappedByteBuffers * Trying to inline everything: not needed; the JIT compiler is pretty good * Playing with JIT compiler flags didn't yield clear wins. In particular, was surprised that using a max level of 3 and reducing compilation threshold did nothing.. when the jit logs print that none of the methods reach level 4 and stay there for long * Hand-coded implementation of Array.equals(..) using readLong(..) & bitmask_based_on_length from a bytebuffer instead of byte by byte * Further tuning to compile loop methods: timings are now consistenctly ahead of artsiomkorzun in 4th place. There are methods on the data path that were being interpreted for far too long. For example, the method that takes a byte range and simply calls one method per line was taking a disproportionate amount of time. Using `-XX:+AlwaysCompileLoopMethods` option improved completion time by 4%. ============= vemana =============== [20:55:22] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh vemana; done; Using java version 21.0.1-graal in this shell. real 0m1.581s user 0m34.166s sys 0m1.435s Using java version 21.0.1-graal in this shell. real 0m1.593s user 0m34.629s sys 0m1.470s Using java version 21.0.1-graal in this shell. real 0m1.632s user 0m35.893s sys 0m1.340s Using java version 21.0.1-graal in this shell. real 0m1.596s user 0m33.074s sys 0m1.386s Using java version 21.0.1-graal in this shell. real 0m1.611s user 0m35.516s sys 0m1.438s ============= artsiomkorzun =============== [20:56:12] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh artsiomkorzun; done; Using java version 21.0.1-graal in this shell. real 0m1.669s user 0m38.043s sys 0m1.287s Using java version 21.0.1-graal in this shell. real 0m1.679s user 0m37.840s sys 0m1.400s Using java version 21.0.1-graal in this shell. real 0m1.657s user 0m37.607s sys 0m1.298s Using java version 21.0.1-graal in this shell. real 0m1.643s user 0m36.852s sys 0m1.392s Using java version 21.0.1-graal in this shell. real 0m1.644s user 0m36.951s sys 0m1.279s ============= spullara =============== [20:57:55] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh spullara; done; Using java version 21.0.1-graal in this shell. real 0m1.676s user 0m37.404s sys 0m1.386s Using java version 21.0.1-graal in this shell. real 0m1.652s user 0m36.509s sys 0m1.486s Using java version 21.0.1-graal in this shell. real 0m1.665s user 0m36.451s sys 0m1.506s Using java version 21.0.1-graal in this shell. real 0m1.671s user 0m36.917s sys 0m1.371s Using java version 21.0.1-graal in this shell. real 0m1.634s user 0m35.624s sys 0m1.573s ========================== Running Tests ====================== [21:17:57] [lsv@vemana]$ ./runTests.sh vemana Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10000-unique-keys.txt Using java version 21.0.1-graal in this shell. real 0m0.150s user 0m1.035s sys 0m0.117s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10.txt Using java version 21.0.1-graal in this shell. real 0m0.114s user 0m0.789s sys 0m0.116s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-1.txt Using java version 21.0.1-graal in this shell. real 0m0.115s user 0m0.948s sys 0m0.075s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-20.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.926s sys 0m0.066s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-2.txt Using java version 21.0.1-graal in this shell. real 0m0.110s user 0m0.734s sys 0m0.078s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-3.txt Using java version 21.0.1-graal in this shell. real 0m0.114s user 0m0.870s sys 0m0.095s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-boundaries.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.843s sys 0m0.084s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-complex-utf8.txt Using java version 21.0.1-graal in this shell. real 0m0.121s user 0m0.852s sys 0m0.171s * Improve by a few % more; now, convincingly faster than 6th place submission. So far, only algorithms and tuning; no bitwise tricks yet. Improve chunking implementation to avoid allocation and allow finegrained chunking for the last X% of work. Work now proceeds in two stages: big chunk stage and small chunk stage. This is to avoid straggler threads holding up result merging. Tests pass [07:14:49] [lsv@vemana]$ ./test.sh vemana Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10000-unique-keys.txt Using java version 21.0.1-graal in this shell. real 0m0.152s user 0m0.973s sys 0m0.107s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-10.txt Using java version 21.0.1-graal in this shell. real 0m0.113s user 0m0.840s sys 0m0.060s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-1.txt Using java version 21.0.1-graal in this shell. real 0m0.107s user 0m0.681s sys 0m0.085s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-20.txt Using java version 21.0.1-graal in this shell. real 0m0.105s user 0m0.894s sys 0m0.068s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-2.txt Using java version 21.0.1-graal in this shell. real 0m0.099s user 0m0.895s sys 0m0.068s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-3.txt Using java version 21.0.1-graal in this shell. real 0m0.098s user 0m0.813s sys 0m0.050s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-boundaries.txt Using java version 21.0.1-graal in this shell. real 0m0.095s user 0m0.777s sys 0m0.087s Validating calculate_average_vemana.sh -- src/test/resources/samples/measurements-complex-utf8.txt Using java version 21.0.1-graal in this shell. real 0m0.112s user 0m0.904s sys 0m0.069s * Merge results from finished threads instead of waiting for all threads to finish. Not a huge difference overall but no reason to wait. Also experiment with a few other compiler flags and attempt to use jitwatch to understand what the jit is doing. * Move to prepare_*.sh format and run evaluate2.sh locally. Shows 7th place in leaderboard | # | Result (m:s.ms) | Implementation | JDK | Submitter | Notes | |---|-----------------|--------------------|-----|---------------|-----------| | 1 | 00:01.588 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)| 21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) | GraalVM native binary | | 2 | 00:01.866 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)| 21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) | | | 3 | 00:01.904 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)| 21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) | | | | 00:02.398 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)| 21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) | | | | 00:02.724 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)| 21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) | | | | 00:02.771 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)| 21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) | | | | 00:02.842 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)| 21.0.1-graal | [Vemana](https://github.com/vemana) | | | | 00:02.902 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)| 21.0.1-graal | [Sam Pullara](https://github.com/spullara) | | | | 00:02.906 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)| 21.0.1-graal | [artsiomkorzun](https://github.com/artsiomkorzun) | | | | 00:02.970 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)| 21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) | | * Tune chunksize to get another 2% improvement for 8 processors as used by the evaluation script. * Read int at a time for city name length detection; speeds up by 2% in local testing. * Improve reading temperature double by exiting loop quicker; no major tricks (like reading an int) yet, but good for 5th place on leaderboard in local testing. This small change has caused a surprising gain in performance by about 4%. I didn't expect such a big change, but perhaps in combination with the earlier change to read int by int for the city name, temperature reading is dominating that aspect of the time. Also, perhaps the quicker exit (as soon as you see '.' instead of reading until '\n') means you get to simply skip reading the '\n' across each of the lines. Since the lines are on average like 15 characters, it may be that avoiding reading the \n is a meaningful saving. Or maybe the JIT found a clever optimization for reading the temperature. Or maybe it is simply the case that the number of multiplications is now down to 2 from the previous 3 is what's causing the performance gain? | # | Result (m:s.ms) | Implementation | JDK | Submitter | Notes | |---|-----------------|--------------------|-----|---------------|-----------| | 1 | 00:01.531 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)| 21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) | GraalVM native binary | | 2 | 00:01.794 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)| 21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) | | | 3 | 00:01.956 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)| 21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) | | | | 00:02.346 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)| 21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) | | | | 00:02.673 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)| 21.0.1-graal | [Subrahmanyam](https://github.com/vemana) | | | | 00:02.689 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)| 21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) | | | | 00:02.785 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)| 21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) | | | | 00:02.926 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)| 21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) | | | | 00:02.928 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)| 21.0.1-graal | [Artsiom Korzun](https://github.com/artsiomkorzun) | | | | 00:02.932 | [link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)| 21.0.1-graal | [Sam Pullara](https://github.com/spullara) | | * Reduce one multiplication when temperature is +ve. * Added some documentation on the approach. --------- Co-authored-by: vemana --- calculate_average_vemana.sh | 50 ++ github_users.txt | 1 + prepare_vemana.sh | 23 + .../onebrc/CalculateAverage_vemana.java | 692 ++++++++++++++++++ 4 files changed, 766 insertions(+) create mode 100755 calculate_average_vemana.sh create mode 100755 prepare_vemana.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java diff --git a/calculate_average_vemana.sh b/calculate_average_vemana.sh new file mode 100755 index 0000000..b3437f2 --- /dev/null +++ b/calculate_average_vemana.sh @@ -0,0 +1,50 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Basics +JAVA_OPTS="" +JAVA_OPTS="$JAVA_OPTS --enable-preview" +#JAVA_OPTS="$JAVA_OPTS --add-modules jdk.incubator.vector" +#JAVA_OPTS="$JAVA_OPTS -XX:+UnlockDiagnosticVMOptions" + +# JIT parameters +#JAVA_OPTS="$JAVA_OPTS -Xlog:class+load=info" +#JAVA_OPTS="$JAVA_OPTS -XX:+LogCompilation" +JAVA_OPTS="$JAVA_OPTS -XX:+AlwaysCompileLoopMethods" +#JAVA_OPTS="$JAVA_OPTS -XX:TieredStopAtLevel=1" +#JAVA_OPTS="$JAVA_OPTS -XX:TieredStopAtLevel=1" +#JAVA_OPTS="$JAVA_OPTS -XX:CompileCommand=inline,*State.processLine()" +#JAVA_OPTS="$JAVA_OPTS -XX:+PrintAssembly" +#JAVA_OPTS="$JAVA_OPTS -XX:LogFile=../hotspot.log" +#JAVA_OPTS="$JAVA_OPTS -XX:+DebugNonSafepoints" +#JAVA_OPTS="$JAVA_OPTS -XX:C1MaxInlineSize=150" +#JAVA_OPTS="$JAVA_OPTS -XX:C1InlineStackLimit=40" + +#JAVA_OPTS="$JAVA_OPTS -XX:FreqInlineSize=500" +#JAVA_OPTS="$JAVA_OPTS -XX:+PrintCompilation" +#JAVA_OPTS="$JAVA_OPTS -XX:+PrintInlining" +#JAVA_OPTS="$JAVA_OPTS -XX:CompileThreshold=20 " +#JAVA_OPTS="$JAVA_OPTS -Xlog:async" + +# GC parameters +JAVA_OPTS="$JAVA_OPTS -XX:+UseParallelGC" + +#JAVA_OPTS="$JAVA_OPTS -Xlog:gc*=debug:file=/tmp/gc.log" +#JAVA_OPTS="$JAVA_OPTS -XX:+UseEpsilonGC -Xlog:all=off" +#JAVA_OPTS="$JAVA_OPTS -XX:+PrintGC -XX:+PrintGCDetails" + +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_vemana "$@" diff --git a/github_users.txt b/github_users.txt index 5bfc5dc..ef5ef51 100644 --- a/github_users.txt +++ b/github_users.txt @@ -49,3 +49,4 @@ yavuztas;Yavuz Tas yehwankim23;κΉ€μ˜ˆν™˜ Ye-Hwan Kim (Sam) hundredwatt;Jason Nochlin gnmathur;Gaurav Mathur +vemana;Subrahmanyam diff --git a/prepare_vemana.sh b/prepare_vemana.sh new file mode 100755 index 0000000..0099505 --- /dev/null +++ b/prepare_vemana.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source "$HOME/.sdkman/bin/sdkman-init.sh" +#sdk1 use java 21.0.1-open 1>&2 +sdk use java 21.0.1-graal 1>&2 +#sdk1 use java 21.0.1-zulu 1>&2 +#sdk1 use java 21.0.1-graalce 1>&2 + diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java b/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java new file mode 100644 index 0000000..7673fb5 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java @@ -0,0 +1,692 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteOrder; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +/** + * This submission focuses on exploiting the non-SIMD parallelism that is inherent in OOO + * super-scalar CPUs and avoids using Unsafe, SWAR and other such fine techniques. The hope is to + * remain readable for a majority of SWEs. At a high level, the approach relies on a few principles + * listed herein. + * + *

+ * [Exploit Parallelism] Distribute the work into Shards. Separate threads (one per core) process + * Shards and follow it up by merging the results. parallelStream() is appealing but carries + * potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance + * is not ideal when trying to minimize the maximum worker latency. + * + *

+ * [Use ByteBuffers over MemorySegment] Each Shard is further divided in Chunks. This would've been + * unnecessary except that Shards are too big to be backed by ByteBuffers. Besides, MemorySegment + * appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller chunks. + * + *

+ * [Straggler freedom] The optimization function here is to minimize the maximal worker thread + * completion. Law of large number averages means that all the threads will end up with similar + * amounts of work and similar completion times; but, however ever so often there could be a bad + * sharding and more importantly, Cores are not created equal; some will be throttled more than + * others. So, we have a shared {@code LazyShardQueue} that aims to distribute work to minimize the + * latest completion time. + * + *

+ * [Work Assignment with LazyShardQueue] The queue provides each thread with its next big-chunk + * until X% of the work remains. Big-chunks belong to the thread and will not be provided to another + * thread. Then, it switches to providing small-chunk sizes. Small-chunks comprise the last X% of + * work and every thread can participate in completing the chunk. Even though the queue is shared + * across threads, there's no communication across thread during the big-chunk phases. The queue is + * effectively a per-thread queue while processing big-chunks. The small-chunk phase uses an + * AtomicLong to coordinate chunk allocation across threads. + * + *

+ * [Chunk processing] Chunk processing is typical. Process line by line. Find a hash function + * (polynomial hash fns are slow, but will work fine), hash the city name, resolve conflicts using + * linear probing and then accumulate the temperature into the appropriate hash slot. The key + * element then is how fast can you identify the hash slot, read the temperature and update the new + * temperature in the slot (i.e. min, max, count). + * + *

+ * [Cache friendliness] 7502P and my machine (7950X) offer 4MB L3 cache/core. This means we can hope + * to fit all our datastructures in L3 cache. Since SMT is turned on, the Runtime's available + * processors will show twice the number of actual cores and so we get 2MB L3 cache/thread. To be + * safe, we try to stay within 1.8 MB/thread and size our hashtable appropriately. + * + *

+ * [Allocation] Since MemorySegment seemed slower than ByteBuffers, backing Chunks by bytebuffers + * was the logical option. Creating one ByteBuffer per chunk was no bueno because the system doesn't + * like it (JVM runs out of mapped file handle quota). Other than that, allocation in the hot path + * was avoided. + * + *

+ * [General approach to fast hashing and temperature reading] Here, it helps to understand the + * various bottlenecks in execution. One particular thing that I kept coming back to was to + * understand the relative costs of instructions: See + * https://www.agner.org/optimize/instruction_tables.pdf It is helpful to think of hardware as a + * smart parallel execution machine that can do several operations in one cycle if only you can feed + * it. So, the idea is to reduce data-dependency chains in the bottleneck path. The other major idea + * is to just avoid unnecessary work. For example, copying the city name into a byte array just for + * the purpose of looking it up was costing a noticeable amount. Instead, encoding it as + * (bytebuffer, start, len) was helpful. Spotting unnecessary work was non-trivial.So, them pesky + * range checks? see if you can avoid them. For example, sometimes you can eliminate a "nextPos < + * endPos" in a tight loop by breaking it into two pieces: one piece where the check will not be + * needed and a tail piece where it will be needed. + * + *

+ * [Understand What Cores like]. Cores like to go straight and loop back. Despite good branch + * prediction, performance sucks with mispredicted branches. + * + *

+ * [JIT] Java performance requires understanding the JIT. It is helpful to understand what the JIT + * likes though it is still somewhat of a mystery to me. In general, it inlines small methods very + * well and after constant folding, it can optimize quite well across a reasonably deep call chain. + * My experience with the JIT was that everything I tried to tune it made it worse except for one + * parameter. I have a new-found respect for JIT - it likes and understands typical Java idioms. + * + *

[Tuning] Nothing was more insightful than actually playing with various tuning parameters. + * I can have all the theories but the hardware and JIT are giant blackboxes. I used a bunch of + * tools to optimize: (1) Command line parameters to tune big and small chunk sizes etc. This was + * also very helpful in forming a mental model of the JIT. Sometimes, it would compile some methods + * and sometimes it would just run them interpreted since the compilation threshold wouldn't be + * reached for intermediate methods. (2) AsyncProfiler - this was the first line tool to understand + * cache misses and cpu time to figure where to aim the next optimization effort. (3) JitWatch - + * invaluable for forming a mental model and attempting to tune the JIT. + * + *

[Things that didn't work]. This is a looong list and the hit rate is quite low. In general, + * removing unnecessary work had a high hit rate and my pet theories on how things work in hardware + * had a low hit rate. Java Vector API lacked a good gather API to load from arbitrary memory + * addresses; this prevented performing real SIMD on the entire dataset, one where you load 64 bytes + * from 64 different line starting positions (just after a previous line end) and then step through + * one byte at a time. This to me is the most natural SIMD approach to the 1BRC problem but I + * couldn't use it. I tried other local uses of the vector API and it was always slower, and mostly + * much slower. In other words, Java Vector API needs a problem for which it is suited (duh) but, + * unless I am overlooking something, the API still lacks gather from arbitrary memory addresses. + * + *

[My general takeaways]. Write simple, idiomatic java code and get 70-80% of the max + * performance of an optimally hand-tuned code. Focus any optimization efforts on being friendly to + * the JIT *before* thinking about tuning for the hardware. There's a real cost to EXTREME + * performance tuning: a loss of abstraction and maintainability, but being JIT friendly is probably + * much more achievable without sacrificing abstraction. + */ +public class CalculateAverage_vemana { + + public static void checkArg(boolean condition) { + if (!condition) { + throw new IllegalArgumentException(); + } + } + + public static void main(String[] args) throws Exception { + // First process in large chunks without coordination among threads + // Use chunkSizeBits for the large-chunk size + int chunkSizeBits = 20; + + // For the last commonChunkFraction fraction of total work, use smaller chunk sizes + double commonChunkFraction = 0; + + // Use commonChunkSizeBits for the small-chunk size + int commonChunkSizeBits = 18; + + // Size of the hashtable (attempt to fit in L3) + int hashtableSizeBits = 14; + + if (args.length > 0) { + chunkSizeBits = Integer.parseInt(args[0]); + } + + if (args.length > 1) { + commonChunkFraction = Double.parseDouble(args[1]); + } + + if (args.length > 2) { + commonChunkSizeBits = Integer.parseInt(args[2]); + } + + if (args.length > 3) { + hashtableSizeBits = Integer.parseInt(args[3]); + } + + // System.err.println(STR.""" + // Using the following parameters: + // - chunkSizeBits = \{chunkSizeBits} + // - commonChunkFraction = \{commonChunkFraction} + // - commonChunkSizeBits = \{commonChunkSizeBits} + // - hashtableSizeBits = \{hashtableSizeBits} + // """); + + System.out.println(new Runner( + Path.of("measurements.txt"), + chunkSizeBits, + commonChunkFraction, + commonChunkSizeBits, + hashtableSizeBits).getSummaryStatistics()); + } + + public interface LazyShardQueue { + + ByteRange take(int shardIdx); + } + + // Mutable to avoid allocation + public static class ByteRange { + + private static final int BUF_SIZE = 1 << 30; + + private final long fileSize; + private final RandomAccessFile raf; + + // ***************** What this is doing and why ***************** + // Reading from ByteBuffer appears faster from MemorySegment, but ByteBuffer can only be + // Integer.MAX_VALUE long; Creating one byteBuffer per chunk kills native memory quota + // and JVM crashes without futher parameters. + // + // So, in this solution, create a sliding window of bytebuffers: + // - Create a large bytebuffer that spans the chunk + // - If the next chunk falls outside the byteBuffer, create another byteBuffer that spans the + // chunk. Because chunks are allocated serially, a single large (1<<30) byteBuffer spans + // many successive chunks. + // - In fact, for serial chunk allocation (which is friendly to page faulting anyway), + // the number of created ByteBuffers doesn't exceed [size of shard/(1<<30)] which is less than + // 100/thread and is comfortably below what the JVM can handle (65K) without further param + // tuning + // - This enables (relatively) allocation free chunking implementation. Our chunking impl uses + // fine grained chunking for the last say X% of work to avoid being hostage to stragglers + + // The PUBLIC API + public MappedByteBuffer byteBuffer; + public int endInBuf; // where the chunk ends inside the buffer + public int startInBuf; // where the chunk starts inside the buffer + // Private State + private long bufferEnd; // byteBuffer's ending coordinate + private long bufferStart; // byteBuffer's begin coordinate + + // Uninitialized; for mutability + public ByteRange(RandomAccessFile raf) { + this.raf = raf; + try { + this.fileSize = raf.length(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + bufferEnd = bufferStart = -1; + } + + public void setRange(long rangeStart, long rangeEnd) { + if (rangeEnd + 1024 > bufferEnd || rangeStart < bufferStart) { + bufferStart = rangeStart; + bufferEnd = Math.min(bufferStart + BUF_SIZE, fileSize); + setByteBufferToRange(bufferStart, bufferEnd); + } + + if (rangeStart > 0) { + rangeStart = 1 + nextNewLine(rangeStart); + } + + if (rangeEnd < fileSize) { + rangeEnd = 1 + nextNewLine(rangeEnd); + } + else { + rangeEnd = fileSize; + } + + startInBuf = (int) (rangeStart - bufferStart); + endInBuf = (int) (rangeEnd - bufferStart); + } + + @Override + public String toString() { + return STR.""" + ByteRange { + startInBuf = \{startInBuf} + endInBuf = \{endInBuf} + } + """; + } + + private long nextNewLine(long pos) { + int nextPos = (int) (pos - bufferStart); + while (byteBuffer.get(nextPos) != '\n') { + nextPos++; + } + return nextPos + bufferStart; + } + + private void setByteBufferToRange(long start, long end) { + try { + byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public record Result(Map tempStats) { + + @Override + public String toString() { + return this.tempStats() + .entrySet() + .stream() + .sorted(Entry.comparingByKey()) + .map(entry -> "%s=%s".formatted(entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ", "{", "}")); + } + } + + public static class Runner { + + private final double commonChunkFraction; + private final int commonChunkSizeBits; + private final int hashtableSizeBits; + private final Path inputFile; + private final int shardSizeBits; + + public Runner( + Path inputFile, int chunkSizeBits, double commonChunkFraction, int commonChunkSizeBits, + int hashtableSizeBits) { + this.inputFile = inputFile; + this.shardSizeBits = chunkSizeBits; + this.commonChunkFraction = commonChunkFraction; + this.commonChunkSizeBits = commonChunkSizeBits; + this.hashtableSizeBits = hashtableSizeBits; + } + + Result getSummaryStatistics() throws Exception { + int processors = Runtime.getRuntime().availableProcessors(); + LazyShardQueue shardQueue = new SerialLazyShardQueue( + 1L << shardSizeBits, + inputFile, + processors, + commonChunkFraction, + commonChunkSizeBits); + + List> results = new ArrayList<>(); + ExecutorService executorService = Executors.newFixedThreadPool( + processors, + runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + return thread; + }); + + long[] finishTimes = new long[processors]; + + for (int i = 0; i < processors; i++) { + final int I = i; + final Callable callable = () -> { + Result result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard(); + finishTimes[I] = System.nanoTime(); + return result; + }; + results.add(executorService.submit(callable)); + } + // printFinishTimes(finishTimes); + return executorService.submit(() -> merge(results)).get(); + } + + private Result merge(List> results) + throws ExecutionException, InterruptedException { + Map output = null; + boolean[] isDone = new boolean[results.size()]; + int remaining = results.size(); + // Let's be naughty and spin in a busy loop + while (remaining > 0) { + for (int i = 0; i < results.size(); i++) { + if (!isDone[i] && results.get(i).isDone()) { + isDone[i] = true; + remaining--; + if (output == null) { + output = new TreeMap<>(results.get(i).get().tempStats()); + } + else { + for (Entry entry : results.get(i).get().tempStats().entrySet()) { + output.compute( + entry.getKey(), + (key, value) -> value == null ? entry.getValue() + : Stat.merge(value, entry.getValue())); + } + } + } + } + } + return new Result(output); + } + + private void printFinishTimes(long[] finishTimes) { + Arrays.sort(finishTimes); + int n = finishTimes.length; + System.err.println(STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms"); + } + } + + public static class SerialLazyShardQueue implements LazyShardQueue { + + private static long roundToNearestHigherMultipleOf(long divisor, long value) { + return (value + divisor - 1) / divisor * divisor; + } + + private final ByteRange[] byteRanges; + private final long chunkSize; + private final long commonChunkSize; + private final AtomicLong commonPool; + private final long fileSize; + private final long[] nextStarts; + + public SerialLazyShardQueue( + long chunkSize, Path filePath, int shards, double commonChunkFraction, + int commonChunkSizeBits) + throws IOException { + checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0); + var raf = new RandomAccessFile(filePath.toFile(), "r"); + this.fileSize = raf.length(); + + // Common pool + long commonPoolStart = Math.min( + roundToNearestHigherMultipleOf(chunkSize, (long) (fileSize * (1 - commonChunkFraction))), + fileSize); + this.commonPool = new AtomicLong(commonPoolStart); + this.commonChunkSize = 1L << commonChunkSizeBits; + + // Distribute chunks to shards + this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict + for (long i = 0, currentStart = 0, remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) { + long remainingShards = shards - i; + long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards; + // Shard i handles: [currentStart, currentStart + currentChunks * chunkSize) + int pos = (int) i << 4; + nextStarts[pos] = currentStart; + nextStarts[pos + 1] = currentStart + currentChunks * chunkSize; + currentStart += currentChunks * chunkSize; + remainingChunks -= currentChunks; + } + this.chunkSize = chunkSize; + + this.byteRanges = new ByteRange[shards << 4]; + for (int i = 0; i < shards; i++) { + byteRanges[i << 4] = new ByteRange(raf); + } + } + + @Override + public ByteRange take(int idx) { + // Try for thread local range + final int pos = idx << 4; + long rangeStart = nextStarts[pos]; + final long chunkEnd = nextStarts[pos + 1]; + + final long rangeEnd; + + if (rangeStart < chunkEnd) { + rangeEnd = rangeStart + chunkSize; + nextStarts[pos] = rangeEnd; + } + else { + rangeStart = commonPool.getAndAdd(commonChunkSize); + // If that's exhausted too, nothing remains! + if (rangeStart >= fileSize) { + return null; + } + rangeEnd = rangeStart + commonChunkSize; + } + + ByteRange chunk = byteRanges[pos]; + chunk.setRange(rangeStart, rangeEnd); + return chunk; + } + } + + public static class ShardProcessor { + + private final LazyShardQueue shardQueue; + private final ShardProcessorState state; + private final int threadIdx; + + public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int threadIdx) { + this.shardQueue = shardQueue; + this.threadIdx = threadIdx; + this.state = new ShardProcessorState(hashtableSizeBits); + } + + public Result processShard() { + ByteRange range; + while ((range = shardQueue.take(threadIdx)) != null) { + processRange(range); + } + return result(); + } + + private void processRange(ByteRange range) { + MappedByteBuffer mmb = range.byteBuffer; + int nextPos = range.startInBuf; + int end = range.endInBuf; + + while (nextPos < end) { + nextPos = state.processLine(mmb, nextPos); + } + } + + private Result result() { + return state.result(); + } + } + + public static class ShardProcessorState { + + private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder(); + + private final byte[][] cityNames; + private final int slotsMask; + private final Stat[] stats; + + public ShardProcessorState(int slotsBits) { + this.stats = new Stat[1 << slotsBits]; + this.cityNames = new byte[1 << slotsBits][]; + this.slotsMask = (1 << slotsBits) - 1; + } + + public int processLine(MappedByteBuffer mmb, int nextPos) { + int originalPos = nextPos; + byte nextByte; + int hash = 0; + + while (true) { + int x = mmb.getInt(nextPos); + if (NATIVE_BYTE_ORDER == ByteOrder.BIG_ENDIAN) { + x = Integer.reverseBytes(x); + } + + byte a = (byte) (x >>> 24); + if (a == ';') { + nextPos += 1; + break; + } + + byte b = (byte) (x >>> 16); + if (b == ';') { + nextPos += 2; + hash = hash * 31 + ((0xFF000000 & x)); + break; + } + + byte c = (byte) (x >>> 8); + if (c == ';') { + nextPos += 3; + hash = hash * 31 + ((0xFFFF0000 & x)); + break; + } + + byte d = (byte) (x >>> 0); + if (d == ';') { + nextPos += 4; + hash = hash * 31 + ((0xFFFFFF00 & x)); + break; + } + + hash = hash * 31 + x; + nextPos += 4; + } + int cityLen = nextPos - 1 - originalPos; + + // Read temperature + int temperature = 0; + boolean negative = (nextByte = mmb.get(nextPos++)) == '-'; + if (!negative) { + temperature = nextByte - '0'; + } + else { + temperature = mmb.get(nextPos++) - '0'; + } + + while (true) { + nextByte = mmb.get(nextPos++); + if (nextByte != '.') { + temperature = temperature * 10 + (nextByte - '0'); + } + else { + temperature = temperature * 10 + (mmb.get(nextPos++) - '0'); + nextPos++; + break; + } + } + + linearProbe( + cityLen, + hash & slotsMask, + negative ? -temperature : temperature, + mmb, + originalPos); + + return nextPos; + } + + public Result result() { + int N = stats.length; + TreeMap map = new TreeMap<>(); + for (int i = 0; i < N; i++) { + if (stats[i] != null) { + map.put(new String(cityNames[i]), stats[i]); + } + } + return new Result(map); + } + + private byte[] copyFrom(MappedByteBuffer mmb, int offsetInMmb, int len) { + byte[] out = new byte[len]; + for (int i = 0; i < len; i++) { + out[i] = mmb.get(offsetInMmb + i); + } + return out; + } + + private boolean equals(byte[] left, MappedByteBuffer right, int offsetInMmb, int len) { + for (int i = 0; i < len; i++) { + if (left[i] != right.get(offsetInMmb + i)) { + return false; + } + } + return true; + } + + private void linearProbe(int len, int hash, int temp, MappedByteBuffer mmb, int offsetInMmb) { + for (int i = hash;; i = (i + 1) & slotsMask) { + var curBytes = cityNames[i]; + if (curBytes == null) { + cityNames[i] = copyFrom(mmb, offsetInMmb, len); + stats[i] = Stat.firstReading(temp); + return; + } + else { + // Overall, this tradeoff seems better than Arrays.equals(..) + // City name param is encoded as (mmb, offsetnInMmb, len) + // This avoids copying it into a (previously allocated) byte[] + // The downside is that we have to manually implement 'equals' and it can lose out + // to vectorized 'equals'; but the trade off seems to work in this particular case + if (len == curBytes.length && equals(curBytes, mmb, offsetInMmb, len)) { + stats[i].mergeReading(temp); + return; + } + } + } + } + } + + public static class Stat { + + public static Stat firstReading(int temp) { + return new Stat(temp, temp, temp, 1); + } + + public static Stat merge(Stat left, Stat right) { + return new Stat( + Math.min(left.min, right.min), + Math.max(left.max, right.max), + left.sum + right.sum, + left.count + right.count); + } + + private long count, sum; + private int min, max; + + public Stat(int min, int max, long sum, long count) { + this.min = min; + this.max = max; + this.sum = sum; + this.count = count; + } + + // Caution: Mutates + public void mergeReading(int curTemp) { + // Can this be improved furhter? + // Assuming random values for curTemp, + // min (&max) gets updated roughly log(N)/N fraction of the time (a small number) + // In the worst case, there will be at-most one branch misprediction. + if (curTemp > min) { // Mostly passes. On branch misprediction, just update min. + if (curTemp > max) { // Mostly fails. On branch misprediction, just update max. + max = curTemp; + } + } + else { + min = curTemp; + } + sum += curTemp; + count++; + } + + @Override + public String toString() { + return "%.1f/%.1f/%.1f".formatted(min / 10.0, sum / 10.0 / count, max / 10.0); + } + } +}