Innovation Africa: Creating the Future Today
Run strikingly fast parallel file searches in Go with sync.ErrGroup

    September 19, 2016 by Editor

    Go’s new errgroup package (golang.org/x/sync/errgroup) significantly improves developer productivity when working with goroutines.

    One of Go’s flagship features is its powerful concurrency primitives, like channels and goroutines. But goroutines are often a foreign concept to newcomers, so it’s not uncommon to see frustration as new learners try to master concurrency.

    The first tool that the Go team released to help with the complexity of managing goroutines was sync.WaitGroup, which allowed you to create a WaitGroup that would block until a specified number of goroutines had finished executing. Here’s an example from the documentation:

        var wg sync.WaitGroup
        var urls = []string{
                "http://www.golang.org/",
                "http://www.google.com/",
                "http://www.somestupidname.com/",
        }
        for _, url := range urls {
                // Increment the WaitGroup counter.
                wg.Add(1)
                // Launch a goroutine to fetch the URL.
                go func(url string) {
                        // Decrement the counter when the goroutine completes.
                        defer wg.Done()
                        // Fetch the URL.
                        http.Get(url)
                }(url)
        }
        // Wait for all HTTP fetches to complete.
        wg.Wait()
    

    WaitGroups made concurrency in Go significantly easier because they reduced the bookkeeping needed when launching goroutines. Every time you launch a goroutine, you increment the WaitGroup by calling wg.Add(1). When one finishes, it calls wg.Done(). To wait for all of them to complete, you call wg.Wait(), which blocks until they’ve all finished. The catch is that a WaitGroup has no way to report errors: if something goes wrong in one of your goroutines, you have to build your own plumbing to find out what the error was.

    Extending sync.WaitGroup’s functionality

    Recently the Go team added a new package, errgroup, to the experimental golang.org/x/sync repository, which lives outside the standard library. errgroup extends the idea of sync.WaitGroup with error propagation and, through errgroup.WithContext, the ability to cancel an entire set of goroutines when one of them returns an error or when a timeout on the parent context is reached. Here’s the same example rewritten to use an errgroup.Group:

    var g errgroup.Group
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        // Launch a goroutine to fetch the URL.
        url := url // https://golang.org/doc/faq#closures_and_goroutines
        g.Go(func() error {
            // Fetch the URL.
            resp, err := http.Get(url)
            if err == nil {
                resp.Body.Close()
            }
            return err
        })
    }
    // Wait for all HTTP fetches to complete.
    if err := g.Wait(); err == nil {
        fmt.Println("Successfully fetched all URLs.")
    }
    

    The g.Go() method above launches the function it is given in a new goroutine and records the first non-nil error returned by any of them; g.Wait() blocks until every function has returned and then reports that first error. This removes the verbose plumbing you would otherwise need to get errors out of your goroutines, which is a significant improvement in developer productivity.

    To exercise errgroup’s features, I’ve written a small program that recursively searches a directory for Go files containing a specified pattern. This might be useful for finding places in your Go source tree where you’ve used a package that has been deprecated or updated. To try out cancellation as well, I added a time limit: if it is reached before the search finishes, all of the searching and processing goroutines are cancelled and the program exits with an error.

    Run against the directory of my sample application, it produces these results:

    $ gogrep -timeout 1000ms . fmt
    gogrep.go
    1 hits
    

    If you call it without the right number of arguments, it prints usage information:

    gogrep by Brian Ketelsen
    Usage:
    	gogrep [flags] path pattern
    Flags:
      -timeout duration
        	timeout in milliseconds (default 500ms)
    

    How errgroup makes application building easier

    Let’s look at the code and see how errgroup makes this application so easy to build. We’ll start with main() because I like to read code like a story, and every code story starts with main().

    package main
    
    import (
    	"bytes"
    	"context"
    	"flag"
    	"fmt"
    	"io/ioutil"
    	"log"
    	"os"
    	"path/filepath"
    	"strings"
    	"time"
    
    	"golang.org/x/sync/errgroup"
    )
    
    func main() {
    	duration := flag.Duration("timeout", 500*time.Millisecond, "timeout in milliseconds")
    	flag.Usage = func() {
    		fmt.Printf("%s by Brian Ketelsen\n", os.Args[0])
    		fmt.Println("Usage:")
    		fmt.Println("\tgogrep [flags] path pattern")
    		fmt.Println("Flags:")
    		flag.PrintDefaults()
    	}
    	flag.Parse()
    	if flag.NArg() != 2 {
    		flag.Usage()
    		os.Exit(-1)
    	}
    	path := flag.Arg(0)
    	pattern := flag.Arg(1)
    	ctx, _ := context.WithTimeout(context.Background(), *duration)
    	m, err := search(ctx, path, pattern)
    	if err != nil {
    		log.Fatal(err)
    	}
    	for _, name := range m {
    		fmt.Println(name)
    	}
    	fmt.Println(len(m), "hits")
    }
    

    The first part of main() sets up the expected flags and arguments and prints a helpful usage message when the program is called with the wrong number of arguments. The first line of real interest is the one that creates the context:

    	ctx, _ := context.WithTimeout(context.Background(), *duration)
    

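    Here, I’ve created a new context.Context with a timeout attached to it, set from the duration flag. When the timeout is reached, “ctx” and every context derived from it are cancelled: their Done() channels are closed, which any goroutine watching them will notice. WithTimeout also returns a cancel function. I’ve discarded it here by assigning it to “_”, which does no real harm in a program that exits as soon as the search finishes, but go vet’s lostcancel check flags it; in longer-lived code you should keep the cancel function and call it, typically with defer, so the context’s resources are released as soon as the work is done.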

    The next line calls search(), passing in the context, the root path, and the search pattern. Finally, the results are printed to the terminal, followed by a count of search hits.

    Breaking down the search() function

    The search() function is a little longer than main() so I’ll break it up as I explain what’s happening.

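    The first thing the search function does is create a new errgroup with errgroup.WithContext. It returns a Group, which does all the bookkeeping for the goroutines that follow, and a derived context (which shadows the ctx parameter). That context is cancelled as soon as any goroutine in the group returns an error, when Wait returns, or when the parent’s timeout expires.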

    func search(ctx context.Context, root string, pattern string) ([]string, error) {
    	g, ctx := errgroup.WithContext(ctx)
    

    Next, I create a channel to carry the files that need to be searched. The walker goroutine sends search candidates to this channel, and other goroutines then check whether they match the supplied pattern. The channel has a buffer of 100, so the walker can queue up to 100 candidate paths without waiting for the loop that consumes them.

    	paths := make(chan string, 100)
    

    An errgroup.Group’s core methods are Go() and Wait(): Go() launches tasks, and Wait() blocks until they’ve all completed. Here, we call Go() with an anonymous function that returns an error.

    	g.Go(func() error {
    

    Next, we defer closing the “paths” channel to signal that all of the directory searching has completed. This allows us to use Go’s “range” statement later to process all the candidate files in more goroutines.

    		defer close(paths)
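The close-then-range handshake is easy to try in isolation. In this minimal sketch, the hypothetical produce function plays the role of the walker goroutine and collect plays the loop that ranges over paths; because produce closes the channel with defer, collect's range loop ends on its own once every value has been received.

```go
package main

import "fmt"

// produce sends each name on out and closes the channel when it is done,
// just as the walker goroutine closes paths. (Hypothetical helper.)
func produce(names []string, out chan<- string) {
	defer close(out) // closing ends any range loop reading from out
	for _, n := range names {
		out <- n
	}
}

// collect drains in until it is closed and empty. (Hypothetical helper.)
func collect(in <-chan string) []string {
	var got []string
	for n := range in {
		got = append(got, n)
	}
	return got
}

func main() {
	ch := make(chan string, 100)
	go produce([]string{"a.go", "b.go", "c.go"}, ch)
	fmt.Println(collect(ch)) // prints "[a.go b.go c.go]"
}
```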
    

    Finally, we use the filepath package’s Walk() function to recursively visit every file under the directory given on the command line. The callback returns any error Walk reports, skips anything that isn’t a regular file (directories are still descended into), and then takes a fast-path exit for any file that doesn’t have the “.go” suffix. There’s no point in searching for Go source code in files that aren’t Go source code.

    		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
    			if err != nil {
    				return err
    			}
    			// Not a candidate, though Walk still descends into directories.
    			if !info.Mode().IsRegular() {
    				return nil
    			}
    			// Fast path: skip anything that isn't Go source.
    			if !strings.HasSuffix(info.Name(), ".go") {
    				return nil
    			}
    

    Spotting the real power of errgroup

    Each of the above conditions abandons processing for the current file because it isn’t a candidate for our search. Anything that makes it past this point is a Go source file that we want to examine. Here’s where the real power of errgroup starts to show. I use a select statement with two cases. The first sends the file name to the “paths” channel, where another goroutine will search its contents. The second waits for the context to be cancelled. As long as there is time left on the clock, the current file is sent for processing. When the timer expires (or another goroutine in the group fails), the context’s Done channel is closed, the second case becomes ready, and the callback returns ctx.Err(). Walk then stops and returns that error, which ends the file-searching goroutine.

    			select {
    			case paths <- path:
    			case <-ctx.Done():
    				return ctx.Err()
    			}
    			return nil
    		})
    	})
    

    Next, I create a channel, c, to collect the files that match the search pattern.

    	c := make(chan string, 100)
    

    Now we can iterate over the files in the paths channel and search their contents.

    	for path := range paths {
    

    One thing here is worth pointing out: because the goroutine’s function is a closure, it captures the surrounding variables themselves, not their values at the moment the goroutine is launched. Because we’re going to have many goroutines, we copy the current value of the loop variable path into a new variable, p, on each iteration. Otherwise, in Go versions before 1.22, every goroutine would share the single path variable, whose value changes on the next iteration of the loop. That would be a nasty race condition. (In modules that declare Go 1.22 or later, each loop iteration gets its own variable, so the copy is no longer strictly necessary, but it is harmless.)

    		p := path
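Here is a small, deterministic illustration of both halves of that point. The first part shows a closure observing a later assignment to the variable it captured; the hypothetical squares function then applies the per-iteration copy before launching one goroutine per input, as the search loop does with p := path.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squares launches one goroutine per input and copies the loop variable
// before the closure uses it. (Hypothetical helper for illustration.)
func squares(nums []int) []int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out []int
	)
	for _, n := range nums {
		n := n // per-iteration copy; redundant but harmless from Go 1.22 on
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			out = append(out, n*n)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(out) // goroutines finish in any order
	return out
}

func main() {
	// Closures capture variables, not values: f sees the later assignment.
	x := 1
	f := func() int { return x }
	x = 2
	fmt.Println(f()) // prints "2"

	fmt.Println(squares([]int{1, 2, 3})) // prints "[1 4 9]"
}
```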
    

    Now we’ll fire off another anonymous function for every candidate file. This function reads the contents of the Go source file and checks to see if it contains the supplied search pattern.

    		g.Go(func() error {
    			data, err := ioutil.ReadFile(p)
    			if err != nil {
    				return err
    			}
    			if !bytes.Contains(data, []byte(pattern)) {
    				return nil
    			}
    

    Once again, we use a select statement to watch for the context being cancelled before our processing has completed.

    			select {
    			case c <- p:
    			case <-ctx.Done():
    				return ctx.Err()
    			}
    			return nil
    		})
    	}
    

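    The following goroutine waits for all of the errgroup’s goroutines to complete and then closes the results channel, signalling that all processing is done and terminating the range loop below that collects the results. It runs in its own goroutine because nothing receives from c until that loop starts; waiting inline instead would stall as soon as more than 100 matches filled the channel’s buffer.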

    	go func() {
    		g.Wait()
    		close(c)
    	}()
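The "closer goroutine" pattern works the same way with a plain sync.WaitGroup. In this minimal sketch, the hypothetical matches function checks an in-memory map of file contents (a stand-in for reading files from disk) concurrently, streams matching names over an unbuffered channel, and relies on a separate goroutine to close that channel once every worker has finished.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// matches returns the names whose contents contain pattern. A closer
// goroutine closes results after all workers finish, which ends the range
// loop. (Hypothetical helper; files maps names to contents.)
func matches(files map[string]string, pattern string) []string {
	results := make(chan string)
	var wg sync.WaitGroup
	for name, body := range files {
		wg.Add(1)
		go func(name, body string) {
			defer wg.Done()
			if strings.Contains(body, pattern) {
				results <- name
			}
		}(name, body)
	}

	// Closer: wait for every worker, then close results.
	go func() {
		wg.Wait()
		close(results)
	}()

	var found []string
	for name := range results { // ends when the closer closes results
		found = append(found, name)
	}
	sort.Strings(found) // map iteration and goroutine order are random
	return found
}

func main() {
	files := map[string]string{
		"a.go": `import "fmt"`,
		"b.go": `import "os"`,
		"c.go": `fmt.Println("hi")`,
	}
	fmt.Println(matches(files, "fmt")) // prints "[a.go c.go]"
}
```

Calling wg.Wait() directly before the range loop in the same goroutine would deadlock here, because the workers block on the unbuffered send until someone receives.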
    

    Now we collect the results from the channel and put them in a slice to return back to main().

    	var m []string
    	for r := range c {
    		m = append(m, r)
    	}
    

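    Finally, we wrap up by checking the errgroup for errors. If any of the goroutines above returned an error, g.Wait() returns the first one, and we pass it back to main() along with whatever partial results were collected; main() treats any error as fatal and exits without printing them.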

    	return m, g.Wait()
    }
    

    This simple application is far from optimal. For example, it reads the entire contents of each source file into memory before testing it for a pattern match; a streaming reader would be much more efficient. Also, a new goroutine is launched for every candidate file, so nothing limits how many files may be read into memory at once: the channel buffers of 100 only bound how many paths or results can be queued. If you have a huge directory full of huge Go source files, that might make a noticeable dent in your memory consumption. I was able to search my entire Go source tree for occurrences of the “fmt” package in just a few seconds without any spikes in CPU or memory usage, so I’ve decided to leave it unoptimized.

    The full source code is available on GitHub at github.com/bketelsen/gogrep.

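    If you have Go installed, you can also run “go get github.com/bketelsen/gogrep” from your command line to download the application. (On Go 1.18 and later, where go get no longer builds or installs commands, use “go install github.com/bketelsen/gogrep@latest” instead.)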

    I hope you’re as excited as I am about how errgroup makes concurrency in Go so much easier. If you’re not using it yet, now is the time to start; you’ll be glad you did. Concepts like these are a core part of my upcoming O’Reilly Go Beyond the Basics in-person training in Boston, October 3 & 4, and also my online training October 25 & 26.

Copyright © 2005-2020 Innovation Africa