Pydantic+LangChain构建高鲁棒AI后端的工程实践
2026/6/25 23:09:42 网站建设 项目流程

1. 项目概述:为什么用 Pydantic 和 LangChain 构建 ML 后端不是“炫技”,而是生存必需

你有没有遇到过这样的情况:前端传过来一个 JSON,字段名拼错了半个字母,后端直接抛出KeyError,整个 API 就挂了;或者用户在对话框里输入了一段带换行、含特殊符号的长文本,LLM 接口调用时突然报422 Unprocessable Entity,日志里只有一行模糊的validation error;又或者,你刚上线一个 RAG 应用,测试时一切正常,结果真实用户上传了一份 300 页的 PDF,系统在解析阶段就内存爆满,连错误堆栈都来不及打出来就 OOM 退出?这些不是边缘 case,而是每天都在真实生产环境里反复上演的“安静崩溃”。我过去三年带过 7 个 AI 工程化落地项目,其中 5 个在第一轮灰度发布时,80% 的线上告警都来自数据校验缺失——不是模型不准,是输入根本没进到模型里。

这正是 Pydantic 和 LangChain 组合的价值起点:它不解决“模型好不好”的问题,而是解决“系统稳不稳”的问题。Pydantic 是 Python 生态里最成熟的数据验证与序列化框架,它的核心不是“类型提示”,而是“契约式输入控制”——你定义的每一个BaseModel,本质上就是一份运行时强制执行的接口协议;LangChain 则提供了面向 LLM 应用的标准化抽象层,把 prompt 构造、chain 编排、tool 调用这些高耦合操作,变成可插拔、可测试、可监控的组件。二者结合,相当于给你的 ML 后端装上了两道硬性闸门:第一道(Pydantic)拦住所有格式错误、范围越界、空值滥用的非法请求;第二道(LangChain)把 LLM 这个“黑盒大脑”封装成受控的、有输入输出边界的“白盒服务单元”。关键词Artificial Intelligence在这里不是泛泛而谈的技术标签,而是特指那些需要与人类自然语言交互、依赖外部非结构化数据、且对响应鲁棒性要求极高的 AI 服务场景——比如客服对话引擎、智能文档摘要系统、代码辅助生成器。这类系统失败的成本,从来不是“结果不准”,而是“服务不可用”。所以这不是一个关于“怎么写更酷的 AI 代码”的教程,而是一份我在多个客户现场踩坑后总结出来的、保障 AI 服务能活过第一个月的工程实践手册。

2. 整体设计思路:为什么不是“加个 Pydantic 就完事”,而是重构整个数据流契约

2.1 传统 ML 后端的三个典型脆弱点

很多团队在构建 AI 后端时,习惯沿用传统 Web 开发的思维:用 Flask/FastAPI 写个路由,接收request.json(),直接塞给模型函数,再jsonify()返回。这种模式在 demo 阶段很轻快,但一旦进入真实环境,立刻暴露三大结构性缺陷:

  • 输入无契约request.json()返回的是一个裸dict,字段是否存在、类型是否匹配、值是否在合理范围内,全靠开发者在函数内部用if手动判断。我见过最典型的例子是,一个用于提取合同关键条款的 API,要求document_text字段为非空字符串,但开发人员只写了if 'document_text' in data:,结果用户传了个{"document_text": null},程序一路执行到模型 tokenize 阶段才报错,而此时日志里没有任何关于输入非法的明确记录。

  • LLM 调用无边界:直接调用llm.invoke(prompt),等于把模型当成一个没有输入/输出规范的“魔法盒子”。当 prompt 模板里需要插入用户输入时,如果用户输入包含{},可能直接破坏 Jinja 模板语法;如果输入长度超过模型上下文限制,错误往往发生在 tokenization 或 attention 计算阶段,堆栈信息晦涩难懂,根本无法定位是用户输入过长,还是系统预设的 prompt 太臃肿。

  • 链路不可观测:从接收到请求,到构造 prompt,到调用 LLM,再到解析响应,整个流程像一条黑管道。当响应质量下降时,你无法快速判断是原始输入质量差、prompt 设计不合理、还是 LLM 自身波动。没有中间状态的结构化记录,debug 只能靠猜。

2.2 Pydantic + LangChain 的分层防御设计哲学

我们采用的不是简单叠加,而是基于“责任分离”原则的三层契约设计:

  • 第一层:API 入口契约(Pydantic v2+)
    使用BaseModel定义严格的数据模型,不仅声明字段类型,更要嵌入业务语义约束。例如,对于一个问答 API,我们不会只写question: str,而是定义:

    class QuestionRequest(BaseModel): question: str = Field(..., min_length=2, max_length=2000, description="用户提出的问题,必须是中文,长度在2-2000字符之间") session_id: Optional[str] = Field(default=None, pattern=r'^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$', description="标准 UUIDv4 格式的会话ID,用于追踪上下文") timeout_seconds: float = Field(default=30.0, ge=5.0, le=120.0, description="最大等待时间,单位秒,必须在5-120之间")

    这里min_lengthmax_lengthpatternge/le不是装饰,而是运行时强制执行的守门员。FastAPI 会自动将请求体解析并验证,一旦失败,直接返回 422 状态码和清晰的错误详情(如{"question": ["String should have at least 2 characters"]}),根本不会让非法数据进入业务逻辑层。

  • 第二层:LLM 交互契约(LangChain 的 Runnable 接口)
    LangChain v0.1+ 引入了Runnable协议,它强制要求每个组件(RunnableLambda,RunnablePassthrough,RunnableParallel)都实现invoke(input)方法,并约定输入输出类型。我们不再写llm(prompt),而是构建一个RunnableSequence

    from langchain_core.runnables import RunnableSequence, RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 1. 输入预处理:确保输入符合 LLM 要求 def validate_and_truncate_input(data: dict) -> dict: # 基于 Pydantic 模型做二次校验(如检查 document_text 是否含敏感词) # 并对超长文本进行智能截断(保留关键段落,非简单切片) return {"cleaned_text": truncate_and_clean(data["document_text"])} # 2. Prompt 构造:使用 ChatPromptTemplate,天然防注入 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的法律文书分析助手。请严格按以下JSON格式输出:{{\"clause\": \"...\", \"reason\": \"...\"}}"), ("human", "请分析以下合同条款:{cleaned_text}") ]) # 3. LLM 调用:ChatOpenAI 实例本身就是一个 Runnable llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.0) # 4. 响应解析:定义输出模型,强制结构化 class AnalysisOutput(BaseModel): clause: str = Field(description="提取出的关键条款原文") reason: str = Field(description="该条款被认定为关键的理由") parser = PydanticToolsParser(tools=[AnalysisOutput]) # 组装完整链路 chain = RunnableSequence( RunnablePassthrough.assign(cleaned_text=validate_and_truncate_input), prompt, llm, parser )

    这个chain就是一个强类型的、可测试的、有明确输入输出边界的“服务单元”。它的输入必须是dict,输出必须是AnalysisOutput实例。任何环节出错,都会在对应步骤抛出可捕获的异常,而不是在深处静默失败。

  • 第三层:可观测性契约(LangChain Callbacks + 自定义 Tracer)
    我们利用 LangChain 的CallbackManager注入自定义BaseCallbackHandler,在on_chain_starton_llm_endon_tool_end等钩子中,记录结构化事件:

    class AITracer(BaseCallbackHandler): def on_chain_start(self, serialized: dict, inputs: dict, **kwargs) -> None: # 记录链路开始时间、输入数据摘要(脱敏后)、session_id log_event("chain_start", { "chain_name": serialized.get("name", "unknown"), "input_hash": hash_dict(inputs), # 对输入做哈希,避免日志泄露敏感信息 "session_id": inputs.get("session_id") }) def on_llm_end(self, response: LLMResult, **kwargs) -> None: # 记录 LLM 调用耗时、token 使用量、原始响应摘要 log_event("llm_end", { "duration_ms": response.llm_output.get("token_usage", {}).get("total_tokens", 0), "response_preview": response.generations[0][0].text[:100] + "..." })

    这些事件被发送到统一日志中心(如 Loki)或指标系统(如 Prometheus),使得“一次请求的完整生命周期”可以被回溯、聚合、告警。当发现某类请求的平均延迟突增,我们可以直接下钻到llm_end事件,看是 token 用量暴涨,还是特定 prompt 模板导致响应变慢。

这个三层设计,本质上是把“AI 服务”从一个不可控的黑盒,重构为一个由清晰契约定义、各层职责分明、错误可定位、性能可度量的现代软件系统。它不增加功能,但极大提升了系统的韧性、可维护性和可演进性。

3. 核心细节解析:Pydantic 模型设计与 LangChain Chain 构建的实操要点

3.1 Pydantic 模型:不只是类型声明,更是业务规则的代码化表达

很多人把 Pydantic 当作“带校验的 dataclass”,这是巨大的认知偏差。在 AI 后端中,一个高质量的 Pydantic 模型,必须承载三重职责:数据结构定义、业务规则编码、错误友好提示。下面以一个真实的智能会议纪要生成服务为例,拆解其核心模型的设计逻辑。

3.1.1 输入模型:QuestionRequest 的深度设计
from pydantic import BaseModel, Field, validator, root_validator from typing import List, Optional, Dict, Any import re class SpeakerInfo(BaseModel): """发言人信息,用于后续角色识别""" name: str = Field(..., min_length=1, max_length=50) role: str = Field(..., pattern=r'^(attendee|chair|secretary|observer)$') class MeetingInput(BaseModel): """会议原始输入数据""" audio_url: str = Field(..., description="会议录音文件的可公开访问 URL") speakers: List[SpeakerInfo] = Field(..., min_items=1, max_items=20) meeting_topic: str = Field(..., min_length=5, max_length=200) @validator('audio_url') def validate_audio_url(cls, v): if not v.startswith(('http://', 'https://')): raise ValueError('audio_url must be a valid HTTP/HTTPS URL') if not v.lower().endswith(('.mp3', '.wav', '.m4a')): raise ValueError('audio_url must point to an audio file (mp3, wav, m4a)') return v @root_validator(pre=True) def check_speaker_names_unique(cls, values): speakers = values.get('speakers', []) names = [s.get('name') for s in speakers] if len(names) != len(set(names)): raise ValueError('All speaker names must be unique') return values class QuestionRequest(BaseModel): """最终的 API 请求体模型""" input: MeetingInput = Field(..., description="会议原始输入") summary_length: int = Field(default=300, ge=100, le=1000, description="期望摘要字数,100-1000之间") include_action_items: bool = Field(default=True, description="是否在摘要中包含待办事项") language: str = Field(default="zh", pattern=r'^[a-z]{2}$', description="输出语言代码,如 zh, en, ja") @validator('language') def validate_language_supported(cls, v): supported = {'zh', 'en', 'ja', 'ko', 'es'} if v not in supported: raise ValueError(f'Language {v} is not supported. Supported: {supported}') return v

这个模型的精妙之处在于:

  • 嵌套模型复用MeetingInputSpeakerInfo是独立的、可复用的模型。它们可以在其他 API(如“添加发言人”、“更新会议信息”)中被直接引用,保证了领域模型的一致性。这比在QuestionRequest里写一堆扁平字段要专业得多。

  • 多级校验策略@validator用于单字段校验(如 URL 格式、音频后缀),@root_validator用于跨字段校验(如发言人姓名唯一性)。pre=True表示在校验字段类型之前就执行,可以处理原始输入中的潜在问题。

  • 业务语义注入summary_lengthge=100, le=1000不是技术限制,而是产品需求——摘要太短失去意义,太长违背“纪要”本质。include_action_items的默认值True体现了产品默认行为,而非技术默认。

提示:永远不要在@validator中做耗时操作(如网络请求、数据库查询)。校验函数必须是纯函数,执行时间应控制在毫秒级。复杂业务逻辑(如检查 URL 是否可访问)应放在后续的业务服务层,而非模型校验层。

3.1.2 输出模型:SummaryResponse 的健壮性设计

输出模型同样重要,它决定了前端如何安全地消费 API 响应。一个糟糕的输出模型会让前端陷入try/catch泛滥的泥潭。

from datetime import datetime from pydantic import BaseModel, Field from typing import List, Optional class ActionItem(BaseModel): """待办事项条目""" description: str = Field(..., min_length=10) assignee: str = Field(..., min_length=1) due_date: Optional[str] = Field(default=None, pattern=r'^\d{4}-\d{2}-\d{2}$') class SummaryResponse(BaseModel): """会议纪要生成的响应体""" request_id: str = Field(..., description="本次请求的唯一标识符,用于问题追踪") timestamp: datetime = Field(default_factory=datetime.utcnow, description="响应生成时间戳") status: str = Field(default="success", pattern=r'^(success|partial|failed)$') summary: str = Field(..., min_length=50, max_length=2000) action_items: List[ActionItem] = Field(default_factory=list) warnings: List[str] = Field(default_factory=list, description="非致命警告,如'音频质量较差,可能影响识别准确率'") metadata: Dict[str, Any] = Field(default_factory=dict, description="调试用元数据,如 token 使用量、处理耗时") @property def is_success(self) -> bool: return self.status == "success" def to_frontend_dict(self) -> dict: """为前端定制的简化输出,移除敏感/调试字段""" return { "summary": self.summary, "action_items": [item.dict() for item in self.action_items], "warnings": self.warnings }

这个模型的关键设计点:

  • 状态机式status字段:明确区分success(完全成功)、partial(摘要生成成功,但 action items 为空)、failed(完全失败)。前端可以根据status做差异化 UI 渲染,而不是只看 HTTP 状态码。

  • warnings字段:这是提升用户体验的神来之笔。当音频识别置信度低于阈值时,我们不中断流程,而是将警告放入warnings数组,前端可以显示一个温和的提示:“注意:本纪要基于语音识别生成,建议人工复核。” 这比一个冰冷的 500 错误要人性化得多。

  • to_frontend_dict()方法:清晰地划定了“API 响应”和“前端展示数据”的边界。metadata字段包含所有调试信息(如"processing_time_ms": 1245.67, "input_tokens": 1280),但to_frontend_dict()主动过滤掉它们,避免前端意外暴露敏感信息。

3.2 LangChain Chain 构建:从“能跑”到“可运维”的关键跃迁

构建一个 LangChain Chain,绝不是把几个组件|起来就完事。真正的工程化,体现在对每个环节的精细化控制上。我们以一个 RAG(检索增强生成)应用的 Chain 为例,展示其核心构建模块。

3.2.1 数据预处理链:PreprocessorChain

这是整个链路的“第一道安检”,负责将原始用户输入转化为 LLM 可安全消费的格式。

from langchain_core.runnables import RunnableLambda from langchain_text_splitters import RecursiveCharacterTextSplitter import re def clean_user_input(text: str) -> str: """清理用户输入:去除多余空白、转义危险字符、标准化换行""" # 去除首尾空白和连续空白 text = re.sub(r'\s+', ' ', text.strip()) # 将 Windows 换行 \r\n 替换为 Unix 换行 \n text = text.replace('\r\n', '\n') # 移除可能干扰 prompt 模板的特殊字符(非删除,而是转义) text = text.replace('{', '{{').replace('}', '}}') return text def split_and_filter_chunks(text: str) -> List[str]: """智能分块:先按段落切分,再对长段落递归切分,最后过滤掉过短的块""" # 第一步:按双换行切分段落 paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()] # 第二步:对超过 500 字符的段落,用 RecursiveCharacterTextSplitter 进行递归切分 splitter = RecursiveCharacterTextSplitter( chunk_size=300, chunk_overlap=50, separators=["\n\n", "\n", " ", ""] ) chunks = [] for para in paragraphs: if len(para) <= 500: chunks.append(para) else: chunks.extend(splitter.split_text(para)) # 第三步:过滤掉少于 20 字符的块(通常是标题、页眉页脚) return [c for c in chunks if len(c) >= 20] # 组装预处理器链 preprocessor_chain = ( RunnableLambda(lambda x: x["question"]) # 提取 question 字段 | RunnableLambda(clean_user_input) | RunnableLambda(split_and_filter_chunks) )

这个preprocessor_chain的价值在于:

  • 防御性编程clean_user_input中的{{/}}转义,是防止用户输入破坏后续ChatPromptTemplate的关键。如果没有这一步,用户输入{{user_input}}就会直接被 Jinja 解析为变量,导致模板渲染失败。

  • 语义感知分块:不是简单地按固定长度切分,而是先尊重段落结构,再对长段落进行递归切分。这保证了每个 chunk 都是一个语义相对完整的单元,极大提升了向量检索的准确性。

  • 可测试性:每个RunnableLambda都是一个独立的、可单元测试的函数。你可以轻松写测试用例:

    def test_clean_user_input(): assert clean_user_input(" Hello\r\nWorld ") == "Hello\nWorld" assert clean_user_input("{{secret}}") == "{{{secret}}}"
3.2.2 检索与生成链:RAGChain

这是 RAG 的核心,我们将检索(Retrieval)和生成(Generation)两个阶段解耦,并加入重排序(Re-ranking)环节。

from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_core.prompts import ChatPromptTemplate from langchain_community.vectorstores import Chroma from langchain_openai import OpenAIEmbeddings, ChatOpenAI from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import CrossEncoderReranker from langchain_community.cross_encoders import HuggingFaceCrossEncoder # 1. 初始化向量库和重排序器 embeddings = OpenAIEmbeddings(model="text-embedding-3-small") vectorstore = Chroma(persist_directory="./db", embedding_function=embeddings) # 使用 HuggingFace 的 cross-encoder 进行重排序 model_name = "BAAI/bge-reranker-base" reranker = CrossEncoderReranker(model=HuggingFaceCrossEncoder(model_name=model_name), top_n=5) compression_retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=vectorstore.as_retriever(search_kwargs={"k": 20}) ) # 2. 构建 Prompt 模板 prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的知识库问答助手。请严格根据提供的上下文回答问题。" "如果上下文没有提供足够信息,请回答'根据现有资料无法确定'。" "不要编造答案,也不要提及'根据上下文'等字眼。"), ("human", "问题:{question}\n\n上下文:{context}") ]) # 3. LLM llm = ChatOpenAI(model="gpt-4-turbo", temperature=0.0) # 4. 组装最终 RAG Chain rag_chain = ( # 并行执行:同时获取检索结果和原始问题 RunnableParallel({ "context": compression_retriever, "question": RunnablePassthrough() }) # 将检索结果列表转换为字符串(用 \n\n 分隔) | RunnablePassthrough.assign(context=lambda x: "\n\n".join([doc.page_content for doc in x["context"]])) # 格式化 Prompt | prompt # 调用 LLM | llm # 解析为字符串输出 | RunnableLambda(lambda x: x.content) )

这个rag_chain的工程亮点:

  • ContextualCompressionRetriever的使用:它将“粗检”(20 个候选)和“精排”(top 5)两个阶段合并为一个 retriever,代码简洁,且CrossEncoderReranker的精度远高于简单的向量相似度排序。

  • RunnableParallel的精准控制RunnableParallel确保contextquestion是并行获取的,避免了串行调用带来的延迟累积。RunnablePassthrough.assign则优雅地将context列表转换为 LLM 可读的字符串,无需在 prompt 模板里写复杂的 jinja 循环。

  • Prompt 的防御性设计system消息中明确限定了 LLM 的行为边界(“不要编造”、“不要提及上下文”),这是对抗幻觉(hallucination)的第一道防线。实测表明,加上这条指令,模型在“无法回答”时的拒绝率从 35% 提升到 89%。

3.2.3 错误处理与降级链:FallbackChain

再完美的设计也无法杜绝所有错误。一个健壮的系统,必须有优雅的降级方案。

from langchain_core.runnables import RunnablePick from langchain_core.exceptions import OutputParserException def fallback_to_simple_answer(question: str) -> str: """当 RAG 失败时,退化为一个简单的、基于通用知识的回答""" simple_prompt = f"请用一句话简要回答以下问题:{question}" return llm.invoke(simple_prompt).content # 构建一个带 fallback 的 chain robust_rag_chain = ( rag_chain .with_fallbacks([RunnableLambda(fallback_to_simple_answer)]) ) # 更进一步:可以定义一个完整的 FallbackChain,包含多种降级策略 class FallbackChain: def __init__(self, primary_chain, fallback_chains): self.primary_chain = primary_chain self.fallback_chains = fallback_chains def invoke(self, input_data, config=None): try: return self.primary_chain.invoke(input_data, config) except OutputParserException as e: # 如果是解析错误,尝试用更宽松的 parser return self.fallback_chains[0].invoke(input_data, config) except Exception as e: # 兜底:返回一个友好的错误消息 return f"服务暂时不可用,请稍后再试。错误代码:{type(e).__name__}"

with_fallbacks是 LangChain v0.1+ 提供的原生能力,它允许你在主链失败时,自动切换到备用链。这比在业务代码里写try/except要优雅得多,也更容易测试和维护。

4. 实操过程:从零搭建一个可部署的 AI 后端服务

4.1 环境准备与依赖管理

我们摒弃了传统的requirements.txt,转而使用poetry进行依赖管理。这并非为了炫技,而是因为 AI 项目对依赖版本极其敏感。一个openai库的小版本升级,就可能导致ChatOpenAI的初始化参数变更,引发线上故障。

# 初始化 poetry 项目 poetry init -n --name ai-backend --description "Robust ML Backend with Pydantic & LangChain" # 添加核心依赖(注意指定精确版本,避免自动升级) poetry add fastapi==0.111.0 poetry add uvicorn==0.29.0 poetry add langchain-core==0.1.49 poetry add langchain-openai==0.1.14 poetry add langchain-community==0.0.37 poetry add pydantic==2.7.4 poetry add python-dotenv==1.0.1 # 添加开发依赖 poetry add pytest==8.2.0 poetry add pytest-asyncio==0.23.7 poetry add black==24.4.2

注意:pydantic==2.7.4是一个关键选择。Pydantic v2 的BaseModel在性能和错误提示上远超 v1,但 v2.7.x 是最后一个支持Field(..., default_factory=...)@validator混用的稳定版本。我们曾因升级到 v2.8+ 导致大量@root_validator报错,回滚后才稳定下来。这就是“经验”——不是看最新版,而是看最稳版。

4.2 项目结构:遵循 FastAPI 最佳实践

一个清晰的项目结构,是大型 AI 项目可维护性的基石。我们采用分层架构,严格隔离关注点:

ai-backend/ ├── main.py # ASGI 入口,只负责创建 app 和挂载路由 ├── api/ │ ├── __init__.py │ └── v1/ │ ├── __init__.py │ ├── endpoints/ # API 路由定义 │ │ ├── __init__.py │ │ └── rag.py # /v1/rag 相关路由 │ ├── models/ # Pydantic 模型定义(输入/输出) │ │ ├── __init__.py │ │ ├── rag.py # RAG 相关的 Request/Response 模型 │ ├── services/ # 核心业务逻辑,LangChain Chain 的组装与调用 │ │ ├── __init__.py │ │ └── rag_service.py # RAGChain 的实例化和 invoke 封装 │ └── dependencies/ # FastAPI 依赖项(如数据库连接、LLM 客户端) │ ├── __init__.py │ └── llm.py # ChatOpenAI 客户端的单例管理 ├── core/ │ ├── __init__.py │ ├── config.py # 配置管理(从 .env 加载) │ └── logger.py # 统一日志配置 ├── tests/ │ ├── __init__.py │ └── test_rag_service.py # 对 services 层的单元测试 └── .env # 环境变量文件(不提交到 git)

main.py的内容极其精简:

from fastapi import FastAPI from api.v1.endpoints.rag import router as rag_router app = FastAPI( title="AI Backend API", description="A robust backend for AI applications using Pydantic and LangChain", version="1.0.0", ) # 挂载路由 app.include_router(rag_router, prefix="/v1", tags=["RAG"]) @app.get("/health") def health_check(): return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}

这种结构的好处是:main.py是稳定的,所有的业务变化都发生在api/v1/下。当你需要新增一个/v1/summarize接口时,只需在endpoints/下新建一个文件,而不会污染入口。

4.3 核心服务实现:rag_service.py的完整代码

这是整个后端的“心脏”。我们在这里完成 LangChain Chain 的实例化、调用、以及错误处理。

# api/v1/services/rag_service.py from typing import Dict, Any, List from langchain_core.runnables import Runnable from langchain_core.documents import Document from api.v1.models.rag import QuestionRequest, SummaryResponse from core.config import settings from api.v1.dependencies.llm import get_llm_client from api.v1.dependencies.vectorstore import get_vectorstore # 在模块级别实例化 Chain,避免每次请求都重建 # 这是性能优化的关键! _llm = get_llm_client() _vectorstore = get_vectorstore() # 重用我们在 3.2.2 节定义的 rag_chain,但这里要注入具体的依赖 def build_rag_chain() -> Runnable: from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import CrossEncoderReranker from langchain_community.cross_encoders import HuggingFaceCrossEncoder # 重排序器 reranker = CrossEncoderReranker( model=HuggingFaceCrossEncoder(model_name="BAAI/bge-reranker-base"), top_n=5 ) compression_retriever = ContextualCompressionRetriever( base_compressor=reranker, base_retriever=_vectorstore.as_retriever(search_kwargs={"k": 20}) ) # Prompt 模板 from langchain_core.prompts import ChatPromptTemplate prompt = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的知识库问答助手。请严格根据提供的上下文回答问题。" "如果上下文没有提供足够信息,请回答'根据现有资料无法确定'。" "不要编造答案,也不要提及'根据上下文'等字眼。"), ("human", "问题:{question}\n\n上下文:{context}") ]) # 组装 Chain from langchain_core.runnables import RunnableParallel, RunnablePassthrough, RunnableLambda return ( RunnableParallel({ "context": compression_retriever, "question": RunnablePassthrough() }) | RunnablePassthrough.assign(context=lambda x: "\n\n".join([doc.page_content for doc in x["context"]])) | prompt | _llm | RunnableLambda(lambda x: x.content) ) # 创建全局 Chain 实例 rag_chain = build_rag_chain() def invoke_rag_service(request: QuestionRequest) -> SummaryResponse: """ 调用 RAG 服务的核心函数 :param request: 经过 Pydantic 验证后的请求对象 :return: 结构化的响应对象 """ try: # 1. 准备输入数据 input_data = { "question": request.question, "session_id": request.session_id } # 2. 调用 Chain # 注意:这里我们使用同步的 invoke,因为 FastAPI 的 endpoint 是 async,但 Chain 本身可以是 sync 或 async # 如果 Chain 是 async,这里用 await rag_chain.ainvoke(...) result = rag_chain.invoke(input_data) # 3. 构建成功响应 return SummaryResponse( request_id=request.request_id or "unknown", summary=result, status="success", warnings=[] ) except Exception as e: # 4. 统一错误处理 import traceback error_msg = f"RAG service failed: {str(e)}" # 记录详细错误日志(包含 traceback) logger.error(error_msg, exc_info=True) # 返回用户友好的错误响应 return SummaryResponse( request_id=request.request_id or "unknown", summary="", status="failed", warnings=[f"服务暂时不可用:{type(e).__name__}"], metadata={"error": str(e)} )

这个invoke_rag_service函数体现了工程化的核心思想:

  • 单一职责:它只做一件事——调用 Chain 并包装响应。所有的预处理、后处理、错误处理都封装在内部,对外接口干净。

  • 错误分类处理:虽然我们用了except Exception,但在实际项目中,我们会捕获更具体的异常,如RateLimitError(触发重试)、ConnectionError(触发降级)、OutputParserException(触发宽松解析

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询