影刀RPA店群自动化教程:Python协同流程模块化与依赖注入实战
2026/6/5 5:59:54 网站建设 项目流程

影刀RPA店群自动化教程:Python协同流程模块化与依赖注入实战


一个上货流程,从登录到提交,两百个步骤塞在一个文件里。

拼多多店群自动化上架方案

改一个选择器,要在这个“意大利面条”里翻找十分钟,还担心牵一发动全身。

店群自动化做到中后期,流程复杂度是指数级增长的。
早期一个“上货”流程就是一条线性的影刀脚本,后来逐渐加入了价格校验、图片审核、运费模板选择、活动报名检查……
流程文件越来越长,维护成本越来越高。

更麻烦的是,很多操作在不同流程中是重复的。
“登录店铺”、“选择商品类目”、“上传图片”这些步骤,在商品上架、商品编辑、活动报名等多个流程中都会出现。但因为是复制粘贴的,一个步骤的优化需要同步修改四五个流程文件。

我们后来将影刀RPA的流程进行了彻底的模块化拆分,并用Python构建了一套依赖注入和流程组装框架,让每一个流程都是“可组合的积木”。

这篇文章就展开这套模块化体系的设计思想、工程实现和落地经验。

TEMU店群如何管理运营?


一、把流程拆成原子模块:从“脚本”到“组件”

模块化的第一步,是把长流程分解为短小、单一职责的子流程。

拆分原则:

  • 一个子流程只做一件事:比如“登录”、“选择商品类目”、“上传主图”、“填写价格”。
    • 子流程独立可测试:每个子流程可以单独运行和验证,不依赖外部上下文。
    • 子流程有明确的输入和输出:输入是参数(JSON),输出是结果(JSON)和状态码。

早期我们拆分后的模块结构大概是这样的:

flows/ pdd/ login.flow select_category.flow upload_image.flow fill_price.flow submit_product.flow temu/ login.flow ... common/ wait_for_element.flow scroll_page.flow ``` 但光有文件拆分还不够。 模块之间的调用关系、参数传递、错误处理如果在影刀内部硬编码,依旧难以维护。 所以我们将“组装”的职责交给了Python调度层。 --- ## 二、流程组装的“配方”:用配置描述流程 我们不直接在影刀里写死调用哪个子流程,而是用一个JSON/YAML配置文件来描述整个业务流程的组合逻辑。 一个拼多多商品上架的“配方”示例如下: ```yaml flow_name: pdd_upload_product version: 3.2.0 modules: - module: pdd/login - id: login_step - params: - shop_id: "{{ input.shop_id }}" - on_failure: abort - module: common/navigate - id: nav_create - params: - url: "/product/create" - depends_on: [login_step] - module: pdd/select_category - id: select_cat - params: - category_id: "{{ input.category_id }}" - depends_on: [nav_create] - module: pdd/upload_image - id: upload_main_img - params: - image_url: "{{ input.main_image }}" - image_type: main - depends_on: [select_cat] - module: pdd/upload_image - id: upload_detail_imgs - params: - image_urls: "{{ input.detail_images }}" - image_type: detail - depends_on: [select_cat] - module: pdd/fill_price - id: fill_price_step - params: - price: "{{ input.price }}" - compare_price: "{{ input.compare_price }}" - depends_on: [select_cat] - module: pdd/submit_product - id: submit_step - params: - expect_status: "onsale" - depends_on: [upload_main_img, upload_detail_imgs, fill_price_step] - on_failure: retry_twice - ``` 这个配置描述了:整个上货流程由哪些模块组成,模块之间的依赖关系(`depends_on`),以及每个模块的输入参数如何从上游或外部输入中获得。 Python调度引擎读取这个配方,自动解析依赖,生成执行计划。 --- ## 三、依赖注入:让模块不关心参数从哪来 上面的配置中,参数值使用了 `{{ input.shop_id }}` 这样的占位符。 Python引擎在执行前,会将外部输入的数据注入到模块参数中,模块本身不需要知道数据来源。 依赖注入分几个层级: - **外部输入**:由调度层在创建任务时提供,如 `shop_id`, `product_data` - - **上游模块输出**:前一个模块的返回值可以作为后续模块的输入 - - **全局配置**:店铺的公共配置,如运费模板ID、默认仓库ID - - **环境变量**:Worker相关的参数,如临时目录路径 ```python import re from typing import Any, Dict class ParameterResolver: EXPR_PATTERN = re.compile(r'\{\{\s*(.+?)\s*\}\}') def resolve(self, template: Any, context: Dict[str, Any]) -> Any: if isinstance(template, str): return self.EXPR_PATTERN.sub( lambda m: str(self._resolve_expression(m.group(1), context)), template ) elif isinstance(template, dict): return {k: self.resolve(v, context) for k, v in template.items()} elif isinstance(template, list): return [self.resolve(item, context) for item in template] return template def _resolve_expression(self, expr: str, context: dict): parts = expr.split(".") value = context for part in parts: if isinstance(value, dict): value = value.get(part, "") else: value = getattr(value, part, "") return value ``` 上下文在任务创建时初始化,随着模块执行不断更新。 每个模块执行完毕后,其输出会自动合并到上下文中,供后续模块使用。 --- ## 四、流程组装引擎:自动处理依赖和并行 Python引擎在接收到配方后,先做两件事: 1. 解析 `depends_on` 关系,构建DAG(有向无环图)。 2. 2. 按依赖顺序执行,无依赖关系的模块可以并行执行。 ```python import asyncio class FlowAssembler: def __init__(self, module_registry, resolver): self.registry = module_registry self.resolver = resolver def build_execution_plan(self, recipe: dict) -> dict: modules = recipe["modules"] id_map = {m["id"]: m for m in modules} dep_graph = {m["id"]: set(m.get("depends_on", [])) for m in modules} # 拓扑排序 order = self._topological_sort(dep_graph) return {"order": order, "id_map": id_map} async def execute(self, recipe: dict, input_context: dict): plan = self.build_execution_plan(recipe) context = {"input": input_context, "modules": {}} for batch in plan["order"]: batch_tasks = [] for module_id in batch: module_def = plan["id_map"][module_id] resolved_params = self.resolver.resolve(module_def["params"], context) batch_tasks.append(self._execute_module(module_def, resolved_params)) results = await asyncio.gather(*batch_tasks, return_exceptions=True) for module_id, result in zip(batch, results): if isinstance(result, Exception): context["modules"][module_id] = {"success": False, "error": str(result)} else: context["modules"][module_id] = result return context def _topological_sort(self, dep_graph: dict) -> list: in_degree = {n: 0 for n in dep_graph} for deps in dep_graph.values(): for d in deps: in_degree[d] = in_degree.get(d, 0) + 1 # Kahn算法,返回每层并行节点 from collections import deque batches = [] queue = deque([n for n, d in in_degree.items() if d == 0]) while queue: batch = list(queue) batches.append(batch) queue.clear() for n in batch: for dep in dep_graph.get(n, []): in_degree[dep] -= 1 if in_degree[dep] == 0: queue.append(dep) return batches ``` 同一个配方在上例中,`upload_main_img`、`upload_detail_imgs` 和 `fill_price_step` 之间没有依赖,会被安排在同一批次并行执行,缩短总时间。 --- ## 五、模块注册表与版本管理 随着模块增加,需要一个注册表来管理所有可用模块及其版本。 ```python class ModuleRegistry: def __init__(self, db): self.db = db async def register_module(self, name: str, version: str, flow_file_path: str, input_schema: dict, output_schema: dict, platform: str): await self.db.execute( """INSERT INTO flow_modules (name, version, flow_path, input_schema, output_schema, platform, created_at) VALUES ($1, $2, $3, $4, $5, $6, NOW())""", name, version, flow_file_path, json.dumps(input_schema), json.dumps(output_schema), platform ) async def resolve_module(self, module_name: str, platform: str, version: str = None) -> dict: if version: row = await self.db.fetchrow( "SELECT * FROM flow_modules WHERE name=$1 AND platform=$2 AND version=$3", module_name, platform, version ) else: row = await self.db.fetchrow( "SELECT * FROM flow_modules WHERE name=$1 AND platform=$2 ORDER BY created_at DESC LIMIT 1", module_name, platform ) if not row: raise ModuleNotFoundError(f"Module {module_name} for {platform} not found") return dict(row) ``` 配方中引用的模块,由注册表找到对应的影刀流程文件路径,Worker拉取后执行。 --- ## 六、模块的独立测试与Mock 每个子流程模块都可以单独测试。 我们在沙箱测试环境中,可以只运行一个模块,传入模拟的输入参数,验证输出是否符合预期。 对于依赖外部服务的模块(如“上传图片”需要图片服务可用),我们提供Mock模块来替换。 Mock模块有相同的接口(输入输出Schema),但用固定返回数据替代真实操作。 ```python class MockModule: def __init__(self, name, output): self.name = name self.output = output async def execute(self, params): return {"success": True, "mock": True, "data": self.output} ``` 在测试配方中,可以通过配置将特定模块替换为Mock,实现低成本的功能验证。 --- ## 七、模块复用的收益 模块化推行后,我们看到了几个直接的收益: - **新流程开发时间**:从平均2天降到半天。很多新业务流程只需编写一个新的配方,复用现成的子流程模块。 - - **缺陷修复扩散**:一个“登录”模块的bug,修复后所有引用它的流程都自动修复,不再需要逐个改。 - - **跨平台复用**:虽然拼多多和TEMU的页面结构不同,但“商品图片压缩”、“文本敏感词过滤”这类通用模块可以直接复用,平台无关。 **真正的问题,不在于“能不能用”,而在于“改起来疼不疼”。** --- ## 八、与影刀子流程的配合 影刀RPA本身支持子流程调用,我们利用这个能力,将每个模块实现为一个独立的子流程。 主流程(由配方生成)其实是一个很薄的壳,里面只有“调用子流程A”、“调用子流程B”的控制逻辑,以及对返回结果的判断。 这样,模块化不仅停留在Python调度层,也贯彻到了影刀流程文件本身的结构上。 --- ## 九、踩坑与优化 **参数传递的序列化开销。** 模块之间通过JSON文件传递参数,早期每个模块执行完都写一次磁盘,影响性能。后来改为在Worker内存中维护上下文,仅将必要数据序列化,性能提升明显。 **模块契约管理。** 模块的输入输出Schema如果没有严格执行,会出现“改了输出格式导致下游模块报错”的问题。我们用JSON Schema校验每个模块的输入和输出,在测试阶段就能发现契约不一致。 **配方配置的校验。** 运营在配置配方时,可能引用不存在的模块或者写错参数名。我们在配方保存时做静态校验,确认所有引用的模块在注册表中存在,且参数名与模块Input Schema匹配。 --- ## 十、写在最后 自动化流程的模块化,不是简单的“把大文件拆成小文件”。 它是用工程化的手段——依赖注入、注册表、版本管理、契约校验——将自动化脚本变成可装配、可复用、可测试的软件组件。 当流程不再是一个几千步的巨型脚本,而是一个优雅的配方和一组精巧的积木时,整个团队的开发效率和信心都上了一个台阶。 > 模块化之后的自动化系统,不再怕改需求。 > > 因为你知道,改一个模块,二十个流程自动跟着变好,而不是悄悄坏掉。 --- *作者:林焱*

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询