整体结构

Channel 的操作对应在运行时里的方法(都在 $GORROT/src/runtime/chan.go 里)

Channel 操作 运行时方法
make(chan int, 4) makechan(int, 4)
make(chan int) makechan(int, 0)
ch <- v chansend1(ch, &v)
v <- ch chanrecv1(ch, &v)
close(ch) closechan(ch)

channel 的基本数据结构

makechan 返回的是一个叫做 hchan 的结构。所以 chan 本质上是用 hchan 表示的,它的传参与赋值始终都是指针形式,每个 hchan 对象代表着一个 chan。

type hchan struct{
    qcount      uint // 队列里的元素总数目
    dataqsiz    uint // 环形队列的长度
    buf        unsafe.Pointer // 存放数据的环形数组的指针
    elemsize    uint16 // channel 中数据类型的大小
    closed      uint32
    elemtype    *_type // 元素类型
    sendx       uint // 发送的索引
    recvx       uint // 接受的索引
    recvq       waitq // 接受者列表
    sendq       waitq // 发送者列表
    
    // 锁会锁住 hchan 中的所有字段和阻塞在这个 hchan 上的 sudog 的几个字段
    lock        mutex 
}

type waitq struct{
    first *sudog
    last  *sudog
}

这样可以看出 hchan 结构本质上是一个环形队列+两个链表+一个锁。

  • buf 是一个环形队列,它表示已经发送但是还未被接收的数据缓存。buf 的大小由创建 chan 时的参数来决定。其中 sendx 表示下一个发送地址,recvx 表示下一个接收的地址。

  • qcount 表示当前缓冲区中有效数据的总量

  • sendx:发送索引,每次发送数据的时候就 +1,因为是循环队列,所以有一个 mod 的操作。

  • recvx:接收索引,每次从 channel 里面接收数据就 +1, 也会有 mod 操作。

  • dataqsiz 表示缓冲区的大小,对于无缓冲区通道而言,dataqsize 的值为 0。如果 qcount 和 dataqsiz 的值相同,则表示缓冲区用完了。

sendq 和 recvq 这两个链表分别存放阻塞在这个 channel 上的sender 和 reiceiver,它是个双向链表。

  • recvq 表示等待接收的 sudog 列表。一个接收语句执行时,如果缓冲区没有数据而且当前没有别的发送者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 放入到 recvq 中。

  • sendq 类似于 recvq,一个发送语句执行时,如果缓冲区满了或者没有接收者在等待,那么执行者 goroutine 会被挂起,并且将对应的 sudog 放到 sendq 中。

  • closed 表示通道是否已经关闭,0 表示没有关闭,非 0 值表示已经关闭。

  • lock 用于对 hchan 加锁。

相关数据结构

goroutine

在 runtime 库中,goroutine 用一个叫做 g 的结构表示,每个 g 对象个表示一个 goroutine。

// src/runtime/runtime2.go
type g struct{
    atomicstatus     uint32 // 表示 goroutine 的状态
    param           unsafe.Pointer // 唤醒时的参数
    waiting         *sudog // 等待队列,表示这个 goroutine 正在等待什么。
}

sudog

sudog 代表了等待列表里的 g。

sudog 是一个链表形式的类型,waitlink 表示它的下一个节点

// src/runtime/runtime2.go 
type sudog struct{
    g *g // 指向 goroutine 对象的指针
    isSelect       bool // isSelect 表示 g 加入了一个 select
    elem           unsafe.Pointer // 数据元素,可能是栈上的指针
    waitlink       *sudog  
    c              *hchan // 指向 channel 的指针
}

acquireSudog 申请一个 sudog 对象。releaseSudog 释放 sudog 对象。

gopark 和 goready

gopark 将当前的 goroutine 修改成等待状态,然后等待被唤醒。goready 函数用来唤醒一个 goroutine,它将 goroutine 的状态修改为可运行状态。随后会被调度器运行。当被调度时,对应的 gopark 函数返回。

race

在编译时,使用 -race 参数,可以进行竞态检查,源码中也很多代码提供了 race 的支持。分析时可以跳过这些代码。这部分可以参考:https://blog.golang.org/race-detector

创建通道

代码里的 ch := make(chan int, 2) 会被编译器翻译成:

t := typeof(chan int)
ch := makechan(t, 2)

这里的 makechan 就是真正创建 channel 的方法,它构造 hchan 对象并且返回。hchan 在程序中始终以引用的形式存在。hchan 在标准库中都是以指针的形式呈现给外部的。

makechan 的逻辑分为三种:

  1. 缓冲区大小为 0,这种情况下给 hchan 分配内存的时候,只需要分配 sizeof(hchan) 大小的内存。

  2. 缓冲区大小不为 0,而且数据类型不包含指针。这种情况分配一块连续的内存用于存放 hchan 和缓冲区对象。

  3. 缓冲区大小不为 0,而且数据类型包含指针。这种情况分配两块内存,一块存放 hchan,另一块用来表示 buf。

发送数据

chansend()

发送数据部分,runtime 中通过 chansend 实现。它的声明如下

func chansend(c *hcahn, ep unsafe.Pointer, block bool, callerpc uintptr) bool

c 表示向哪个 chan 发送数据,ep 表示要发送的数据的地址。block 表示是否需要阻塞(在 select 场景下使用就不阻塞)。callerpc 表示调用地址。返回值 bool 表示数据是否成功发送。

chansend 按照下面的逻辑执行

1. 如果通道是空的,对于非阻塞的发送,直接返回 false。对于阻塞的发送,将 goroutine 挂起,并且永不返回。

if c == nil{
    if !block{
        return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

2. 如果通道没有关闭,而且当前没有接收者,缓冲区也已经满了或者没有缓冲区,非阻塞情况下,那么直接返回 false。阻塞情况下继续执行。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)){
    return false
}

这两步都没有加锁。

第一步中,没有访问 hchan 的任何成员,所以无需加锁。第二步中,可能被写的变量只有 closed,qcount 和 c.recvq.first,这些变量都是单字长的,所以对他们的单个值得读操作是原子性的。

需要注意的是这里的 if 判断里判断了 channel 是否关闭,虽然判断了是否关闭,但是可能在判断结束的那一刻 chanel 变成了关闭状态,所以虽然这里判断了,实际上还是可能未关闭或者关闭。

如果没有关闭,那么这个逻辑是没有问题的,返回 false。如果 channel 关闭了,那么理论上应该 panic,不过这里是返回了 false。(这里原因我也不清楚)。

3. 加锁以及判断是否已经关闭

// 加锁
lock(&c.lock)

// 如果通道已经关闭了,则 panic
if c.closed != 0{
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}

4. 然后从 recvq 中取出一个接收者,如果接收者存在,直接向该接收者发送数据。

这样操作就不用把数据先写入 buf,然后再读出到 goroutine 上,减少了一次复制操作。

if sg := c.recvq.dequeue(); sg != nil{
    send(c, sg, ep, func(){ unlock(&c.lock) }, 3)
    return true
}

send 函数将 ep 作为参数传送给接收方的 sg 对象,然后使用 goready 将其唤醒。sg.elem 如果非空,则将 ep 的内容直接复制到 elem 指向的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int){
    // ...
    if sg.elem !=nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer){
    dst := sg.elem
    memmove(dst, src, t.size)
}

5. 如果没有接收者,那么就存入到缓冲区。

如果缓冲区还有多余的空间,那么将数据写入缓冲区。

写入缓冲区后,将发送位置 sendx 往后移动一个单位,然后将 qcount 加 1。

if c.qcount < c.dataqsiz{
    qp := chanbuf(c, c.sendx)
    typememmove(e.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz{
        c.sendx == 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

其中 chanbuf 函数从 buf 中取出第 i 个元素的存放地址:

func chanbuf(c *hchan, i uint) unsafe.Pointer{
    return add(c.buf, uintptr(i) * uintptr(c.elemsize))
}

typedmemmove 函数将类型为 c.elemtype 的 ep 的内容拷贝到 qp 中。

如果前面的都执行完还没有发送成功,就表示缓冲区没有空间了,而且也没有接收者在等待。

所以后面必须要将 goroutine 挂起然后等待新的接收者了。但对于非阻塞的调用,不能等待,返回 false 表示数据发送不成功。

if !block{
    unlock(&c.lock)
    return false
}

对于阻塞的对象,创建 sudog 对象,然后入队并且让 goroutine 进入等待状态。直到被唤醒时 goparkunlock 才会返回。

gp := getg()

mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g= gp
mysg.isSelect = false
mysg.c = c

gp.waiting = mysg
gp.param = nil

c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

goparkunlcok 返回后,代表数据已经发送完成了,然后做一些清理工作,如将 sudog 对象释放,将 g 的 waiting 置空等。

chansend 中的 block

block 是为了实现如下代码的语义:

c := make(chan int)
// ...
select{
case <-c:
    // ...
default:
    // ...
}

这段代码会被编译成 selectnbsend 的调用:

if selectnbsend(c, v){
    // ...
}else{
    // ...
}

selectnbsend 实现如下

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool){
    return chansend(c, elem, false, getcallerpc()) // 非阻塞发送
}

接收数据

chanrecv()

接收数据的操作和发送数据的操作类似,它的实现函数为 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

chanrecv 从 c 中接收数据,并且将接收到的数据存到 ep 中,block 表示是否要阻塞。

如果没有数据可以接收,而且是非阻塞的情况,则返回 (false, false)。如果 c 已经关闭了,则将 ep 指向的值置为 0 值,并且返回 (true, false)。其他情况返回值为 (true, true),表示成功从 c 中取到了数据。

recv 的逻辑如下

if c == nil{
    if !block{
        return
    }
    gopark(nil, nil, waitReasonChanREceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
}
// 如果 c 为空且没有被关闭,非阻塞模式下直接返回 (false, false)。否则继续

if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0{
    return
}

lock(&c.lock)

// 从 closed channel 接收数据,如果已经没有数据了,则返回默认值。
if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
                raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
                typedmemclr(c.elemtype, ep)
        }
        return true, false
}

// 当前有发送 goroutine 阻塞在 channel 上,这个时候 buf 已经满了
if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
}

// 如果 buf 中有可用数据
if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
                c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
}

// buf 为空,阻塞
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
        mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// 让该G进入休眠,等待解锁
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

// 当再次被唤醒后
if mysg != gp.waiting {
                throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        if mysg.releasetime > 0 {
                blockevent(mysg.releasetime-t0, 2)
        }
        closed := gp.param == nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true, !closed

recv()

recv 有两种情况,

关闭通道

closechan 函数实现了通道的关闭,它的声明如下:

func closechan(c *hchan){
    // 加锁,判断通道是否已经关闭了,已经关闭了直接 panic
    lock(&c.lock)
    if c.closed != 0{
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    
    c.closed = 1
    
    // 唤醒所有接收者,将接收数据置为 0 值。
    // 唤醒所有发送者,另其 panic。
    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil{
            break
        }

        if sg.elem != nil{
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0{
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
    }
}

参考 深入理解 Go ChannelGo Channel 源码分析图解Go的channel底层原理

c.closed = 1

var glist *g

// release all readers
for {
        sg := c.recvq.dequeue()
      // ....
}

// release all writers (they will panic)
for {
        sg := c.sendq.dequeue()
      // ...
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
}