网站优化心得,微信公众平台小程序管理在哪里,黄山做网站公司,建站快车代理商windows首先安装rabbitmq 点击参考安装
1、环境介绍
Python 3.10.16
其他通过pip安装的版本(Django、pika、celery这几个必须要有最好版本一致)
amqp 5.3.1
asgiref 3.8.1
async-timeout 5.0.1
billiard 4.2.1
celery 5.4.0
…windows首先安装rabbitmq 点击参考安装
1、环境介绍
Python 3.10.16
其他通过pip安装的版本(Django、pika、celery这几个必须要有最好版本一致)
amqp 5.3.1
asgiref 3.8.1
async-timeout 5.0.1
billiard 4.2.1
celery 5.4.0
click 8.1.7
click-didyoumean 0.3.1
click-plugins 1.1.1
click-repl 0.3.0
colorama 0.4.6
Django 4.2
dnspython 2.7.0
eventlet 0.38.2
greenlet 3.1.1
kombu 5.4.2
pika 1.3.2
pip 24.2
prompt_toolkit 3.0.48
python-dateutil 2.9.0.post0
redis 5.2.1
setuptools 75.1.0
six 1.17.0
sqlparse 0.5.3
typing_extensions 4.12.2
tzdata 2024.2
vine 5.1.0
wcwidth 0.2.13
wheel 0.44.02、创建Django 项目
django-admin startproject django_rabbitmq3、在setting最下边写上
# settings.py guest:guest 表示的是你安装好的rabbitmq的登录账号和密码
BROKER_URL amqp://guest:guestlocalhost:15672/
CELERY_RESULT_BACKEND rpc://4.1 简单模式
4.1.1 在和setting同级的目录下创建一个叫consumer.py的消费者文件其内容如下
import pikadef callback(ch, method, properties, body):print(f[x] Received {body.decode()})def start_consuming():# 创建与RabbitMQ的连接connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()# 声明一个队列channel.queue_declare(queuehello)# 指定回调函数channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue)print([*] Waiting for messages. To exit press CTRLC)channel.start_consuming()if __name__ __main__:start_consuming()4.1.2 在和setting同级的目录下创建一个叫producer.py的生产者文件其内容如下
import pikadef publish_message():# message request.GET.get(msg)# 创建与RabbitMQ的连接connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()# 声明一个队列channel.queue_declare(queuehello)# 发布消息message Hello World!channel.basic_publish(exchange, routing_keyhello, bodymessage)print(f[x] Sent {message})# 关闭连接connection.close()if __name__ __main__:publish_message()4.1.3 先运行消费者代码(consumer.py)再运行生产者代码(producer.py)
先python consumer.py
再 python producer.py4.1.4 运行结果如下 4.2 消息持久化模式
4.2.1 在和setting同级的目录下创建一个叫recv_msg_safe.py的消费者文件其内容如下
import time
import pikadef callback(ch, method, properties, body):print( [x] Received %r % body)time.sleep(20)print( [x] Done)# 下边这个就是标记消费完成了下次在启动接受消息就不用从头开始了即# 手动确认消息消费完成 和auto_ackFalse 搭配使用ch.basic_ack(delivery_tagmethod.delivery_tag) # method.delivery_tag就是一个标识符方便找对人def start_consuming():# 创建与RabbitMQ的连接connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()# 声明一个队列channel.queue_declare(queuehello2, durableTrue) # 若声明过则换一个名字# 指定回调函数channel.basic_consume(queuehello2,on_message_callbackcallback,# auto_ackTrue # 为true则不能持久话消息即消费者关闭后下次收不到之前未收取的消息auto_ackFalse # 为False则下次依然从头开始收取消息直到callback函数调用完成)print([*] Waiting for messages. To exit press CTRLC)channel.start_consuming()if __name__ __main__:start_consuming()
4.2.2 在和setting同级的目录下创建一个叫send_msg_safe.py的生产者文件其内容如下
import pikadef publish_message():# 创建与RabbitMQ的连接connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))channel connection.channel()# 声明一个队列 durableTrue队列持久化channel.queue_declare(queuehello2, durableTrue)channel.basic_publish(exchange,routing_keyhello2,bodyHello World!,# 消息持久话用主要用作宕机的时候估计是写入本地硬盘了propertiespika.BasicProperties(delivery_mode2, # make message persistent))# 关闭连接connection.close()if __name__ __main__:publish_message()
4.2.3 先运行消费者代码(recv_msg_safe.py)再运行生产者代码(send_msg_safe.py) 执行结果如下 4.3 广播模式
4.3.1 在和setting同级的目录下创建一个叫fanout_receive.py的消费者文件其内容如下
# 广播模式
import pika# credentials pika.PlainCredentials(guest, guest)
# connection pika.BlockingConnection(pika.ConnectionParameters(
# hostlocalhost, credentialscredentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()
channel.exchange_declare(exchangelogs, exchange_typefanout) # 指定发送类型
# 必须能过queue来收消息
result channel.queue_declare(, exclusiveTrue) # 不指定queue名字,rabbit会随机分配一个名字,exclusiveTrue会在使用此queue的消费者断开后,自动将queue删除
queue_name result.method.queue
channel.queue_bind(exchangelogs, queuequeue_name) # 随机生成的Q绑定到exchange上面。
print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r % body)channel.basic_consume(on_message_callbackcallback, queuequeue_name, auto_ackTrue)
channel.start_consuming()
4.3.2 在和setting同级的目录下创建一个叫fanout_send.py的生产者文件其内容如下
# 通过广播发消息
import pika
import sys# credentials pika.PlainCredentials(guest, guest)
# connection pika.BlockingConnection(pika.ConnectionParameters(
# hostlocalhost, credentialscredentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost))
channel connection.channel()
channel.exchange_declare(exchangelogs, exchange_typefanout) #发送消息类型为fanout,就是给所有人发消息# 如果等于空就输出hello world!
message .join(sys.argv[1:]) or info: Hello World!channel.basic_publish(exchangelogs,routing_key, # routing_key 转发到那个队列因为是广播所以不用写了bodymessage)print( [x] Sent %r % message)
connection.close()4.3.3 先运行消费者代码(fanout_receive.py)再运行生产者代码(fanout_send.py) 执行结果如下 4.4 组播模式
4.4.1 在和setting同级的目录下创建一个叫direct_recv.py的消费者文件其内容如下
import pika
import syscredentials pika.PlainCredentials(guest, guest)
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))channel connection.channel() channel.exchange_declare(exchangedirect_logs, exchange_typedirect)
result channel.queue_declare(, exclusiveTrue)
queue_name result.method.queueseverities sys.argv[1:] # 接收那些消息指info还是空没写就报错
if not severities:sys.stderr.write(Usage: %s [info] [warning] [error]\n % sys.argv[0]) # 定义了三种接收消息方式info,warning,errorsys.exit(1)for severity in severities: # [error info warning]循环severitieschannel.queue_bind(exchangedirect_logs,queuequeue_name,routing_keyseverity) # 循环绑定关键字
print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(on_message_callbackcallback, queuequeue_name,)
channel.start_consuming()4.4.2 在和setting同级的目录下创建一个叫direct_send.py的生产者文件其内容如下
# 组播
import pika
import syscredentials pika.PlainCredentials(guest, guest)
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))channel connection.channel()channel.exchange_declare(exchangedirect_logs,exchange_typedirect) #指定类型severity sys.argv[1] if len(sys.argv) 1 else info #严重程序级别判定条件到底是info还是空后面接消息message .join(sys.argv[2:]) or Hello World! #消息channel.basic_publish(exchangedirect_logs,routing_keyseverity, #绑定的是error 指定关键字哪些队列绑定了这个级别那些队列就可以收到这个消息bodymessage)print( [x] Sent %r:%r % (severity, message))
connection.close()4.4.3 先运行消费者代码(direct_recv.py)再运行生产者代码(direct_send.py) 执行结果如下 4.5 更细致的topic模式
4.5.1 在和setting同级的目录下创建一个叫topic_recv.py的消费者文件其内容如下
import pika
import syscredentials pika.PlainCredentials(guest, guest)
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))channel connection.channel()
channel.exchange_declare(exchangetopic_logs,exchange_typetopic)result channel.queue_declare(, exclusiveTrue)
queue_name result.method.queuebinding_keys sys.argv[1:]
if not binding_keys:print(sys.argv[0], sys.argv[0])sys.stderr.write(Usage: %s [binding_key]...\n % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchangetopic_logs,queuequeue_name,routing_keybinding_key)print( [*] Waiting for logs. To exit press CTRLC)def callback(ch, method, properties, body):print( [x] %r:%r % (method.routing_key, body))channel.basic_consume(on_message_callbackcallback,queuequeue_name)channel.start_consuming()4.5.2 在和setting同级的目录下创建一个叫topic_send.py的生产者文件其内容如下
import pika
import syscredentials pika.PlainCredentials(guest, guest)
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))channel connection.channel()channel.exchange_declare(exchangetopic_logs,exchange_typetopic) #指定类型routing_key sys.argv[1] if len(sys.argv) 1 else anonymous.infomessage .join(sys.argv[2:]) or Hello World! #消息channel.basic_publish(exchangetopic_logs,routing_keyrouting_key,bodymessage)
print( [x] Sent %r:%r % (routing_key, message))
connection.close()4.5.3 先运行消费者代码(topic_recv.py)再运行生产者代码(topic_send.py) 执行结果如下 4.6 Remote procedure call (RPC) 双向模式
4.6.1 在和setting同级的目录下创建一个叫rpc_client.py的消费者文件其内容如下
import pika
import uuid
import time# 斐波那契数列 前两个数相加依次排列
class FibonacciRpcClient(object):def __init__(self):# 赋值变量一个循环值self.response None# 链接远程# self.connection pika.BlockingConnection(pika.ConnectionParameters(# hostlocalhost))credentials pika.PlainCredentials(guest, guest)self.connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))self.channel self.connection.channel()# 生成随机queueresult self.channel.queue_declare(, exclusiveTrue)# 随机取queue名字发给消费端self.callback_queue result.method.queue# self.on_response 回调函数:只要收到消息就调用这个函数。# 声明收到消息后就 收queueself.callback_queue内的消息 准备接受命令结果self.channel.basic_consume(queueself.callback_queue,auto_ackTrue, on_message_callbackself.on_response)# 收到消息就调用# ch 管道内存对象地址# method 消息发给哪个queue# body数据对象def on_response(self, ch, method, props, body):# 判断本机生成的ID 与 生产端发过来的ID是否相等if self.corr_id props.correlation_id:# 将body值 赋值给self.responseself.response bodydef call(self, n):# 随机一次唯一的字符串self.corr_id str(uuid.uuid4())# routing_keyrpc_queue 发一个消息到rpc_queue内self.channel.basic_publish(exchange,routing_keyrpc_queue,propertiespika.BasicProperties(# 执行命令之后结果返回给self.callaback_queue这个队列中reply_toself.callback_queue,# 生成UUID 发送给消费端correlation_idself.corr_id,),# 发的消息必须传入字符串不能传数字bodystr(n))# 没有数据就循环收while self.response is None:# 非阻塞版的start_consuming()# 没有消息不阻塞 检查队列里有没有新消息但不会阻塞self.connection.process_data_events()print(no msg...)time.sleep(0.5)return int(self.response)# 实例化
fibonacci_rpc FibonacciRpcClient()response fibonacci_rpc.call(5)
print( [.] Got %r % response)4.6.2 在和setting同级的目录下创建一个叫rpc_server.py的生产者文件其内容如下
#_*_coding:utf-8_*_
import pika
import time
# 链接socket
credentials pika.PlainCredentials(guest, guest)
connection pika.BlockingConnection(pika.ConnectionParameters(hostlocalhost, credentialscredentials))channel connection.channel()# 生成rpc queue 在这里声明的所以先启动这个
channel.queue_declare(queuerpc_queue)# 斐波那契数列
def fib(n):if n 0:return 0elif n 1:return 1else:return fib(n-1) fib(n-2)# 收到消息就调用
# ch 管道内存对象地址
# method 消息发给哪个queue
# props 返回给消费的返回参数
# body数据对象
def on_request(ch, method, props, body):n int(body)print( [.] fib(%s) % n)# 调用斐波那契函数 传入结果response fib(n)ch.basic_publish(exchange,# 生产端随机生成的queuerouting_keyprops.reply_to,# 获取UUID唯一 字符串数值propertiespika.BasicProperties(correlation_idprops.correlation_id),# 消息返回给生产端bodystr(response))# 确保任务完成# ch.basic_ack(delivery_tag method.delivery_tag)# 每次只处理一个任务
# channel.basic_qos(prefetch_count1)
# rpc_queue收到消息:调用on_request回调函数
# queuerpc_queue从rpc内收
channel.basic_consume(queuerpc_queue,auto_ackTrue,on_message_callbackon_request)
print( [x] Awaiting RPC requests)
channel.start_consuming()4.6.3 先运行消费者代码(rpc_server.py)再运行生产者代码(rpc_client.py) 执行结果如下 参考实现1
参考实现2
参考实现3