# Run agent with `agent_run` (Streaming)
`agent_run` provides a concise, thread-friendly way to run an agent while exposing real-time streaming output via `MessageObserver`. It is ideal for server-side or frontend event-stream rendering, as well as MCP tool integration scenarios.
## Quick Start
```python
import json
import asyncio
import logging
from threading import Event

from nexent.core.agents.run_agent import agent_run
from nexent.core.agents.agent_model import (
    AgentRunInfo,
    AgentConfig,
    ModelConfig
)
from nexent.core.utils.observer import MessageObserver


async def main():
    # 1) Create message observer (for receiving streaming messages)
    observer = MessageObserver(lang="en")

    # 2) External stop flag (useful to interrupt from UI)
    stop_event = Event()

    # 3) Configure model
    model_config = ModelConfig(
        cite_name="gpt-4",  # Model alias (custom, referenced by AgentConfig)
        api_key="<YOUR_API_KEY>",
        model_name="Qwen/Qwen2.5-32B-Instruct",
        url="https://api.siliconflow.cn/v1",
        temperature=0.3,
        top_p=0.9
    )

    # 4) Configure Agent
    agent_config = AgentConfig(
        name="example_agent",
        description="An example agent that can execute Python code and search the web",
        prompt_templates=None,
        tools=[],
        max_steps=5,
        model_name="gpt-4",  # Corresponds to model_config.cite_name
        provide_run_summary=False,
        managed_agents=[]
    )

    # 5) Assemble run info
    agent_run_info = AgentRunInfo(
        query="How many letter r are in strrawberry?",  # Example question
        model_config_list=[model_config],
        observer=observer,
        agent_config=agent_config,
        mcp_host=None,  # Optional: MCP service addresses
        history=None,   # Optional: chat history
        stop_event=stop_event
    )

    # 6) Run with streaming and consume messages
    async for message in agent_run(agent_run_info):
        message_data = json.loads(message)
        message_type = message_data.get("type", "unknown")
        content = message_data.get("content", "")
        print(f"[{message_type}] {content}")

    # 7) Read final answer (if any)
    final_answer = observer.get_final_answer()
    if final_answer:
        print(f"\nFinal Answer: {final_answer}")


if __name__ == "__main__":
    logging.disable(logging.CRITICAL)
    asyncio.run(main())
```
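Because the stream is plain JSON strings, it maps naturally onto server-sent events for frontend rendering. Below is a hedged sketch using FastAPI (not a Nexent dependency); `build_run_info` is a hypothetical helper that assembles an `AgentRunInfo` exactly as in the Quick Start:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from nexent.core.agents.run_agent import agent_run

app = FastAPI()


@app.get("/agent/stream")
async def stream_agent(query: str):
    # Hypothetical helper: builds the observer, configs, and stop_event,
    # then returns an AgentRunInfo for the given query (see Quick Start).
    agent_run_info = build_run_info(query)

    async def event_stream():
        # agent_run yields JSON strings; wrap each one as an SSE "data:" frame.
        async for message in agent_run(agent_run_info):
            yield f"data: {message}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```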
> Tip: Store sensitive configuration such as `api_key` in environment variables or a secrets manager, not in code.
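For example (the `NEXENT_MODEL_API_KEY` variable name is just an illustration):

```python
import os

from nexent.core.agents.agent_model import ModelConfig

model_config = ModelConfig(
    cite_name="gpt-4",
    api_key=os.environ["NEXENT_MODEL_API_KEY"],  # read the key from the environment instead of hardcoding it
    model_name="Qwen/Qwen2.5-32B-Instruct",
    url="https://api.siliconflow.cn/v1",
    temperature=0.3,
    top_p=0.9,
)
```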
## Message Stream Format and Handling
Internally, `agent_run` executes the agent in a background thread and continuously yields JSON strings from the `MessageObserver` message buffer. You can parse the fields below for categorized display or logging; a minimal dispatch sketch follows the lists.
- Important fields
  - `type`: message type (corresponds to `ProcessType`)
  - `content`: text content
  - `agent_name`: optional, which agent produced this message

Common `type` values (from `ProcessType`):

- `AGENT_NEW_RUN`: new task started
- `STEP_COUNT`: step updates
- `MODEL_OUTPUT_THINKING` / `MODEL_OUTPUT_CODE`: model thinking / code snippets
- `PARSE`: code parsing results
- `EXECUTION_LOGS`: Python execution logs
- `FINAL_ANSWER`: final answer
- `ERROR`: error information
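Building on these fields, here is a minimal dispatch sketch. The exact strings carried in `type` depend on how `ProcessType` is serialized in your Nexent version; the lowercase keys used below are assumptions to verify against your actual stream output.

```python
import json


def render_message(raw: str) -> None:
    """Route one streamed JSON message to a per-type renderer."""
    data = json.loads(raw)
    msg_type = data.get("type", "unknown")
    content = data.get("content", "")
    agent_name = data.get("agent_name", "")

    # NOTE: these keys assume `type` carries lowercase ProcessType names
    # (e.g. "final_answer"); check them against your own stream output.
    handlers = {
        "final_answer": lambda: print(f"FINAL ANSWER: {content}"),
        "error": lambda: print(f"ERROR: {content}"),
        "execution_logs": lambda: print(f"[logs] {content}"),
    }
    handlers.get(msg_type, lambda: print(f"[{msg_type}] {agent_name} {content}"))()
```

You would call `render_message(message)` inside the `async for message in agent_run(agent_run_info)` loop from the Quick Start.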
## Configuration Reference
### ModelConfig

- `cite_name`: model alias (referenced by `AgentConfig.model_name`)
- `api_key`: model service API key
- `model_name`: model invocation name
- `url`: base URL of the model service
- `temperature` / `top_p`: sampling parameters

### AgentConfig

- `name`: agent name
- `description`: agent description
- `prompt_templates`: optional, Jinja template dict
- `tools`: tool configuration list (see `ToolConfig`)
- `max_steps`: maximum steps
- `model_name`: model alias (corresponds to `ModelConfig.cite_name`)
- `provide_run_summary`: whether sub-agents provide a run summary
- `managed_agents`: list of sub-agent configurations (see the sketch after this list)
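For illustration, a minimal sketch of a parent/sub-agent setup using only the fields listed above; the names and values are examples, and it assumes `managed_agents` takes nested `AgentConfig` objects:

```python
from nexent.core.agents.agent_model import AgentConfig

search_agent = AgentConfig(
    name="search_agent",
    description="Sub-agent dedicated to web search",
    prompt_templates=None,
    tools=[],                   # ToolConfig entries would go here
    max_steps=3,
    model_name="gpt-4",         # must match a ModelConfig.cite_name
    provide_run_summary=True,   # the sub-agent reports a run summary to its parent
    managed_agents=[],
)

main_agent = AgentConfig(
    name="main_agent",
    description="Coordinates the search sub-agent and answers the user",
    prompt_templates=None,
    tools=[],
    max_steps=5,
    model_name="gpt-4",
    provide_run_summary=False,
    managed_agents=[search_agent],  # nest sub-agent configurations here
)
```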
## Pass Chat History (optional)

You can pass historical messages via `AgentRunInfo.history`, and Nexent will write them into internal memory:
```python
from nexent.core.agents.agent_model import AgentHistory

history = [
    AgentHistory(role="user", content="Hi"),
    AgentHistory(role="assistant", content="Hello, how can I help you?"),
]

agent_run_info = AgentRunInfo(
    # ... other fields omitted
    history=history,
)
```
## MCP Tool Integration (optional)

If you provide `mcp_host` (a list of MCP service addresses), Nexent will automatically pull remote tools through `ToolCollection.from_mcp` and inject them into the agent:
```python
agent_run_info = AgentRunInfo(
    # ... other fields omitted
    mcp_host=["http://localhost:3000"],
)
```
Friendly error messages (EN/ZH) will be produced if the connection fails.
## Interrupt Execution

During execution, you can trigger an interruption via `stop_event.set()`:

```python
stop_event.set()  # The agent will gracefully stop after the current step completes
```
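For example, a minimal sketch that requests a stop after a timeout, reusing `agent_run_info` and `stop_event` from the Quick Start; `run_with_timeout` is just an illustrative helper name:

```python
import asyncio

from nexent.core.agents.run_agent import agent_run


async def run_with_timeout(agent_run_info, stop_event, timeout_s: float = 30.0):
    """Consume the stream, but request a graceful stop after timeout_s seconds."""
    # Event.set() is thread-safe, so it can be scheduled from the event loop
    # while the agent runs in its background thread.
    loop = asyncio.get_running_loop()
    loop.call_later(timeout_s, stop_event.set)

    async for message in agent_run(agent_run_info):
        print(message)
```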
## Relation to CoreAgent

`agent_run` is a wrapper over `NexentAgent` and `CoreAgent`, responsible for:

- Constructing `CoreAgent` (including models and tools)
- Injecting history into memory
- Driving streaming execution and forwarding buffered messages from `MessageObserver`

You can also directly use `CoreAgent.run(stream=True)` to handle streaming yourself (see `core/agents.md`); `agent_run` provides a more convenient threaded, JSON-message-oriented interface.