Error Recovery
Strategies for building resilient integrations that gracefully handle failures.
Error Categories​
Transient Errors (Retry)​
Temporary issues that resolve themselves:
| HTTP Code | Meaning | Strategy |
|---|---|---|
| 429 | Rate Limited | Retry with backoff (honor Retry-After if present) |
| 500 | Server Error | Retry up to 3 times with backoff |
| 502 | Bad Gateway | Retry up to 3 times with backoff |
| 503 | Service Unavailable | Retry with a longer delay |
| 504 | Gateway Timeout | Retry up to 3 times with backoff |
Permanent Errors (Don't Retry)​
Issues requiring manual intervention:
| HTTP Code | Meaning | Strategy |
|---|---|---|
| 400 | Bad Request | Fix data, resubmit |
| 401 | Unauthorized | Check API key |
| 403 | Forbidden | Check permissions |
| 404 | Not Found | Check endpoint/ID |
| 422 | Validation Error | Fix invoice, resubmit |
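As a minimal sketch (not part of the official client), the two tables above can be mirrored in a small classification helper so the rest of your integration never hard-codes individual status codes:

```python
# Status-code sets mirroring the two tables above (a sketch, not an official client API)
TRANSIENT_STATUS_CODES = {429, 500, 502, 503, 504}
PERMANENT_STATUS_CODES = {400, 401, 403, 404, 422}

def is_retryable(status_code: int) -> bool:
    """Return True for transient errors that are worth retrying with backoff."""
    return status_code in TRANSIENT_STATUS_CODES
```

The retry examples below make the same split by raising RetryableError for the first set and PermanentError for the second.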
Retry Implementation​
Exponential Backoff​
import logging
import random
import time
from functools import wraps

logger = logging.getLogger(__name__)  # used by the retry warnings below

def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2,
jitter: bool = True
):
"""Decorator for retry with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while True:
try:
return func(*args, **kwargs)
except RetryableError as e:
retries += 1
if retries > max_retries:
raise MaxRetriesExceeded(
f"Failed after {max_retries} retries: {e}"
)
# Calculate delay
delay = min(
base_delay * (exponential_base ** (retries - 1)),
max_delay
)
# Add jitter to prevent thundering herd
if jitter:
delay *= (0.5 + random.random())
logger.warning(
f"Retry {retries}/{max_retries} after {delay:.2f}s: {e}"
)
time.sleep(delay)
except PermanentError:
raise # Don't retry
return wrapper
return decorator
Using the Decorator​
import os
import requests

api_key = os.environ["PEPPOL_API_KEY"]  # example env var name; load your real API key securely

class RetryableError(Exception):
    """Error that should be retried."""
    pass

class PermanentError(Exception):
    """Error that should not be retried."""
    pass

class MaxRetriesExceeded(Exception):
    """Raised by the retry decorator when all attempts are exhausted."""
    pass
@retry_with_backoff(max_retries=3, base_delay=2.0)
def send_invoice(invoice_data: dict) -> dict:
"""Send invoice with automatic retry."""
response = requests.post(
"https://app.goroute.ai/peppol-api/api/v1/send",
headers={"X-API-Key": api_key},
json=invoice_data,
timeout=30
)
if response.status_code == 429:
# Rate limited - extract retry-after if available
retry_after = int(response.headers.get("Retry-After", 60))
raise RetryableError(f"Rate limited, retry after {retry_after}s")
if response.status_code >= 500:
raise RetryableError(f"Server error: {response.status_code}")
    if response.status_code in (400, 422):
        raise PermanentError(f"Validation error: {response.json()}")
response.raise_for_status()
return response.json()
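send_invoice surfaces the Retry-After value in its error message, but the decorator above still computes its own exponential delay. If you would rather honor the server's hint directly, a minimal sketch (reusing api_key, the send endpoint, and MaxRetriesExceeded defined above) could look like this:

```python
import time
import requests

def send_invoice_honoring_retry_after(invoice_data: dict, max_retries: int = 3) -> dict:
    """Sketch: prefer the server's Retry-After hint over a computed backoff."""
    for attempt in range(1, max_retries + 1):
        response = requests.post(
            "https://app.goroute.ai/peppol-api/api/v1/send",
            headers={"X-API-Key": api_key},
            json=invoice_data,
            timeout=30,
        )
        if response.status_code == 429:
            # Sleep for the server-suggested interval, capped to avoid very long waits
            time.sleep(min(int(response.headers.get("Retry-After", 60)), 120))
            continue
        if response.status_code >= 500:
            time.sleep(2 ** attempt)  # fall back to exponential backoff for 5xx errors
            continue
        response.raise_for_status()
        return response.json()
    raise MaxRetriesExceeded(f"Failed after {max_retries} attempts")
```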
With tenacity Library​
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
import logging
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(RetryableError),
before_sleep=before_sleep_log(logger, logging.WARNING)
)
def send_with_tenacity(invoice_data: dict) -> dict:
    """Send invoice using tenacity for retries."""
    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/send",
        headers={"X-API-Key": api_key},
        json=invoice_data,
        timeout=30,
    )
    if response.status_code in (429, 500, 502, 503, 504):
        raise RetryableError(f"Status {response.status_code}")
    response.raise_for_status()
    return response.json()
Circuit Breaker Pattern​
Prevent cascading failures:
from datetime import datetime, timedelta
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitOpenError(Exception):
    """Raised when the circuit is open and a call is rejected."""
    pass
class CircuitBreaker:
"""Circuit breaker for API calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = timedelta(seconds=recovery_timeout)
self.expected_exception = expected_exception
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.lock = Lock()
def call(self, func, *args, **kwargs):
"""Execute function through circuit breaker."""
with self.lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError(
f"Circuit open, retry after {self.recovery_timeout}"
)
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _should_attempt_reset(self) -> bool:
return (
self.last_failure_time and
datetime.now() - self.last_failure_time >= self.recovery_timeout
)
def _on_success(self):
with self.lock:
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
def send_invoice_with_circuit(invoice):
return circuit.call(send_invoice, invoice)
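Two notes on usage: expected_exception defaults to Exception, so permanent validation errors also count toward opening the circuit; pass expected_exception=RetryableError if only infrastructure failures should trip it. Callers also need a plan for CircuitOpenError. A sketch, where defer_for_later is a hypothetical helper that parks the invoice for a later attempt:

```python
def submit_invoice(invoice: dict):
    """Sketch: degrade gracefully while the circuit is open."""
    try:
        return send_invoice_with_circuit(invoice)
    except CircuitOpenError:
        # The API is considered down; park the invoice instead of hammering the endpoint
        defer_for_later(invoice)  # hypothetical helper, e.g. push to a local queue
        return None
```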
Dead Letter Queue​
Handle unprocessable messages:
import json
import redis
from datetime import datetime
class DeadLetterQueue:
"""Store failed messages for later analysis."""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.queue_key = "dead_letter_queue"
def add(self, message: dict, error: str, retry_count: int):
"""Add failed message to dead letter queue."""
entry = {
"message": message,
"error": str(error),
"retry_count": retry_count,
"failed_at": datetime.utcnow().isoformat(),
"status": "pending"
}
self.redis.lpush(self.queue_key, json.dumps(entry))
# Alert if queue is getting large
queue_size = self.redis.llen(self.queue_key)
if queue_size > 100:
self._alert_ops_team(queue_size)
def process_dlq(self, handler):
"""Process dead letter queue items."""
while True:
item = self.redis.rpop(self.queue_key)
if not item:
break
entry = json.loads(item)
try:
handler(entry)
except Exception as e:
# Put back for manual review
entry["last_process_error"] = str(e)
entry["status"] = "requires_review"
self.redis.lpush(self.queue_key, json.dumps(entry))
# Usage
dlq = DeadLetterQueue(redis.Redis())
def send_with_dlq(invoice: dict, max_retries: int = 3):
"""Send invoice with dead letter queue fallback."""
retry_count = 0
while retry_count < max_retries:
try:
return send_invoice(invoice)
except RetryableError:
retry_count += 1
time.sleep(2 ** retry_count)
except PermanentError as e:
# Don't retry, add to DLQ
dlq.add(invoice, e, retry_count)
raise
# Max retries exceeded
dlq.add(invoice, "Max retries exceeded", retry_count)
raise MaxRetriesExceeded(f"Failed after {max_retries} attempts")
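process_dlq hands each dead-letter entry to a handler you supply; anything the handler raises puts the entry back on the queue with status requires_review. A minimal re-drive handler, assuming the stored payload is sendable again once the underlying problem is fixed:

```python
def reprocess_entry(entry: dict) -> None:
    """Resend the original payload stored in a dead-letter entry."""
    send_invoice(entry["message"])

# Re-drive the queue after an outage or after correcting the data issues
dlq.process_dlq(reprocess_entry)
```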
Error Logging and Alerting​
Structured Logging​
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""Format logs as JSON for easy parsing."""
def format(self, record):
log_entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
}
        # Add extra fields passed via the logger's `extra` argument
        for field in ("transaction_id", "error_code", "invoice_id", "receiver"):
            if hasattr(record, field):
                log_entry[field] = getattr(record, field)
return json.dumps(log_entry)
# Set up the logger
logger = logging.getLogger("peppol")
logger.setLevel(logging.INFO)  # the default effective level (WARNING) would drop the info logs below
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Usage
def send_invoice(invoice):
try:
response = api_call(invoice)
logger.info(
"Invoice sent successfully",
extra={
"transaction_id": response["transaction_id"],
"receiver": invoice["receiver"]["identifier"]
}
)
return response
except Exception as e:
logger.error(
f"Failed to send invoice: {e}",
extra={
"invoice_id": invoice.get("id"),
"error_code": getattr(e, "code", "unknown")
}
)
raise
Alerting​
import os
import requests
from functools import wraps
def alert_on_failure(func):
"""Decorator to send alerts on repeated failures."""
failure_count = {}
@wraps(func)
def wrapper(*args, **kwargs):
key = func.__name__
try:
result = func(*args, **kwargs)
failure_count[key] = 0 # Reset on success
return result
except Exception as e:
failure_count[key] = failure_count.get(key, 0) + 1
if failure_count[key] >= 5:
send_alert(
title=f"Repeated failures in {func.__name__}",
message=f"Failed {failure_count[key]} times: {e}",
severity="high"
)
failure_count[key] = 0 # Reset after alert
raise
return wrapper
def send_alert(title: str, message: str, severity: str = "medium"):
"""Send alert to ops channel."""
# Slack webhook
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={
"text": f"*{severity.upper()}* - {title}",
"attachments": [{"text": message}]
}
)
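Applied to the retrying send_invoice from earlier, only failures that survive all retries count toward the alert threshold:

```python
# Hypothetical wrapper: page the ops channel after five consecutive end-to-end failures
@alert_on_failure
def deliver_invoice(invoice_data: dict) -> dict:
    return send_invoice(invoice_data)
```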
Recovery Strategies​
Automatic Reprocessing​
from celery import Celery
app = Celery('tasks')
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60
)
def process_invoice(self, invoice_data):
"""Celery task with automatic retry."""
try:
return send_invoice(invoice_data)
except RetryableError as e:
raise self.retry(exc=e)
except PermanentError:
# Store for manual review
store_failed_invoice(invoice_data)
raise
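Producers enqueue work with .delay(); this assumes the Celery app is configured with a broker (for example Redis or RabbitMQ) in your project settings:

```python
# Enqueue an invoice for background processing (broker configuration not shown)
result = process_invoice.delay(invoice_data)

# Optionally wait for the outcome -- requires a configured result backend
print(result.get(timeout=120))
```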
Manual Recovery Queue​
class RecoveryQueue:
"""Queue for invoices requiring manual intervention."""
def __init__(self, db_session):
self.db = db_session
def add_for_review(
self,
invoice_id: str,
error: str,
suggested_action: str
):
"""Add invoice to review queue."""
self.db.execute(
"""
INSERT INTO recovery_queue
(invoice_id, error, suggested_action, created_at, status)
VALUES (%s, %s, %s, NOW(), 'pending')
""",
(invoice_id, error, suggested_action)
)
self.db.commit()
def get_pending(self) -> list:
"""Get all pending items for review."""
result = self.db.execute(
"""
SELECT * FROM recovery_queue
WHERE status = 'pending'
ORDER BY created_at
"""
)
return result.fetchall()
def resolve(self, item_id: str, action: str, notes: str):
"""Mark item as resolved."""
self.db.execute(
"""
UPDATE recovery_queue
SET status = 'resolved',
resolution_action = %s,
resolution_notes = %s,
resolved_at = NOW()
WHERE id = %s
""",
(action, notes, item_id)
)
self.db.commit()
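Wiring this into the send path means routing permanent failures into the review queue. The snippet assumes a DB-API style handle with execute/commit, matching the class above; db_session is whatever connection your integration already manages:

```python
recovery = RecoveryQueue(db_session)  # db_session: your existing database handle (assumption)

def send_or_queue_for_review(invoice: dict):
    """Send the invoice; on permanent failure, park it for manual review."""
    try:
        return send_invoice(invoice)
    except PermanentError as e:
        recovery.add_for_review(
            invoice_id=invoice.get("id", "unknown"),
            error=str(e),
            suggested_action="Fix the flagged fields and resubmit",
        )
        return None
```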
Best Practices Summary​
| Strategy | When to Use |
|---|---|
| Retry with backoff | Transient errors (429, 5xx) |
| Circuit breaker | Prevent cascade failures |
| Dead letter queue | Store unprocessable items |
| Idempotency keys | Prevent duplicates on retry (see the sketch below) |
| Structured logging | Enable debugging and analysis |
| Alerting | Notify on repeated failures |
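Idempotency keys appear in the table above but are not shown elsewhere in this guide. A minimal sketch, reusing api_key and the send endpoint from earlier and assuming the API accepts an idempotency key header (the Idempotency-Key header name is an assumption here; confirm it in the API reference). The important part is generating the key once per invoice and reusing it across every retry:

```python
import uuid
import requests

def send_invoice_idempotent(invoice_data: dict, idempotency_key: str) -> dict:
    """Sketch: the caller generates one key and reuses it across all retries."""
    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/send",
        headers={
            "X-API-Key": api_key,
            "Idempotency-Key": idempotency_key,  # assumed header name -- confirm in the API reference
        },
        json=invoice_data,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

# Generate the key once per invoice, outside any retry loop, then reuse it on every attempt
send_invoice_idempotent(invoice_data, idempotency_key=str(uuid.uuid4()))
```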