Pipeline编排层原语
shaotao edited this page 2026-06-12 09:09:29 +08:00
This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

Pipeline 编排层原语

Pipeline 支持两层原语。编排层原语(推荐)通过 AgentCapability 契约驱动 Agent隐藏状态机细节。

run — 按角色执行 Agent 任务

{"action": "run", "agent": "reviewer", "role": "REVIEWER"}

自动完成:发 start_event → 等待 done_state → 取 output_key 写入 context。

支持输入映射:

{"action": "run", "agent": "tester", "role": "TESTER",
 "input": "$context.code_to_test"}

parallel — 并行分支

{"action": "parallel", "branches": [
    {"steps": [{"action": "run", "agent": "reviewer_a", "role": "REVIEWER"}]},
    {"steps": [{"action": "run", "agent": "reviewer_b", "role": "REVIEWER"}]}
]}

每个分支有独立 context 副本,执行完合并回主 context。

judge — 条件分支

{"action": "judge", "condition": "decision",
 "cases": [
     {"value": "approve", "steps": [{"action": "run", "agent": "writer", "role": "WRITER"}]},
     {"value": "reject",  "steps": [{"action": "notify", "data": "已拒绝"}]}
 ],
 "default": [{"action": "notify", "data": "未决定"}]}

transform — 数据映射

{"action": "transform", "from_key": "write_result", "to_key": "code_to_review"}

foreach — 列表迭代

{"action": "foreach", "items_key": "files", "item_key": "file",
 "sub_steps": [
     {"action": "run", "agent": "reviewer", "role": "REVIEWER",
      "input": "$context.file"}
 ]}

底层原语(向后兼容)

步骤类型:start / wait / send / sleep / notify

from bixiweave.pipeline import Pipeline

pipe = Pipeline({
    "name": "客服流程",
    "steps": [
        {"action": "start", "agent": "a", "event": "GO"},
        {"action": "wait",  "agent": "a", "state": "done"},
        {"action": "send",  "from": "a", "to": "b", "event": "PROCESS"},
        {"action": "wait",  "agent": "b", "state": "done"},
    ],
    "timeout": 60,
})

run = await pipe.run(system=system)