辽阳市建设行业培训中心网站,做网站收费,建设农产品网络营销网站,国外平面设计欣赏网站大概流程就是job - JobQueue调度器循环获取JobQueue #xff0c;获取到的job #xff0c;再去异步获取等待可用的 worker#xff0c;取出 chan Job,将job 写入改worker的 chan Jobworker 处理任务#xff0c;先处理 case job : -w.JobChannel: 处理完成后再将 chan…大概流程就是job - JobQueue调度器循环获取JobQueue 获取到的job 再去异步获取等待可用的 worker取出 chan Job,将job 写入改worker的 chan Jobworker 处理任务先处理 case job : -w.JobChannel: 处理完成后再将 chan Job 写入到worker 里面等待调度去取调用
package mainimport (logosstrconvsynctime
)var (MaxWorker intMaxQueue intJobQueue chan Job
)func init() {var err errorMaxWorker, err strconv.Atoi(os.Getenv(MAX_WORKERS))if err ! nil {MaxWorker 5 // 默认值}MaxQueue, err strconv.Atoi(os.Getenv(MAX_QUEUE))if err ! nil {MaxQueue 10 // 默认值}JobQueue make(chan Job, MaxQueue)
}type Payload struct {// Payload的属性
}func (p *Payload) UploadToS3() error {// 模拟上传操作log.Println(Uploading to S3)return nil
}type Job struct {Payload Payload
}type Worker struct {WorkerPool chan chan JobJobChannel chan Jobquit chan bool
}func NewWorker(workerPool chan chan Job) Worker {return Worker{WorkerPool: workerPool,JobChannel: make(chan Job),quit: make(chan bool)}
}func (w Worker) Start() {go func() {for {w.WorkerPool - w.JobChannelselect {case job : -w.JobChannel:if err : job.Payload.UploadToS3(); err ! nil {log.Printf(Error uploading to S3: %s, err)}case -w.quit:return}}}()
}func (w *Worker) Stop() {go func() {w.quit - true // 通知工作线程停止}()
}type Dispatcher struct {WorkerPool chan chan JobmaxWorkers intworkers []Worker // 新增用于跟踪所有工作线程quit chan bool // 用于停止dispatch循环
}func NewDispatcher(maxWorkers int) *Dispatcher {return Dispatcher{WorkerPool: make(chan chan Job, maxWorkers),maxWorkers: maxWorkers,workers: make([]Worker, 0, maxWorkers),}
}func (d *Dispatcher) Runs() {for i : 0; i d.maxWorkers; i {worker : NewWorker(d.WorkerPool)d.workers append(d.workers, worker) // 跟踪新创建的工作线程worker.Start()}go d.dispatch()
}func (d *Dispatcher) dispatch() {for {select {// 从JobQueue中获取一个jobcase job : -JobQueue:go func(job Job) {// 尝试获取一个可用的worker job channel阻塞直到有可用的workerjobChannel : -d.WorkerPool// 分发job到worker job channel中jobChannel - job}(job)case -d.quit:// 退出return}}
}func (d *Dispatcher) StopAllWorkers() {var wg sync.WaitGroupfor _, worker : range d.workers {wg.Add(1)go func(w Worker) {w.Stop() // 停止工作线程wg.Done()}(worker)}wg.Wait() // 等待所有工作线程安全退出
}func (d *Dispatcher) Stop() {d.quit - trued.StopAllWorkers()
}func main() {dispatcher : NewDispatcher(MaxWorker)dispatcher.Runs()// 模拟作业提交for i : 0; i 20; i {payload : Payload{ /* ... */ }job : Job{Payload: payload}JobQueue - job}// 等待一段时间以便可以看到工作的完成time.Sleep(10 * time.Second)
}