Skip to content

第 10 章 · 编排 vs 自决:5 管线替代 12 任务

pixiu 锚点:agent/scheduler.py(5 管线)、scheduler/pipeline.py(Pipeline/Step/StepResult)、agent/core.py(自决循环对比) 关键案例:5 管线重构替代 12 任务、R2-1(熔断的跨层协作)

开篇:agent 不只是对话

很多人对 agent 的理解,停留在"一问一答"的对话。但生产级 agent 通常有两种工作模式

  • 定时模式——每天早上 8 点生成晨报、每天收盘后刷新数据、每周六做维护。这些是固定流程,到点就干,不需要用户在场。
  • 对话模式——用户随时来问"海康怎么样",agent 根据问题决定调什么工具。这是动态流程

pixiu 两种都有,而且它们共享同一套工具(第 3、4 章那 32 个)。这一章讲这两种模式怎么分工——本质是第 6 章"循环"那件事的另一面:哪些流程该编排(写死),哪些该让 agent 自决(自己判断)。

定时任务:从 12 个到 5 条管线

pixiu 最早的定时任务是散的——十几个独立的 cron 任务,各管各的,互相之间没有编排,出了问题很难排查,状态也没记录。

后来我做了一次重构,把 12 个独立任务收敛成 5 条管线(pipeline):

管线时间干什么
morning_report交易日 08:00晨报(要闻+估值+信号+持仓)
post_close交易日 16:05数据刷新→行业聚合→探索扫描
close_summary交易日 16:30收盘总结
weekly_maintenance周六 09:00日历+完整性+财务+分红+标的池
periodic_report周六 11:00 / 月初周报/月报

这是一次典型的复杂度简化——不是功能变少了,是把零散的任务组织成了有语义的管线。每条管线是一串有序的步骤,干一件完整的事。

pipeline 抽象:把"流程"变成一等公民

重构的核心,是抽出了 scheduler/pipeline.py 里的 Pipeline / Step / StepResult 三件套。一条管线就是一组有序的 Step,每个 Step 包装一个 service 函数:

python
@dataclass
class Step:
    name: str
    func: Callable[..., StepResult | None]
    description: str = ""
    market_write: bool = False   # 是否写中央市场库(共享模式下 consumer 自动剔除)

这个抽象有几个设计值得讲:

一是执行历史落 DB。 每跑一步,都在数据库里记一条 TaskStep(状态 success/failed/skipped + rows_affected + error_message),整条管线记一条 TaskExecution(success/failed/partial)。这意味着定时任务完全可观测——出了问题,打开任务历史就能看到第几步挂了、为什么挂。这不是可选的锦上添花,是定时任务的基本卫生。

二是单步失败不阻断后续。 看 pipeline 执行循环里的注释:

python
except Exception as e:
    # 记录失败
    failed_count += 1
    # 单步失败不阻断后续步骤

这是有意识的取舍——管线里某一步(比如分红数据刷新)失败,不该让整条管线(晨报生成)也跟着失败。失败的那步记下来,后面的步骤继续跑,最后整体状态标 partial编排要韧,不要脆弱。

三是 market_write 配合双引擎权限。 这个标志位标记"该步骤是否写中央市场库"。在共享模式下(第 13 章讲的双引擎),consumer 角色没有写权限,market_write=True 的步骤会被自动剔除。一个标志位把"业务流程"和"数据库权限"优雅地耦合在一起。

编排 vs 自决:到底用哪个

把 pipeline 和第 6 章的 Agent Loop 放一起,你就看清了 pixiu 的分工:

定时管线(编排)对话循环(自决)
流程步骤写死,串行模型自己决定下一步
适合"知道该干啥"的固定流程"需要判断"的动态问题
例子每日晨报、收盘数据刷新"海康能不能买""布林带怎么调"
工具调 service 函数调那 32 个 agent 工具
速度快、稳、可复现灵活,但慢、有飘动

核心原则:能编排的编排(快、稳),需要判断的才自决。

这是第 6 章那句"保持循环简单,复杂度下沉"的延伸——复杂度的去向有两个:要么下沉到工具(让自决循环更简单),要么干脆编排掉(根本不进循环)。晨报这种每天固定干的事,编排出 pipeline 远比让 agent 每天自己"悟"一遍要靠谱。

但 pixiu 的精妙在于——编排里也藏着自决。 morning_report 管线里,有一步是"调 agent 生成晨报内容"。数据准备是编排(确定),但"把数据组织成一段有判断的晨报文字"是自决(调 LLM)。一条管线里,确定的步骤编排,需要生成的步骤调 agent。 这是编排和自决的混合体,也是生产级 agent 的真实形态——不是非此即彼。

编排也要韧性(R2-1 的跨层协作)

编排的韧性,除了"单步失败不阻断",还有一个跨层的例子,标注 R2-1

第 12 章会讲 pixiu 的 LLM 熔断器(CircuitOpenError)——LLM 连续失败时,client 层会熔断,拒绝调用。但熔断后,上层(scheduler/pipeline)该怎么处理这个异常?

如果当普通异常处理,pipeline 那步会标 failed,可能触发告警、重试,反而违背了熔断的初衷。pixiu 的做法是上层专门识别这个异常

python
from pixiu.agent.llm import CircuitOpenError  # R2-1:上层识别熔断,优雅跳过报告

# 在 morning/close 报告步骤里
try:
    ...
except CircuitOpenError:
    # 标记为 skipped(不视为失败)+ 清晰日志,不刷错误
    return StepResult(...)  # skipped

熔断是 client 层的决策,但"熔断了该怎么办"是上层(编排)的策略。这两层必须协作——下层抛特定异常,上层识别并优雅降级。否则熔断器装了也白装,上层还是把它当普通故障瞎处理。

这是一个重要的工程认知:韧性不是某一层的事,是跨层的契约。 熔断器(R2)定义"什么时候停",上层编排(R2-1)定义"停了之后怎么降级",两者缺一不可。

这一章的工具:编排 vs 自决自检

  • [ ] 你的 agent 有"定时/固定流程"的需求吗?如果有,是编排成 pipeline,还是也让 agent 自决?
  • [ ] 你的定时任务有执行历史落盘吗?(出问题能排查吗)
  • [ ] 单步失败会不会把整条流程搞崩?(该不该"不阻断")
  • [ ] 你的编排和自决,是不是共享同一套底层能力?(还是各写一遍)
  • [ ] 你有没有"编排里嵌自决"的混合流程?(很多真实场景都需要)

小结

agent 工程不只是"设计一个聪明的对话循环"。生产级 agent 是编排和自决的混合体——固定的流程用 pipeline 编排(快、稳、可观测),需要判断的才交给自决循环,两者共享同一套工具。

pixiu 用 5 条管线替代 12 个散任务,把"流程"提升成了一等公民(Pipeline/Step/StepResult + DB 历史),并在编排层实现了韧性(单步不阻断 + 熔断跨层协作)。

下一章讲这套系统的"记忆"层——agent 怎么记住用户说过的话、做过的事。

下一章

第 11 章 · 记忆与会话连续性 —— 会话是流水,记忆是蓄水池。