【与我学 ClaudeCode】规划与协调篇 之 Task System :持久化任务图与多 Agent 协作骨架
2026/5/22 16:37:23 网站建设 项目流程

作者:逆境不可逃

技术永无止境

希望我的内容可以帮助到你!!!!!


大家吼 ! 我是 逆境不可逃 今天给大家带来文章《【与我学 ClaudeCode】规划与协调篇 之 Task System :持久化任务图与多 Agent 协作骨架》.

Learn-Claude-Code 官方地址 :

https://github.com/shareAI-lab/learn-claude-code

上一篇文章:

【与我学 ClaudeCode】记忆管理篇 之 Context Compact :三层压缩实现「无限会话」-CSDN博客

Task System 是迭代的第 7 个版本(s07),核心解决早期 Todo 模型的局限性:它将内存中的扁平清单升级为持久化到磁盘的任务依赖图(DAG),为多 Agent 协作、长时任务提供了可靠的协调骨架,让任务的生命周期超越单次对话和上下文压缩。

学习路线:s01 > s02 > s03 >s04> s05 > s06| s07> s08 > s09 > s10 > s11 > s12


一、问题根源:为什么早期 Todo 模型撑不起复杂任务?

s03 的 TodoManager 是内存中的扁平清单,存在三个致命缺陷:

  1. 无依赖关系:只有 “做完 / 没做完” 两种状态,无法表达 “任务 B 必须等任务 A 完成才能开始” 的顺序约束,也无法识别可并行执行的任务
  2. 状态易丢失:仅保存在内存中,一旦进程崩溃、上下文压缩或会话重置,任务状态就会永久丢失
  3. 无法多 Agent 协作:没有持久化的共享状态,多个 Agent 无法协同推进任务,也无法感知彼此的工作进度

二、四大核心设计决策

Task System 通过四个关键设计,构建了一个零依赖、高可靠、可扩展的任务协调框架。

1. 任务存储为 JSON 文件,而非内存

核心设计:任务以 JSON 文件形式持久化在.tasks/目录中,而非保存在内存里。这带来三个关键好处:

  1. 进程崩溃不丢失:Agent 在任务中途崩溃,重启后任务板仍在磁盘上,可继续推进
  2. 多 Agent 协调:多个 Agent 读写同一任务目录,无需共享内存即可实现多代理协调,文件系统就是共享数据库
  3. 可调试可干预:人类可以查看和手动编辑任务文件来调试,直接修改状态或依赖关系

替代方案的致命缺陷

内存存储(如 s03 的 TodoWrite)实现更简单更快,但崩溃会丢失状态,也无法跨多个代理进程工作;使用 SQLite、Redis 等数据库能提供更好的并发性,但会增加依赖和运维复杂度。文件是零依赖、可在任何地方工作的持久化层。

2. 任务具有blocks/blockedBy依赖字段

核心设计:每个任务可以声明它阻塞哪些下游任务blocks),以及它被哪些上游任务阻塞blockedBy)。Agent 永远不会开始有未解决blockedBy依赖的任务。这对多代理协调至关重要:当 Agent A 在编写数据库 Schema、Agent B 需要写查询时,Agent B 的任务会被 Agent A 的任务阻塞,避免针对不存在的 Schema 工作。

替代方案的致命缺陷

简单的优先级排序(高 / 中 / 低)无法表达 “任务 B 必须等任务 A 完成才能开始” 的硬依赖;中心化协调器按顺序分配任务会创建单点故障和瓶颈。声明式依赖让每个 Agent 通过读取任务文件就能独立判断自己能做什么。

3. Task 为课程主线,Todo 仍有适用场景

核心设计:TaskManager 延续了 Todo 的心智模型,并在 s07 之后成为默认主线。两者都管理带状态的任务项,但 TaskManager 增加了:

  • 文件持久化(崩溃后可恢复)
  • 依赖追踪(blocks/blockedBy
  • owner字段与多进程协作能力

Todo 仍适合短、线性、一次性的轻量级跟踪,无需复杂依赖和持久化。

替代方案的致命缺陷

只用 Todo 能保持模型极简,但不适合长期运行或协作工作;到处都用 Task 能最大化一致性,但对一次性小任务来说会显得笨重。分层设计兼顾了不同场景的效率。

4. 持久化仍需要写入纪律

核心设计:文件持久化能降低上下文丢失,但不会自动消除并发写入风险。写任务状态前,应先重读 JSON 文件、校验status/blockedBy是否符合预期,再原子写回,避免不同 Agent 悄悄覆盖彼此状态。

替代方案的致命缺陷

盲目覆盖写入实现更简单,但在并行执行下会破坏协调状态;带乐观锁的数据库能提供更强的安全性,但课程选择基于文件的状态以保持零依赖教学。


三、系统整体架构与工作原理

1. 核心架构:持久化任务依赖图(DAG)

.tasks/ task_1.json {"id":1, "subject":"Setup DB", "status":"completed"} task_2.json {"id":2, "subject":"API routes", "blockedBy":[1], "status":"pending"} task_3.json {"id":3, "subject":"Auth module", "blockedBy":[1], "status":"pending"} task_4.json {"id":4, "subject":"Integration", "blockedBy":[2,3], "status":"pending"} 任务图 (DAG): +----------+ +--> | task 2 | --+ | | pending | | +----------+ +----------+ +--> +----------+ | task 1 | | task 4 | | completed| --> +----------+ +--> | blocked | +----------+ | task 3 | --+ +----------+ | pending | +----------+ 顺序: task 1 必须先完成, 才能开始 2 和 3 并行: task 2 和 3 可以同时执行 依赖: task 4 要等 2 和 3 都完成 状态: pending -> in_progress -> completed

这个任务图回答了三个关键问题:

  • 什么可以做?:状态为pendingblockedBy为空的任务
  • 什么被卡住?:等待前置任务完成的任务(blockedBy不为空)
  • 什么做完了?:状态为completed的任务,完成时自动解锁后续任务

2. 关键组件与实现细节

(1) TaskManager:任务 CRUD 与依赖图核心
class TaskManager: def __init__(self, tasks_dir: Path): self.dir = tasks_dir self.dir.mkdir(exist_ok=True) self._next_id = self._max_id() + 1 # 自动生成下一个任务ID def _max_id(self) -> int: """获取当前最大任务ID,用于生成新任务ID""" ids = [int(f.stem.split("_")[1]) for f in self.dir.glob("task_*.json")] return max(ids) if ids else 0 def _load(self, task_id: int) -> dict: """从磁盘加载任务文件""" path = self.dir / f"task_{task_id}.json" if not path.exists(): raise ValueError(f"Task {task_id} not found") return json.loads(path.read_text()) def _save(self, task: dict): """将任务写入磁盘(原子写入可扩展)""" path = self.dir / f"task_{task['id']}.json" path.write_text(json.dumps(task, indent=2, ensure_ascii=False)) def create(self, subject: str, description: str = "") -> str: """创建新任务,初始状态为pending,无依赖""" task = { "id": self._next_id, "subject": subject, "description": description, "status": "pending", "blockedBy": [], "owner": "", } self._save(task) self._next_id += 1 return json.dumps(task, indent=2, ensure_ascii=False) def update(self, task_id: int, status: str = None, add_blocked_by: list = None, remove_blocked_by: list = None) -> str: """更新任务状态或依赖关系,完成任务时自动解锁下游""" task = self._load(task_id) if status: if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Invalid status: {status}") task["status"] = status # 任务完成时,自动从所有下游任务的blockedBy中移除自身ID if status == "completed": self._clear_dependency(task_id) # 添加/移除依赖 if add_blocked_by: task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by)) if remove_blocked_by: task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by] self._save(task) return json.dumps(task, indent=2, ensure_ascii=False) def _clear_dependency(self, completed_id: int): """解除已完成任务对其他任务的阻塞""" for f in self.dir.glob("task_*.json"): task = json.loads(f.read_text()) if completed_id in task.get("blockedBy", []): task["blockedBy"].remove(completed_id) self._save(task) def list_all(self) -> str: """列出所有任务,格式化显示状态和依赖""" tasks = [] files = sorted(self.dir.glob("task_*.json"), key=lambda f: int(f.stem.split("_")[1])) for f in files: tasks.append(json.loads(f.read_text())) if not tasks: return "No tasks." lines = [] for t in tasks: marker = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]") blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else "" lines.append(f"{marker} #{t['id']}: {t['subject']}{blocked}") return "\n".join(lines)
(2) 工具注册:四个任务工具
TOOL_HANDLERS = { # 基础工具(bash、read_file等) "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), # 任务系统工具 "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("addBlockedBy"), kw.get("removeBlockedBy")), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), } TOOLS = [ # 其他工具(略) {"name": "task_create", "description": "Create a new task.", "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}}, {"name": "task_update", "description": "Update a task's status or dependencies.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "addBlockedBy": {"type": "array", "items": {"type": "integer"}}, "removeBlockedBy": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}}, {"name": "task_list", "description": "List all tasks with status summary.", "input_schema": {"type": "object", "properties": {}}}, {"name": "task_get", "description": "Get full details of a task by ID.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}}, ]
(3) Agent 主循环:工具调用处理
def agent_loop(messages: list): while True: response = client.messages.create( model=MODEL, system=SYSTEM, messages=messages, tools=TOOLS, max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": return results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: output = handler(**block.input) if handler else f"Unknown tool: {block.name}" except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) messages.append({"role": "user", "content": results})
(4)执行流程


四、与 Context Compact(s06)的关键变更对比

组件之前(s06 Context Compact)之后(s07 Task System)
工具集5 个工具(基础 +compact8 个工具(基础 +task_create/update/list/get
规划模型无(仅上下文压缩)带依赖关系的任务图(DAG)
任务关系blockedBy依赖边,支持顺序 / 并行约束
状态追踪无(对话级状态)pending -> in_progress -> completed三状态
持久化对话压缩后丢失任务状态保存在磁盘,压缩和重启后存活
核心优化上下文压缩与归档任务持久化、依赖管理与多 Agent 协作

五、核心优势与创新点

  1. 超越对话的持久化状态:任务状态保存在磁盘,不受上下文压缩、会话重置、进程崩溃影响,支持长时任务
  2. 声明式依赖与自动解锁:通过blockedBy字段表达任务依赖,完成任务时自动解除下游阻塞,Agent 无需手动管理依赖关系
  3. 天然支持多 Agent 协作:多个 Agent 读写同一任务目录,通过文件系统实现无锁协调,无需中心化调度器
  4. 可调试可干预:人类可直接查看、编辑任务文件,手动调整状态或依赖关系,调试成本极低
  5. 零依赖实现:基于文件系统,无需数据库或额外服务,可在任何环境中运行,符合课程教学的零依赖原则

六、运行示例

假设 Agent 正在开发一个 Web 应用,任务系统的工作流程如下:

  1. Agent 调用task_create创建初始任务:
    • #1 Setup DB schema(状态:pending)
    • #2 Build API routes(状态:pending,blockedBy: [1])
    • #3 Implement auth module(状态:pending,blockedBy: [1])
    • #4 Integrate routes with auth(状态:pending,blockedBy: [2,3])
  2. Agent 调用task_update#1状态改为in_progress,开始执行数据库 Schema 设计
  3. 完成后,将#1状态改为completed,TaskManager 自动将#2#3blockedBy中的1移除,解锁这两个任务
  4. Agent(或多个 Agent)可以同时执行#2#3,无需等待彼此
  5. #2#3都完成后,它们的completed事件会自动解除#4的阻塞,Agent 开始执行集成任务
  6. 即使中间会话被压缩或进程重启,所有任务状态仍保存在.tasks/目录中,重启后 Agent 调用task_list即可恢复进度

七、可扩展方向

  • 原子写入与并发控制:实现文件写入的原子操作(如先写临时文件再重命名),避免多 Agent 并发写入导致的状态损坏
  • 任务优先级与分配:为任务添加priority字段和owner字段,支持 Agent 认领任务和优先级调度
  • 依赖可视化:基于 JSON 文件生成任务依赖图(如 Mermaid 或 DOT 格式),方便人类直观查看任务关系
  • 任务超时与重试:为任务添加timeoutretries字段,自动标记超时任务并支持重试
  • 跨 Agent 通信:基于任务文件添加简单的消息字段,实现 Agent 之间的异步通信(如任务备注、问题反馈)

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询