个人做企业 网站,佛山附近做网站的公司,seo搜论坛,宁波网站建设推广Kombu 是一个用于 Python 的消息队列库#xff0c;提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一#xff0c;但也可以单独使用。Kombu 支持多种消息代理#xff08;如 RabbitMQ、Redis、Amazon SQS 等#xff09;#xff0c;并提供了消息生产者和消费者的功…Kombu 是一个用于 Python 的消息队列库提供了高效、灵活的消息传递机制。它是 Celery 的核心组件之一但也可以单独使用。Kombu 支持多种消息代理如 RabbitMQ、Redis、Amazon SQS 等并提供了消息生产者和消费者的功能。安装命令 pip install kombu redis。
一.主要功能
1.消息队列
提供可靠的消息传递和队列机制允许将消息从生产者发送到消费者。
2.消息代理支持
支持多种消息代理如 RabbitMQ、Redis、Amazon SQS、MongoDB 等。
3.异步任务
可以用来实现异步任务处理配合 Celery 使用时可以构建分布式任务队列。
4.消息格式
支持多种消息格式包括 JSON、YAML、pickle 等。
5.路由和交换
提供了高级的消息路由和交换功能可以实现复杂的消息分发逻辑。
二.基本使用
1. 创建消息生产者
生产者负责向消息队列发送消息。
1Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URLRedis
broker_url redis://localhost:6379/0# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange Exchange(my_exchange, typedirect)queue Queue(my_queue, exchange, routing_keymy_key)# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({key: value},exchangeexchange,routing_keymy_key,serializerjson)print(Message sent.)2RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url amqp://guest:guestlocalhost//# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange Exchange(my_exchange, typedirect)queue Queue(my_queue, exchange, routing_keymy_key)# 创建生产者with Producer(conn) as producer:# 发送消息producer.publish({key: value},exchangeexchange,routing_keymy_key,serializerjson)print(Message sent.)2. 创建消息消费者
消费者从消息队列中接收和处理消息。
1Redis 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URLRedis
broker_url redis://localhost:6379/0def callback(body, message):print(fReceived message: {body})message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange Exchange(my_exchange, typedirect)queue Queue(my_queue, exchange, routing_keymy_key)# 创建消费者with Consumer(conn, [queue], callbackcallback) as consumer:print(Waiting for messages...)# 运行消费者等待消息while True:conn.drain_events()2RabbitMQ 消息代理
from kombu import Connection, Exchange, Queue, Consumer# 设置消息代理的连接URL
broker_url amqp://guest:guestlocalhost//def callback(body, message):print(fReceived message: {body})message.ack() # 确认消息已处理# 创建连接
with Connection(broker_url) as conn:# 创建交换机和队列exchange Exchange(my_exchange, typedirect)queue Queue(my_queue, exchange, routing_keymy_key)# 创建消费者with Consumer(conn, [queue], callbackcallback) as consumer:print(Waiting for messages...)# 运行消费者等待消息while True:conn.drain_events()3. 高级用法消息路由
Kombu 支持复杂的消息路由配置以下示例展示了如何使用路由功能将消息发送到不同的队列。
1Redis 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URLRedis
broker_url redis://localhost:6379/0# 创建交换机和队列
exchange Exchange(my_exchange, typedirect)
queue1 Queue(queue1, exchange, routing_keykey1)
queue2 Queue(queue2, exchange, routing_keykey2)def route_message(message):if message[type] type1:return key1return key2# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({type: type1, data: value1},exchangeexchange,routing_keyroute_message({type: type1}),serializerjson)print(Message routed and sent.)2RabbitMQ 消息代理
from kombu import Connection, Exchange, Producer, Queue# 设置消息代理的连接URL
broker_url amqp://guest:guestlocalhost//# 创建交换机和队列
exchange Exchange(my_exchange, typedirect)
queue1 Queue(queue1, exchange, routing_keykey1)
queue2 Queue(queue2, exchange, routing_keykey2)def route_message(message):if message[type] type1:return key1return key2# 创建连接
with Connection(broker_url) as conn:with Producer(conn) as producer:# 发送消息producer.publish({type: type1, data: value1},exchangeexchange,routing_keyroute_message({type: type1}),serializerjson)print(Message routed and sent.)4. 结合 Celery 使用
Kombu 通常与 Celery 一起使用来处理异步任务。简单理解Kombu 是 Celery 的依赖库Celery 需要 Kombu 来访问消息队列系统。同时 Celery 扩展了 Kombu 的功能提供了一个高级的任务队列系统。Celery 使用 Kombu 来处理与消息代理之间的连接、消息发送、消息接收等操作。
1Redis 消息代理
from celery import Celery# 配置 Celery 使用 Redis 作为消息代理通过 Kombu 处理
app Celery(tasks, brokerredis://localhost:6379/0)app.task
def add(x, y):return x y在 Dify 中默认消息代理使用 Redis如下所示 2RabbitMQ 消息代理
from celery import Celery# 配置 Celery 使用 RabbitMQ 作为消息代理通过 Kombu 处理
app Celery(tasks, brokeramqp://guest:guestlocalhost//)app.task
def add(x, y):return x yKombu 是一个强大的消息传递库提供了多种消息代理的支持并能实现复杂的消息队列和路由功能。它支持多种消息格式和高级功能如交换机、队列、路由等。基础用法 包括创建生产者和消费者通过消息代理发送和接收消息。高级用法 包括消息路由、与 Celery 集成等用于构建分布式系统和异步任务处理。
参考文献
[1] https://github.com/celery/kombu
[2] https://docs.celeryq.dev/projects/kombu/en/stable/
[3] 消息队列 Kombu 之 基本架构https://www.cnblogs.com/rossiXYZ/p/14454761.html
[4] Kombu 库用法详解连接、连接池、生产者、消费者https://blog.csdn.net/weixin_44799217/article/details/128490325
NLP工程化(星球号)