AI 边缘部署:多模型级联推理的工程实践,从任务拆分到流水线调度
一、单模型的边缘困境:一个模型搞不定所有事
边缘设备上的 AI 推理,往往不是"部署一个模型就完事"的简单场景。以工业质检为例,一个完整的检测流程需要:先定位产品区域(目标检测),再识别缺陷类型(分类),最后判断缺陷严重程度(回归)。如果用一个"万能大模型"端到端完成,模型参数量远超边缘芯片的承载能力;如果部署三个独立模型串行执行,中间结果的内存拷贝和上下文切换会吃掉大量算力。
更深层的问题是,多模型级联推理引入了新的系统复杂度:模型间的数据格式转换、内存共享机制、流水线调度策略、单模型失败时的降级方案。这些工程问题不是"调调参数"能解决的,需要从系统架构层面设计一套适合边缘资源约束的级联推理框架。
二、多模型级联推理的架构设计
2.1 级联推理的数据流
graph LR subgraph "输入预处理" Camera[摄像头帧] -->|缩放/归一化| PreProc[预处理模块] end subgraph "级联推理流水线" PreProc -->|640x640 RGB| Det[检测模型<br/>YOLOv5-Nano] Det -->|裁剪ROI| Crop[ROI 裁剪] Crop -->|96x96 Patch| Cls[分类模型<br/>MobileNetV3] Cls -->|缺陷类型| Reg[回归模型<br/>轻量MLP] Reg -->|严重程度| Post[后处理] end subgraph "内存管理" Shared[共享内存池] Shared -.->|零拷贝| Det Shared -.->|零拷贝| Cls Shared -.->|零拷贝| Reg end Post -->|判定结果| Output[输出]级联推理的核心设计原则是零拷贝传递。传统方案中,检测模型输出的 ROI 区域需要拷贝到新缓冲区再送入分类模型,每次拷贝涉及 CPU 介入和内存分配。在 ARM Cortex-A 平台上,一次 640x640 图像的内存拷贝约需 2ms,三个模型级联就是 6ms 的纯拷贝开销——对于要求 30fps 的实时场景,这是不可接受的。
解决方案是预分配一块连续的共享内存池,所有模型的输入输出张量都映射到这块内存中。检测模型输出的 ROI 坐标直接作为分类模型的输入偏移量,无需数据搬移。这种设计将级联间的数据传递开销从毫秒级降到微秒级。
2.2 流水线调度策略
gantt title 多模型级联推理时序(流水线 vs 串行) dateFormat X axisFormat %L section 串行执行 预处理 :a1, 0, 5 检测模型 :a2, 5, 20 ROI裁剪 :a3, 20, 22 分类模型 :a4, 22, 35 回归模型 :a5, 35, 42 section 流水线执行 预处理 :b1, 0, 5 检测模型 :b2, 5, 20 ROI裁剪 :b3, 20, 22 分类模型 :b4, 20, 33 回归模型 :b5, 33, 40 预处理_F2 :b6, 5, 10 检测_F2 :b7, 20, 35流水线调度的关键洞察:检测模型处理第 N 帧时,预处理模块可以同时处理第 N+1 帧。当检测模型输出 ROI 后,分类模型立即开始处理第 N 帧的 ROI,而检测模型可以开始处理第 N+1 帧。这种流水线重叠将整体吞吐量提升约 40%。
三、生产级级联推理框架实现
3.1 共享内存池与零拷贝张量传递
/* * 共享内存池:预分配连续内存,所有模型共享 * 核心思路:避免推理过程中的动态内存分配,消除内存碎片和拷贝开销 */ #include <stdint.h> #include <stdlib.h> #include <string.h> #define MEMORY_POOL_SIZE (32 * 1024 * 1024) /* 32MB 共享内存池 */ #define MAX_TENSOR_COUNT 16 typedef struct { void* data; /* 数据指针,指向内存池中的偏移位置 */ size_t size; /* 张量数据字节数 */ int shape[4]; /* 张量形状 [N, C, H, W] */ int ndim; /* 维度数 */ int dtype; /* 数据类型: 0=float32, 1=int8 */ int model_id; /* 所属模型ID,用于生命周期管理 */ int ref_count; /* 引用计数,为0时可回收 */ } TensorDescriptor; typedef struct { uint8_t* pool; /* 内存池基地址 */ size_t pool_size; /* 内存池总大小 */ size_t used_offset; /* 当前已分配偏移 */ TensorDescriptor tensors[MAX_TENSOR_COUNT]; int tensor_count; } MemoryPool; /* 初始化内存池 */ int mempool_init(MemoryPool* mp) { mp->pool = (uint8_t*)malloc(MEMORY_POOL_SIZE); if (!mp->pool) return -1; mp->pool_size = MEMORY_POOL_SIZE; mp->used_offset = 0; mp->tensor_count = 0; memset(mp->tensors, 0, sizeof(mp->tensors)); return 0; } /* 从内存池分配张量,返回张量描述符索引 */ int mempool_alloc_tensor(MemoryPool* mp, int model_id, const int* shape, int ndim, int dtype) { if (mp->tensor_count >= MAX_TENSOR_COUNT) return -1; /* 计算张量所需字节数 */ size_t elem_count = 1; for (int i = 0; i < ndim; i++) elem_count *= shape[i]; size_t elem_size = (dtype == 0) ? sizeof(float) : sizeof(int8_t); size_t total_size = elem_count * elem_size; /* 对齐到64字节,适配ARM NEON的访存对齐要求 */ size_t aligned_size = (total_size + 63) & ~63; if (mp->used_offset + aligned_size > mp->pool_size) { /* 内存不足,尝试回收引用计数为0的张量 */ mempool_gc(mp); if (mp->used_offset + aligned_size > mp->pool_size) { return -1; /* 回收后仍不足 */ } } int idx = mp->tensor_count++; mp->tensors[idx].data = mp->pool + mp->used_offset; mp->tensors[idx].size = aligned_size; memcpy(mp->tensors[idx].shape, shape, ndim * sizeof(int)); mp->tensors[idx].ndim = ndim; mp->tensors[idx].dtype = dtype; mp->tensors[idx].model_id = model_id; mp->tensors[idx].ref_count = 1; mp->used_offset += aligned_size; return idx; } /* ROI零拷贝:检测模型输出ROI坐标,分类模型直接引用偏移地址 */ int mempool_create_roi_view(MemoryPool* mp, int src_tensor_idx, int x, int y, int w, int h, int target_model_id) { /* 不分配新内存,直接在源张量上创建视图 */ TensorDescriptor* src = &mp->tensors[src_tensor_idx]; int src_w = src->shape[3]; /* 源张量宽度 */ int roi_shape[4] = {1, src->shape[1], h, w}; /* ROI形状 */ int idx = mp->tensor_count++; /* 计算ROI在源张量中的偏移地址 */ size_t elem_size = (src->dtype == 0) ? sizeof(float) : sizeof(int8_t); size_t row_stride = src_w * src->shape[1] * elem_size; mp->tensors[idx].data = (uint8_t*)src->data + y * row_stride + x * src->shape[1] * elem_size; mp->tensors[idx].size = 0; /* 视图模式,不拥有内存 */ memcpy(mp->tensors[idx].shape, roi_shape, 4 * sizeof(int)); mp->tensors[idx].ndim = 4; mp->tensors[idx].dtype = src->dtype; mp->tensors[idx].model_id = target_model_id; mp->tensors[idx].ref_count = 1; /* 增加源张量的引用计数,防止被回收 */ src->ref_count++; return idx; } /* 垃圾回收:释放引用计数为0的张量 */ void mempool_gc(MemoryPool* mp) { /* 简化实现:标记-压缩,将存活张量紧凑排列 */ size_t new_offset = 0; int new_count = 0; for (int i = 0; i < mp->tensor_count; i++) { if (mp->tensors[i].ref_count > 0) { if (mp->tensors[i].data != mp->pool + new_offset) { memmove(mp->pool + new_offset, mp->tensors[i].data, mp->tensors[i].size); mp->tensors[i].data = mp->pool + new_offset; } new_offset += mp->tensors[i].size; mp->tensors[new_count++] = mp->tensors[i]; } } mp->used_offset = new_offset; mp->tensor_count = new_count; }3.2 流水线调度器
/* * 流水线调度器:管理多模型级联推理的执行时序 * 核心设计:基于状态机的非阻塞调度,避免模型间的等待 */ #include <pthread.h> typedef enum { STAGE_IDLE, STAGE_RUNNING, STAGE_DONE, } StageState; typedef struct { int stage_id; int model_id; StageState state; int input_tensor_idx; int output_tensor_idx; int64_t start_time_us; int64_t end_time_us; } PipelineStage; typedef struct { PipelineStage stages[8]; int stage_count; int current_frame; pthread_mutex_t lock; } PipelineScheduler; /* 推进流水线:检查每个阶段的前置依赖是否完成 */ void pipeline_tick(PipelineScheduler* sched, MemoryPool* mp) { pthread_mutex_lock(&sched->lock); for (int i = 0; i < sched->stage_count; i++) { PipelineStage* s = &sched->stages[i]; if (s->state != STAGE_IDLE) continue; /* 检查前置阶段是否完成 */ if (i > 0 && sched->stages[i-1].state != STAGE_DONE) continue; /* 前置完成,启动当前阶段 */ s->state = STAGE_RUNNING; s->start_time_us = get_time_us(); /* 提交推理任务到NCNN/TFLite执行器 */ submit_inference(s->model_id, s->input_tensor_idx, s->output_tensor_idx, mp); } pthread_mutex_unlock(&sched->lock); } /* 推理完成回调 */ void on_inference_done(PipelineScheduler* sched, int stage_id) { pthread_mutex_lock(&sched->lock); PipelineStage* s = &sched->stages[stage_id]; s->state = STAGE_DONE; s->end_time_us = get_time_us(); /* 通知下一阶段可以启动 */ if (stage_id + 1 < sched->stage_count) { sched->stages[stage_id + 1].input_tensor_idx = s->output_tensor_idx; } pthread_mutex_unlock(&sched->lock); }四、级联推理的 Trade-offs 分析
方案一:级联推理 vs 端到端大模型
| 维度 | 多模型级联 | 端到端大模型 |
|---|---|---|
| 模型总参数量 | 小(3个小模型合计 < 5M) | 大(单模型 > 20M) |
| 推理延迟 | 中等(级联间有调度开销) | 低(单次前向传播) |
| 精度 | 高(每个模型专注单一任务) | 中等(多任务折衷) |
| 可维护性 | 高(单模型可独立更新) | 低(改一个任务需重训整体) |
| 内存峰值 | 低(模型可逐个加载) | 高(需同时加载整个模型) |
方案二:串行执行 vs 流水线执行
流水线执行将吞吐量提升约 40%,但引入了帧间状态管理的复杂度——第 N 帧的分类结果可能与第 N+1 帧的检测结果同时产生,需要用帧序号标记每个结果。此外,流水线要求每个阶段的执行时间相近,如果某个阶段耗时远大于其他阶段,流水线加速效果会退化为串行。
关键边界条件:
- 共享内存池的大小是硬约束。32MB 的内存池在 ARM Cortex-A53 上是合理的上限,超过这个值会影响 Linux 系统的页面缓存。如果三个模型的权重 + 中间张量超过 32MB,需要采用模型逐个加载策略,用 Flash 存储暂存不活跃的模型权重
- ROI 零拷贝视图要求源张量在内存中是连续排列的。如果检测模型的输出经过后处理(如 NMS)导致 ROI 区域不连续,零拷贝视图无法直接使用,需要退回到拷贝模式
- 流水线调度在单核 CPU 上没有收益——没有并行计算资源,流水线退化为串行。至少需要 2 核 CPU 才能体现流水线优势
五、总结
多模型级联推理是边缘 AI 落地的务实选择——用多个小模型协作替代一个大模型,在资源受限的边缘芯片上实现复杂的多步骤 AI 任务。核心工程挑战是级联间的数据传递开销和调度复杂度。
关键设计决策有三点:第一,共享内存池 + 零拷贝视图消除级联间的数据拷贝开销,将传递延迟从毫秒级降到微秒级;第二,流水线调度重叠相邻帧的推理阶段,提升整体吞吐量约 40%;第三,帧序号标记解决流水线模式下的结果归因问题。
落地建议:先用串行模式验证级联逻辑的正确性,再切换到流水线模式优化吞吐。内存池大小根据实际模型参数量设定,预留 20% 的安全余量。单模型失败时,采用降级策略——跳过失败模型,用前一个模型的输出作为最终结果,保证系统可用性而非完全中断。