1 API参考
shaotao edited this page 2026-06-12 09:09:29 +08:00

API 参考

Event

Event(
    type: str,          # 事件类型(大写惯例)
    source: str = "",   # 来源
    data: dict = None,  # 附加数据
)

Agent

Agent(
    agent_id: str,
    sm_def: dict,
    actions: dict[str, Callable],
    llm_provider=None,
    mcp_manager=None,
    initial_context=None,
)

await agent.start(system)
await agent.stop()
await agent.send(event)

动作函数的 context 字典:

字段 说明
context["agent_id"] Agent 标识
context["_system"] MultiAgentSystem 引用
context["_state"] 当前状态名
context["current_event"] 触发事件
context["llm"] LLMProvider 实例
context["mcp"] MCPManager 实例

MultiAgentSystem

system = MultiAgentSystem()
system.register_agent(agent)
system.unregister_agent(agent_id)
await system.start_all()
await system.stop_all()
await system.dispatch_event(agent_id, event)      # 点对点
await system.broadcast_event(event)               # 广播
await system.send_to_group(group_ids, event)      # 组播

AgentPersistence

from bixiweave.persistence import AgentPersistence

ap = AgentPersistence()
ap.save_snapshot("agent_1", state="negotiating", context={...}, pending_events=[...])

snap = ap.load_snapshot("agent_1")
# snap.state, snap.context, snap.pending_events

ap.restore_context(snap, target_context)
events = ap.restore_events(snap)
ap.delete_snapshot("agent_1")
ap.list_snapshots()