本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。

你好,我是 Seekload。
今天给大家分享一篇 如何使用 context、waitGroup 实现程序快速且优雅退出 的文章!
原文如下:
最近,我正在编写一个“滴答器”的应用程序,每次“滴答”时可能会产生数千的 goroutine。我想确保当应用终止时,即使有一些特定的 goroutine 处理比较缓慢,它也能快速而优雅地退出。
刚开始的时候,围绕如何输出日志,我使用 sync.WaitGroup 实现流程控制,但我很快意识到如果我创建了很多 goroutine,即使其中很小一部分没有立即返回,我的程序会在终止时 hang 住。这让我重新考虑 context.WithCancel,并理解该如何重新调整我的程序,使其能快速且优雅地退出!
我们可以通过构建示例程序一步步来验证下,最初的示例程序并不会使用前面提到的技术点。
- package main
 - import (
 - "fmt"
 - "log"
 - "math/rand"
 - "os"
 - "os/signal"
 - "syscall"
 - "time"
 - )
 - func doSomething(ch chan int) {
 - fmt.Printf("Received job %d\n", <-ch)
 - }
 - func init() {
 - rand.Seed(time.Now().Unix())
 - }
 - func main() {
 - var (
 - closing = make(chan struct{})
 - ticker = time.NewTicker(1 * time.Second)
 - logger = log.New(os.Stderr, "", log.LstdFlags)
 - batchSize = 6
 - jobs = make(chan int, batchSize)
 - )
 - go func() {
 - signals := make(chan os.Signal, 1)
 - signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
 - <-signals
 - close(closing)
 - }()
 - loop:
 - for {
 - select {
 - case <-closing:
 - break loop
 - case <-ticker.C:
 - for n := 0; n < batchSize; n++ {
 - jobs <- n
 - go doSomething(jobs)
 - }
 - logger.Printf("Completed doing %d things.", batchSize)
 - }
 - }
 - }
 
执行程序,我们会发现 Received job ... 和 Completed doing ... 会交替输出,输出可能类似下面这样:
- Received job 0
 - Received job 1
 - Received job 2
 - 2021/02/08 21:30:59 Completed doing 6 things.
 - Received job 3
 - Received job 4
 - Received job 5
 - 2021/02/08 21:31:00 Completed doing 6 things.
 
多次打印的结果并不一致!这是合理的,我们都知道 goroutines 并不会阻塞,所以除非我们对它做些什么,否则协程里的代码会立即执行。
我们添加 WaitGroup 来完善下流程,先在 var 代码块中定义变量:
- var (
 - ..
 - wg sync.WaitGroup
 - )
 
调整下 loop 循环:
- for n := 0; n < batchSize; n++ {
 - wg.Add(1)
 - jobs <- n
 - go doSomething(&wg, jobs)
 - }
 - wg.Wait()
 - logger.Printf("Completed doing %d things.", batchSize)
 
最后,修改协程函数:
- func doSomething(wg *sync.WaitGroup, ch chan int) {
 - defer wg.Done()
 - fmt.Printf("Received job %d\n", <-ch)
 - }
 
WaitGroups 会等待一组 goroutines 执行完成,仔细阅读代码我们发现:
很简单,是不是?我们再次执行程序,可以看到结果比之前的更一致:
- 2021/02/08 21:46:47 Completed doing 6 things.
 - Received job 0
 - Received job 1
 - Received job 2
 - Received job 4
 - Received job 5
 - Received job 3
 - 2021/02/08 21:46:48 Completed doing 6 things.
 - Received job 0
 - Received job 2
 - Received job 3
 - Received job 4
 - Received job 5
 - Received job 1
 
顺便说一句,与预期的一样,jobs 并不会按顺序执行,因为我们并没有采取任何措施来确保这一点。
在我们继续之前,按照目前的状态执行程序并尝试使用 Control+D 来终止程序,程序退出不会出现任何问题。
为了证明程序需要进一步完善,让我们添加一些代码模拟真实业务场景。我们新建一个函数,函数里面调用外部 API 并等待请求响应。请求过程中,我们将会调用 context.WithCancel 取消请求。
首先,创建一个未使用 context 的函数。下面的代码更复杂,有必要的话请看注释:
- func doAPICall(wg *sync.WaitGroup) error {
 - defer wg.Done()
 - req, err := http.NewRequest("GET", "https://httpstat.us/200", nil)
 - if err != nil {
 - return err
 - }
 - // The httpstat.us API accepts a sleep parameter which sleeps the request for the
 - // passed time in ms
 - q := req.URL.Query()
 - sleepMin := 1000
 - sleepMax := 4000
 - q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
 - req.URL.RawQuery = q.Encode()
 - // Make the request to the API in an anonymous function, using a channel to
 - // communicate the results
 - c := make(chan error, 1)
 - go func() {
 - // For the purposes of this example, we're not doing anything with the response.
 - _, err := http.DefaultClient.Do(req)
 - c <- err
 - }()
 - // Block until the channel is populated
 - return <-c
 - }
 
修改定时器“滴答”,删除调用 doSomething() 的代码、删除 jobs channel(不会再使用到它)并且调用 doAPICall()。
- for n := 0; n < batchSize; n++ {
 - wg.Add(1)
 - go doAPICall(&wg)
 - }
 
执行程序并再次尝试退出程序:
现在来演示 context.WithCancel 如何进一步控制程序取消。当 context.WithCancel 初始化之后,会返回一个 context 和取消函数 CancelFunc()。这个取消函数会取消 context,第一次听到这个会困惑。阅读 Go 官方博客的文章 Go Concurrency Patterns: Context[1] 对于进一步理解 context.WithCancel 会有所帮助,推荐阅读完本篇文章之后再看!
ok,我们回到正文。为了实现取消流程控制,需要修改下代码。首先,使用 context 创建一个取消函数:
- var (
 - ctx, cancel = context.WithCancel(context.Background())
 - ...
 - )
 
接着,在匿名函数里监听程序终止的信号,signals 被通知之后调用 CancelFunc,这意味着上下文将被视为已取消:
- go func() {
 - signals := make(chan os.Signal, 1)
 - signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
 - <-signals
 - logger.Println("Initiating shutdown of producer.")
 - cancel()
 - close(closing)
 - }()
 
接着,调整 doAPICall() 函数,多接收一个 context 参数;使用 select-case 修改函数返回,等待 ctx.Done 或等待请求响应。为了简介,只展示了函数部分代码:
- func doAPICall(ctx context.Context, ....) {
 - // Cancel the request if ctx.Done is closed or await the response
 - select {
 - case <-ctx.Done():
 - return ctx.Err()
 - case err := <-c:
 - return err
 - }
 - }
 
最后,确保调用 doAPICall() 函数时传递了 context 参数。现在,运行程序并多次在不同的时间点终止程序。
现在会发生什么?程序会立即退出。select-case 代码会监听 ctx.Done 是否关闭或者接口请求是否响应,哪个 case 的 channel 信号先到就先执行谁。当应用程序终止时,ctx.Done() 优先执行并且函数提前返回,不再关心请求是否响应。WaitGroup 的作用没变 - 等待一组 goroutines 完成。现在,程序的终止流程得到很大改善。
Go 的基本哲学之一就是:
Don't communicate by sharing memory; share memory by communicating.
这里,我们使用 channel 在 goroutines 之间传递引用,这使得我们能够改进应用程序的流程。
有很多种办法可以用来改善流程,例如,我们不跨 goroutine 接收 API 的响应或者错误。值得庆幸的是,Go 很容易就可以实现这点,因此可以将它视为一个起点,如果你还想完善,可以尝试下这些想法。
下面是完整的示例,仅供参考:
- package main
 - import (
 - "context"
 - "fmt"
 - "log"
 - "math/rand"
 - "net/http"
 - "os"
 - "os/signal"
 - "sync"
 - "syscall"
 - "time"
 - )
 - func doAPICall(ctx context.Context, wg *sync.WaitGroup) error {
 - defer wg.Done()
 - req, err := http.NewRequest("GET", "https://httpstat.us/200", nil)
 - if err != nil {
 - return err
 - }
 - // The httpstat.us API accepts a sleep parameter which sleeps the request for the
 - // passed time in ms
 - q := req.URL.Query()
 - sleepMin := 1000
 - sleepMax := 4000
 - q.Set("sleep", fmt.Sprintf("%d", rand.Intn(sleepMax-sleepMin)+sleepMin))
 - req.URL.RawQuery = q.Encode()
 - c := make(chan error, 1)
 - go func() {
 - // For the purposes of this example, we're not doing anything with the response.
 - _, err := http.DefaultClient.Do(req)
 - c <- err
 - }()
 - // Block until either channel is populated or closed
 - select {
 - case <-ctx.Done():
 - return ctx.Err()
 - case err := <-c:
 - return err
 - }
 - }
 - func init() {
 - rand.Seed(time.Now().Unix())
 - }
 - func main() {
 - var (
 - closing = make(chan struct{})
 - ticker = time.NewTicker(1 * time.Second)
 - logger = log.New(os.Stderr, "", log.LstdFlags)
 - batchSize = 6
 - wg sync.WaitGroup
 - ctx, cancel = context.WithCancel(context.Background())
 - )
 - go func() {
 - signals := make(chan os.Signal, 1)
 - signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
 - <-signals
 - cancel()
 - close(closing)
 - }()
 - loop:
 - for {
 - select {
 - case <-closing:
 - break loop
 - case <-ticker.C:
 - for n := 0; n < batchSize; n++ {
 - wg.Add(1)
 - go doAPICall(ctx, &wg)
 - }
 - wg.Wait()
 - logger.Printf("Completed doing %d things.", batchSize)
 - }
 - }
 - }
 
最后一点,本文部分代码受到博文 Go Concurrency Patterns: Context[2] 的启发,再次推荐这篇文章。这篇文章还介绍了其他控制函数,比如:context.WithTimeout 等。Go 官方博客是每个人都应该阅读的宝库!
参考资料
[1]Go Concurrency Patterns: Context: https://blog.golang.org/context
[2]Go Concurrency Patterns: Context: https://blog.golang.org/context
via:https://justbartek.ca/p/golang-context-wg-go-routines/
作者:Bartek
                网站栏目:使用Context、WaitGroup优雅处理Goroutine
                
                网页网址:http://www.csdahua.cn/qtweb/news30/312480.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网