FastAPI ASGI on Modal for OpenAI Throughput

Intro

In this post I show how easy it is to scale OpenAI throughput with a FastAPI web service on Modal.

To keep this concrete, I use one task throughout this post: topic classification for Hacker News titles. I benchmark four setups, detailed in the sections that follow: a single-host thread pool, single-host async, single-host async with sharded clients, and an auto-scaling ASGI service on Modal.

All approaches use the same prompt/model and the same title dataset. This is intentionally a demo, not production architecture. The goal is minimal code and clear comparisons you can reproduce quickly.

Approach 1: Single-Host Thread Pool

This baseline uses the sync OpenAI client and ThreadPoolExecutor on one machine. It is simple and works well enough as a control before moving to async and then multi-container ASGI.

from dotenv import load_dotenv

# load in the OpenAI API key
load_dotenv("posts/modal_asgi_openai/.env")
True
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Literal
import time

from IPython.display import HTML, display
from openai import OpenAI
from pydantic import BaseModel, Field

OPENAI_MODEL = "gpt-4.1-mini"
OPENAI_TIMEOUT_S = 20

TOPIC_PROMPT = """Classify one Hacker News title into a topic label.

topic:
- ai_ml_agents
- programming_dev_tools
- infra_cloud_ops
- hardware_electronics
- security_privacy
- business_startups_career
- science_math_research
- society_policy_economy
- culture_media_games
- other

Return only fields in the schema.
confidence is a float from 0.0 to 1.0.
"""

Topic = Literal[
    "ai_ml_agents",
    "programming_dev_tools",
    "infra_cloud_ops",
    "hardware_electronics",
    "security_privacy",
    "business_startups_career",
    "science_math_research",
    "society_policy_economy",
    "culture_media_games",
    "other",
]

class TopicClassification(BaseModel):
    """Structured-output schema: one topic label plus a confidence score."""

    # Must be one of the labels enumerated in TOPIC_PROMPT.
    topic: Topic
    # Model's self-reported confidence, clamped to [0.0, 1.0] by validation.
    confidence: float = Field(ge=0.0, le=1.0)


def show_df(df):
    """Render a DataFrame in the notebook as an HTML table, hiding the index."""
    html_table = df.to_html(index=False)
    display(HTML(html_table))
# Load the benchmark dataset: one Hacker News title per non-empty line.
DATA_PATH = Path("posts/modal_asgi_openai/data/hn_recent_story_titles_1000.txt")
titles = [line.strip() for line in DATA_PATH.read_text(encoding="utf-8").splitlines() if line.strip()]
def classify_one_sync(client: OpenAI, title: str) -> TopicClassification:
    """Classify one title with the synchronous OpenAI client.

    Uses structured outputs so the response is parsed straight into the
    TopicClassification schema. Exceptions (timeouts, API errors) propagate
    to the caller.
    """
    parsed = client.responses.parse(
        model=OPENAI_MODEL,
        input=[{"role": "user", "content": title}],
        instructions=TOPIC_PROMPT,
        text_format=TopicClassification,
        timeout=OPENAI_TIMEOUT_S,
    )
    return parsed.output_parsed

def run_threadpool(titles: list[str], max_workers: int = 40) -> dict:
    """Classify titles with the sync client fanned out over a thread pool.

    One shared OpenAI client is used by all worker threads; per-item
    failures are stored in place of the result rather than raised.

    Args:
        titles: Hacker News titles to classify.
        max_workers: Thread-pool size (also reported as "concurrency").

    Returns:
        Summary dict with timing, throughput, error count, and per-item
        results (TopicClassification on success, the Exception on failure).

    Raises:
        ValueError: If max_workers < 1.
    """
    if max_workers < 1:
        raise ValueError("max_workers must be >= 1")

    t0 = time.perf_counter()
    # Pre-sized so each future writes back to its original position.
    # Fix: the annotation must include None, the initial fill value.
    results: list[TopicClassification | Exception | None] = [None] * len(titles)

    client = OpenAI()
    try:
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            fut_to_idx = {ex.submit(classify_one_sync, client, title): i for i, title in enumerate(titles)}
            for fut in as_completed(fut_to_idx):
                idx = fut_to_idx[fut]
                try:
                    results[idx] = fut.result()
                except Exception as exc:  # demo notebook: record the failure, keep going
                    results[idx] = exc
    finally:
        client.close()

    elapsed = time.perf_counter() - t0
    errors = sum(isinstance(x, Exception) for x in results)
    return {
        "num_items": len(titles),
        "max_workers": max_workers,
        "concurrency": max_workers,  # duplicate key kept for cross-approach comparison
        "elapsed_s": round(elapsed, 2),
        "req_per_s": round(len(titles) / elapsed, 1),
        "errors": errors,
        "results": results,
    }
bench_1 = run_threadpool(titles, max_workers=40)
{k: bench_1[k] for k in ["num_items", "max_workers", "elapsed_s", "req_per_s", "errors"]}
{'num_items': 1000,
 'max_workers': 40,
 'elapsed_s': 23.61,
 'req_per_s': 42.4,
 'errors': 0}
import pandas as pd

# Sweep thread-pool sizes to find where single-host throughput saturates.
WORKER_SWEEP = [10, 20, 40, 80, 120]
NUM_ITEMS = len(titles)  # lower this for quick/cheap runs
sweep_titles = titles[:NUM_ITEMS]

sweep_rows = []
for w in WORKER_SWEEP:
    row = run_threadpool(sweep_titles, max_workers=w)
    row.pop("results", None)  # drop per-item payloads; keep only summary stats
    sweep_rows.append(row)

sweep_df = pd.DataFrame(sweep_rows)
show_df(sweep_df)
num_items max_workers concurrency elapsed_s req_per_s errors
1000 10 10 82.17 12.2 0
1000 20 20 41.83 23.9 0
1000 40 40 24.71 40.5 0
1000 80 80 17.99 55.6 0
1000 120 120 18.21 54.9 0
from pathlib import Path
from datetime import datetime, timezone

# Append this run (timestamped) to an append-only CSV so history is preserved.
OUT_PATH = Path("posts/modal_asgi_openai/data/threadpool_worker_sweep.csv")
run_df = sweep_df.assign(ran_at_utc=datetime.now(timezone.utc).isoformat())
if OUT_PATH.exists():
    run_df = pd.concat([pd.read_csv(OUT_PATH), run_df], ignore_index=True)
run_df.to_csv(OUT_PATH, index=False)
OUT_PATH
PosixPath('posts/modal_asgi_openai/data/threadpool_worker_sweep.csv')

Approach 2: Single-Host Vanilla Async

This uses the same async classification pattern as we see in modal_service.py (later in this post), but runs locally on one host. The only tuning knob here is concurrency (semaphore size).

import asyncio

from openai import AsyncOpenAI

async def run_vanilla_async(titles: list[str], concurrency: int = 50) -> dict:
    """Classify titles concurrently with a single AsyncOpenAI client.

    In-flight requests are bounded by a semaphore. Per-item failures are
    captured (via return_exceptions=True) and reported in the "errors"
    count instead of being raised.

    Raises:
        ValueError: If concurrency < 1.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")

    start = time.perf_counter()
    client = AsyncOpenAI()
    try:
        limiter = asyncio.Semaphore(min(concurrency, len(titles)))

        async def classify_one(title: str) -> TopicClassification:
            async with limiter:
                resp = await client.responses.parse(
                    model=OPENAI_MODEL,
                    instructions=TOPIC_PROMPT,
                    input=[{"role": "user", "content": title}],
                    text_format=TopicClassification,
                    timeout=OPENAI_TIMEOUT_S,
                )
                return resp.output_parsed

        raw_results = await asyncio.gather(
            *(classify_one(title=t) for t in titles),
            return_exceptions=True,
        )

        results = []
        errors = 0
        for outcome in raw_results:
            if isinstance(outcome, Exception):
                errors += 1
                results.append({"topic": None, "confidence": None, "error": str(outcome)})
            else:
                results.append(
                    {
                        "topic": outcome.topic,
                        "confidence": outcome.confidence,
                        "error": None,
                    }
                )
    finally:
        await client.close()

    elapsed = time.perf_counter() - start
    return {
        "num_items": len(titles),
        "concurrency": concurrency,
        "elapsed_s": round(elapsed, 2),
        "req_per_s": round(len(titles) / elapsed, 1),
        "errors": errors,
        "results": results,
    }
bench_2 = await run_vanilla_async(titles, concurrency=50)
{k: bench_2[k] for k in ["num_items", "concurrency", "elapsed_s", "req_per_s", "errors"]}
{'num_items': 1000,
 'concurrency': 50,
 'elapsed_s': 17.78,
 'req_per_s': 56.3,
 'errors': 0}
import pandas as pd

ASYNC_CONCURRENCY_SWEEP = [10, 20, 40, 80, 120, 240]
NUM_ITEMS = len(titles)  # lower this for quick/cheap runs
sweep_titles = titles[:NUM_ITEMS]

async_rows = []
for c in ASYNC_CONCURRENCY_SWEEP:
    row = await run_vanilla_async(sweep_titles, concurrency=c)
    row.pop("results", None)
    async_rows.append(row)

async_df = pd.DataFrame(async_rows)
show_df(async_df)
num_items concurrency elapsed_s req_per_s errors
1000 10 85.80 11.7 0
1000 20 44.65 22.4 0
1000 40 22.02 45.4 0
1000 80 12.16 82.2 0
1000 120 8.36 119.6 0
1000 240 7.22 138.6 0
from pathlib import Path
from datetime import datetime, timezone

OUT_PATH = Path("posts/modal_asgi_openai/data/vanilla_async_concurrency_sweep.csv")
run_df = async_df.assign(ran_at_utc=datetime.now(timezone.utc).isoformat())
if OUT_PATH.exists():
    run_df = pd.concat([pd.read_csv(OUT_PATH), run_df], ignore_index=True)
run_df.to_csv(OUT_PATH, index=False)
OUT_PATH
PosixPath('posts/modal_asgi_openai/data/vanilla_async_concurrency_sweep.csv')

Approach 2b: Single-Host Async + Client Shards

This keeps everything on one machine, but splits load across multiple AsyncOpenAI clients and HTTP/2 connection pools.

The point is to test whether a single host can push much closer to provider limits before moving to multi-container infrastructure.

import httpx
from openai import AsyncOpenAI


def _split_concurrency(total_concurrency: int, client_shards: int) -> list[int]:
    shard_count = min(total_concurrency, client_shards)
    base, extra = divmod(total_concurrency, shard_count)
    return [base + (1 if i < extra else 0) for i in range(shard_count)]


async def run_sharded_async(
    titles: list[str],
    total_concurrency: int = 240,
    client_shards: int = 16,
) -> dict:
    """Classify titles concurrently across several AsyncOpenAI client shards.

    Each shard owns its own HTTP/2 connection pool and semaphore; items are
    assigned to shards round-robin by index. Per-item failures are captured
    and counted rather than raised.
    """
    start_time = time.perf_counter()

    # Build one (client, semaphore) pair per shard, sized by its budget.
    per_shard = _split_concurrency(total_concurrency, client_shards)
    shard_clients = []
    shard_semaphores = []
    for budget in per_shard:
        pool = httpx.AsyncClient(
            http2=True,
            limits=httpx.Limits(
                max_connections=budget,
                max_keepalive_connections=budget,
            ),
        )
        shard_clients.append(AsyncOpenAI(http_client=pool, max_retries=0))
        shard_semaphores.append(asyncio.Semaphore(budget))

    async def classify_one(i: int, title: str) -> TopicClassification:
        # Round-robin shard assignment by item index.
        shard = i % len(shard_clients)
        async with shard_semaphores[shard]:
            resp = await shard_clients[shard].responses.parse(
                model=OPENAI_MODEL,
                instructions=TOPIC_PROMPT,
                input=[{"role": "user", "content": title}],
                text_format=TopicClassification,
                timeout=OPENAI_TIMEOUT_S,
            )
            return resp.output_parsed

    try:
        raw_results = await asyncio.gather(
            *(classify_one(i=i, title=t) for i, t in enumerate(titles)),
            return_exceptions=True,
        )
    finally:
        # Close every shard client even if gather itself blows up.
        await asyncio.gather(*(c.close() for c in shard_clients))

    errors = 0
    results = []
    for outcome in raw_results:
        if isinstance(outcome, Exception):
            errors += 1
            results.append({"topic": None, "confidence": None, "error": str(outcome)})
        else:
            results.append(
                {
                    "topic": outcome.topic,
                    "confidence": outcome.confidence,
                    "error": None,
                }
            )

    elapsed = time.perf_counter() - start_time
    return {
        "num_items": len(titles),
        "total_concurrency": total_concurrency,
        "client_shards": client_shards,
        "elapsed_s": round(elapsed, 2),
        "req_per_s": round(len(titles) / elapsed, 1),
        "errors": errors,
        "results": results,
    }
bench_2b = await run_sharded_async(titles * 3, total_concurrency=400, client_shards=16)
{k: bench_2b[k] for k in ["num_items", "total_concurrency", "client_shards", "elapsed_s", "req_per_s", "errors"]}
{'num_items': 3000,
 'total_concurrency': 400,
 'client_shards': 16,
 'elapsed_s': 25.06,
 'req_per_s': 119.7,
 'errors': 0}
import pandas as pd

SHARDED_CONCURRENCY_SWEEP = [120, 240, 360, 480, 600]
CLIENT_SHARDS = 16
NUM_ITEMS = len(titles) * 3
sweep_titles = (titles * 3)[:NUM_ITEMS]

sharded_rows = []
for c in SHARDED_CONCURRENCY_SWEEP:
    row = await run_sharded_async(
        sweep_titles,
        total_concurrency=c,
        client_shards=CLIENT_SHARDS,
    )
    row.pop("results", None)
    sharded_rows.append(row)

sharded_df = pd.DataFrame(sharded_rows)
show_df(sharded_df)
num_items total_concurrency client_shards elapsed_s req_per_s errors
3000 120 16 28.03 107.0 0
3000 240 16 34.80 86.2 6
3000 360 16 27.21 110.3 0
3000 480 16 10.42 287.9 0
3000 600 16 26.98 111.2 1
from pathlib import Path
from datetime import datetime, timezone

OUT_PATH = Path("posts/modal_asgi_openai/data/sharded_async_concurrency_sweep.csv")
run_df = sharded_df.assign(ran_at_utc=datetime.now(timezone.utc).isoformat())
if OUT_PATH.exists():
    run_df = pd.concat([pd.read_csv(OUT_PATH), run_df], ignore_index=True)
run_df.to_csv(OUT_PATH, index=False)
OUT_PATH
PosixPath('posts/modal_asgi_openai/data/sharded_async_concurrency_sweep.csv')

Approach 3: Auto-Scaling ASGI on Modal

For the final approach, we keep the same async classification pattern but move it behind a FastAPI ASGI endpoint on Modal. The request handler in posts/modal_asgi_openai/modal_service.py uses the same semaphore + asyncio.gather(..., return_exceptions=True) flow as the single-host async approaches.

The difference is where concurrency comes from: each container still bounds its own OpenAI calls with a semaphore, but total throughput scales with the number of containers Modal runs for the endpoint.

Deploy:

uv run --no-project --with modal --with python-dotenv modal deploy posts/modal_asgi_openai/modal_service.py

Call:

curl -X POST "https://drchrislevy--hn-title-classifier-api.modal.run/classify-batch" \
  -H "content-type: application/json" \
  -d '{"titles":["Show HN: Turn LinkedIn profiles into Markdown for LLM use"]}'

Benchmarking the Modal Endpoint

For this approach, the client splits titles into batches and fans requests out in parallel. We set request_concurrency = ceil(num_items / batch_size) so batches are sent in one wave. Batch size defaults to 20 for smaller runs and 50 once num_items > 1000.

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

MODAL_ENDPOINT = "https://drchrislevy--hn-title-classifier-api.modal.run/classify-batch"

def _post_modal_batch(batch: list[str], timeout_s: int) -> dict:
    """POST one batch of titles to the Modal endpoint and return its JSON body.

    Raises requests.HTTPError on non-2xx responses.
    """
    response = requests.post(
        MODAL_ENDPOINT,
        json={"titles": batch},
        timeout=timeout_s,
    )
    response.raise_for_status()
    return response.json()

def run_modal_endpoint(
    titles: list[str],
    batch_size: int | None = None,
    timeout_s: int = 300,
) -> dict:
    if batch_size is None:
        batch_size = 50 if len(titles) > 1000 else 20
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    batches = [
        (start, titles[start : start + batch_size])
        for start in range(0, len(titles), batch_size)
    ]
    num_batches = len(batches)
    request_concurrency = max(1, num_batches)

    results = [None] * len(titles)
    errors = 0
    t0 = time.perf_counter()

    if batches:
        with ThreadPoolExecutor(max_workers=request_concurrency) as ex:
            fut_to_meta = {
                ex.submit(_post_modal_batch, batch, timeout_s): (start, len(batch))
                for start, batch in batches
            }
            for fut in as_completed(fut_to_meta):
                start, size = fut_to_meta[fut]
                end = start + size
                try:
                    payload = fut.result()
                except Exception as exc:  # demo notebook
                    errors += size
                    results[start:end] = [
                        {"topic": None, "confidence": None, "error": str(exc)}
                    ] * size
                    continue

                batch_results = payload["results"]
                results[start:end] = batch_results
                errors += int(payload["errors"])

    elapsed = time.perf_counter() - t0
    return {
        "num_items": len(titles),
        "batch_size": batch_size,
        "num_batches": num_batches,
        "request_concurrency": request_concurrency,
        "elapsed_s": round(elapsed, 2),
        "req_per_s": round(len(titles) / elapsed, 1) if titles else 0.0,
        "errors": errors,
        "results": results,
    }
bench_3 = run_modal_endpoint(titles*3, batch_size=None)
{k: bench_3[k] for k in ["num_items", "batch_size", "num_batches", "request_concurrency", "elapsed_s", "req_per_s", "errors"]}
{'num_items': 3000,
 'batch_size': 50,
 'num_batches': 60,
 'request_concurrency': 60,
 'elapsed_s': 6.23,
 'req_per_s': 481.7,
 'errors': 0}
{'num_items': 3000,
 'batch_size': 50,
 'num_batches': 60,
 'request_concurrency': 60,
 'elapsed_s': 34.96,
 'req_per_s': 85.8,
 'errors': 0}
from pathlib import Path
from datetime import datetime, timezone

OUT_PATH = Path("posts/modal_asgi_openai/data/modal_endpoint_run.csv")
row = {k: v for k, v in bench_3.items() if k != "results"}
run_df = pd.DataFrame([row]).assign(ran_at_utc=datetime.now(timezone.utc).isoformat())
if OUT_PATH.exists():
    run_df = pd.concat([pd.read_csv(OUT_PATH), run_df], ignore_index=True)
run_df.to_csv(OUT_PATH, index=False)
OUT_PATH
PosixPath('posts/modal_asgi_openai/data/modal_endpoint_run.csv')

Key Code: ASGI FastAPI on Modal

This is the implementation worth paying attention to in this post.

It is a minimal FastAPI ASGI service deployed on Modal, with auto-scaling containers, one request handled per container, a per-request semaphore bounding OpenAI calls, and a shared AsyncOpenAI client managed by the app lifespan.

This is the exact code used for Approach 3:

from pathlib import Path
from IPython.display import Markdown, display

service_path = Path("posts/modal_asgi_openai/modal_service.py")
service_code = service_path.read_text(encoding="utf-8")
display(Markdown(f"```python\n{service_code}\n```"))
#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "modal",
#     "python-dotenv",
#     "fastapi[standard]",
#     "openai>=2.20.0",
# ]
# ///
"""Minimal auto-scaling ASGI classifier for Hacker News titles on Modal.

Deploy:
    uv run --no-project --with modal --with python-dotenv modal deploy posts/modal_asgi_openai/modal_service.py

Call:
    curl -X POST 'https://drchrislevy--hn-title-classifier-api.modal.run/classify-batch' \
      -H 'content-type: application/json' \
      -d '{"titles":["Show HN: Turn LinkedIn profiles into Markdown for LLM use"]}'

Requirements:
    - Add OPENAI_API_KEY,MODAL_TOKEN_ID,MODAL_TOKEN_SECRET to posts/modal_asgi_openai/.env
"""

import asyncio
from contextlib import asynccontextmanager
from typing import Literal

import modal

MIN_CONTAINERS = None
MAX_CONTAINERS = None
MAX_INPUTS = 1
OPENAI_WORKERS_PER_REQUEST = 50
OPENAI_MODEL = 'gpt-4.1-mini'
OPENAI_TIMEOUT_S = 20

CLASSIFIER_PROMPT = """Classify one Hacker News title into a topic label.

topic:
- ai_ml_agents
- programming_dev_tools
- infra_cloud_ops
- hardware_electronics
- security_privacy
- business_startups_career
- science_math_research
- society_policy_economy
- culture_media_games
- other

Return only fields in the schema.
confidence is a float from 0.0 to 1.0.
"""

image = modal.Image.debian_slim().pip_install('fastapi[standard]', 'openai>=2.20.0')
app = modal.App('hn-title-classifier')
secrets = [modal.Secret.from_dotenv(filename='posts/modal_asgi_openai/.env')]


# Expose the FastAPI app as an auto-scaling Modal Function: Modal adds and
# removes containers with load, each a small CPU-only worker.
@app.function(
    image=image,
    secrets=secrets,
    timeout=60 * 10,
    min_containers=MIN_CONTAINERS,
    max_containers=MAX_CONTAINERS,
    scaledown_window=5 * 60,
    cpu=1.0,
    memory=256,
)
# MAX_INPUTS is 1 above, so each container serves one request at a time;
# extra throughput comes from Modal running more containers.
@modal.concurrent(max_inputs=MAX_INPUTS)
@modal.asgi_app()
def api():
    """Build the FastAPI ASGI app served from each Modal container."""
    # Imported inside the function so they only need to exist in the Modal image.
    from fastapi import FastAPI
    from openai import AsyncOpenAI
    from pydantic import BaseModel, Field

    Topic = Literal[
        'ai_ml_agents',
        'programming_dev_tools',
        'infra_cloud_ops',
        'hardware_electronics',
        'security_privacy',
        'business_startups_career',
        'science_math_research',
        'society_policy_economy',
        'culture_media_games',
        'other',
    ]

    # Schema handed to OpenAI structured outputs for each title.
    class HNClassification(BaseModel):
        topic: Topic
        confidence: float = Field(ge=0.0, le=1.0)

    # Per-item API result; on failure topic/confidence stay None and
    # error carries the exception text.
    class ClassifyResult(BaseModel):
        topic: Topic | None = None
        confidence: float | None = Field(default=None, ge=0.0, le=1.0)
        error: str | None = None

    class ClassifyBatchRequest(BaseModel):
        titles: list[str] = Field(min_length=1, max_length=2000)

    class ClassifyBatchResponse(BaseModel):
        results: list[ClassifyResult]
        num_items: int
        errors: int

    # One AsyncOpenAI client per container, shared by all requests it serves
    # and closed when the container shuts down.
    @asynccontextmanager
    async def lifespan(web_app: FastAPI):
        web_app.state.openai_client = AsyncOpenAI()
        yield
        await web_app.state.openai_client.close()

    web_app = FastAPI(title='HN Title Classifier', lifespan=lifespan)

    @web_app.post('/classify-batch', response_model=ClassifyBatchResponse)
    async def classify_batch(req: ClassifyBatchRequest) -> ClassifyBatchResponse:
        """Classify a batch of titles concurrently; per-item failures are recorded, not raised."""
        client: AsyncOpenAI = web_app.state.openai_client
        # Bound concurrent OpenAI calls within this single request.
        sem = asyncio.Semaphore(min(OPENAI_WORKERS_PER_REQUEST, len(req.titles)))

        async def classify_one(title: str) -> HNClassification:
            async with sem:
                response = await client.responses.parse(
                    model=OPENAI_MODEL,
                    instructions=CLASSIFIER_PROMPT,
                    input=[{'role': 'user', 'content': title}],
                    text_format=HNClassification,
                    timeout=OPENAI_TIMEOUT_S,
                )
                return response.output_parsed

        # return_exceptions=True keeps one bad item from failing the batch.
        raw_results = await asyncio.gather(
            *(classify_one(title=t) for t in req.titles),
            return_exceptions=True,
        )
        results: list[ClassifyResult] = []
        errors = 0
        for item in raw_results:
            if isinstance(item, Exception):
                errors += 1
                results.append(ClassifyResult(error=str(item)))
            else:
                results.append(
                    ClassifyResult(
                        topic=item.topic,
                        confidence=item.confidence,
                    )
                )
        return ClassifyBatchResponse(results=results, num_items=len(results), errors=errors)

    return web_app

Getting an API Key

Combined Benchmark Summary

The CSVs are append-only, so each run is preserved. This cell picks the best req_per_s row per approach and writes benchmark_summary.csv.

For fair comparisons, match num_items across approaches (for example, compare 3000 vs 3000).

from pathlib import Path

import pandas as pd

DATA_DIR = Path("posts/modal_asgi_openai/data")

# Load the append-only run history for each approach.
thread_df = pd.read_csv(DATA_DIR / "threadpool_worker_sweep.csv")
async_df = pd.read_csv(DATA_DIR / "vanilla_async_concurrency_sweep.csv")
sharded_df = pd.read_csv(DATA_DIR / "sharded_async_concurrency_sweep.csv")
modal_df = pd.read_csv(DATA_DIR / "modal_endpoint_run.csv")

# Pick the single best (highest req/s) recorded run per approach.
thread_best = thread_df.loc[thread_df["req_per_s"].idxmax()]
async_best = async_df.loc[async_df["req_per_s"].idxmax()]
sharded_best = sharded_df.loc[sharded_df["req_per_s"].idxmax()]
modal_best = modal_df.loc[modal_df["req_per_s"].idxmax()]

# Normalize each approach's best run to a common column set; "setting"
# stringifies the approach-specific tuning knob(s).
summary_df = pd.DataFrame(
    [
        {
            "approach": "threadpool_single_host",
            "setting": f"workers={int(thread_best['max_workers'])}",
            "num_items": int(thread_best["num_items"]),
            "best_req_per_s": float(thread_best["req_per_s"]),
            "elapsed_s": float(thread_best["elapsed_s"]),
            "errors": int(thread_best["errors"]),
            "num_runs": int(len(thread_df)),
            "ran_at_utc": thread_best["ran_at_utc"],
        },
        {
            "approach": "asyncio_single_host",
            "setting": f"concurrency={int(async_best['concurrency'])}",
            "num_items": int(async_best["num_items"]),
            "best_req_per_s": float(async_best["req_per_s"]),
            "elapsed_s": float(async_best["elapsed_s"]),
            "errors": int(async_best["errors"]),
            "num_runs": int(len(async_df)),
            "ran_at_utc": async_best["ran_at_utc"],
        },
        {
            "approach": "asyncio_sharded_single_host",
            "setting": (
                f"concurrency={int(sharded_best['total_concurrency'])}, "
                f"shards={int(sharded_best['client_shards'])}"
            ),
            "num_items": int(sharded_best["num_items"]),
            "best_req_per_s": float(sharded_best["req_per_s"]),
            "elapsed_s": float(sharded_best["elapsed_s"]),
            "errors": int(sharded_best["errors"]),
            "num_runs": int(len(sharded_df)),
            "ran_at_utc": sharded_best["ran_at_utc"],
        },
        {
            "approach": "modal_asgi_autoscale",
            "setting": (
                f"batch_size={int(modal_best['batch_size'])}, "
                f"requests={int(modal_best['request_concurrency'])}"
            ),
            "num_items": int(modal_best["num_items"]),
            "best_req_per_s": float(modal_best["req_per_s"]),
            "elapsed_s": float(modal_best["elapsed_s"]),
            "errors": int(modal_best["errors"]),
            "num_runs": int(len(modal_df)),
            "ran_at_utc": modal_best["ran_at_utc"],
        },
    ]
).sort_values("best_req_per_s", ascending=False).reset_index(drop=True)

OUT_PATH = DATA_DIR / "benchmark_summary.csv"
summary_df.to_csv(OUT_PATH, index=False)
show_df(summary_df)
approach setting num_items best_req_per_s elapsed_s errors num_runs ran_at_utc
modal_asgi_autoscale batch_size=50, requests=60 3000 481.7 6.23 0 11 2026-02-17T01:10:20.187556+00:00
asyncio_sharded_single_host concurrency=480, shards=16 3000 287.9 10.42 0 42 2026-02-17T01:35:14.869706+00:00
asyncio_single_host concurrency=240 3000 194.7 15.40 0 33 2026-02-17T01:18:44.082334+00:00
threadpool_single_host workers=120 3000 58.5 51.30 0 26 2026-02-17T01:15:58.557673+00:00
import matplotlib.pyplot as plt
import seaborn as sns

# Human-readable bar labels for each approach key in summary_df.
LABELS = {
    "threadpool_single_host": "Thread Pool",
    "asyncio_single_host": "Async (single client)",
    "asyncio_sharded_single_host": "Async (sharded clients)",
    "modal_asgi_autoscale": "Modal ASGI autoscale",
}

# Sort ascending so the fastest approach lands at the top of the barh chart.
plot_df = summary_df.sort_values("best_req_per_s", ascending=True).reset_index(drop=True)
plot_df["label"] = plot_df["approach"].map(LABELS)

sns.set_theme(style="whitegrid")
fig, ax = plt.subplots(figsize=(8, 3.2))
colors = sns.color_palette("viridis", n_colors=len(plot_df))
bars = ax.barh(plot_df["label"], plot_df["best_req_per_s"], color=colors)

# Annotate each bar with throughput and sample size, just past the bar end.
x_pad = plot_df["best_req_per_s"].max() * 0.02
for i, bar in enumerate(bars):
    value = bar.get_width()
    n_items = int(plot_df.loc[i, "num_items"])
    ax.text(
        value + x_pad,
        bar.get_y() + bar.get_height() / 2,
        f"{value:.1f} req/s (n={n_items})",
        va="center",
        ha="left",
        fontsize=10,
    )

ax.set_xlabel("requests / second")
ax.set_ylabel("")
ax.set_title("OpenAI Throughput by Approach (best run per approach)")
# Extra x-axis headroom so the text annotations are not clipped.
ax.set_xlim(right=plot_df["best_req_per_s"].max() * 1.35)
sns.despine(left=True, bottom=True)
plt.tight_layout()

Takeaways