Golang是一种易于理解且快捷方便的编程语言,当然他也确实存在不好理解的,让几乎所有人都迷惑且伤脑的部分——Golang通道。
在实际操作时候基本上可以想象通道为锁定的、缓冲的队列。其原理为API与队列匹配,由队列实现,类比队列的抽象最容易编写和读取通道代码。
但是仅仅了解通道API的工作原理还不够。如果要能正确地、灵活地、高效地用好通道生态系统及其威力。通常还取决于等待组,错误组,go协程和其他并发原语。了解Golang运行时如何管理通道很重要。在某些情况下,通道会添加不必要的内容与互斥体或原子相比的开销。
本文虫虫将带大家一起深入探讨这些主题,以便可以更好理解Go通道,并最终善用它。
队列锁通道被实现为具有锁保护访问的队列。发送者将其添加到队列中。接收者从队列中读取。队列的大小总是固定的。发送者可能会遇到队列堆积满员,需要等待添才能添加,接收者也可以堆积竞争下一个项目。发送者和接收队列(与消息队列类比),链接到队列列表。Go运行时手动停止并向发送器和接收器发出信号。例如,避免发送者在满员队列上浪费时间,而接收器可以正常工作。
以下是将工作发送到通道的方法:
func send(w Work, ch chan Work) {
ch <- w
}
以下是从通道中读取信息的方法:
func recv(ch chan Work) (Work, bool) {
w, ok := <-ch
return w, ok
}
类似于从队列中添加/删除的方法:
func (q *Queue) send(w Work) {
q = append(q, w)
}
func (q *Queue) recv() (Work, bool) {
if len(q) == 0 {
return nil, false
}
w := q[0]
q = q[1:]
return w, true
}
由于通道操作可能会阻塞,因此selec语句用于在通道满员时扩展选择。下面是在发送消息到通道,但如通道满员导致发送失败:
func trySend(w Work, ch chan Work) (success bool) {
select {
case ch <-w:
return true
default:
return false
}
}
同样,也可以提供额外的通道来提供缓冲阻塞:
func sendWithEscapeHatch(w Work, ch chan Work, done chan struct{}) bool {
select {
case ch <- w:
return true
case <-done:
return false
}
}
func escapeHatch(done chan struct{}) {
close(done)
}
全通道实施有很多微妙的细节,但是类别队列的行为,对通道理解很有帮助:
写入已满的队列-> 发送者排队等待
从空队列中读取 -> 接受者排队等待
从关闭的队列中读取 -> 读取值直到为空
写入封闭队列 -> 应用程序做错了什么,要么错过了工作,要么程序应该中断编写器
关闭一个零通道或之前关闭的通道 -> 应用程序应该做错了什么
并发原语思考通道的第二种方式不是将其视为孤立的抽象,作为Golang生态系统的基本组成部分并发控制。要使用通道编写并发代码,需要了解其基本几种并发原语。
Go协程研究以下代码示例:
go func() {
fmt.Println("chongchong 1")
}()
go func() {
fmt.Println("chongchong 2")
}()
试问先打印“chongchong 1”还是“chongchong 2”呢,这是不确定的。因为这两个函数都在新线程中运行与原始调用上下文分开。
在使用通道的代码中经常在单独的Go协程运行send和recv操作,因为Go运行时根据工作的构建位置来切换上下文。
ch := make(chan Work)
go recv(ch)
for _, w := range work {
go send(w, ch)
}
定时器超时和定时器是两个使用基于时间的逻辑扩展选择块的方法,以下示例中使用一个ticker定时器每秒发送一个任务。
ticker := time.NewTicker(1 * time.Second)
for _, w := range work {
select {
case <-ticker.C
ch <- w
}
}
用超时可以做同样的事情:
for _, w := range work {
select {
case <-time.After(1 * time.Second):
ch <- w
}
}
计时器可以对通道的吞吐量进行速率限制,并且计时器可以用于中断需要很长时间才能完成的工作。
等待组等待组和错误组有两种协调线程关闭的方法。前面代码中已经使用过done模式,它可以用来杀死掉队的发送者/接收者运行时在后续步骤中在退出:
select {
case ch <- w:
return true
case <-done:
return false
}
如果提前知道工作量并且不关心错误,等待组阻塞主进程,直到出现一组固定的并发线程才结束:
func sumRange(n int) int {
var wg sync.WaitGroup
work := make(chan int)
res := make(chan int)
go func() {
var sum int
for {
select {
case i, ok := <-work:
if !ok {
res <- sum
return
}
sum += i
}
}
}()
for i := 0; i < n; i++ {
wg.Add(1) // 增加任务计数
i := i
go func() {
work <- i
wg.Done() // 减少任务计数
}()
}
wg.Wait() // 在计数为零时候解锁
close(work) // 等发送者完成,再安全退出
return <-res //队列清空后res
}
将发送者和接收者分两个池。发送者从WaitGroup递增和递减以表示正在进行的任务。接收者是一个单一的go协程,它接受来自发送者的任务。当发送者完成发送工作后,wg.Wait()解锁。这时,不会再有工作将添加到队列中,因此可以关闭任务通道。最后,使用一个结果通道res告诉接收者在返回之前结束添加值。
错误组上面处理中,应用程序提前了解其关闭例程,并使用done和等待组处理,这样的情况在实际中其实很难简单。根据经验,关闭时候通常会发生不可预测的情况,需要组合原语以优雅地完成以下三个工作:
(a)关停发送者,(b)刷新队列, (c)关停接收者。
队列再次分为发送者和执行者。现在发送者通过错误组eg而不是等待组来协调 与彼此。接收者线程仍然从队列中提取任务积累结果。结束也类似。finalize()阻塞直到正在进行的发送者完成所有的工作。发送者完成后,我们就可以安全地 close(c.work)。还有接收者将在清空队列后用结果通道占位填充。
和等待组的方法的主要区别在于这个方法支持有无限的任务。
无限制的工作当也会引发许多失败案例。如果ctx被取消,<-ctx.Done()会立即返回。发送者和接收器会提前关闭,错误组将不接受新的工作,并且调用 finalize将返回遇到的第一个错误。
在后一个示例中,还可以通过其他方式来管理关闭。可以有两个单独的错误组来跟踪发送者和接收者。可以使用done接收器错误向发送器发出信号停止通道(或至少避免阻塞)。
关于如何选择正确的组织模式和原语组合可能是一门艺术,需要大家根据自己的实际任务和经验来抉择。
消息传递最后说下Goer广为流传的一句话:“以消息传递方式共享内存,不要以共享内存方式传递消息”。
消息传递与共享内存不同。复制值发送方/接收方堆栈之间的共享比共享内存更安全。但它是也可以将带有指针的值发送到通道中。Golang对运行时的控制go协程使通道实现非常高效且是自动实现的。
Go的运行时可以暂停和恢复线程取决于有工作要做的地方。并发增加了一个上下文切换的固定开销,但是对于用户队列实现具有同样的性能。将通道嵌入到语言运行时也使得更容易编写更积极的并发代码。通道生态系统使得其更难学,但它也可以让我们做更多的事情。
Go运行时的智能上下文切换使之成为可能最大限度地提高网络通话之间的生产效率。
总结通道作为队列抽象最容易理解。但仅靠通道是不够的。实际应用取决于Golang 中的一组并发原语。最后,通道通常抽象细节,使开发人员的生活更轻松也比其他并发选项性能更高。所有这些抽象都是有用的,但它们通常适用于不同的情况。