Handling GPT-4o Rate Limits Without Hardcoded Fallbacks
You hit a rate limit, your agent crashes, and the fix everyone reaches for (try/except RateLimitError) only solves the easy case. Here's what actually works.
The Problem With Catching the Error
The standard pattern looks like this:
import openai
client = openai.OpenAI()
def call_model(prompt: str) -> str:
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
except openai.RateLimitError:
# fall back to mini
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
This handles the 429. It does not handle:
- Degraded performance before the 429. When you're approaching your rate limit, latency spikes well before OpenAI starts rejecting requests. Your agent is still "working" but producing slow, sometimes lower-quality responses. (OpenAI's rate-limit response headers give you an early signal here; see the sketch after this list.)
- Retry storms. Multiple agent workers hitting the same limit simultaneously, all catching the error, all retrying, all making the problem worse.
- Stale fallback decisions. You hardcoded gpt-4o-mini as the fallback at deploy time. That decision doesn't update based on what's actually happening right now.
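On that first point: one signal is available before anything goes wrong. OpenAI returns rate-limit headers (x-ratelimit-remaining-requests, x-ratelimit-remaining-tokens, and friends) on every response, and the SDK exposes them through its raw-response interface. A minimal sketch; the 10% threshold is an arbitrary assumption:

import openai

client = openai.OpenAI()

def call_and_check_headroom(prompt: str) -> tuple[str, bool]:
    """Returns (response_text, low_headroom) so a caller can route away early."""
    raw = client.chat.completions.with_raw_response.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    remaining = int(raw.headers.get("x-ratelimit-remaining-requests", "0"))
    limit = int(raw.headers.get("x-ratelimit-limit-requests", "1"))
    # assumption: under 10% of quota left counts as "about to be throttled"
    low_headroom = (remaining / limit) < 0.10
    completion = raw.parse()
    return completion.choices[0].message.content, low_headroom

A caller that sees low_headroom == True can start shifting traffic before the first 429 ever arrives.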
Let's go through three approaches from worst to best.
Approach 1: Manual Fallback (Worst)
This is what most people ship first.
import openai
import time
from typing import Optional
client = openai.OpenAI()
FALLBACK_CHAIN = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]
def call_with_fallback(
prompt: str,
model_chain: list[str] = FALLBACK_CHAIN
) -> tuple[str, str]:
"""Returns (response_text, model_used)"""
for model in model_chain:
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
timeout=30
)
return response.choices[0].message.content, model
except openai.RateLimitError:
print(f"Rate limited on {model}, trying next...")
time.sleep(1)
continue
except openai.APIError as e:
print(f"API error on {model}: {e}")
continue
raise RuntimeError("All models exhausted")
Why this fails in production:
The fallback order is static. You defined it when you wrote the code. Your agent will silently degrade to gpt-3.5-turbo on complex synthesis tasks without you knowing, because the code has no concept of task complexity. And it only triggers after the rate limit hits; by then you've already taken the latency hit.
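If you ship a static chain anyway, at least make the degradation explicit instead of silent. A minimal sketch, assuming you can tag each call with a task type (MODEL_FLOORS and the task names are made-up examples):

# strongest to weakest, same order as FALLBACK_CHAIN above
FALLBACK_CHAIN = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]

# hypothetical policy: the weakest model each task type may fall back to
MODEL_FLOORS = {
    "complex_synthesis": "gpt-4o",
    "classification": "gpt-3.5-turbo",
    "default": "gpt-4o-mini",
}

def chain_for_task(task_type: str) -> list[str]:
    """Truncate the chain at the task's floor so it fails loudly past that point."""
    floor = MODEL_FLOORS.get(task_type, MODEL_FLOORS["default"])
    return FALLBACK_CHAIN[: FALLBACK_CHAIN.index(floor) + 1]

# chain_for_task("complex_synthesis") == ["gpt-4o"]
# chain_for_task("classification") == ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]

Pass chain_for_task(task_type) as model_chain to call_with_fallback above and complex tasks raise the RuntimeError instead of quietly dropping to a weaker model.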
Approach 2: Retry With Exponential Backoff (Better, Still Incomplete)
The tenacity library is the right tool for retry logic:
import openai
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import logging
logger = logging.getLogger(__name__)
client = openai.OpenAI()
@retry(
retry=retry_if_exception_type(openai.RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60),
stop=stop_after_attempt(5),
before_sleep=before_sleep_log(logger, logging.WARNING)
)
def call_gpt4o(prompt: str) -> str:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt)]
)
return response.choices[0].message.content
This is better. Exponential backoff stops each worker from hammering the API, and the tenacity decorator is clean and composable.
But it still has a ceiling problem: you're retrying on gpt-4o until you exhaust attempts. If you're consistently rate-limited, you need to route differently, not just retry harder. And it still doesn't detect degradation before the error. It only responds to the error.
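One refinement first: plain exponential backoff can still leave a fleet of workers retrying in lockstep, because they all hit the limit at the same moment and back off on the same schedule. Adding jitter decorrelates them, and tenacity ships wait_random_exponential for exactly this:

import openai
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type

client = openai.OpenAI()

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),
    # random backoff up to 2^n seconds, capped at 60s, so workers spread out
    wait=wait_random_exponential(multiplier=1, max=60),
    stop=stop_after_attempt(5)
)
def call_gpt4o_jittered(prompt: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content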
You can combine retry with fallback:
import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
client = openai.OpenAI()
def create_retrying_caller(model: str):
@retry(
retry=retry_if_exception_type(openai.RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=30),
stop=stop_after_attempt(3)
)
def _call(prompt: str) -> str:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
return _call
call_gpt4o = create_retrying_caller("gpt-4o")
call_mini = create_retrying_caller("gpt-4o-mini")
def resilient_call(prompt: str) -> tuple[str, str]:
try:
return call_gpt4o(prompt), "gpt-4o"
except openai.RateLimitError:
return call_mini(prompt), "gpt-4o-mini"
Better. Still static routing that doesn't learn.
Approach 3: Outcome Routing (Best)
The core insight: rate limit handling should be a routing decision, not an error handling decision.
Instead of "call model X, catch error, call model Y," you want "ask the router which model has the highest success probability for this task right now, given current conditions."
Here's a minimal version you can build yourself:
import openai
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
import threading
client = openai.OpenAI()
@dataclass
class ModelStats:
successes: int = 0
failures: int = 0
total_latency_ms: float = 0.0
last_rate_limited: Optional[float] = None
def success_rate(self) -> float:
total = self.successes + self.failures
if total == 0:
return 0.5 # prior
return self.successes / total
def avg_latency(self) -> float:
if self.successes == 0:
return float('inf')
return self.total_latency_ms / self.successes
def is_cooling_down(self, cooldown_seconds: int = 30) -> bool:
if self.last_rate_limited is None:
return False
return (time.time() - self.last_rate_limited) < cooldown_seconds
class OutcomeRouter:
def __init__(self, models: list[str]):
self.models = models
self.stats: dict[str, ModelStats] = {m: ModelStats() for m in models}
self._lock = threading.Lock()
    def select_model(self, task_type: str = "default") -> str:
        """Select model based on recent outcome history (task_type is a hook for future per-task stats, unused here)."""
with self._lock:
available = [
m for m in self.models
if not self.stats[m].is_cooling_down()
]
if not available:
            # everyone is cooling down; return the primary and let the caller's retry policy handle the wait
return self.models[0]
# score each model: success rate, penalize high latency
def score(model: str) -> float:
s = self.stats[model]
sr = s.success_rate()
lat_penalty = min(s.avg_latency() / 10000, 0.3) # cap at 30% penalty
                return sr - lat_penalty
return max(available, key=score)
def record_outcome(
self,
model: str,
success: bool,
latency_ms: float,
rate_limited: bool = False
):
with self._lock:
s = self.stats[model]
if success:
s.successes += 1
s.total_latency_ms += latency_ms
else:
s.failures += 1
if rate_limited:
s.last_rate_limited = time.time()
def call(self, prompt: str, task_type: str = "default") -> tuple[str, str]:
model = self.select_model(task_type)
start = time.time()
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
timeout=30
)
latency_ms = (time.time() - start) * 1000
self.record_outcome(model, True, latency_ms)
return response.choices[0].message.content, model
except openai.RateLimitError:
latency_ms = (time.time() - start) * 1000
self.record_outcome(model, False, latency_ms, rate_limited=True)
# immediately try next best model
fallback = self.select_model(task_type)
if fallback == model:
raise # nothing else available
            fallback_start = time.time()
            response = client.chat.completions.create(
                model=fallback,
                messages=[{"role": "user", "content": prompt}],
                timeout=30
            )
            # measure only the fallback call so its stats aren't skewed by the failed attempt
            latency_ms_fallback = (time.time() - fallback_start) * 1000
self.record_outcome(fallback, True, latency_ms_fallback)
return response.choices[0].message.content, fallback
# usage
router = OutcomeRouter(["gpt-4o", "gpt-4o-mini"])
response, model_used = router.call("Summarize this document: ...")
print(f"Response from {model_used}: {response[:100]}")
This router:
- Avoids rate-limited models during cooldown periods
- Weights model selection by recent success rate
- Penalizes high-latency models before they fully fail
- Updates dynamically from live traffic
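Approaches 2 and 3 also compose. router.call raises RateLimitError when no non-cooling model is left, so you can wrap it in the same jittered backoff from the previous section and wait out the cooldown instead of failing outright. A sketch, assuming the router instance from the usage snippet above:

import openai
from tenacity import retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type

@retry(
    retry=retry_if_exception_type(openai.RateLimitError),
    wait=wait_random_exponential(multiplier=1, max=30),
    stop=stop_after_attempt(4)
)
def resilient_routed_call(prompt: str) -> tuple[str, str]:
    # router.call raises RateLimitError only when no model is available;
    # tenacity then waits (with jitter) past the cooldown window and retries
    return router.call(prompt)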
What Kalibr Does Differently
The router above is a solid start, but it still only detects problems after they happen. Kalibr's approach uses canary traffic patterns: a small percentage of calls runs across alternative models in parallel, so the router has live outcome data before the primary model degrades.
When you instrument your calls with Kalibr:
import kalibr
import openai
kalibr.init()
client = openai.OpenAI()
# kalibr wraps your client and handles routing transparently
prompt = "Summarize this document: ..."
response = client.chat.completions.create(
    model="gpt-4o",  # your preferred model
    messages=[{"role": "user", "content": prompt}]
)
# if gpt-4o is degrading, kalibr routes to the best-performing
# alternative automatically, before you hit the rate limit wall
The key difference: Kalibr's router has outcome data from your actual traffic patterns across all tenants, not just the 10 requests your single agent has made this session. When GPT-4o starts degrading at 2pm on a Tuesday because everyone's hitting it, Kalibr already knows and starts routing away before your agent ever sees the error.
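You can approximate the canary idea yourself at single-tenant scale. A minimal sketch, not Kalibr's implementation (the 5% sampling rate and the fire-and-forget thread are assumptions): mirror a fraction of calls to a model the router didn't pick and feed the outcomes back into the OutcomeRouter from earlier, so its stats stay warm before you ever need the fallback.

import random
import threading
import time
import openai

client = openai.OpenAI()
CANARY_RATE = 0.05  # assumption: mirror 5% of traffic to an alternative model

def canary_probe(router: "OutcomeRouter", model: str, prompt: str) -> None:
    """Background probe: the response is discarded, only the outcome is recorded."""
    start = time.time()
    try:
        client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            timeout=30
        )
        router.record_outcome(model, True, (time.time() - start) * 1000)
    except openai.RateLimitError:
        router.record_outcome(model, False, (time.time() - start) * 1000, rate_limited=True)
    except openai.APIError:
        router.record_outcome(model, False, (time.time() - start) * 1000)

def call_with_canary(router: "OutcomeRouter", prompt: str) -> tuple[str, str]:
    primary = router.select_model()
    alternatives = [m for m in router.models if m != primary]
    if alternatives and random.random() < CANARY_RATE:
        # fire-and-forget: probe an alternative without blocking the caller
        threading.Thread(
            target=canary_probe,
            args=(router, alternatives[0], prompt),
            daemon=True
        ).start()
    return router.call(prompt)

Every canary call pays for a duplicate completion, which is why the sampling rate stays low. The payoff is that record_outcome (which is already thread-safe behind the router's lock) keeps the alternative's stats current before you need it.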
What to Ship
If you're handling rate limits today:
- Minimum viable: Add exponential backoff with tenacity. Stop retrying blindly.
- Better: Build the OutcomeRouter above. Wire it into your model selection layer. Log which model gets selected and why.
- Production: Let an adaptive router handle it. The data requirements for good routing exceed what any single agent accumulates in a session.
The try/except RateLimitError pattern isn't wrong. It's just the last line of defense, not the only one.
Kalibr keeps complex AI agents running without human intervention.
Get started free