Under 2 minutes; could probably start optimizing as I did for Rust, but it's good enough for now.

Fabian Schmidt 2024-08-30 15:33:40 +02:00
parent 5bb2363eee
commit 4d586c809e


@@ -33,23 +33,31 @@ function print_measurements(stations::Dict{String,StationMeasurements})
     print("}")
 end
 
-function main()
-    open("../../../measurements.txt", "r") do f
-        println("Start")
-        sz = Base.stat(f).size
-        data = mmap(f, Vector{UInt8}, sz)
-        idxs = findall(isequal(0x0a), data)
-        stations = Dict{String,StationMeasurements}()
-        iteration = 0
-        last_idx = 1
-        for i in eachindex(idxs)
-            if iteration % 1000000 == 0 && iteration > 0
-                print("\x1b[J\x1b[H")
-                percent = round((iteration / 1000000000) * 100; digits=2)
-                println("$(percent)%")
-            end
-            line = String(data[last_idx:idxs[i]-1])
-            last_idx = idxs[i] + 1
+function merge(stations_vec::Vector{Dict{String,StationMeasurements}})
+    merged = Dict{String,StationMeasurements}()
+    for stations in stations_vec
+        for (city, sm) in stations
+            if haskey(merged, city)
+                merged_sm = merged[city]
+                sm.min = ifelse(merged_sm.min < sm.min, merged_sm.min, sm.min)
+                sm.max = ifelse(merged_sm.max > sm.max, merged_sm.max, sm.max)
+                sm.sum += merged_sm.sum
+                sm.count += merged_sm.count
+            else
+                merged[city] = sm
+            end
+        end
+    end
+    merged
+end
+
+function process_chunk(data, chunk)
+    stations = Dict{String,StationMeasurements}()
+    for i in eachindex(chunk)
+        if i == 1
+            continue
+        end
+        line = String(data[chunk[i-1]:chunk[i]-1])
         station, temp_str = rsplit(line, ";")
         temp = parse(Float32, temp_str)
         if haskey(stations, station)
@@ -61,10 +69,22 @@ function main()
         else
             stations[station] = StationMeasurements(temp, temp, temp, 1)
         end
-            iteration += 1
     end
+    stations
+end
+
+function main()
+    open("../../../measurements.txt", "r") do f
+        sz = Base.stat(f).size
+        data = mmap(f, Vector{UInt8}, sz)
+        idxs = findall(isequal(0x0a), data)
+        idxs_chunks = collect(Iterators.partition(idxs, length(idxs) ÷ Threads.nthreads()))
+        tasks = map(idxs_chunks) do chunk
+            Threads.@spawn process_chunk(data, chunk)
+        end
+        stations_vec = fetch.(tasks)
+        stations = merge(stations_vec)
         print_measurements(stations)
-        println("End")
     end
 end
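
For context, the change follows the usual fan-out/fan-in pattern: partition the newline indices into one chunk per thread, Threads.@spawn a process_chunk task per chunk, fetch the per-task dictionaries, and merge them at the end. Below is a minimal, self-contained sketch of that pattern; the toy input, the per-station counting payload, and the mergewith(+)-based merge are illustrative assumptions, not code from this repository.

# Sketch of the fan-out/fan-in pattern (illustrative assumptions, not repo code):
# split the input into one chunk per thread, spawn a task per chunk, then fetch
# and merge the per-task dictionaries into a single result.
using Base.Threads

function count_stations(lines)
    counts = Dict{String,Int}()
    for line in lines
        station, _ = rsplit(line, ";")
        counts[station] = get(counts, station, 0) + 1
    end
    counts
end

function threaded_count(lines)
    nchunks = max(1, min(nthreads(), length(lines)))
    chunks = collect(Iterators.partition(lines, cld(length(lines), nchunks)))
    tasks = map(chunks) do chunk
        @spawn count_stations(chunk)   # one task per chunk on the default thread pool
    end
    results = fetch.(tasks)            # wait for every task and collect its Dict
    reduce(mergewith(+), results)      # combine the per-task counts station by station
end

threaded_count(["Hamburg;12.0", "Bulawayo;8.9", "Hamburg;34.2", "Palembang;38.8"])

Note that the spawned tasks only run in parallel if Julia is started with more than one thread, e.g. via julia --threads=auto or the JULIA_NUM_THREADS environment variable.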