商城网站开发方案书,怎样给网站增加栏目,企业网站为什么打不开,个人备案经营网站Go-知识-定时器 1. 介绍2. Timer使用场景2.1 设定超时时间2.2 延迟执行某个方法 3. Timer 对外接口3.1 创建定时器3.2 停止定时器3.3 重置定时器3.4 After3.5 AfterFunc 4. Timer 的实现原理4.1 Timer数据结构4.1.1 Timer4.1.2 runtimeTimer 4.2 Timer 实现原理4.2.1 创建Timer… Go-知识-定时器 1. 介绍2. Timer使用场景2.1 设定超时时间2.2 延迟执行某个方法 3. Timer 对外接口3.1 创建定时器3.2 停止定时器3.3 重置定时器3.4 After3.5 AfterFunc 4. Timer 的实现原理4.1 Timer数据结构4.1.1 Timer4.1.2 runtimeTimer 4.2 Timer 实现原理4.2.1 创建Timer4.2.2 停止Timer4.2.3 重置Timer 5. Ticker 使用场景5.1 简单定时任务5.2 定时刷新缓存 6. Ticker 对外接口6.1 创建定时器6.2 停止定时器6.3 Tick 7. Ticker 实现原理7.1 数据结构7.1.1 Ticker7.1.2 runtimeTimer 7.2 Ticker实现原理7.2.1 创建Ticker7.2.2 停止Ticker 8. runtimeTimer 原理(go 1.11)8.1 定时器存储8.1.1 timer 的数据结构8.1.2 timersBucket 的数据结构8.1.3 Ticker timer timersBucket 关系8.1.4 timersBucket 数组 8.2 定时器运行机制8.2.1 创建定时器8.2.2 删除定时器 8.3 资源泄露 9. 性能优化 (go 1.14)9.1 消除了timersBucket9.2 消除了timeproc9.3 更少的锁竞争9.4 更少的上下文切换9.5 优化效果 Go语言提供了两种定时器分别为一次性定时器和周期性定时器。 一次性定时器(Timer):定时器只计时一次计时结束变停止运行。周期性定时器(Ticker):定时器周期性进行计时除非主动停止否则将永久运行。 以下都是 go 1.10~1.13的逻辑在go 1.14 有非常大的优化。 1. 介绍
Timer是一种单一事件的定时器即经过指定的时间后触发一个事件这个事件通过其本身提供的 channel进行通知。Timer只执行一次就结束。 通过timer.NewTimer(d Duration)可以创建一个Timer参数即等待时间时间到来后立即触发一个事件。 在src/time/sleep.go中定义了Timer的数据结构
Timer对外仅暴露一个channel指定的时间到来时就往该channel种写入系统时间即一个事件。
Ticker是周期性定时器即周期性地触发一个事件通过Ticker本身提供的channel将事件传递出去。 Ticker的数据结构与Timer非常类似
Ticker对外仅暴露一个channel当指定的时间到来时就往该channel种写入系统时间即一个事件。 在创建Ticker时会指定一个时间作为事件触发的周期这是Ticker和Timer最主要的区别。
2. Timer使用场景
2.1 设定超时时间
有时候在等待资源的时候又不希望永久等待而是希望加个超时时间如果在指定的时间还未获取到那么就超时不在等待了。
func TestWait(t *testing.T) {timer : time.NewTimer(1 * time.Second)c : make(-chan string)select {case -c:t.Log(chan string )case -timer.C:t.Log(time out)}
}因为case -c 不会触发所以在1秒后超时结束
通过select语句轮询timer.C 和 c 两个channel如果1s内c还没有数据写入那就认为超时了。
2.2 延迟执行某个方法
在应用启动中一般会初始化很多组件如果在应用启动后马上就使用可能出现组件还未初始化成功拿到的组件对象是还未准备好的对象。 这个时候如果能延迟使用那么就不会出现使用未准备好的组件的情况。
func TestDelay(t *testing.T) {timer : time.NewTimer(1 * time.Second)select {case -timer.C:t.Log(wait 1 s)}t.Log(do something)
}select 只有一个case就是当时间到了该case满足如果时间没到那么select会阻塞。 当触发select 后可以什么都不做也可以打印日志然后跳出select的语句顺序执行后续逻辑以此实现延迟等待后执行。
3. Timer 对外接口
3.1 创建定时器
使用func NewTimer(d Duration) *Timer方法指定一个时间即可创建一个Timer,Timer一经创建便开始计时不需要额外的启动命令。 实际上创建Timer意味着把一个计时任务交给系统守护协程该协程管理着所有的Timer当Timer的时间到达后Timer向Channel中发送当前的时间作为事件。
3.2 停止定时器
Timer创建后可以随时停止停止计时器的方法如下 func (t *Timer) Stop() bool 其返回值代表定时器有没有超时。
true : 定时器超时前停止后续不会在发送事件false : 定时器超时后停止。
实际上停止计时器意味着通知系统守护协程移除该定时器。
3.3 重置定时器
已过期的定时器或已停止的定时器可以通过重置动作重新激活重置方法如下 func (t *Timer) Reset(d Duration) bool 重置的动作实质上是先停止定时器在启动其返回值即停止计时器(Stop()) 的返回值。 需要注意的是重置定时器虽然可以用于修改还未超时的定时器但正确的使用方式还是针对已过期的定时器或已被停止的定时器同时其返回值也不可靠返回值存在的价值仅仅是与前面的版本兼容。 实际上重置定时器意味着通知系统守护协程移除该定时器重新设定时间后再把定时器交给守护协程。
3.4 After
如果仅仅是向等待指定的时间没有提前停止定时器的需求也没有复用该定时器的需求那么可以使用匿名的定时器。 使用func After(d Duration) -chan Time方法创建一个定时器并返回定时器的管道。
func TestAfter(t *testing.T) {select {case -time.After(1 * time.Second):t.Log(after 1 s)}
}执行后和之前的延迟执行一模一样
实际上还是一个定时器但是代码更加简洁。
3.5 AfterFunc
除了 After调用返回 channel进行同步处理还可以使用 AfterFunc将需要延迟的操作交给系统协程异步执行。 AfterFunc的定义 func AfterFunc(d Duration, f Func()) *Timer 比如
func TestAfterFunc(t *testing.T) {time.AfterFunc(1*time.Second, func() {t.Log(after 1 s)})t.Log(waiting)time.Sleep(2 * time.Second)t.Log(done)
}执行结果如下
很明显执行和等待已经不是一个协程了.
4. Timer 的实现原理
4.1 Timer数据结构
4.1.1 Timer
在源码包src/time/sleep.go中定义了其数据结构
Timer只有两个成员
C:channel 上层应用根据此channel接收事件r:runtime定时器该定时器即系统管理的定时器对上层应用不可见。 这里按照层次来理解Timer的数据结构Timer.C是面向Timer用户的Timer.r是面向底层的定时器实现。 4.1.2 runtimeTimer
创建一个Timer实质上是把一个定时任务交给专门的写成进行监控这个任务的载体便是runtimeTimer。 每创建一个Timer意味着创建了一个runtimeTimer变量然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。
type runtimeTimer struct {tb uintptr // 存储当前定时器的数组地址i int // 存储当前定时器的数组下标when int64 // 当前定时器触发事件period int64 // 当前定时器周期性触发间隔f func(interface{}, uintptr) // 定时器触发时执行的回调函数arg interface{} // 定时器触发时执行回调函数传递的参数一seq uintptr // 定时器触发时执行回调函数传递的参数二
}tb: 系统底层存储runtimeTimer的数组地址i: 当前runtimeTime在tb数组中的下标when: 定时器触发事件的时间period: 定时器周期性触发间隔(对于Timer来说此值为0)f: 定时器触发时执行的回调函数回调函数接收两个参数。arg: 定时器触发时执行回调函数的参数一seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)
4.2 Timer 实现原理
一个进程中的多个Timer都由底层的写成来管理这个协程称为系统协程。 runtimeTimer存放在数组中并看招when字段对所有的runtimeTimer进行堆排序定时器触发时执行runtimeTimer中的预定义函数f即完成了一次定时任务。
4.2.1 创建Timer
创建Timer的实现非常简单
func NewTimer(d Duration) *Timer {c : make(chan Time, 1) // 创建一个channelt : Timer{ // Timer的数据结构C: c,r: runtimeTimer{when: when(d), // 触发事件f: sendTime, // 触发后执行 sendTime 函数arg: c, // 触发后执行sendTime函数时附带的参数},}startTimer(t.r) // 此处启动定时器只是把 runtimeTimer放到系统协程的堆中由系统协程维护return t
}NewTimer函数只是构造了一个Timer然后把Timer.r 通过startTimer()交给系统协程维护。其中when()方法是计算下一次 定时器触发的绝对时间即当前时间NewTimer()的参数d。sendTimer()方法是定时器触发时的动作。
when函数
// when是一个辅助函数用于设置runtimeTimer的“when”字段。
// 它返回未来的持续时间d单位为纳秒。
// 如果d为负则忽略。如果返回值小于
// 由于溢出返回MaxInt64。
func when(d Duration) int64 {if d 0 {return runtimeNano()}t : runtimeNano() int64(d)if t 0 {t 163 - 1 // math.MaxInt64}return t
}sendTimer函数
func sendTime(c interface{}, seq uintptr) {//c上的非阻塞时间发送。//在NewTimer中使用它无论如何都不能阻塞缓冲区。//在NewTicker中使用在地板上放下发送是//当读者落后时的期望行为//因为发送是周期性的。select {case c.(chan Time) - Now():default:}
}sendTime接收一个channel作为参数其主要任务是向channel中写入当前时间。 创建Timer时生成的管道含有一个缓冲区(make(chan Time,1))所以Timer触发时向channel写入事件永远不会阻塞sendTime写完即退出。 sendTime使用select搭配一个空的default分支是因为Ticker也复用了sendTimeTicker触发时也会向channel中写入事件但无法保证之前的数据已经 被取走所以使用select并搭配一个空的default分支确保sendTime不会阻塞Ticker触发时如果管道中还有值则本次不在向管道中写入时间将本次触发的事件直接丢弃。
startTime函数
startTime函数的具体实现在runtime包中其主要作用是把runtimeTimer写入系统写成的数组中并启动系统协程(如果系统协程还未开始运行)
其主要是addTimer(t *timer)函数timer是runtime包中用于表示time包中runtimeTimer结构的struct
addTimer(t *timer)函数
func addtimer(t *timer) {tb : t.assignBucket() // 获取time桶数组lock(tb.lock) // 加锁ok : tb.addtimerLocked(t) // 将 timer加入数据组unlock(tb.lock) // 解锁if !ok { // 如果timer加入数组失败那么触发 panic badTimer()}
}4.2.2 停止Timer
停止Timer,只是简单地把Timer从系统协程中移除。
stopTimer即通知系统协程把该Timer移除即不在监控。 stopTimer也是runtime中的函数
系统协程只是移除Timer,并不会关闭channel以避免用户协程读取错误。 Stop 的返回值取决于定时器的状态
如果Timer已经触发则Stop返回false如果Timer还未触发则Stop返回true 4.2.3 重置Timer
重置Timer时会先把Timer从系统协程中删除修改新的时间后重新添加到系统协程中。 重置的实现如下
其返回值与Stop保持一致如果Timer成功停止则返回true如果Timer已经触发则返回false.
由于新加的Timer时间很可能变化所以其在系统协程中的位置也会相应地发生变化。 需要注意的是按照源码注释Reset应该作用于已经停止的Timer或已经触发的Timer。 按照这个约定Reset的返回值总是false,仍然保留是为了保持向前兼容使用老版本Go编写的应用不需要因为Go升级而修改代码。 如果不按照此约定使用Reset则有可能遇到Reset和Timer触发后同时执行的情况此时有可能会收到两个事件从而对应用程序造成一些负面影响。
5. Ticker 使用场景
5.1 简单定时任务
假设需求是每隔1秒就报时一次
func TestTicker(t *testing.T) {// 创建一个一秒的Tickerticker : time.NewTicker(1 * time.Second)// 使用defer关闭Tickerdefer ticker.Stop()// 收到事件就打印时间 (根据之前Timer的实现res : -ticker.C , res 就是 Time 类型的时间戳 )for range ticker.C {t.Log(tick 1 s : time.Now().String())}
}如果不主动停止那么永远不会结束。
5.2 定时刷新缓存
假设需求是定时将内存中的数据写到磁盘中或者内存中数据满了需要落盘:
func TestFlush(t *testing.T) {// 创建一个每1秒刷新一次的Tickerticker : time.NewTicker(1 * time.Second)// 使用 defer 关闭defer ticker.Stop()// 模拟内存数据mem : 0for {select {// 时间到不管如何必须刷新case -ticker.C:t.Log(flush 1 s : time.Now().String())default:// 内存满了需要刷新if mem 15 {t.Logf(flush mem : %d, mem)mem 0}}// 每次for循环设置随机值mem rand.Intn(10)// 每次循环等待300毫秒time.Sleep(300 * time.Millisecond)}
}6. Ticker 对外接口
6.1 创建定时器
使用NewTicker函数就能创建一个Ticker
// NewTicker返回一个新的Ticker其中包含一个将发送
// 由duration参数指定的时间段。
// 它调整间隔或降低滴答声以弥补接收器速度慢的问题。
// 持续时间d必须大于零否则NewTicker将会恐慌。
// 停止自动收报机以释放相关资源。
func NewTicker(d Duration) *Ticker {if d 0 {panic(errors.New(non-positive interval for NewTicker))}c : make(chan Time, 1)t : Ticker{C: c,r: runtimeTimer{when: when(d),// Timer 的 period 是 0period: int64(d),f: sendTime,arg: c,},}startTimer(t.r)return t
}NewTimer和NewTicker非常类似在初始化runtimeTimer的时候Timer没有设置periodTicker则设置period等于d.
6.2 停止定时器
使用定时器对外暴露的Stop方法就可以停止Ticker
其实不管是 Timer还是Ticker的startTimer和stopTimer最终都是由runtime/time.go 中的timer的startTimer和stopTimer实现的 需要注意的是该方法会停止计时意味着不会向定时器的channel中写入事件但是channel并不会给关闭channel在使用完后声明周期结束后会自动释放。 Ticker 在使用完后务必要释放否则会产生资源泄露进而会持续消耗CPU资源最后会把CPU资源消耗完。 6.3 Tick
在有些长江下启动一个Ticker后该Ticker永远不会停止比如定时轮询任务此时可以使用一个简单的Tick函数来获取定时器的channel。
注释上面说的很明白因为该函数内部实际上还是创建了一个Ticker但是并没有返回只是返回了channel因为没有Ticker对象所以没法调用Stop。 在for循环中使用Ticker的时候一定三思 在Timer中很容易写出如下代码 for {select {case -time.After(1 * time.Second):t.Log(flush 1 s : time.Now().String())}
} 使用Timer这样写当然没错因为Timer在触发事件后就会从数组中移除。 但是当把Timer换成Ticker那么就出现了资源泄露 for {select {case -time.Tick(1 * time.Second):t.Log(flush 1 s : time.Now().String())}
}上面的代码出现了资源泄露因为 Tick 会创建Ticker并且因为使用Tick直接获取的Ticker.C所以没有手段去Stop。 随着for的执行最终会导致越来越多的Ticker耗尽CPU资源。 7. Ticker 实现原理
实际上Ticker与Timer几乎完全相同数据结构和内部实现机制都相同唯一不同的是创建方式。 在创建Timer时不指定时间触发周期时间触发后Timer自动销毁。而在创建Ticker时会指定一个事件触发周期事件按照这个周期触发如果不显式停止则定时器永不停止。
7.1 数据结构
7.1.1 Ticker
Ticker的数据结构与Timer的数据结构除名字不同外其他完全一样。 源码包src/time/tick.go中定义了数据结构
Ticker只有两个成员
C: channel上层应用根据此channel接收事件。r: runtimeTimer定时器该定时器即系统管理的定时器对上层应用不可见。
按照层次来理解Ticker的数据结构Ticker.C 是面向Ticker 用户的Ticker.r 是面向底层的定时器的。
7.1.2 runtimeTimer
runtimeTimer和Timer的一样创建一个Timer实质上是把一个定时任务交给专门的写成进行监控这个任务的载体便是runtimeTimer。 每创建一个Timer意味着创建了一个runtimeTimer变量然后把它交给系统进行监控。通过设置runtimeTimer过期后的行为来达到定时的目的。
type runtimeTimer struct {tb uintptr // 存储当前定时器的数组地址i int // 存储当前定时器的数组下标when int64 // 当前定时器触发事件period int64 // 当前定时器周期性触发间隔f func(interface{}, uintptr) // 定时器触发时执行的回调函数arg interface{} // 定时器触发时执行回调函数传递的参数一seq uintptr // 定时器触发时执行回调函数传递的参数二
}tb: 系统底层存储runtimeTimer的数组地址i: 当前runtimeTime在tb数组中的下标when: 定时器触发事件的时间period: 定时器周期性触发间隔(对于Timer来说此值为0)f: 定时器触发时执行的回调函数回调函数接收两个参数。arg: 定时器触发时执行回调函数的参数一seq: 定时器触发时执行回调函数的参数二(Timer并不使用该参数)
7.2 Ticker实现原理
7.2.1 创建Ticker
创建Ticker的实现非常简单
func NewTicker(d Duration) *Ticker {if d 0 {panic(errors.New(non-positive interval for NewTicker))}c : make(chan Time, 1)t : Ticker{C: c,r: runtimeTimer{when: when(d),period: int64(d), // 这个在Timer中是没有的f: sendTime,arg: c,},}startTimer(t.r)return t
}Ticker 和Timer的重要区别就是提供了period参数据此决定Timer是一次性的还是周期性的。 NewTicker只是构造了一个Ticker然后把Ticker通过startTimer交给系统协程维护。 其中period为事件触发的周期sendTime函数是定时器触发时的动作。
sendTimer函数
func sendTime(c interface{}, seq uintptr) {//c上的非阻塞时间发送。//在NewTimer中使用它无论如何都不能阻塞缓冲区。//在NewTicker中使用在地板上放下发送是//当读者落后时的期望行为//因为发送是周期性的。select {case c.(chan Time) - Now():default:}
}sendTime接收一个channel作为参数其主要任务是向channel中写入当前时间。 创建Timer时生成的管道含有一个缓冲区(make(chan Time,1))所以Timer触发时向channel写入事件永远不会阻塞sendTime写完即退出。 sendTime使用select搭配一个空的default分支是因为Ticker也复用了sendTimeTicker触发时也会向channel中写入事件但无法保证之前的数据已经 被取走所以使用select并搭配一个空的default分支确保sendTime不会阻塞Ticker触发时如果管道中还有值则本次不在向管道中写入时间将本次触发的事件直接丢弃。
startTime函数
startTime函数的具体实现在runtime包中其主要作用是把runtimeTimer写入系统写成的数组中并启动系统协程(如果系统协程还未开始运行)
其主要是addTimer(t *timer)函数timer是runtime包中用于表示time包中runtimeTimer结构的struct
addTimer(t *timer)函数
func addtimer(t *timer) {tb : t.assignBucket() // 获取time桶数组lock(tb.lock) // 加锁ok : tb.addtimerLocked(t) // 将 timer加入数据组unlock(tb.lock) // 解锁if !ok { // 如果timer加入数组失败那么触发 panic badTimer()}
}7.2.2 停止Ticker
停止Ticker时只是简单地把Ticker从系统协程中移除。
stopTimer即通知系统协程把该Ticker移除即不在监控。 stopTimer也是runtime中的函数
系统协程只是移除Ticker,并不会关闭channel以避免用户协程读取错误。 与Timer不同的是Ticker停止时没有返回值即不需要关注返回值实际上返回值也没有什么用途。 Ticker 没有重置接口即Ticker创建后不能通过重置修改周期。 需要格外注意的是Ticker用完后必须主动停止否则会产生资源泄露持续消耗CPU资源。 8. runtimeTimer 原理(go 1.11)
NewTimer和NewTicker都会在底层创建一个runtimeTimer,runtime包负责管理runtimeTimer保证定时器按照约定的时间触发。
Go 1.10 之前 所有的runtimeTimer 保存在一个全局的堆中G0 1.10 ~ 1.13 : runtimeTimer被拆分到多个全局的堆中减少了多个系统协程的锁等待时间Go 1.14: runtimeTimer保存在每个处理器P中消除了专门的系统协程减少了系统协程上下文切换。
8.1 定时器存储
8.1.1 timer 的数据结构
Timer和Ticker的数据结构除名字外其他完全一样二者都含有一个runtimeTimer类型的成员这就是系统协程所维护的对象。 runtimeTimer类型是time包的名字在runtime包中这个类型叫做timer.
// 包装时间知道这个结构的布局。
// 如果此结构更改请调整/time/sleep.go:/runtimeTimer。
// 对于GOOSnacl包syscall知道这个结构的布局。
// 如果此结构更改请调整/syscall/net_nacl.go:/runtimeTimer。
type timer struct {tb *timersBucket // the bucket the timer lives in // 当前定时器寄存于系统timer堆的地址i int // heap index // 当前定时器寄存于系统timer堆的下标//定时器在何时唤醒然后在何时时段唤醒。。。仅限大于0的时段//每次在计时器goroutine中调用fargnow时f必须//一个行为良好的函数而不是块。when int64 // 当前定时器下次触发时间period int64 // 当前定时器周期性触发间隔(如果是Timer间隔为0表示不重复触发)f func(interface{}, uintptr) // 定时器触发时执行的函数arg interface{} // 定时器触发时执行的参数一seq uintptr // 定时器触发时执行的参数二(该参数只在网络收发场景下使用)
}其中timersBucket便是系统协程存储timer的容器里面有一个切片来存储timer而i便是timer所在切片的下标。
8.1.2 timersBucket 的数据结构
//go:notinheap
type timersBucket struct {lock mutexgp *g // 处理堆中事件的协程created bool // 时间处理协程是否已创建默认为false添加收个定时器是置为truesleeping bool // 事件处理协程(gp)是否在睡眠(如果t中有定时器那么还未到达触发的时间gp会睡眠)rescheduling bool // 事件处理协程(gp)是否已暂停(如果t中定时器均已删除那么gp会暂停)sleepUntil int64 // 时间处理协程睡眠事件waitnote note // 时间处理协程睡眠事件(据此唤醒协程)t []*timer // 定时器切片
}Bucket是存储timer的桶。
lock: 互斥锁在timer增加和删除时需要加锁防止并发gp: 事件处理协程就是系统协程这个协程在首次创建Timer或Ticker时生成created: 状态值表示系统协程是否创建sleeping: 系统协程是否已暂停sleepUntil: 系统协程睡眠到指定的时间(如果有新的定时任务则可能会提前唤醒)waitnote: 提前唤醒时使用的通知t: 保存timer的切片当调用NewTimer或NewTicker时便会有新的timer存储到切片中
系统协程在首次创建定时器时创建定时器存储在切片中系统写成负责计时并维护这个切片。
8.1.3 Ticker timer timersBucket 关系
假设创建了1个Timer2个Ticker关系如下
用户创建Timer或者Ticker时会生成一个timer这个timer指向timersBuckettimersBucket记录timer的指针。
8.1.4 timersBucket 数组
通过timersBucket的数据结构可以看到系统写成负责计时并维护其中的多个timer,一个timersBucket由一个特定的系统协程来维护。 当系统重的定时器非常多时一个系统协程的处理能力可能跟不上所以Go在实现时实际上提供了多个timerBucket也就是有多个系统协程来处理定时器。 最理想的情况是应该预留GOMAXPROCS个timersBucket以便充分使用CPU资源但需要根据实际环境动态分配。为了实现简单Go在实现时预留了64个timersBucket可以满足绝大部分场景。
在addTimer时调用的
当协程创建定时器时使用协程所属的ProcessId%64来计算定时器存入的timersBucket。 在上面的关系中当三个协程创建定时器时定时器的分布可能如下 一般情况下同一个Process的协程创建的定时器分布于同一个timersBucket中只有当GOMAXPROCS大于64时才会出现多个Process分布于同一个timersBucket中的情况。 8.2 定时器运行机制
8.2.1 创建定时器
创建Timer或Ticker实际上分为两步
创建一个channel创建一个timer并启动(这里的timer是指runtime包中的timer,不是Timer) 不管是 Timer还是Ticker都是先创建channelchannel都是带有一个缓冲区的。 接着创建timer调用startTimer启动。 startTimer在runtime包中实现通过go:link关联
addTimer的实现如下
func addtimer(t *timer) {tb : t.assignBucket() // 分配 timersBucket ,从64个中选择一个lock(tb.lock) // 加锁ok : tb.addtimerLocked(t) // 加入切片unlock(tb.lock) // 解锁if !ok { // 加入失败badTimer() // 错误处理}
}首先每个timer都必须归属于某个timersBucket所以第一步是先选择一个timersBucket选择的算法很简单将当前协程所属的Process ID 与 timersBucket数据长度求模结果就是timersBucket数组的下标。
其次每个timer都必须加入timersBuckettimersBucket数据结构中的切片t保存着timer的指针新创建的timer也需要加入这个切片。 保存timer的切片是一个按timer触发事件排序的小顶堆所以新timer插入的过程中会触发堆调整堆顶的timer最快被触发。
// 在堆中添加一个计时器并在新计时器为时启动或启动timerproc
// 比其他任何人都早。
// 计时器已锁定。
// 返回是否一切正常如果数据结构损坏则返回false
// 由于用户级别的竞争。
func (tb *timersBucket) addtimerLocked(t *timer) bool {//何时决不能为负数否则timerproc将溢出//在其增量计算期间并且永远不会使其他运行时计时器过期。if t.when 0 {t.when 163 - 1}t.i len(tb.t) // 先把定时器插入堆尾tb.t append(tb.t, t) // 保存定时器if !siftupTimer(tb.t, t.i) { // 在堆中插入数据触发重新排序return false}if t.i 0 { // 堆排序后如果新插入的定时器跑到了堆顶需要唤醒系统协程来处理// siftup moved to top: new earliest deadline.if tb.sleeping tb.sleepUntil t.when { // 系统协程在睡眠(切片中有数据未到时间)唤醒系统协程来处理新加入的定时器tb.sleeping falsenotewakeup(tb.waitnote)}if tb.rescheduling { // 系统协程已暂停(切片中没有数据)唤醒系统协程来处理新加入的定时器tb.rescheduling falsegoready(tb.gp, 0)}if !tb.created { // 如果是系统协程收个定时器则启动协程处理堆中的定时器tb.created truego timerproc(tb) // 系统协程就是这里创建的}}return true
}如果timer的时间是负值那么就会被修改为很大的值来保证后续定时算法的正确性系统协程是在首次添加timer时创建的并不是一直存在新加入timer后如果新的timer跑到了堆顶则意味着新的timer需要立即处理那么会唤醒系统协程
小顶堆排序
func siftupTimer(t []*timer, i int) bool { // 入参是数组和数组尾部if i len(t) {return false} // 数组越界when : t[i].when // 获取数组尾部元素的触发事件tmp : t[i] // 存储到临时变量中for i 0 { // 只要没到堆顶那么就一直运行p : (i - 1) / 4 // parent // 使用 4 叉堆所以 (i-1)/4 得到父节点if when t[p].when { // 如果 子节点的触发事件大于等于 父节点那么结束满足小顶堆的规则break // 满足小顶堆的规则结束}t[i] t[p] // 否则先将父节点 写到 子节点位置上(子节点在临时变量中有不会丢失)t[i].i i // 将 timersBucket中的index 设置为子节点的indexi p // 将指针移动到父节点进行下一轮循环(因为使用数组保存树所以当index0时就到达了根节点)}if tmp ! t[i] { // 判断是否发生了并发理论上 tmp 就是 t[i]t[i] tmp // 如果并发导致数据不一致那么强行设置t[i].i i}return true
}这是用数组存储的一个小顶堆的维护逻辑。
创建系统协程 当第一次addTimer的时候会触发创建系统协程
// Timerproc运行时间驱动的事件。
// 它一直休眠到tb堆中的下一个事件。
// 如果addtimer插入一个新的较早事件它会提前唤醒timerproc。
func timerproc(tb *timersBucket) {tb.gp getg() // 获取协程for {lock(tb.lock) // 加锁tb.sleeping false // 设置标志now : nanotime() // 获取当前时间delta : int64(-1)for {if len(tb.t) 0 { // 切片数组为空堆中没有等待的元素delta -1 // 不创建系统协程结束break}t : tb.t[0] // 拿到小顶堆堆顶的元素delta t.when - now // 计算需要等待的时间if delta 0 { // 如果还未到时间结束delta 大于0表示时间未到小于0表示已过时执行break}ok : trueif t.period 0 { // Ticker 的 period 才会大于0 Timer 的 period 等于0// leave in heap but adjust next time to firet.when t.period * (1 -delta/t.period)// delta小于0表示本次是过时执行那么下次执行时间需要加上delta比如delta1s那么下次执行时间 when(next) when(this) period delta// 将上述式子进行变形 when(next) when(this) period (1 delta/period)// 因为已知delta 0 ,所以 when(next)when(this) period * (1 -delta/period) when period * (1 -delta/period)if !siftdownTimer(tb.t, 0) { // 将根节点值进行了增加那么从 小堆顶 进行平衡ok false}} else { // period 0 表示是 Timer触发一次后需要从小顶堆中删除// remove from heaplast : len(tb.t) - 1 // 切片长度缩小if last 0 { // 小顶堆不为空tb.t[0] tb.t[last] // 将最大值设置到根节点然后执行 根节点值增加然后从 小堆顶进行平衡tb.t[0].i 0}tb.t[last] nil // 将最后一个元素置空删除显示的置空用于 gctb.t tb.t[:last] // 将最后一个元素删除if last 0 { // 如果 小顶堆不为空那么从根节点进行平衡if !siftdownTimer(tb.t, 0) {ok false}}t.i -1 // mark as removed // 标记该 timer 已经被删除了无法通过 index 在 timersBucket中索引到了}f : t.f // 拿到 触发后的 funcarg : t.arg // 获取第一个参数channel , Timer.C or Ticker.Cseq : t.seq // 获取第二个参数只有网络才会用到unlock(tb.lock) // 解锁if !ok { // 如果小顶堆平衡失败那么返回错误信息badTimer()}if raceenabled {raceacquire(unsafe.Pointer(t))}f(arg, seq) // 执行预设的 func ,也就是 sendTime 也就是将 now 写入 Timer.C 或 Ticker.C 触发事件lock(tb.lock) // 加锁,防止多次触发分两次加锁解锁是为了防止执行 sendTime 的时候太费时间导致协程无法进行其他操作}if delta 0 || faketime 0 { // 小顶堆中没有元素了系统协程需要暂停// No timers left - put goroutine to sleep.tb.rescheduling true // 设置暂停标志goparkunlock(tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)continue}// At least one timer pending. Sleep until then.tb.sleeping true // 没有待触发的事件设置睡眠标志tb.sleepUntil now delta // 计算系统协程睡眠时间noteclear(tb.waitnote) // 清除系统协程的唤醒通知unlock(tb.lock) // 解锁notetsleepg(tb.waitnote, delta) // 系统协程睡眠没有待触发的事件}
}唤醒系统协程(睡眠) 当有定时器需要触发的时候会唤醒系统协程触发事件
func notewakeup(n *note) {var v uintptrfor { // 加锁乐观锁自旋v atomic.Loaduintptr(n.key)if atomic.Casuintptr(n.key, v, locked) {break}}// Successfully set waitm to locked.// What was it before?switch {case v 0:// Nothing was waiting. Done.case v locked:// Two notewakeups! Not allowed.throw(notewakeup - double wakeup)default:// Must be the waiting m. Wake it up.semawakeup((*m)(unsafe.Pointer(v))) // 唤醒唤醒后进入 timerproc 的 for 循环再次触发事件 当小顶堆没有触发的定时器时协程会在for最后一步sleep,唤醒后继续执行}
}唤醒唤醒后进入 timerproc 的 for 循环再次触发事件 当小顶堆没有触发的定时器时协程会在for最后一步sleep,唤醒后继续执行. 唤醒系统协程(暂停) 当小顶堆中没有元素系统协程会进入暂停状态等待addTimer
func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}睡眠Sleeptime.Sleep(duration) 函数会使当前的 goroutine 暂停指定的时间。 在这段时间内goroutine 不会执行任何操作也不会消耗 CPU 资源。 这个函数通常用于模拟 I/O 操作或者在测试中插入人为的延迟。 暂停Yieldruntime.Gosched() 函数会使当前的 goroutine 让出 CPU让其他 goroutine 有机会执行。 这个函数通常用于在一个长时间运行的 goroutine 中插入一些 “断点”以避免阻塞其他 goroutine 的执行。 需要注意的是runtime.Gosched() 并不保证当前 goroutine 会立即停止执行也不保证其他 goroutine 会立即开始执行。 8.2.2 删除定时器
当Timer执行结束或Ticker调用Stop时会触发定时器的删除操作。从timersBucket中删除定时器是添加定时器的你过程即堆中元素删除后触发小顶堆平衡。 不管是Timer还是Ticker的删除操作最终都会执行runtime中的 stopTimer
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {return deltimer(t)
}接下来看看deltimer的逻辑
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {if t.tb nil { // 用户自己创建的 Timer 或者 Ticker 是不可用的// t.tb can be nil if the user created a timer// directly, without invoking startTimer e.g// time.Ticker{C: c}// In this case, return early without any deletion.// See Issue 21874.return false}tb : t.tblock(tb.lock) // 加锁removed, ok : tb.deltimerLocked(t) // 移除元素unlock(tb.lock) // 解锁if !ok {badTimer()}return removed
}得益于在 timerproc中加了两次锁删除小顶堆元素不需要马上通知timerproc。 因为在计算时间的时候是加锁的中间执行是不加锁的后面设置系统协程状态也是加锁的。 所以删除元素要么发生在系统协程睡眠或暂停的时候要么发生在 sendTime 的时候不管那种都不会影响触发的正确性。 因为可能删除小顶堆中任意位置的元素所以需要从该节点出发向上和向下平衡
8.3 资源泄露
对于不使用的Ticker需要显示的Stop否则会产生资源泄露问题。
首先创建Ticker 的协程并不负责计时只负责从Ticker的管道中获取事件其次系统写成只负责定时器计时向管道中发送事件并不关心上层协程如何处理事件。
如果创建了Ticker则系统协程将持续监控该Ticker的timer定期触发事件。如果Ticker不在使用且没有Stop那么系统协程的负担会越来越重持续消耗CPU资源。
9. 性能优化 (go 1.14)
上面的runtimeTimer的原理仅适用于Go 1.10 ~ 1.13 尽管定时器的性能已经能满足绝大多数场景但在一些高度依赖定时器的业务场景中 往往需要创建海量的定时器这些场景中需要定时器能更精确、占用系统资源更少。 Go 1.14 中对定时器又做了一次大的性能优化主要围绕如何管理runtimeTimer进行包含如何存储runtimeTimer如何检测以确保定时器能准时触发。
9.1 消除了timersBucket
在前面的版本中NewTimer和NewTicker创建的runtimeTimer会存储到全局的timersBucket桶中最多拥有64个timersBucket桶如果GOMAXPROCS的值不超过64timersBucket桶的数量等于GOMAXPROCS。 每个timersBucket桶中均包含一个堆用于保存runtimerTimer此外每个timersBucket桶对应一个专门的协程(timerproc)来监控runtimeTimer
在Go 1.14中取消了timersBucket桶直接把保存runtimeTimer的堆放到了处理器P中。 在处理器P的数据结构中除了包含协程的队列还直接包含了runtimeTimer。
消除timersBucket桶的同时也不再需要timerproc来监控定时器了。
9.2 消除了timeproc
在Go 1.14之前的版本中,timerproc实际上专门监控定时器的协程执行体。 在Go 1.14的设计中取消了timerproc,因为runtimeTimer不在存储在timerBucket桶中而是转移到每个处理器P中。
Go 1.14 中做的优化主要是为了取消timerproc不在依赖timerproc来监控定时器而是希望提供一种更搞笑的监控方式。
9.3 更少的锁竞争
在Go 1.14 之前runtimeTimer存储在timersBucket桶中runtimeTimer的添加、删除均需要加锁。 在处理器P的数据结构中仍然有一把锁(timersLock)来限制timers的并发访问。实际上在Go 1.14 中runtimer的添加、删除也需要加锁。 当协程发生系统调用时当前的工作线程将释放持有的处理器P当前工作线程专注于处理系统调用被阻塞然后启动一个新的工作线程来继续消费当前处理器P中的协程。当新的工作线程启动时需要寻找空闲的处理器P这是需要加锁的。 当程序中拥有大量的定时器时在Go 1.14之前每个timerproc处理完一个定时器都会休眠即触发系统调用从而释放处理器P启动新的工作线程多个新的工作线程在获取空闲处理器P时会争抢互斥锁。 从Go 1.14开始处理器不在由timerproc处理而是在每次协程调度时检查定时器是否需要触发在协程调度时捎带检查定时器。 相较于之前的timerproc定时器被关注的更加频繁而且不会因为协程触发系统调用而产生新的工作线程所以定时器触发的会更准时。
9.4 更少的上下文切换
在Go 1.14 之前timerproc也是夹杂在系统其他的协程中被调度的假设将timerproc标记为GT
由于timerproc夹杂在其他的协程中当协程较多时难以保证timerproc能被及时调度。假如程序中每1微妙就需要触发一个定时器而timerproc每2微妙才被调度一次那么定时器将产生1微妙的误差从而不准时了。 在Go 1.14 中由于每次调度协程时都会检查处理器所以当有定时器需要触发时先处理定时器在调度协程相当于每个协程都兼任了之前的timerproc的工作但不会触发系统调用。
由于在设计上取消了timerproc也避免了频繁的调度timerproc时产生的上下文切换从而在一定程度上节省了系统资源。
9.5 优化效果
在github上有优化代码的性能测试结果 https://github.com/golang/go/commit/6becb033341602f2df9d7c55cc23e64b925bbee2
可以看到在多个涉及定时器的场景中性能均有了较大程度的优化。 相关的好文 https://studygolang.com/articles/26529 https://github.com/golang/go/commit/76f4fd8a5251b4f63ea14a3c1e2fe2e78eb74f81 https://xiaorui.cc/archives/6483 https://www.pengrl.com/p/20021/