如何优雅地关闭Go channel

作者: 天唯 | 来源:发表于2017-02-17 00:47 被阅读9253次

本文译自:How To Close Channels in Golang Elegantly
几天前,我写了一篇文章来说明golang中channel的使用规范。在redditHN,那篇文章收到了很多赞同,但是我也收到了下面几个关于Go channel设计和规范的批评:

  1. 在不能更改channel状态的情况下,没有简单普遍的方式来检查channel是否已经关闭了
  2. 关闭已经关闭的channel会导致panic,所以在closer(关闭者)不知道channel是否已经关闭的情况下去关闭channel是很危险的
  3. 发送值到已经关闭的channel会导致panic,所以如果sender(发送者)在不知道channel是否已经关闭的情况下去向channel发送值是很危险的

那些批评看起来都很有道理(实际上并没有)。是的,没有一个内置函数可以检查一个channel是否已经关闭。如果你能确定不会向channel发送任何值,那么也确实需要一个简单的方法来检查channel是否已经关闭:

package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
    select {
    case <-ch:
        return true
    default:
    }
    
    return false
}

func main() {
    c := make(chan T)
    fmt.Println(IsClosed(c)) // false
    close(c)
    fmt.Println(IsClosed(c)) // true
}

上面已经提到了,没有一种适用的方式来检查channel是否已经关闭了。但是,就算有一个简单的 closed(chan T) bool函数来检查channel是否已经关闭,它的用处还是很有限的,就像内置的len函数用来检查缓冲channel中元素数量一样。原因就在于,已经检查过的channel的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的channel的当前状态了。
尽管在调用closed(ch)返回true的情况下停止向channel发送值是可以的,但是如果调用closed(ch)返回false,那么关闭channel或者继续向channel发送值就不安全了(会panic)。

The Channel Closing Principle

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。
(下面,我们将会称上面的原则为channel closing principle

打破channel closing principle的解决方案

如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)

func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            // the return result can be altered 
            // in a defer function call
            closed = true
        }
    }()
    
    ch <- value // panic if ch is closed
    return false // <=> closed = false; return
}

如果channel ch没有被关闭的话,那么这个函数的性能将和ch <- value接近。对于channel关闭的时候,SafeSend函数只会在每个sender goroutine中调用一次,因此程序不会有太大的性能损失。
同样的想法也可以用在从多个goroutine关闭channel中:

func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            justClosed = false
        }
    }()
    
    // assume ch != nil here.
    close(ch) // panic if ch is closed
    return true
}

很多人喜欢用sync.Once来关闭channel:

type MyChannel struct {
    C    chan T
    once sync.Once
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.once.Do(func(){
        close(mc.C)
    })
}

当然了,我们也可以用sync.Mutex来避免多次关闭channel:

type MyChannel struct {
    C      chan T
    closed bool
    mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
    return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
    mc.mutex.Lock()
    if !mc.closed {
        close(mc.C)
        mc.closed = true
    }
    mc.mutex.Unlock()
}

func (mc *MyChannel) IsClosed() bool {
    mc.mutex.Lock()
    defer mc.mutex.Unlock()
    return mc.closed
}

我们应该要理解为什么Go不支持内置SafeSendSafeClose函数,原因就在于并不推荐从接收端或者多个并发发送端关闭channel。Golang甚至禁止关闭只接收(receive-only)的channel。

保持channel closing principle的优雅方案

上面的SaveSend函数有一个缺点是,在select语句的case关键字后不能作为发送操作被调用(译者注:类似于 case SafeSend(ch, t):)。另外一个缺点是,很多人,包括我自己都觉得上面通过使用panic/recoversync包的方案不够优雅。针对各种场景,下面介绍不用使用panic/recoversync包,纯粹是利用channel的解决方案。
(在下面的例子总,sync.WaitGroup只是用来让例子完整的。它的使用在实践中不一定一直都有用)

  • M个receivers,一个sender,sender通过关闭data channel说“不再发送”
    这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 100
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    
    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {
                // the only sender can close the channel safely.
                close(dataCh)
                return
            } else {            
                dataCh <- value
            }
        }
    }()
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()
            
            // receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }
    
    wgReceivers.Wait()
}
  • 一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送”
    这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its reveivers are the senders of channel dataCh.
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                value := rand.Intn(MaxRandomNumber)
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }()
    }
    
    // the receiver
    go func() {
        defer wgReceivers.Done()
        
        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // the receiver of the dataCh channel is
                // also the sender of the stopCh cahnnel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }
            
            log.Println(value)
        }
    }()
    
    // ...
    wgReceivers.Wait()
}

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle

  • M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”
    这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:
package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // the channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its reveiver is the moderator goroutine shown below.
    
    var stoppedBy string
    
    // moderator
    go func() {
        stoppedBy = <- toStop // part of the trick used to notify the moderator
                              // to close the additional signal channel.
        close(stopCh)
    }()
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // here, a trick is used to notify the moderator
                    // to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }
                
                // the first select here is to try to exit the
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()
            
            for {
                // same as senders, the first select here is to 
                // try to exit the goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // the same trick is used to notify the moderator 
                        // to close the additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    
                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }
    
    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在这个例子中,仍然遵守着channel closing principle
请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

  • 更多的场景?
    很多的场景变体是基于上面三种的。举个例子,一个基于最复杂情况的变体可能要求receivers读取buffer channel中剩下所有的值。这应该很容易处理,所有这篇文章也就不提了。
    尽管上面三种场景不能覆盖所有Go channel的使用场景,但它们是最基础的,实践中的大多数场景都可以分类到那三种中。

结论

这里没有一种场景要求你去打破channel closing principle。如果你遇到了这种场景,请思考一下你的设计并重写你的代码。
用Go编程就像在创作艺术。

相关文章

网友评论

  • 39d6aa82455d:一个receiver,多个sender的例子和多个receiver,多个sender的例子貌似并没有关闭dataChan,如果要是全局的dataChan, 干嘛还要搞这么复杂呢?
  • Jancd:可以转载你的这篇博文么?写的太好了
  • bc59f08c32a5:楼主您好,看了您的代码我有一个疑问,m个reiceiver、一个sender 这里是receiver收到某个特定值就通知sender退出,m个receiver、n个sender 也是在 reiceiver和sender中都有特定的值来通知moderator来退出,但是假如我没有一个这样的值,我只能在所有发送者都发完才关闭channel的情况下,该如何解决?
    bc59f08c32a5:@天唯 看来只能这样了,我的一个爬虫,类似于一个流水线,每一步都有发送、接收两个操作,如果在每一部都用sync.waitGroup的话,逻辑理起来有点烦,wg.Add(1)总是插错位置
    天唯:发送完了都退出吗?如果是可以使用sync.WaitGroup等待所有goroutine退出,然后关闭channel
  • 9ab9efcc6eec:楼主你好, 关于第三个例子有些问题请教

    1. value==0时为什么还要加个select, 不能直接发送给toStop吗?

    ```
    if value == 0 {
    // here, a trick is used to notify the moderator
    // to close the additional signal channel.
    select {
    case toStop <- "sender#" + id:
    default:
    }
    return
    }
    ```
    2. select stopCh 为什么写了两次? 第一个select可以省略吗?

    ```
    // the first select here is to try to exit the
    // goroutine as early as possible.
    select {
    case <- stopCh:
    return
    default:
    }

    select {
    case <- stopCh:
    return
    case dataCh <- value:
    }
    ```
    3. toStop的缓冲大小是1, 为了避免准备好之前通知就发送了怎么理解??

    请注意channel toStop的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。
    9ab9efcc6eec:@天唯 楼主, 第3个问题, 我感觉是, 因为有缓冲的 发送 happens_before 接收之前, 所以mederator能保证接收到数据, 无缓冲的 接收 happens_before 发送之间, 可能会丢失数据
    天唯:1.toStop的buffer只有1,如果多个同时发送给toStop的话,会导致阻塞在 toStop <- id,所以使用了select,这样子当不能发送的时候就知道已经有其他goroutine发送了信号了。其实也可以将toStop的buffer大小改成接收者和发送者数量之和,这样子就可以直接发送了。
    2.注释也说了,是为了提前知道channel是否已经关闭了,如果省略了这个select,有可能计算关闭了channel,也会执行发送操作,因为在一个select里面,是随机选择一个能执行的case来执行的
    3.这个其实我也觉得奇怪,因为channel的数据是先进先出不会丢失的,个人感觉原作者这个说法有点问题,如果有知道的可以说一下
  • 81cc806cc824:赞, 学到了
  • ba6f8e025e77:我觉得最佳的实践是,不关闭任何channel。
    channel类似一个邮件地址,不发送邮件就好了,关闭一个email大部分时候是没必要的。
    天唯:有一些使用场景是用channel来做通知的,需要进行关闭通知
  • coloc:有多个发送者与接收者的情况, 通过一个互斥方法来获取chan引用, 以状态变量来维持chan的关闭状态, 同时关闭一个signal channel来通知接收者。这样实现起来或许简单点。
  • 5925d5123ff6:天唯 你好,我把此文章放到了 linkedinfo (https://www.linkedinfo.co/infos/) ,我的一个业余项目。如您觉得不妥,请随时联系我将之撤下。

    linkedinfo.co 是一个集合了各类优秀技术文章的站点,不展现全文,所有条目都附上作者与原文链接,读者最终都会进入到作者原文的站点。做 linkedinfo 的初衷是想方便自己且方便其他想学习的朋友,能更方便地找到自己感兴趣的技术文章。

    各位感兴趣的朋友可以去 https://linkedinfo.co 看看,非常希望能得到你们的意见与建议 (可在about 页面留言 https://linkedinfo.co/about )。
  • 开发者头条_程序员必装的App:感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/15220p 欢迎点赞支持!
    欢迎订阅《Go语言学习笔记》https://toutiao.io/subject/227710
  • 590f9055acc7:don't close a channel if the channel has multiple concurrent senders
    应该翻译成“不要关闭有多个发送者的channel”

    倒数第二个例子,dataCh有多个发送者,所以没有close(dataCh)
    最后一个例子,dataCh和toStop都有多个发送者,所以都没关闭
    天唯:谢谢指正!
  • zjh821:又学到东西了👍👍

本文标题:如何优雅地关闭Go channel

本文链接:https://www.haomeiwen.com/subject/cclkwttx.html