当前位置: 首页 > news >正文

做3d打印网站wordpress文章链接带问号

做3d打印网站,wordpress文章链接带问号,wordpress模板打包,怎么策划一个网站1、引言 Do not communicate by sharing memory; instead, share memory by communicating Golang 的并发哲学是“不要通过共享内存进行通信#xff0c;而要通过通信来共享内存”#xff0c;提倡通过 channel 进行 goroutine 之间的数据传递和同步#xff0c;而不是通过共享…1、引言 Do not communicate by sharing memory; instead, share memory by communicating Golang 的并发哲学是“不要通过共享内存进行通信而要通过通信来共享内存”提倡通过 channel 进行 goroutine 之间的数据传递和同步而不是通过共享变量内存来实现。 func write(chanInt chan int) {for i : 0; i 10; i {chanInt - i}close(chanInt) }func read(chanInt chan int, chanExit chan bool) {for {v, ok : -chanIntif !ok {break}fmt.Println(v)}chanExit - trueclose(chanExit) }func TestCSP(t *testing.T) {chanInt : make(chan int, 10)chanExit : make(chan bool)go write(chanInt)go read(chanInt, chanExit)for {select {case _, ok : -chanExit:if !ok {fmt.Println(done)return}}}}如上述示例write 函数负责写read 函数负责读chanInt 负责在两个 goroutine 进行数据同步chanExit 负责监听数据已处理完成并最终退出。整个程序没有看到锁非常的优雅。 接下来来说说 channel 的特性最后结合底层源码来加深印象。 2、特性 2.1 基本用法 由于 channel 是引用类型需要用 make 来初始化 chanBuffer : make(chan int, 10) chanNoBuffer : make(chan int)这里创建的是可读写的 channel区别在于是否有 capacity(容量) 带缓冲区的 channel可以存储 cap 个数据不带缓冲区的 channel一般用于同步 chanWriteOnly : make(chan- int) chanReadOnly : make(-chan int)这里创建的是只写和只读的 channel不过这样写意义不大一般用于传参接下来用这两个 chan 把引言示示例中关于 write 和 read 函数给改下 func write(chanInt chan- int) {for i : 0; i 10; i {chanInt - i}close(chanInt) }func read(chanInt -chan int, chanExit chan bool) {for {v, ok : -chanIntif !ok {break}fmt.Println(v)}chanExit - trueclose(chanExit) }查看 channel 的长度和容量 func TestChanLenCAP(t *testing.T) {chanInt : make(chan int, 2)chanInt - 1fmt.Println(len(chanInt)) // 1fmt.Println(cap(chanInt)) // 2 }关闭 channel close(ch)判断 channel 是否已关闭 func TestChanIsClosed(t *testing.T) {chanInt : make(chan int, 10)close(chanInt)if _, ok : -chanInt; !ok {fmt.Println(closed)} }向一个已关闭的 channel 读数据会读到零值并且每次读也都是零值因此可以利用这个特性来判断 channel 是否已关闭。 2.2 异常情况 接下来看看几种需要注意的异常情况 注意 Golang 版本为 1.19.12。不同版本的调度器和运行时的行为可能会有所不同尤其是与死锁检测相关的机制。这些变化可能导致在某些版本中程序会更快地检测到死锁而在其他版本中则可能仅仅是阻塞而不报错。 2.2.1 给一个 nil channel发送数据 func TestWriteNil(t *testing.T) {var chanInt chan intchanInt - 1 }由于 chanInt 还没初始化值为 nil此时代码会阻塞在 chanInt - 1 这一行并最终形成死锁。 fatal error: all goroutines are asleep - deadlock!解法channel 使用前需要使用 make 初始化。 2.2.2 从一个 nil channel 读数据 func TestReadNil(t *testing.T) {var chanInt chan int-chanInt }由于 chanInt 还没初始化值为 nil此时代码会阻塞在 -chanInt 这一行并最终形成死锁。 fatal error: all goroutines are asleep - deadlock!解法channel 使用前需要使用 make 初始化。 2.2.3 关闭一个 nil channel func TestCloseNil(t *testing.T) {var chanInt chan intclose(chanInt) }如果尝试关闭一个 nil 的 channel会导致运行时错误 panic: close of nil channel。 panic: close of nil channel [recovered]panic: close of nil channel解法channel 使用前需要使用 make 初始化。 前三个异常说明channel 使用前一定要使用 make 进行初始化。 2.2.4 向一个已关闭的 channel 发数据 func TestWriteClosed(t *testing.T) {chanNoBuffer : make(chan int)close(chanNoBuffer)chanNoBuffer - 1 }向一个已关闭的 channel 发送数据会引起 panic。 panic: send on closed channel [recovered]panic: send on closed channel这是因为一旦 channel 被关闭就不能再向其发送数据但可以继续从中接收数据。 解法判断 channel 是否已关闭。 2.2.5 向一个已关闭的 channel 发起重复关闭动作 func TestClosedOnceMore(t *testing.T) {chanNoBuffer : make(chan int)close(chanNoBuffer)close(chanNoBuffer) }尝试关闭一个已经关闭的 channel 会导致运行时错误 panic: close of closed channel。这个错误通常出现在多个 goroutine 试图关闭同一个 channel 或者代码逻辑不正确导致同一个 channel 被关闭多次。 panic: close of closed channel [recovered]panic: close of closed channel解法判断 channel 是否已关闭。 2.2.6 向没有缓冲区的 channel 写数据但没有读取方 func TestSendNoBuffer(t *testing.T) {ch : make(chan int)ch - 4 }无缓冲的 channel 是一种同步通信机制当只有发送方没有接收方会陷入阻塞而死锁。 fatal error: all goroutines are asleep - deadlock!解法无缓冲 channel 是一种同步通信机制需要发送和接收操作同时进行。 2.2.7 向没有缓冲区的 channel 读取数据但没有写入方 func TestReadNoBuffer(t *testing.T) {ch : make(chan int)-ch }尝试从一个无缓冲的 channel 读取数据时如果没有其他 goroutine 向该 channel 发送数据读取操作将会阻塞。这会导致程序死锁并最终导致运行时错误。 fatal error: all goroutines are asleep - deadlock!解法无缓冲 channel 是一种同步通信机制需要发送和接收操作同时进行。 2.2.8 无缓冲区 channel 的发送和接收操作没有同时进行 func ReadNoBufferChan(chanBool chan bool) {-chanBool }func TestSendNoBufferChan(t *testing.T) {ch : make(chan bool)ch - truego ReadNoBufferChan(ch)time.Sleep(1 * time.Second) }上面两个异常一直强调由于无缓冲 channel 是一种同步通信机制需要发送和接收操作同时进行。代码执行到 ch - chan 时调度器发现没有任何 goroutine 接收于是阻塞并死锁。 fatal error: all goroutines are asleep - deadlock!解法无缓冲 channel 是一种同步通信机制需要发送和接收操作同时进行。 func TestSendNoBufferChan(t *testing.T) {ch : make(chan bool)go ReadNoBufferChan(ch)ch - truetime.Sleep(1 * time.Second) }把 go ReadNoBufferChan(ch) 提前这样就确保了在发送数据之前有一个 goroutine 正在等待接收数据。 对于无缓冲的 channel 读取和写入要成对出现并且不能在同一个 goroutine 里使用 for 读取数据时写入方需要关闭 channel 2.2.9 向有缓存区的 channel 先读数据 func TestWriteBufferChan(t *testing.T) {ch : make(chan int, 1)if _, ok : -ch; !ok {fmt.Println(closed)} }当尝试从一个空的带缓冲的 channel 读取数据时读取操作会阻塞直到有数据被写入 channel。这是因为即使是带缓冲的 channel也需要在读取数据时有数据可读。 带缓冲的 channel 和无缓冲的 channel 的主要区别在于带缓冲的 channel 可以存储一定数量的数据而无缓冲的 channel 则需要发送和接收操作同步进行。然而这并不改变以下事实当一个 goroutine 试图从空的 channel 读取数据时它会被阻塞直到有其他 goroutine 写入数据。 fatal error: all goroutines are asleep - deadlock!解法需要在读取数据时有数据可读。 2.2.10 向有缓存区的 channel 写数据但没有读取数据 func TestReadBufferChan(t *testing.T) {ch : make(chan int, 1)ch - 1ch - 2 }当带缓冲的 channel 在缓冲区满时写入操作会阻塞直到有数据被读取以腾出缓冲区空间。如没有读取方最后就会因阻塞而死锁。 fatal error: all goroutines are asleep - deadlock!解法当带缓冲的 channel 在缓冲区满时需要有读取方或者增加缓冲区的大小。 注意对于带缓冲的 channel 在缓冲区没超过容量之前写入数据若没有读取不像不带缓冲区的 channel 那样不会产生死锁的。 其实最后这两个带缓冲区 channel 异常情况总结就是 若在同一个 goroutine 里写数据操作一定在读数据操作前若 channel 空了接收者会阻塞若 channel 满了发送者会阻塞 3、底层实现 3.1 数据结构 Golang 的 channel 在运行时使用 runtime.hchan 结构体表示。 // runtime/chan.go type hchan struct {qcount uint // 队列中的数据个数dataqsiz uint // 环形缓冲区的大小buf unsafe.Pointer // 环形缓冲区指针elemsize uint16 // 单个元素的大小closed uint32 // 标志 channel 是否关闭elemtype *_type // 元素的类型sendx uint // 发送操作的索引recvx uint // 接收操作的索引recvq waitq // 等待接收的 goroutine 队列sendq waitq // 等待发送的 goroutine 队列lock mutex // 保护 channel 的锁 }先看看环形缓冲区相关的字段 qcount: 当前缓冲区中的元素个数。dataqsiz: 环形缓冲区的容量。buf: 实际存储数据的缓冲区类型为 unsafe.Pointer类似 C 语言的 void *。elemsize: 每个元素的大小。sendx: 环形缓冲区中下一个待写入的位置。recvx: 环形缓冲区中下一个待读取的位置。 再来看看发送和接收队列 recvq: 等待接收的 goroutine 队列。sendq: 等待发送的 goroutine 队列。 这两个队列是通过 waitq 结构体来实现的waitq 本质上是一个双向链表链表中的每个节点是一个 sudog 结构体sudog 代表一个等待中的 goroutine。 type waitq struct {first *sudoglast *sudog }最后看看 lock 字段 lock 锁用于保护 channel 数据结构的互斥锁。Golang 使用自旋锁和互斥锁的结合来保证 channel 操作的线程安全。 3.2 初始化 func makechan(t *chantype, size int) *hchan {elem : t.elem// compiler checks this but be safe.if elem.size 116 {throw(makechan: invalid channel element type)}if hchanSize%maxAlign ! 0 || elem.align maxAlign {throw(makechan: bad alignment)}mem, overflow : math.MulUintptr(elem.size, uintptr(size))if overflow || mem maxAlloc-hchanSize || size 0 {panic(plainError(makechan: size out of range))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoGs are referenced from their owning thread so they cant be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem 0:// Queue or element size is zero.c (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf c.raceaddr()case elem.ptrdata 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c (*hchan)(mallocgc(hchanSizemem, nil, true))c.buf add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c new(hchan)c.buf mallocgc(mem, elem, true)}c.elemsize uint16(elem.size)c.elemtype elemc.dataqsiz uint(size)lockInit(c.lock, lockRankHchan)if debugChan {print(makechan: chan, c, ; elemsize, elem.size, ; dataqsiz, size, \n)}return c }这里主要说下 switch 相关的分支代码 第一个分支如果 channel 的缓冲区大小是 0也就是创建无缓冲 channel或 channel 中的元素大小是 0如 struct{}{} Golang 中“空结构体”是不占内存的size 为 0时调用 mallocgc() 在堆上为 channel 开辟一段大小为 hchanSize 的内存空间。 这里说下 c.buf c.raceaddr()c.raceaddr() 会返回一个地址这个地址在内存中不会被实际用于存储数据但会被数据竞争检测工具如 Golang 的 race detector用于同步这也是无缓冲区的 channel 用来做数据同步场景的由来。 第二个分支如果元素不包含指针时。调用 mallocgc 一次性分配 hchan 和 buf 的内存。第三个分支默认情况元素类型中有指针类型调用了两次分配空间的函数 new/mallocgc。 仔细看三个分支都调用了 mallocgc 在堆上分配内存也就说 channel 本身会被 GC 自动回收。 在函数的最后会初始化通道结构的字段包括元素大小、元素类型、缓冲区大小和锁。 3.2 发送数据 // entry point for c - x from compiled code // //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc()) }/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param nil* when a channel involved in the sleep has* been closed. it is easiest to loop and re-run* the operation; well see that its now closed.*/ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 当 channel 为 nil 时处理if c nil {if !block {return false}gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)throw(unreachable)}if debugChan {print(chansend: chan, c, \n)}// 竞态检测是用来分析是否存在数据竞争。go test -race ./...if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second full()).// Because a closed channel cannot transition from ready for sending to// not ready for sending, even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasnt closed during the first observation. However, nothing here// guarantees forward progress. We rely on the side effects of lock release in// chanrecv() and closechan() to update this threads view of c.closed and full().if !block c.closed 0 full(c) {return false}var t0 int64if blockprofilerate 0 {t0 cputicks()}// 加锁lock(c.lock)// 检查 channel 是否关闭if c.closed ! 0 {unlock(c.lock)panic(plainError(send on closed channel))}// 检查是否有等待接收的 goroutineif sg : c.recvq.dequeue(); sg ! nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(c.lock) }, 3)return true}// 检查 channel 缓冲区是否有空位if c.qcount c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp : chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}typedmemmove(c.elemtype, qp, ep)c.sendxif c.sendx c.dataqsiz {c.sendx 0}c.qcountunlock(c.lock)return true}// 非阻塞模式下if !block {unlock(c.lock)return false}// 阻塞模式下将当前 goroutine 加入发送队列并挂起receiver 会帮我们完成后续的工作// Block on the channel. Some receiver will complete our operation for us.gp : getg()mysg : acquireSudog()mysg.releasetime 0if t0 ! 0 {mysg.releasetime -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包 sudogmysg.elem epmysg.waitlink nilmysg.g gpmysg.isSelect falsemysg.c cgp.waiting mysggp.param nil// 将当前这个发送 goroutine 打包后的 sudog 入队到 channel 的 sendq 队列中c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that were about// to park on a channel. The window between when this Gs status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.// 将这个发送 g 从 Grunning - Gwaiting// 进入休眠atomic.Store8(gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs arent considered as roots of the// stack tracer.KeepAlive(ep)// 以下唤醒后需要执行的代码// someone woke us up.if mysg ! gp.waiting {throw(G waiting list is corrupted)}gp.waiting nilgp.activeStackChans falseclosed : !mysg.successgp.param nilif mysg.releasetime 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c nilreleaseSudog(mysg)if closed {// 唤醒后发现 channel 被关闭了if c.closed 0 {throw(chansend: spurious wakeup)}panic(plainError(send on closed channel))}return true }代码比较长可以分为两大部分异常检测和发送数据 3.2.1 异常检测 代码一开始就排除了在异常章节中 nil channel 的情形比如未初始化或是被 GC 回收了。 接着会检测非阻塞模式下也就是有缓冲区的 channel如果还未 close 并且缓冲区已经满了则直接返回 false。 func TestASyncSendFull(t *testing.T) {ch : make(chan int, 1) // 创建一个缓冲区大小为 1 的 channelch - 1 // 向 channel 发送一个元素此时缓冲区已满select {case ch - 2: // 尝试发送第二个元素fmt.Println(Successfully sent 2)default: // 缓冲区已满进入 default 分支fmt.Println(channel is full, unable to send 2)} }3.2.2 发送数据 发送数据可以归纳为以下三点 直接发送当 recvq 存在等待的接收者时那么通过 runtime.send 直接将数据发送给阻塞的接收者 注意这里不会立马唤醒阻塞的接收者而是将等待接收数据的 goroutine 标记成可运行状态 grunnable 并把该 goroutine 放到发送方所在的处理器的 runnext 上等待执行该处理器在下一次调度时会立刻唤醒数据的接收方 异步发送当 buf 缓冲区存在空余空间时将发送的数据写入 channel 的缓冲区阻塞发送当不存在缓冲区或者缓冲区已满时等待其他 goroutine 从 channel 接收数据 将当前 goroutine 加入 sendq 发送队列并挂起阻塞等待其他的协程从 channel 接收数据当唤醒后检查是否因为 channel 关闭而唤醒如果是则触发 panic。 发送数据的过程中包含几个会触发 goroutine 调度的时机 发送数据时发现 channel 上存在等待接收数据的 goroutine立刻设置处理器的 runnext 属性但是并不会立刻触发调度发送数据时并没有找到接收方并且缓冲区已经满了这时会将自己加入 channel 的 sendq 发送队列并调用 runtime.goparkunlock 触发 goroutine 的调度让出处理器的使用权 3.3 接收数据 // entry points for - c from compiled code // //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true) }//go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received chanrecv(c, elem, true)return }// chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the callers stack. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// raceenabled: dont need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print(chanrecv: chan, c, \n)}// 如果在 nil channel 上进行 recv 操作那么会永远阻塞if c nil {// 非阻塞的情况下要直接返回非阻塞出现在一些 select 的场景中if !block {return}gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)throw(unreachable)}// Fast path: check for failed non-blocking operation without acquiring the lock.if !block empty(c) {// After observing that the channel is not ready for receiving, we observe whether the// channel is closed.//// Reordering of these checks could lead to incorrect behavior when racing with a close.// For example, if the channel was open and not empty, was closed, and then drained,// reordered reads could incorrectly indicate open and empty. To prevent reordering,// we use atomic loads for both checks, and rely on emptying and closing to happen in// separate critical sections under the same lock. This assumption fails when closing// an unbuffered channel with a blocked send, but that is an error condition anyway.if atomic.Load(c.closed) 0 {// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.return}// The channel is irreversibly closed. Re-check whether the channel has any pending data// to receive, which could have arrived between the empty and closed checks above.// Sequential consistency is also required here, when racing with such a send.if empty(c) {// The channel is irreversibly closed and empty.if raceenabled {raceacquire(c.raceaddr())}if ep ! nil {typedmemclr(c.elemtype, ep)}return true, false}}var t0 int64if blockprofilerate 0 {t0 cputicks()}lock(c.lock)// 当前 channel 中没有数据可读if c.closed ! 0 {if c.qcount 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(c.lock)if ep ! nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channels buffer have data.} else {// sender 队列中有 sudog 在等待// 直接从该 sudog 中获取数据拷贝到当前 g 即可// Just found waiting sender with not closed.if sg : c.sendq.dequeue(); sg ! nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add senders value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(c.lock) }, 3)return true, true}}if c.qcount 0 {// 直接从 buffer 里拷贝数据// Receive directly from queueqp : chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep ! nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)// 接收索引 1c.recvxif c.recvx c.dataqsiz {c.recvx 0}// buffer 元素计数 -1c.qcount--unlock(c.lock)return true, true}// 非阻塞时且无数据可收if !block {unlock(c.lock)return false, false}// no sender available: block on this channel.gp : getg()mysg : acquireSudog()mysg.releasetime 0if t0 ! 0 {mysg.releasetime -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 打包成 sudogmysg.elem epmysg.waitlink nilgp.waiting mysgmysg.g gpmysg.isSelect falsemysg.c cgp.param nil// 进入 recvq 队列c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that were about// to park on a channel. The window between when this Gs status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.atomic.Store8(gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)// someone woke us up// 被唤醒if mysg ! gp.waiting {throw(G waiting list is corrupted)}gp.waiting nilgp.activeStackChans falseif mysg.releasetime 0 {blockevent(mysg.releasetime-t0, 2)}success : mysg.successgp.param nilmysg.c nilreleaseSudog(mysg)// 如果 channel 未被关闭那就是真的 recv 到数据了return true, success }func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {if c.dataqsiz 0 {if raceenabled {racesync(c, sg)}if ep ! nil {// copy data from sender// 直接从发送者复制数据recvDirect(c.elemtype, sg, ep)}} else {// 缓冲区已满从队列头部取出数据// Queue is full. Take the item at the// head of the queue. Make the sender enqueue// its item at the tail of the queue. Since the// queue is full, those are both the same slot.qp : chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)racenotify(c, c.recvx, sg)}// 将数据从队列复制到接收者// copy data from queue to receiverif ep ! nil {typedmemmove(c.elemtype, ep, qp)}// 将数据从发送者复制到队列// copy data from sender to queuetypedmemmove(c.elemtype, qp, sg.elem)c.recvxif c.recvx c.dataqsiz {c.recvx 0}c.sendx c.recvx // c.sendx (c.sendx1) % c.dataqsiz}sg.elem nilgp : sg.gunlockf()gp.param unsafe.Pointer(sg)sg.success trueif sg.releasetime ! 0 {sg.releasetime cputicks()}goready(gp, skip1) }在 Golang 的 channel 中有两种接收方式 num - ch num, ok - ch这两种分别对应上述源码中的 chanrecv1 和 chanrecv2不过最终都会走到 chanrecv 函数。 3.3.1 异常检测 当我们从一个 nil channel 接收数据时(这里 nil 有可能是被 GC 回收导致的)若是非阻塞的 channel 会直接返回否则会直接调用 runtime.gopark 让出处理器的使用权。 如果当前 channel 已经被 close 并且缓冲区中不存在任何数据那么会清除 ep 指针中的数据并立刻返回。这里也就说明了为什么可以多次从已关闭的 channel 读取数据而不会报错。 3.3.2 接收数据 从 channel 接收数据可以归纳为以下三种情况 3.3.2.1 直接接收 当 sendq 发送队列存在等待的发送者时通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据。具体分为以下两种场景可以仔细看 recv 函数 场景一 当 buf 缓冲区的容量 dataqsiz 为 0也就是同步的 channel调用 recvDirect 将 sendq 发送队列中 sudog 存储的 ep 数据直接拷贝到接收者的内存地址中。 场景二 当缓冲区已满时会有两次内存的拷贝 先取出 buf 缓冲区头部的数据发给接收者第一次拷贝接着取出 sendq 发送队列头的数据拷贝到 buf 缓冲区中并释放一个 sudog 阻塞的 goroutine第二次拷贝 到这里获取有人会问为什么不直接从 sendq 取出数据发给接收方而是要从 buf 里取出发给接收方 原因在于 Golang 在缓冲模式下channel 的数据在缓冲区中按照 FIFO先入先出顺序存储。缓冲区头部的数据肯定是最先存入的那么也就需要最先取出。 这里再说下场景二下关于 recvx 和 sendx 的更新机制。 缓冲区已满时的处理逻辑 当 buf 缓冲区满时recvx 指向的是 buf 的头部位置这也是下一个将要被接收的数据。注意此时 sendx 也是指向缓冲区的头部位置。因为缓冲区已满下一次发送会覆盖最旧的数据。 从缓冲区读取数据 此时从已满的 buf 缓冲区读取数据接收者从缓冲区的头部位置 recvx 获取数据并将数据传递给接收方。并更新 recvx使其指向下一个将要被接收的数据位置。 将 sendq 拷贝到缓冲区 由于此时 buf 头部的数据已经发送那么则取出 sendq 头部的数据覆盖刚刚头部的位置所在的数据并更新 sendx使其和 recvx 保持一致指向下一个要发送的位置。 这两个场景无论发生哪种情况运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 goroutine在调度器下一次调度时将阻塞的发送方唤醒。 3.3.2.2 异步接收 当 buf 缓冲区的 qcount 大于 0 时也就是带缓冲的 channel 有数据时那么会从 buf 缓冲区中 recvx 的索引位置取出数据进行处理 如果接收数据的内存地址不为空那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中并通过 runtime.typedmemclr 清除队列中的数据最后更新 channel 上相关数据recvx 指向下一个位置(如果移动到了环形队列的队尾下标需要回到队头)channel 的qcount 长度减一并释放持有 channel 的锁 3.3.2.3 阻塞接收 当不属于上述两种情况即当 channel 的 sendq 发送队列中不存在等待的 goroutine 并且 buf 缓冲区中也不存在任何数据时从 channel 中接收数据的操作会变成阻塞的。此时会将当前的goroutine 挂起并加入 channel 的接收队列 recvq以便在有数据可用时能够被唤醒。 当然了若是 goroutine 被唤醒后会完成 channel 的阻塞数据接收。接收完最后进行基本的参数检查解除 channel 的绑定并释放 sudog。 结合异常检测那一节发现从 channel 接收数据时会触发 goroutine 调度的两个时机 当 channel 为 nil 时当 buf 缓冲区中不存在数据并且也不存在数据的发送者时 3.4 关闭管道 最后来看看关闭通道实现 func closechan(c *hchan) {// 关闭一个 nil channel 会直接 panicif c nil {panic(plainError(close of nil channel))}// 上锁这个锁的粒度比较大一直到释放完所有的 sudog 才解锁lock(c.lock)// 在 close channel 时如果 channel 已经关闭过了,直接触发 panicif c.closed ! 0 {unlock(c.lock)panic(plainError(close of closed channel))}if raceenabled {callerpc : getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}c.closed 1var glist gList// release all readersfor {sg : c.recvq.dequeue()// 弹出的 sudog 是 nil,说明读队列已经空了if sg nil {break}// sg.elem unsafe.Pointer指向 sudog 的数据元素// 该元素可能在堆上分配也可能在栈上if sg.elem ! nil {typedmemclr(c.elemtype, sg.elem)sg.elem nil}if sg.releasetime ! 0 {sg.releasetime cputicks()}// 将 goroutine 入 glist// 为最后将全部 goroutine 都 ready 做准备gp : sg.ggp.param unsafe.Pointer(sg)sg.success falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// release all writers (they will panic)// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panic(向一个关闭的 channel 发数据会引起 panic)for {sg : c.sendq.dequeue()if sg nil {break}sg.elem nilif sg.releasetime ! 0 {sg.releasetime cputicks()}// 将所有挂在 channel 上的 writer 从 sendq 中弹出// 该操作会使所有 writer panicgp : sg.ggp.param unsafe.Pointer(sg)sg.success falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 在释放所有挂在 channel 上的读或写 sudog 时,是一直在临界区的unlock(c.lock)// Ready all Gs now that weve dropped the channel lock.for !glist.empty() {gp : glist.pop()gp.schedlink 0// 使 g 的状态切换到 Grunnablegoready(gp, 3)} }3.4.1 异常检测 关闭一个 nil channel 会直接 panic在 close channel 时如果 channel 已经关闭过了,直接触发 panic 3.4.2 释放所有接收方和发送方 关闭 channel 的主要工作是释放所有的 readers 和 writers。 主要就是取出 recvq 和 sendq 的 sudog 加入到 goroutine 待清除 glist 队列中与此同时该函数会清除所有 runtime.sudog 上未被处理的元素。同时需要注意的是在处理 sendq 时有可能会 panic在之前的异常情况中列举往一个 close 的 channel 发送数据会引起 panic。 最后会为所有被阻塞的 goroutine 调用 runtime.goready 触发调度。将所有 glist 队列中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态等待调度器的调度。 3.4.3 优雅关闭通道 最后说说如何优雅关闭 channel。 通过之前的异常小节介绍发现 向已关闭的 channel 发送数据会导致 panic重复关闭 channel也会导致 panic 同时还了解了 从一个已关闭的 channel 中接收数据会得到零值且不会导致程序异常关闭一个 channel那么所有接收这个 channel 的 select case 都会收到信号 那么这里就引用 How to Gracefully Close Channels 介绍的优雅关闭 channel 方法来收尾。 package _0240623import (logmath/randstrconvsynctestingtime )func TesGracefullyCloseChannel(t *testing.T) {rand.Seed(time.Now().UnixNano()) // needed before Go 1.20log.SetFlags(0)// ...const Max 100000const NumReceivers 10const NumSenders 1000wgReceivers : sync.WaitGroup{}wgReceivers.Add(NumReceivers)// ...dataCh : make(chan int)stopCh : make(chan struct{})// stopCh is an additional signal channel.// Its sender is the moderator goroutine shown// below, and its receivers are all senders// and receivers of dataCh.toStop : make(chan string, 1)// The channel toStop is used to notify the// moderator to close the additional signal// channel (stopCh). Its senders are any senders// and receivers of dataCh, and its receiver is// the moderator goroutine shown below.// It must be a buffered channel.var stoppedBy string// moderatorgo func() {stoppedBy -toStopclose(stopCh)}()// sendersfor i : 0; i NumSenders; i {go func(id string) {for {value : rand.Intn(Max)if value 0 {// Here, the try-send operation is// to notify the moderator to close// the additional signal channel.select {case toStop - sender# id:default:}return}// The try-receive operation here is to// try to exit the sender goroutine as// early as possible. Try-receive and// try-send select blocks are specially// optimized by the standard Go// compiler, so they are very efficient.select {case -stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and for ever in theory) if the send// to dataCh is also non-blocking. If// this is unacceptable, then the above// try-receive operation is essential.select {case -stopCh:returncase dataCh - value:}}}(strconv.Itoa(i))}// receiversfor i : 0; i NumReceivers; i {go func(id string) {defer wgReceivers.Done()for {// Same as the sender goroutine, the// try-receive operation here is to// try to exit the receiver goroutine// as early as possible.select {case -stopCh:returndefault:}// Even if stopCh is closed, the first// branch in this select block might be// still not selected for some loops// (and forever in theory) if the receive// from dataCh is also non-blocking. If// this is not acceptable, then the above// try-receive operation is essential.select {case -stopCh:returncase value : -dataCh:if value Max-1 {// Here, the same trick is// used to notify the moderator// to close the additional// signal channel.select {case toStop - receiver# id:default:}return}log.Println(value)}}}(strconv.Itoa(i))}// ...wgReceivers.Wait()log.Println(stopped by, stoppedBy) }这段代码的核心是这里 // moderator go func() {stoppedBy -toStopclose(stopCh) }()对于生产者和消费者是 M*N 的情况显然既不能在生产方关闭通道也不适合在消费方关闭通道。那么就引入中间方那就是 toStop起个 goroutine 然后 stoppedBy -toStop 阻塞在这里只要生产者和消费者一方满足条件向 toStop 写入数据了那么就可以关闭 stopCh。这也正好契合上面的 moderator 注释一个 协调者用来协调生产者和消费者在 M*N 情况下如何优雅关闭 channel。
http://www.hkea.cn/news/14260795/

相关文章:

  • 网站开发要会英语吗怎么修改wordpress目录名字
  • 宁波 电商平台网站建设江西萍乡做网站公司
  • 企业建站公司哪里有搜索引擎优化包括以下哪些内容
  • 百度云自助建站怎样做才能让自己的网站
  • 青岛网站制作定制dede如何设置网站端口
  • 海珠营销型网站建设域名阿里云
  • 做网站存在的问题简单的wordpress模板
  • 个人免费自助建站网站建信网证书查询平台
  • 网站建设框架怎么写番禺核酸检测定点医院名单
  • 电子商务网站建设的目标是什么意思网站怎么做收录
  • 大学生可以做的网站项目凡科网建网站付费链接怎么做
  • 怎样做网页游戏网站保定城乡建设局网站
  • wordpress英语培训主题品牌seo如何优化
  • 网站被墙查询静态网页图片
  • 多网合一网站万网买的网站备案
  • 345诛仙网站是谁做的金坛网站建设报价
  • 用源码建设网站推荐软件分类
  • 可免费商用的cms建站系统深圳网站建设网牛天下
  • 几十万做网站平台阿里云怎么申请域名
  • 邯郸做移动网站找谁临检中心网站建设
  • 张家界搜索引擎优化抖音优化公司
  • 网站开发前后端分工抚州seo排名
  • 网站开发数据网页设计详细步骤
  • 设计网站用户需求分析报告如何在百度上发自己的广告?
  • 中英双语网站建设网络广告推广
  • 上海网站建设排名公司wordpress文章列分页
  • wordpress建售卖产品的网站网站地图的作用
  • 网站建设公司厦门有哪些网络设计与实施
  • 内网代理ip建设网站wordpress评论feed
  • 四川网站建设设计公司湖南中海建设集团有限公司网站