Experiment from entangled90
* single thread memory mapped file reader, pool of processors * cleanup of inner classes of MetricProcessor * doubles are parsed without external functions, strings are lazily created from byte arrays * remove load() MappedByteBuffer in memory * fixed handling of newline * fix a bug & correct locale used * MappedByteBuffer size set to 1MB * fixed rounding * Do not use ArrayBlockingQueue.offer since it drops elements when queue is full * MappedByteBuffer size = 32 MB
This commit is contained in:
		
							
								
								
									
										20
									
								
								calculate_average_entangled90.sh
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										20
									
								
								calculate_average_entangled90.sh
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| #!/bin/sh | ||||
| # | ||||
| #  Copyright 2023 The original authors | ||||
| # | ||||
| #  Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| #  you may not use this file except in compliance with the License. | ||||
| #  You may obtain a copy of the License at | ||||
| # | ||||
| #      http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #  Unless required by applicable law or agreed to in writing, software | ||||
| #  distributed under the License is distributed on an "AS IS" BASIS, | ||||
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| #  See the License for the specific language governing permissions and | ||||
| #  limitations under the License. | ||||
| # | ||||
|  | ||||
|  | ||||
| JAVA_OPTS="" | ||||
| time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_entangled90 | ||||
| @@ -0,0 +1,328 @@ | ||||
| /* | ||||
|  *  Copyright 2023 The original authors | ||||
|  * | ||||
|  *  Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  *  you may not use this file except in compliance with the License. | ||||
|  *  You may obtain a copy of the License at | ||||
|  * | ||||
|  *      http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  *  Unless required by applicable law or agreed to in writing, software | ||||
|  *  distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  *  See the License for the specific language governing permissions and | ||||
|  *  limitations under the License. | ||||
|  */ | ||||
| package dev.morling.onebrc; | ||||
|  | ||||
| import java.io.IOException; | ||||
| import java.io.RandomAccessFile; | ||||
| import java.nio.ByteBuffer; | ||||
| import java.nio.MappedByteBuffer; | ||||
| import java.nio.channels.FileChannel; | ||||
| import java.nio.charset.StandardCharsets; | ||||
| import java.util.*; | ||||
| import java.util.concurrent.ArrayBlockingQueue; | ||||
| import java.util.concurrent.CountDownLatch; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.function.Consumer; | ||||
| import java.util.stream.Collectors; | ||||
|  | ||||
| import static java.lang.Math.round; | ||||
|  | ||||
| public class CalculateAverage_entangled90 { | ||||
|  | ||||
|     private static final String FILE = "./measurements.txt"; | ||||
|  | ||||
|     public static final boolean DEBUG = false; | ||||
|  | ||||
|     public static void main(String[] args) throws IOException { | ||||
|         Scanner scanner = new Scanner(); | ||||
|         long start = System.currentTimeMillis(); | ||||
|         PooledChunkProcessor chunkProcessor = new PooledChunkProcessor(Runtime.getRuntime().availableProcessors()); | ||||
|         scanner.scan(FILE, chunkProcessor, 0L, -1); | ||||
|         long finish = System.currentTimeMillis(); | ||||
|         var map = chunkProcessor.result(); | ||||
|         map.printResults(); | ||||
|         if (DEBUG) { | ||||
|             System.out.println("Took: " + TimeUnit.MILLISECONDS.toSeconds(finish - start) + "seconds"); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| class AggregatedProcessor { | ||||
|     public double max = Double.NEGATIVE_INFINITY; | ||||
|     public double min = Double.POSITIVE_INFINITY; | ||||
|     private double sum = 0L; | ||||
|     public long count = 0L; | ||||
|  | ||||
|     public void addMeasure(double value) { | ||||
|         max = Math.max(max, value); | ||||
|         min = Math.min(min, value); | ||||
|         sum += value; | ||||
|         count++; | ||||
|     } | ||||
|  | ||||
|     public void combine(AggregatedProcessor processor) { | ||||
|         max = Math.max(max, processor.max); | ||||
|         min = Math.min(min, processor.min); | ||||
|         sum += processor.sum; | ||||
|         count += processor.count; | ||||
|     } | ||||
|  | ||||
|     public double mean() { | ||||
|         return sum / count; | ||||
|     } | ||||
|  | ||||
|     private static double round(double d) { | ||||
|         return Math.round(d * 10.0) / 10.0; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         return String.format(Locale.US, "%.1f/%.1f/%.1f", round(min), round(mean()), round(max)); | ||||
|     } | ||||
| } | ||||
|  | ||||
| class ProcessorMap { | ||||
|     Map<BytesWrapper, AggregatedProcessor> processors = new HashMap<>(1024); | ||||
|  | ||||
|     public void printResults() { | ||||
|         System.out.println("Processed: " + processors.entrySet().stream().mapToLong(e -> e.getValue().count).sum()); | ||||
|         System.out.print("{"); | ||||
|         System.out.print( | ||||
|                 processors.entrySet().stream() | ||||
|                         .sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", "))); | ||||
|         System.out.println("}"); | ||||
|     } | ||||
|  | ||||
|     public void addMeasure(BytesWrapper city, double value) { | ||||
|         var processor = processors.get(city); | ||||
|         if (processor == null) { | ||||
|             processor = new AggregatedProcessor(); | ||||
|             processors.put(city, processor); | ||||
|         } | ||||
|         processor.addMeasure(value); | ||||
|     } | ||||
|  | ||||
|     private void combine(BytesWrapper city, AggregatedProcessor processor) { | ||||
|         var thisProcessor = processors.get(city); | ||||
|         if (thisProcessor == null) { | ||||
|             processors.put(city, processor); | ||||
|         } | ||||
|         else { | ||||
|             thisProcessor.combine(processor); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public static ProcessorMap combineAll(ProcessorMap... processors) { | ||||
|         var result = new ProcessorMap(); | ||||
|         for (var p : processors) { | ||||
|             for (var entry : p.processors.entrySet()) { | ||||
|                 result.combine(entry.getKey(), entry.getValue()); | ||||
|             } | ||||
|         } | ||||
|         return result; | ||||
|     } | ||||
| } | ||||
|  | ||||
| class PooledChunkProcessor implements Consumer<ByteBuffer> { | ||||
|     private final ArrayBlockingQueue<ByteBuffer> queue; | ||||
|     private final ProcessorMap[] results; | ||||
|     private final CountDownLatch latch; | ||||
|  | ||||
|     private volatile boolean queueClosed = false; | ||||
|  | ||||
|     public PooledChunkProcessor(int n) { | ||||
|         queue = new ArrayBlockingQueue<>(4 * 1024); | ||||
|         results = new ProcessorMap[n]; | ||||
|         latch = new CountDownLatch(n); | ||||
|         for (int i = 0; i < n; i++) { | ||||
|             int finalI = i; | ||||
|             var t = new Thread(() -> { | ||||
|                 var processor = new ChunkProcessor(); | ||||
|                 while (!Thread.interrupted()) { | ||||
|                     try { | ||||
|                         var element = queue.poll(1, TimeUnit.MILLISECONDS); | ||||
|                         if (element != null) | ||||
|                             processor.processChunk(element); | ||||
|                         else if (queueClosed) { | ||||
|                             if (CalculateAverage_entangled90.DEBUG) { | ||||
|                                 System.out.println("Queue closed, thread #" + finalI + " stopping"); | ||||
|                             } | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|                     catch (InterruptedException e) { | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|                 results[finalI] = processor.processors; | ||||
|                 latch.countDown(); | ||||
|             }); | ||||
|             t.start(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void accept(ByteBuffer byteBuffer) { | ||||
|         try { | ||||
|             queue.put(byteBuffer); | ||||
|         } | ||||
|         catch (InterruptedException e) { | ||||
|             throw new RuntimeException(e); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public ProcessorMap result() { | ||||
|         try { | ||||
|             queueClosed = true; | ||||
|             latch.await(); | ||||
|         } | ||||
|         catch (InterruptedException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|         return ProcessorMap.combineAll(results); | ||||
|     } | ||||
| } | ||||
|  | ||||
| class ChunkProcessor implements Consumer<ByteBuffer> { | ||||
|     ProcessorMap processors = new ProcessorMap(); | ||||
|  | ||||
|     public void processChunk(ByteBuffer bb) { | ||||
|         while (processRow(bb)) { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public void accept(ByteBuffer byteBuffer) { | ||||
|         processChunk(byteBuffer); | ||||
|     } | ||||
|  | ||||
|     // true if you can continue | ||||
|     // false if input is missing | ||||
|     public boolean processRow(ByteBuffer bb) { | ||||
|         int colonIdx = findSemiColon(bb); | ||||
|         if (colonIdx < 0) { | ||||
|             return false; | ||||
|         } | ||||
|         while (bb.get(bb.position()) == '\n') { | ||||
|             bb.get(); | ||||
|         } | ||||
|         var wrapper = wrapperFromBB(bb, colonIdx - bb.position()); | ||||
|         bb.position(colonIdx + 1); | ||||
|         double value = parseDoubleNewLine(bb); | ||||
|         processors.addMeasure(wrapper, value); | ||||
|         return true; | ||||
|     } | ||||
|  | ||||
|     private static BytesWrapper wrapperFromBB(ByteBuffer bb, int length) { | ||||
|         var bytes = new byte[length]; | ||||
|         bb.get(bytes); | ||||
|         return new BytesWrapper(bytes); | ||||
|     } | ||||
|  | ||||
|     // don't advance bb | ||||
|     private int findSemiColon(ByteBuffer bb) { | ||||
|         for (int i = bb.position(); i < bb.limit(); i++) { | ||||
|             if (bb.get(i) == (byte) ';') | ||||
|                 return i; | ||||
|         } | ||||
|         return -1; | ||||
|     } | ||||
|  | ||||
|     // parses double until new line and advances buffer | ||||
|     private static double parseDoubleNewLine(ByteBuffer bb) { | ||||
|         int result = 0; | ||||
|         int sign = 1; | ||||
|         byte c; | ||||
|         do { | ||||
|             c = bb.get(); | ||||
|             switch (c) { | ||||
|                 case '-': | ||||
|                     sign = -1; | ||||
|                     break; | ||||
|                 case '.', ',', '\r', '\n': | ||||
|                     break; | ||||
|                 default: | ||||
|                     result = result * 10 + (c - '0'); | ||||
|             } | ||||
|         } while (c != '\n' && bb.position() < bb.limit()); | ||||
|         return result * sign / 10.0; | ||||
|     } | ||||
| } | ||||
|  | ||||
| class Scanner { | ||||
|     private static final int MAX_MAPPED_MEMORY = 32 * 1024 * 1024; | ||||
|  | ||||
|     public void scan(String fileName, Consumer<ByteBuffer> consumer, long offset, long len) throws IOException { | ||||
|         try (var file = new RandomAccessFile(fileName, "r"); FileChannel channel = file.getChannel()) { | ||||
|             int chunkId = 0; | ||||
|             if (len < 0) { | ||||
|                 len = file.length(); | ||||
|             } | ||||
|  | ||||
|             while (true) { | ||||
|                 if (offset > 0) { | ||||
|                     file.seek(offset); | ||||
|                 } | ||||
|                 MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, offset, Math.min(MAX_MAPPED_MEMORY, len - offset)); | ||||
|                 var limitIdx = findLastNewLine(bb); | ||||
|                 bb.limit(limitIdx); | ||||
|                 // get last newline from the end | ||||
|                 consumer.accept(bb); | ||||
|                 chunkId++; | ||||
|                 offset += limitIdx; | ||||
|                 if (CalculateAverage_entangled90.DEBUG && chunkId % 10 == 0) { | ||||
|                     System.out.println(" read chunk " + chunkId + " at pointer " + offset + "(" + ((int) (offset * 100 / len)) + "%)"); | ||||
|                 } | ||||
|  | ||||
|                 if (offset >= len - 1) { | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     public int findLastNewLine(ByteBuffer bb) { | ||||
|         for (int i = bb.limit() - 1; i > bb.position(); i--) { | ||||
|             if (bb.get(i) == '\n') { | ||||
|                 return i; | ||||
|             } | ||||
|         } | ||||
|         return -1; | ||||
|     } | ||||
| } | ||||
|  | ||||
| class BytesWrapper implements Comparable<BytesWrapper> { | ||||
|     private final byte[] bytes; | ||||
|  | ||||
|     public BytesWrapper(byte[] bytes) { | ||||
|         this.bytes = bytes; | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public boolean equals(Object obj) { | ||||
|         if (obj instanceof BytesWrapper) { | ||||
|             return Arrays.equals(bytes, ((BytesWrapper) obj).bytes); | ||||
|         } | ||||
|         else { | ||||
|             return false; | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public int hashCode() { | ||||
|         return Arrays.hashCode(bytes); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         return new String(bytes); | ||||
|     } | ||||
|  | ||||
|     @Override | ||||
|     public int compareTo(BytesWrapper bytesWrapper) { | ||||
|         return this.toString().compareTo(bytesWrapper.toString()); | ||||
|     } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user