feat: Improve the concurrency

This commit is contained in:
rjianu
2023-07-12 17:30:18 +03:00
parent 9f35c4eeeb
commit f837a1d217

43
main.go
View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"runtime"
"sync" "sync"
) )
@@ -41,29 +42,39 @@ func run(filenames []string, op string, column int, out io.Writer) error {
resCh := make(chan []float64) resCh := make(chan []float64)
errCh := make(chan error) errCh := make(chan error)
doneCh := make(chan struct{}) doneCh := make(chan struct{})
filesCh := make(chan string)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for _, fname := range filenames { go func() {
defer close(filesCh)
for _, fname := range filenames {
filesCh <- fname
}
}()
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1) wg.Add(1)
go func(fname string) { go func() {
defer wg.Done() defer wg.Done()
f, err := os.Open(fname) for fname := range filesCh {
if err != nil { f, err := os.Open(fname)
errCh <- fmt.Errorf("cannot open file: %w", err) if err != nil {
return errCh <- fmt.Errorf("cannot open file: %w", err)
} return
}
data, err := csv2float(f, column) data, err := csv2float(f, column)
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
errCh <- err errCh <- err
} }
resCh <- data resCh <- data
}(fname) }
}()
} }