make aggregation state grow dynamically

This commit is contained in:
Richard Startin 2024-01-04 22:11:28 +00:00 committed by Gunnar Morling
parent b467319e58
commit b2cd84c6bc

View File

@ -248,12 +248,18 @@ public class CalculateAverage_richardstartin {
return Arrays.copyOf(PAGE_PROTOTYPE, PAGE_PROTOTYPE.length);
}
static void update(double[][] pages, int position, double value) {
static void update(List<double[]> pages, int position, double value) {
// find the page
int pageIndex = position >>> PAGE_SHIFT;
double[] page = pages[pageIndex];
if (pageIndex >= pages.size()) {
for (int i = pages.size(); i <= pageIndex; i++) {
pages.add(null);
}
}
double[] page = pages.get(pageIndex);
if (page == null) {
pages[pageIndex] = page = newPage();
page = newPage();
pages.set(pageIndex, page);
}
// update local aggregates
@ -263,8 +269,8 @@ public class CalculateAverage_richardstartin {
page[(position & PAGE_MASK) * 4 + 3] += value; // sum
}
static ResultRow toResultRow(double[][] pages, int position) {
double[] page = pages[position >>> PAGE_SHIFT];
static ResultRow toResultRow(List<double[]> pages, int position) {
double[] page = pages.get(position >>> PAGE_SHIFT);
double count = page[(position & PAGE_MASK) * 4];
double min = page[(position & PAGE_MASK) * 4 + 1];
double max = page[(position & PAGE_MASK) * 4 + 2];
@ -273,7 +279,7 @@ public class CalculateAverage_richardstartin {
}
}
private static class AggregationTask extends RecursiveTask<double[][]> {
private static class AggregationTask extends RecursiveTask<List<double[]>> {
private final Dictionary dictionary;
private final List<ByteBuffer> slices;
@ -291,7 +297,7 @@ public class CalculateAverage_richardstartin {
this.max = max;
}
private void computeSlice(ByteBuffer slice, double[][] pages) {
private void computeSlice(ByteBuffer slice, List<double[]> pages) {
for (int offset = 0; offset < slice.limit();) {
int nextSeparator = findIndexOf(slice, offset, DELIMITER);
ByteBuffer key = slice.slice(offset, nextSeparator - offset).order(ByteOrder.LITTLE_ENDIAN);
@ -310,14 +316,17 @@ public class CalculateAverage_richardstartin {
}
}
private static void merge(double[][] contribution, double[][] aggregate) {
for (int i = 0; i < contribution.length; i++) {
if (aggregate[i] == null) {
aggregate[i] = contribution[i];
private static void merge(List<double[]> contribution, List<double[]> aggregate) {
for (int i = aggregate.size(); i < contribution.size(); i++) {
aggregate.add(null);
}
for (int i = 0; i < contribution.size(); i++) {
if (aggregate.get(i) == null) {
aggregate.set(i, contribution.get(i));
}
else if (contribution[i] != null) {
double[] to = aggregate[i];
double[] from = contribution[i];
else if (contribution.get(i) != null) {
double[] to = aggregate.get(i);
double[] from = contribution.get(i);
// todo won't vectorise - consider separating aggregates into distinct regions and apply
// loop fission (if this shows up in the profile)
for (int j = 0; j < to.length; j += 4) {
@ -331,10 +340,9 @@ public class CalculateAverage_richardstartin {
}
@Override
protected double[][] compute() {
protected List<double[]> compute() {
if (min == max) {
// fixme - hardcoded to problem size
var pages = new double[600][];
var pages = new ArrayList<double[]>();
var slice = slices.get(min);
computeSlice(slice, pages);
return pages;
@ -388,7 +396,7 @@ public class CalculateAverage_richardstartin {
var fjp = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Dictionary dictionary = new Dictionary();
double[][] aggregates = fjp.submit(new AggregationTask(dictionary, slices)).join();
var aggregates = fjp.submit(new AggregationTask(dictionary, slices)).join();
var map = new TreeMap<String, ResultRow>();
dictionary.forEach((key, index) -> map.put(key, Page.toResultRow(aggregates, index)));
System.out.println(map);