From 4265c7e9a87f89c4a5882e4b45f65ff2679d3792 Mon Sep 17 00:00:00 2001 From: jairo Date: Sat, 13 Jan 2024 11:41:26 -0400 Subject: [PATCH] simultaneous reading and calculation, reduction of memory consumption, hashing for the stations --- .../onebrc/CalculateAverage_jgrateron.java | 327 +++++++++++------- 1 file changed, 198 insertions(+), 129 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java index 251f26d..4886508 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jgrateron.java @@ -22,10 +22,9 @@ import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.Queue; +import java.util.Map; import java.util.Map.Entry; import java.util.stream.Collectors; @@ -38,11 +37,9 @@ public class CalculateAverage_jgrateron { public record Particion(long offset, long size) { } - public record Tupla(String str, double num) { - } - /* - * + * Divide el archivo segun el nro de cores de la PC + * La division se debe recalcular hasta encontrar un \n o \r (enter o return) */ public static List dividirArchivo(File archivo) throws IOException { var particiones = new ArrayList(); @@ -86,39 +83,33 @@ public class CalculateAverage_jgrateron { return particiones; } + /* + * cambiar el locale para que el separador decimal sea punto y no coma + * crear un hilo por cada particion + * totalizar las mediciones por cada hilo + * ordenar y mostrar + */ public static void main(String[] args) throws InterruptedException, IOException { Locale.setDefault(Locale.US); var startTime = System.nanoTime(); var archivo = new File(FILE); - var totalMediciones = new HashMap(); + var totalMediciones = new HashMap(); var tareas = new ArrayList(); var particiones = dividirArchivo(archivo); for (var p : particiones) { var hilo = Thread.ofVirtual().start(() -> { - var mediciones = new HashMap(); - try (var miArchivo = new MiArchivo(archivo)) { - miArchivo.seek(p); - for (;;) { - var tuples = miArchivo.readTuples(); - if (tuples.isEmpty()) { - break; - } - for (;;) { - var tuple = tuples.poll(); - if (tuple == null) { - break; - } - var estacion = tuple.str; - var temp = tuple.num; - var hashCode = estacion.hashCode(); - var medicion = mediciones.get(hashCode); + try (var miTarea = new MiTarea(archivo, p)) { + var mediciones = miTarea.calcularMediciones(); + synchronized (totalMediciones) { + for (var entry : mediciones.entrySet()) { + var medicion = totalMediciones.get(entry.getKey()); if (medicion == null) { - medicion = new Medicion(estacion, 1, temp, temp, temp); - mediciones.put(hashCode, medicion); + totalMediciones.put(entry.getKey(), entry.getValue()); } else { - medicion.update(1, temp, temp, temp); + var otraMed = entry.getValue(); + medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum); } } } @@ -126,18 +117,6 @@ public class CalculateAverage_jgrateron { catch (IOException e) { System.exit(-1); } - synchronized (totalMediciones) { - for (var entry : mediciones.entrySet()) { - var medicion = totalMediciones.get(entry.getKey()); - if (medicion == null) { - totalMediciones.put(entry.getKey(), entry.getValue()); - } - else { - var otraMed = entry.getValue(); - medicion.update(otraMed.count, otraMed.tempMin, otraMed.tempMax, otraMed.tempSum); - } - } - } }); tareas.add(hilo); } @@ -145,72 +124,69 @@ public class CalculateAverage_jgrateron { hilo.join(); } - Comparator> comparar = (a, b) -> { - return a.getValue().estacion.compareTo(b.getValue().estacion); + Comparator> comparar = (a, b) -> { + return a.getKey().compareTo(b.getKey()); }; - var result = totalMediciones.entrySet().stream()// .sorted(comparar)// - .map(e -> e.getValue().toString())// + .map(e -> e.getKey() + "=" + e.getValue().toString())// .collect(Collectors.joining(", ")); System.out.println("{" + result + "}"); if (DEBUG) { System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000 + "ms"); } - } /* - * + * Clase Index para reutilizar al realizar un get en el Map */ - public static double strToDouble(byte linea[], int posSeparator, int len) { - double number[] = { 0, 0 }; - int pos = 0; - boolean esNegativo = false; - for (int i = posSeparator + 1; i < len; i++) { - switch (linea[i]) { - case '0', '1', '2', '3', '4': - case '5', '6', '7', '8', '9': - number[pos] = number[pos] * 10; - number[pos] = number[pos] + (linea[i] - 48); - break; - case '-': - esNegativo = true; - break; - case '.': - pos = 1; - break; + static class Index { + private int hash; + + public Index() { + this.hash = 0; + } + + public Index(int hash) { + this.hash = hash; + } + + public void setHash(int hash) { + this.hash = hash; + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; } + var otro = (Index) obj; + return this.hash == otro.hash; } - double num = number[0]; - if (number[1] > 0) { - num += (number[1] / 10); - } - if (esNegativo) { - num = num * -1; - } - return num; } /* + * Clase para procesar el archivo a la particion que corresponde + * RandomAccessFile permite dezplazar el puntero de lectura del archivo + * Tenemos un Map para guardar las estadisticas y un map para guardar los + * nombres de las estaciones * */ - static class MiArchivo implements AutoCloseable { + static class MiTarea implements AutoCloseable { private final RandomAccessFile rFile; - private final byte buffer[] = new byte[MAX_BUFFER]; - private final byte line[] = new byte[MAX_LENGTH_LINE]; - private final byte rest[] = new byte[MAX_LENGTH_LINE]; - private int lenRest = 0; - private long maxRead = 0; - private long totalRead = 0; - private Queue tuples = new LinkedList(); + private long maxRead; + private Index index = new Index(); + private Map mediciones = new HashMap<>(); + private Map estaciones = new HashMap<>(); - public MiArchivo(File file) throws IOException { + public MiTarea(File file, Particion particion) throws IOException { rFile = new RandomAccessFile(file, "r"); - } - - public void seek(Particion particion) throws IOException { maxRead = particion.size; rFile.seek(particion.offset); } @@ -220,72 +196,164 @@ public class CalculateAverage_jgrateron { rFile.close(); } - public Queue readTuples() throws IOException { - if (totalRead == maxRead) { - return tuples; - } - long numBytes = rFile.read(buffer); - if (numBytes == -1) { - return tuples; - } - var totalLeidos = totalRead + numBytes; - if (totalLeidos > maxRead) { - numBytes = maxRead - totalRead; - } - totalRead += numBytes; - int pos = 0; - int len = 0; - int idx = 0; - while (pos < numBytes) { - if (buffer[pos] == '\n' || buffer[pos] == '\r') { - if (lenRest > 0) { - System.arraycopy(rest, 0, line, 0, lenRest); - System.arraycopy(buffer, idx, line, lenRest, len); - len += lenRest; - lenRest = 0; + /* + * Lee solo su particion + * Divide el buffer por lineas usando los separadores \n o \r (enter o return) + * obtiene la posicion de separacion ";" de la estacion y su temperatura + * calcula el hash, convierte a double y actualiza las estadisticas + */ + public Map calcularMediciones() throws IOException { + var buffer = new byte[MAX_BUFFER];// buffer para lectura en el archivo + var rest = new byte[MAX_LENGTH_LINE];// Resto que sobra en cada lectura del buffer + var lenRest = 0;// Longitud que sobrĂ³ en cada lectura del buffer + var totalRead = 0l; // Total bytes leidos + + for (;;) { + if (totalRead == maxRead) { + break; + } + long numBytes = rFile.read(buffer); + if (numBytes == -1) { + break; + } + var totalLeidos = totalRead + numBytes; + if (totalLeidos > maxRead) { + numBytes = maxRead - totalRead; + } + totalRead += numBytes; + int pos = 0; + int len = 0; + int idx = 0; + int semicolon = 0; + while (pos < numBytes) { + if (buffer[pos] == '\n' || buffer[pos] == '\r') { + if (lenRest > 0) { + // concatenamos el sobrante anterior con la nueva linea + System.arraycopy(buffer, idx, rest, lenRest, len); + len += lenRest; + semicolon = buscarSemicolon(rest, len); + lenRest = 0; + updateMediciones(rest, 0, semicolon); + } + else { + updateMediciones(buffer, idx, semicolon); + } + idx = pos + 1; + len = 0; + semicolon = 0; } else { - System.arraycopy(buffer, idx, line, 0, len); - } - int semicolon = 0; - for (int i = 0; i < len; i++) { - if (line[i] == ';') { - semicolon = i; - break; + if (buffer[pos] == ';') { + semicolon = len; } + len++; } - var temperatura = strToDouble(line, semicolon, len); - var tupla = new Tupla(new String(line, 0, semicolon), temperatura); - tuples.add(tupla); - idx = pos + 1; - len = 0; + pos++; } - else { - len++; + if (len > 0) { + System.arraycopy(buffer, idx, rest, 0, len); + lenRest = len; } + } + return transformMediciones(); + } + + /* + * Buscamos en reverso ya que el ; esta mas cerca de numero que la estacion + * ademas el minimo numero 0.0 asi que quitamos tres mas + */ + public int buscarSemicolon(byte data[], int len) { + for (int i = len - 4; i >= 0; i--) { + if (data[i] == ';') { + return i; + } + } + return 0; + } + + /* + * Busca una medicion por su hash y crea o actualiza la temperatura + */ + public void updateMediciones(byte data[], int pos, int semicolon) { + var hashEstacion = calcHashCode(0, data, pos, semicolon); + var temp = strToDouble(data, pos, semicolon); + index.setHash(hashEstacion); + var estacion = estaciones.get(index); + if (estacion == null) { + estacion = new String(data, pos, semicolon); + estaciones.put(new Index(hashEstacion), estacion); + } + index.setHash(hashEstacion); + var medicion = mediciones.get(index); + if (medicion == null) { + medicion = new Medicion(1, temp, temp, temp); + mediciones.put(new Index(hashEstacion), medicion); + } + else { + medicion.update(1, temp, temp, temp); + } + } + + /* + * Convierte las estaciones de hash a string + */ + private Map transformMediciones() { + var newMediciones = new HashMap(); + for (var e : mediciones.entrySet()) { + var estacion = estaciones.get(e.getKey()); + var medicion = e.getValue(); + newMediciones.put(estacion, medicion); + } + return newMediciones; + } + + /* + * Calcula el hash de cada estacion, esto es una copia de java.internal.hashcode + */ + private int calcHashCode(int result, byte[] a, int fromIndex, int length) { + int end = fromIndex + length; + for (int i = fromIndex; i < end; i++) { + result = 31 * result + a[i]; + } + return result; + } + + /* + * convierte de un arreglo de bytes a double + */ + public double strToDouble(byte linea[], int idx, int posSeparator) { + double number = 0; + int pos = idx + posSeparator + 1; + int esNegativo = linea[pos] == '-' ? -1 : 1; + if (esNegativo == -1) { pos++; } - if (len > 0) { - System.arraycopy(buffer, idx, rest, 0, len); - lenRest = len; + int digit1 = linea[pos] - 48; + pos++; + if (linea[pos] == '.') { + pos++; + number = (digit1 * 10) + (linea[pos] - 48); } - return tuples; + else { + int digit2 = linea[pos] - 48; + pos += 2; + number = (digit1 * 100) + (digit2 * 10) + (linea[pos] - 48); + } + return number / 10 * esNegativo; } } /* - * + * Clase para reservar las estadisticas por estacion */ static class Medicion { - private String estacion; private int count; private double tempMin; private double tempMax; private double tempSum; - public Medicion(String estacion, int count, double tempMin, double tempMax, double tempSum) { + public Medicion(int count, double tempMin, double tempMax, double tempSum) { super(); - this.estacion = estacion; this.count = count; this.tempMin = tempMin; this.tempMax = tempMax; @@ -305,8 +373,9 @@ public class CalculateAverage_jgrateron { @Override public String toString() { - double tempPro = tempSum / count; - return "%s=%.1f/%.1f/%.1f".formatted(estacion, tempMin, tempPro, tempMax); + double tempPro = (double) tempSum; + tempPro = tempPro / count; + return "%.1f/%.1f/%.1f".formatted(tempMin, tempPro, tempMax); } } }