Updating Sam Pullara's entry
This commit is contained in:
		| @@ -17,5 +17,6 @@ | |||||||
|  |  | ||||||
|  |  | ||||||
| JAVA_OPTS="" | JAVA_OPTS="" | ||||||
|  | sdk use java 21.0.1-graal | ||||||
| time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_spullara | time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_spullara | ||||||
|  |  | ||||||
|   | |||||||
| @@ -27,9 +27,6 @@ import java.util.Arrays; | |||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.TreeMap; | import java.util.TreeMap; | ||||||
| import java.util.concurrent.ExecutionException; | import java.util.concurrent.ExecutionException; | ||||||
| import java.util.concurrent.atomic.AtomicInteger; |  | ||||||
| import java.util.function.Consumer; |  | ||||||
| import java.util.function.Supplier; |  | ||||||
| import java.util.stream.Collectors; | import java.util.stream.Collectors; | ||||||
|  |  | ||||||
| public class CalculateAverage_spullara { | public class CalculateAverage_spullara { | ||||||
| @@ -45,72 +42,71 @@ public class CalculateAverage_spullara { | |||||||
|      */ |      */ | ||||||
|  |  | ||||||
|     public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { |     public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { | ||||||
|  |         long start = System.currentTimeMillis(); | ||||||
|         var filename = args.length == 0 ? FILE : args[0]; |         var filename = args.length == 0 ? FILE : args[0]; | ||||||
|         var file = new File(filename); |         var file = new File(filename); | ||||||
|     long start = System.currentTimeMillis(); |  | ||||||
|  |  | ||||||
|     var totalLines = new AtomicInteger(); |         var resultsMap = getFileSegments(file).stream().map(segment -> { | ||||||
|     var results = getFileSegments(file).stream().map(segment -> { |  | ||||||
|             var resultMap = new ByteArrayToResultMap(); |             var resultMap = new ByteArrayToResultMap(); | ||||||
|             long segmentEnd = segment.end(); |             long segmentEnd = segment.end(); | ||||||
|             try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) { |             try (var fileChannel = (FileChannel) Files.newByteChannel(Path.of(filename), StandardOpenOption.READ)) { | ||||||
|                 var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start()); |                 var bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segmentEnd - segment.start()); | ||||||
|         var buffer = new byte[64]; |                 // Up to 100 characters for a city name | ||||||
|         int lines = 0; |                 var buffer = new byte[100]; | ||||||
|                 int startLine; |                 int startLine; | ||||||
|                 int limit = bb.limit(); |                 int limit = bb.limit(); | ||||||
|                 while ((startLine = bb.position()) < limit) { |                 while ((startLine = bb.position()) < limit) { | ||||||
|                     int currentPosition = startLine; |                     int currentPosition = startLine; | ||||||
|                     byte b; |                     byte b; | ||||||
|                     int offset = 0; |                     int offset = 0; | ||||||
|  |                     int hash = 0; | ||||||
|                     while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') { |                     while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') { | ||||||
|                         buffer[offset++] = b; |                         buffer[offset++] = b; | ||||||
|  |                         hash = 31 * hash + b; | ||||||
|                     } |                     } | ||||||
|           int temp = 0; |                     int temp; | ||||||
|                     int negative = 1; |                     int negative = 1; | ||||||
|           outer: |                     // Inspired by @yemreinci to unroll this even further | ||||||
|           while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != '\n') { |                     if (bb.get(currentPosition) == '-') { | ||||||
|             switch (b) { |  | ||||||
|               case '-': |  | ||||||
|                         negative = -1; |                         negative = -1; | ||||||
|               case '.': |  | ||||||
|                 break; |  | ||||||
|               case '\r': |  | ||||||
|                         currentPosition++; |                         currentPosition++; | ||||||
|                 break outer; |  | ||||||
|               default: |  | ||||||
|                 temp = 10 * temp + (b - '0'); |  | ||||||
|                     } |                     } | ||||||
|  |                     if (bb.get(currentPosition + 1) == '.') { | ||||||
|  |                         temp = negative * ((bb.get(currentPosition) - '0') * 10 + (bb.get(currentPosition + 2) - '0')); | ||||||
|  |                         currentPosition += 3; | ||||||
|                     } |                     } | ||||||
|           temp *= negative; |                     else { | ||||||
|           double finalTemp = temp / 10.0; |                         temp = negative * ((bb.get(currentPosition) - '0') * 100 + ((bb.get(currentPosition + 1) - '0') * 10 + (bb.get(currentPosition + 3) - '0'))); | ||||||
|           resultMap.putOrMerge(buffer, 0, offset, |                         currentPosition += 4; | ||||||
|                   () -> new Result(finalTemp), |                     } | ||||||
|                   measurement -> merge(measurement, finalTemp, finalTemp, finalTemp, 1)); |                     if (bb.get(currentPosition) == '\r') { | ||||||
|           lines++; |                         currentPosition++; | ||||||
|  |                     } | ||||||
|  |                     currentPosition++; | ||||||
|  |                     resultMap.putOrMerge(buffer, 0, offset, temp / 10.0, hash); | ||||||
|                     bb.position(currentPosition); |                     bb.position(currentPosition); | ||||||
|                 } |                 } | ||||||
|         totalLines.addAndGet(lines); |  | ||||||
|                 return resultMap; |                 return resultMap; | ||||||
|       } catch (IOException e) { |             } | ||||||
|  |             catch (IOException e) { | ||||||
|                 throw new RuntimeException(e); |                 throw new RuntimeException(e); | ||||||
|             } |             } | ||||||
|     }).parallel().toList(); |         }).parallel().flatMap(partition -> partition.getAll().stream()) | ||||||
|  |  | ||||||
|     var resultMap = results.stream() |  | ||||||
|             .flatMap(partition -> partition.getAll().stream()) |  | ||||||
|                 .collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new)); |                 .collect(Collectors.toMap(e -> new String(e.key()), Entry::value, CalculateAverage_spullara::merge, TreeMap::new)); | ||||||
|  |  | ||||||
|     System.out.println("Time: " + (System.currentTimeMillis() - start) + "ms"); |         System.out.println(resultsMap); | ||||||
|     System.out.println("Lines processed: " + totalLines); |  | ||||||
|     System.out.println(resultMap); |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     private static List<FileSegment> getFileSegments(File file) throws IOException { |     private static List<FileSegment> getFileSegments(File file) throws IOException { | ||||||
|         int numberOfSegments = Runtime.getRuntime().availableProcessors(); |         int numberOfSegments = Runtime.getRuntime().availableProcessors(); | ||||||
|         long fileSize = file.length(); |         long fileSize = file.length(); | ||||||
|         long segmentSize = fileSize / numberOfSegments; |         long segmentSize = fileSize / numberOfSegments; | ||||||
|     List<FileSegment> segments = new ArrayList<>(); |         List<FileSegment> segments = new ArrayList<>(numberOfSegments); | ||||||
|  |         // Pointless to split small files | ||||||
|  |         if (segmentSize < 1_000_000) { | ||||||
|  |             segments.add(new FileSegment(0, fileSize)); | ||||||
|  |             return segments; | ||||||
|  |         } | ||||||
|         try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { |         try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { | ||||||
|             for (int i = 0; i < numberOfSegments; i++) { |             for (int i = 0; i < numberOfSegments; i++) { | ||||||
|                 long segStart = i * segmentSize; |                 long segStart = i * segmentSize; | ||||||
| @@ -169,9 +165,6 @@ class Result { | |||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| record Pair(int slot, Result slotValue) { |  | ||||||
| } |  | ||||||
|  |  | ||||||
|     record Entry(byte[] key, Result value) { |     record Entry(byte[] key, Result value) { | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -183,17 +176,7 @@ class ByteArrayToResultMap { | |||||||
|     Result[] slots = new Result[MAPSIZE]; |     Result[] slots = new Result[MAPSIZE]; | ||||||
|     byte[][] keys = new byte[MAPSIZE][]; |     byte[][] keys = new byte[MAPSIZE][]; | ||||||
|  |  | ||||||
|   private int hashCode(byte[] a, int fromIndex, int length) { |     public void putOrMerge(byte[] key, int offset, int size, double temp, int hash) { | ||||||
|     int result = 0; |  | ||||||
|     int end = fromIndex + length; |  | ||||||
|     for (int i = fromIndex; i < end; i++) { |  | ||||||
|       result = 31 * result + a[i]; |  | ||||||
|     } |  | ||||||
|     return result; |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   private Pair getPair(byte[] key, int offset, int size) { |  | ||||||
|     int hash = hashCode(key, offset, size); |  | ||||||
|         int slot = hash & (slots.length - 1); |         int slot = hash & (slots.length - 1); | ||||||
|         var slotValue = slots[slot]; |         var slotValue = slots[slot]; | ||||||
|         // Linear probe for open slot |         // Linear probe for open slot | ||||||
| @@ -201,26 +184,23 @@ class ByteArrayToResultMap { | |||||||
|             slot = (slot + 1) & (slots.length - 1); |             slot = (slot + 1) & (slots.length - 1); | ||||||
|             slotValue = slots[slot]; |             slotValue = slots[slot]; | ||||||
|         } |         } | ||||||
|     return new Pair(slot, slotValue); |         Result value = slotValue; | ||||||
|   } |  | ||||||
|  |  | ||||||
|   public void putOrMerge(byte[] key, int offset, int size, Supplier<Result> supplier, Consumer<Result> merge) { |  | ||||||
|     Pair result = getPair(key, offset, size); |  | ||||||
|     Result value = result.slotValue(); |  | ||||||
|         if (value == null) { |         if (value == null) { | ||||||
|       int slot = result.slot(); |             slots[slot] = new Result(temp); | ||||||
|       slots[slot] = supplier.get(); |  | ||||||
|             byte[] bytes = new byte[size]; |             byte[] bytes = new byte[size]; | ||||||
|             System.arraycopy(key, offset, bytes, 0, size); |             System.arraycopy(key, offset, bytes, 0, size); | ||||||
|             keys[slot] = bytes; |             keys[slot] = bytes; | ||||||
|         } else { |         } else { | ||||||
|       merge.accept(value); |             value.min = Math.min(value.min, temp); | ||||||
|  |             value.max = Math.max(value.max, temp); | ||||||
|  |             value.sum += temp; | ||||||
|  |             value.count += 1; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // Get all pairs |     // Get all pairs | ||||||
|     public List<Entry> getAll() { |     public List<Entry> getAll() { | ||||||
|     List<Entry> result = new ArrayList<>(); |         List<Entry> result = new ArrayList<>(slots.length); | ||||||
|         for (int i = 0; i < slots.length; i++) { |         for (int i = 0; i < slots.length; i++) { | ||||||
|             Result slotValue = slots[i]; |             Result slotValue = slots[i]; | ||||||
|             if (slotValue != null) { |             if (slotValue != null) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user