From 20e52aaadfba5d830a5e099fe1f40633f7263efa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jairo=20Grater=C3=B3n?= <58091322+jgrateron@users.noreply.github.com> Date: Thu, 11 Jan 2024 16:22:58 -0400 Subject: [PATCH] Divide the reading of the file by parts (#254) * divide the reading of the file by parts * fix format * add number of core partition * fix format * implement strToDouble * fix strtodouble * add locale, fix read file, tests pass * delete unnecessary method clean --- .../onebrc/CalculateAverage_jgrateron.java | 320 +++++++++++++----- 1 file changed, 233 insertions(+), 87 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java index c6edb26..251f26d 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java @@ -16,115 +16,267 @@ package dev.morling.onebrc; -import java.io.BufferedReader; -import java.io.FileReader; +import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; -import java.util.Map; +import java.util.List; +import java.util.Locale; import java.util.Queue; -import java.util.TreeMap; +import java.util.Map.Entry; import java.util.stream.Collectors; public class CalculateAverage_jgrateron { private static final String FILE = "./measurements.txt"; - private static int MAX_LINES = 100000; + private static final int MAX_LENGTH_LINE = 115; + private static final int MAX_BUFFER = 1024 * 8; + private static boolean DEBUG = false; - public static void main(String[] args) throws IOException, InterruptedException { - // long startTime = System.nanoTime(); + public record Particion(long offset, long size) { + } - var tasks = new ArrayList(); - try (var reader = new BufferedReader(new FileReader(FILE))) { - String line; - var listaLineas = new LinkedList(); - while ((line = reader.readLine()) != null) { - listaLineas.add(line); - if (listaLineas.size() > MAX_LINES) { - var taskCalcular = new TaskCalcular(listaLineas); - listaLineas = new LinkedList(); - tasks.add(taskCalcular); - } - } - if (listaLineas.size() > 0) { - var taskCalcular = new TaskCalcular(listaLineas); - tasks.add(taskCalcular); - } - } - // combinar todas las particiones - var totalMediciones = new TreeMap(); - for (var task : tasks) { - task.join(); - var mediciones = task.getMediciones(); - for (var entry : mediciones.entrySet()) { - var medicion = totalMediciones.get(entry.getKey()); - if (medicion == null) { - totalMediciones.put(entry.getKey(), entry.getValue()); - } - else { - var otraMed = entry.getValue(); - medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum); - } - } - } - var result = totalMediciones.entrySet().stream()// - .map(e -> e.getKey() + "=" + e.getValue())// - .collect(Collectors.joining(", ")); - - System.out.println("{" + result + "}"); - - // System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000); + public record Tupla(String str, double num) { } /* * */ - static class TaskCalcular { - - private Queue listaLineas; - private Map mediciones; - private Thread hilo; - - public TaskCalcular(Queue listaLineas) { - this.listaLineas = listaLineas; - mediciones = new HashMap(); - hilo = Thread.ofPlatform().unstarted(() -> { - run(); - }); - hilo.start(); + public static List dividirArchivo(File archivo) throws IOException { + var particiones = new ArrayList(); + var buffer = new byte[MAX_LENGTH_LINE]; + var length = archivo.length(); + int cores = Runtime.getRuntime().availableProcessors(); + var sizeParticion = length / cores; + if (sizeParticion > MAX_BUFFER) { + var ini = 0l; + try (var rfile = new RandomAccessFile(archivo, "r")) { + for (;;) { + var size = sizeParticion; + var pos = ini + size; + if (pos > length) { + pos = length - 1; + size = length - ini; + } + rfile.seek(pos); + int count = rfile.read(buffer); + if (count == -1) { + break; + } + for (int i = 0; i < count; i++) { + size++; + if (buffer[i] == '\n' || buffer[i] == '\r') { + break; + } + } + var particion = new Particion(ini, size); + particiones.add(particion); + if (count != buffer.length) { + break; + } + ini += size; + } + } } + else { + particiones.add(new Particion(0, length)); + } + return particiones; + } - public void join() throws InterruptedException { + public static void main(String[] args) throws InterruptedException, IOException { + Locale.setDefault(Locale.US); + var startTime = System.nanoTime(); + var archivo = new File(FILE); + var totalMediciones = new HashMap(); + var tareas = new ArrayList(); + var particiones = dividirArchivo(archivo); + + for (var p : particiones) { + var hilo = Thread.ofVirtual().start(() -> { + var mediciones = new HashMap(); + try (var miArchivo = new MiArchivo(archivo)) { + miArchivo.seek(p); + for (;;) { + var tuples = miArchivo.readTuples(); + if (tuples.isEmpty()) { + break; + } + for (;;) { + var tuple = tuples.poll(); + if (tuple == null) { + break; + } + var estacion = tuple.str; + var temp = tuple.num; + var hashCode = estacion.hashCode(); + var medicion = mediciones.get(hashCode); + if (medicion == null) { + medicion = new Medicion(estacion, 1, temp, temp, temp); + mediciones.put(hashCode, medicion); + } + else { + medicion.update(1, temp, temp, temp); + } + } + } + } + catch (IOException e) { + System.exit(-1); + } + synchronized (totalMediciones) { + for (var entry : mediciones.entrySet()) { + var medicion = totalMediciones.get(entry.getKey()); + if (medicion == null) { + totalMediciones.put(entry.getKey(), entry.getValue()); + } + else { + var otraMed = entry.getValue(); + medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum); + } + } + } + }); + tareas.add(hilo); + } + for (var hilo : tareas) { hilo.join(); } - public void run() { - String linea; - int pos; - while ((linea = listaLineas.poll()) != null) { - pos = linea.indexOf(";"); - var estacion = linea.substring(0, pos); - var temp = Double.parseDouble(linea.substring(pos + 1)); - var medicion = mediciones.get(estacion); - if (medicion == null) { - medicion = new Medicion(estacion, 1, temp, temp, temp); - mediciones.put(estacion, medicion); - } - else { - medicion.update(1, temp, temp, temp); - } - } + Comparator> comparar = (a, b) -> { + return a.getValue().estacion.compareTo(b.getValue().estacion); + }; + + var result = totalMediciones.entrySet().stream()// + .sorted(comparar)// + .map(e -> e.getValue().toString())// + .collect(Collectors.joining(", ")); + + System.out.println("{" + result + "}"); + if (DEBUG) { + System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000 + "ms"); } - public Map getMediciones() { - return mediciones; + } + + /* + * + */ + public static double strToDouble(byte linea[], int posSeparator, int len) { + double number[] = { 0, 0 }; + int pos = 0; + boolean esNegativo = false; + for (int i = posSeparator + 1; i < len; i++) { + switch (linea[i]) { + case '0', '1', '2', '3', '4': + case '5', '6', '7', '8', '9': + number[pos] = number[pos] * 10; + number[pos] = number[pos] + (linea[i] - 48); + break; + case '-': + esNegativo = true; + break; + case '.': + pos = 1; + break; + } + } + double num = number[0]; + if (number[1] > 0) { + num += (number[1] / 10); + } + if (esNegativo) { + num = num * -1; + } + return num; + } + + /* + * + */ + static class MiArchivo implements AutoCloseable { + private final RandomAccessFile rFile; + private final byte buffer[] = new byte[MAX_BUFFER]; + private final byte line[] = new byte[MAX_LENGTH_LINE]; + private final byte rest[] = new byte[MAX_LENGTH_LINE]; + private int lenRest = 0; + private long maxRead = 0; + private long totalRead = 0; + private Queue tuples = new LinkedList(); + + public MiArchivo(File file) throws IOException { + rFile = new RandomAccessFile(file, "r"); + } + + public void seek(Particion particion) throws IOException { + maxRead = particion.size; + rFile.seek(particion.offset); + } + + @Override + public void close() throws IOException { + rFile.close(); + } + + public Queue readTuples() throws IOException { + if (totalRead == maxRead) { + return tuples; + } + long numBytes = rFile.read(buffer); + if (numBytes == -1) { + return tuples; + } + var totalLeidos = totalRead + numBytes; + if (totalLeidos > maxRead) { + numBytes = maxRead - totalRead; + } + totalRead += numBytes; + int pos = 0; + int len = 0; + int idx = 0; + while (pos < numBytes) { + if (buffer[pos] == '\n' || buffer[pos] == '\r') { + if (lenRest > 0) { + System.arraycopy(rest, 0, line, 0, lenRest); + System.arraycopy(buffer, idx, line, lenRest, len); + len += lenRest; + lenRest = 0; + } + else { + System.arraycopy(buffer, idx, line, 0, len); + } + int semicolon = 0; + for (int i = 0; i < len; i++) { + if (line[i] == ';') { + semicolon = i; + break; + } + } + var temperatura = strToDouble(line, semicolon, len); + var tupla = new Tupla(new String(line, 0, semicolon), temperatura); + tuples.add(tupla); + idx = pos + 1; + len = 0; + } + else { + len++; + } + pos++; + } + if (len > 0) { + System.arraycopy(buffer, idx, rest, 0, len); + lenRest = len; + } + return tuples; } } /* * */ - static class Medicion implements Comparable { + static class Medicion { private String estacion; private int count; private double tempMin; @@ -154,13 +306,7 @@ public class CalculateAverage_jgrateron { @Override public String toString() { double tempPro = tempSum / count; - return "%.1f/%.1f/%.1f".formatted(tempMin, tempPro, tempMax); - } - - @Override - public int compareTo(Medicion medicion) { - return estacion.compareTo(medicion.estacion); + return "%s=%.1f/%.1f/%.1f".formatted(estacion, tempMin, tempPro, tempMax); } } - }