【笔记】如何优雅的关闭 Go channel

作者 tinywell 日期 2019-08-07
【笔记】如何优雅的关闭 Go channel

读了大神的博文How to Gracefully Close Channels 之后,对于 Go channel 的关闭动作有了一些更清晰的认知,整理一点笔记,用于需要时翻阅。

为什么要关注 channel 的关闭

之所以要小心翼翼的对待 channel 的关闭,有两点原因:

    1. 向一个已关闭的 channel 中发送数据会导致 panic;
    1. 关闭一个已关闭的 channel 或导致 panic;

也就是说,胡乱关闭 channel 的话,搞不好程序就崩掉了,这显然不是我们希望看到的。

怎么办?

针对上面这两个问题,为了防止程序崩掉我们总结出关闭 channel 时需要避开的问题:

    1. 发送数据的一方必须掌握 channel 是否关闭的信息(防止向一个已关闭 channel 发送数据);
    1. 在并发环境中,只允许一个 goroutine 可以执行关闭 channel 的动作(防止关闭一个已关闭的 channel);

有了这个原则,我们在实际应用中,面对不同情景就有了应对的依据。

另外,关于 channel 的一些小知识会帮助我们解决问题:

  • 从一个已关闭的 channel 中接收数据不会导致程序异常;
  • 关闭一个 channel,那么所有接收这个 channel 的 select case 都会收到信号;

情景分析

channel 只有一个发送方的情况

这种情况下,只要发送方掌握 channel 的关闭主动权即可,我们可以通过在发送方的 goroutine 中检测关闭条件,并在满足条件的情况下直接关闭 channel,停止发送行为就行了。

因为是发送方关闭的 channel,所以可以及时掌握通道关闭情况,及时中止发送行为,满足第一个条件。同时发送方只有一个,不会重复关闭,满足第二个条件。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"time"
"math/rand"
"sync"
"log"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const Max = 100000
const NumReceivers = 100

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)

// the sender
go func() {
for {
if value := rand.Intn(Max); value == 0 {
// The only sender can close
// the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()

// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()

// Receive values until dataCh is
// closed and the value buffer queue
// of dataCh becomes empty.
for value := range dataCh {
log.Println(value)
}
}()
}

wgReceivers.Wait()
}

channel 有多个发送方一个接收方

如果存在多个发送方并行执行的话,那么在发送方直接关闭 channel 就不行了,因为很可能造成多次关闭,违背第二个条件;显然,我们也不能在接收方关闭 channel,这样发送方不能掌握其关闭情况,很可能向关闭了的 channel 中发送数据,违背第一个条件。

这时候,我们需要引入一个通知角色,用一个发送通知的 channel 来传递关闭 channel 的信号。因为接收方只有一个,我们可以让接收者来掌握这个通知 channel,而发送方来监听这个通知 channel 的情况。当接收者检测到关闭条件时,关闭这个通知 channel,从而告诉接收方,停止发送动作,并最终关闭数据 channel。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"time"
"math/rand"
"sync"
"log"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.

// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)

select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}

// the receiver
go func() {
defer wgReceivers.Done()

for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}

log.Println(value)
}
}()

// ...
wgReceivers.Wait()
}

channel 有多个发送方以及多个接收方

这种情况下,关闭信号 channel 也不合适了,因为发送方和接收方都有多个,关闭哪个 channel 都会违背第二个条件。

这时,我们需要引入一个新的中间人 channel,在满足退出条件是,用这个中间人发送信息告诉一个 goroutine 去关闭信号 channel(这样只会有一个 goroutine 去关闭通知 channel,不违背第二个条件),从而推动 channel 关闭流程(后面过程类似多发送方单接收方的场景)。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main

import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)

func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)

// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000

wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)

// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.

var stoppedBy string

// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()

// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}

// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {