API Reference
Event
Event(
type: str, # event type (uppercase by convention)
source: str = "", # origin of the event
data: dict = None, # additional payload
)
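As a rough illustration of the constructor above, the sketch below builds two events. So that it runs on its own, it uses a hypothetical dataclass stand-in that only mirrors the signature shown here; it is not the library class, and the event type names `OFFER_RECEIVED` and `PING` are made up for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Event:
    """Hypothetical stand-in mirroring the documented signature."""
    type: str                    # event type, uppercase by convention
    source: str = ""             # origin of the event
    data: Optional[dict] = None  # additional payload


# An event carrying a payload, with all three fields set.
offer = Event("OFFER_RECEIVED", source="agent_1", data={"price": 42})

# Only `type` is required; the other fields fall back to their defaults.
ping = Event("PING")
```

Because `data` defaults to `None`, code that reads the payload would presumably need to handle the missing case, for example with `event.data or {}`.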
Agent
Agent(
agent_id: str,
sm_def: dict,
actions: dict[str, Callable],
llm_provider=None,
mcp_manager=None,
initial_context=None,
)
await agent.start(system)
await agent.stop()
await agent.send(event)
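Fields of the `context` dictionary passed to action functions:
| Field | Description |
|---|---|
| `context["agent_id"]` | Agent identifier |
| `context["_system"]` | Reference to the MultiAgentSystem |
| `context["_state"]` | Name of the current state |
| `context["current_event"]` | The triggering event |
| `context["llm"]` | LLMProvider instance |
| `context["mcp"]` | MCPManager instance |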
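The fields in the table can be read from inside an action function roughly as follows. This is a sketch under assumptions: the page does not state the exact action signature, so a single `context` argument and a synchronous function are assumed here, `describe_trigger` is a hypothetical name, and the context is built by hand instead of being supplied by an Agent.

```python
from types import SimpleNamespace


def describe_trigger(context: dict) -> str:
    """Hypothetical action: summarize which event fired in which state."""
    event = context["current_event"]
    return f'{context["agent_id"]} got {event.type} in state {context["_state"]}'


# Hand-built context for illustration only; the keys match the table above.
fake_context = {
    "agent_id": "agent_1",
    "_system": None,  # would reference the MultiAgentSystem
    "_state": "negotiating",
    "current_event": SimpleNamespace(
        type="OFFER_RECEIVED", source="agent_2", data=None
    ),
    "llm": None,      # would hold the LLMProvider instance
    "mcp": None,      # would hold the MCPManager instance
}

summary = describe_trigger(fake_context)
```

Such a function would then be registered under a name in the `actions` dict passed to `Agent(...)`.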
MultiAgentSystem
system = MultiAgentSystem()
system.register_agent(agent)
system.unregister_agent(agent_id)
await system.start_all()
await system.stop_all()
await system.dispatch_event(agent_id, event) # point-to-point
await system.broadcast_event(event) # broadcast
await system.send_to_group(group_ids, event) # multicast to a group
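To make the difference between the three delivery modes concrete, here is a self-contained toy model. `ToyAgent` and `ToySystem` are hypothetical stand-ins written for this example; they borrow the method names listed above but say nothing about how the library implements routing, ordering, or error handling.

```python
import asyncio


class ToyAgent:
    """Hypothetical agent that just records the events it receives."""

    def __init__(self, agent_id):
        self.agent_id = agent_id
        self.inbox = []

    async def send(self, event):
        self.inbox.append(event)


class ToySystem:
    """Hypothetical registry illustrating the three delivery modes."""

    def __init__(self):
        self.agents = {}

    def register_agent(self, agent):
        self.agents[agent.agent_id] = agent

    async def dispatch_event(self, agent_id, event):
        # Point-to-point: exactly one recipient.
        await self.agents[agent_id].send(event)

    async def broadcast_event(self, event):
        # Broadcast: every registered agent.
        await asyncio.gather(*(a.send(event) for a in self.agents.values()))

    async def send_to_group(self, group_ids, event):
        # Multicast: only the listed agents.
        await asyncio.gather(*(self.agents[i].send(event) for i in group_ids))


async def demo():
    system = ToySystem()
    for name in ("a", "b", "c"):
        system.register_agent(ToyAgent(name))
    await system.dispatch_event("a", "HELLO")
    await system.broadcast_event("TICK")
    await system.send_to_group(["b", "c"], "SYNC")
    return {agent_id: agent.inbox for agent_id, agent in system.agents.items()}


inboxes = asyncio.run(demo())
```

In this toy run, agent `a` ends up with `HELLO` and `TICK`, while `b` and `c` each receive `TICK` and `SYNC`.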
AgentPersistence
from bixiweave.persistence import AgentPersistence
ap = AgentPersistence()
ap.save_snapshot("agent_1", state="negotiating", context={...}, pending_events=[...])
snap = ap.load_snapshot("agent_1")
# snap.state, snap.context, snap.pending_events
ap.restore_context(snap, target_context)
events = ap.restore_events(snap)
ap.delete_snapshot("agent_1")
ap.list_snapshots()
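The calls above suggest a save, load, restore cycle. The sketch below walks through that cycle with an in-memory stand-in so it can run without the library. `Snapshot` and `ToyPersistence` are hypothetical, and the behavior of `restore_context` (merging the saved context into the target dict) is an assumption, since the page does not describe it.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    """Hypothetical snapshot record with the three documented attributes."""
    state: str
    context: dict = field(default_factory=dict)
    pending_events: list = field(default_factory=list)


class ToyPersistence:
    """Hypothetical in-memory stand-in; the real storage backend is unknown."""

    def __init__(self):
        self._store = {}

    def save_snapshot(self, agent_id, state, context, pending_events):
        # Deep copies keep later mutations from leaking into the snapshot.
        self._store[agent_id] = Snapshot(
            state, copy.deepcopy(context), copy.deepcopy(pending_events)
        )

    def load_snapshot(self, agent_id):
        return self._store.get(agent_id)

    def restore_context(self, snap, target_context):
        # Assumed semantics: merge saved keys into the target dict.
        target_context.update(copy.deepcopy(snap.context))

    def restore_events(self, snap):
        return list(snap.pending_events)

    def delete_snapshot(self, agent_id):
        self._store.pop(agent_id, None)

    def list_snapshots(self):
        return sorted(self._store)


ap = ToyPersistence()
ap.save_snapshot("agent_1", state="negotiating",
                 context={"round": 3}, pending_events=["OFFER_RECEIVED"])
snap = ap.load_snapshot("agent_1")

target_context = {"agent_id": "agent_1"}
ap.restore_context(snap, target_context)
events = ap.restore_events(snap)
names_before = ap.list_snapshots()
ap.delete_snapshot("agent_1")
names_after = ap.list_snapshots()
```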