Error Recovery

Strategies for building resilient integrations that gracefully handle failures.

Error Categories

Transient Errors (Retry)

Temporary issues that resolve themselves:

| HTTP Code | Meaning | Strategy |
|-----------|---------|----------|
| 429 | Rate Limited | Retry with backoff |
| 500 | Server Error | Retry 3 times |
| 502 | Bad Gateway | Retry 3 times |
| 503 | Service Unavailable | Retry with longer delay |
| 504 | Gateway Timeout | Retry 3 times |

Permanent Errors (Don't Retry)

Issues requiring manual intervention:

| HTTP Code | Meaning | Strategy |
|-----------|---------|----------|
| 400 | Bad Request | Fix data, resubmit |
| 401 | Unauthorized | Check API key |
| 403 | Forbidden | Check permissions |
| 404 | Not Found | Check endpoint/ID |
| 422 | Validation Error | Fix invoice, resubmit |
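
A small helper can centralize this classification so the rest of the integration never inspects status codes directly. This is just an illustrative sketch; the is_retryable name is not part of the API.

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable(status_code: int) -> bool:
    """Return True for transient errors that are worth retrying."""
    return status_code in RETRYABLE_STATUS_CODES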

Retry Implementation

Exponential Backoff

import time
import random
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2,
    jitter: bool = True
):
    """Decorator for retry with exponential backoff."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0

            while True:
                try:
                    return func(*args, **kwargs)

                except RetryableError as e:
                    retries += 1

                    if retries > max_retries:
                        raise MaxRetriesExceeded(
                            f"Failed after {max_retries} retries: {e}"
                        )

                    # Calculate delay
                    delay = min(
                        base_delay * (exponential_base ** (retries - 1)),
                        max_delay
                    )

                    # Add jitter to prevent thundering herd
                    if jitter:
                        delay *= (0.5 + random.random())

                    logger.warning(
                        f"Retry {retries}/{max_retries} after {delay:.2f}s: {e}"
                    )
                    time.sleep(delay)

                except PermanentError:
                    raise  # Don't retry

        return wrapper
    return decorator

Using the Decorator

import requests

class RetryableError(Exception):
    """Error that should be retried."""
    pass

class PermanentError(Exception):
    """Error that should not be retried."""
    pass

class MaxRetriesExceeded(Exception):
    """Raised when the retry budget is exhausted."""
    pass

@retry_with_backoff(max_retries=3, base_delay=2.0)
def send_invoice(invoice_data: dict) -> dict:
    """Send invoice with automatic retry."""

    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/send",
        headers={"X-API-Key": api_key},
        json=invoice_data,
        timeout=30
    )

    if response.status_code == 429:
        # Rate limited - extract retry-after if available
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RetryableError(f"Rate limited, retry after {retry_after}s")

    if response.status_code >= 500:
        raise RetryableError(f"Server error: {response.status_code}")

    if response.status_code in (400, 422):
        raise PermanentError(f"Validation error: {response.json()}")

    response.raise_for_status()
    return response.json()
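
Callers of the decorated function then only need to handle the two terminal outcomes; a minimal sketch:

try:
    result = send_invoice(invoice_data)
except MaxRetriesExceeded:
    # Transient failures exhausted the retry budget - park the invoice and retry later
    ...
except PermanentError:
    # Data problem - fix the invoice before resubmitting
    ...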

With tenacity Library

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
    before_sleep_log
)
import logging
import requests

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(RetryableError),
    before_sleep=before_sleep_log(logger, logging.WARNING)
)
def send_with_tenacity(invoice_data: dict) -> dict:
    """Send invoice using tenacity for retries."""
    response = requests.post(url, json=invoice_data)

    if response.status_code in [429, 500, 502, 503, 504]:
        raise RetryableError(f"Status {response.status_code}")

    response.raise_for_status()
    return response.json()
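
When the stop condition is reached, tenacity raises a RetryError wrapping the last attempt (pass reraise=True to the decorator if you prefer the original RetryableError to propagate). A minimal caller:

from tenacity import RetryError

try:
    send_with_tenacity(invoice_data)
except RetryError as e:
    logger.error(f"Giving up after retries: {e.last_attempt.exception()}")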

Circuit Breaker Pattern

Prevent cascading failures:

from datetime import datetime, timedelta
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitOpenError(Exception):
    """Raised when the circuit is open and calls are rejected."""
    pass

class CircuitBreaker:
    """Circuit breaker for API calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = timedelta(seconds=recovery_timeout)
        self.expected_exception = expected_exception

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.lock = Lock()

    def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker."""

        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitOpenError(
                        f"Circuit open, retry after {self.recovery_timeout}"
                    )

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result

        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time >= self.recovery_timeout
        )

    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# Usage
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

def send_invoice_with_circuit(invoice):
    return circuit.call(send_invoice, invoice)
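
Callers should expect the breaker to reject calls while it is open and handle that case explicitly; a minimal sketch (schedule_for_later is a placeholder for whatever deferral mechanism you use):

try:
    send_invoice_with_circuit(invoice)
except CircuitOpenError:
    # The API is known to be failing; defer this invoice and try again later
    schedule_for_later(invoice)  # hypothetical helper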

Dead Letter Queue

Handle unprocessable messages:

import json
import time
import redis
from datetime import datetime

class DeadLetterQueue:
    """Store failed messages for later analysis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.queue_key = "dead_letter_queue"

    def add(self, message: dict, error: str, retry_count: int):
        """Add failed message to dead letter queue."""
        entry = {
            "message": message,
            "error": str(error),
            "retry_count": retry_count,
            "failed_at": datetime.utcnow().isoformat(),
            "status": "pending"
        }

        self.redis.lpush(self.queue_key, json.dumps(entry))

        # Alert if queue is getting large
        queue_size = self.redis.llen(self.queue_key)
        if queue_size > 100:
            self._alert_ops_team(queue_size)

    def process_dlq(self, handler):
        """Process dead letter queue items."""
        while True:
            item = self.redis.rpop(self.queue_key)
            if not item:
                break

            entry = json.loads(item)
            try:
                handler(entry)
            except Exception as e:
                # Put back for manual review
                entry["last_process_error"] = str(e)
                entry["status"] = "requires_review"
                self.redis.lpush(self.queue_key, json.dumps(entry))

# Usage
dlq = DeadLetterQueue(redis.Redis())

def send_with_dlq(invoice: dict, max_retries: int = 3):
    """Send invoice with dead letter queue fallback."""
    retry_count = 0

    while retry_count < max_retries:
        try:
            return send_invoice(invoice)
        except RetryableError:
            retry_count += 1
            time.sleep(2 ** retry_count)
        except PermanentError as e:
            # Don't retry, add to DLQ
            dlq.add(invoice, e, retry_count)
            raise

    # Max retries exceeded
    dlq.add(invoice, "Max retries exceeded", retry_count)
    raise MaxRetriesExceeded(f"Failed after {max_retries} attempts")
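
A handler passed to process_dlq receives the stored entry, so reprocessing can be as simple as re-sending the original message; a sketch:

def resend_handler(entry: dict):
    """Retry a message pulled from the dead letter queue."""
    send_invoice(entry["message"])

# Run from a scheduled job or during a quiet period
dlq.process_dlq(resend_handler)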

Error Logging and Alerting

Structured Logging

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Format logs as JSON for easy parsing."""

    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        # Add extra fields
        if hasattr(record, "transaction_id"):
            log_entry["transaction_id"] = record.transaction_id
        if hasattr(record, "error_code"):
            log_entry["error_code"] = record.error_code

        return json.dumps(log_entry)

# Setup logger
logger = logging.getLogger("peppol")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

# Usage
def send_invoice(invoice):
    try:
        response = api_call(invoice)
        logger.info(
            "Invoice sent successfully",
            extra={
                "transaction_id": response["transaction_id"],
                "receiver": invoice["receiver"]["identifier"]
            }
        )
        return response
    except Exception as e:
        logger.error(
            f"Failed to send invoice: {e}",
            extra={
                "invoice_id": invoice.get("id"),
                "error_code": getattr(e, "code", "unknown")
            }
        )
        raise
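
With this formatter attached, each event is emitted as a single JSON line, roughly like the following (values illustrative; only the extra fields the formatter knows about, transaction_id and error_code, make it into the output):

{"timestamp": "2024-01-15T10:30:00.000000", "level": "INFO", "message": "Invoice sent successfully", "module": "invoices", "function": "send_invoice", "transaction_id": "tx-123"}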

Alerting

import os
import requests
from functools import wraps

def alert_on_failure(func):
    """Decorator to send alerts on repeated failures."""
    failure_count = {}

    @wraps(func)
    def wrapper(*args, **kwargs):
        key = func.__name__

        try:
            result = func(*args, **kwargs)
            failure_count[key] = 0  # Reset on success
            return result

        except Exception as e:
            failure_count[key] = failure_count.get(key, 0) + 1

            if failure_count[key] >= 5:
                send_alert(
                    title=f"Repeated failures in {func.__name__}",
                    message=f"Failed {failure_count[key]} times: {e}",
                    severity="high"
                )
                failure_count[key] = 0  # Reset after alert

            raise

    return wrapper

def send_alert(title: str, message: str, severity: str = "medium"):
    """Send alert to ops channel."""
    # Slack webhook
    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={
            "text": f"*{severity.upper()}* - {title}",
            "attachments": [{"text": message}]
        }
    )
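
The decorator can wrap any outward-facing call; for example, applied to a thin wrapper around the retrying send_invoice above (SLACK_WEBHOOK_URL must be set for send_alert to work):

@alert_on_failure
def send_invoice_monitored(invoice_data: dict) -> dict:
    """Alert the ops channel after five consecutive failed sends."""
    return send_invoice(invoice_data)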

Recovery Strategies

Automatic Reprocessing

from celery import Celery

app = Celery('tasks')

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60
)
def process_invoice(self, invoice_data):
    """Celery task with automatic retry."""
    try:
        return send_invoice(invoice_data)
    except RetryableError as e:
        raise self.retry(exc=e)
    except PermanentError:
        # Store for manual review
        store_failed_invoice(invoice_data)
        raise
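
Producers enqueue the task instead of calling it directly, so retries happen inside the worker:

# Queue for background processing
process_invoice.delay(invoice_data)

# Or schedule the first attempt with a delay
process_invoice.apply_async(args=[invoice_data], countdown=30)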

Manual Recovery Queue

class RecoveryQueue:
    """Queue for invoices requiring manual intervention."""

    def __init__(self, db_session):
        self.db = db_session

    def add_for_review(
        self,
        invoice_id: str,
        error: str,
        suggested_action: str
    ):
        """Add invoice to review queue."""
        self.db.execute(
            """
            INSERT INTO recovery_queue
                (invoice_id, error, suggested_action, created_at, status)
            VALUES (%s, %s, %s, NOW(), 'pending')
            """,
            (invoice_id, error, suggested_action)
        )
        self.db.commit()

    def get_pending(self) -> list:
        """Get all pending items for review."""
        result = self.db.execute(
            """
            SELECT * FROM recovery_queue
            WHERE status = 'pending'
            ORDER BY created_at
            """
        )
        return result.fetchall()

    def resolve(self, item_id: str, action: str, notes: str):
        """Mark item as resolved."""
        self.db.execute(
            """
            UPDATE recovery_queue
            SET status = 'resolved',
                resolution_action = %s,
                resolution_notes = %s,
                resolved_at = NOW()
            WHERE id = %s
            """,
            (action, notes, item_id)
        )
        self.db.commit()
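
Wired into the permanent-error path, it might be used like this (the db_session and recovery_queue table are assumed to exist already):

recovery = RecoveryQueue(db_session)

try:
    send_invoice(invoice)
except PermanentError as e:
    recovery.add_for_review(
        invoice_id=invoice["id"],
        error=str(e),
        suggested_action="Fix validation errors and resubmit"
    )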

Best Practices Summary

| Strategy | When to Use |
|----------|-------------|
| Retry with backoff | Transient errors (429, 5xx) |
| Circuit breaker | Prevent cascade failures |
| Dead letter queue | Store unprocessable items |
| Idempotency keys | Prevent duplicates on retry |
| Structured logging | Enable debugging and analysis |
| Alerting | Notify on repeated failures |
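
Idempotency keys are the one strategy in this table not shown above. The usual pattern is to generate a key once per invoice and send it with every attempt so a retried request cannot create a duplicate. A sketch, assuming the endpoint accepts an Idempotency-Key header (confirm the actual header name in the API reference):

import uuid
import requests

def send_idempotent(invoice_data: dict, idempotency_key: str) -> dict:
    """Send with a client-supplied idempotency key so retries are safe."""
    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/send",
        headers={
            "X-API-Key": api_key,
            "Idempotency-Key": idempotency_key,  # assumed header name
        },
        json=invoice_data,
        timeout=30
    )
    response.raise_for_status()
    return response.json()

# Generate the key once per invoice and reuse it on every retry
send_idempotent(invoice_data, str(uuid.uuid4()))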

Next Steps