feat: pipeline 编排层大重构 + AGPL-3.0 许可 #9

Merged
orion merged 6 commits from feat/pipeline-orchestration into main 2026-06-12 11:52:46 +08:00
Owner

变更内容

6 个 commit 合入 main:

  1. 事件驱动调度器 + StepRun 可观测性
  2. judge 条件表达式(>/</>=/<=/!=/==/=~)
  3. 错误处理策略(retry/skip/fallback/fail)
  4. 调度策略模式(polling/event_driven/hybrid)
  5. 扩展原语注册机制 + call 子流程扩展
  6. AGPL-3.0 开源协议 + README 更新
## 变更内容 6 个 commit 合入 main: 1. 事件驱动调度器 + StepRun 可观测性 2. judge 条件表达式(>/</>=/<=/!=/==/=~) 3. 错误处理策略(retry/skip/fallback/fail) 4. 调度策略模式(polling/event_driven/hybrid) 5. 扩展原语注册机制 + call 子流程扩展 6. AGPL-3.0 开源协议 + README 更新
事件驱动:
- _wait_for_state 从 0.1s 轮询改为 asyncio.Event 驱动
- 引入 _dispatch_and_notify,dispatch 事件后触发 state_event
- 0.5s 兜底超时保证无事件时主动检查

StepRun 可观测性:
- 新增 StepRun dataclass(step_index, action, agent, 时间戳, 快照, 错误)
- PipelineRun 增加 step_runs 列表记录每步详情
- 每步自动记录 input_snapshot / output_snapshot / start_time / end_time
新增 _parse_literal / _match_condition / _compare_num 三个模块级辅助函数。
_parse_literal 支持数字、引号字符串、None/True/False、正则模式。
_execute_judge 自动识别旧版纯字段名 vs 新版运算符表达式。
正则表达式语法:'field =~ /pattern/i'(i 表示忽略大小写)。
提取 _run_sub_pipeline 消除 judge/foreach/parallel 的重复代码。
PipelineStep 新增 on_error 字段用于声明式配置错误处理策略:
  - strategy="skip":静默跳过错误步骤
  - strategy="retry":重试 + 指数退避(max_retries, backoff)
  - strategy="fallback":重试失败后执行 fallback_steps
  - strategy="fail"(默认):抛出异常,终止 Pipeline

提取 _execute_single_step 方法(纯净执行,不带错误处理)。
提取 _execute_step_with_error_handling 方法(带策略包装)。
run() 中的 try 块改为调用 _execute_step_with_error_handling。
新增 bixiweave/scheduler/ 子包,提供三种可切换的调度策略:

  - PollingStrategy:纯轮询(interval=0.05s),零侵入,适合测试
  - EventDrivenStrategy:纯事件驱动,dispatch 后 on_dispatch() 唤醒,最低延迟
  - HybridStrategy:事件驱动 + 兜底轮询(fallback_interval=2.0s),推荐默认

设计要点:
  - SchedulerStrategy ABC 定义 wait_for_state / on_dispatch / cleanup 接口
  - create_scheduler 工厂函数,支持通过名称+kwargs 实例化
  - Pipeline 通过 ctx._scheduler / ctx._scheduler_config 选择策略(默认 hybrid)
  - 不入侵 system.py,策略通过 Pipeline._dispatch_and_notify 接收信号
  - 测试统一使用 polling 策略确保确定性行为
新增 bixiweave/pipeline_ext.py:
  - StepExtension ABC:can_handle + execute 接口
  - ExtensionRegistry:注册/注销/按步骤匹配/动态加载
  - 支持 DSL use 声明 + 点分路径导入

新增 bixiweave/ext/ 官方扩展包:
  - ext/call.py:子 Pipeline 调用,通过 register_pipeline() 注册可复用定义
  - ext/__init__.py:扩展包入口

Pipeline._execute_single_step 末尾 fallthrough 到扩展原语。
内建 6 个核心 + 5 个编排原语不受影响。
orion merged commit 0d9e2cf6a9 into main 2026-06-12 11:52:46 +08:00
Sign in to join this conversation.
No reviewers
No milestone
No project
No assignees
1 participant
Notifications
Due date
The due date is invalid or out of range. Please use the format "yyyy-mm-dd".

No due date set.

Dependencies

No dependencies set.

Reference
bixiu/bixiweave!9
No description provided.