use unsafe (#343)
This commit is contained in:
parent
594f6c4e5f
commit
06f9b74829
@ -15,5 +15,5 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
JAVA_OPTS=""
|
JAVA_OPTS="--enable-preview"
|
||||||
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_zerninv
|
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_zerninv
|
@ -15,9 +15,14 @@
|
|||||||
*/
|
*/
|
||||||
package dev.morling.onebrc;
|
package dev.morling.onebrc;
|
||||||
|
|
||||||
|
import sun.misc.Unsafe;
|
||||||
|
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.MappedByteBuffer;
|
import java.lang.foreign.Arena;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
@ -27,28 +32,33 @@ import java.util.concurrent.Future;
|
|||||||
|
|
||||||
public class CalculateAverage_zerninv {
|
public class CalculateAverage_zerninv {
|
||||||
private static final String FILE = "./measurements.txt";
|
private static final String FILE = "./measurements.txt";
|
||||||
private static final int MIN_FILE_SIZE = 1024 * 1024;
|
private static final int MIN_CHUNK_SIZE = 1024 * 1024 * 16;
|
||||||
private static final char DELIMITER = ';';
|
private static final char DELIMITER = ';';
|
||||||
private static final char LINE_SEPARATOR = '\n';
|
private static final char LINE_SEPARATOR = '\n';
|
||||||
private static final char ZERO = '0';
|
private static final char ZERO = '0';
|
||||||
private static final char NINE = '9';
|
private static final char NINE = '9';
|
||||||
private static final char MINUS = '-';
|
private static final char MINUS = '-';
|
||||||
|
|
||||||
|
private static final Unsafe UNSAFE = initUnsafe();
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
var results = new HashMap<String, MeasurementAggregation>();
|
var results = new HashMap<String, MeasurementAggregation>();
|
||||||
try (var channel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
|
try (var channel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
|
||||||
var fileSize = channel.size();
|
var fileSize = channel.size();
|
||||||
|
var memorySegment = channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global());
|
||||||
|
long address = memorySegment.address();
|
||||||
var cores = Runtime.getRuntime().availableProcessors();
|
var cores = Runtime.getRuntime().availableProcessors();
|
||||||
var chunks = cores - 1;
|
var chunkAmount = cores - 1;
|
||||||
var maxChunkSize = fileSize < MIN_FILE_SIZE ? fileSize : Math.min(fileSize / chunks, Integer.MAX_VALUE);
|
// var maxChunkSize = Math.min(fileSize, MIN_CHUNK_SIZE);
|
||||||
var chunkOffsets = splitByChunks(channel, maxChunkSize);
|
var maxChunkSize = fileSize < MIN_CHUNK_SIZE ? fileSize : fileSize / chunkAmount;
|
||||||
|
var chunks = splitByChunks(address, address + fileSize, maxChunkSize);
|
||||||
|
|
||||||
var executor = Executors.newFixedThreadPool(cores);
|
var executor = Executors.newFixedThreadPool(cores);
|
||||||
List<Future<Map<String, MeasurementAggregation>>> fResults = new ArrayList<>();
|
List<Future<Map<String, MeasurementAggregation>>> fResults = new ArrayList<>();
|
||||||
for (int i = 1; i < chunkOffsets.size(); i++) {
|
for (int i = 1; i < chunks.size(); i++) {
|
||||||
final long prev = chunkOffsets.get(i - 1);
|
final long prev = chunks.get(i - 1);
|
||||||
final long curr = chunkOffsets.get(i);
|
final long curr = chunks.get(i);
|
||||||
fResults.add(executor.submit(() -> calcForChunk(channel, prev, curr)));
|
fResults.add(executor.submit(() -> calcForChunk(prev, curr)));
|
||||||
}
|
}
|
||||||
|
|
||||||
fResults.forEach(f -> {
|
fResults.forEach(f -> {
|
||||||
@ -69,49 +79,62 @@ public class CalculateAverage_zerninv {
|
|||||||
});
|
});
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
System.out.println(new TreeMap<>(results));
|
|
||||||
|
var bos = new BufferedOutputStream(System.out);
|
||||||
|
bos.write(new TreeMap<>(results).toString().getBytes(StandardCharsets.UTF_8));
|
||||||
|
bos.write('\n');
|
||||||
|
bos.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Long> splitByChunks(FileChannel channel, long maxChunkSize) throws IOException {
|
private static Unsafe initUnsafe() {
|
||||||
long size = channel.size();
|
try {
|
||||||
|
Field unsafe = Unsafe.class.getDeclaredField("theUnsafe");
|
||||||
|
unsafe.setAccessible(true);
|
||||||
|
return (Unsafe) unsafe.get(Unsafe.class);
|
||||||
|
}
|
||||||
|
catch (IllegalAccessException | NoSuchFieldException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Long> splitByChunks(long address, long end, long maxChunkSize) {
|
||||||
List<Long> result = new ArrayList<>();
|
List<Long> result = new ArrayList<>();
|
||||||
long current = 0;
|
result.add(address);
|
||||||
result.add(current);
|
while (address < end) {
|
||||||
while (current < size) {
|
long ptr = address + Math.min(end - address, maxChunkSize) - 1;
|
||||||
var mbb = channel.map(FileChannel.MapMode.READ_ONLY, current, Math.min(size - current, maxChunkSize));
|
while (UNSAFE.getByte(ptr) != LINE_SEPARATOR) {
|
||||||
int position = mbb.limit() - 1;
|
ptr--;
|
||||||
while (mbb.get(position) != LINE_SEPARATOR) {
|
|
||||||
position--;
|
|
||||||
}
|
}
|
||||||
current += position + 1;
|
address = ptr + 1;
|
||||||
result.add(current);
|
result.add(address);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, MeasurementAggregation> calcForChunk(FileChannel channel, long begin, long end) throws IOException {
|
private static Map<String, MeasurementAggregation> calcForChunk(long offset, long end) {
|
||||||
var mbb = channel.map(FileChannel.MapMode.READ_ONLY, begin, end - begin);
|
var results = new MeasurementContainer();
|
||||||
var results = new MeasurementContainer(mbb);
|
|
||||||
int cityOffset, cityNameSize, hashCode, temperatureOffset, temperature;
|
|
||||||
byte b;
|
|
||||||
|
|
||||||
while (mbb.hasRemaining()) {
|
long cityOffset, temperatureOffset;
|
||||||
cityOffset = mbb.position();
|
int hashCode, temperature;
|
||||||
|
byte cityNameSize, b;
|
||||||
|
|
||||||
|
while (offset < end) {
|
||||||
|
cityOffset = offset;
|
||||||
hashCode = 0;
|
hashCode = 0;
|
||||||
while ((b = mbb.get()) != DELIMITER) {
|
while ((b = UNSAFE.getByte(offset++)) != DELIMITER) {
|
||||||
hashCode = 31 * hashCode + b;
|
hashCode = 31 * hashCode + b;
|
||||||
}
|
}
|
||||||
|
|
||||||
temperatureOffset = mbb.position();
|
temperatureOffset = offset;
|
||||||
cityNameSize = temperatureOffset - cityOffset - 1;
|
cityNameSize = (byte) (temperatureOffset - cityOffset - 1);
|
||||||
|
|
||||||
temperature = 0;
|
temperature = 0;
|
||||||
while ((b = mbb.get()) != LINE_SEPARATOR) {
|
while ((b = UNSAFE.getByte(offset++)) != LINE_SEPARATOR) {
|
||||||
if (b >= ZERO && b <= NINE) {
|
if (b >= ZERO && b <= NINE) {
|
||||||
temperature = temperature * 10 + (b - ZERO);
|
temperature = temperature * 10 + (b - ZERO);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (mbb.get(temperatureOffset) == MINUS) {
|
if (UNSAFE.getByte(temperatureOffset) == MINUS) {
|
||||||
temperature *= -1;
|
temperature *= -1;
|
||||||
}
|
}
|
||||||
results.put(cityOffset, cityNameSize, hashCode, (short) temperature);
|
results.put(cityOffset, cityNameSize, hashCode, (short) temperature);
|
||||||
@ -121,11 +144,11 @@ public class CalculateAverage_zerninv {
|
|||||||
|
|
||||||
private static final class MeasurementAggregation {
|
private static final class MeasurementAggregation {
|
||||||
private long sum;
|
private long sum;
|
||||||
private int count;
|
private long count;
|
||||||
private short min;
|
private short min;
|
||||||
private short max;
|
private short max;
|
||||||
|
|
||||||
public MeasurementAggregation(long sum, int count, short min, short max) {
|
public MeasurementAggregation(long sum, long count, short min, short max) {
|
||||||
this.sum = sum;
|
this.sum = sum;
|
||||||
this.count = count;
|
this.count = count;
|
||||||
this.min = min;
|
this.min = min;
|
||||||
@ -151,74 +174,88 @@ public class CalculateAverage_zerninv {
|
|||||||
private static final class MeasurementContainer {
|
private static final class MeasurementContainer {
|
||||||
private static final int SIZE = 1024 * 16;
|
private static final int SIZE = 1024 * 16;
|
||||||
|
|
||||||
private final MappedByteBuffer mbb;
|
private static final int ENTRY_SIZE = 8 + 1 + 4 + 8 + 8 + 2 + 2;
|
||||||
private final int[] offsets = new int[SIZE];
|
private static final int COUNT_OFFSET = 0;
|
||||||
private final int[] sizes = new int[SIZE];
|
private static final int SIZE_OFFSET = 8;
|
||||||
private final int[] hashes = new int[SIZE];
|
private static final int HASH_OFFSET = 9;
|
||||||
|
private static final int ADDRESS_OFFSET = 13;
|
||||||
|
private static final int SUM_OFFSET = 21;
|
||||||
|
private static final int MIN_OFFSET = 29;
|
||||||
|
private static final int MAX_OFFSET = 31;
|
||||||
|
|
||||||
private final long[] sums = new long[SIZE];
|
private final long address;
|
||||||
private final int[] counts = new int[SIZE];
|
|
||||||
private final short[] mins = new short[SIZE];
|
|
||||||
private final short[] maxs = new short[SIZE];
|
|
||||||
|
|
||||||
private MeasurementContainer(MappedByteBuffer mbb) {
|
private MeasurementContainer() {
|
||||||
this.mbb = mbb;
|
address = UNSAFE.allocateMemory(ENTRY_SIZE * SIZE);
|
||||||
Arrays.fill(mins, Short.MAX_VALUE);
|
UNSAFE.setMemory(address, ENTRY_SIZE * SIZE, (byte) 0);
|
||||||
Arrays.fill(maxs, Short.MIN_VALUE);
|
for (long ptr = address; ptr < address + SIZE * ENTRY_SIZE; ptr += ENTRY_SIZE) {
|
||||||
|
UNSAFE.putShort(ptr + MIN_OFFSET, Short.MAX_VALUE);
|
||||||
|
UNSAFE.putShort(ptr + MAX_OFFSET, Short.MIN_VALUE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void put(int offset, int size, int hash, short value) {
|
public void put(long address, byte size, int hash, short value) {
|
||||||
int i = findIdx(offset, size, hash);
|
long ptr = findAddress(address, size, hash);
|
||||||
offsets[i] = offset;
|
|
||||||
sizes[i] = size;
|
|
||||||
hashes[i] = hash;
|
|
||||||
|
|
||||||
sums[i] += value;
|
UNSAFE.putLong(ptr + COUNT_OFFSET, UNSAFE.getLong(ptr + COUNT_OFFSET) + 1);
|
||||||
counts[i]++;
|
UNSAFE.putByte(ptr + SIZE_OFFSET, size);
|
||||||
|
UNSAFE.putInt(ptr + HASH_OFFSET, hash);
|
||||||
|
UNSAFE.putLong(ptr + ADDRESS_OFFSET, address);
|
||||||
|
|
||||||
if (value < mins[i]) {
|
UNSAFE.putLong(ptr + SUM_OFFSET, UNSAFE.getLong(ptr + SUM_OFFSET) + value);
|
||||||
mins[i] = value;
|
if (value < UNSAFE.getShort(ptr + MIN_OFFSET)) {
|
||||||
|
UNSAFE.putShort(ptr + MIN_OFFSET, value);
|
||||||
}
|
}
|
||||||
if (value > maxs[i]) {
|
if (value > UNSAFE.getShort(ptr + MAX_OFFSET)) {
|
||||||
maxs[i] = value;
|
UNSAFE.putShort(ptr + MAX_OFFSET, value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, MeasurementAggregation> toStringMap() {
|
public Map<String, MeasurementAggregation> toStringMap() {
|
||||||
var result = new HashMap<String, MeasurementAggregation>();
|
var result = new HashMap<String, MeasurementAggregation>();
|
||||||
for (int i = 0; i < SIZE; i++) {
|
for (int i = 0; i < SIZE; i++) {
|
||||||
if (counts[i] != 0) {
|
long ptr = this.address + i * ENTRY_SIZE;
|
||||||
var key = createString(offsets[i], sizes[i]);
|
if (UNSAFE.getLong(ptr + COUNT_OFFSET) != 0) {
|
||||||
result.put(key, new MeasurementAggregation(sums[i], counts[i], mins[i], maxs[i]));
|
var measurements = new MeasurementAggregation(
|
||||||
|
UNSAFE.getLong(ptr + SUM_OFFSET),
|
||||||
|
UNSAFE.getLong(ptr + COUNT_OFFSET),
|
||||||
|
UNSAFE.getShort(ptr + MIN_OFFSET),
|
||||||
|
UNSAFE.getShort(ptr + MAX_OFFSET));
|
||||||
|
var key = createString(UNSAFE.getLong(ptr + ADDRESS_OFFSET), UNSAFE.getByte(ptr + SIZE_OFFSET));
|
||||||
|
result.put(key, measurements);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int findIdx(int offset, int size, int hash) {
|
private long findAddress(long address, byte size, int hash) {
|
||||||
int i = Math.abs(hash % SIZE);
|
int idx = Math.abs(hash % SIZE);
|
||||||
while (counts[i] != 0) {
|
long ptr = this.address + idx * ENTRY_SIZE;
|
||||||
if (hashes[i] == hash && sizes[i] == size && isEqual(i, offset)) {
|
while (UNSAFE.getLong(ptr + COUNT_OFFSET) != 0) {
|
||||||
|
if (UNSAFE.getByte(ptr + SIZE_OFFSET) == size
|
||||||
|
&& UNSAFE.getInt(ptr + HASH_OFFSET) == hash
|
||||||
|
&& isEqual(UNSAFE.getLong(ptr + ADDRESS_OFFSET), address, size)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
i = (i + 1) % SIZE;
|
idx = (idx + 1) % SIZE;
|
||||||
|
ptr = this.address + idx * ENTRY_SIZE;
|
||||||
}
|
}
|
||||||
return i;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isEqual(int index, int offset) {
|
private boolean isEqual(long address, long address2, byte size) {
|
||||||
for (int i = 0; i < sizes[index]; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
if (mbb.get(offsets[index] + i) != mbb.get(offset + i)) {
|
if (UNSAFE.getByte(address + i) != UNSAFE.getByte(address2 + i)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String createString(int offset, int size) {
|
private String createString(long address, byte size) {
|
||||||
byte[] arr = new byte[size];
|
byte[] arr = new byte[size];
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
arr[i] = mbb.get(offset + i);
|
arr[i] = UNSAFE.getByte(address + i);
|
||||||
}
|
}
|
||||||
return new String(arr);
|
return new String(arr);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user