simultaneous reading and calculation, reduction of memory consumption, hashing for the stations

This commit is contained in:
jairo 2024-01-13 11:41:26 -04:00 committed by Gunnar Morling
parent b2a4b73c59
commit 4265c7e9a8

View File

@ -22,10 +22,9 @@ import java.io.RandomAccessFile;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Queue; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -38,11 +37,9 @@ public class CalculateAverage_jgrateron {
public record Particion(long offset, long size) { public record Particion(long offset, long size) {
} }
public record Tupla(String str, double num) {
}
/* /*
* * Divide el archivo segun el nro de cores de la PC
* La division se debe recalcular hasta encontrar un \n o \r (enter o return)
*/ */
public static List<Particion> dividirArchivo(File archivo) throws IOException { public static List<Particion> dividirArchivo(File archivo) throws IOException {
var particiones = new ArrayList<Particion>(); var particiones = new ArrayList<Particion>();
@ -86,46 +83,24 @@ public class CalculateAverage_jgrateron {
return particiones; return particiones;
} }
/*
* cambiar el locale para que el separador decimal sea punto y no coma
* crear un hilo por cada particion
* totalizar las mediciones por cada hilo
* ordenar y mostrar
*/
public static void main(String[] args) throws InterruptedException, IOException { public static void main(String[] args) throws InterruptedException, IOException {
Locale.setDefault(Locale.US); Locale.setDefault(Locale.US);
var startTime = System.nanoTime(); var startTime = System.nanoTime();
var archivo = new File(FILE); var archivo = new File(FILE);
var totalMediciones = new HashMap<Integer, Medicion>(); var totalMediciones = new HashMap<String, Medicion>();
var tareas = new ArrayList<Thread>(); var tareas = new ArrayList<Thread>();
var particiones = dividirArchivo(archivo); var particiones = dividirArchivo(archivo);
for (var p : particiones) { for (var p : particiones) {
var hilo = Thread.ofVirtual().start(() -> { var hilo = Thread.ofVirtual().start(() -> {
var mediciones = new HashMap<Integer, Medicion>(); try (var miTarea = new MiTarea(archivo, p)) {
try (var miArchivo = new MiArchivo(archivo)) { var mediciones = miTarea.calcularMediciones();
miArchivo.seek(p);
for (;;) {
var tuples = miArchivo.readTuples();
if (tuples.isEmpty()) {
break;
}
for (;;) {
var tuple = tuples.poll();
if (tuple == null) {
break;
}
var estacion = tuple.str;
var temp = tuple.num;
var hashCode = estacion.hashCode();
var medicion = mediciones.get(hashCode);
if (medicion == null) {
medicion = new Medicion(estacion, 1, temp, temp, temp);
mediciones.put(hashCode, medicion);
}
else {
medicion.update(1, temp, temp, temp);
}
}
}
}
catch (IOException e) {
System.exit(-1);
}
synchronized (totalMediciones) { synchronized (totalMediciones) {
for (var entry : mediciones.entrySet()) { for (var entry : mediciones.entrySet()) {
var medicion = totalMediciones.get(entry.getKey()); var medicion = totalMediciones.get(entry.getKey());
@ -138,6 +113,10 @@ public class CalculateAverage_jgrateron {
} }
} }
} }
}
catch (IOException e) {
System.exit(-1);
}
}); });
tareas.add(hilo); tareas.add(hilo);
} }
@ -145,72 +124,69 @@ public class CalculateAverage_jgrateron {
hilo.join(); hilo.join();
} }
Comparator<Entry<Integer, Medicion>> comparar = (a, b) -> { Comparator<Entry<String, Medicion>> comparar = (a, b) -> {
return a.getValue().estacion.compareTo(b.getValue().estacion); return a.getKey().compareTo(b.getKey());
}; };
var result = totalMediciones.entrySet().stream()// var result = totalMediciones.entrySet().stream()//
.sorted(comparar)// .sorted(comparar)//
.map(e -> e.getValue().toString())// .map(e -> e.getKey() + "=" + e.getValue().toString())//
.collect(Collectors.joining(", ")); .collect(Collectors.joining(", "));
System.out.println("{" + result + "}"); System.out.println("{" + result + "}");
if (DEBUG) { if (DEBUG) {
System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000 + "ms"); System.out.println("Total: " + (System.nanoTime() - startTime) / 1000000 + "ms");
} }
} }
/* /*
* * Clase Index para reutilizar al realizar un get en el Map
*/ */
public static double strToDouble(byte linea[], int posSeparator, int len) { static class Index {
double number[] = { 0, 0 }; private int hash;
int pos = 0;
boolean esNegativo = false; public Index() {
for (int i = posSeparator + 1; i < len; i++) { this.hash = 0;
switch (linea[i]) {
case '0', '1', '2', '3', '4':
case '5', '6', '7', '8', '9':
number[pos] = number[pos] * 10;
number[pos] = number[pos] + (linea[i] - 48);
break;
case '-':
esNegativo = true;
break;
case '.':
pos = 1;
break;
} }
public Index(int hash) {
this.hash = hash;
} }
double num = number[0];
if (number[1] > 0) { public void setHash(int hash) {
num += (number[1] / 10); this.hash = hash;
} }
if (esNegativo) {
num = num * -1; @Override
public int hashCode() {
return hash;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
var otro = (Index) obj;
return this.hash == otro.hash;
} }
return num;
} }
/* /*
* Clase para procesar el archivo a la particion que corresponde
* RandomAccessFile permite dezplazar el puntero de lectura del archivo
* Tenemos un Map para guardar las estadisticas y un map para guardar los
* nombres de las estaciones
* *
*/ */
static class MiArchivo implements AutoCloseable { static class MiTarea implements AutoCloseable {
private final RandomAccessFile rFile; private final RandomAccessFile rFile;
private final byte buffer[] = new byte[MAX_BUFFER]; private long maxRead;
private final byte line[] = new byte[MAX_LENGTH_LINE]; private Index index = new Index();
private final byte rest[] = new byte[MAX_LENGTH_LINE]; private Map<Index, Medicion> mediciones = new HashMap<>();
private int lenRest = 0; private Map<Index, String> estaciones = new HashMap<>();
private long maxRead = 0;
private long totalRead = 0;
private Queue<Tupla> tuples = new LinkedList<Tupla>();
public MiArchivo(File file) throws IOException { public MiTarea(File file, Particion particion) throws IOException {
rFile = new RandomAccessFile(file, "r"); rFile = new RandomAccessFile(file, "r");
}
public void seek(Particion particion) throws IOException {
maxRead = particion.size; maxRead = particion.size;
rFile.seek(particion.offset); rFile.seek(particion.offset);
} }
@ -220,13 +196,25 @@ public class CalculateAverage_jgrateron {
rFile.close(); rFile.close();
} }
public Queue<Tupla> readTuples() throws IOException { /*
* Lee solo su particion
* Divide el buffer por lineas usando los separadores \n o \r (enter o return)
* obtiene la posicion de separacion ";" de la estacion y su temperatura
* calcula el hash, convierte a double y actualiza las estadisticas
*/
public Map<String, Medicion> calcularMediciones() throws IOException {
var buffer = new byte[MAX_BUFFER];// buffer para lectura en el archivo
var rest = new byte[MAX_LENGTH_LINE];// Resto que sobra en cada lectura del buffer
var lenRest = 0;// Longitud que sobró en cada lectura del buffer
var totalRead = 0l; // Total bytes leidos
for (;;) {
if (totalRead == maxRead) { if (totalRead == maxRead) {
return tuples; break;
} }
long numBytes = rFile.read(buffer); long numBytes = rFile.read(buffer);
if (numBytes == -1) { if (numBytes == -1) {
return tuples; break;
} }
var totalLeidos = totalRead + numBytes; var totalLeidos = totalRead + numBytes;
if (totalLeidos > maxRead) { if (totalLeidos > maxRead) {
@ -236,31 +224,28 @@ public class CalculateAverage_jgrateron {
int pos = 0; int pos = 0;
int len = 0; int len = 0;
int idx = 0; int idx = 0;
int semicolon = 0;
while (pos < numBytes) { while (pos < numBytes) {
if (buffer[pos] == '\n' || buffer[pos] == '\r') { if (buffer[pos] == '\n' || buffer[pos] == '\r') {
if (lenRest > 0) { if (lenRest > 0) {
System.arraycopy(rest, 0, line, 0, lenRest); // concatenamos el sobrante anterior con la nueva linea
System.arraycopy(buffer, idx, line, lenRest, len); System.arraycopy(buffer, idx, rest, lenRest, len);
len += lenRest; len += lenRest;
semicolon = buscarSemicolon(rest, len);
lenRest = 0; lenRest = 0;
updateMediciones(rest, 0, semicolon);
} }
else { else {
System.arraycopy(buffer, idx, line, 0, len); updateMediciones(buffer, idx, semicolon);
} }
int semicolon = 0;
for (int i = 0; i < len; i++) {
if (line[i] == ';') {
semicolon = i;
break;
}
}
var temperatura = strToDouble(line, semicolon, len);
var tupla = new Tupla(new String(line, 0, semicolon), temperatura);
tuples.add(tupla);
idx = pos + 1; idx = pos + 1;
len = 0; len = 0;
semicolon = 0;
} }
else { else {
if (buffer[pos] == ';') {
semicolon = len;
}
len++; len++;
} }
pos++; pos++;
@ -269,23 +254,106 @@ public class CalculateAverage_jgrateron {
System.arraycopy(buffer, idx, rest, 0, len); System.arraycopy(buffer, idx, rest, 0, len);
lenRest = len; lenRest = len;
} }
return tuples; }
return transformMediciones();
}
/*
* Buscamos en reverso ya que el ; esta mas cerca de numero que la estacion
* ademas el minimo numero 0.0 asi que quitamos tres mas
*/
public int buscarSemicolon(byte data[], int len) {
for (int i = len - 4; i >= 0; i--) {
if (data[i] == ';') {
return i;
}
}
return 0;
}
/*
* Busca una medicion por su hash y crea o actualiza la temperatura
*/
public void updateMediciones(byte data[], int pos, int semicolon) {
var hashEstacion = calcHashCode(0, data, pos, semicolon);
var temp = strToDouble(data, pos, semicolon);
index.setHash(hashEstacion);
var estacion = estaciones.get(index);
if (estacion == null) {
estacion = new String(data, pos, semicolon);
estaciones.put(new Index(hashEstacion), estacion);
}
index.setHash(hashEstacion);
var medicion = mediciones.get(index);
if (medicion == null) {
medicion = new Medicion(1, temp, temp, temp);
mediciones.put(new Index(hashEstacion), medicion);
}
else {
medicion.update(1, temp, temp, temp);
} }
} }
/* /*
* * Convierte las estaciones de hash a string
*/
private Map<String, Medicion> transformMediciones() {
var newMediciones = new HashMap<String, Medicion>();
for (var e : mediciones.entrySet()) {
var estacion = estaciones.get(e.getKey());
var medicion = e.getValue();
newMediciones.put(estacion, medicion);
}
return newMediciones;
}
/*
* Calcula el hash de cada estacion, esto es una copia de java.internal.hashcode
*/
private int calcHashCode(int result, byte[] a, int fromIndex, int length) {
int end = fromIndex + length;
for (int i = fromIndex; i < end; i++) {
result = 31 * result + a[i];
}
return result;
}
/*
* convierte de un arreglo de bytes a double
*/
public double strToDouble(byte linea[], int idx, int posSeparator) {
double number = 0;
int pos = idx + posSeparator + 1;
int esNegativo = linea[pos] == '-' ? -1 : 1;
if (esNegativo == -1) {
pos++;
}
int digit1 = linea[pos] - 48;
pos++;
if (linea[pos] == '.') {
pos++;
number = (digit1 * 10) + (linea[pos] - 48);
}
else {
int digit2 = linea[pos] - 48;
pos += 2;
number = (digit1 * 100) + (digit2 * 10) + (linea[pos] - 48);
}
return number / 10 * esNegativo;
}
}
/*
* Clase para reservar las estadisticas por estacion
*/ */
static class Medicion { static class Medicion {
private String estacion;
private int count; private int count;
private double tempMin; private double tempMin;
private double tempMax; private double tempMax;
private double tempSum; private double tempSum;
public Medicion(String estacion, int count, double tempMin, double tempMax, double tempSum) { public Medicion(int count, double tempMin, double tempMax, double tempSum) {
super(); super();
this.estacion = estacion;
this.count = count; this.count = count;
this.tempMin = tempMin; this.tempMin = tempMin;
this.tempMax = tempMax; this.tempMax = tempMax;
@ -305,8 +373,9 @@ public class CalculateAverage_jgrateron {
@Override @Override
public String toString() { public String toString() {
double tempPro = tempSum / count; double tempPro = (double) tempSum;
return "%s=%.1f/%.1f/%.1f".formatted(estacion, tempMin, tempPro, tempMax); tempPro = tempPro / count;
return "%.1f/%.1f/%.1f".formatted(tempMin, tempPro, tempMax);
} }
} }
} }