This article shows how to connect to DeepSeek over SSE (Server-Sent Events) and how to expose that integration to clients through a FastAPI WebSocket endpoint.

References:
Tencent Cloud LLM: https://cloud.tencent.com/document/product/1759/109380
FastAPI official site: https://fastapi.tiangolo.com/

WebSocketManager

A WsManager class manages the WebSocket connection instances. The code is as follows:

    from fastapi import WebSocket


    class WsManager:
        """WebSocket manager: a process-wide registry of open connections."""
        connectors: dict[str, WebSocket] = {}

        @staticmethod
        def get_connector(connector_id: str) -> WebSocket | None:
            """
            Get a connection instance.
            :param connector_id: connection ID
            :return: the WebSocket, or None if the ID is unknown
            """
            connector = WsManager.connectors.get(connector_id)
            return connector

        @staticmethod
        def add_connector(connector_id: str, connector: WebSocket):
            """
            Register a WebSocket client.
            :param connector_id: client ID
            :param connector: client connection instance
            :return:
            """
            WsManager.connectors[connector_id] = connector

        @staticmethod
        def remove_connector(connector_id: str):
            """
            Remove a WebSocket connection.
            :param connector_id: connection ID
            :return:
            """
            # pop() with a default ignores IDs that were already removed
            WsManager.connectors.pop(connector_id, None)

        @staticmethod
        async def send_message(connector_id: str, message: str):
            connector = WsManager.connectors.get(connector_id)
            if connector is None:
                return
            try:
                await connector.send_text(message)
            except Exception as e:
                print(f"Failed to send message: {e}")

        @staticmethod
        async def send_message_json(connector_id: str, message: dict):
            connector = WsManager.connectors.get(connector_id)
            if connector is None:
                return
            try:
                await connector.send_json(message)
            except Exception as e:
                print(f"Failed to send message: {e}")

Connecting to DeepSeek
    import uuid
    import os
    import requests
    import json

    from src.core.WsManager import WsManager

    TCLOUD_URL = "lke.tencentcloudapi.com"
    TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"

    SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
    SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
    REGION = "ap-guangzhou"
    TYPE = 5


    def get_session():
        """
        Generate a UUID.
        :return: session id string
        """
        new_uuid = uuid.uuid4()
        # Convert the UUID to a string
        session_id = str(new_uuid)
        return session_id


    def _resolve_message(message: str, prev_message: str):
        """
        Process one line of the SSE stream.
        :param message: the current line
        :param prev_message: the previous non-empty line
        :return: a dict to forward to the client, or None if the line is skipped
        """
        if prev_message == "":
            return None
        if prev_message.startswith("event:"):
            # The line after an "event:" line carries the data for that event
            message = message.replace("data:", "").strip()
            try:
                message_json = json.loads(message)
                message_type = message_json.get("type")
                payload = message_json.get("payload")
                if message_type == "token_stat":
                    # Token statistics event
                    return None
                elif message_type == "thought":
                    # Thinking event
                    return None
                elif message_type == "error":
                    # Error event
                    return None
                elif message_type == "reference":
                    # Reference source event
                    return None
                content = payload.get("content")
                record_id = payload.get("record_id")
                request_id = payload.get("request_id")
                session_id = payload.get("session_id")
                trace_id = payload.get("trace_id")
                message_id = payload.get("message_id")
                return {
                    "type": message_type,
                    "data": {
                        "content": content,
                        "record_id": record_id,
                        "request_id": request_id,
                        "session_id": session_id,
                        "trace_id": trace_id,
                        "message_id": message_id,
                    }
                }
            except Exception as e:
                print(e)
        else:
            return None


    async def get_q_a_result(visitor_id: str, question: str):
        """
        Get the Q&A result and stream it to the visitor's WebSocket.
        :param visitor_id: visitor ID
        :param question: question text
        :return:
        """
        session_id = get_session()
        req_data = {
            "content": f"{question}",
            "bot_app_key": "XXXXXXXX",  # see the console steps below for where to find it
            "visitor_biz_id": visitor_id,
            "session_id": session_id,
            "streaming_throttle": 100
        }
        # Note: requests is synchronous, so reading the stream blocks the event loop
        res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),
                            stream=True, headers={"Accept": "text/event-stream"})
        prev_message: str = ""  # previous message
        if res.status_code == 200:
            for line in res.iter_lines():
                if line:
                    data = line.decode("utf-8")
                    message = _resolve_message(data, prev_message)
                    if message:
                        await WsManager.send_message_json(visitor_id, message)
                    prev_message = data
        else:
            print(f"Failed to get data. Status code: {res.status_code}")
        # Close the WebSocket connection
        connector = WsManager.get_connector(visitor_id)
        if connector:
            await connector.close()
            WsManager.remove_connector(visitor_id)
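The pairing logic in `_resolve_message` (an `event:` line followed by a `data:` line carrying JSON) can be tried without any network access. The following is a minimal sketch of that idea, not the author's code: `parse_sse_lines`, `SKIPPED_TYPES`, and the sample frames are hypothetical names and data made up for illustration, and the real stream may contain fields not shown here.

```python
import json
from typing import Iterable, Iterator, Optional

# Event types that _resolve_message in the post drops.
SKIPPED_TYPES = {"token_stat", "thought", "error", "reference"}


def parse_sse_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Pair each `event:` line with the `data:` line that follows it."""
    prev: Optional[str] = None
    for line in lines:
        if not line:
            # Blank lines only separate SSE events; skip them.
            continue
        if prev is not None and prev.startswith("event:") and line.startswith("data:"):
            body = json.loads(line[len("data:"):].strip())
            if body.get("type") not in SKIPPED_TYPES:
                yield {"type": body.get("type"), "data": body.get("payload", {})}
        prev = line


# Hypothetical sample frames, shaped like what the code above expects.
sample = [
    "event:reply",
    'data:{"type": "reply", "payload": {"content": "Hello", "session_id": "s-1"}}',
    "",
    "event:token_stat",
    'data:{"type": "token_stat", "payload": {"token_count": 12}}',
]
events = list(parse_sse_lines(sample))
print(events)
```

Only the `reply` frame is forwarded; the `token_stat` frame is filtered out, mirroring the early returns in `_resolve_message`.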
How to obtain the bot_app_key:
1. Open the console.
2. Create an application and click "Call".
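One caveat about `get_q_a_result`: `requests` is a synchronous library, so iterating `res.iter_lines()` inside an `async def` holds the event loop until the stream ends. A possible workaround, sketched here under assumptions, is to read the blocking iterator on a worker thread and hand lines to the event loop through an `asyncio.Queue`. `iterate_in_thread` and `fake_blocking_lines` are hypothetical helpers, and the fake generator stands in for the real HTTP stream.

```python
import asyncio
import threading
from typing import AsyncIterator, Callable, Iterable


async def iterate_in_thread(make_iterable: Callable[[], Iterable[str]]) -> AsyncIterator[str]:
    """Consume a blocking iterable on a worker thread and yield its items asynchronously."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the stream

    def worker() -> None:
        try:
            for item in make_iterable():
                # Hand each item to the event loop in a thread-safe way.
                loop.call_soon_threadsafe(queue.put_nowait, item)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, done)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = await queue.get()
        if item is done:
            break
        yield item


def fake_blocking_lines() -> Iterable[str]:
    # Stand-in for res.iter_lines(); a real call would block on network I/O.
    yield "event:reply"
    yield 'data:{"type": "reply"}'


async def main() -> list:
    return [line async for line in iterate_in_thread(fake_blocking_lines)]


collected = asyncio.run(main())
print(collected)
```

With a bridge like this, other coroutines (for example the WebSocket receive loop) keep running while the SSE stream is being read. An async HTTP client would be another way to get the same effect.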
Exposing the interface over WebSocket with FastAPI

With this endpoint, once the front end connects over WebSocket, the back end immediately starts the Q&A call based on the connection parameters, and it closes the WebSocket connection automatically when the Q&A finishes.

    import asyncio

    from fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
    from fastapi.responses import JSONResponse

    from src.core.WsManager import WsManager
    from src.utils.tcloudLLM import get_example_recommend
    from src.utils.session import get_visitor_id

    router = APIRouter(prefix="/api/tcloud", tags=["Tencent Cloud LLM API"])


    @router.websocket("/ws/{compound_name}")
    async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):
        await websocket.accept()
        visitor_id = get_visitor_id()
        WsManager.add_connector(visitor_id, websocket)
        # BackgroundTasks was tried first for the background job, but for some
        # unknown reason the task never ran, so asyncio is used instead.
        # get_example_recommend is not shown in this post.
        asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # start the Q&A task
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"Message text was: {data}")
        except WebSocketDisconnect:
            WsManager.remove_connector(visitor_id)
        except Exception:
            WsManager.remove_connector(visitor_id)

If multi-turn Q&A is needed, set the relevant request_id parameter on each of the requests so that the messages are linked together.
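As a rough illustration of the multi-turn idea, the sketch below builds the request body for two consecutive turns. It rests on assumptions that should be checked against the Tencent Cloud documentation linked at the top: that the same `session_id` is reused across turns, and that `request_id` is a per-request string field. `build_turn_request` is a hypothetical helper, and the questions and IDs are made up.

```python
import uuid


def build_turn_request(question: str, visitor_id: str, session_id: str, bot_app_key: str) -> dict:
    """Build one request body: session_id is shared across turns, request_id is new per turn."""
    return {
        "content": question,
        "bot_app_key": bot_app_key,
        "visitor_biz_id": visitor_id,
        "session_id": session_id,            # assumption: reused for the whole conversation
        "request_id": str(uuid.uuid4()),     # assumption: a fresh ID for each request
        "streaming_throttle": 100,
    }


# One conversation, two turns (all values are placeholders).
session_id = str(uuid.uuid4())
first = build_turn_request("What is this compound used for?", "visitor-1", session_id, "XXXXXXXX")
second = build_turn_request("Does it have side effects?", "visitor-1", session_id, "XXXXXXXX")
print(first["session_id"] == second["session_id"], first["request_id"] != second["request_id"])
```

In the code from this article, `get_q_a_result` generates a new `session_id` on every call, so a multi-turn version would need to receive the session ID from the caller instead.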