From 1086385f1f22965d30bbc54edefa6c5d2e5e45ce Mon Sep 17 00:00:00 2001
From: Prabhu R
Date: Thu, 11 Jan 2024 01:39:42 +0530
Subject: [PATCH] Implementation by rprabhu

Co-authored-by: Prabhu R
---
 calculate_average_rprabhu.sh                  |  20 +++
 .../onebrc/CalculateAverage_rprabhu.java      | 186 ++++++++++++++++++
 2 files changed, 206 insertions(+)
 create mode 100644 calculate_average_rprabhu.sh
 create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_rprabhu.java

diff --git a/calculate_average_rprabhu.sh b/calculate_average_rprabhu.sh
new file mode 100644
index 0000000..f0efdbb
--- /dev/null
+++ b/calculate_average_rprabhu.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+#
+#  Copyright 2023 The original authors
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+
+JAVA_OPTS=""
+time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_rprabhu
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_rprabhu.java b/src/main/java/dev/morling/onebrc/CalculateAverage_rprabhu.java
new file mode 100644
index 0000000..17ae3b2
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_rprabhu.java
@@ -0,0 +1,186 @@
+/*
+ *  Copyright 2023 The original authors
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Computes min/mean/max per weather station from {@code ./measurements.txt}.
+ *
+ * <p>The main thread reads the file line by line and hands fixed-size chunks of lines
+ * to a worker pool. Each worker aggregates its chunk into a private map and then folds
+ * that map into the shared result with atomic per-key merges.
+ */
+public class CalculateAverage_rprabhu {
+
+    private static final String FILE = "./measurements.txt";
+
+    /** Number of lines handed to a worker as one task. */
+    private static final int TASK_CHUNK = 50000;
+
+    /** One worker thread per available processor. */
+    private static final int WORKERS = Runtime.getRuntime().availableProcessors();
+
+    /** Cap on unfinished chunks so the reader cannot outrun the workers and exhaust the heap. */
+    private static final int MAX_PENDING_CHUNKS = WORKERS * 4;
+
+    /** Shared per-station totals; only ever updated through {@link ConcurrentHashMap#merge}. */
+    private static final ConcurrentHashMap<String, MeasurementAggregator> map = new ConcurrentHashMap<>();
+
+    /** Final statistics of one station, printed as {@code min/mean/max} rounded to one decimal. */
+    private static record ResultRow(double min, double mean, double max) {
+        @Override
+        public String toString() {
+            return round(min) + "/" + round(mean) + "/" + round(max);
+        }
+
+        private double round(double value) {
+            return Math.round(value * 10.0) / 10.0;
+        }
+    }
+
+    /** Mutable running totals for one station. Not thread-safe on its own. */
+    private static class MeasurementAggregator {
+        private double min = Double.POSITIVE_INFINITY;
+        private double max = Double.NEGATIVE_INFINITY;
+        private double sum;
+        private long count;
+
+        /** Folds a single reading into the totals. */
+        void add(double val) {
+            count += 1;
+            sum += val;
+            max = (val >= max) ? val : max;
+            min = (val <= min) ? val : min;
+        }
+
+        /** Merges {@code other} into this aggregator and returns {@code this}. */
+        MeasurementAggregator combine(MeasurementAggregator other) {
+            count += other.count;
+            sum += other.sum;
+            max = (other.max >= max) ? other.max : max;
+            min = (other.min <= min) ? other.min : min;
+            return this;
+        }
+    }
+
+    /** Parses one chunk of {@code station;value} lines and merges the totals into the shared map. */
+    private static class TaskExecutor implements Runnable {
+        private final List<String> list;
+
+        TaskExecutor(List<String> list) {
+            this.list = list;
+        }
+
+        @Override
+        public void run() {
+            // Aggregate thread-locally first: no other thread can see these objects, so plain
+            // field updates are safe and the shared map is touched once per station per chunk.
+            Map<String, MeasurementAggregator> local = new HashMap<>();
+            for (String str : list) {
+                int index = str.indexOf(';');
+                String station = str.substring(0, index);
+                double val = Double.parseDouble(str.substring(index + 1));
+                local.computeIfAbsent(station, key -> new MeasurementAggregator()).add(val);
+            }
+            // merge() runs the remapping function atomically per key, which prevents the lost
+            // updates of an unsynchronized get/modify/put sequence.
+            local.forEach((station, aggr) -> map.merge(station, aggr, MeasurementAggregator::combine));
+        }
+    }
+
+    /** Batches lines into chunks and submits them, keeping a bounded number of chunks in flight. */
+    private static class TaskScheduler {
+        private final ExecutorService executor;
+        private final Deque<Future<?>> pending = new ArrayDeque<>();
+        private List<String> list = new ArrayList<>(TASK_CHUNK);
+
+        TaskScheduler(ExecutorService executor) {
+            this.executor = executor;
+        }
+
+        /** Buffers a line and submits the buffer once it holds {@code TASK_CHUNK} lines. */
+        void push(String line) throws InterruptedException, ExecutionException {
+            list.add(line);
+            if (list.size() >= TASK_CHUNK) {
+                submitChunk();
+            }
+        }
+
+        /** Submits any partial chunk, then blocks until every submitted chunk has finished. */
+        void completeRemaining() throws InterruptedException, ExecutionException {
+            if (!list.isEmpty()) {
+                submitChunk();
+            }
+            while (!pending.isEmpty()) {
+                // get() rethrows a worker failure instead of letting it vanish with the Future.
+                pending.poll().get();
+            }
+        }
+
+        private void submitChunk() throws InterruptedException, ExecutionException {
+            if (pending.size() >= MAX_PENDING_CHUNKS) {
+                // Back-pressure: wait for the oldest chunk before queueing another one.
+                pending.poll().get();
+            }
+            pending.add(executor.submit(new TaskExecutor(list)));
+            list = new ArrayList<>(TASK_CHUNK);
+        }
+    }
+
+    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
+        ExecutorService executor = Executors.newFixedThreadPool(WORKERS);
+        try {
+            TaskScheduler scheduler = new TaskScheduler(executor);
+            // try-with-resources so the file handle is released even when reading or a worker fails.
+            try (BufferedReader reader = Files.newBufferedReader(Paths.get(FILE), StandardCharsets.UTF_8)) {
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    scheduler.push(line);
+                }
+            }
+            scheduler.completeRemaining();
+        }
+        finally {
+            executor.shutdown();
+        }
+        if (!executor.awaitTermination(600, TimeUnit.SECONDS)) {
+            throw new IllegalStateException("worker pool did not terminate within 600 seconds");
+        }
+
+        Map<String, ResultRow> sortedResult = new TreeMap<>();
+        map.forEach(
+                (key, value) -> sortedResult.put(key, new ResultRow(value.min, value.sum / value.count, value.max)));
+        System.out.println(sortedResult);
+    }
+}