Go channel 实现
整体结构
Channel 的操作对应在运行时里的方法(都在 $GORROT/src/runtime/chan.go 里)
Channel 操作 | 运行时方法 |
---|---|
make(chan int, 4) | makechan(int, 4) |
make(chan int) | makechan(int, 0) |
ch <- v | chansend1(ch, &v) |
v <- ch | chanrecv1(ch, &v) |
close(ch) | closechan(ch) |
channel 的基本数据结构
makechan 返回的是一个叫做 hchan 的结构。所以 chan 本质上是用 hchan 表示的,它的传参与赋值始终都是指针形式,每个 hchan 对象代表着一个 chan。
type hchan struct{
qcount uint // 队列里的元素总数目
dataqsiz uint // 环形队列的长度
buf unsafe.Pointer // 存放数据的环形数组的指针
elemsize uint16 // channel 中数据类型的大小
closed uint32
elemtype *_type // 元素类型
sendx uint // 发送的索引
recvx uint // 接受的索引
recvq waitq // 接受者列表
sendq waitq // 发送者列表
// 锁会锁住 hchan 中的所有字段和阻塞在这个 hchan 上的 sudog 的几个字段
lock mutex
}
type waitq struct{
first *sudog
last *sudog
}
这样可以看出 hchan 结构本质上是一个环形队列+两个链表+一个锁。
-
buf 是一个环形队列,它表示已经发送但是还未被接收的数据缓存。buf 的大小由创建 chan 时的参数来决定。其中 sendx 表示下一个发送地址,recvx 表示下一个接收的地址。
-
qcount 表示当前缓冲区中有效数据的总量
-
sendx:发送索引,每次发送数据的时候就 +1,因为是循环队列,所以有一个 mod 的操作。
-
recvx:接收索引,每次从 channel 里面接收数据就 +1, 也会有 mod 操作。
-
dataqsiz 表示缓冲区的大小,对于无缓冲区通道而言,dataqsize 的值为 0。如果 qcount 和 dataqsiz 的值相同,则表示缓冲区用完了。
sendq 和 recvq 这两个链表分别存放阻塞在这个 channel 上的sender 和 reiceiver,它是个双向链表。
-
recvq 表示等待接收的 sudog 列表。一个接收语句执行时,如果缓冲区没有数据而且当前没有别的发送者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 放入到 recvq 中。
-
sendq 类似于 recvq,一个发送语句执行时,如果缓冲区满了或者没有接收者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 放到 sendq 中。
-
closed 表示通道是否已经关闭,0 表示没有关闭,非 0 值表示已经关闭。
-
lock 用于对 hchan 加锁。
相关数据结构
goroutine
在 runtime 库中,goroutine 用一个叫做 g 的结构表示,每个 g 对象个表示一个 goroutine。
// src/runtime/runtime2.go
type g struct{
atomicstatus uint32 // 表示 goroutine 的状态
param unsafe.Pointer // 唤醒时的参数
waiting *sudog // 等待队列,表示这个 goroutine 正在等待什么。
}
sudog
sudog 代表了等待列表里的 g。
sudog 是一个链表形式的类型,waitlink 表示它的下一个节点
// src/runtime/runtime2.go
type sudog struct{
g *g // 指向 goroutine 对象的指针
isSelect bool // isSelect 表示 g 加入了一个 select
elem unsafe.Pointer // 数据元素,可能是栈上的指针
waitlink *sudog
c *hchan // 指向 channel 的指针
}
acquireSudog 申请一个 sudog 对象。releaseSudog 释放 sudog 对象。
gopark 和 goready
gopark 将当前的 goroutine 修改成等待状态,然后等待被唤醒。goready 函数用来唤醒一个 goroutine,它将 goroutine 的状态修改为可运行状态。随后会被调度器运行。当被调度时,对应的 gopark 函数返回。
race
在编译时,使用 -race 参数,可以进行竞态检查,源码中也很多代码提供了 race 的支持。分析时可以跳过这些代码。这部分可以参考:https://blog.golang.org/race-detector
创建通道
代码里的 ch := make(chan int, 2) 会被编译器翻译成:
t := typeof(chan int)
ch := makechan(t, 2)
这里的 makechan 就是真正创建 channel 的方法,它构造 hchan 对象并且返回。hchan 在程序中始终以引用的形式存在。hchan 在标准库中都是以指针的形式呈现给外部的。
makechan 的逻辑分为三种:
-
缓冲区大小为 0,这种情况下给 hchan 分配内存的时候,只需要分配 sizeof(hchan) 大小的内存。
-
缓冲区大小不为 0,而且数据类型不包含指针。这种情况分配一块连续的内存用于存放 hchan 和缓冲区对象。
-
缓冲区大小不为 0,而且数据类型包含指针。这种情况分配两块内存,一块存放 hchan,另一块用来表示 buf。
发送数据
chansend()
发送数据部分,runtime 中通过 chansend 实现。它的声明如下
func chansend(c *hcahn, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c 表示向哪个 chan 发送数据,ep 表示要发送的数据的地址。block 表示是否需要阻塞(在 select 场景下使用就不阻塞)。callerpc 表示调用地址。返回值 bool 表示数据是否成功发送。
chansend 按照下面的逻辑执行
1. 如果通道是空的,对于非阻塞的发送,直接返回 false。对于阻塞的发送,将 goroutine 挂起,并且永不返回。
if c == nil{
if !block{
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
2. 如果通道没有关闭,而且当前没有接收者,缓冲区也已经满了或者没有缓冲区,非阻塞情况下,那么直接返回 false。阻塞情况下继续执行。
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)){
return false
}
这两步都没有加锁。
第一步中,没有访问 hchan 的任何成员,所以无需加锁。第二步中,可能被写的变量只有 closed,qcount 和 c.recvq.first,这些变量都是单字长的,所以对他们的单个值得读操作是原子性的。
需要注意的是这里的 if 判断里判断了 channel 是否关闭,虽然判断了是否关闭,但是可能在判断结束的那一刻 chanel 变成了关闭状态,所以虽然这里判断了,实际上还是可能未关闭或者关闭。
如果没有关闭,那么这个逻辑是没有问题的,返回 false。如果 channel 关闭了,那么理论上应该 panic,不过这里是返回了 false。(这里原因我也不清楚)。
3. 加锁以及判断是否已经关闭
// 加锁
lock(&c.lock)
// 如果通道已经关闭了,则 panic
if c.closed != 0{
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
4. 然后从 recvq 中取出一个接收者,如果接收者存在,直接向该接收者发送数据。
这样操作就不用把数据先写入 buf,然后再读出到 goroutine 上,减少了一次复制操作。
if sg := c.recvq.dequeue(); sg != nil{
send(c, sg, ep, func(){ unlock(&c.lock) }, 3)
return true
}
send 函数将 ep 作为参数传送给接收方的 sg 对象,然后使用 goready 将其唤醒。sg.elem 如果非空,则将 ep 的内容直接复制到 elem 指向的地址。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int){
// ...
if sg.elem !=nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer){
dst := sg.elem
memmove(dst, src, t.size)
}
5. 如果没有接收者,那么就存入到缓冲区。
如果缓冲区还有多余的空间,那么将数据写入缓冲区。
写入缓冲区后,将发送位置 sendx 往后移动一个单位,然后将 qcount 加 1。
if c.qcount < c.dataqsiz{
qp := chanbuf(c, c.sendx)
typememmove(e.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz{
c.sendx == 0
}
c.qcount++
unlock(&c.lock)
return true
}
其中 chanbuf 函数从 buf 中取出第 i 个元素的存放地址:
func chanbuf(c *hchan, i uint) unsafe.Pointer{
return add(c.buf, uintptr(i) * uintptr(c.elemsize))
}
typedmemmove 函数将类型为 c.elemtype 的 ep 的内容拷贝到 qp 中。
如果前面的都执行完还没有发送成功,就表示缓冲区没有空间了,而且也没有接收者在等待。
所以后面必须要将 goroutine 挂起然后等待新的接收者了。但对于非阻塞的调用,不能等待,返回 false 表示数据发送不成功。
if !block{
unlock(&c.lock)
return false
}
对于阻塞的对象,创建 sudog 对象,然后入队并且让 goroutine 进入等待状态。直到被唤醒时 goparkunlock 才会返回。
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g= gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
goparkunlcok 返回后,代表数据已经发送完成了,然后做一些清理工作,如将 sudog 对象释放,将 g 的 waiting 置空等。
chansend 中的 block
block 是为了实现如下代码的语义:
c := make(chan int)
// ...
select{
case <-c:
// ...
default:
// ...
}
这段代码会被编译成 selectnbsend 的调用:
if selectnbsend(c, v){
// ...
}else{
// ...
}
selectnbsend 实现如下
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool){
return chansend(c, elem, false, getcallerpc()) // 非阻塞发送
}
接收数据
chanrecv()
接收数据的操作和发送数据的操作类似,它的实现函数为 chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
chanrecv 从 c 中接收数据,并且将接收到的数据存到 ep 中,block 表示是否要阻塞。
如果没有数据可以接收,而且是非阻塞的情况,则返回 (false, false)。如果 c 已经关闭了,则将 ep 指向的值置为 0 值,并且返回 (true, false)。其他情况返回值为 (true, true),表示成功从 c 中取到了数据。
recv 的逻辑如下
if c == nil{
if !block{
return
}
gopark(nil, nil, waitReasonChanREceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 如果 c 为空且没有被关闭,非阻塞模式下直接返回 (false, false)。否则继续
if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0{
return
}
lock(&c.lock)
// 从 closed channel 接收数据,如果已经没有数据了,则返回默认值。
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 当前有发送 goroutine 阻塞在 channel 上,这个时候 buf 已经满了
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果 buf 中有可用数据
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// buf 为空,阻塞
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// 让该G进入休眠,等待解锁
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 当再次被唤醒后
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
recv()
recv 有两种情况,
关闭通道
closechan 函数实现了通道的关闭,它的声明如下:
func closechan(c *hchan){
// 加锁,判断通道是否已经关闭了,已经关闭了直接 panic
lock(&c.lock)
if c.closed != 0{
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
// 唤醒所有接收者,将接收数据置为 0 值。
// 唤醒所有发送者,另其 panic。
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil{
break
}
if sg.elem != nil{
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0{
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
}
c.closed = 1
var glist *g
// release all readers
for {
sg := c.recvq.dequeue()
// ....
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
// ...
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}