From f837a1d2179233b8c3c0819e75e8f44e1e5b76a9 Mon Sep 17 00:00:00 2001 From: rjianu Date: Wed, 12 Jul 2023 17:30:18 +0300 Subject: [PATCH] feat: Improve the concurrency --- main.go | 43 +++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index 00a8477..c88544a 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "runtime" "sync" ) @@ -41,29 +42,39 @@ func run(filenames []string, op string, column int, out io.Writer) error { resCh := make(chan []float64) errCh := make(chan error) doneCh := make(chan struct{}) + filesCh := make(chan string) wg := sync.WaitGroup{} - for _, fname := range filenames { + go func() { + defer close(filesCh) + for _, fname := range filenames { + filesCh <- fname + } + }() + + for i := 0; i < runtime.NumCPU(); i++ { wg.Add(1) - go func(fname string) { + go func() { defer wg.Done() - f, err := os.Open(fname) - if err != nil { - errCh <- fmt.Errorf("cannot open file: %w", err) - return - } + for fname := range filesCh { + f, err := os.Open(fname) + if err != nil { + errCh <- fmt.Errorf("cannot open file: %w", err) + return + } - data, err := csv2float(f, column) - if err != nil { - errCh <- err - } + data, err := csv2float(f, column) + if err != nil { + errCh <- err + } - if err := f.Close(); err != nil { - errCh <- err - } + if err := f.Close(); err != nil { + errCh <- err + } - resCh <- data - }(fname) + resCh <- data + } + }() }