ddimtirov - single-threaded datastructures tuning - reading to char buffers, one pass, no allocation processing
This commit is contained in:
		
				
					committed by
					
						 Gunnar Morling
						Gunnar Morling
					
				
			
			
				
	
			
			
			
						parent
						
							2458f056d6
						
					
				
				
					commit
					57cfa54c68
				
			| @@ -15,117 +15,232 @@ | ||||
|  */ | ||||
| package dev.morling.onebrc; | ||||
|  | ||||
| import java.io.BufferedReader; | ||||
| import java.io.IOException; | ||||
| import java.io.InputStreamReader; | ||||
| import java.io.*; | ||||
| import java.nio.CharBuffer; | ||||
| import java.nio.charset.StandardCharsets; | ||||
| import java.nio.file.Files; | ||||
| import java.nio.file.Path; | ||||
| import java.time.Duration; | ||||
| import java.time.Instant; | ||||
| import java.util.Map; | ||||
| import java.util.TreeMap; | ||||
| import java.util.stream.Collector; | ||||
|  | ||||
| import static java.util.stream.Collectors.groupingByConcurrent; | ||||
|  | ||||
| // gunnar morling - 2:10 | ||||
| // roy van rijn -   1:01 | ||||
| //                  0:53 | ||||
| //                  0:37 | ||||
|  | ||||
| public class CalculateAverage_ddimtirov { | ||||
|     public record InputLine(String station, int value) { | ||||
|  | ||||
|         public static InputLine fromLine(String line) { | ||||
|             int endOfText = line.indexOf(";"); | ||||
|  | ||||
|             String station = line.substring(0, endOfText); | ||||
|  | ||||
|             int startOfWhole = endOfText + 1; | ||||
|             int sign; | ||||
|             if (line.charAt(startOfWhole) == '-') { | ||||
|                 sign = -1; | ||||
|                 startOfWhole++; | ||||
|             } else { | ||||
|                 sign = 1; | ||||
|             } | ||||
|  | ||||
|             int endOfWhole = line.lastIndexOf("."); | ||||
|             var whole = unsafeParsePositiveInt(line, startOfWhole, endOfWhole); | ||||
|             var decimal = unsafeParsePositiveInt(line,endOfWhole+1, line.length()); | ||||
|             int fixpoint10 = (whole * 10  + decimal) * sign; | ||||
|  | ||||
|             return new InputLine(station, fixpoint10); | ||||
|         } | ||||
|  | ||||
|         static int unsafeParsePositiveInt(String s, int start, int end) { | ||||
|             int acc = 0; | ||||
|             for (int i = start; i<end; i++) { | ||||
|                 if (acc != 0) acc *= 10; | ||||
|                 char c = s.charAt(i); | ||||
|                 var v = c - '0'; | ||||
|                 assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v); | ||||
|                 acc += v; | ||||
|             } | ||||
|             return acc; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private static class OutputMetrics { | ||||
|         private int min = Integer.MAX_VALUE; | ||||
|         private int max = Integer.MIN_VALUE; | ||||
|         private long sum; | ||||
|         private long count; | ||||
|  | ||||
|         @SuppressWarnings("ManualMinMaxCalculation") | ||||
|         public OutputMetrics combine(OutputMetrics o) { | ||||
|             var r = new OutputMetrics(); | ||||
|             r.min = min < o.min ? min : o.min; | ||||
|             r.max = max > o.max ? max : o.max; | ||||
|             r.sum = sum + o.sum; | ||||
|             r.count = count + o.count; | ||||
|             return r; | ||||
|         } | ||||
|  | ||||
|         public void accumulate(InputLine m) { | ||||
|             if (m.value < min) min = m.value; | ||||
|             if (m.value > max) max = m.value; | ||||
|             sum += m.value; | ||||
|             count++; | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|         public String toString() { | ||||
|             var min = this.min / 10.0; | ||||
|             var mean = Math.round(this.sum / (double) count) / 10.0; | ||||
|             var max = this.max / 10.0; | ||||
|             return min + "/" + mean + "/" + max; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @SuppressWarnings("RedundantSuppression") | ||||
|     public static void main(String[] args) throws IOException { | ||||
| //         var start = Instant.now(); | ||||
| //        var start = Instant.now(); | ||||
|         Instant start = null; | ||||
|         var path = Path.of("./measurements.txt"); | ||||
|         var buffer = 8192 * 20; | ||||
|         var bufferSize = 512 * 64; // 64 blocks | ||||
|         var tracker = new Tracker(); | ||||
|         var charset = StandardCharsets.UTF_8; | ||||
|  | ||||
|         // Files.lines() is optimized for files that can be indexed by int | ||||
|         // For larger files it falls back to buffered reader, which we now | ||||
|         // use directly to be able to tweak the buffer size. | ||||
|         try (var reader = new BufferedReader(new InputStreamReader(Files.newInputStream(path)), buffer)) { | ||||
|             var stationsToMetrics = reader.lines() | ||||
|                     .map(InputLine::fromLine) | ||||
|                     .parallel() | ||||
|                     .collect(groupingByConcurrent(InputLine::station, Collector.of( | ||||
|                             OutputMetrics::new, | ||||
|                             OutputMetrics::accumulate, | ||||
|                             OutputMetrics::combine, | ||||
|                             OutputMetrics::toString | ||||
|                     ))); | ||||
|             System.out.println(new TreeMap<>(stationsToMetrics)); | ||||
|             assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(new TreeMap<>(stationsToMetrics).toString()); | ||||
|         try (var stream = Files.newInputStream(path); var reader = new InputStreamReader(stream, charset)) { | ||||
|             var buffered = new RecordReader(reader, bufferSize); | ||||
|  | ||||
|             InputRecord record = null; | ||||
|             while (true) { | ||||
|                 record = buffered.readRecord(record); | ||||
|                 if (record==null) break; | ||||
|  | ||||
|                 tracker.process(record); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         System.out.println(tracker.stationsToMetrics()); | ||||
|  | ||||
|         //noinspection ConstantValue | ||||
|         if (start!=null) System.err.println(Duration.between(start, Instant.now())); | ||||
|  | ||||
|         assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(tracker.stationsToMetrics().toString()); | ||||
|     } | ||||
|  | ||||
|     /** | ||||
|      * <p>Reads records we can use to do the filtering from a stream of characters. | ||||
|      * Takes care of framing, and I/O buffering. | ||||
|      * This class in combination with {@link RecordReader} allows us to fully hide the | ||||
|      * I/O and internal representation. | ||||
|      * | ||||
|      * <p>If used with recycled {@link InputRecord} instances, this class allocates no memory | ||||
|      * after instantiation. The class is stateful, and the internals are not threadsafe | ||||
|      * - use from single thread or with proper synchronization. | ||||
|      */ | ||||
|     static class RecordReader { | ||||
|         /** | ||||
|          * The source of input data | ||||
|          */ | ||||
|         private final Readable input; | ||||
|  | ||||
|         /** | ||||
|          * Used for i/o buffering and record parsing. | ||||
|          * @see #buf | ||||
|          */ | ||||
|         private final CharBuffer buffer; | ||||
|  | ||||
|         /** | ||||
|          * <p>Cached backing array from {@link #buffer}. | ||||
|          * <p>This is optimization because {@link CharBuffer#array()} was showing on the CPU profile. | ||||
|          */ | ||||
|         private final char[] buf; | ||||
|  | ||||
|         public RecordReader(Readable input, int bufferSize) { | ||||
|             this.input = input; | ||||
|             buffer = CharBuffer.allocate(bufferSize).flip(); | ||||
|             buf = buffer.array(); | ||||
|         } | ||||
|  | ||||
|         public InputRecord readRecord(InputRecord recycled) throws IOException { | ||||
|             var record = parseRecord(recycled); | ||||
|             if (record!=null) return record; | ||||
|  | ||||
|             if (input.read(buffer.compact())==-1) return null; | ||||
|             buffer.flip(); | ||||
|  | ||||
|             return parseRecord(recycled); | ||||
|         } | ||||
|  | ||||
|  | ||||
|         private InputRecord parseRecord(InputRecord recycled) { | ||||
|             var lim = buffer.limit(); | ||||
|             if (buffer.isEmpty()) return null; | ||||
|  | ||||
|             var nameOff = buffer.position(); | ||||
|             var buff = buf; | ||||
|             while (buff[nameOff]=='\n' || buff[nameOff]=='\r' || buff[nameOff]==' ' ) { | ||||
|                 nameOff++; | ||||
|                 if (nameOff>=lim) return null; | ||||
|             } | ||||
|  | ||||
|             var nameHash = 0; | ||||
|             var nameLen = 0; | ||||
|             while (buff[nameOff+nameLen]!=';') { | ||||
|                 nameHash = nameHash*31 + buff[nameOff+nameLen]; | ||||
|                 nameLen++; | ||||
|                 if (nameOff+nameLen>=lim) return null; | ||||
|             } | ||||
|  | ||||
|             //noinspection DuplicateExpressions | ||||
|             assert new String(buf, nameOff, nameLen).hashCode()==nameHash | ||||
|                  : "'%s'@%d !-> %d".formatted(new String(buf, nameOff, nameLen), new String(buf, nameOff, nameLen).hashCode(), nameHash); | ||||
|  | ||||
|             var valCursor = nameOff + nameLen +1; | ||||
|             int signum = 1; | ||||
|             var acc = 0; | ||||
|             while (true) { | ||||
|                 if (valCursor >= lim) { | ||||
|                     return null; | ||||
|                 } | ||||
|                 char c = buff[valCursor++]; | ||||
|                 if (c == '\n' || c == '\r') { | ||||
|                     break; | ||||
|                 } | ||||
|                 if (c=='.') continue; | ||||
|                 if (acc == 0) { | ||||
|                     if (c == '-') { | ||||
|                         signum = -1; | ||||
|                         continue; | ||||
|                     } | ||||
|                 } else { | ||||
|                     acc *= 10; | ||||
|                 } | ||||
|                 var v = c - '0'; | ||||
|                 assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v); | ||||
|                 acc += v; | ||||
|             } | ||||
|  | ||||
|             buffer.position(valCursor); | ||||
|  | ||||
|             var record = recycled!=null ? recycled : new InputRecord(buf); | ||||
|             record.init(nameHash, nameOff, nameLen, acc*signum); | ||||
|             return record; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
|     static class Tracker { | ||||
|         private static final int ADDRESS_NO_CLASH_MODULUS = 49999; | ||||
|         private static final int OFFSET_MIN = 0; | ||||
|         private static final int OFFSET_MAX = 1; | ||||
|         private static final int OFFSET_COUNT = 2; | ||||
|  | ||||
|         private final int[] minMaxCount = new int[ADDRESS_NO_CLASH_MODULUS * 3]; | ||||
|         private final long[] sums = new long[ADDRESS_NO_CLASH_MODULUS]; | ||||
|         private final String[] names = new String[ADDRESS_NO_CLASH_MODULUS]; | ||||
|  | ||||
|         public void process(InputRecord r) { | ||||
|             var i = Math.abs(r.nameHash) % ADDRESS_NO_CLASH_MODULUS; | ||||
|  | ||||
|             if (names[i]==null) names[i] = r.name(); | ||||
|  | ||||
|             sums[i] += r.value; | ||||
|  | ||||
|             int mmcIndex = i * 3; | ||||
|             var min = minMaxCount[mmcIndex + OFFSET_MIN]; | ||||
|             var max = minMaxCount[mmcIndex + OFFSET_MAX]; | ||||
|             if (r.value < min) minMaxCount[mmcIndex + OFFSET_MIN] = r.value; | ||||
|             if (r.value > max) minMaxCount[mmcIndex + OFFSET_MAX] = r.value; | ||||
|  | ||||
|             minMaxCount[mmcIndex + OFFSET_COUNT]++; | ||||
|         } | ||||
|  | ||||
|  | ||||
|  | ||||
|         public Map<String, String> stationsToMetrics() { | ||||
|             var m = new TreeMap<String, String>(); | ||||
|             for (int i = 0; i < names.length; i++) { | ||||
|                 var name = names[i]; | ||||
|                 if (name==null) continue; | ||||
|  | ||||
|                 var min = minMaxCount[i*3] / 10.0; | ||||
|                 var max = minMaxCount[i*3+1] / 10.0; | ||||
|                 var count = minMaxCount[i*3+2]; | ||||
|                 var sum = sums[i]; | ||||
|                 var mean = Math.round((double) sum / count) / 10.0; | ||||
|  | ||||
|                 m.put(name, min + "/" + mean + "/" + max); | ||||
|             } | ||||
|             return m; | ||||
|         } | ||||
|  | ||||
|     } | ||||
|  | ||||
|     static class InputRecord { | ||||
|         private final char[] chars; | ||||
|         private int idOffset; | ||||
|         private int idLength; | ||||
|  | ||||
|         public int value; // fixpoint scaled by 10 | ||||
|         public int nameHash; | ||||
|  | ||||
|         public InputRecord(char[] chars) { | ||||
|             this.chars = chars; | ||||
|         } | ||||
|  | ||||
|         public void init(int nameHash, int nameOffset, int nameLength, int fixpointValue) { | ||||
|             assert nameOffset+nameLength<chars.length : String.format("idOffset+idLength=%d < chars.length=%d", nameOffset+nameLength, chars.length); | ||||
|  | ||||
|             this.idOffset = nameOffset; | ||||
|             this.idLength = nameLength; | ||||
|             this.value = fixpointValue; | ||||
|             this.nameHash = nameHash; | ||||
|         } | ||||
|  | ||||
|         public String name() { | ||||
|             return new String(chars, idOffset, idLength); | ||||
|         } | ||||
|  | ||||
|         @Override | ||||
|         public String toString() { | ||||
|             return name() + ";" + (value / 10.0); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user