Part 2 of Porting pi-mono's agent loop to Python - The Dual While Loop Engine
Intro
This is Part 2 of a series in which I port pi-mono's agent loop from TypeScript to Python as a learning exercise. Part 1 covered async and event streams, in particular EventStream, the async pipe that connects producers to consumers.
When you call agent.prompt("do something"), a loop takes over. It calls the LLM, streams the response token by token, executes tools, checks for user interruptions, and repeats until the LLM is done.
In this post we work with the loop layer directly, calling agent_loop() and agent_loop_continue() instead of going through the Agent class. You wouldn't do this in practice: the Agent class is the public-facing API, and it adds state management and queues on top (covered in the upcoming Part 3 post). But the interesting mechanics all live here, so this post is an educational look at what happens under the hood.
We will walk through the functionality of liteagent.loop: call the loop directly, watch events stream, see tool execution, and understand each moving part.
Here are some topics we'll cover:
Why does convert_to_llm exist? (internal format vs what litellm accepts)
Simplest possible call — one LLM round, no tools
Adding a tool — echo
Error handling — tools that throw
Pydantic validation + type coercion
Multi-turn context — how messages accumulate
The on_update callback — streaming from tools
Multiple tool calls
Usage tracking
agent_loop_continue — resuming from manually-built context
Testing with different models
Steering — interrupting the loop mid-run
Follow-ups — the outer loop
Cancellation — signal
transform_context — modifying messages before each LLM call
reasoning_effort — thinking/reasoning
ToolResult.details — UI-only data
Multimodal tools + make_default_convert
Setup
The loop has two entry points (like pi's agentLoop and agentLoopContinue):
agent_loop(prompts, context, config, signal) — start a new run with prompt messages
agent_loop_continue(context, config, signal) — resume from existing context
Both return an EventStream immediately. The actual LLM call runs as an async task.
To use them, we need:
AgentContext — system prompt + messages + tools (the data the loop works on)
AgentConfig — model + convert_to_llm + hooks (the behavior of the loop)
1. Why does convert_to_llm exist?
I chose to use litellm as the LLM library.
But regardless of the library used, the loop needs to translate our messages into the provider's format.
litellm accepts OpenAI-format messages and translates them for each provider. So why do we need a convert_to_llm hook at all? Can't we just send our messages straight through?
Let's find out by looking at what the loop actually stores on messages.
As of writing, the messages are plain dicts, not typed dicts or dataclasses.
I might change this in the future.
import json

async def echo_fn(tool_call_id, params, signal=None, on_update=None):
    return ToolResult(content=[{"type": "text", "text": params["message"]}])

echo_tool = Tool(
name="echo", description="Echo a message",
parameters={"type": "object", "properties": {"message": {"type": "string"}}, "required": ["message"]},
execute=echo_fn,
)
MODEL = "anthropic/claude-sonnet-4-6"
context = AgentContext(system_prompt="Use the echo tool.", messages=[], tools=[echo_tool])
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop([{"role": "user", "content": "Echo 'hello'"}], context, config)
async for event in stream:
    if event["type"] == "agent_end":
        for msg in event["messages"]:
            print(f"\n=== {msg['role']} ===")
            print(json.dumps(msg, indent=2, default=str))
So convert_to_llm exists for two reasons:
Reason 1: Strip our extras. The loop enriches messages with metadata the LLM can't see
(usage, stop_reason, timestamp, details, is_error, etc.). These need to be stripped
before sending to the provider.
Reason 2: Provider quirks. For example, OpenAI requires tool results as plain strings, not content
block arrays. When a tool returns images, OpenAI silently drops them from tool results —
so they must be hoisted into synthetic user messages. Anthropic/Gemini accept content blocks
with images natively. convert_to_llm handles this so the loop stays provider-agnostic.
The built-in default: make_default_convert
liteagent ships a default converter (liteagent.convert.make_default_convert) that handles
both of these. It uses a denylist approach — strips the known liteagent metadata fields
and passes everything else through (so new litellm fields like provider_specific_fields
work automatically). It also handles OpenAI image hoisting.
from liteagent import make_default_convert
convert = make_default_convert("anthropic/claude-sonnet-4-6") # returns a function
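To make the denylist idea concrete, here is a minimal sketch of stripping loop-only metadata before a provider call. It is not liteagent's actual implementation: the name strip_metadata and the exact field set are assumptions, taken from the metadata fields listed above, and image hoisting is left out.

```python
# Hypothetical sketch of a denylist-style converter; not liteagent's real code.
# The field names are assumed from the metadata this post mentions.
METADATA_FIELDS = {"usage", "stop_reason", "timestamp", "details", "is_error"}


def strip_metadata(messages):
    """Return copies of the messages without loop-only metadata fields."""
    return [
        {key: value for key, value in msg.items() if key not in METADATA_FIELDS}
        for msg in messages
    ]


enriched = [
    {"role": "user", "content": "Echo 'hello'", "timestamp": 1700000000},
    {
        "role": "assistant",
        "content": "hello",
        "usage": {"total_tokens": 42},
        "stop_reason": "stop",
        # Unknown fields pass through untouched, which is the point of a denylist.
        "provider_specific_fields": {"example": True},
    },
]

cleaned = strip_metadata(enriched)
print(cleaned)
```

Because the function builds new dicts, the enriched messages stay intact for the event stream and the UI while the provider only sees the cleaned copies.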
The Agent class uses this by default. You only need to pass convert_to_llm if you want
custom behavior (e.g., mapping app-specific message types like bashExecution → user messages).
Pi's coding agent does exactly this: overrides the converter to map custom message types
(bashExecution, branchSummary, etc.) into standard user/assistant/tool messages.
We will dive more into this in Part 3 blog post.
event["messages"]
# Demo: what the default converter does to our enriched messages
convert = make_default_convert("anthropic/claude-sonnet-4-6")
convert(event["messages"])
2. Simplest possible call — one LLM round, no tools
Let's use make_default_convert and make a simple call. The absolute minimum:
send a prompt, get a response, watch the events stream back.
# MODEL — change this to test different providers
MODEL = "anthropic/claude-sonnet-4-6"
context = AgentContext(
system_prompt="You are a helpful assistant. Be concise.",
messages=[],
tools=None,
)
config = AgentConfig(
model=MODEL,
convert_to_llm=make_default_convert(MODEL),
)
# agent_loop takes a list of messages to inject. Any role works,
# but in practice these are user messages (the new prompt).
prompt_messages = [{"role": "user", "content": "What is 2+2? Answer in several words."}]
stream = agent_loop(prompt_messages, context, config)
# Consume the stream — every event printed
events = []
async for event in stream:
    print(event)
What just happened — event sequence
For a simple no-tools call, the event sequence is:
agent_start — loop begins
turn_start — first LLM call
message_start — user prompt echoed
message_end — user prompt done
message_start — assistant starts streaming
message_update (x N) — text deltas arrive token by token
message_end — assistant message finalized
turn_end — turn complete (no tool results)
agent_end — loop done, messages returned
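The sequence above can be replayed without an LLM. The sketch below uses a plain async generator as a stand-in for EventStream and a consumer that records event types and rebuilds the streamed text. The event shapes (delta_type, delta["content"]) follow the ones used later in this post; everything else, including fake_stream and consume, is illustrative.

```python
import asyncio


async def fake_stream():
    """Stand-in for an EventStream: yields the no-tools event sequence."""
    user = {"role": "user", "content": "What is 2+2?"}
    yield {"type": "agent_start"}
    yield {"type": "turn_start"}
    yield {"type": "message_start", "message": user}
    yield {"type": "message_end", "message": user}
    yield {"type": "message_start", "message": {"role": "assistant", "content": ""}}
    for piece in ["Two plus two ", "equals ", "four."]:
        yield {"type": "message_update", "delta_type": "text_delta", "delta": {"content": piece}}
    yield {"type": "message_end", "message": {"role": "assistant", "content": "Two plus two equals four."}}
    yield {"type": "turn_end"}
    yield {"type": "agent_end"}


async def consume(stream):
    """Collect event types in order and rebuild the streamed text from deltas."""
    seen, text = [], ""
    async for event in stream:
        seen.append(event["type"])
        if event["type"] == "message_update" and event.get("delta_type") == "text_delta":
            text += event["delta"]["content"]
    return seen, text


seen, text = asyncio.run(consume(fake_stream()))
print(seen)
print(text)
```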
The stream's .result() gives you all new messages from this run.
# The final result — all **new** messages from this run
result = await stream.result()
result
3. Adding a tool — echo
Tools are Tool dataclasses with:
name, description, parameters — what the LLM sees (JSON Schema)
execute — what we call: async def(tool_call_id, params, signal, on_update) -> ToolResult
params_model (optional) — Pydantic BaseModel for validation + type coercion
When the LLM decides to call a tool, the loop:
Parses the JSON arguments
Validates with Pydantic (if params_model set)
Calls execute()
Wraps the result as a tool message
Sends it back to the LLM for the next turn
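The first four steps can be sketched in a few lines. This is a simplified illustration, not the loop's real code: SketchTool and handle_tool_call are hypothetical names, the required-key check stands in for Pydantic validation, and the tool call uses the OpenAI-style shape that appears later in this post.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class SketchTool:
    """Simplified stand-in for Tool (no params_model, signal, or on_update)."""
    name: str
    required: list
    execute: Callable[[str, dict], Awaitable[Any]]


async def handle_tool_call(tool_call, tools):
    """Parse, validate, execute, and wrap one tool call as a tool message."""
    tool = tools[tool_call["function"]["name"]]
    # 1. Parse the JSON arguments.
    args = json.loads(tool_call["function"]["arguments"])
    # 2. Validate (a required-key check standing in for Pydantic).
    missing = [key for key in tool.required if key not in args]
    if missing:
        raise ValueError(f"missing required arguments: {missing}")
    # 3. Call execute().
    text = await tool.execute(tool_call["id"], args)
    # 4. Wrap the result as a tool message for the next LLM turn.
    return {"role": "tool", "tool_call_id": tool_call["id"], "content": text}


async def echo(tool_call_id, params):
    return params["message"]


tools = {"echo": SketchTool(name="echo", required=["message"], execute=echo)}
call = {
    "id": "call_1",
    "type": "function",
    "function": {"name": "echo", "arguments": '{"message": "hello world"}'},
}
tool_message = asyncio.run(handle_tool_call(call, tools))
print(tool_message)
```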
# Define the echo tool
async def echo_execute(tool_call_id, params, signal=None, on_update=None):
    return ToolResult(content=[{"type": "text", "text": params["message"]}])

echo_tool = Tool(
name="echo",
description="Echo back the given message. Use this when asked to echo something.",
parameters={
"type": "object",
"properties": {
"message": {"type": "string", "description": "The message to echo back"}
},
"required": ["message"],
},
execute=echo_execute,
)
print(f"Tool defined: {echo_tool.name}")
print(f"Parameters schema: {json.dumps(echo_tool.parameters, indent=2)}")
context = AgentContext(
system_prompt="You are helpful. When asked to echo, use the echo tool.",
messages=[],
tools=[echo_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
prompt = [{"role": "user", "content": "Echo the message: hello world"}]
stream = agent_loop(prompt, context, config)
events = []
async for event in stream:
    print(event)
A few things to notice:
message_update deltas during tool calls are for UI preview only: the loop waits
for message_end (built by stream_chunk_builder) before executing anything
turn_end bundles the assistant message + its tool results — one event, full picture of the turn
agent_end has ALL messages from the entire run across all turns
The loop checks tool_calls presence (not stop_reason) to decide whether to continue
# Look at the messages that were returned
result = await stream.result()
result
Another thing to notice is how every message has a message_start / message_end pair. For user and tool messages the content is identical in both since nothing is streamed (the args→result progression for tools is captured separately by tool_execution_start / tool_execution_end). For assistant messages the start is an empty skeleton and the end is the finalized version.
Also note that the event stream is hierarchical: the finalized message from message_end gets bundled again into turn_end (which gives a complete snapshot of that turn — the assistant message plus any tool results), and then again into agent_end (which carries the full conversation history across all turns). This means consumers can subscribe at whatever granularity they need — message_update for real-time streaming, turn_end for turn-level summaries, or agent_end for the final state.
4. Error handling — tools that throw
When a tool raises an exception, the loop does not stop. It:
Catches the exception
Wraps str(e) as a ToolResult with is_error=True
Sends it back to the LLM as a tool result
The LLM sees the error and can react (retry, try differently, or explain)
This is different from an LLM error (API failure), which stops the loop immediately
with stop_reason="error".
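A minimal sketch of the catch-and-wrap behavior described above, assuming a simplified result shape (a dict with content and is_error). The helper name run_tool_safely is hypothetical and the real loop returns a ToolResult rather than a dict.

```python
import asyncio


async def run_tool_safely(execute, tool_call_id, params):
    """Run a tool; turn any exception into an error result instead of raising."""
    try:
        text = await execute(tool_call_id, params)
        return {"role": "tool", "tool_call_id": tool_call_id, "content": text, "is_error": False}
    except Exception as exc:
        # A tool failure is data for the LLM, not a reason to stop the loop.
        return {"role": "tool", "tool_call_id": tool_call_id, "content": str(exc), "is_error": True}


async def always_fails(tool_call_id, params):
    raise RuntimeError(params.get("reason", "Something went wrong"))


error_result = asyncio.run(run_tool_safely(always_fails, "call_1", {"reason": "disk full"}))
print(error_result)
```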
# A tool that always fails
async def fail_execute(tool_call_id, params, signal=None, on_update=None):
    raise Exception(params.get("reason", "Something went wrong"))

fail_tool = Tool(
name="risky_operation",
description="Attempts a risky operation that might fail. Use when asked to do something risky.",
parameters={
"type": "object",
"properties": {"reason": {"type": "string", "description": "What to attempt"}},
},
execute=fail_execute,
)
context = AgentContext(
system_prompt="You have a risky_operation tool. If it fails, explain what happened. Also have echo for simple tasks.",
messages=[],
tools=[fail_tool, echo_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
prompt = [
{"role": "user", "content": "Try the risky operation with reason 'disk full'"}
]
stream = agent_loop(prompt, context, config)
async for event in stream:
    if event["type"] != "message_update":
        print(event)
5. Pydantic validation + type coercion
LLMs sometimes send "42" (string) when the schema says int. Pydantic coerces this automatically.
If validation fails entirely, the error becomes a tool result with is_error=True.
Set params_model on a Tool to enable this.
from pydantic import BaseModel
from liteagent.loop import _validate_tool_args

class AddParams(BaseModel):
    a: int
    b: int

async def add_execute(tool_call_id, params, signal=None, on_update=None):
    # params is already validated and coerced: {"a": int, "b": int}
    result = params["a"] + params["b"]
    return ToolResult(content=[{"type": "text", "text": str(result)}])

add_tool = Tool(
name="add",
description="Add two numbers together.",
parameters={
"type": "object",
"properties": {
"a": {"type": "integer", "description": "First number"},
"b": {"type": "integer", "description": "Second number"},
},
"required": ["a", "b"],
},
params_model=AddParams,
execute=add_execute,
)
# Test the validation directly (what the loop does internally)
# Normal case
print("Normal:", _validate_tool_args(add_tool, {"a": 3, "b": 5}))
# Coercion case: strings become ints
print("Coerced:", _validate_tool_args(add_tool, {"a": "3", "b": "5"}))
# Failure case
try:
    _validate_tool_args(add_tool, {"a": "not_a_number", "b": 5})
except Exception as e:
    print(f"Validation error: {type(e).__name__}")
Live: validation error → LLM sees error → reacts
The add tool above requires integers. Let's make a stricter version that rejects
negative numbers via a Pydantic validator, then ask the LLM to use a negative number.
The validation error becomes a tool result with is_error=True — the LLM sees it and reacts.
from pydantic import field_validator

class PositiveAddParams(BaseModel):
    a: int
    b: int

    @field_validator("a", "b")
    @classmethod
    def must_be_positive(cls, v):
        if v < 0:
            raise ValueError(f"must be positive, got {v}")
        return v

async def positive_add_execute(tool_call_id, params, signal=None, on_update=None):
    result = params["a"] + params["b"]
    return ToolResult(content=[{"type": "text", "text": str(result)}])

positive_add_tool = Tool(
name="add",
description="Add two numbers",
parameters={
"type": "object",
"properties": {
"a": {"type": "integer", "description": "First number"},
"b": {"type": "integer", "description": "Second number"},
},
"required": ["a", "b"],
},
params_model=PositiveAddParams,
execute=positive_add_execute,
)
context = AgentContext(
system_prompt="You have an add tool. Use it when asked to add. If it fails, make numbers positive.",
messages=[],
tools=[positive_add_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
# Ask it to add -3 + 5 — validation will reject -3
stream = agent_loop(
[{"role": "user", "content": "Add -3 and 5 using the add tool."}],
context,
config,
)
async for event in stream:
    # Skip streaming deltas (text + tool_call) to reduce noise
    if event["type"] == "message_update":
        continue
    print(event)
6. Multi-turn context — how messages accumulate
The loop appends to context.messages as it runs. Each agent_loop call creates a
snapshot of the context, so the original isn't mutated. But within a run, the loop builds
up the full conversation:
The agent_end event and stream.result() return only the new messages, not the full history.
To continue a conversation, you pass the accumulated messages as the context for the next call.
# Turn 1: ask a question
all_messages = []
context = AgentContext(
system_prompt="You are helpful. Be concise (1-2 sentences max).",
messages=all_messages,
tools=None,
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
prompt1 = [{"role": "user", "content": "My name is Alice. Remember that."}]
stream1 = agent_loop(prompt1, context, config)
async for event in stream1:
    if event["type"] == "message_update":
        continue
    print(event)
new1 = await stream1.result()
assert new1 == event["messages"]
new1
all_messages.extend(new1)
# Turn 2: ask about context from turn 1
context2 = AgentContext(
system_prompt="You are helpful. Be concise (1-2 sentences max).",
messages=all_messages, # carries forward
tools=None,
)
prompt2 = [{"role": "user", "content": "What is my name?"}]
stream2 = agent_loop(prompt2, context2, config)
async for event in stream2:
    if event["type"] == "message_update":
        continue
    print(event)
new2 = await stream2.result()
assert new2 == event["messages"]
new2
all_messages.extend(new2)
all_messages
7. The on_update callback — streaming from tools
Tools can stream partial results during execution via the on_update callback.
This emits tool_execution_update events — useful for showing progress in a UI
(like a bash command streaming stdout line by line).
import asyncio

async def countdown_execute(tool_call_id, params, signal=None, on_update=None):
    """Count down, streaming each number."""
    n = params.get("seconds", 3)
    for i in range(n, 0, -1):
        if signal and signal.is_set():
            raise Exception("Aborted")
        if on_update:
            on_update(ToolResult(content=[{"type": "text", "text": f"{i}..."}]))
        await asyncio.sleep(0.5)
    return ToolResult(content=[{"type": "text", "text": "Liftoff!"}])

countdown_tool = Tool(
name="countdown",
description="Count down from N seconds. Use when asked to count down.",
parameters={
"type": "object",
"properties": {
"seconds": {"type": "integer", "description": "Seconds to count down from"}
},
},
execute=countdown_execute,
)
context = AgentContext(
system_prompt="You have a countdown tool. Use it when asked to count down.",
messages=[],
tools=[countdown_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop(
[{"role": "user", "content": "Count down from 3"}],
context,
config,
)
async for event in stream:
    # if event['type'] == 'message_update':
    #     continue
    print(event)
8. Multiple tool calls
The LLM can call tools across separate turns (sequential reasoning — needs result A before calling B)
or request multiple tools in one response (parallel — independent calls).
The loop handles both. When multiple tools arrive in one response, it executes them
sequentially (not in parallel), checking for steering messages after each one.
Let's see both patterns.
# Pattern 1: Sequential. The LLM needs result A before calling B.
# (3 + 5) * 2 requires the add result before multiply
async def multiply_execute(tool_call_id, params, signal=None, on_update=None):
    result = params["a"] * params["b"]
    return ToolResult(content=[{"type": "text", "text": str(result)}])

multiply_tool = Tool(
name="multiply",
description="Multiply two numbers.",
parameters={
"type": "object",
"properties": {
"a": {"type": "number", "description": "First number"},
"b": {"type": "number", "description": "Second number"},
},
"required": ["a", "b"],
},
execute=multiply_execute,
)
context = AgentContext(
system_prompt="You have add and multiply tools. Use them to compute expressions.",
messages=[],
tools=[add_tool, multiply_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop(
[{"role": "user", "content": "What is (3 + 5) * 2? Use the tools."}],
context,
config,
)
async for event in stream:
    if event["type"] == "message_update":
        continue
    print(event)
Pattern 1 above: tools across turns (sequential reasoning)
The LLM called add(3, 5) in turn 1, got 8, then called multiply(8, 2) in turn 2.
It needed the first result before making the second call — so each tool is in a separate turn.
Pattern 2 below: multiple tools in one response (parallel/independent)
When the LLM doesn't need result A to call B, it can request both in a single response.
The loop still executes them sequentially (for steering), but they arrive together.
# Pattern 2: Parallel. Independent tool calls in one response.
# "Add 3+5 AND add 10+20": no dependency between them
context = AgentContext(
system_prompt="You have an add tool. When asked to do multiple additions, call the tool for each one in a single response.",
messages=[],
tools=[add_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop(
[{"role": "user", "content": "Add 3+5 and also add 10+20. Do both at once."}],
context,
config,
)
async for event in stream:
    if event["type"] == "message_update":
        continue
    print(event)
await stream.result()
9. Usage tracking
Every assistant message has a usage dict with token counts.
These come from litellm's response (requires stream_options={"include_usage": True},
which the loop always passes).
Usage is tracked per assistant message, not aggregated. The consumer sums across turns.
# Run a simple call and inspect usage
context = AgentContext(system_prompt="Be concise.", messages=[], tools=None)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop(
[{"role": "user", "content": "Explain gravity in one sentence."}],
context,
config,
)
result = await stream.result()
result
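Since usage is attached to each assistant message rather than aggregated, the consumer does the summing. A small sketch, assuming the usage dict carries the usual prompt_tokens, completion_tokens, and total_tokens keys; the helper name total_usage and the sample numbers are made up for illustration.

```python
def total_usage(messages):
    """Sum integer usage fields across all assistant messages."""
    totals = {}
    for msg in messages:
        if msg.get("role") != "assistant":
            continue
        for key, value in (msg.get("usage") or {}).items():
            # Skip nested detail dicts; only plain integer counters are summed.
            if isinstance(value, int):
                totals[key] = totals.get(key, 0) + value
    return totals


# Made-up messages standing in for the result of a two-turn run.
run_messages = [
    {"role": "user", "content": "Explain gravity in one sentence."},
    {"role": "assistant", "content": "...",
     "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}},
    {"role": "tool", "content": "..."},
    {"role": "assistant", "content": "...",
     "usage": {"prompt_tokens": 20, "completion_tokens": 7, "total_tokens": 27}},
]

usage_totals = total_usage(run_messages)
print(usage_totals)
```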
10. agent_loop_continue — resuming from manually-built context
Unlike agent_loop (which injects new prompt messages), agent_loop_continue takes
the context as-is and lets the LLM respond to whatever's already there. No new messages added.
agent_loop(prompts, context, config, signal) # has prompts
agent_loop_continue(context, config, signal) # no prompts — context already has everything
Examples of when to use it:
Restoring a conversation from a database where the last message is a tool result
External tool execution — you ran the tool outside the loop and want the LLM to see the result
Testing — inject specific conversation states without running through the whole flow
Constraint: the last message in context must be user or tool (not assistant).
If it's already an assistant response, there's nothing for the LLM to respond to.
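If you restore conversations from storage, it can help to check this constraint before calling agent_loop_continue. The helper below is a hypothetical pre-check, not part of liteagent; how the library itself reacts to a trailing assistant message is not shown in this post.

```python
def can_continue(messages):
    """True when the last message is something the LLM can respond to."""
    return bool(messages) and messages[-1].get("role") in ("user", "tool")


ends_with_user = [{"role": "user", "content": "What have we discussed so far?"}]
ends_with_tool = [
    {"role": "assistant", "content": None, "tool_calls": [{"id": "call_1"}]},
    {"role": "tool", "tool_call_id": "call_1", "content": "72°F, sunny"},
]
ends_with_assistant = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
]

print(can_continue(ends_with_user), can_continue(ends_with_tool), can_continue(ends_with_assistant))
```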
# Build a context as if we had a conversation, then continue without a new prompt
manual_context = AgentContext(
system_prompt="You are helpful. Be concise.",
messages=[
{"role": "user", "content": "My favorite color is blue."},
{"role": "assistant", "content": "Got it — blue!", "tool_calls": None},
{"role": "user", "content": "And I love pizza."},
{"role": "assistant", "content": "Noted — pizza lover!", "tool_calls": None},
# The "new" message we're continuing from — no agent_loop prompt needed
{"role": "user", "content": "What have we discussed so far?"},
],
tools=None,
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop_continue(manual_context, config)
await stream.result()
# Continue from a tool result — as if we ran the tool externally
weather_context = AgentContext(
system_prompt="You are helpful. Be concise. Summarize tool results for the user.",
messages=[
{"role": "user", "content": "What's the weather in NYC?"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {"name": "get_weather", "arguments": '{"city": "NYC"}'},
}
],
},
{
"role": "tool",
"tool_call_id": "call_1",
"content": "72°F, sunny, light breeze",
},
],
tools=[
Tool(
name="get_weather",
description="Get the weather for a city",
parameters={
"type": "object",
"properties": {
"city": {"type": "string"}
},
"required": ["city"]
},
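# Placeholder: the tool result is already in context, so this is not expected to run.
# A real tool needs the async execute signature shown in section 3.
execute=lambda city: ToolResult(content=[{"type": "text", "text": ""}]),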
)
],
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop_continue(weather_context, config)
await stream.result()
11. Testing with different models
The loop is model-agnostic — just change config.model. Let's try the same prompt
across providers to see how they differ.
MODELS_TO_TEST = [
"anthropic/claude-sonnet-4-6",
"gemini/gemini-3-flash-preview",
"gpt-5.2",
]
for model in MODELS_TO_TEST:
    print(f"\n{'='*60}")
    print(f"Model: {model}")
    print(f"{'='*60}")
    context = AgentContext(
        system_prompt="Be concise. One sentence.",
        messages=[],
        tools=[echo_tool],
    )
    config = AgentConfig(model=model, convert_to_llm=make_default_convert(model))
    stream = agent_loop(
        [{"role": "user", "content": "Echo the word 'ping' using the echo tool."}],
        context,
        config,
    )
    result = await stream.result()
    print(result[-1]['content'])
What the loop does not do:
No state management (that's the Agent class, see the Part 3 blog post)
No tools provided (consumer brings them)
No system prompt (consumer provides it)
No retry logic (consumer calls agent_loop_continue)
No parallel tool execution (sequential for steering)
No context compaction (consumer provides transform_context hook)
The Agent class will wrap everything together as we will see in part 3 blog post.
12. Steering — interrupting the loop mid-run
get_steering_messages is a hook on AgentConfig that the loop calls:
Before the first LLM call — to check for queued messages
After each tool execution — to check if the user interrupted
If it returns messages, those get injected into context and the loop continues
with them instead of the LLM's plan. Remaining tool calls get skipped
(marked as errors with "Skipped due to queued user message").
This is how pi implements "user types while agent is running" — the agent sees
the new message and pivots.
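The skip behavior can be sketched as a small batch runner. This is an illustration under simplifying assumptions, not the loop's actual code: run_tool_batch is a hypothetical name, results are plain dicts, and the steering hook is a synchronous function that returns a list or None.

```python
import asyncio

SKIP_TEXT = "Skipped due to queued user message"


async def run_tool_batch(tool_calls, execute, get_steering_messages):
    """Execute calls one at a time; after each, check for steering messages."""
    results, steering = [], None
    for index, call in enumerate(tool_calls):
        content = await execute(call)
        results.append({"tool_call_id": call["id"], "content": content, "is_error": False})
        steering = get_steering_messages()
        if steering:
            # Mark every remaining call as skipped so each tool call still has a result.
            for skipped in tool_calls[index + 1:]:
                results.append({"tool_call_id": skipped["id"], "content": SKIP_TEXT, "is_error": True})
            break
    return results, steering or []


async def add(call):
    return str(call["a"] + call["b"])


queued = [[{"role": "user", "content": "Actually, just say hi."}]]


def steering_once():
    """Return the queued user message the first time, then nothing."""
    return queued.pop() if queued else None


calls = [
    {"id": "c1", "a": 1, "b": 2},
    {"id": "c2", "a": 3, "b": 4},
    {"id": "c3", "a": 5, "b": 6},
]
results, steering = asyncio.run(run_tool_batch(calls, add, steering_once))
for item in results:
    print(item)
```

Emitting an error result for every skipped call keeps each tool call paired with a tool result, which providers generally expect before the next assistant turn.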
# Simulate: user sends "Actually, just say hi" after the first tool executes
# We use a counter so steering fires once (after first tool), then returns None
steering_called = 0

def steering_after_first_tool():
    global steering_called
    steering_called += 1
    if steering_called == 2:  # first call is before LLM, second is after first tool
        return [
            {"role": "user", "content": "Actually, forget the additions. Just say hi."}
        ]
    return None

# Give it 3 independent adds; steering should skip the 2nd and 3rd
context = AgentContext(
system_prompt="You have an add tool. Use it when asked. Be concise.",
messages=[],
tools=[add_tool],
)
config = AgentConfig(
model=MODEL,
convert_to_llm=make_default_convert(MODEL),
get_steering_messages=steering_after_first_tool,
)
stream = agent_loop(
[
{
"role": "user",
"content": "Add 1+2, add 3+4, and add 5+6. Call all three at once.",
}
],
context,
config,
)
await stream.result()
13. Follow-ups — the outer loop
get_follow_up_messages is checked when the agent would normally stop (no more tool calls,
no pending steering). If it returns messages, the outer loop continues — injecting them
and starting another inner loop cycle.
This is how you queue "when you're done with X, also do Y" without interrupting the current task.
Unlike steering (which interrupts mid-tool-batch), follow-ups only fire at natural stopping points.
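Putting steering and follow-ups together gives the dual while loop from the title. The sketch below is a stripped-down model with a scripted fake LLM, assuming synchronous hooks and no events, cancellation, or mid-batch steering. The names dual_loop, call_llm, and run_tools are illustrative and do not match liteagent's internals.

```python
import asyncio
from collections import deque


async def dual_loop(messages, call_llm, run_tools, get_steering, get_follow_ups):
    """Inner loop runs turns; outer loop restarts it when follow-ups are queued."""
    pending = get_steering() or []            # checked before the first LLM call
    while True:                               # outer loop: follow-ups
        has_tool_calls = True                 # force at least one turn
        while has_tool_calls or pending:      # inner loop: turns
            messages.extend(pending)          # inject steering or follow-up messages
            assistant = await call_llm(messages)
            messages.append(assistant)
            has_tool_calls = bool(assistant.get("tool_calls"))
            if has_tool_calls:
                messages.extend(await run_tools(assistant["tool_calls"]))
            pending = get_steering() or []    # did the user interrupt during this turn?
        pending = get_follow_ups() or []      # only reached at a natural stopping point
        if not pending:
            return messages


# Scripted assistant replies: one tool call, its answer, then the follow-up answer.
scripted = deque([
    {"role": "assistant", "content": None, "tool_calls": [{"id": "c1"}]},
    {"role": "assistant", "content": "4"},
    {"role": "assistant", "content": "100"},
])
follow_ups = deque([[{"role": "user", "content": "Now, what is 10 * 10?"}]])


async def call_llm(messages):
    return scripted.popleft()


async def run_tools(tool_calls):
    return [{"role": "tool", "tool_call_id": c["id"], "content": "4"} for c in tool_calls]


final = asyncio.run(dual_loop(
    [{"role": "user", "content": "What is 2 + 2?"}],
    call_llm,
    run_tools,
    get_steering=lambda: None,
    get_follow_ups=lambda: follow_ups.popleft() if follow_ups else None,
))
print([m["role"] for m in final])
```

The inner loop keeps going while there are tool calls or pending messages. The follow-up hook is only consulted once the inner loop has nothing left to do, which is why follow-ups never interrupt a tool batch.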
# Follow-up: after the agent answers the first question, ask a second one
follow_up_sent = False

def check_follow_ups():
    global follow_up_sent
    if not follow_up_sent:
        follow_up_sent = True
        return [{"role": "user", "content": "Now, what is 10 * 10?"}]
    return None

context = AgentContext(
system_prompt="You are helpful. Be concise. One sentence max.",
messages=[],
tools=None,
)
config = AgentConfig(
model=MODEL,
convert_to_llm=make_default_convert(MODEL),
get_follow_up_messages=check_follow_ups,
)
stream = agent_loop(
[{"role": "user", "content": "What is 2 + 2?"}],
context,
config,
)
async for event in stream:
    if event["type"] == "message_update":
        continue
    print(event)
14. Cancellation — signal
Pass an asyncio.Event as signal. When you call signal.set(), the loop stops
consuming chunks and the message gets stop_reason: "aborted". The loop exits immediately.
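The core of this mechanism is a check of the event between chunks. A minimal sketch with a fake chunk source, assuming the signal is a plain asyncio.Event; the names fake_chunks and consume_until_cancelled are illustrative and the real loop also has to emit events and clean up the provider stream.

```python
import asyncio


async def fake_chunks():
    """Stand-in for a provider stream that would otherwise run for a long time."""
    for i in range(1000):
        await asyncio.sleep(0)
        yield f"chunk{i} "


async def consume_until_cancelled(chunks, signal, on_chunk):
    """Accumulate chunks until the signal is set, then mark the message aborted."""
    message = {"role": "assistant", "content": "", "stop_reason": "stop"}
    async for chunk in chunks:
        if signal.is_set():
            message["stop_reason"] = "aborted"
            break
        message["content"] += chunk
        on_chunk(chunk)
    return message


async def main():
    signal = asyncio.Event()
    received = []

    def on_chunk(chunk):
        received.append(chunk)
        if len(received) >= 3:  # the consumer cancels after three chunks
            signal.set()

    message = await consume_until_cancelled(fake_chunks(), signal, on_chunk)
    return message, received


message, received = asyncio.run(main())
print(message)
```

The partial text received before cancellation is kept on the message, so a UI can still show what was streamed.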
# Cancel after we receive the first few text deltas
cancel_signal = asyncio.Event()
context = AgentContext(
system_prompt="Write a very long essay about the history of computing.",
messages=[],
tools=None,
)
config = AgentConfig(model=MODEL, convert_to_llm=make_default_convert(MODEL))
stream = agent_loop(
[{"role": "user", "content": "Go ahead, write the essay."}],
context,
config,
signal=cancel_signal,
)
text_chunks = 0
cancelled = False
async for event in stream:
    if event["type"] == "message_update" and event.get("delta_type") == "text_delta":
        text_chunks += 1
        print(event["delta"]["content"], end="", flush=True)
        if text_chunks >= 3:  # cancel after 3 chunks
            cancel_signal.set()
    elif event["type"] == "message_end" and event["message"].get("role") == "assistant":
        print(f"\nstop_reason: {event['message'].get('stop_reason')}")
await stream.result()
15. transform_context — modifying messages before each LLM call
Called before convert_to_llm on every LLM call. Receives the full messages list,
returns a (possibly modified) list. The original context.messages is NOT mutated —
this only affects what the LLM sees for this call.
Use cases: context compaction (summarize old messages), token pruning (drop old turns),
injecting dynamic context (current time, file contents).
Note: transforms are invisible to the event stream and .result(). The injected/modified
messages don't appear in events — only in what the LLM receives. You can verify
it worked by the LLM's response (e.g. it knows the time) but you won't see the
injected message in any event.
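The ordering and the non-mutation guarantee can be modeled in a few lines. This is a sketch of the idea only: prepare_llm_input and drop_old_turns are hypothetical names, the transform is assumed to be a synchronous function taking (messages, signal), and the converter here is an identity function.

```python
def prepare_llm_input(context_messages, transform_context, convert_to_llm, signal=None):
    """Apply transform_context, then convert_to_llm, to a copy of the messages."""
    working = list(context_messages)  # shallow copy: the stored list is left untouched
    if transform_context is not None:
        working = transform_context(working, signal)
    return convert_to_llm(working)


def drop_old_turns(messages, signal, keep_last=2):
    """Token pruning in its simplest form: keep only the most recent messages."""
    return messages[-keep_last:]


history = [
    {"role": "user", "content": "My favorite color is blue."},
    {"role": "assistant", "content": "Got it."},
    {"role": "user", "content": "And I love pizza."},
    {"role": "assistant", "content": "Noted."},
]

llm_input = prepare_llm_input(history, drop_old_turns, convert_to_llm=lambda msgs: msgs)
print(len(history), len(llm_input))
```

A real pruning transform would need to cut on turn boundaries so that tool results are never separated from the assistant message that requested them.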
# Simple transform: inject a "current time" message before each LLM call
from datetime import datetime

def inject_time(messages, signal):
    time_msg = {
        "role": "user",
        "content": f"[System: current time is {datetime.now().strftime('%H:%M:%S')}]",
    }
    return [time_msg] + messages

context = AgentContext(
system_prompt="You are helpful. Be concise.",
messages=[],
tools=None,
)
config = AgentConfig(
model=MODEL,
convert_to_llm=make_default_convert(MODEL),
transform_context=inject_time,
)
stream = agent_loop(
[{"role": "user", "content": "What time is it?"}],
context,
config,
)
async for event in stream:
    print(event)
16. reasoning_effort — thinking/reasoning
Controls how much the model "thinks" before responding. Maps to provider-specific
parameters (Anthropic's thinking.budget_tokens, OpenAI's reasoning_effort, etc.).
litellm translates for each provider.
Values: "minimal", "low", "medium", "high", "xhigh" (or None for default).
# Compare reasoning across all target models and effort levels
QUESTION = "A bat and a ball cost $1.10 in total. The bat costs $1.00 more than the ball. How much does the ball cost? Think carefully."
# ALL_MODELS is assumed to be a list of model names, like MODELS_TO_TEST in section 11
for model in ALL_MODELS:
    for effort in [None, "high"]:
        context = AgentContext(
            system_prompt="Be concise. Give the answer and brief reasoning.",
            messages=[],
            tools=None,
        )
        config = AgentConfig(
            model=model,
            convert_to_llm=make_default_convert(model),
            reasoning_effort=effort,
        )
        stream = agent_loop(
            [{"role": "user", "content": QUESTION}],
            context,
            config,
        )
        result = await stream.result()
        assistant = [m for m in result if m.get("role") == "assistant"][-1]
        thinking = assistant.get("reasoning_content")
        print(f"{model} | reasoning_effort={effort!r}")
        print(f" answer: {assistant.get('content')[:120]}")
        print(f" thinking: {thinking[:100] + '...' if thinking else '(none)'}")
        print(f" tokens: {assistant.get('usage', {}).get('total_tokens')}")
        print()
17. ToolResult.details — UI-only data
ToolResult has two fields: content (sent to the LLM) and details (UI-only, never sent to LLM).
This split lets tools return rich metadata for the UI (interactive charts, syntax highlighting,
source URLs) without polluting what the LLM sees. The loop carries details through events
so the UI can render them, but convert_to_llm strips them before the LLM call.
# A "lookup" tool: content has the answer text, details has rich UI metadata
async def lookup_execute(tool_call_id, params, signal=None, on_update=None):
    return ToolResult(
        content=[{"type": "text", "text": "The population of Tokyo is 14 million."}],
        details={
            "source_url": "https://example.com/tokyo",
            "confidence": 0.95,
            "chart_html": "<div>interactive population chart</div>",
        },
    )

lookup_tool = Tool(
name="lookup",
description="Look up a fact. Use when asked about factual data.",
parameters={
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
execute=lookup_execute,
)
# Wrap convert_to_llm to capture what actually gets sent to the LLM
llm_calls = []
base_convert = make_default_convert(MODEL)
def logging_convert(messages):
    converted = base_convert(messages)
    llm_calls.append(converted)
    return converted

context = AgentContext(
system_prompt="Use the lookup tool when asked about facts. Be concise.",
messages=[],
tools=[lookup_tool],
)
config = AgentConfig(model=MODEL, convert_to_llm=logging_convert)
stream = agent_loop(
[{"role": "user", "content": "What is the population of Tokyo?"}],
context,
config,
)
async for event in stream:
    if event["type"] == "message_update":
        continue
    print(event)
# Now show what the LLM actually saw on the second call (after tool result)
print("\n=== What the LLM received (2nd call, after tool executed) ===")
for msg in llm_calls[1]:
    print(f" [{msg['role']}] keys={list(msg.keys())}")
    if msg["role"] == "tool":
        print(f" content={msg['content']!r:.60s}")
        print(f" has 'details': {'details' in msg}")  # should be False
await stream.result()
18. Multimodal tools + make_default_convert
Tools can return images alongside text. The interesting part is what happens at the
provider boundary, because OpenAI doesn't support images in tool result messages.
Anthropic / Gemini: Tool results accept content block arrays with image_url blocks.
The image goes directly in the tool result message. Simple.
OpenAI (GPT-5.2): Tool results are string-only. If you put an image in a tool result,
OpenAI silently drops it. The workaround: strip images from tool results and re-inject them
as a synthetic user message after the tool result. The LLM still sees the image, just
via a different message type.
make_default_convert handles this automatically. It detects the model provider, then
hoists images for OpenAI and passes them natively for Anthropic/Gemini. No custom converter needed.
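To make the hoisting step concrete, here is a minimal sketch of the idea. It is not the actual make_default_convert implementation: the function name hoist_tool_images and the caption text are hypothetical, and it assumes OpenAI-style message dicts where a tool result's content is either a string or a list of text and image_url blocks. Images are held back until the current run of tool messages ends, so the synthetic user message never lands between two tool results of the same assistant turn.

```python
def hoist_tool_images(messages):
    """Return a new message list with tool-result images moved into a user message."""
    out = []
    pending = []  # image blocks waiting for the current run of tool messages to end

    def flush():
        # Emit one synthetic user message carrying all held-back images.
        if pending:
            caption = {"type": "text", "text": "Images returned by the tool call(s) above:"}
            out.append({"role": "user", "content": [caption, *pending]})
            pending.clear()

    for msg in messages:
        content = msg.get("content")
        if msg.get("role") == "tool" and isinstance(content, list):
            texts = [b["text"] for b in content if b.get("type") == "text"]
            pending.extend(b for b in content if b.get("type") == "image_url")
            # Tool result becomes string-only; the original dict is left untouched.
            out.append({**msg, "content": "\n".join(texts)})
            continue
        if msg.get("role") != "tool":
            flush()
        out.append(msg)
    flush()
    return out
```

A converter for a string-only provider could run a pass like this last, after the internal messages have already been mapped to provider-style dicts.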
We'll use a chart tool that returns text + a bar chart image, then ask the LLM about the
chart in a follow-up turn. This proves the image flows through the conversation and the
LLM can actually see it.
import base64
import io
import random

import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt

# Chart tool: returns text + image with one obvious spike
async def chart_execute(tool_call_id, params, signal=None, on_update=None):
    months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun"]
    values = [random.randint(5, 15) for _ in months]
    # Make one bar 10x bigger so any model can spot it
    spike_idx = random.randint(0, 5)
    values[spike_idx] = 150
    spike_month = months[spike_idx]
    fig, ax = plt.subplots(figsize=(6, 3))
    colors = ["#e74c3c" if i == spike_idx else "#3498db" for i in range(6)]
    ax.bar(months, values, color=colors)
    for i, v in enumerate(values):
        ax.text(i, v + 2, str(v), ha="center", fontsize=10, fontweight="bold")
    ax.set_title(params.get("title", "Monthly Errors"))
    ax.set_ylabel("Count")
    buf = io.BytesIO()
    fig.savefig(buf, format="png", dpi=80, bbox_inches="tight")
    plt.close(fig)
    img_b64 = base64.b64encode(buf.getvalue()).decode()
    return ToolResult(
        content=[
            {
                "type": "text",
                "text": f"Chart generated. Months: {months}, Values: {values}",
            },
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{img_b64}"},
            },
        ],
        details={"spike_month": spike_month},
    )
chart_tool = Tool(
    name="generate_chart",
    description="Generate a bar chart of monthly server errors. Returns text + image.",
    parameters={
        "type": "object",
        "properties": {"title": {"type": "string", "description": "Chart title"}},
    },
    execute=chart_execute,
)

# Test all models: generate chart → ask which month has the spike
# Use logging_convert to see exactly what each provider receives
MODELS_TO_TEST = [
    "anthropic/claude-sonnet-4-6",
    "anthropic/claude-opus-4-6",
    "gemini/gemini-3-flash-preview",
    "gpt-5.2",
]
for model in MODELS_TO_TEST:
    print(f"\n{'='*60}")
    print(f" {model}")
    print(f"{'='*60}")
    llm_calls = []
    base_convert = make_default_convert(model)

    # Default arguments bind this iteration's converter and call log
    def logging_convert(messages, _base=base_convert, _calls=llm_calls):
        converted = _base(messages)
        _calls.append(converted)
        return converted

    # Turn 1: generate the chart
    context = AgentContext(
        system_prompt="You are a data analyst. Use tools when asked. Be concise.",
        messages=[],
        tools=[chart_tool],
    )
    config = AgentConfig(model=model, convert_to_llm=logging_convert)
    stream = agent_loop(
        [{"role": "user", "content": "Generate a chart of monthly server errors."}],
        context,
        config,
    )
    spike_month = None
    new_messages = []
    async for event in stream:
        if event["type"] == "tool_execution_end":
            spike_month = event["result"]["details"]["spike_month"]
            print(f" spike month: {spike_month}")
        if (
            event["type"] == "message_end" and event["message"].get("role") == "assistant"
        ):
            content = event["message"].get("content")
            if content:
                print(f" assistant: {content[:100]}...")
    new_messages = await stream.result()
    context.messages.extend(new_messages)

    # Turn 2: ask about the chart (LLM must see the image)
    llm_calls.clear()
    stream2 = agent_loop(
        [
            {
                "role": "user",
                "content": "Which month has the highest error count? Reply with just the month name.",
            }
        ],
        context,
        config,
    )
    answer = ""
    async for event in stream2:
        if (
            event["type"] == "message_end" and event["message"].get("role") == "assistant"
        ):
            answer = (event["message"].get("content") or "").strip()

    # Show what the LLM received on turn 2
    print("\n --- What the LLM received (turn 2) ---")
    if llm_calls:
        for msg in llm_calls[0]:
            role = msg["role"]
            content = msg.get("content", "")
            if role == "tool":
                content_type = type(content).__name__
                if isinstance(content, list):
                    types = [b.get("type") for b in content]
                    print(f" [{role}] content blocks: {types}")
                else:
                    print(f" [{role}] content: {content[:60]!r}")
            elif role == "user" and isinstance(content, list):
                types = [b.get("type") for b in content]
                print(
                    f" [{role}] content blocks: {types} ← synthetic image injection"
                )
            else:
                c = content if isinstance(content, str) else str(content)
                print(f" [{role}] {c[:80]!r}")

    # Did the model spot the spike?
    month_map = {
        "jan": "january",
        "feb": "february",
        "mar": "march",
        "apr": "april",
        "may": "may",
        "jun": "june",
    }
    full = month_map.get(spike_month.lower(), spike_month.lower())
    found = spike_month.lower() in answer.lower() or full in answer.lower()
    print(f"\n answer: {answer!r}")
    print(f" correct: {found} (expected {spike_month})")
Conclusion
Everything we've explored here (streaming, tool execution, steering, follow-ups, cancellation, context transforms, etc.) is machinery that runs inside every agent.prompt() call. The
loop is the engine, and in practice you don't need to interact with it directly the way we did here.
In Part 3, we'll use the Agent class on top of this. That's the public API. It's where you actually configure models, queue steering messages, subscribe to events, and manage conversation
state.