1. 简介

channel 称为通道/管道,是 Go 中可以在协程之间发送和接收消息的通信机制,就像一个消息队列,每一个通道会指定一个固定的数据类型,这个通道只能收发这个类型的对象。

与其他语言通过共享内存来仔线程之间实现通信不同,Go 提倡“不要通过共享内存的方式进行通信,而是通过 Channel 通信的方式共享内存”。

1.1 类型

一个 int 类型的通道,它的类型是 chan int,同种类型的通道可以使用 == 进行比较。通道的零值是 nil,对 nil 通道发送和接收将永远阻塞。

通道是使用 make 函数创建的数据结构的引用,当复制或者作为参数传递给函数时,复制的是引用。

1.2 初始化

// 声明变量,值为nil
var ch chan int

// make函数
ch = make(chan int)      // 无缓冲通道
ch := make(chan int 0) // 无缓冲通道
ch := make(chan int 3) // 容量为3的缓冲通道

1.3 操作

通道有两个主要操作:发送(send)和接收(receive)。

ch <- x  // 发送

x := <-ch     // 接收,赋值变量
x, ok := <-ch // 接收,赋值变量且判断是否成功读取
<-ch          // 接收,丢弃结果

通道还有一个操作:关闭(close)。关闭后发送操作将导致 panic,而接收操作将获取缓冲区的值,直至通道为空,后续再接收将获取到通道元素类型对应的零值。

close(ch) // 关闭通道

通道默认可以读和写,而在传递到函数参数中时,可以限制通道在函数中是否可以读或写数据。

// 通道可读写
func f(ch chan int) {
}

// 通道只能读
func f(ch <-chan int) {
}

// 通道只能写
func f(ch chan<- int) {
}

1.4 缓冲

通道创建时可以指定缓冲区容量,在其内会维护一个元素队列,队列的最大长度是通道的容量。发送操作向队列尾部插入元素,若通道满了则阻塞,接收操作从通道头部移除一个元素,若通道空了则阻塞。

缓冲区容量为0是无缓冲通道,也称为同步通道。发送操作会阻塞,直至另一个协程对通道执行接收操作,接收操作也会阻塞,直至另一个协程对通道发送一个值。

无缓冲通道提供强同步保障,发送和接收同步,而缓冲通道将发送和接收解耦。当发送快于接收和接收快于发送,缓冲区保持满或者空,缓冲通道都是没有价值的。

获取通道缓冲区容量和当前元素个数:

cap(ch) // 获取通道容量
len(ch) // 获取通道当前元素个数

读取通道阻塞的条件有:

  • 通道无缓冲区
  • 通道的缓冲区无数据
  • 通道为nil

写入通道阻塞的条件有:

  • 通道无缓冲区
  • 通道的缓冲区已满
  • 通道为nil

2. 实现原理

2.1 数据结构

通道的数据结构定义在 go 源码的 src/runtime/chan.go 中:

type hchan struct {
	qcount   uint           // 队列当前元素个数
	dataqsiz uint           // 环形队列长度
	buf      unsafe.Pointer // 环形队列指针
	elemsize uint16         // 每个元素大小
	closed   uint32         // 标识关闭状态
	elemtype *_type         // 元素类型
	sendx    uint           // 写入时的队列下标
	recvx    uint           // 读取时的队列下标
	recvq    waitq          // 等待从通道读取的协程队列
	sendq    waitq          // 等待向通道写入的协程队列
	lock     mutex          // 互斥锁
}

可以看到 go 使用一个环形队列来实现通道,通过两个下标来标记读写在环形队列中的位置,以充分利用内存。

2.2 创建通道

编译器将 make 创建通道的表达式进行转换,进而调用 runtime.makechan 函数。

创建时根据通道是否有缓冲区、元素类型是否为指针,分配对应的内存。

func makechan(t *chantype, size int) *hchan {
	elem := t.Elem
	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	var c *hchan
	switch {
	case mem == 0: // 无缓冲通道,分配通道默认内存
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.PtrBytes == 0: // 元素类型不是指针,分配通道加缓冲区以及底层数组的内存
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default: // 元素类型是指针,分配通道加缓冲区的内存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	c.elemsize = uint16(elem.Size_)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)
	return c
}

2.3 发送数据

编译器将对通道发送数据的语句转为调用 runtime.chansend 函数。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	lock(&c.lock) // 对当前通道加锁,防止被其它协程并发修改数据
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 如果存在等待写的协程,从队列取出第一个协程,调用send函数
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 若缓冲区未满,将发送的数据写入缓冲区
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// 无缓冲区或缓冲区已满,阻塞等待
	if !block {
		unlock(&c.lock)
		return false
	}
	gp := getg() // 获取发送数据的协程
	mysg := acquireSudog()
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg) // 将这次发送阻塞信息加到等待写的协程队列
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	KeepAlive(ep)

	gp.waiting = nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

send 函数将数据写入等待接收数据的协程,并将该协程标记为可运行状态。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // 将数据写入
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1) // 将协程标记为可运行状态
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	memmove(dst, src, t.Size_)
}

2.4 接收数据

编译器将从通道接收数据的语句转为调用 runtime.chanrecv 函数。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	lock(&c.lock) // 对当前通道加锁,防止被其它协程并发修改数据
	if c.closed != 0 {
		if c.qcount == 0 {
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	} else {
		// 如果存在等待读的协程,从队列取出第一个协程,调用recv函数
		if sg := c.sendq.dequeue(); sg != nil {
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	// 若缓冲区已有数据,从缓冲区将数据取出
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// 无可以获取的数据,阻塞等待
	if !block {
		unlock(&c.lock)
		return false, false
	}
	gp := getg() // 获取接收数据的协程
	mysg := acquireSudog()
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg) // 将这次接收阻塞信息加到等待读的协程队列
	gp.parkingOnChan.Store(true)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	gp.waiting = nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

recv 函数将数据写入等待接收数据的协程,并将该协程标记为可运行状态。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			// 无缓冲区,直接将数据写入
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
        	// 将队列的数据复制到接收方的内存地址
			typedmemmove(c.elemtype, ep, qp)
		}
        // 将发送队列头数据复制到缓冲区,释放一个阻塞的发送方
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	goready(gp, skip+1) // 将处理器分配至发送数据的协程
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	memmove(dst, src, t.Size_)
}

2.5 关闭通道

编译器将关闭通道转为调用 runtime.closechan 函数。

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock) // 对当前通道加锁,防止被其它协程并发修改数据
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	c.closed = 1

	var glist gList

	// 释放从通道读取的协程队列
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		glist.push(gp)
	}

	// 释放向通道写入的协程队列
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		glist.push(gp)
	}
	unlock(&c.lock)

	for !glist.empty() {
		goready(gp, 3) // 触发调度
	}
}

3. 参考