This is Part 3 in a series where I port pi-mono's TypeScript agent loop to Python as liteagent.
In Part 2 we built the raw loop, a stateless function that makes a single LLM call, executes tools, and yields events. Now we wrap it with the Agent class: a stateful layer that manages
conversation history, tool registration, and multi-turn interactions. This is where the raw loop becomes something you can actually use.
This notebook is a hands-on walkthrough of liteagent.Agent — the stateful class that wraps
the raw loop.
The raw loop (agent_loop, agent_loop_continue) that we saw in Part 2 is stateless. You pass in context, it returns
an EventStream, you iterate events yourself.
The Agent class wraps the loop and manages:
- subscribe(callback) instead of manual async for
- steer() and follow_up() with dequeue modes
- abort() with partial message preservation
- is_streaming, stream_message, pending_tool_calls, error

This is the same two-layer design as pi-mono (agent-loop.ts + agent.ts).
What we'll cover:
- subscribe(): the primary consumer API
- prompt() overloads: string, dict, list, images
- steer() mid-run
- follow_up() after idle
- continue_run(): resume from context
- abort() and partial preservation
- wait_for_idle()
- reset() vs clear_messages()
- _default_convert_to_llm: what it does

The Agent needs a model string (litellm format) and optionally tools, a system prompt,
and a convert_to_llm function. Let's import everything and define our helpers.
pip install git+https://github.com/DrChrisLevy/liteagent.git
or
uv pip install git+https://github.com/DrChrisLevy/liteagent.git
You'll need API keys set as environment variables (e.g. ANTHROPIC_API_KEY, OPENAI_API_KEY). In a notebook you can load them from a .env file:
from dotenv import load_dotenv
load_dotenv()
from liteagent import Agent, Tool, ToolResult
from liteagent.convert import make_default_convert
# Default model for all examples
MODEL = "anthropic/claude-sonnet-4-6"
# The Agent has a built-in default converter (make_default_convert) that:
# - Strips liteagent metadata (usage, timestamp, stop_reason, etc.)
# - Passes everything else through (thinking_blocks, reasoning_content,
# provider_specific_fields — new litellm fields survive automatically)
# - For OpenAI models: hoists images from tool results into user messages
# (OpenAI ignores image blocks in tool result content)
#
# Most examples below use the default — no convert_to_llm needed.
# We'll explore the converter in detail in section 16.
# Simple echo tool — reused across examples
async def echo_execute(tool_call_id, params, signal=None, on_update=None):
    return ToolResult(content=[{"type": "text", "text": params["message"]}])
echo_tool = Tool(
name="echo",
description="Echo back a message exactly",
parameters={
"type": "object",
"properties": {
"message": {"type": "string", "description": "The message to echo"}
},
"required": ["message"],
},
execute=echo_execute,
)
print("Setup complete.")
The absolute minimum: create an Agent, call prompt("..."), check agent.messages.
Unlike the raw loop (where you build AgentContext + AgentConfig, call agent_loop,
and iterate the EventStream yourself), the Agent does all of that internally.
prompt() blocks until the loop completes.
agent = Agent(
model=MODEL,
system_prompt="Be concise. One sentence max.",
)
await agent.prompt("What is 2 + 2?")
agent.messages
agent.state.messages
from pprint import pprint
pprint(agent.state)
agent.state.messages
# The default converter strips liteagent metadata, keeps LLM-compatible fields
convert = make_default_convert(MODEL)
convert(agent.state.messages)
# User message — our input, wrapped in a dict by prompt()
agent.messages[0]
# Assistant message — enriched with usage, stop_reason, timestamp, model, etc.
agent.messages[1]
# This is why we use @property: it gives read-only access and prevents replacing the state object itself.
agent.state = "BREAK THE STATE"  # expected to raise AttributeError (the property has no setter)
The assistant message has all the extras the loop adds:
- usage: token counts from litellm
- stop_reason: "stop" (normal), "tool_calls", "error", "aborted"
- timestamp: Unix ms
- thinking_blocks / reasoning_content: None unless thinking is enabled
- provider_specific_fields: opaque bag from litellm

The liteagent-only extras (usage, stop_reason, timestamp) are why convert_to_llm exists: they must be stripped before
sending messages back to the LLM.
In the loop notebook, we used async for event in stream to consume events.
The Agent doesn't expose the stream. Instead, you subscribe a callback:
unsub = agent.subscribe(my_callback) # returns unsubscribe function
The callback fires synchronously during await agent.prompt(), on the same thread, so there are no concurrency issues.
This is how pi's agent works too.
Why callbacks instead of async iteration?
The Agent is the sole reader of the loop's EventStream (internal detail). External consumers get events via subscribe. This lets multiple consumers see the same events (unlike a queue where each item is consumed once).
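To make the fan-out idea concrete, here is a minimal sketch of a subscribe/unsubscribe emitter. `TinyEmitter` and `_emit` are hypothetical names I made up for illustration; this is not liteagent's actual implementation, just the general pattern of one producer pushing each event to every registered callback.

```python
from typing import Any, Callable

Event = dict[str, Any]
Callback = Callable[[Event], None]


class TinyEmitter:
    """Hypothetical stand-in for the Agent's subscriber bookkeeping."""

    def __init__(self) -> None:
        self._subscribers: list[Callback] = []

    def subscribe(self, callback: Callback) -> Callable[[], None]:
        self._subscribers.append(callback)

        def unsubscribe() -> None:
            # Safe to call more than once.
            if callback in self._subscribers:
                self._subscribers.remove(callback)

        return unsubscribe

    def _emit(self, event: Event) -> None:
        # Iterate over a copy so a callback may unsubscribe during dispatch.
        for callback in list(self._subscribers):
            callback(event)


emitter = TinyEmitter()
seen_a: list[str] = []
seen_b: list[str] = []
unsub_a = emitter.subscribe(lambda e: seen_a.append(e["type"]))
emitter.subscribe(lambda e: seen_b.append(e["type"]))

emitter._emit({"type": "message_start"})  # both subscribers see it
unsub_a()
emitter._emit({"type": "agent_end"})  # only the second subscriber sees it
```

Every subscriber gets every event, which is the difference from a queue where each item is consumed once.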
agent = Agent(
model=MODEL,
system_prompt="Be concise.",
)
# Collect all events
events = []
unsub = agent.subscribe(lambda e: events.append(e))
await agent.prompt("Say hello!")
for e in events:
    print(e)
Same event sequence as the raw loop.
Now let's test unsubscribe:
count_before = len(events)
count_before
unsub() # stop receiving events
agent._subscribers
await agent.prompt("Say goodbye.")
count_after = len(events)
print(f"Events before unsub: {count_before}")
print(f"Events after second prompt: {count_after}")
print(f"Unsubscribe worked: {count_before == count_after}")
agent.state.messages
Like pi's agent.prompt(), ours accepts four input shapes:
| Input | What happens |
|---|---|
| prompt("string") | Wrapped in {"role": "user", "content": "string", "timestamp": ...} |
| prompt({"role": "user", ...}) | Used as-is |
| prompt([msg1, msg2]) | Multiple messages injected |
| prompt("text", images=[...]) | Multimodal: text + images in content array |
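The four shapes in the table can be sketched as one normalization step. `normalize_prompt` is a hypothetical helper, not liteagent's code, and the exact shape of the text block in the multimodal case is my assumption.

```python
import time
from typing import Any, Optional, Union

Message = dict[str, Any]


def normalize_prompt(
    prompt: Union[str, Message, list[Message]],
    images: Optional[list[dict[str, Any]]] = None,
) -> list[Message]:
    """Turn any of the four input shapes into a list of messages."""
    if isinstance(prompt, list):
        return list(prompt)  # multiple messages injected in order
    if isinstance(prompt, dict):
        return [prompt]  # used as-is, custom fields included
    content: Any = prompt
    if images:
        # Assumed layout: one text block followed by the image blocks.
        content = [{"type": "text", "text": prompt}, *images]
    timestamp_ms = int(time.time() * 1000)  # Unix milliseconds
    return [{"role": "user", "content": content, "timestamp": timestamp_ms}]


image = {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}}
from_string = normalize_prompt("Hello from a string")
from_dict = normalize_prompt({"role": "user", "content": "Hi", "custom_field": "preserved"})
multimodal = normalize_prompt("What is in this image?", images=[image])
```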
Let's see what each produces.
# Overload 1: string
agent = Agent(model=MODEL)
await agent.prompt("Hello from a string")
agent.messages[0]
# Overload 2: dict (used as-is)
agent = Agent(model=MODEL)
await agent.prompt(
{"role": "user", "content": "Hello from a dict", "custom_field": "preserved"}
)
agent.messages[0]
convert = make_default_convert(MODEL)
convert(agent.messages)
# Overload 3: list of messages
agent = Agent(model=MODEL, system_prompt="Be concise.")
await agent.prompt(
[
{"role": "user", "content": "My name is Alice."},
{"role": "user", "content": "What is my name?"},
]
)
agent.messages
# Overload 4: string + images (multimodal)
# Send a real image and ask the LLM about it
image_block = {
"type": "image_url",
"image_url": {
"url": "https://upload.wikimedia.org/wikipedia/commons/thumb/7/7a/LeBron_James_%2851959977144%29_%28cropped2%29.jpg/250px-LeBron_James_%2851959977144%29_%28cropped2%29.jpg"
},
}
agent = Agent(
model=MODEL,
system_prompt="Be concise. One sentence max.",
)
await agent.prompt("What is in this image?", images=[image_block])
print(f"Response: {agent.messages[-1].get('content')}")
agent.messages
The Agent tracks state in an AgentState dataclass. You can inspect it at any time:
agent.state.is_streaming # True while loop is running
agent.state.stream_message # current partial message being streamed (or None)
agent.state.pending_tool_calls # set of tool call IDs currently executing
agent.state.error # last error message (or None)
agent.state.model # current model string
agent.state.system_prompt # current system prompt
agent.state.tools # current tool list
agent.state.thinking_level # "off", "minimal", "low", "medium", "high", "xhigh"
agent.messages # shorthand for agent.state.messages
Let's watch state change during a run using a subscriber:
agent = Agent(
model=MODEL,
system_prompt="Be concise.",
tools=[echo_tool],
)
# Track state transitions
state_log = []
print(
f"{'Event':<26} {'Streaming':<10} {'StreamMsg':<10} {'PendTools':<10} {'MsgCount':<10}"
)
def track_state(event):
    t = event["type"]
    entry = {
        "event": t,
        "is_streaming": agent.state.is_streaming,
        "stream_msg": agent.state.stream_message is not None,
        "pending_tools": len(agent.state.pending_tool_calls),
        "msg_count": len(agent.messages),
    }
    state_log.append(entry)
    print("-" * 66)
    print(
        f"{entry['event']:<26} {str(entry['is_streaming']):<10} {str(entry['stream_msg']):<10} {entry['pending_tools']:<10} {entry['msg_count']:<10}"
    )
agent.subscribe(track_state)
await agent.prompt("Echo 'hello world'")
Notice how:
- is_streaming is True throughout the run
- stream_message appears on message_start (assistant only), disappears on message_end
- pending_tool_calls increments on tool_execution_start, decrements on tool_execution_end
- msg_count grows on each message_end: messages are appended incrementally, not batched

This is the Agent's main value: messages persist across prompt() calls.
With the raw loop, you'd need to manually thread context between calls.
The Agent does it automatically.
agent = Agent(
model=MODEL,
system_prompt="Be concise. Remember everything the user says.",
)
# Turn 1: tell the agent something
await agent.prompt("My favorite color is blue.")
agent.messages
# Turn 2: ask about it — the agent should remember
await agent.prompt("What is my favorite color?")
agent.messages
steer() queues a message that gets injected during a run:
The loop also checks for steering at the start of each run (before the first LLM call).
So if you call steer() before prompt(), the steering message gets picked up immediately.
Let's demonstrate both: pre-queued steering, and mid-tool steering.
# Pre-queued steering: steer() before prompt()
agent = Agent(
model=MODEL,
system_prompt="Be concise.",
)
agent.steer("Actually, tell me a joke instead.")
await agent.prompt("What is the capital of France?")
agent.messages
# Mid-tool steering: steer() during tool execution
# When tool_a executes, it queues a steering message.
# tool_b should be SKIPPED.
call_log = []
steering_agent = None
async def tool_a_exec(tool_call_id, params, signal=None, on_update=None):
    call_log.append("a")
    steering_agent.steer("Stop! Do something else.")  # interrupt!
    return ToolResult(content=[{"type": "text", "text": "tool_a done"}])
async def tool_b_exec(tool_call_id, params, signal=None, on_update=None):
    call_log.append("b")
    return ToolResult(content=[{"type": "text", "text": "tool_b done"}])
tool_a = Tool(
name="tool_a",
description="Tool A",
parameters={"type": "object", "properties": {}},
execute=tool_a_exec,
)
tool_b = Tool(
name="tool_b",
description="Tool B",
parameters={"type": "object", "properties": {}},
execute=tool_b_exec,
)
steering_agent = Agent(
model=MODEL,
system_prompt="When asked, call both tool_a and tool_b in a single response. Be concise.",
tools=[tool_a, tool_b],
)
await steering_agent.prompt("Call both tool_a and tool_b now.")
steering_agent.messages
The key proof: tool_b never ran ('b' not in call_log), but it still has a tool result in the conversation. The LLM needs every tool call to have a result, even skipped ones.
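Here is a rough sketch of how that skip behavior could work: run tool calls in order, check the steering queue after each one, and give every remaining call a placeholder result. `run_tool_calls` is a hypothetical function and the placeholder wording is my own assumption, not what liteagent or pi-mono actually emits.

```python
import asyncio
from collections import deque


async def run_tool_calls(tool_calls, tools, steering_queue):
    """Run tool calls in order; skip the rest once steering is queued."""
    results = []
    for index, call in enumerate(tool_calls):
        output = await tools[call["name"]]()
        results.append({"role": "tool", "tool_call_id": call["id"], "content": output})
        if steering_queue:
            # Every remaining call still gets a result so the history stays valid.
            for skipped in tool_calls[index + 1:]:
                results.append({
                    "role": "tool",
                    "tool_call_id": skipped["id"],
                    "content": "Skipped: a steering message was queued.",  # assumed wording
                })
            break
    return results


call_log = []
steering_queue = deque()


async def tool_a():
    call_log.append("a")
    steering_queue.append({"role": "user", "content": "Stop! Do something else."})
    return "tool_a done"


async def tool_b():
    call_log.append("b")
    return "tool_b done"


results = asyncio.run(run_tool_calls(
    [{"id": "c0", "name": "tool_a"}, {"id": "c1", "name": "tool_b"}],
    {"tool_a": tool_a, "tool_b": tool_b},
    steering_queue,
))
```

In a notebook you would `await run_tool_calls(...)` directly instead of using `asyncio.run`.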
follow_up() is the outer loop mechanism. Unlike steering (which interrupts),
follow-ups wait until the agent finishes everything (no more tool calls, no steering).
Then the follow-up message is injected and the agent continues.
agent = Agent(
model=MODEL,
system_prompt="Be concise. One sentence.",
)
# Queue a follow-up BEFORE the first prompt
agent.follow_up("Now tell me a fun fact about cats.")
await agent.prompt("What is 2 + 2?")
agent.messages
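The relationship between the inner loop (turns, tools, steering) and the outer loop (follow-ups) can be sketched with a toy synchronous version. Everything here is a simplification under my own assumptions: `run_agent` and `echo_llm` are hypothetical, tool execution is faked with an "ok" result, and steering is drained all at once rather than by mode.

```python
from collections import deque


def run_agent(prompt, llm, steering, follow_ups):
    """Toy dual loop: the inner loop runs turns, the outer loop drains follow-ups."""
    messages = [{"role": "user", "content": prompt}]
    while True:  # outer loop: one pass per follow-up
        while True:  # inner loop: keep going while tools or steering are pending
            while steering:
                messages.append(steering.popleft())
            reply = llm(messages)
            messages.append(reply)
            for call in reply.get("tool_calls") or []:
                messages.append({"role": "tool", "tool_call_id": call["id"], "content": "ok"})
            if not reply.get("tool_calls") and not steering:
                break
        if not follow_ups:
            return messages
        messages.append(follow_ups.popleft())


def echo_llm(messages):
    # Fake model: answers the most recent user message, never calls tools.
    last_user = [m for m in messages if m["role"] == "user"][-1]
    return {"role": "assistant", "content": f"answer to: {last_user['content']}"}


follow_ups = deque([{"role": "user", "content": "Now tell me a fun fact about cats."}])
history = run_agent("What is 2 + 2?", echo_llm, deque(), follow_ups)
roles = [m["role"] for m in history]
```

The follow-up only enters the history after the inner loop has nothing left to do, which matches the ordering seen in the example above.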
Both steering and follow-up have two modes:
- "one-at-a-time" (default): dequeue one message per poll
- "all": dequeue everything at once

This matters when multiple messages are queued. Let's see the difference.
# one-at-a-time (default): queue 3, dequeue returns 1
agent = Agent(model=MODEL)
agent.steer("msg1")
agent.steer("msg2")
agent.steer("msg3")
batch = agent._dequeue_steering()
print(
f"one-at-a-time: got {len(batch)} message(s), {len(agent._steering_queue)} remaining"
)
print(f" dequeued: '{batch[0]['content']}'")
# all mode: queue 3, dequeue returns all 3
agent = Agent(model=MODEL, steering_mode="all")
agent.steer("msg1")
agent.steer("msg2")
agent.steer("msg3")
batch = agent._dequeue_steering()
print(f"all mode: got {len(batch)} message(s), {len(agent._steering_queue)} remaining")
for m in batch:
    print(f" '{m['content']}'")
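The two dequeue modes boil down to a few lines. This is a sketch of the idea with a hypothetical `dequeue` function, assuming first-in-first-out ordering; it is not the body of `_dequeue_steering`.

```python
from collections import deque


def dequeue(queue: deque, mode: str = "one-at-a-time") -> list:
    """Return the next batch of queued messages according to the mode."""
    if not queue:
        return []
    if mode == "all":
        batch = list(queue)
        queue.clear()
        return batch
    return [queue.popleft()]  # oldest message only (FIFO is an assumption)


queue = deque(["msg1", "msg2", "msg3"])
first = dequeue(queue)  # one-at-a-time: a single message
rest = dequeue(queue, mode="all")  # all: everything still queued
```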
continue_run() is for when the conversation ended at a tool result or user message
and you want the LLM to continue from there, without sending a new prompt.
Three interesting cases when the last message is an assistant message:
When would you actually use this? In normal chat (prompt() → response → prompt() again),
you won't. continue_run() is for recovery and resumption — when something outside the
normal flow modifies the message history. Real-world examples from pi-mono's coding agent:
- continue() kicks the loop to pick up from there.
- continue() re-runs the loop without needing a new user prompt.
- follow_up() or steer() is called after the agent finishes, continue_run() restarts the loop to process them.
# Case: continue from a tool result (manually built context)
weather_tool = Tool(
name="weather",
description="Get the weather in a location",
parameters={"type": "object", "properties": {"location": {"type": "string"}}},
execute=lambda *args: ToolResult(content=[{"type": "text", "text": "72°F and sunny in San Francisco"}]),
)
agent = Agent(
model=MODEL,
system_prompt="Be concise.",
tools=[weather_tool],
)
# Simulate: user asked about weather → assistant called tool → we have the result
agent._state.messages = [
{"role": "user", "content": "What's the weather?"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "c0",
"type": "function",
"function": {"name": "weather", "arguments": "{}"},
}
],
"stop_reason": "tool_calls",
},
{
"role": "tool",
"tool_call_id": "c0",
"content": [{"type": "text", "text": "72°F and sunny in San Francisco"}],
"is_error": False,
},
]
await agent.continue_run()
agent.messages
# Case: continue from assistant + steering queue
agent = Agent(model=MODEL)
agent._state.messages = [
{"role": "user", "content": "Hi"},
{"role": "assistant", "content": "Hello!", "stop_reason": "stop"},
]
agent.steer("Now tell me a joke.")
await agent.continue_run()
agent.messages
# Case: continue from assistant + empty queues → error
agent = Agent(model=MODEL)
agent._state.messages = [
{"role": "user", "content": "Hi"},
{"role": "assistant", "content": "Hello!", "stop_reason": "stop"},
]
try:
    await agent.continue_run()
except ValueError as e:
    print(f"Got expected error: {e}")
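The three cases above suggest a small precondition check before the loop restarts. The sketch below is my reading of that behavior, not liteagent's code: `check_can_continue` is a hypothetical name, the return labels are made up, and raising on an empty history is an assumption.

```python
def check_can_continue(messages, steering_queue, follow_up_queue):
    """Decide whether a run can resume without a new prompt."""
    if not messages:
        raise ValueError("No messages to continue from")  # assumed behavior
    if messages[-1]["role"] != "assistant":
        return "resume"  # user message or tool result: the LLM still owes a reply
    if steering_queue or follow_up_queue:
        return "drain_queue"  # queued messages give the loop something to do
    raise ValueError("Cannot continue from an assistant message with empty queues")


after_tool = check_can_continue([{"role": "tool", "content": "72°F"}], [], [])
after_assistant = check_can_continue(
    [{"role": "assistant", "content": "Hello!"}],
    [{"role": "user", "content": "Now tell me a joke."}],
    [],
)
```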
abort() sets a signal that stops the loop. But what happens to the partial
assistant message that was being streamed? The Agent handles this edge case
(same as pi's agent):
# abort() after receiving some text — partial should be preserved
agent = Agent(
model=MODEL,
system_prompt="Write a very long essay about the history of computing. At least 5000 words.",
)
chunk_count = 0
def abort_after_chunks(event):
    global chunk_count
    print("signal is set: " + str(agent._signal.is_set()))
    if agent.state.stream_message:
        print(agent.state.stream_message["content"])
    if event["type"] == "message_update" and event.get("delta_type") == "text_delta":
        chunk_count += 1
        if chunk_count >= 5:
            agent.abort()
agent.subscribe(abort_after_chunks)
await agent.prompt("Go ahead.")
print(f"is_streaming: {agent.state.is_streaming}")
print(f"signal cleaned up: {agent._signal is None}")
agent.messages
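A stripped-down sketch of partial preservation, assuming the abort signal behaves like an `asyncio.Event` and that the partial message is kept with `stop_reason="aborted"`. `stream_with_abort` and `abort_demo` are hypothetical names and the chunk list stands in for a real LLM stream.

```python
import asyncio


async def stream_with_abort(chunks, signal: asyncio.Event, on_chunk):
    """Accumulate streamed text; keep whatever arrived if the signal is set."""
    partial = {"role": "assistant", "content": ""}
    for chunk in chunks:
        if signal.is_set():
            partial["stop_reason"] = "aborted"
            return partial  # partial text is preserved, not discarded
        partial["content"] += chunk
        on_chunk(partial)
        await asyncio.sleep(0)  # yield, as a real stream would between chunks
    partial["stop_reason"] = "stop"
    return partial


async def abort_demo():
    signal = asyncio.Event()
    seen = 0

    def abort_after_three(_partial):
        nonlocal seen
        seen += 1
        if seen >= 3:
            signal.set()  # plays the role of agent.abort()

    return await stream_with_abort(["a", "b", "c", "d", "e"], signal, abort_after_three)


message = asyncio.run(abort_demo())
```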
A coordination primitive. prompt() already awaits internally, so when it returns
the agent is idle. wait_for_idle() is for when something else triggered the agent
and you need to sync from a different place in your code:
# Some callback triggered a run
agent.follow_up("do something")
asyncio.create_task(agent.continue_run())
# Later, elsewhere:
await agent.wait_for_idle() # block until that run finishes
# now safe to inspect agent.messages
If you're always doing await agent.prompt() sequentially, you'll never need this.
# When idle, returns immediately
agent = Agent(model=MODEL)
await agent.wait_for_idle() # should not hang
print("wait_for_idle() returned immediately (agent is idle)")
# After a prompt, also returns immediately (prompt already blocks)
await agent.prompt("Hi")
await agent.wait_for_idle()
print("wait_for_idle() returned immediately (prompt already completed)")
import asyncio
agent = Agent(model=MODEL)
async def background_task():
    agent.follow_up("Summarize what 2+2 is")
    await agent.continue_run()
await agent.prompt("Hi")
# Kick off a run from a background task
asyncio.create_task(background_task())
await asyncio.sleep(0) # yield to event loop — lets background_task enter _run_loop and set _running_future
await agent.wait_for_idle()
agent.messages
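One plausible way to build this kind of primitive is a future that exists only while a run is active. The sketch below borrows the `_running_future` name from the comment above, but `IdleTracker` and the rest are hypothetical and not taken from liteagent.

```python
import asyncio
from typing import Optional


class IdleTracker:
    """Hypothetical sketch: a future marks the span of an active run."""

    def __init__(self) -> None:
        self._running_future: Optional[asyncio.Future] = None

    async def run(self, work) -> None:
        self._running_future = asyncio.get_running_loop().create_future()
        try:
            await work()
        finally:
            self._running_future.set_result(None)  # wake up any waiters
            self._running_future = None

    async def wait_for_idle(self) -> None:
        if self._running_future is not None:
            await self._running_future
        # No active run: return immediately.


async def idle_demo() -> list[str]:
    tracker = IdleTracker()
    log: list[str] = []

    async def work() -> None:
        await asyncio.sleep(0.01)
        log.append("run finished")

    task = asyncio.create_task(tracker.run(work))
    await asyncio.sleep(0)  # let the task start and create the future
    await tracker.wait_for_idle()
    log.append("idle")
    await task
    return log


idle_log = asyncio.run(idle_demo())
```

The `asyncio.sleep(0)` matters for the same reason as in the example above: without it, `wait_for_idle()` could run before the background task has created its future.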
Two ways to clear state, with different scopes:
| Method | Clears messages | Clears queues | Clears error | Keeps config |
|---|---|---|---|---|
| reset() | ✓ | ✓ | ✓ | ✓ |
| clear_messages() | ✓ | ✗ | ✗ | ✓ |
reset() is "start over". clear_messages() is "clear history but keep queued work".
agent = Agent(
model=MODEL,
system_prompt="you are a nice agent",
tools=[echo_tool],
)
agent.append_message({"role": "user", "content": "old message"})
agent.steer("queued steering")
agent.follow_up("queued follow-up")
agent._state.error = "some error"
print("Before clear_messages():")
agent.messages
agent.clear_messages()
print("After clear_messages():")
print(f" messages: {agent.messages}")
print(f" queued: {agent.has_queued_messages()}") # True — queues survived
print(f" error: {agent.state.error}") # "some error" — survived
# Now reset — clears everything
agent.append_message({"role": "user", "content": "new message"})
agent.reset()
print("After reset():")
print(
f" messages: {len(agent.messages)}, queued: {agent.has_queued_messages()}, error: {agent.state.error}"
)
print(f" model: {agent.state.model} ← preserved")
print(f" system_prompt: '{agent.state.system_prompt}' ← preserved")
print(f" tools: {len(agent.state.tools)} ← preserved")
Pi allows calling setModel(), setTools(), etc. even while the agent is streaming.
The loop snapshots context at the start of each run, so mid-run changes only take
effect on the next run. We match this behavior.
This enables patterns like:
agent = Agent(model=MODEL)
# Track which model gets called
def event_handler(e):
    if e["type"] == "message_end":
        print(f"Model Used: {agent.state.model}")
agent.subscribe(event_handler)
await agent.prompt("hey")
# Switch model mid-conversation
agent.set_model("gemini/gemini-3-flash-preview")
await agent.prompt("Bye")
agent.messages
When the LLM call fails (network error, rate limit, etc.), the Agent:
- stop_reason="error"
- agent.state.error
- agent_end event

The Agent does not re-raise: it always completes cleanly. This lets consumers
check agent.state.error instead of wrapping every prompt() in try/except.
# Force an error by using a non-existent model
agent = Agent(model="fake-provider/nonexistent-model")
await agent.prompt("This will fail.")
agent.messages
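The "never re-raise" behavior can be sketched as a guarded turn that converts an exception into state. `TinyState` and `guarded_turn` are hypothetical, and the exact fields on the error message (for example `content=None`) are my assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Message = dict[str, Any]


class TinyState:
    def __init__(self) -> None:
        self.messages: list[Message] = []
        self.error: Optional[str] = None


async def guarded_turn(state: TinyState, call_llm: Callable[[], Awaitable[Message]]) -> list[str]:
    """Run one LLM call; turn any exception into state instead of raising."""
    events: list[str] = []
    try:
        state.messages.append(await call_llm())
    except Exception as exc:
        state.error = str(exc)
        state.messages.append({
            "role": "assistant",
            "content": None,  # assumed shape of the error message
            "stop_reason": "error",
            "error_message": str(exc),
        })
    events.append("agent_end")  # emitted on success and on failure
    return events


async def failing_llm() -> Message:
    raise RuntimeError("model not found")


state = TinyState()
events = asyncio.run(guarded_turn(state, failing_llm))
```

Callers then branch on `state.error` after the run instead of catching exceptions around it.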
When a tool raises, the exception is caught and turned into a ToolResult with is_error=True. The error gets sent back to the LLM as a tool result, and the loop continues — the LLM sees the error and can recover or report it. The run doesn't abort.
async def failing_fn(tool_call_id, args, signal, on_update):
    raise ValueError("Something went wrong!")
failing_tool = Tool(
name="failing_tool",
description="A tool that always fails",
parameters={"type": "object", "properties": {}},
execute=failing_fn,
)
agent = Agent(model=MODEL, tools=[failing_tool], system_prompt="Use the failing_tool when asked.")
await agent.prompt("Use your tool")
agent.messages
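The wrapping of tool failures can be sketched like this. `ToolResultSketch` is a stand-in dataclass so the example runs without liteagent installed, and `safe_execute` is a hypothetical helper; the real loop may format the error text differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolResultSketch:
    """Stand-in for liteagent's ToolResult, for illustration only."""
    content: list[dict[str, Any]] = field(default_factory=list)
    is_error: bool = False


async def safe_execute(execute, tool_call_id: str, params: dict[str, Any]) -> ToolResultSketch:
    """Run a tool; convert any exception into an error result for the LLM."""
    try:
        return await execute(tool_call_id, params)
    except Exception as exc:
        return ToolResultSketch(content=[{"type": "text", "text": str(exc)}], is_error=True)


async def always_fails(tool_call_id, params):
    raise ValueError("Something went wrong!")


result = asyncio.run(safe_execute(always_fails, "c0", {}))
```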
If you don't provide convert_to_llm, the Agent builds one via
make_default_convert(model) from liteagent/convert.py.
This is the sole provider-specific boundary in the codebase. It uses a
denylist approach: it strips known liteagent metadata fields and passes everything
else through.
What it strips: timestamp, usage, stop_reason, error_message,
details, is_error
What it preserves: thinking_blocks, reasoning_content,
provider_specific_fields, multimodal content blocks, tool call metadata,
everything the LLM needs for multi-turn functionality.
OpenAI image hoisting: For OpenAI models, tool results with [text, image_url]
content get split — text stays in the tool message, images are hoisted into a
synthetic user message. This is because OpenAI's Chat Completions API silently
ignores image blocks in tool result content. Anthropic and Gemini handle them
natively, so no hoisting needed.
It's not perfect. There are many quirks and edge cases with litellm.
I will evolve this over time.
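To illustrate the denylist and the hoisting step together, here is a minimal sketch. It is not the contents of convert.py: `strip_metadata` and `convert` are hypothetical, the `hoist_images` flag replaces real model detection, and placing the synthetic user message directly after the tool message is my assumption.

```python
from typing import Any

Message = dict[str, Any]

# The liteagent-only fields listed above.
STRIP_FIELDS = {"timestamp", "usage", "stop_reason", "error_message", "details", "is_error"}


def strip_metadata(message: Message) -> Message:
    """Denylist: drop known metadata keys, pass everything else through."""
    return {key: value for key, value in message.items() if key not in STRIP_FIELDS}


def convert(messages: list[Message], hoist_images: bool) -> list[Message]:
    out: list[Message] = []
    for message in messages:
        clean = strip_metadata(message)
        content = clean.get("content")
        if hoist_images and clean.get("role") == "tool" and isinstance(content, list):
            images = [block for block in content if block.get("type") == "image_url"]
            if images:
                # Text stays in the tool message; images move to a synthetic user message.
                clean["content"] = [b for b in content if b.get("type") != "image_url"]
                out.append(clean)
                out.append({"role": "user", "content": images})  # assumed placement
                continue
        out.append(clean)
    return out


history = [
    {"role": "user", "content": "Hi", "timestamp": 12345},
    {
        "role": "tool",
        "tool_call_id": "c0",
        "content": [
            {"type": "text", "text": "Here is the chart."},
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
        ],
        "is_error": False,
    },
]
plain = convert(history, hoist_images=False)
hoisted = convert(history, hoist_images=True)
```

Unknown keys such as thinking_blocks survive untouched because they are not on the denylist. One limitation of this naive placement: with several parallel tool results, a user message would land between tool messages, so a real converter likely needs to defer the hoisted images until after the last tool result.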
from liteagent.convert import make_default_convert
# Build the converter for Anthropic (no image hoisting needed)
convert_anthropic = make_default_convert("anthropic/claude-sonnet-4-6")
# Build one for OpenAI (will hoist tool-result images)
convert_openai = make_default_convert("gpt-5.2")
# Simulate a conversation with enriched messages
messages = [
{"role": "user", "content": "Hi", "timestamp": 12345},
{
"role": "assistant",
"content": "Hello!",
"tool_calls": None,
"thinking_blocks": [{"type": "thinking", "thinking": "greeting"}],
"reasoning_content": "simple greeting",
"provider_specific_fields": {"thought_signatures": ["sig123"]},
"usage": {"prompt_tokens": 10, "completion_tokens": 5},
"stop_reason": "stop",
"timestamp": 12346,
},
{
"role": "tool",
"tool_call_id": "c0",
"name": "chart",
"content": [
{"type": "text", "text": "Here is the chart."},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
],
"is_error": False,
"details": {"extra": "ui-only"},
"timestamp": 12347,
},
]
print("=== Anthropic converter (images stay in tool message) ===")
for m in convert_anthropic(messages):
    print(m)
print()
print("=== OpenAI converter (images hoisted to user message) ===")
for m in convert_openai(messages):
    print(m)
The default converter handles the common cases transparently. You'd override it when:
- {"role": "notification", ...} that need to be filtered or converted

For many uses, the default just works and you won't need to pass convert_to_llm.
The Agent is model-agnostic. Let's run the same prompt + tool call through some different models.
MODELS = [
"anthropic/claude-sonnet-4-6",
"anthropic/claude-opus-4-6",
"gemini/gemini-3-pro-preview",
"gemini/gemini-3-flash-preview",
"gpt-5.2",
"gpt-5.3-codex",
"gpt-5.4",
]
for model in MODELS:
    agent = Agent(
        model=model,
        system_prompt="Use the echo tool. Be concise.",
        tools=[echo_tool],
    )
    try:
        await agent.prompt("Echo 'test'")
        roles = [m.get("role") for m in agent.messages]
        has_tool = "tool" in roles
        assistants = [m for m in agent.messages if m.get("role") == "assistant"]
        last_content = (assistants[-1].get("content") or "")[:50] if assistants else "?"
        print(f" ✓ {model:<42} tool_used={has_tool} | {last_content}")
    except Exception as e:
        print(f" ✗ {model:<42} ERROR: {e}")
The Agent is framework-agnostic. Here's how you'd wire it into different consumers.
# Pattern 1: CLI — print text deltas as they arrive
agent = Agent(
model=MODEL,
system_prompt="Be concise.",
)
def cli_handler(event):
    if event["type"] == "message_update" and event.get("delta_type") == "text_delta":
        text = event["delta"].get("content", "")
        print(text, end="", flush=True)
    elif event["type"] == "agent_end":
        print()  # newline at the end
agent.subscribe(cli_handler)
print("Agent: ", end="")
await agent.prompt("What is the meaning of life, in one sentence?")
The default converter handles multimodal tool results transparently:
This test sends a bar chart image to the model via a tool result. The chart has an obvious spike in May (580 vs ~130 baseline). The model must identify "May" from the image — proving it actually saw the image, not just the text.
This exercises the full round-trip: tool returns [text, image_url] →
default converter handles it per-provider → model reasons about the image.
import base64
import io
def make_bar_chart_b64():
    import matplotlib
    matplotlib.use("Agg")
    import matplotlib.pyplot as plt
    months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun"]
    values = [120, 135, 128, 142, 580, 131]
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.bar(months, values, color=["#3498db" if v < 300 else "#e74c3c" for v in values])
    ax.set_title("Monthly API Errors")
    for i, v in enumerate(values):
        ax.text(i, v + 15, str(v), ha="center", fontweight="bold")
    plt.tight_layout()
    buf = io.BytesIO()
    fig.savefig(buf, format="png", dpi=72)
    plt.close(fig)
    return base64.b64encode(buf.getvalue()).decode()
chart_b64 = make_bar_chart_b64()
async def get_chart_exec(tool_call_id, params, signal=None, on_update=None):
    return ToolResult(
        content=[
            {"type": "text", "text": "Here is the monthly error chart."},
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{chart_b64}"},
            },
        ],
    )
chart_tool = Tool(
name="get_error_chart",
description="Get the monthly error chart. Returns text + chart image.",
parameters={"type": "object", "properties": {}},
execute=get_chart_exec,
)
MULTIMODAL_MODELS = [
"anthropic/claude-sonnet-4-6",
"anthropic/claude-opus-4-6",
"gpt-5.2",
"gpt-5.3-codex",
"gpt-5.4",
]
for model in MULTIMODAL_MODELS:
    agent = Agent(
        model=model,
        tools=[chart_tool],
        system_prompt=(
            "Use get_error_chart when asked. After seeing the chart, "
            "answer the user's question about it. Be concise."
        ),
    )
    try:
        await agent.prompt(
            "Get the error chart, then tell me: which month has the highest "
            "error count? Reply with just the month name."
        )
        assistants = [m for m in agent.messages if m.get("role") == "assistant"]
        last = assistants[-1] if assistants else {}
        if last.get("error_message"):
            raise RuntimeError(last["error_message"])
        answer = (last.get("content") or "").lower()
        saw_image = "may" in answer
        status = "PASSED" if saw_image else "FAILED"
        print(f" {status} {model:<42} | {answer[:60]}")
    except Exception as e:
        print(f" FAILED {model:<42} | ERROR: {e}")
This wraps up my three-part intro series on porting pi-mono's agent loop to Python.
Agent class. That is the stateful layer that manages message history, event subscription, queues, and cancellation.
The result is liteagent.
The biggest design decision in liteagent is delegating all provider communication to litellm instead of writing custom provider code.
pi-mono's packages/ai/src/providers/ contains thousands of lines of TypeScript across files for providers like Anthropic, OpenAI Completions, OpenAI Codex Responses, Google, Google Vertex, Google Gemini CLI, Mistral, Amazon Bedrock, etc.
liteagent's entire provider boundary is less than 100 lines in a single file (convert.py). It strips liteagent metadata, passes everything else through, and handles one provider quirk: hoisting images from tool results into synthetic user messages for OpenAI (which silently ignores image blocks in tool result content).
It's not perfect though. The upside is obvious: less to write, less to maintain, and access to every model litellm supports. The downside is that you inherit every litellm bug and inconsistency, and you can't easily fix them at the source. I can see why pi-mono went the custom route, and honestly I'm tempted at times to do the same for liteagent.
Many of the issues are documented in DESIGN_NOTES and LITELLM_API_LANDSCAPE. I think litellm is a great project, and it has the very difficult task of mapping out every LLM provider's patterns and unifying them. A few of the issues I ran into:
Thinking metadata is inconsistent across providers. Anthropic gets reasoning_content (string) plus first-class thinking_blocks (with cryptographic signatures). Gemini gets reasoning_content (sometimes — absent on trivial prompts) with signatures buried in provider_specific_fields["thought_signatures"]. OpenAI's GPT-5.x thinking is completely invisible through Chat Completions (the model spends reasoning tokens but the content is hidden).
GPT-5.4 reasoning is silently disabled with tools. litellm quietly drops reasoning_effort when tools are present for GPT-5.4 specifically. This only affects 5.4; earlier models handle reasoning + tools fine through Chat Completions. The Responses API doesn't have this limitation, but litellm's acompletion() doesn't use it for base GPT models.
OpenAI ignores images in tool results. OpenAI's Chat Completions API accepts only string content in tool messages. If you send [text, image_url] blocks, the model sees the text but the image is silently dropped. Anthropic and Gemini handle multimodal tool results natively. This required a provider-specific workaround in the converter, the one place liteagent has provider-aware code.
The key insight from this port is that the agent loop itself is provider-agnostic. The dual while-loop, steering, follow-ups, cancellation, and event streaming do not
care which LLM is on the other end. The provider pain is entirely at the boundary: message format conversion, thinking metadata, multimodal content handling. Isolating that boundary to a single file (convert.py) keeps the core clean even when litellm forces workarounds.
But I'm still on the fence about the litellm trade-off. I want to spend some time building consumers that use liteagent, and see how much pain I actually have to deal with.
pi-mono has so many great patterns. I love the streaming of events and having consumers subscribe to them. I also think the dual loop architecture with steering and follow-ups is a great pattern. It goes beyond a standard agent loop with some
simple yet elegant "bells and whistles". I'm excited to build some tooling and custom agents around my Python liteagent implementation.