Add elh's Go solution (#435)
* add elh's Go solution * update elh. fix a bad for loop and add some tuning env vars
This commit is contained in:
		
							
								
								
									
										24
									
								
								src/main/go/elh/Dockerfile
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										24
									
								
								src/main/go/elh/Dockerfile
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,24 @@ | ||||
| # | ||||
| #  Copyright 2023 The original authors | ||||
| # | ||||
| #  Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| #  you may not use this file except in compliance with the License. | ||||
| #  You may obtain a copy of the License at | ||||
| # | ||||
| #      http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| #  Unless required by applicable law or agreed to in writing, software | ||||
| #  distributed under the License is distributed on an "AS IS" BASIS, | ||||
| #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| #  See the License for the specific language governing permissions and | ||||
| #  limitations under the License. | ||||
| # | ||||
|  | ||||
| FROM golang AS builder | ||||
| WORKDIR /app | ||||
| COPY . ./ | ||||
| RUN go build -ldflags "-w -s" -o /1brc-go . | ||||
|  | ||||
| FROM scratch AS runner | ||||
| WORKDIR / | ||||
| COPY --from=builder /1brc-go / | ||||
							
								
								
									
										3
									
								
								src/main/go/elh/go.mod
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								src/main/go/elh/go.mod
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,3 @@ | ||||
| module github.com/elh/1brc-go | ||||
|  | ||||
| go 1.21.5 | ||||
							
								
								
									
										301
									
								
								src/main/go/elh/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										301
									
								
								src/main/go/elh/main.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,301 @@ | ||||
| package main | ||||
|  | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"log" | ||||
| 	"math" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"runtime" | ||||
| 	"runtime/pprof" | ||||
| 	"sort" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| 	"unsafe" | ||||
| ) | ||||
|  | ||||
| // go run main.go [measurements_file] | ||||
| // tune env vars for performance | ||||
| // | ||||
| // Environment variables: | ||||
| // - NUM_PARSERS:         number of parsers to run concurrently. if unset, defaults | ||||
| //   			          to runtime.NumCPU() | ||||
| // - PARSE_CHUNK_SIZE_MB: size of each chunk to parse. if unset, defaults to | ||||
| //                        defaultParseChunkSize | ||||
| // - PROFILE:             if "true", enables profiling | ||||
|  | ||||
| var ( | ||||
| 	// others: "heap", "threadcreate", "block", "mutex" | ||||
| 	profileTypes = []string{"goroutine", "allocs"} | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	defaultMeasurementsPath = "measurements.txt" | ||||
| 	maxNameLen              = 100 | ||||
| 	maxNameNum              = 10000 | ||||
|  | ||||
| 	// tuned for a 2023 Macbook M2 Pro | ||||
| 	defaultParseChunkSizeMB = 64 | ||||
| 	mb                      = 1024 * 1024 // bytes | ||||
| ) | ||||
|  | ||||
| type Stats struct { | ||||
| 	Min, Max, Sum float64 | ||||
| 	Count         int | ||||
| } | ||||
|  | ||||
| // rounding floats to 1 decimal place with 0.05 rounding up to 0.1 | ||||
| func round(x float64) float64 { | ||||
| 	return math.Floor((x+0.05)*10) / 10 | ||||
| } | ||||
|  | ||||
| // parseFloatFast is a high performance float parser using the assumption that | ||||
| // the byte slice will always have a single decimal digit. | ||||
| func parseFloatFast(bs []byte) float64 { | ||||
| 	var intStartIdx int // is negative? | ||||
| 	if bs[0] == '-' { | ||||
| 		intStartIdx = 1 | ||||
| 	} | ||||
|  | ||||
| 	v := float64(bs[len(bs)-1]-'0') / 10 // single decimal digit | ||||
| 	place := 1.0 | ||||
| 	for i := len(bs) - 3; i >= intStartIdx; i-- { // integer part | ||||
| 		v += float64(bs[i]-'0') * place | ||||
| 		place *= 10 | ||||
| 	} | ||||
|  | ||||
| 	if intStartIdx == 1 { | ||||
| 		v *= -1 | ||||
| 	} | ||||
| 	return v | ||||
| } | ||||
|  | ||||
| // size is the intended number of bytes to parse. buffer should be longer than size | ||||
| // because we need to continue reading until the end of the line in order to | ||||
| // properly segment the entire file and not miss any data. | ||||
| func parseAt(f *os.File, buf []byte, offset int64, size int) map[string]*Stats { | ||||
| 	stats := make(map[string]*Stats, maxNameNum) | ||||
| 	n, err := f.ReadAt(buf, offset) // load the buffer | ||||
| 	if err != nil && err != io.EOF { | ||||
| 		log.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	lastName := make([]byte, maxNameLen) // last name parsed | ||||
| 	var lastNameLen int | ||||
| 	isScanningName := true // currently scanning name or value? | ||||
|  | ||||
| 	// if offset is non-zero, skip to the first new line | ||||
| 	var idx, start int | ||||
| 	if offset != 0 { | ||||
| 		for idx < n { | ||||
| 			if buf[idx] == '\n' { | ||||
| 				idx++ | ||||
| 				start = idx | ||||
| 				break | ||||
| 			} | ||||
| 			idx++ | ||||
| 		} | ||||
| 	} | ||||
| 	// tick tock between parsing names and values while accummulating stats | ||||
| 	for { | ||||
| 		if isScanningName { | ||||
| 			for idx < n { | ||||
| 				if buf[idx] == ';' { | ||||
| 					nameBs := buf[start:idx] | ||||
| 					lastNameLen = copy(lastName, nameBs) | ||||
|  | ||||
| 					idx++ | ||||
| 					start = idx | ||||
| 					isScanningName = false | ||||
| 					break | ||||
| 				} | ||||
| 				idx++ | ||||
| 			} | ||||
| 		} else { | ||||
| 			for idx < n { | ||||
| 				if buf[idx] == '\n' { | ||||
| 					valueBs := buf[start:idx] | ||||
| 					value := parseFloatFast(valueBs) | ||||
|  | ||||
| 					nameUnsafe := unsafe.String(&lastName[0], lastNameLen) | ||||
| 					if s, ok := stats[nameUnsafe]; !ok { | ||||
| 						name := string(lastName[:lastNameLen]) // actually allocate string | ||||
| 						stats[name] = &Stats{Min: value, Max: value, Sum: value, Count: 1} | ||||
| 					} else { | ||||
| 						if value < s.Min { | ||||
| 							s.Min = value | ||||
| 						} | ||||
| 						if value > s.Max { | ||||
| 							s.Max = value | ||||
| 						} | ||||
| 						s.Sum += value | ||||
| 						s.Count++ | ||||
| 					} | ||||
|  | ||||
| 					idx++ | ||||
| 					start = idx | ||||
| 					isScanningName = true | ||||
| 					break | ||||
| 				} | ||||
| 				idx++ | ||||
| 			} | ||||
| 		} | ||||
| 		// terminate when we hit the first newline after the intended size OR | ||||
| 		// when we hit the end of the file | ||||
| 		if (isScanningName && idx >= size) || idx >= n { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return stats | ||||
| } | ||||
|  | ||||
| func printResults(stats map[string]*Stats) { // doesn't help | ||||
| 	// sorted alphabetically for output | ||||
| 	names := make([]string, 0, len(stats)) | ||||
| 	for name := range stats { | ||||
| 		names = append(names, name) | ||||
| 	} | ||||
| 	sort.Strings(names) | ||||
|  | ||||
| 	var builder strings.Builder | ||||
| 	for i, name := range names { | ||||
| 		s := stats[name] | ||||
| 		// gotcha: first round the sum to to remove float precision errors! | ||||
| 		avg := round(round(s.Sum) / float64(s.Count)) | ||||
| 		builder.WriteString(fmt.Sprintf("%s=%.1f/%.1f/%.1f", name, s.Min, avg, s.Max)) | ||||
| 		if i < len(names)-1 { | ||||
| 			builder.WriteString(", ") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	writer := bufio.NewWriter(os.Stdout) | ||||
| 	fmt.Fprintf(writer, "{%s}\n", builder.String()) | ||||
| 	writer.Flush() | ||||
| } | ||||
|  | ||||
| // Read file in chunks and parse concurrently. N parsers work off of a chunk | ||||
| // offset chan and send results on an output chan. The results are merged into a | ||||
| // single map of stats and printed. | ||||
| func main() { | ||||
| 	// parse env vars and inputs | ||||
| 	shouldProfile := os.Getenv("PROFILE") == "true" | ||||
| 	var err error | ||||
| 	var numParsers int | ||||
| 	{ | ||||
| 		if os.Getenv("NUM_PARSERS") != "" { | ||||
| 			numParsers, err = strconv.Atoi(os.Getenv("NUM_PARSERS")) | ||||
| 			if err != nil { | ||||
| 				log.Fatal(fmt.Errorf("failed to parse NUM_PARSERS: %w", err)) | ||||
| 			} | ||||
| 		} else { | ||||
| 			numParsers = runtime.NumCPU() | ||||
| 		} | ||||
| 	} | ||||
| 	var parseChunkSize int | ||||
| 	{ | ||||
| 		if os.Getenv("PARSE_CHUNK_SIZE_MB") != "" { | ||||
| 			parseChunkSizeMB, err := strconv.Atoi(os.Getenv("PARSE_CHUNK_SIZE_MB")) | ||||
| 			if err != nil { | ||||
| 				log.Fatal(fmt.Errorf("failed to parse PARSE_CHUNK_SIZE_MB: %w", err)) | ||||
| 			} | ||||
| 			parseChunkSize = parseChunkSizeMB * mb | ||||
| 		} else { | ||||
| 			parseChunkSize = defaultParseChunkSizeMB * mb | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	measurementsPath := defaultMeasurementsPath | ||||
| 	if len(os.Args) > 1 { | ||||
| 		measurementsPath = os.Args[1] | ||||
| 	} | ||||
|  | ||||
| 	// profile code | ||||
| 	if shouldProfile { | ||||
| 		nowUnix := time.Now().Unix() | ||||
| 		os.MkdirAll(fmt.Sprintf("profiles/%d", nowUnix), 0755) | ||||
| 		for _, profileType := range profileTypes { | ||||
| 			file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.%s.pprof", | ||||
| 				nowUnix, filepath.Base(measurementsPath), profileType)) | ||||
| 			defer file.Close() | ||||
| 			defer pprof.Lookup(profileType).WriteTo(file, 0) | ||||
| 		} | ||||
|  | ||||
| 		file, _ := os.Create(fmt.Sprintf("profiles/%d/%s.cpu.pprof", | ||||
| 			nowUnix, filepath.Base(measurementsPath))) | ||||
| 		defer file.Close() | ||||
| 		pprof.StartCPUProfile(file) | ||||
| 		defer pprof.StopCPUProfile() | ||||
| 	} | ||||
|  | ||||
| 	// read file | ||||
| 	f, err := os.Open(measurementsPath) | ||||
| 	if err != nil { | ||||
| 		log.Fatal(fmt.Errorf("failed to open %s file: %w", measurementsPath, err)) | ||||
| 	} | ||||
| 	defer f.Close() | ||||
|  | ||||
| 	info, err := f.Stat() | ||||
| 	if err != nil { | ||||
| 		log.Fatal(fmt.Errorf("failed to read %s file: %w", measurementsPath, err)) | ||||
| 	} | ||||
|  | ||||
| 	// kick off "parser" workers | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	wg.Add(numParsers) | ||||
|  | ||||
| 	// buffered to not block on merging | ||||
| 	chunkOffsetCh := make(chan int64, numParsers) | ||||
| 	chunkStatsCh := make(chan map[string]*Stats, numParsers) | ||||
|  | ||||
| 	go func() { | ||||
| 		i := 0 | ||||
| 		for i < int(info.Size()) { | ||||
| 			chunkOffsetCh <- int64(i) | ||||
| 			i += parseChunkSize | ||||
| 		} | ||||
| 		close(chunkOffsetCh) | ||||
| 	}() | ||||
|  | ||||
| 	for i := 0; i < numParsers; i++ { | ||||
| 		// WARN: w/ extra padding for line overflow. Each chunk should be read past | ||||
| 		// the intended size to the next new line. 128 bytes should be enough for | ||||
| 		// a max 100 byte name + the float value. | ||||
| 		buf := make([]byte, parseChunkSize+128) | ||||
| 		go func() { | ||||
| 			for chunkOffset := range chunkOffsetCh { | ||||
| 				chunkStatsCh <- parseAt(f, buf, chunkOffset, parseChunkSize) | ||||
| 			} | ||||
| 			wg.Done() | ||||
| 		}() | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		wg.Wait() | ||||
| 		close(chunkStatsCh) | ||||
| 	}() | ||||
|  | ||||
| 	mergedStats := make(map[string]*Stats, maxNameNum) | ||||
| 	for chunkStats := range chunkStatsCh { | ||||
| 		for name, s := range chunkStats { | ||||
| 			if ms, ok := mergedStats[name]; !ok { | ||||
| 				mergedStats[name] = s | ||||
| 			} else { | ||||
| 				if s.Min < ms.Min { | ||||
| 					ms.Min = s.Min | ||||
| 				} | ||||
| 				if s.Max > ms.Max { | ||||
| 					ms.Max = s.Max | ||||
| 				} | ||||
| 				ms.Sum += s.Sum | ||||
| 				ms.Count += s.Count | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	printResults(mergedStats) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user