shipilev: Amendments to version 4 (#627)
* Amendments * One more locality touchup: no need to carry the entire name array
This commit is contained in:
parent
9da1660ba5
commit
82197d4482
@ -15,12 +15,11 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms64m -Xmx64m -XX:+AlwaysPreTouch -XX:+UseTransparentHugePages
|
JAVA_OPTS="-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC -Xms1g -Xmx1g -XX:-AlwaysPreTouch -XX:+UseTransparentHugePages
|
||||||
-XX:-TieredCompilation -XX:CICompilerCount=2 -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields
|
-XX:-TieredCompilation -XX:-UseCountedLoopSafepoints -XX:+TrustFinalNonStaticFields -XX:CompileThreshold=2048
|
||||||
--add-opens java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED
|
--add-opens java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED
|
||||||
-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=quiet
|
-XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=quiet
|
||||||
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$ParsingTask::seqCompute
|
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$ParsingTask::seqCompute
|
||||||
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$MeasurementsMap::updateSlow
|
-XX:CompileCommand=dontinline,dev.morling.onebrc.CalculateAverage_shipilev\$MeasurementsMap::updateSlow
|
||||||
-XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev::nameMatches
|
-XX:CompileCommand=inline,dev.morling.onebrc.CalculateAverage_shipilev\$Bucket::matches"
|
||||||
-XX:CompileThreshold=2048"
|
|
||||||
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_shipilev
|
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_shipilev
|
||||||
|
@ -102,45 +102,12 @@ public class CalculateAverage_shipilev {
|
|||||||
|
|
||||||
// ========================= MEATY GRITTY PARTS: PARSE AND AGGREGATE =========================
|
// ========================= MEATY GRITTY PARTS: PARSE AND AGGREGATE =========================
|
||||||
|
|
||||||
// Little helper method to compare the array with given bytebuffer range.
|
|
||||||
public static boolean nameMatches(Bucket bucket, ByteBuffer cand, int begin, int end) {
|
|
||||||
byte[] orig = bucket.name;
|
|
||||||
int origLen = orig.length;
|
|
||||||
int candLen = end - begin;
|
|
||||||
if (origLen != candLen) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the tails first, to simplify the matches.
|
|
||||||
if (origLen >= 8) {
|
|
||||||
if (bucket.tail1 != cand.getLong(end - 8)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (origLen >= 16) {
|
|
||||||
if (bucket.tail2 != cand.getLong(end - 16)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
origLen -= 16;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
origLen -= 8;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the rest.
|
|
||||||
for (int i = 0; i < origLen; i++) {
|
|
||||||
if (orig[i] != cand.get(begin + i)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class Bucket {
|
public static final class Bucket {
|
||||||
// Raw station name, its hash, and tails.
|
// Raw station name, its hash, and prefixes.
|
||||||
public final byte[] name;
|
public final byte[] nameTail;
|
||||||
|
public final int len;
|
||||||
public final int hash;
|
public final int hash;
|
||||||
public final long tail1, tail2;
|
public final int prefix1, prefix2;
|
||||||
|
|
||||||
// Temperature values, in 10x scale.
|
// Temperature values, in 10x scale.
|
||||||
public long sum;
|
public long sum;
|
||||||
@ -148,10 +115,32 @@ public class CalculateAverage_shipilev {
|
|||||||
public int min;
|
public int min;
|
||||||
public int max;
|
public int max;
|
||||||
|
|
||||||
public Bucket(byte[] name, long tail1, long tail2, int hash, int temp) {
|
public Bucket(ByteBuffer slice, int begin, int end, int hash, int temp) {
|
||||||
this.name = name;
|
len = end - begin;
|
||||||
this.tail1 = tail1;
|
|
||||||
this.tail2 = tail2;
|
// Also pick up any prefixes to simplify future matches.
|
||||||
|
int tailStart = 0;
|
||||||
|
if (len >= 8) {
|
||||||
|
prefix1 = slice.getInt(begin + 0);
|
||||||
|
prefix2 = slice.getInt(begin + 4);
|
||||||
|
tailStart += 8;
|
||||||
|
}
|
||||||
|
else if (len >= 4) {
|
||||||
|
prefix1 = slice.getInt(begin + 0);
|
||||||
|
prefix2 = 0;
|
||||||
|
tailStart += 4;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
prefix1 = 0;
|
||||||
|
prefix2 = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The rest goes to tail byte array. We are checking names on hot-path.
|
||||||
|
// Therefore, it is convenient to keep allocation for names near the buckets.
|
||||||
|
int tailLen = len - tailStart;
|
||||||
|
nameTail = new byte[tailLen];
|
||||||
|
slice.get(begin + tailStart, nameTail, 0, tailLen);
|
||||||
|
|
||||||
this.hash = hash;
|
this.hash = hash;
|
||||||
this.sum = temp;
|
this.sum = temp;
|
||||||
this.count = 1;
|
this.count = 1;
|
||||||
@ -159,6 +148,48 @@ public class CalculateAverage_shipilev {
|
|||||||
this.max = temp;
|
this.max = temp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Little helper method to compare the array with given bytebuffer range.
|
||||||
|
public boolean matches(ByteBuffer cand, int begin, int end) {
|
||||||
|
int origLen = len;
|
||||||
|
int candLen = end - begin;
|
||||||
|
if (origLen != candLen) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the prefixes first, to simplify the matches.
|
||||||
|
int tailStart = 0;
|
||||||
|
if (origLen >= 8) {
|
||||||
|
if (prefix1 != cand.getInt(begin)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (prefix2 != cand.getInt(begin + 4)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
tailStart += 8;
|
||||||
|
}
|
||||||
|
else if (origLen >= 4) {
|
||||||
|
if (prefix1 != cand.getInt(begin)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
tailStart += 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the rest.
|
||||||
|
for (int i = 0; i < origLen - tailStart; i++) {
|
||||||
|
if (nameTail[i] != cand.get(begin + tailStart + i)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean matches(Bucket other) {
|
||||||
|
return len == other.len &&
|
||||||
|
prefix1 == other.prefix1 &&
|
||||||
|
prefix2 == other.prefix2 &&
|
||||||
|
Arrays.equals(nameTail, other.nameTail);
|
||||||
|
}
|
||||||
|
|
||||||
public void merge(int value) {
|
public void merge(int value) {
|
||||||
sum += value;
|
sum += value;
|
||||||
count++;
|
count++;
|
||||||
@ -178,8 +209,19 @@ public class CalculateAverage_shipilev {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Row toRow() {
|
public Row toRow() {
|
||||||
|
// Reconstruct the name
|
||||||
|
ByteBuffer bb = ByteBuffer.allocate(len);
|
||||||
|
bb.order(ByteOrder.LITTLE_ENDIAN);
|
||||||
|
if (len >= 4) {
|
||||||
|
bb.putInt(prefix1);
|
||||||
|
}
|
||||||
|
if (len >= 8) {
|
||||||
|
bb.putInt(prefix2);
|
||||||
|
}
|
||||||
|
bb.put(nameTail);
|
||||||
|
|
||||||
return new Row(
|
return new Row(
|
||||||
new String(name),
|
new String(Arrays.copyOf(bb.array(), len)),
|
||||||
Math.round((double) min) / 10.0,
|
Math.round((double) min) / 10.0,
|
||||||
Math.round((double) sum / count) / 10.0,
|
Math.round((double) sum / count) / 10.0,
|
||||||
Math.round((double) max) / 10.0);
|
Math.round((double) max) / 10.0);
|
||||||
@ -205,21 +247,11 @@ public class CalculateAverage_shipilev {
|
|||||||
while (true) {
|
while (true) {
|
||||||
Bucket cur = buckets[idx];
|
Bucket cur = buckets[idx];
|
||||||
if (cur == null) {
|
if (cur == null) {
|
||||||
// No bucket yet, lucky us. Lookup the name and create the bucket with it.
|
// No bucket yet, lucky us. Create the bucket with it.
|
||||||
// We are checking the names on hot-path. Therefore, it is convenient
|
buckets[idx] = new Bucket(name, begin, end, hash, temp);
|
||||||
// to keep allocation for names near the buckets.
|
|
||||||
int len = end - begin;
|
|
||||||
byte[] copy = new byte[len];
|
|
||||||
name.get(begin, copy, 0, len);
|
|
||||||
|
|
||||||
// Also pick up any tail to simplify future matches.
|
|
||||||
long tail1 = (len < 8) ? 0 : name.getLong(begin + len - 8);
|
|
||||||
long tail2 = (len < 16) ? 0 : name.getLong(begin + len - 16);
|
|
||||||
|
|
||||||
buckets[idx] = new Bucket(copy, tail1, tail2, hash, temp);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
else if ((cur.hash == hash) && nameMatches(cur, name, begin, end)) {
|
else if ((cur.hash == hash) && cur.matches(name, begin, end)) {
|
||||||
// Same as bucket fastpath. Check for collision by checking the full hash
|
// Same as bucket fastpath. Check for collision by checking the full hash
|
||||||
// first (since the index is truncated by map size), and then the exact name.
|
// first (since the index is truncated by map size), and then the exact name.
|
||||||
cur.merge(temp);
|
cur.merge(temp);
|
||||||
@ -244,7 +276,7 @@ public class CalculateAverage_shipilev {
|
|||||||
buckets[idx] = other;
|
buckets[idx] = other;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else if ((cur.hash == other.hash) && Arrays.equals(cur.name, other.name)) {
|
else if ((cur.hash == other.hash) && cur.matches(other)) {
|
||||||
cur.merge(other);
|
cur.merge(other);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -425,7 +457,7 @@ public class CalculateAverage_shipilev {
|
|||||||
|
|
||||||
// Time to update!
|
// Time to update!
|
||||||
Bucket bucket = buckets[nameHash & (MAP_SIZE - 1)];
|
Bucket bucket = buckets[nameHash & (MAP_SIZE - 1)];
|
||||||
if ((bucket != null) && (nameHash == bucket.hash) && nameMatches(bucket, slice, nameBegin, nameEnd)) {
|
if ((bucket != null) && (nameHash == bucket.hash) && bucket.matches(slice, nameBegin, nameEnd)) {
|
||||||
// Lucky fast path, existing bucket hit. Most of the time we complete here.
|
// Lucky fast path, existing bucket hit. Most of the time we complete here.
|
||||||
bucket.merge(temp);
|
bucket.merge(temp);
|
||||||
}
|
}
|
||||||
@ -447,8 +479,8 @@ public class CalculateAverage_shipilev {
|
|||||||
// a given mmap slice, while there is still other work to do. This allows
|
// a given mmap slice, while there is still other work to do. This allows
|
||||||
// us to unmap slices on the go.
|
// us to unmap slices on the go.
|
||||||
public static final class RootTask extends CountedCompleter<Void> {
|
public static final class RootTask extends CountedCompleter<Void> {
|
||||||
public RootTask(CountedCompleter<Void> parent) {
|
public RootTask() {
|
||||||
super(parent);
|
super(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -517,8 +549,12 @@ public class CalculateAverage_shipilev {
|
|||||||
// ========================= Invocation =========================
|
// ========================= Invocation =========================
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
// Instantiate a separate FJP to match the parallelism accurately, without
|
||||||
|
// relying on common pool defaults.
|
||||||
|
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
|
||||||
|
|
||||||
// This little line carries the whole world
|
// This little line carries the whole world
|
||||||
new RootTask(null).fork();
|
pool.submit(new RootTask());
|
||||||
|
|
||||||
// While the root task is working, prepare what we need for the
|
// While the root task is working, prepare what we need for the
|
||||||
// end of the run. Go and try to report something to prepare the
|
// end of the run. Go and try to report something to prepare the
|
||||||
|
Loading…
Reference in New Issue
Block a user