温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Golang errgroup设计及实现原理是什么

发布时间:2022-08-29 16:28:32 来源:亿速云 阅读:154 作者:iii 栏目:开发技术

这篇文章主要介绍“Golang errgroup设计及实现原理是什么”,在日常操作中,相信很多人在Golang errgroup设计及实现原理是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Golang errgroup设计及实现原理是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

    errgroup 源码拆解

    errgroup 定义在 golang.org/x/sync/errgroup,承载核心能力的结构体是 Group。

    Group

    type token struct{}
    // A Group is a collection of goroutines working on subtasks that are part of
    // the same overall task.
    //
    // A zero Group is valid, has no limit on the number of active goroutines,
    // and does not cancel on error.
    type Group struct {
    	cancel func()
    	wg sync.WaitGroup
    	sem chan token
    	errOnce sync.Once
    	err     error
    }

    Group 就是对我们上面提到的一堆子任务执行计划的抽象。每一个子任务都会有自己对应的 goroutine 来执行。

    通过这个结构我们也可以看出来,errgroup 底层实现多个 goroutine 调度,等待的能力还是基于 sync.WaitGroup。

    WithContext

    我们可以调用 errgroup.WithContext 函数来创建一个 Group。

    // WithContext returns a new Group and an associated Context derived from ctx.
    //
    // The derived Context is canceled the first time a function passed to Go
    // returns a non-nil error or the first time Wait returns, whichever occurs
    // first.
    func WithContext(ctx context.Context) (*Group, context.Context) {
    	ctx, cancel := context.WithCancel(ctx)
    	return &Group{cancel: cancel}, ctx
    }

    这里可以看到,Group 的 cancel 函数本质就是为了支持 context 的 cancel 能力,初始化的 Group 只有一个 cancel 属性,其他都是默认的。一旦有一个子任务返回错误,或者是 Wait 调用返回,这个新 Context 就会被 cancel。

    Wait

    本质上和 WaitGroup 的 Wait 方法语义一样,既然我们是个 group task,就需要等待所有任务都执行完,这个语义就由 Wait 方法提供。如果有多个子任务返回错误,它只会返回第一个出现的错误,如果所有的子任务都执行成功,就返回 nil。

    // Wait blocks until all function calls from the Go method have returned, then
    // returns the first non-nil error (if any) from them.
    func (g *Group) Wait() error {
    	g.wg.Wait()
    	if g.cancel != nil {
    		g.cancel()
    	}
    	return g.err
    }

    Wait 的实现非常简单。一个前置的 WaitGroup Wait,结束后只做了两件事:

    • 如果对于公共的 Context 有 cancel 函数,就将其 cancel,因为事情办完了;

    • 返回公共的 Group 结构中的 err 给调用方。

    Go

    Group 的核心能力就在于能够并发执行多个子任务,从调用者的角度,我们只需要传入要执行的函数,签名为:func() error即可,非常通用。如果任务执行成功,就返回 nil,否则就返回 error,并且会 cancel 那个新的 Context。底层的调度逻辑由 Group 的 Go 方法实现:

    // Go calls the given function in a new goroutine.
    // It blocks until the new goroutine can be added without the number of
    // active goroutines in the group exceeding the configured limit.
    //
    // The first call to return a non-nil error cancels the group; its error will be
    // returned by Wait.
    func (g *Group) Go(f func() error) {
    	if g.sem != nil {
    		g.sem <- token{}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
    		if err := f(); err != nil {
    			g.errOnce.Do(func() {
    				g.err = err
    				if g.cancel != nil {
    					g.cancel()
    				}
    			})
    		}
    	}()
    }
    func (g *Group) done() {
    	if g.sem != nil {
    		<-g.sem
    	}
    	g.wg.Done()
    }

    我们重点来分析下 Go 这里发生了什么。

    WaitGroup 加 1 用作计数;

    启动一个新的 goroutine 执行调用方传入的 f() 函数;

    • 若 err 为 nil 说明执行正常;

    • 若 err 不为 nil,说明执行出错,此时将这个返回的 err 赋值给全局 Group 的变量 err,并将 context cancel 掉。注意,这里的处理在 once 分支中,也就是只有第一个来的错误会被处理。

    在 defer 语句中调用 Group 的 done 方法,底层依赖 WaitGroup 的 Done,说明这一个子任务结束。

    这里也可以看到,其实所谓 errgroup,我们并不是将所有子任务的 error 拼成一个字符串返回,而是直接在 Go 方法那里将第一个错误返回,底层依赖了 once 的能力。

    SetLimit

    其实看到这里,你有没有觉得 errgroup 的功能有点鸡肋?底层核心技术都是靠 WaitGroup 完成的,自己只不过是起了个 goroutine 执行方法,err 还只能保留一个。而且 Group 里面的 sem 那个 chan 是用来干什么呢?

    这一节我们就来看看,Golang 对 errgroup 能力的一次扩充。

    到目前为止,errgroup 是可以做到一开始人们对它的期望的,即并发执行子任务。但问题在于,这里是每一个子任务都开了个goroutine,如果是在高并发的环境里,频繁创建大量goroutine 这样的用法很容易对资源负载产生影响。开发者们提出,希望有办法限制 errgroup 创建的 goroutine 数量,参照这个 proposal: #27837

    // SetLimit limits the number of active goroutines in this group to at most n.
    // A negative value indicates no limit.
    //
    // Any subsequent call to the Go method will block until it can add an active
    // goroutine without exceeding the configured limit.
    //
    // The limit must not be modified while any goroutines in the group are active.
    func (g *Group) SetLimit(n int) {
    	if n < 0 {
    		g.sem = nil
    		return
    	}
    	if len(g.sem) != 0 {
    		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
    	}
    	g.sem = make(chan token, n)
    }

    SetLimit 的参数 n 就是我们对这个 Group 期望的最大 goroutine 数量,这里其实除去校验逻辑,只干了一件事:g.sem = make(chan token, n),即创建一个长度为 n 的 channel,赋值给 sem。

    回忆一下我们在 Go 方法 defer 调用 done 中的表现,是不是清晰了很多?我们来理一下:

    首先,Group 结构体就包含了 sem 变量,只作为通信,元素是空结构体,不包含实际意义:

    type Group struct {
    	cancel func()
    	wg sync.WaitGroup
    	sem chan token
    	errOnce sync.Once
    	err     error
    }

    如果你对整个 Group 的 Limit 没有要求,which is fine,你就直接忽略这个 SetLimit,errgroup 的原有能力就可以支持你的诉求。

    但是,如果你希望保持 errgroup 的 goroutine 在一个上限之内,请在调用 Go 方法前,先 SetLimit,这样 Group 的 sem 就赋值为一个长度为 n 的 channel。

    那么,当你调用 Go 方法时,注意下面的框架代码,此时 g.sem 不为 nil,所以我们会塞一个 token 进去,作为占坑,语义上表示我申请一个 goroutine 用。

    func (g *Group) Go(f func() error) {
    	if g.sem != nil {
    		g.sem <- token{}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
                    ...
    	}()
    }

    当然,如果此时 goroutine 数量已经达到上限,这里就会 block 住,直到别的 goroutine 干完活,sem 输出了一个 token之后,才能继续往里面塞。

    在每个 goroutine 执行完毕后 defer 的 g.done 方法,则是完成了这一点:

    func (g *Group) done() {
    	if g.sem != nil {
    		<-g.sem
    	}
    	g.wg.Done()
    }

    这样就把 sem 的用法串起来了。我们通过创建一个定长的channel来实现对于 goroutine 数量的管控,对于channel实际包含的元素并不关心,所以用一个空结构体省一省空间,这是非常优秀的设计,大家平常也可以参考。

    TryGo

    TryGo 和 SetLimit 这套体系其实都是不久前欧长坤大佬提交到 errgroup 的能力。

    一如既往,所有带 TryXXX的函数,都不会阻塞。 其实办的事情非常简单,和 Go 方法一样,传进来一个 func() error来执行。

    Go 方法的区别在于,如果判断 limit 已经不够了,此时不再阻塞,而是直接 return false,代表执行失败。其他的部分完全一样。

    // TryGo calls the given function in a new goroutine only if the number of
    // active goroutines in the group is currently below the configured limit.
    //
    // The return value reports whether the goroutine was started.
    func (g *Group) TryGo(f func() error) bool {
    	if g.sem != nil {
    		select {
    		case g.sem <- token{}:
    			// Note: this allows barging iff channels in general allow barging.
    		default:
    			return false
    		}
    	}
    	g.wg.Add(1)
    	go func() {
    		defer g.done()
    		if err := f(); err != nil {
    			g.errOnce.Do(func() {
    				g.err = err
    				if g.cancel != nil {
    					g.cancel()
    				}
    			})
    		}
    	}()
    	return true
    }

    使用方法

    这里我们先看一个最常见的用法,针对一组任务,每一个都单独起 goroutine 执行,最后获取 Wait 返回的 err 作为整个 Group 的 err。

    package main
    import (
        "errors"
        "fmt"
        "time"
        "golang.org/x/sync/errgroup"
    )
    func main() {
        var g errgroup.Group
        // 启动第一个子任务,它执行成功
        g.Go(func() error {
            time.Sleep(5 * time.Second)
            fmt.Println("exec #1")
            return nil
        })
        // 启动第二个子任务,它执行失败
        g.Go(func() error {
            time.Sleep(10 * time.Second)
            fmt.Println("exec #2")
            return errors.New("failed to exec #2")
        })
        // 启动第三个子任务,它执行成功
        g.Go(func() error {
            time.Sleep(15 * time.Second)
            fmt.Println("exec #3")
            return nil
        })
        // 等待三个任务都完成
        if err := g.Wait(); err == nil {
            fmt.Println("Successfully exec all")
        } else {
            fmt.Println("failed:", err)
        }
    }

    你会发现,最后 err 打印出来就是第二个子任务的 err。

    当然,上面这个 case 是我们正好只有一个报错,但是,如果实际有多个任务都挂了呢?

    从完备性来考虑,有没有什么办法能够将多个任务的错误都返回呢?

    这一点其实 errgroup 库并没有提供非常好的支持,需要开发者自行做一些改造。因为 Group 中只有一个 err 变量,我们不可能基于 Group 来实现这一点。

    通常情况下,我们会创建一个 slice 来存储 f() 执行的 err。

    package main
    import (
        "errors"
        "fmt"
        "time"
        "golang.org/x/sync/errgroup"
    )
    func main() {
        var g errgroup.Group
        var result = make([]error, 3)
        // 启动第一个子任务,它执行成功
        g.Go(func() error {
            time.Sleep(5 * time.Second)
            fmt.Println("exec #1")
            result[0] = nil // 保存成功或者失败的结果
            return nil
        })
        // 启动第二个子任务,它执行失败
        g.Go(func() error {
            time.Sleep(10 * time.Second)
            fmt.Println("exec #2")
            result[1] = errors.New("failed to exec #2") // 保存成功或者失败的结果
            return result[1]
        })
        // 启动第三个子任务,它执行成功
        g.Go(func() error {
            time.Sleep(15 * time.Second)
            fmt.Println("exec #3")
            result[2] = nil // 保存成功或者失败的结果
            return nil
        })
        if err := g.Wait(); err == nil {
            fmt.Printf("Successfully exec all. result: %v\n", result)
        } else {
            fmt.Printf("failed: %v\n", result)
        }
    }

    可以看到,我们声明了一个 result slice,长度为 3。这里不用担心并发问题,因为每个 goroutine 读写的位置是确定唯一的。

    本质上可以理解为,我们把 f() 返回的 err 不仅仅给了 Group 一份,还自己存了一份,当出错的时候,Wait 返回的错误我们不一定真的用,而是直接用自己错的这一个 error slice。

    Go 官方文档中的利用 errgroup 实现 pipeline 的示例也很有参考意义:由一个子任务遍历文件夹下的文件,然后把遍历出的文件交给 20 个 goroutine,让这些 goroutine 并行计算文件的 md5。

    这里贴出来简化代码学习一下.

    package main
    import (
    	"context"
    	"crypto/md5"
    	"fmt"
    	"io/ioutil"
    	"log"
    	"os"
    	"path/filepath"
    	"golang.org/x/sync/errgroup"
    )
    // Pipeline demonstrates the use of a Group to implement a multi-stage
    // pipeline: a version of the MD5All function with bounded parallelism from
    // https://blog.golang.org/pipelines.
    func main() {
    	m, err := MD5All(context.Background(), ".")
    	if err != nil {
    		log.Fatal(err)
    	}
    	for k, sum := range m {
    		fmt.Printf("%s:\t%x\n", k, sum)
    	}
    }
    type result struct {
    	path string
    	sum  [md5.Size]byte
    }
    // MD5All reads all the files in the file tree rooted at root and returns a map
    // from file path to the MD5 sum of the file's contents. If the directory walk
    // fails or any read operation fails, MD5All returns an error.
    func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
    	// - even in case of error! - we know that all of the goroutines have finished
    	// and the memory they were using can be garbage-collected.
    	g, ctx := errgroup.WithContext(ctx)
    	paths := make(chan string)
    	g.Go(func() error {
    		defer close(paths)
    		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
    			if err != nil {
    				return err
    			}
    			if !info.Mode().IsRegular() {
    				return nil
    			}
    			select {
    			case paths <- path:
    			case <-ctx.Done():
    				return ctx.Err()
    			}
    			return nil
    		})
    	})
    	// Start a fixed number of goroutines to read and digest files.
    	c := make(chan result)
    	const numDigesters = 20
    	for i := 0; i < numDigesters; i++ {
    		g.Go(func() error {
    			for path := range paths {
    				data, err := ioutil.ReadFile(path)
    				if err != nil {
    					return err
    				}
    				select {
    				case c <- result{path, md5.Sum(data)}:
    				case <-ctx.Done():
    					return ctx.Err()
    				}
    			}
    			return nil
    		})
    	}
    	go func() {
    		g.Wait()
    		close(c)
    	}()
    	m := make(map[string][md5.Size]byte)
    	for r := range c {
    		m[r.path] = r.sum
    	}
    	// Check whether any of the goroutines failed. Since g is accumulating the
    	// errors, we don't need to send them (or check for them) in the individual
    	// results sent on the channel.
    	if err := g.Wait(); err != nil {
    		return nil, err
    	}
    	return m, nil
    }

    到此,关于“Golang errgroup设计及实现原理是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI