
AI Agent Error Handling: Retries, Fallbacks, and Circuit Breakers (2026)


AI agents fail in ways traditional software doesn’t. The API returns a 429. The model hallucinates a function that doesn’t exist. The agent enters an infinite tool-call loop. The response is valid JSON but semantically wrong. Traditional try/catch doesn’t cover these failure modes.

Production agents need layered error handling: retries for transient failures, fallbacks for persistent ones, circuit breakers for cascading failures, and validation for semantic errors.

The failure taxonomy

| Failure type | Example | Frequency | Fix |
|---|---|---|---|
| Rate limit (429) | Too many API requests | Common | Retry with backoff |
| Server error (500/503) | Provider outage | Occasional | Fallback to another model |
| Timeout | Complex reasoning takes too long | Occasional | Increase timeout or simplify |
| Invalid output | Agent returns malformed JSON | Common | Retry with stricter prompt |
| Hallucination | Agent calls a tool that doesn't exist | Common | Validate before execution |
| Infinite loop | Agent keeps calling the same tool | Rare but dangerous | Circuit breaker |
| Context overflow | Conversation exceeds token limit | Gradual | Summarize and compact |
| Budget exceeded | Token spend hits limit | Planned | Graceful degradation |

Retry with exponential backoff

The most common failure: rate limits from the API provider.

import asyncio
import random

# RateLimitError and ServerError stand in for your provider SDK's exception
# classes (e.g. openai.RateLimitError, openai.InternalServerError).
async def retry_with_backoff(func, max_retries=5, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            return await func()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited. Retrying in {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(delay)
        except ServerError as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Server error: {e}. Retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

The jitter (random.uniform(0, 1)) prevents thundering herd problems when multiple agents retry simultaneously.
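The schedule below illustrates the deterministic part of that delay: with `base_delay=1.0`, each retry doubles the wait before the jitter is added on top.

```python
# Deterministic part of the backoff schedule (jitter omitted for clarity).
base_delay = 1.0
delays = [base_delay * (2 ** attempt) for attempt in range(5)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Five attempts wait at most ~31 seconds in total, which keeps the retry window short enough for interactive use.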

Model fallback chain

When your primary model is down or degraded, fall back to alternatives:

MODEL_CHAIN = [
    {"model": "claude-sonnet-4", "provider": "anthropic"},
    {"model": "gpt-4o", "provider": "openai"},
    {"model": "gemini-2.5-pro", "provider": "google"},
    {"model": "deepseek-chat", "provider": "deepseek"},
]

# Agent and Runner follow an agent-framework API (here, OpenAI Agents SDK
# style); AllModelsFailed is a custom exception you define yourself.
async def run_with_fallback(agent_config, message):
    for model_config in MODEL_CHAIN:
        try:
            agent = Agent(
                **agent_config,
                model=model_config["model"],
            )
            result = await Runner.run(agent, message)
            return result
        except (RateLimitError, ServerError, TimeoutError) as e:
            print(f"{model_config['model']} failed: {e}. Trying next...")
            continue
    
    raise AllModelsFailed("No available models")

OpenRouter does this automatically — it routes to the cheapest available provider for a given model. Using OpenRouter as your API gateway gives you built-in fallback without custom code.
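The fallback loop can be exercised end-to-end with stubbed providers. In this self-contained sketch, the model names and the failure behaviour of `flaky` are invented for illustration:

```python
import asyncio

class AllModelsFailed(Exception):
    pass

# Stub "provider": the first two models always fail, the third answers.
async def flaky(cfg: dict, message: str) -> str:
    if cfg["model"] != "model-c":
        raise ConnectionError(f"{cfg['model']} unavailable")
    return f"{cfg['model']}: {message}"

async def run_with_fallback(chain: list, message: str) -> str:
    for cfg in chain:
        try:
            return await flaky(cfg, message)
        except ConnectionError:
            continue  # Try the next model in the chain
    raise AllModelsFailed("No available models")

chain = [{"model": "model-a"}, {"model": "model-b"}, {"model": "model-c"}]
result = asyncio.run(run_with_fallback(chain, "hello"))
print(result)  # model-c: hello
```

The call succeeds on the third entry; only when every model raises does `AllModelsFailed` propagate to the caller.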

Circuit breaker pattern

Prevent cascading failures when a provider is down:

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = None
        self.state = "closed"  # closed = normal, open = blocking
    
    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if datetime.now() - self.last_failure > timedelta(seconds=self.reset_timeout):
                self.state = "half-open"
                return True  # Allow one test request
            return False
        return True  # half-open
    
    def record_success(self):
        self.failures = 0
        self.state = "closed"
    
    def record_failure(self):
        self.failures += 1
        self.last_failure = datetime.now()
        # A failure during half-open means the provider is still down:
        # re-open immediately instead of waiting for the full threshold.
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state = "open"

# One circuit breaker per provider
breakers = {
    "openai": CircuitBreaker(),
    "anthropic": CircuitBreaker(),
    "google": CircuitBreaker(),
}

When a provider fails 5 times in a row, the circuit opens and all requests skip that provider for 60 seconds. This prevents wasting time and tokens on a provider that’s clearly down.
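A condensed, runnable version of the breaker shows the state transitions; rewinding `last_failure` by 61 seconds simulates waiting out the reset timeout:

```python
from datetime import datetime, timedelta

# Five failures open the circuit; once the reset timeout has elapsed,
# one test request is let through and the breaker goes half-open.
class Breaker:
    def __init__(self, threshold=5, reset_timeout=60):
        self.failures = 0
        self.threshold = threshold
        self.reset = timedelta(seconds=reset_timeout)
        self.last_failure = None
        self.state = "closed"

    def can_execute(self) -> bool:
        if self.state == "open":
            if datetime.now() - self.last_failure > self.reset:
                self.state = "half-open"
                return True  # Allow one test request
            return False
        return True

    def record_failure(self):
        self.failures += 1
        self.last_failure = datetime.now()
        if self.failures >= self.threshold:
            self.state = "open"

b = Breaker()
for _ in range(5):
    b.record_failure()
print(b.state, b.can_execute())  # open False
b.last_failure = datetime.now() - timedelta(seconds=61)  # simulate waiting
print(b.can_execute(), b.state)  # True half-open
```
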

Tool call validation

Agents sometimes hallucinate tool calls — calling functions that don’t exist or passing invalid arguments:

VALID_TOOLS = {"read_file", "write_file", "run_tests", "search_code"}

async def validate_tool_call(tool_name: str, args: dict) -> bool:
    if tool_name not in VALID_TOOLS:
        return False
    
    # Validate arguments per tool
    if tool_name == "read_file":
        path = args.get("path", "")
        if ".." in path or path.startswith("/etc"):
            return False  # Path traversal attempt
    
    if tool_name == "run_tests":
        if args.get("timeout", 30) > 300:
            return False  # Unreasonable timeout
    
    return True

Always validate tool calls before execution. An agent that can call arbitrary functions is a security risk.
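A runnable sketch of that validation in action, using a trimmed copy of the checks above against a hallucinated tool, a path-traversal attempt, and a legitimate call:

```python
import asyncio

VALID_TOOLS = {"read_file", "write_file", "run_tests", "search_code"}

async def validate_tool_call(tool_name: str, args: dict) -> bool:
    if tool_name not in VALID_TOOLS:
        return False  # Hallucinated tool name
    if tool_name == "read_file" and ".." in args.get("path", ""):
        return False  # Path traversal attempt
    return True

async def main():
    return [
        await validate_tool_call("delete_database", {}),
        await validate_tool_call("read_file", {"path": "../../etc/passwd"}),
        await validate_tool_call("read_file", {"path": "src/main.py"}),
    ]

checks = asyncio.run(main())
print(checks)  # [False, False, True]
```
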

Infinite loop detection

The most dangerous failure: an agent that keeps calling the same tool in a loop, burning tokens without making progress.

MAX_CONSECUTIVE_SAME_TOOL = 3
MAX_TOTAL_TOOL_CALLS = 15

class LoopDetector:
    def __init__(self):
        self.tool_history = []
    
    def check(self, tool_name: str, tool_args: str) -> bool:
        call_signature = f"{tool_name}:{tool_args}"
        self.tool_history.append(call_signature)
        
        # Check for exact repetition
        if len(self.tool_history) >= MAX_CONSECUTIVE_SAME_TOOL:
            recent = self.tool_history[-MAX_CONSECUTIVE_SAME_TOOL:]
            if len(set(recent)) == 1:
                return False  # Same call repeated N times
        
        # Check total calls
        if len(self.tool_history) > MAX_TOTAL_TOOL_CALLS:
            return False
        
        return True

When a loop is detected, interrupt the agent with a message: “You’ve called the same tool 3 times with the same arguments. The approach isn’t working. Try a different strategy.”
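Exercising a trimmed copy of the detector shows the third identical call tripping the limit:

```python
MAX_CONSECUTIVE_SAME_TOOL = 3

class LoopDetector:
    def __init__(self):
        self.tool_history = []

    def check(self, tool_name: str, tool_args: str) -> bool:
        self.tool_history.append(f"{tool_name}:{tool_args}")
        recent = self.tool_history[-MAX_CONSECUTIVE_SAME_TOOL:]
        if len(recent) == MAX_CONSECUTIVE_SAME_TOOL and len(set(recent)) == 1:
            return False  # Same call repeated three times in a row
        return True

detector = LoopDetector()
results = [detector.check("search_code", '{"query": "foo"}') for _ in range(3)]
print(results)  # [True, True, False]
```

Note that the signature includes the serialized arguments, so an agent that retries the same tool with *different* arguments is still allowed to proceed.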

Output validation

The model returned a response, but is it actually correct?

import json

async def validate_output(agent_output: str, expected_format: str) -> bool:
    if expected_format == "json":
        try:
            json.loads(agent_output)
            return True
        except json.JSONDecodeError:
            return False
    
    if expected_format == "code":
        # Basic syntax check
        try:
            compile(agent_output, "<agent>", "exec")
            return True
        except SyntaxError:
            return False
    
    return True  # No validation for free-form text

For critical outputs, use a second (cheaper) model to validate the first model’s output. This “LLM-as-judge” pattern catches hallucinations that structural validation misses.
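Validation pairs naturally with the "retry with stricter prompt" fix from the taxonomy. In this sketch, `fake_model` is an invented stand-in for a real model call: it returns malformed JSON on the first attempt and valid JSON on the second.

```python
import json

# Simulated model responses: malformed first, valid second.
attempts = iter(['{"status": ok}', '{"status": "ok"}'])

def fake_model(prompt: str) -> str:
    return next(attempts)

def generate_json(prompt: str, max_retries: int = 2) -> dict:
    for _ in range(max_retries):
        output = fake_model(prompt)
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            # Tighten the prompt before retrying
            prompt += "\nReturn ONLY valid JSON, no commentary."
    raise ValueError("Model never produced valid JSON")

result = generate_json("Summarize build status as JSON")
print(result)  # {'status': 'ok'}
```
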

Graceful degradation

When everything fails, degrade gracefully instead of crashing:

# BudgetExceeded, LoopDetected, and ContextOverflow are custom exceptions
# raised by the guards described in the sections above.
async def handle_request(user_id, message):
    try:
        return await run_with_fallback(agent_config, message)
    except AllModelsFailed:
        return "I'm experiencing issues connecting to AI services. Please try again in a few minutes."
    except BudgetExceeded:
        return "You've reached your daily usage limit. Your limit resets at midnight UTC."
    except LoopDetected:
        return "I got stuck on this problem. Could you rephrase your request or break it into smaller steps?"
    except ContextOverflow:
        # Auto-compact and retry
        await compact_session(user_id)
        return await run_with_fallback(agent_config, message)

The user should never see a stack trace or a generic “500 Internal Server Error.” Every failure mode should have a human-readable message and a suggested next step.

Monitoring error patterns

Track errors to identify systemic issues:

# `metrics` is a stand-in for your stats client (StatsD, Datadog, etc.).
async def log_error(error_type, model, user_id, details):
    await metrics.increment("agent_error", tags={
        "type": error_type,
        "model": model,
        "user": user_id,
    })

Alert on:

  • Error rate exceeding 5% (something is wrong)
  • Same user hitting errors repeatedly (their use case might be unsupported)
  • Loop detection triggering more than once per hour (prompt needs fixing)
  • All models failing simultaneously (check your API keys)
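The 5% rule can be enforced with a simple sliding window; this monitor is a hypothetical sketch, not any particular library's API:

```python
from collections import deque

# Track the last N request outcomes and alert when the error rate
# over that window exceeds the threshold.
class ErrorRateMonitor:
    def __init__(self, window: int = 200, threshold: float = 0.05):
        self.outcomes = deque(maxlen=window)
        self.threshold = threshold

    def record(self, ok: bool):
        self.outcomes.append(ok)

    def should_alert(self) -> bool:
        if not self.outcomes:
            return False
        errors = self.outcomes.count(False)
        return errors / len(self.outcomes) > self.threshold

m = ErrorRateMonitor(window=100)
for _ in range(94):
    m.record(True)
for _ in range(6):
    m.record(False)
print(m.should_alert())  # True: 6/100 = 6% > 5%
```
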

Connect to your observability platform for dashboards and alerting.

Related: How to Debug AI Agents · AI Agent Cost Management · AI Agent Security · Deploy AI Agents to Production · LLM Regression Testing · OpenRouter Complete Guide · AI Agent State Management