Submission #5 [No bitwise tricks nor Unsafe yet; 13th place on leaderboard in local testing using evaluate2.sh] (#209)

* Linear probe for city indexing. Beats current leader spullara 2.2 vs 3.8 elapsed time.

* Straightforward impl using bytebuffers. Turns out memorysegments were slower than used mappedbytebuffers.

* A initial submit-worthy entry

Comparison to select entries (averaged over 3 runs)
* spullara 1.66s [5th on leaderboard currently]
* vemana (this submission) 1.65s
* artsiomkorzun 1.64s [4th on leaderboard currently]

Tests: PASS
Impl Class: dev.morling.onebrc.CalculateAverage_vemana

Machine specs
* 16 core Ryzen 7950X
* 128GB RAM

Description
* Decompose the full file into Shards of memory mapped files and process
  each independently, outputting a TreeMap: City -> Statistics
* Compose the final answer by merging the individual TreeMap outputs
* Select 1 Thread per available processor as reported by the JVM
* Size to fit all datastructure in 0.5x L3 cache (4MB/core on the
  evaluation machines)
* Use linear probing hash table, with identity of city name = byte[] and
  hash code computed inline
* Avoid all allocation in the hot path and instead use method
  parameters. So, instead of passing a single Object param called Point(x, y, z),
  pass 3 parameters for each of its components. It is ugly, but this
  challenge is so far from Java's idioms anyway
* G1GC seems to want to interfere; use ParallelGC instead (just a quick
  and dirty hack)

Things tried that did not work
* MemorySegments are actually slower than MappedByteBuffers
* Trying to inline everything: not needed; the JIT compiler is pretty
  good
* Playing with JIT compiler flags didn't yield clear wins. In
  particular, was surprised that using a max level of 3 and reducing
  compilation threshold did nothing.. when the jit logs print that none
  of the methods reach level 4 and stay there for long
* Hand-coded implementation of Array.equals(..) using
  readLong(..) & bitmask_based_on_length from a bytebuffer instead of byte by byte

* Further tuning to compile loop methods: timings are now consistenctly ahead of artsiomkorzun in 4th place.

There are methods on the data path that were being interpreted for far
too long. For example, the method that takes a byte range and simply
calls one method per line was taking a disproportionate amount of time.
Using `-XX:+AlwaysCompileLoopMethods` option improved completion time by 4%.

============= vemana ===============
[20:55:22] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh vemana;
done;

Using java version 21.0.1-graal in this shell.

real    0m1.581s
user    0m34.166s
sys     0m1.435s

Using java version 21.0.1-graal in this shell.

real    0m1.593s
user    0m34.629s
sys     0m1.470s

Using java version 21.0.1-graal in this shell.

real    0m1.632s
user    0m35.893s
sys     0m1.340s

Using java version 21.0.1-graal in this shell.

real    0m1.596s
user    0m33.074s
sys     0m1.386s

Using java version 21.0.1-graal in this shell.

real    0m1.611s
user    0m35.516s
sys     0m1.438s

============= artsiomkorzun ===============
[20:56:12] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh
artsiomkorzun; done;

Using java version 21.0.1-graal in this shell.

real    0m1.669s
user    0m38.043s
sys     0m1.287s

Using java version 21.0.1-graal in this shell.

real    0m1.679s
user    0m37.840s
sys     0m1.400s

Using java version 21.0.1-graal in this shell.

real    0m1.657s
user    0m37.607s
sys     0m1.298s

Using java version 21.0.1-graal in this shell.

real    0m1.643s
user    0m36.852s
sys     0m1.392s

Using java version 21.0.1-graal in this shell.

real    0m1.644s
user    0m36.951s
sys     0m1.279s

============= spullara ===============
[20:57:55] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh spullara;
done;

Using java version 21.0.1-graal in this shell.

real    0m1.676s
user    0m37.404s
sys     0m1.386s

Using java version 21.0.1-graal in this shell.

real    0m1.652s
user    0m36.509s
sys     0m1.486s

Using java version 21.0.1-graal in this shell.

real    0m1.665s
user    0m36.451s
sys     0m1.506s

Using java version 21.0.1-graal in this shell.

real    0m1.671s
user    0m36.917s
sys     0m1.371s

Using java version 21.0.1-graal in this shell.

real    0m1.634s
user    0m35.624s
sys     0m1.573s

========================== Running Tests ======================

[21:17:57] [lsv@vemana]$ ./runTests.sh vemana
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10000-unique-keys.txt

Using java version 21.0.1-graal in this shell.

real    0m0.150s
user    0m1.035s
sys     0m0.117s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10.txt

Using java version 21.0.1-graal in this shell.

real    0m0.114s
user    0m0.789s
sys     0m0.116s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-1.txt

Using java version 21.0.1-graal in this shell.

real    0m0.115s
user    0m0.948s
sys     0m0.075s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-20.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.926s
sys     0m0.066s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-2.txt

Using java version 21.0.1-graal in this shell.

real    0m0.110s
user    0m0.734s
sys     0m0.078s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-3.txt

Using java version 21.0.1-graal in this shell.

real    0m0.114s
user    0m0.870s
sys     0m0.095s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-boundaries.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.843s
sys     0m0.084s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-complex-utf8.txt

Using java version 21.0.1-graal in this shell.

real    0m0.121s
user    0m0.852s
sys     0m0.171s

* Improve by a few % more; now, convincingly faster than 6th place
submission. So far, only algorithms and tuning; no bitwise tricks yet.

Improve chunking implementation to avoid allocation and allow
finegrained chunking for the last X% of work. Work now proceeds in two
stages: big chunk stage and small chunk stage. This is to avoid
straggler threads holding up result merging.

Tests pass

[07:14:49] [lsv@vemana]$ ./test.sh vemana
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10000-unique-keys.txt

Using java version 21.0.1-graal in this shell.

real    0m0.152s
user    0m0.973s
sys     0m0.107s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.840s
sys     0m0.060s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-1.txt

Using java version 21.0.1-graal in this shell.

real    0m0.107s
user    0m0.681s
sys     0m0.085s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-20.txt

Using java version 21.0.1-graal in this shell.

real    0m0.105s
user    0m0.894s
sys     0m0.068s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-2.txt

Using java version 21.0.1-graal in this shell.

real    0m0.099s
user    0m0.895s
sys     0m0.068s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-3.txt

Using java version 21.0.1-graal in this shell.

real    0m0.098s
user    0m0.813s
sys     0m0.050s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-boundaries.txt

Using java version 21.0.1-graal in this shell.

real    0m0.095s
user    0m0.777s
sys     0m0.087s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-complex-utf8.txt

Using java version 21.0.1-graal in this shell.

real    0m0.112s
user    0m0.904s
sys     0m0.069s

* Merge results from finished threads instead of waiting for all threads
to finish.

Not a huge difference overall but no reason to wait.

Also experiment with a few other compiler flags and attempt to use
jitwatch to understand what the jit is doing.

* Move to prepare_*.sh format and run evaluate2.sh locally.

Shows 7th place in leaderboard

| # | Result (m:s.ms) | Implementation     | JDK | Submitter     | Notes
|
|---|-----------------|--------------------|-----|---------------|-----------|
| 1 | 00:01.588 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)|
21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) |
   GraalVM native binary |
| 2 | 00:01.866 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)|
21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) |  |
| 3 | 00:01.904 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)|
21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) |  |
|   | 00:02.398 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)|
21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) |  |
|   | 00:02.724 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)|
21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) |  |
|   | 00:02.771 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)|
21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) |
   |
|   | 00:02.842 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)|
21.0.1-graal | [Vemana](https://github.com/vemana) |  |
|   | 00:02.902 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)|
21.0.1-graal | [Sam Pullara](https://github.com/spullara) |  |
|   | 00:02.906 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)|
21.0.1-graal | [artsiomkorzun](https://github.com/artsiomkorzun) |  |
|   | 00:02.970 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)|
21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) |  |

* Tune chunksize to get another 2% improvement for 8 processors as used by
the evaluation script.

* Read int at a time for city name length detection; speeds up by 2% in local testing.

* Improve reading temperature double by exiting loop quicker; no major
tricks (like reading an int) yet, but good for 5th place on leaderboard in local testing.

This small change has caused a surprising gain in performance by about 4%.
I didn't expect such a big change, but perhaps in combination with the
earlier change to read int by int for the city name, temperature reading
is dominating that aspect of the time. Also, perhaps the quicker exit
(as soon as you see '.' instead of reading until '\n') means you get to
simply skip reading the '\n' across each of the lines. Since the lines
are on average like 15 characters, it may be that avoiding reading the \n
is a meaningful saving. Or maybe the JIT found a clever optimization for
reading the temperature.

Or maybe it is simply the case that the number of multiplications is now
down to 2 from the previous 3 is what's causing the performance gain?

| # | Result (m:s.ms) | Implementation     | JDK | Submitter     | Notes
|
|---|-----------------|--------------------|-----|---------------|-----------|
| 1 | 00:01.531 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)|
21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) |
   GraalVM native binary |
| 2 | 00:01.794 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)|
21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) |  |
| 3 | 00:01.956 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)|
21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) |  |
|   | 00:02.346 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)|
21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) |  |
|   | 00:02.673 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)|
21.0.1-graal | [Subrahmanyam](https://github.com/vemana) |  |
|   | 00:02.689 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)|
21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) |  |
|   | 00:02.785 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)|
21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) |
   |
|   | 00:02.926 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)|
21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) |  |
|   | 00:02.928 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)|
21.0.1-graal | [Artsiom Korzun](https://github.com/artsiomkorzun) |  |
|   | 00:02.932 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)|
21.0.1-graal | [Sam Pullara](https://github.com/spullara) |  |

* Reduce one multiplication when temperature is +ve.

* Linear probe for city indexing. Beats current leader spullara 2.2 vs 3.8 elapsed time.

* Straightforward impl using bytebuffers. Turns out memorysegments were slower than used mappedbytebuffers.

* A initial submit-worthy entry

Comparison to select entries (averaged over 3 runs)
* spullara 1.66s [5th on leaderboard currently]
* vemana (this submission) 1.65s
* artsiomkorzun 1.64s [4th on leaderboard currently]

Tests: PASS
Impl Class: dev.morling.onebrc.CalculateAverage_vemana

Machine specs
* 16 core Ryzen 7950X
* 128GB RAM

Description
* Decompose the full file into Shards of memory mapped files and process
  each independently, outputting a TreeMap: City -> Statistics
* Compose the final answer by merging the individual TreeMap outputs
* Select 1 Thread per available processor as reported by the JVM
* Size to fit all datastructure in 0.5x L3 cache (4MB/core on the
  evaluation machines)
* Use linear probing hash table, with identity of city name = byte[] and
  hash code computed inline
* Avoid all allocation in the hot path and instead use method
  parameters. So, instead of passing a single Object param called Point(x, y, z),
  pass 3 parameters for each of its components. It is ugly, but this
  challenge is so far from Java's idioms anyway
* G1GC seems to want to interfere; use ParallelGC instead (just a quick
  and dirty hack)

Things tried that did not work
* MemorySegments are actually slower than MappedByteBuffers
* Trying to inline everything: not needed; the JIT compiler is pretty
  good
* Playing with JIT compiler flags didn't yield clear wins. In
  particular, was surprised that using a max level of 3 and reducing
  compilation threshold did nothing.. when the jit logs print that none
  of the methods reach level 4 and stay there for long
* Hand-coded implementation of Array.equals(..) using
  readLong(..) & bitmask_based_on_length from a bytebuffer instead of byte by byte

* Further tuning to compile loop methods: timings are now consistenctly ahead of artsiomkorzun in 4th place.

There are methods on the data path that were being interpreted for far
too long. For example, the method that takes a byte range and simply
calls one method per line was taking a disproportionate amount of time.
Using `-XX:+AlwaysCompileLoopMethods` option improved completion time by 4%.

============= vemana ===============
[20:55:22] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh vemana;
done;

Using java version 21.0.1-graal in this shell.

real    0m1.581s
user    0m34.166s
sys     0m1.435s

Using java version 21.0.1-graal in this shell.

real    0m1.593s
user    0m34.629s
sys     0m1.470s

Using java version 21.0.1-graal in this shell.

real    0m1.632s
user    0m35.893s
sys     0m1.340s

Using java version 21.0.1-graal in this shell.

real    0m1.596s
user    0m33.074s
sys     0m1.386s

Using java version 21.0.1-graal in this shell.

real    0m1.611s
user    0m35.516s
sys     0m1.438s

============= artsiomkorzun ===============
[20:56:12] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh
artsiomkorzun; done;

Using java version 21.0.1-graal in this shell.

real    0m1.669s
user    0m38.043s
sys     0m1.287s

Using java version 21.0.1-graal in this shell.

real    0m1.679s
user    0m37.840s
sys     0m1.400s

Using java version 21.0.1-graal in this shell.

real    0m1.657s
user    0m37.607s
sys     0m1.298s

Using java version 21.0.1-graal in this shell.

real    0m1.643s
user    0m36.852s
sys     0m1.392s

Using java version 21.0.1-graal in this shell.

real    0m1.644s
user    0m36.951s
sys     0m1.279s

============= spullara ===============
[20:57:55] [lsv@vemana]$ for i in 1 2 3 4 5; do ./runTheir.sh spullara;
done;

Using java version 21.0.1-graal in this shell.

real    0m1.676s
user    0m37.404s
sys     0m1.386s

Using java version 21.0.1-graal in this shell.

real    0m1.652s
user    0m36.509s
sys     0m1.486s

Using java version 21.0.1-graal in this shell.

real    0m1.665s
user    0m36.451s
sys     0m1.506s

Using java version 21.0.1-graal in this shell.

real    0m1.671s
user    0m36.917s
sys     0m1.371s

Using java version 21.0.1-graal in this shell.

real    0m1.634s
user    0m35.624s
sys     0m1.573s

========================== Running Tests ======================

[21:17:57] [lsv@vemana]$ ./runTests.sh vemana
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10000-unique-keys.txt

Using java version 21.0.1-graal in this shell.

real    0m0.150s
user    0m1.035s
sys     0m0.117s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10.txt

Using java version 21.0.1-graal in this shell.

real    0m0.114s
user    0m0.789s
sys     0m0.116s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-1.txt

Using java version 21.0.1-graal in this shell.

real    0m0.115s
user    0m0.948s
sys     0m0.075s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-20.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.926s
sys     0m0.066s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-2.txt

Using java version 21.0.1-graal in this shell.

real    0m0.110s
user    0m0.734s
sys     0m0.078s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-3.txt

Using java version 21.0.1-graal in this shell.

real    0m0.114s
user    0m0.870s
sys     0m0.095s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-boundaries.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.843s
sys     0m0.084s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-complex-utf8.txt

Using java version 21.0.1-graal in this shell.

real    0m0.121s
user    0m0.852s
sys     0m0.171s

* Improve by a few % more; now, convincingly faster than 6th place
submission. So far, only algorithms and tuning; no bitwise tricks yet.

Improve chunking implementation to avoid allocation and allow
finegrained chunking for the last X% of work. Work now proceeds in two
stages: big chunk stage and small chunk stage. This is to avoid
straggler threads holding up result merging.

Tests pass

[07:14:49] [lsv@vemana]$ ./test.sh vemana
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10000-unique-keys.txt

Using java version 21.0.1-graal in this shell.

real    0m0.152s
user    0m0.973s
sys     0m0.107s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-10.txt

Using java version 21.0.1-graal in this shell.

real    0m0.113s
user    0m0.840s
sys     0m0.060s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-1.txt

Using java version 21.0.1-graal in this shell.

real    0m0.107s
user    0m0.681s
sys     0m0.085s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-20.txt

Using java version 21.0.1-graal in this shell.

real    0m0.105s
user    0m0.894s
sys     0m0.068s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-2.txt

Using java version 21.0.1-graal in this shell.

real    0m0.099s
user    0m0.895s
sys     0m0.068s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-3.txt

Using java version 21.0.1-graal in this shell.

real    0m0.098s
user    0m0.813s
sys     0m0.050s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-boundaries.txt

Using java version 21.0.1-graal in this shell.

real    0m0.095s
user    0m0.777s
sys     0m0.087s
Validating calculate_average_vemana.sh --
src/test/resources/samples/measurements-complex-utf8.txt

Using java version 21.0.1-graal in this shell.

real    0m0.112s
user    0m0.904s
sys     0m0.069s

* Merge results from finished threads instead of waiting for all threads
to finish.

Not a huge difference overall but no reason to wait.

Also experiment with a few other compiler flags and attempt to use
jitwatch to understand what the jit is doing.

* Move to prepare_*.sh format and run evaluate2.sh locally.

Shows 7th place in leaderboard

| # | Result (m:s.ms) | Implementation     | JDK | Submitter     | Notes
|
|---|-----------------|--------------------|-----|---------------|-----------|
| 1 | 00:01.588 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)|
21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) |
   GraalVM native binary |
| 2 | 00:01.866 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)|
21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) |  |
| 3 | 00:01.904 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)|
21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) |  |
|   | 00:02.398 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)|
21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) |  |
|   | 00:02.724 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)|
21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) |  |
|   | 00:02.771 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)|
21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) |
   |
|   | 00:02.842 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)|
21.0.1-graal | [Vemana](https://github.com/vemana) |  |
|   | 00:02.902 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)|
21.0.1-graal | [Sam Pullara](https://github.com/spullara) |  |
|   | 00:02.906 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)|
21.0.1-graal | [artsiomkorzun](https://github.com/artsiomkorzun) |  |
|   | 00:02.970 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)|
21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) |  |

* Tune chunksize to get another 2% improvement for 8 processors as used by
the evaluation script.

* Read int at a time for city name length detection; speeds up by 2% in local testing.

* Improve reading temperature double by exiting loop quicker; no major
tricks (like reading an int) yet, but good for 5th place on leaderboard in local testing.

This small change has caused a surprising gain in performance by about 4%.
I didn't expect such a big change, but perhaps in combination with the
earlier change to read int by int for the city name, temperature reading
is dominating that aspect of the time. Also, perhaps the quicker exit
(as soon as you see '.' instead of reading until '\n') means you get to
simply skip reading the '\n' across each of the lines. Since the lines
are on average like 15 characters, it may be that avoiding reading the \n
is a meaningful saving. Or maybe the JIT found a clever optimization for
reading the temperature.

Or maybe it is simply the case that the number of multiplications is now
down to 2 from the previous 3 is what's causing the performance gain?

| # | Result (m:s.ms) | Implementation     | JDK | Submitter     | Notes
|
|---|-----------------|--------------------|-----|---------------|-----------|
| 1 | 00:01.531 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_thomaswue.java)|
21.0.1-graal | [Thomas Wuerthinger](https://github.com/thomaswue) |
   GraalVM native binary |
| 2 | 00:01.794 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java)|
21.0.1-graal | [Roy van Rijn](https://github.com/royvanrijn) |  |
| 3 | 00:01.956 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_merykitty.java)|
21.0.1-open | [Quan Anh Mai](https://github.com/merykitty) |  |
|   | 00:02.346 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_ebarlas.java)|
21.0.1-graal | [Elliot Barlas](https://github.com/ebarlas) |  |
|   | 00:02.673 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_vemana.java)|
21.0.1-graal | [Subrahmanyam](https://github.com/vemana) |  |
|   | 00:02.689 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_obourgain.java)|
21.0.1-open | [Olivier Bourgain](https://github.com/obourgain) |  |
|   | 00:02.785 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_algirdasrascius.java)|
21.0.1-open | [Algirdas Ra__ius](https://github.com/algirdasrascius) |
   |
|   | 00:02.926 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_isolgpus.java)|
21.0.1-open | [Jamie Stansfield](https://github.com/isolgpus) |  |
|   | 00:02.928 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_artsiomkorzun.java)|
21.0.1-graal | [Artsiom Korzun](https://github.com/artsiomkorzun) |  |
|   | 00:02.932 |
[link](https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_spullara.java)|
21.0.1-graal | [Sam Pullara](https://github.com/spullara) |  |

* Reduce one multiplication when temperature is +ve.

* Added some documentation on the approach.

---------

Co-authored-by: vemana <vemana.github@gmail.com>
This commit is contained in:
Vemana 2024-01-14 18:37:03 +05:30 committed by GitHub
parent b56f2205c2
commit f11fad2a68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 766 additions and 0 deletions

50
calculate_average_vemana.sh Executable file
View File

@ -0,0 +1,50 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Basics
JAVA_OPTS=""
JAVA_OPTS="$JAVA_OPTS --enable-preview"
#JAVA_OPTS="$JAVA_OPTS --add-modules jdk.incubator.vector"
#JAVA_OPTS="$JAVA_OPTS -XX:+UnlockDiagnosticVMOptions"
# JIT parameters
#JAVA_OPTS="$JAVA_OPTS -Xlog:class+load=info"
#JAVA_OPTS="$JAVA_OPTS -XX:+LogCompilation"
JAVA_OPTS="$JAVA_OPTS -XX:+AlwaysCompileLoopMethods"
#JAVA_OPTS="$JAVA_OPTS -XX:TieredStopAtLevel=1"
#JAVA_OPTS="$JAVA_OPTS -XX:TieredStopAtLevel=1"
#JAVA_OPTS="$JAVA_OPTS -XX:CompileCommand=inline,*State.processLine()"
#JAVA_OPTS="$JAVA_OPTS -XX:+PrintAssembly"
#JAVA_OPTS="$JAVA_OPTS -XX:LogFile=../hotspot.log"
#JAVA_OPTS="$JAVA_OPTS -XX:+DebugNonSafepoints"
#JAVA_OPTS="$JAVA_OPTS -XX:C1MaxInlineSize=150"
#JAVA_OPTS="$JAVA_OPTS -XX:C1InlineStackLimit=40"
#JAVA_OPTS="$JAVA_OPTS -XX:FreqInlineSize=500"
#JAVA_OPTS="$JAVA_OPTS -XX:+PrintCompilation"
#JAVA_OPTS="$JAVA_OPTS -XX:+PrintInlining"
#JAVA_OPTS="$JAVA_OPTS -XX:CompileThreshold=20 "
#JAVA_OPTS="$JAVA_OPTS -Xlog:async"
# GC parameters
JAVA_OPTS="$JAVA_OPTS -XX:+UseParallelGC"
#JAVA_OPTS="$JAVA_OPTS -Xlog:gc*=debug:file=/tmp/gc.log"
#JAVA_OPTS="$JAVA_OPTS -XX:+UseEpsilonGC -Xlog:all=off"
#JAVA_OPTS="$JAVA_OPTS -XX:+PrintGC -XX:+PrintGCDetails"
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_vemana "$@"

View File

@ -49,3 +49,4 @@ yavuztas;Yavuz Tas
yehwankim23;김예환 Ye-Hwan Kim (Sam) yehwankim23;김예환 Ye-Hwan Kim (Sam)
hundredwatt;Jason Nochlin hundredwatt;Jason Nochlin
gnmathur;Gaurav Mathur gnmathur;Gaurav Mathur
vemana;Subrahmanyam

23
prepare_vemana.sh Executable file
View File

@ -0,0 +1,23 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
source "$HOME/.sdkman/bin/sdkman-init.sh"
#sdk1 use java 21.0.1-open 1>&2
sdk use java 21.0.1-graal 1>&2
#sdk1 use java 21.0.1-zulu 1>&2
#sdk1 use java 21.0.1-graalce 1>&2

View File

@ -0,0 +1,692 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* This submission focuses on exploiting the non-SIMD parallelism that is inherent in OOO
* super-scalar CPUs and avoids using Unsafe, SWAR and other such fine techniques. The hope is to
* remain readable for a majority of SWEs. At a high level, the approach relies on a few principles
* listed herein.
*
* <p>
* [Exploit Parallelism] Distribute the work into Shards. Separate threads (one per core) process
* Shards and follow it up by merging the results. parallelStream() is appealing but carries
* potential run-time variance (i.e. std. deviation) penalties based on informal testing. Variance
* is not ideal when trying to minimize the maximum worker latency.
*
* <p>
* [Use ByteBuffers over MemorySegment] Each Shard is further divided in Chunks. This would've been
* unnecessary except that Shards are too big to be backed by ByteBuffers. Besides, MemorySegment
* appears slower than ByteBuffers. So, to use ByteBuffers, we have to use smaller chunks.
*
* <p>
* [Straggler freedom] The optimization function here is to minimize the maximal worker thread
* completion. Law of large number averages means that all the threads will end up with similar
* amounts of work and similar completion times; but, however ever so often there could be a bad
* sharding and more importantly, Cores are not created equal; some will be throttled more than
* others. So, we have a shared {@code LazyShardQueue} that aims to distribute work to minimize the
* latest completion time.
*
* <p>
* [Work Assignment with LazyShardQueue] The queue provides each thread with its next big-chunk
* until X% of the work remains. Big-chunks belong to the thread and will not be provided to another
* thread. Then, it switches to providing small-chunk sizes. Small-chunks comprise the last X% of
* work and every thread can participate in completing the chunk. Even though the queue is shared
* across threads, there's no communication across thread during the big-chunk phases. The queue is
* effectively a per-thread queue while processing big-chunks. The small-chunk phase uses an
* AtomicLong to coordinate chunk allocation across threads.
*
* <p>
* [Chunk processing] Chunk processing is typical. Process line by line. Find a hash function
* (polynomial hash fns are slow, but will work fine), hash the city name, resolve conflicts using
* linear probing and then accumulate the temperature into the appropriate hash slot. The key
* element then is how fast can you identify the hash slot, read the temperature and update the new
* temperature in the slot (i.e. min, max, count).
*
* <p>
* [Cache friendliness] 7502P and my machine (7950X) offer 4MB L3 cache/core. This means we can hope
* to fit all our datastructures in L3 cache. Since SMT is turned on, the Runtime's available
* processors will show twice the number of actual cores and so we get 2MB L3 cache/thread. To be
* safe, we try to stay within 1.8 MB/thread and size our hashtable appropriately.
*
* <p>
* [Allocation] Since MemorySegment seemed slower than ByteBuffers, backing Chunks by bytebuffers
* was the logical option. Creating one ByteBuffer per chunk was no bueno because the system doesn't
* like it (JVM runs out of mapped file handle quota). Other than that, allocation in the hot path
* was avoided.
*
* <p>
* [General approach to fast hashing and temperature reading] Here, it helps to understand the
* various bottlenecks in execution. One particular thing that I kept coming back to was to
* understand the relative costs of instructions: See
* https://www.agner.org/optimize/instruction_tables.pdf It is helpful to think of hardware as a
* smart parallel execution machine that can do several operations in one cycle if only you can feed
* it. So, the idea is to reduce data-dependency chains in the bottleneck path. The other major idea
* is to just avoid unnecessary work. For example, copying the city name into a byte array just for
* the purpose of looking it up was costing a noticeable amount. Instead, encoding it as
* (bytebuffer, start, len) was helpful. Spotting unnecessary work was non-trivial.So, them pesky
* range checks? see if you can avoid them. For example, sometimes you can eliminate a "nextPos <
* endPos" in a tight loop by breaking it into two pieces: one piece where the check will not be
* needed and a tail piece where it will be needed.
*
* <p>
* [Understand What Cores like]. Cores like to go straight and loop back. Despite good branch
* prediction, performance sucks with mispredicted branches.
*
* <p>
* [JIT] Java performance requires understanding the JIT. It is helpful to understand what the JIT
* likes though it is still somewhat of a mystery to me. In general, it inlines small methods very
* well and after constant folding, it can optimize quite well across a reasonably deep call chain.
* My experience with the JIT was that everything I tried to tune it made it worse except for one
* parameter. I have a new-found respect for JIT - it likes and understands typical Java idioms.
*
* <p>[Tuning] Nothing was more insightful than actually playing with various tuning parameters.
* I can have all the theories but the hardware and JIT are giant blackboxes. I used a bunch of
* tools to optimize: (1) Command line parameters to tune big and small chunk sizes etc. This was
* also very helpful in forming a mental model of the JIT. Sometimes, it would compile some methods
* and sometimes it would just run them interpreted since the compilation threshold wouldn't be
* reached for intermediate methods. (2) AsyncProfiler - this was the first line tool to understand
* cache misses and cpu time to figure where to aim the next optimization effort. (3) JitWatch -
* invaluable for forming a mental model and attempting to tune the JIT.
*
* <p>[Things that didn't work]. This is a looong list and the hit rate is quite low. In general,
* removing unnecessary work had a high hit rate and my pet theories on how things work in hardware
* had a low hit rate. Java Vector API lacked a good gather API to load from arbitrary memory
* addresses; this prevented performing real SIMD on the entire dataset, one where you load 64 bytes
* from 64 different line starting positions (just after a previous line end) and then step through
* one byte at a time. This to me is the most natural SIMD approach to the 1BRC problem but I
* couldn't use it. I tried other local uses of the vector API and it was always slower, and mostly
* much slower. In other words, Java Vector API needs a problem for which it is suited (duh) but,
* unless I am overlooking something, the API still lacks gather from arbitrary memory addresses.
*
* <p>[My general takeaways]. Write simple, idiomatic java code and get 70-80% of the max
* performance of an optimally hand-tuned code. Focus any optimization efforts on being friendly to
* the JIT *before* thinking about tuning for the hardware. There's a real cost to EXTREME
* performance tuning: a loss of abstraction and maintainability, but being JIT friendly is probably
* much more achievable without sacrificing abstraction.
*/
public class CalculateAverage_vemana {
public static void checkArg(boolean condition) {
if (!condition) {
throw new IllegalArgumentException();
}
}
public static void main(String[] args) throws Exception {
// First process in large chunks without coordination among threads
// Use chunkSizeBits for the large-chunk size
int chunkSizeBits = 20;
// For the last commonChunkFraction fraction of total work, use smaller chunk sizes
double commonChunkFraction = 0;
// Use commonChunkSizeBits for the small-chunk size
int commonChunkSizeBits = 18;
// Size of the hashtable (attempt to fit in L3)
int hashtableSizeBits = 14;
if (args.length > 0) {
chunkSizeBits = Integer.parseInt(args[0]);
}
if (args.length > 1) {
commonChunkFraction = Double.parseDouble(args[1]);
}
if (args.length > 2) {
commonChunkSizeBits = Integer.parseInt(args[2]);
}
if (args.length > 3) {
hashtableSizeBits = Integer.parseInt(args[3]);
}
// System.err.println(STR."""
// Using the following parameters:
// - chunkSizeBits = \{chunkSizeBits}
// - commonChunkFraction = \{commonChunkFraction}
// - commonChunkSizeBits = \{commonChunkSizeBits}
// - hashtableSizeBits = \{hashtableSizeBits}
// """);
System.out.println(new Runner(
Path.of("measurements.txt"),
chunkSizeBits,
commonChunkFraction,
commonChunkSizeBits,
hashtableSizeBits).getSummaryStatistics());
}
public interface LazyShardQueue {
ByteRange take(int shardIdx);
}
// Mutable to avoid allocation
public static class ByteRange {
private static final int BUF_SIZE = 1 << 30;
private final long fileSize;
private final RandomAccessFile raf;
// ***************** What this is doing and why *****************
// Reading from ByteBuffer appears faster from MemorySegment, but ByteBuffer can only be
// Integer.MAX_VALUE long; Creating one byteBuffer per chunk kills native memory quota
// and JVM crashes without futher parameters.
//
// So, in this solution, create a sliding window of bytebuffers:
// - Create a large bytebuffer that spans the chunk
// - If the next chunk falls outside the byteBuffer, create another byteBuffer that spans the
// chunk. Because chunks are allocated serially, a single large (1<<30) byteBuffer spans
// many successive chunks.
// - In fact, for serial chunk allocation (which is friendly to page faulting anyway),
// the number of created ByteBuffers doesn't exceed [size of shard/(1<<30)] which is less than
// 100/thread and is comfortably below what the JVM can handle (65K) without further param
// tuning
// - This enables (relatively) allocation free chunking implementation. Our chunking impl uses
// fine grained chunking for the last say X% of work to avoid being hostage to stragglers
// The PUBLIC API
public MappedByteBuffer byteBuffer;
public int endInBuf; // where the chunk ends inside the buffer
public int startInBuf; // where the chunk starts inside the buffer
// Private State
private long bufferEnd; // byteBuffer's ending coordinate
private long bufferStart; // byteBuffer's begin coordinate
// Uninitialized; for mutability
public ByteRange(RandomAccessFile raf) {
this.raf = raf;
try {
this.fileSize = raf.length();
}
catch (IOException e) {
throw new RuntimeException(e);
}
bufferEnd = bufferStart = -1;
}
public void setRange(long rangeStart, long rangeEnd) {
if (rangeEnd + 1024 > bufferEnd || rangeStart < bufferStart) {
bufferStart = rangeStart;
bufferEnd = Math.min(bufferStart + BUF_SIZE, fileSize);
setByteBufferToRange(bufferStart, bufferEnd);
}
if (rangeStart > 0) {
rangeStart = 1 + nextNewLine(rangeStart);
}
if (rangeEnd < fileSize) {
rangeEnd = 1 + nextNewLine(rangeEnd);
}
else {
rangeEnd = fileSize;
}
startInBuf = (int) (rangeStart - bufferStart);
endInBuf = (int) (rangeEnd - bufferStart);
}
@Override
public String toString() {
return STR."""
ByteRange {
startInBuf = \{startInBuf}
endInBuf = \{endInBuf}
}
""";
}
private long nextNewLine(long pos) {
int nextPos = (int) (pos - bufferStart);
while (byteBuffer.get(nextPos) != '\n') {
nextPos++;
}
return nextPos + bufferStart;
}
private void setByteBufferToRange(long start, long end) {
try {
byteBuffer = raf.getChannel().map(MapMode.READ_ONLY, start, end - start);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
public record Result(Map<String, Stat> tempStats) {
@Override
public String toString() {
return this.tempStats()
.entrySet()
.stream()
.sorted(Entry.comparingByKey())
.map(entry -> "%s=%s".formatted(entry.getKey(), entry.getValue()))
.collect(Collectors.joining(", ", "{", "}"));
}
}
public static class Runner {
private final double commonChunkFraction;
private final int commonChunkSizeBits;
private final int hashtableSizeBits;
private final Path inputFile;
private final int shardSizeBits;
public Runner(
Path inputFile, int chunkSizeBits, double commonChunkFraction, int commonChunkSizeBits,
int hashtableSizeBits) {
this.inputFile = inputFile;
this.shardSizeBits = chunkSizeBits;
this.commonChunkFraction = commonChunkFraction;
this.commonChunkSizeBits = commonChunkSizeBits;
this.hashtableSizeBits = hashtableSizeBits;
}
Result getSummaryStatistics() throws Exception {
int processors = Runtime.getRuntime().availableProcessors();
LazyShardQueue shardQueue = new SerialLazyShardQueue(
1L << shardSizeBits,
inputFile,
processors,
commonChunkFraction,
commonChunkSizeBits);
List<Future<Result>> results = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(
processors,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});
long[] finishTimes = new long[processors];
for (int i = 0; i < processors; i++) {
final int I = i;
final Callable<Result> callable = () -> {
Result result = new ShardProcessor(shardQueue, hashtableSizeBits, I).processShard();
finishTimes[I] = System.nanoTime();
return result;
};
results.add(executorService.submit(callable));
}
// printFinishTimes(finishTimes);
return executorService.submit(() -> merge(results)).get();
}
private Result merge(List<Future<Result>> results)
throws ExecutionException, InterruptedException {
Map<String, Stat> output = null;
boolean[] isDone = new boolean[results.size()];
int remaining = results.size();
// Let's be naughty and spin in a busy loop
while (remaining > 0) {
for (int i = 0; i < results.size(); i++) {
if (!isDone[i] && results.get(i).isDone()) {
isDone[i] = true;
remaining--;
if (output == null) {
output = new TreeMap<>(results.get(i).get().tempStats());
}
else {
for (Entry<String, Stat> entry : results.get(i).get().tempStats().entrySet()) {
output.compute(
entry.getKey(),
(key, value) -> value == null ? entry.getValue()
: Stat.merge(value, entry.getValue()));
}
}
}
}
}
return new Result(output);
}
private void printFinishTimes(long[] finishTimes) {
Arrays.sort(finishTimes);
int n = finishTimes.length;
System.err.println(STR."Finish Delta: \{(finishTimes[n - 1] - finishTimes[0]) / 1_000_000}ms");
}
}
public static class SerialLazyShardQueue implements LazyShardQueue {
private static long roundToNearestHigherMultipleOf(long divisor, long value) {
return (value + divisor - 1) / divisor * divisor;
}
private final ByteRange[] byteRanges;
private final long chunkSize;
private final long commonChunkSize;
private final AtomicLong commonPool;
private final long fileSize;
private final long[] nextStarts;
public SerialLazyShardQueue(
long chunkSize, Path filePath, int shards, double commonChunkFraction,
int commonChunkSizeBits)
throws IOException {
checkArg(commonChunkFraction < 0.9 && commonChunkFraction >= 0);
var raf = new RandomAccessFile(filePath.toFile(), "r");
this.fileSize = raf.length();
// Common pool
long commonPoolStart = Math.min(
roundToNearestHigherMultipleOf(chunkSize, (long) (fileSize * (1 - commonChunkFraction))),
fileSize);
this.commonPool = new AtomicLong(commonPoolStart);
this.commonChunkSize = 1L << commonChunkSizeBits;
// Distribute chunks to shards
this.nextStarts = new long[shards << 4]; // thread idx -> 16*idx to avoid cache line conflict
for (long i = 0, currentStart = 0, remainingChunks = (commonPoolStart + chunkSize - 1) / chunkSize; i < shards; i++) {
long remainingShards = shards - i;
long currentChunks = (remainingChunks + remainingShards - 1) / remainingShards;
// Shard i handles: [currentStart, currentStart + currentChunks * chunkSize)
int pos = (int) i << 4;
nextStarts[pos] = currentStart;
nextStarts[pos + 1] = currentStart + currentChunks * chunkSize;
currentStart += currentChunks * chunkSize;
remainingChunks -= currentChunks;
}
this.chunkSize = chunkSize;
this.byteRanges = new ByteRange[shards << 4];
for (int i = 0; i < shards; i++) {
byteRanges[i << 4] = new ByteRange(raf);
}
}
@Override
public ByteRange take(int idx) {
// Try for thread local range
final int pos = idx << 4;
long rangeStart = nextStarts[pos];
final long chunkEnd = nextStarts[pos + 1];
final long rangeEnd;
if (rangeStart < chunkEnd) {
rangeEnd = rangeStart + chunkSize;
nextStarts[pos] = rangeEnd;
}
else {
rangeStart = commonPool.getAndAdd(commonChunkSize);
// If that's exhausted too, nothing remains!
if (rangeStart >= fileSize) {
return null;
}
rangeEnd = rangeStart + commonChunkSize;
}
ByteRange chunk = byteRanges[pos];
chunk.setRange(rangeStart, rangeEnd);
return chunk;
}
}
public static class ShardProcessor {
private final LazyShardQueue shardQueue;
private final ShardProcessorState state;
private final int threadIdx;
public ShardProcessor(LazyShardQueue shardQueue, int hashtableSizeBits, int threadIdx) {
this.shardQueue = shardQueue;
this.threadIdx = threadIdx;
this.state = new ShardProcessorState(hashtableSizeBits);
}
public Result processShard() {
ByteRange range;
while ((range = shardQueue.take(threadIdx)) != null) {
processRange(range);
}
return result();
}
private void processRange(ByteRange range) {
MappedByteBuffer mmb = range.byteBuffer;
int nextPos = range.startInBuf;
int end = range.endInBuf;
while (nextPos < end) {
nextPos = state.processLine(mmb, nextPos);
}
}
private Result result() {
return state.result();
}
}
public static class ShardProcessorState {
private static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
private final byte[][] cityNames;
private final int slotsMask;
private final Stat[] stats;
public ShardProcessorState(int slotsBits) {
this.stats = new Stat[1 << slotsBits];
this.cityNames = new byte[1 << slotsBits][];
this.slotsMask = (1 << slotsBits) - 1;
}
public int processLine(MappedByteBuffer mmb, int nextPos) {
int originalPos = nextPos;
byte nextByte;
int hash = 0;
while (true) {
int x = mmb.getInt(nextPos);
if (NATIVE_BYTE_ORDER == ByteOrder.BIG_ENDIAN) {
x = Integer.reverseBytes(x);
}
byte a = (byte) (x >>> 24);
if (a == ';') {
nextPos += 1;
break;
}
byte b = (byte) (x >>> 16);
if (b == ';') {
nextPos += 2;
hash = hash * 31 + ((0xFF000000 & x));
break;
}
byte c = (byte) (x >>> 8);
if (c == ';') {
nextPos += 3;
hash = hash * 31 + ((0xFFFF0000 & x));
break;
}
byte d = (byte) (x >>> 0);
if (d == ';') {
nextPos += 4;
hash = hash * 31 + ((0xFFFFFF00 & x));
break;
}
hash = hash * 31 + x;
nextPos += 4;
}
int cityLen = nextPos - 1 - originalPos;
// Read temperature
int temperature = 0;
boolean negative = (nextByte = mmb.get(nextPos++)) == '-';
if (!negative) {
temperature = nextByte - '0';
}
else {
temperature = mmb.get(nextPos++) - '0';
}
while (true) {
nextByte = mmb.get(nextPos++);
if (nextByte != '.') {
temperature = temperature * 10 + (nextByte - '0');
}
else {
temperature = temperature * 10 + (mmb.get(nextPos++) - '0');
nextPos++;
break;
}
}
linearProbe(
cityLen,
hash & slotsMask,
negative ? -temperature : temperature,
mmb,
originalPos);
return nextPos;
}
public Result result() {
int N = stats.length;
TreeMap<String, Stat> map = new TreeMap<>();
for (int i = 0; i < N; i++) {
if (stats[i] != null) {
map.put(new String(cityNames[i]), stats[i]);
}
}
return new Result(map);
}
private byte[] copyFrom(MappedByteBuffer mmb, int offsetInMmb, int len) {
byte[] out = new byte[len];
for (int i = 0; i < len; i++) {
out[i] = mmb.get(offsetInMmb + i);
}
return out;
}
private boolean equals(byte[] left, MappedByteBuffer right, int offsetInMmb, int len) {
for (int i = 0; i < len; i++) {
if (left[i] != right.get(offsetInMmb + i)) {
return false;
}
}
return true;
}
private void linearProbe(int len, int hash, int temp, MappedByteBuffer mmb, int offsetInMmb) {
for (int i = hash;; i = (i + 1) & slotsMask) {
var curBytes = cityNames[i];
if (curBytes == null) {
cityNames[i] = copyFrom(mmb, offsetInMmb, len);
stats[i] = Stat.firstReading(temp);
return;
}
else {
// Overall, this tradeoff seems better than Arrays.equals(..)
// City name param is encoded as (mmb, offsetnInMmb, len)
// This avoids copying it into a (previously allocated) byte[]
// The downside is that we have to manually implement 'equals' and it can lose out
// to vectorized 'equals'; but the trade off seems to work in this particular case
if (len == curBytes.length && equals(curBytes, mmb, offsetInMmb, len)) {
stats[i].mergeReading(temp);
return;
}
}
}
}
}
public static class Stat {
public static Stat firstReading(int temp) {
return new Stat(temp, temp, temp, 1);
}
public static Stat merge(Stat left, Stat right) {
return new Stat(
Math.min(left.min, right.min),
Math.max(left.max, right.max),
left.sum + right.sum,
left.count + right.count);
}
private long count, sum;
private int min, max;
public Stat(int min, int max, long sum, long count) {
this.min = min;
this.max = max;
this.sum = sum;
this.count = count;
}
// Caution: Mutates
public void mergeReading(int curTemp) {
// Can this be improved furhter?
// Assuming random values for curTemp,
// min (&max) gets updated roughly log(N)/N fraction of the time (a small number)
// In the worst case, there will be at-most one branch misprediction.
if (curTemp > min) { // Mostly passes. On branch misprediction, just update min.
if (curTemp > max) { // Mostly fails. On branch misprediction, just update max.
max = curTemp;
}
}
else {
min = curTemp;
}
sum += curTemp;
count++;
}
@Override
public String toString() {
return "%.1f/%.1f/%.1f".formatted(min / 10.0, sum / 10.0 / count, max / 10.0);
}
}
}