Rate Limits API Claude : comment les gérer sans perdre la tête
La semaine dernière, un dev m’a contacté en panique. Son app marchait nickel en test, mais dès qu’il a ouvert à 50 users beta, boom : erreurs 429 partout. Le truc classique quand on découvre les rate limits en prod.
Les rate limits de l’API Claude, c’est pas juste une limite de requêtes par minute. C’est plus subtil que ça. Anthropic limite sur plusieurs dimensions : requêtes par minute (RPM), tokens par minute (TPM), et tokens par jour (TPD). Du coup, vous pouvez avoir plein de quota requêtes mais être bloqué sur les tokens.
Je vais vous montrer comment gérer ça proprement, avec du code qui marche vraiment.
Les limites par tier (et pourquoi ça compte)
Anthropic a 4 tiers qui débloquent automatiquement selon votre utilisation :
- Tier 1 (début) : 50 RPM, 40k TPM, 1M TPD
- Tier 2 (après 5$) : 1000 RPM, 80k TPM, 2.5M TPD
- Tier 3 (après 100$) : 2000 RPM, 160k TPM, 5M TPD
- Tier 4 (après 1000$) : 4000 RPM, 320k TPM, 10M TPD
Le piège que personne ne voit venir : même en Tier 4, si vous envoyez des prompts longs avec Claude 3.5 Sonnet, vous tapez la limite tokens bien avant la limite requêtes. Un prompt de 100k tokens (genre analyse de doc) ? Vous en passez 3 par minute max avec 320k TPM.
Bon, maintenant le code.
Implémenter un retry exponentiel intelligent
La base, c’est détecter l’erreur 429 et réessayer. Mais pas n’importe comment. Anthropic renvoie un header retry-after qui vous dit exactement combien attendre.
Voici ma classe Python que j’utilise sur tous mes projets :
import anthropic
import time
from typing import Optional
class ClaudeClientWithRetry:
def __init__(self, api_key: str, max_retries: int = 5):
self.client = anthropic.Anthropic(api_key=api_key)
self.max_retries = max_retries
def create_message_with_retry(self, **kwargs):
for attempt in range(self.max_retries):
try:
return self.client.messages.create(**kwargs)
except anthropic.RateLimitError as e:
if attempt == self.max_retries - 1:
raise
# Anthropic renvoie le délai dans l'exception
retry_after = getattr(e, 'retry_after', None)
if retry_after:
wait_time = float(retry_after)
else:
# Fallback : backoff exponentiel
wait_time = (2 ** attempt) + (random.random() * 0.1)
print(f"Rate limit hit, waiting {wait_time}s...")
time.sleep(wait_time)
except anthropic.APIError as e:
# Autres erreurs API (500, timeouts, etc.)
if attempt == self.max_retries - 1:
raise
wait_time = (2 ** attempt)
time.sleep(wait_time)
raise Exception("Max retries exceeded")
# Utilisation
client = ClaudeClientWithRetry(api_key="votre-clé")
response = client.create_message_with_retry(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Explique-moi les quarks"}]
)
Le truc important : on respecte retry_after quand il existe, sinon on fait un backoff exponentiel avec un peu de jitter (le random) pour éviter que tous vos workers retry en même temps.
Queue et rate limiting côté client
Franchement, compter sur les retries seuls c’est moyen. En prod, vous voulez contrôler le débit AVANT d’envoyer les requêtes.
J’utilise une queue avec un token bucket. Ça sonne compliqué mais c’est simple :
import asyncio
import time
from collections import deque
class RateLimiter:
def __init__(self, rpm: int, tpm: int):
self.rpm = rpm
self.tpm = tpm
self.request_timestamps = deque()
self.token_usage = deque() # (timestamp, tokens)
async def acquire(self, estimated_tokens: int):
while True:
now = time.time()
# Nettoyer les entrées vieilles de plus d'1 minute
while self.request_timestamps and now - self.request_timestamps[0] > 60:
self.request_timestamps.popleft()
while self.token_usage and now - self.token_usage[0][0] > 60:
self.token_usage.popleft()
# Calculer l'usage actuel
current_requests = len(self.request_timestamps)
current_tokens = sum(tokens for _, tokens in self.token_usage)
# Vérifier si on peut passer
if (current_requests < self.rpm and
current_tokens + estimated_tokens <= self.tpm):
self.request_timestamps.append(now)
self.token_usage.append((now, estimated_tokens))
return
# Attendre 1 seconde avant de réessayer
await asyncio.sleep(1)
# Utilisation
limiter = RateLimiter(rpm=1000, tpm=80000) # Tier 2
async def call_claude_safe(prompt: str):
# Estimer les tokens (roughement 1 token = 4 chars)
estimated = len(prompt) // 4 + 1000 # +1000 pour la réponse
await limiter.acquire(estimated)
# Maintenant on peut appeler l'API
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
return response
Ça marche super bien pour des workers asyncio qui traitent des queues de jobs. Vous contrôlez le débit en amont, donc moins d’erreurs 429.
Stratégie multi-tier pour scaler vite
Un truc que j’ai testé sur un projet avec des pics de trafic : utiliser plusieurs clés API à différents tiers.
Vous gardez votre clé principale (Tier 3 ou 4), mais vous avez 2-3 clés secondaires en Tier 1-2. Quand la principale hit les limites, vous basculez temporairement sur les secondaires pour les requêtes moins critiques.
class MultiTierClient:
def __init__(self, primary_key: str, secondary_keys: list[str]):
self.primary = ClaudeClientWithRetry(primary_key)
self.secondaries = [ClaudeClientWithRetry(k) for k in secondary_keys]
self.current_secondary = 0
async def create_message(self, priority: str = "normal", **kwargs):
if priority == "high":
# Toujours utiliser la clé principale pour haute priorité
return self.primary.create_message_with_retry(**kwargs)
# Pour priorité normale, essayer secondaires d'abord
try:
client = self.secondaries[self.current_secondary]
response = client.create_message_with_retry(**kwargs)
return response
except anthropic.RateLimitError:
# Passer à la secondaire suivante
self.current_secondary = (self.current_secondary + 1) % len(self.secondaries)
# Fallback sur la principale
return self.primary.create_message_with_retry(**kwargs)
Bon c’est pas gratuit (vous payez plusieurs abonnements), mais ça peut sauver les meubles pendant les pics.
Les erreurs que tout le monde fait
Erreur 1 : Estimer les tokens à l’arrache. Utilisez anthropic.count_tokens() si vous voulez être précis. Sinon la règle des 4 chars par token marche en moyenne.
Erreur 2 : Retry indéfiniment sans backoff max. J’ai vu des apps qui retry pendant 10 minutes. Mettez un max raisonnable (5 retries = ~30 secondes max).
Erreur 3 : Ignorer les autres erreurs API. Les timeouts (408), les 500, les 529 (overloaded), ça arrive aussi. Votre logique de retry doit les gérer.
Erreur 4 : Oublier que TPD existe. Vous pouvez respecter RPM et TPM mais quand même exploser votre quota journalier à 18h. Trackez votre usage cumulé.
Un petit helper que j’utilise pour ça :
class DailyTokenTracker:
def __init__(self, limit: int):
self.limit = limit
self.used_today = 0
self.current_day = time.strftime("%Y-%m-%d")
def check_and_update(self, tokens: int) -> bool:
today = time.strftime("%Y-%m-%d")
if today != self.current_day:
# Nouveau jour, reset
self.used_today = 0
self.current_day = today
if self.used_today + tokens > self.limit:
return False # Quota dépassé
self.used_today += tokens
return True
Monitoring : ce qu’il faut tracker
En prod, vous devez logger ces métriques :
- Nombre de 429 par heure (si ça monte, vous êtes à la limite)
- Latence moyenne des requêtes (avec vs sans retry)
- Distribution des temps d’attente sur retry
- Usage tokens par heure vs quota disponible
Personnellement j’envoie tout ça dans Grafana avec des alertes quand je dépasse 80% du quota. Ça me laisse le temps de réagir avant que ça casse.
Voici un exemple simple avec des logs structurés :
import logging
import json
logger = logging.getLogger(__name__)
def log_api_call(tokens_used: int, retries: int, duration: float):
logger.info(json.dumps({
"event": "claude_api_call",
"tokens": tokens_used,
"retries": retries,
"duration_ms": duration * 1000
}))
Vous parsez ces logs avec votre stack de monitoring habituelle.
Pour finir
Les rate limits c’est chiant mais c’est gérable. Le secret c’est de les anticiper plutôt que de les subir. Implémentez du rate limiting côté client, des retries intelligents, et du monitoring décent.
Ah et montez en tier rapidement si vous êtes sérieux. La différence entre Tier 1 et Tier 2 (juste 5$ de conso) c’est x20 sur les requêtes et x2 sur les tokens. Ça vaut vraiment le coup.
Le code complet avec gestion d’erreurs et monitoring est dispo dans mes gists : github.com/askjeanclaude (lien fictif mais vous avez l’idée).
Bonne intégration, et que vos 429 soient rares.