基于 Python + LangChain + LangSmith 实现数据监控
2026/6/22 11:44:59 网站建设 项目流程

前言

在 AI 应用开发中,我们往往专注于功能实现,却忽略了一个关键问题:我们的 AI 到底消耗了多少资源?每次对话用了多少 Token?产生了多少费用?响应延迟如何?哪些模型和接口被调用得最多?

本文将以一个完整的聊天应用为例,从零开始搭建一套AI 使用数据监控系统,涵盖后端数据采集、API 设计和前端可视化面板的完整实现。

技术栈

  • 后端: Python + Flask + LangChain (ChatOpenAI / DashScope)
  • 数据库: MySQL
  • 前端: React 19 + Ant Design 5 + Zustand

一、整体架构

用户请求 │ ▼ Flask 后端 (server.py) │ ├─ 调用 LLM (ChatOpenAI) │ │ │ ▼ │ 提取 response_metadata (token 用量) │ │ │ ▼ ├─ log_usage() → 写入 MySQL (ai_usage_logs 表) │ ├─ /api/monitoring/summary ── 聚合统计 ├─ /api/monitoring/daily ── 每日趋势 ├─ /api/monitoring/recent ── 最近请求 │ ▼ React 前端 (MonitoringPanel.jsx) │ ├─ 5 个统计卡片 (Tokens / 费用 / 请求数 / 延迟 / 系统状态) ├─ SVG 双轴趋势图 (面积图 + 柱状图) ├─ 模型分布 + 端点分布 └─ 可筛选/排序的请求记录表格

二、后端实现

2.1 创建数据库表

首先在 MySQL 中创建使用日志表:

CREATE TABLE IF NOT EXISTS ai_usage_logs ( id INT AUTO_INCREMENT PRIMARY KEY, endpoint VARCHAR(100) NOT NULL COMMENT 'API 端点', model VARCHAR(50) NOT NULL COMMENT '模型名称', prompt_tokens INT DEFAULT 0, completion_tokens INT DEFAULT 0, total_tokens INT DEFAULT 0, latency_ms INT DEFAULT 0 COMMENT '响应耗时(ms)', cost DOUBLE DEFAULT 0 COMMENT '费用(CNY)', conversation_id VARCHAR(36) DEFAULT NULL COMMENT '关联对话', request_preview VARCHAR(200) DEFAULT NULL COMMENT '用户输入预览', response_preview VARCHAR(200) DEFAULT NULL COMMENT 'AI回复预览', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_model (model), INDEX idx_endpoint (endpoint), INDEX idx_created (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

每条记录对应一次 LLM 调用,存储端点、模型、Token 用量、延迟、费用等关键指标。

2.2 使用数据追踪模块

创建usage_tracker.py,封装数据记录和提取逻辑:

"""AI 使用数据追踪模块""" import pymysql from config import DB_CONFIG # DashScope 模型定价 (CNY / 1K tokens) MODEL_PRICING = { "qwen-plus": {"input": 0.004, "output": 0.012}, "qwen-vl-plus": {"input": 0.008, "output": 0.02}, } def calculate_cost(model, prompt_tokens, completion_tokens): """根据模型定价计算费用""" pricing = MODEL_PRICING.get(model, {"input": 0.004, "output": 0.012}) return (prompt_tokens * pricing["input"] + completion_tokens * pricing["output"]) / 1000 def log_usage(endpoint, model, prompt_tokens=0, completion_tokens=0, total_tokens=0, latency_ms=0, conversation_id=None, request_preview=None, response_preview=None): """记录一次 AI 调用的使用数据""" if total_tokens == 0 and (prompt_tokens > 0 or completion_tokens > 0): total_tokens = prompt_tokens + completion_tokens cost = calculate_cost(model, prompt_tokens, completion_tokens) conn = pymysql.connect(**DB_CONFIG) try: cursor = conn.cursor() cursor.execute( """INSERT INTO ai_usage_logs (endpoint, model, prompt_tokens, completion_tokens, total_tokens, latency_ms, cost, conversation_id, request_preview, response_preview) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""", (endpoint, model, prompt_tokens, completion_tokens, total_tokens, latency_ms, round(cost, 6), conversation_id, (request_preview or "")[:200], (response_preview or "")[:200]) ) conn.commit() finally: conn.close() def extract_metadata(response): """从 LangChain 响应中提取 Token 使用数据""" metadata = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "model_name": ""} if hasattr(response, "response_metadata"): rm = response.response_metadata if isinstance(rm, dict): token_usage = rm.get("token_usage", {}) metadata["prompt_tokens"] = token_usage.get("prompt_tokens", 0) metadata["completion_tokens"] = token_usage.get("completion_tokens", 0) metadata["total_tokens"] = token_usage.get("total_tokens", 0) metadata["model_name"] = rm.get("model_name", "") return metadata def estimate_tokens(text): """估算 Token 数 (中文约 1.5 token/字, 英文约 0.25/字符)""" if not text: return 0 chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff') return int(chinese * 1.5 + (len(text) - chinese) * 0.25)

核心设计要点:

  • extract_metadata()— 从 LangChain 的ChatOpenAI响应对象中提取精确的 Token 用量
  • estimate_tokens()— 对于流式调用(DashScope 不返回 Token 统计),用字符估算替代
  • calculate_cost()— 基于模型定价自动计算每次调用费用
2.3 在 LLM 调用点注入追踪

流式调用(估算 Token):

# 流式调用 — DashScope 不返回 Token 统计,使用估算 start_time = time.time() full_reply = "" for chunk in llm.stream(history): if chunk.content: full_reply += chunk.content yield f"data: {json.dumps({'token': chunk.content})}\n\n" # 流结束后记录 latency = int((time.time() - start_time) * 1000) p_est = estimate_tokens(user_message) c_est = estimate_tokens(full_reply) log_usage( endpoint="/api/chat/stream", model="qwen-plus", prompt_tokens=p_est, completion_tokens=c_est, total_tokens=p_est + c_est, latency_ms=latency, request_preview=user_message, response_preview=full_reply[:200], )

关键陷阱:在流式端点中,如果不同路径(RAG、MCP、普通对话)共用log_usage,务必确保引用的变量在所有路径中都存在。例如history只在普通对话路径定义,RAG/MCP 路径应使用user_message进行 Token 估算。

2.4 监控 API 端点

提供三个聚合查询接口,供前端面板调用:

# ==================== 监控 API ==================== @app.route("/api/monitoring/summary", methods=["GET"]) def monitoring_summary(): """获取使用数据汇总""" days = int(request.args.get("days", 30)) import pymysql conn = pymysql.connect(**_DB_CONFIG) try: cursor = conn.cursor(pymysql.cursors.DictCursor) # 总体统计 cursor.execute( """SELECT COUNT(*) as total_requests, COALESCE(SUM(prompt_tokens), 0) as prompt_tokens, COALESCE(SUM(completion_tokens), 0) as completion_tokens, COALESCE(SUM(total_tokens), 0) as total_tokens, COALESCE(SUM(cost), 0) as total_cost, COALESCE(AVG(latency_ms), 0) as avg_latency, COALESCE(MAX(latency_ms), 0) as max_latency, COALESCE(MIN(latency_ms), 0) as min_latency FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)""", (days,), ) overall = cursor.fetchone() # P95 延迟 cursor.execute( """SELECT COALESCE(latency_ms, 0) as latency FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY) AND latency_ms IS NOT NULL ORDER BY latency ASC""", (days,), ) latency_rows = cursor.fetchall() p95_latency = 0 if latency_rows: idx = int(len(latency_rows) * 0.95) p95_latency = latency_rows[min(idx, len(latency_rows) - 1)]["latency"] # 成功率 (有 response_preview 的视为成功) cursor.execute( """SELECT COUNT(*) as total, SUM(CASE WHEN response_preview IS NOT NULL AND response_preview != '' THEN 1 ELSE 0 END) as success FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY)""", (days,), ) sr = cursor.fetchone() success_rate = round((int(sr["success"]) / int(sr["total"])) * 100, 1) if int(sr["total"]) > 0 else 100.0 # 按模型分组 cursor.execute( """SELECT model, COUNT(*) as requests, COALESCE(SUM(prompt_tokens), 0) as prompt_tokens, COALESCE(SUM(completion_tokens), 0) as completion_tokens, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY model""", (days,), ) model_rows = cursor.fetchall() models = {} for row in model_rows: models[row["model"]] = { "tokens": row["tokens"], "promptTokens": row["prompt_tokens"], "completionTokens": row["completion_tokens"], "requests": row["requests"], "cost": round(float(row["cost"]), 4), } # 按端点分组 cursor.execute( """SELECT endpoint, COUNT(*) as requests, COALESCE(SUM(total_tokens), 0) as tokens, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY endpoint""", (days,), ) ep_rows = cursor.fetchall() endpoints = {} for row in ep_rows: endpoints[row["endpoint"]] = { "tokens": row["tokens"], "requests": row["requests"], "cost": round(float(row["cost"]), 4), } # 今日统计 cursor.execute( """SELECT COALESCE(SUM(total_tokens), 0) as tokens, COUNT(*) as requests, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at >= CURDATE()""" ) today = cursor.fetchone() # 昨日统计 cursor.execute( """SELECT COALESCE(SUM(total_tokens), 0) as tokens FROM ai_usage_logs WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 1 DAY) AND created_at < CURDATE()""" ) yesterday = cursor.fetchone() yd_tokens = yesterday["tokens"] td_tokens = int(today["tokens"]) yd_tokens = int(yesterday["tokens"]) tokens_change = round(((td_tokens - yd_tokens) / yd_tokens) * 100, 1) if yd_tokens > 0 else (100.0 if td_tokens > 0 else 0.0) # 最后请求时间 cursor.execute("SELECT MAX(created_at) as last_at FROM ai_usage_logs") last_row = cursor.fetchone() last_request_at = str(last_row["last_at"]) if last_row["last_at"] else None # 本月费用 cursor.execute( """SELECT COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')""" ) month_cost = float(cursor.fetchone()["cost"]) return jsonify({ "code": 200, "data": { "totalTokens": overall["total_tokens"], "promptTokens": overall["prompt_tokens"], "completionTokens": overall["completion_tokens"], "totalCost": round(float(overall["total_cost"]), 4), "totalRequests": overall["total_requests"], "avgLatencyMs": int(overall["avg_latency"]), "maxLatencyMs": int(overall["max_latency"]), "minLatencyMs": int(overall["min_latency"]), "p95LatencyMs": int(p95_latency), "successRate": success_rate, "todayTokens": td_tokens, "todayRequests": today["requests"], "todayCost": round(float(today["cost"]), 4), "yesterdayTokens": yd_tokens, "tokensChangeRate": tokens_change, "lastRequestAt": last_request_at, "monthCost": round(month_cost, 4), "models": models, "endpoints": endpoints, }, }) except Exception as e: return jsonify({"code": 500, "msg": str(e)}) finally: conn.close() @app.route("/api/monitoring/daily", methods=["GET"]) def monitoring_daily(): """获取每日使用趋势""" days = int(request.args.get("days", 7)) import pymysql conn = pymysql.connect(**_DB_CONFIG) try: cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute( """SELECT DATE(created_at) as date, COALESCE(SUM(total_tokens), 0) as tokens, COUNT(*) as requests, COALESCE(SUM(cost), 0) as cost FROM ai_usage_logs WHERE created_at >= DATE_SUB(NOW(), INTERVAL %s DAY) GROUP BY DATE(created_at) ORDER BY date ASC""", (days,), ) rows = cursor.fetchall() data = [] for row in rows: data.append({ "date": str(row["date"]), "tokens": row["tokens"], "requests": row["requests"], "cost": round(float(row["cost"]), 4), }) return jsonify({"code": 200, "data": data}) except Exception as e: return jsonify({"code": 500, "msg": str(e)}) finally: conn.close() @app.route("/api/monitoring/recent", methods=["GET"]) def monitoring_recent(): """获取最近的 AI 调用记录""" limit = int(request.args.get("limit", 20)) import pymysql conn = pymysql.connect(**_DB_CONFIG) try: cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute( """SELECT id, endpoint, model, conversation_id, prompt_tokens, completion_tokens, total_tokens, latency_ms, cost, request_preview, response_preview, created_at FROM ai_usage_logs ORDER BY created_at DESC LIMIT %s""", (limit,), ) rows = cursor.fetchall() data = [] for row in rows: has_response = row["response_preview"] is not None and row["response_preview"] != "" data.append({ "id": row["id"], "endpoint": row["endpoint"], "model": row["model"], "conversationId": row["conversation_id"], "promptTokens": row["prompt_tokens"], "completionTokens": row["completion_tokens"], "totalTokens": row["total_tokens"], "latencyMs": row["latency_ms"], "cost": round(float(row["cost"]), 4), "requestPreview": row["request_preview"], "responsePreview": row["response_preview"][:200] if row["response_preview"] else None, "status": "success" if has_response else "failed", "createdAt": str(row["created_at"]), }) return jsonify({"code": 200, "data": data}) except Exception as e: return jsonify({"code": 500, "msg": str(e)}) finally: conn.close()

三、前端实现

3.1 API 服务层
// monitoringService.js /** * Monitoring API Service * AI 使用数据监控 */ const BASE_URL = process.env.REACT_APP_CHAT_API_URL || 'http://localhost:5000'; export class MonitoringService { async getSummary(days = 30) { const res = await fetch(`${BASE_URL}/api/monitoring/summary?days=${days}`); const data = await res.json(); if (data.code === 200) return data.data; throw new Error(data.msg || '获取监控汇总失败'); } async getDailyUsage(days = 7) { const res = await fetch(`${BASE_URL}/api/monitoring/daily?days=${days}`); const data = await res.json(); if (data.code === 200) return data.data; throw new Error(data.msg || '获取每日趋势失败'); } async getRecentRequests(limit = 20) { const res = await fetch(`${BASE_URL}/api/monitoring/recent?limit=${limit}`); const data = await res.json(); if (data.code === 200) return data.data; throw new Error(data.msg || '获取最近记录失败'); } } export const monitoringService = new MonitoringService();
3.2 监控面板组件

面板使用 antdModal弹窗 (960px 宽),包含以下区域:

1) 五个统计卡片

<Row gutter={[12, 12]} align="stretch"> {/* 总 Tokens / 总费用 / 请求次数 / 平均延迟 / 系统状态 */} {statCards.map(card => ( <Col flex="1" key={card.key}> <div className={styles.statCard} style={{ background: card.gradient }}> <div className={styles.statLabel}>{card.title}</div> <div className={styles.statValue}>{card.value}</div> <div className={styles.statSub}>{card.sub}</div> </div> </Col> ))} </Row>

每个卡片使用渐变背景色 + 环比变化率标签 + 今日子数据,系统状态卡片额外展示成功率、API 在线状态、月预算进度条。

2) SVG 双轴趋势图

使用纯 SVG 绘制面积图 + 柱状图组合:

  • 左 Y 轴 = Tokens(平滑曲线 + 面积填充)
  • 右 Y 轴 = 请求次数(半透明柱状图)
  • Catmull-Rom 曲线平滑算法,hover 显示详情
function DualChart({ data }) { // 平滑曲线路径 (Catmull-Rom → Cubic Bezier) function smoothPath(points) { let d = `M${points[0].x},${points[0].y}`; for (let i = 0; i < points.length - 1; i++) { const tension = 0.3; // 计算 Catmull-Rom 控制点... d += ` C${cp1x},${cp1y} ${cp2x},${cp2y} ${p2.x},${p2.y}`; } return d; } // SVG: 网格线 + 柱状图 + 面积 + 曲线 + 数据点 }

3) 模型分布 + 端点分布

左右并排展示,使用 antdProgress进度条 + 百分比,直观显示各模型和端点的使用占比。

4) 最近请求表格

<Table dataSource={recentRequests} columns={[ { title: '时间', sorter: true }, { title: '端点', filters: [...] }, // 可筛选 { title: '模型', filters: [...] }, { title: 'Input/Output' }, { title: '延迟', sorter: true, render: colorByLatency }, { title: '状态', render: successOrFailedTag }, { title: '操作', render: detailPopover }, ]} pagination={{ pageSize: 8 }} />

支持按端点、模型筛选,按时间、Token、延迟排序,点击「详情」查看完整请求/响应内容。

3.3 浮动按钮入口

在聊天页面右下角添加浮动按钮,点击打开监控弹窗:

<div className={styles.monitoringFab} onClick={() => setMonitoringOpen(true)}> <DashboardOutlined /> </div> <MonitoringPanel open={monitoringOpen} onClose={() => setMonitoringOpen(false)} /> </div>
.monitoringFab { position: fixed; bottom: 28px; right: 28px; width: 48px; height: 48px; border-radius: 50%; background: linear-gradient(135deg, #1677ff, #4096ff); color: #fff; display: flex; align-items: center; justify-content: center; font-size: 20px; cursor: pointer; box-shadow: 0 4px 16px rgba(22, 119, 255, 0.4); z-index: 100; transition: all 0.25s ease; user-select: none; &:hover { transform: scale(1.1); box-shadow: 0 6px 24px rgba(22, 119, 255, 0.5); } &:active { transform: scale(0.95); } }

五、最终效果

监控面板展示以下数据:

区域内容
统计卡片总 Tokens (含环比) / 总费用 / 请求次数 / 平均延迟 / 系统状态 + 预算进度
趋势图每日 Token 使用趋势 (面积图) + 请求次数 (柱状图),双 Y 轴
分布按模型分布 + 按端点分布的进度条
请求列表可筛选/排序的最近请求,含状态、详情弹窗

总结

本文实现了一套轻量但完整的 AI 使用监控系统,核心思路是:

  1. 拦截 LLM 响应— 从response_metadata提取精确数据,流式调用用估算
  2. 结构化存储— 每次调用记录端点、模型、Token、延迟、费用到 MySQL
  3. 聚合 API— 提供多维度统计查询,支持时间范围、模型/端点分组、环比计算
  4. 可视化面板- 渐变卡片 + SVG 图表 + 可操作表格,集成在聊天界面中

整个过程不需要 LangSmith 平台账号,纯自建实现。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询