diff --git a/src/main/julia/main.jl b/src/main/julia/main.jl index 5679514..35b7614 100644 --- a/src/main/julia/main.jl +++ b/src/main/julia/main.jl @@ -33,38 +33,58 @@ function print_measurements(stations::Dict{String,StationMeasurements}) print("}") end +function merge(stations_vec::Vector{Dict{String,StationMeasurements}}) + merged = Dict{String,StationMeasurements}() + for stations in stations_vec + for (city, sm) in stations + if haskey(merged, city) + merged_sm = merged[city] + sm.min = ifelse(merged_sm.min < sm.min, merged_sm.min, sm.min) + sm.max = ifelse(merged_sm.max > sm.max, merged_sm.max, sm.max) + sm.sum += merged_sm.sum + sm.count += merged_sm.count + else + merged[city] = sm + end + end + end + merged +end + +function process_chunk(data, chunk) + stations = Dict{String,StationMeasurements}() + for i in eachindex(chunk) + if i == 1 + continue + end + line = String(data[chunk[i-1]:chunk[i]-1]) + station, temp_str = rsplit(line, ";") + temp = parse(Float32, temp_str) + if haskey(stations, station) + sm = stations[station] + sm.min = ifelse(temp < sm.min, temp, sm.min) + sm.max = ifelse(temp > sm.max, temp, sm.max) + sm.sum += temp + sm.count += 1 + else + stations[station] = StationMeasurements(temp, temp, temp, 1) + end + end + stations +end + function main() open("../../../measurements.txt", "r") do f - println("Start") sz = Base.stat(f).size data = mmap(f, Vector{UInt8}, sz) idxs = findall(isequal(0x0a), data) - stations = Dict{String,StationMeasurements}() - iteration = 0 - last_idx = 1 - for i in eachindex(idxs) - if iteration % 1000000 == 0 && iteration > 0 - print("\x1b[J\x1b[H") - percent = round((iteration / 1000000000) * 100; digits=2) - println("$(percent)%") - end - line = String(data[last_idx:idxs[i]-1]) - last_idx = idxs[i] + 1 - station, temp_str = rsplit(line, ";") - temp = parse(Float32, temp_str) - if haskey(stations, station) - sm = stations[station] - sm.min = ifelse(temp < sm.min, temp, sm.min) - sm.max = ifelse(temp > sm.max, temp, sm.max) - sm.sum += temp - sm.count += 1 - else - stations[station] = StationMeasurements(temp, temp, temp, 1) - end - iteration += 1 + idxs_chunks = collect(Iterators.partition(idxs, length(idxs) รท Threads.nthreads())) + tasks = map(idxs_chunks) do chunk + Threads.@spawn process_chunk(data, chunk) end + stations_vec = fetch.(tasks) + stations = merge(stations_vec) print_measurements(stations) - println("End") end end