开个做网站公司,怎么清空wordpress媒体库,广告公司 网站模板,asp网站代码 部分封装文章目录 一.Asynq介绍二.所需工具三.代码示例四.Reference 一.Asynq介绍
Asynq 是一个 Go 库#xff0c;一个高效的分布式任务队列。
Asynq 工作原理#xff1a;
客户端#xff08;生产者#xff09;将任务放入队列服务器#xff08;消费者#xff09;从队列中拉出任… 文章目录 一.Asynq介绍二.所需工具三.代码示例四.Reference 一.Asynq介绍
Asynq 是一个 Go 库一个高效的分布式任务队列。
Asynq 工作原理
客户端生产者将任务放入队列服务器消费者从队列中拉出任务并为每个任务启动一个工作 goroutine多个工作人员同时处理任务 git库https://github.com/hibiken/asynq 二.所需工具
Asynq 使用 Redis 作为消息代理。client 和 server 都需要连接到 Redis 进行写入和读取。
PS请确保所使用redis 5.0
三.代码示例
以记录操作的中间件函数向数据库写数据的情景为例。
生产者客户端函数调用入口
其中 map 为需向数据库写入的内容
client.Call(audit:opera, map[string]any{uri: uri,method: method,params: string(paramsByte),headers: string(headerByte),code: codeInt,model: model,action: action,user_id: userId,company_id: companyId,user_name: userName,company: companyName,
})生产者函数
func Call(t string, payload map[string]any) error {// redis连接client : asynq.NewClient(asynq.RedisClientOpt{Addr: 127.0.0.1:6379,Password: ,DB: 1,})defer client.Close()switch t {case audit:opera:// 初始化新任务task, err : server.NewOperateSendTask(payload)if err ! nil {return err}// 任务入队_, err client.Enqueue(task, asynq.Queue(audit))if err ! nil {log.Err(err).Msg(fmt.Sprintf(task: %v\n, task))return err}}return nil
}func NewOperateSendTask(data map[string]any) (*asynq.Task, error) {payload, err : json.Marshal(data)if err ! nil {return nil, err}return asynq.NewTask(consts.TypeAuditOpera, payload), nil
}
消费者函数
func HandlerAuditOperateTask(ctx context.Context, t *asynq.Task) error {var record ent.OperateRecord// 队列中取任务err : json.Unmarshal(t.Payload(), record)if err ! nil {log.Err(err).Msg(task.json.Unmarshal)return err}// 真正的数据库操作err dao.OperateRecord.CreateOperateRecord(record)if err ! nil {log.Err(err).Msg(task.dao.OperateRecord.CreateOperateRecord)return err}return nil
}asynq初始化消费者启动入口项目初始化时自动启动
func InitAsynq(ip string, port int, passwd string) {addr : fmt.Sprintf(%s:%d, ip, port)srv : asynq.NewServer(asynq.RedisClientOpt{Addr: 127.0.0.1:6379,Password: ,DB: 1,},// 异步队列asynq.Config{Queues: map[string]int{audit: 3,},},)mux : asynq.NewServeMux()// 启动消费者mux.HandleFunc(audit:opera, server.HandlerAuditOperateTask)go srv.Run(mux)}
四.Reference
Go异步任务解决方案之Asynq库详解 https://www.jb51.net/article/275392.htm